@@ -40,52 +40,20 @@
# https://docs.kedro.org/en/stable/data/data_catalog.html

companies:
type: pandas.CSVDataset
filepath: data/01_raw/companies.csv
Review comment (Member):
What about when the input data isn't coming from a local place? I'd imagine that if you have massive data you want all of it to be handled by Spark and not just the intermediate results.
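A minimal sketch of what that could look like in the catalog — the s3a:// bucket path below is made up, and S3 credentials/Hadoop configuration are left out:

companies:
  type: spark.SparkDataset
  filepath: s3a://my-bucket/01_raw/companies.csv  # hypothetical bucket
  file_format: csv
  load_args:
    header: True
    inferSchema: True

That way Spark reads the raw file directly, and the full dataset never passes through pandas on the driver.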

type: spark.SparkDataset
file_format: csv
load_args:
header: True
inferSchema: True
save_args:
sep: ','
header: True
mode: overwrite

reviews:
type: pandas.CSVDataset
filepath: data/01_raw/reviews.csv
type: spark.SparkDataset
file_format: csv
load_args:
header: True
inferSchema: True
save_args:
sep: ','
header: True
mode: overwrite

shuttles_excel:
shuttles:
type: pandas.ExcelDataset
filepath: data/01_raw/shuttles.xlsx

shuttles@csv:
type: pandas.CSVDataset
filepath: data/02_intermediate/shuttles.csv

shuttles@spark:
filepath: data/02_intermediate/shuttles.csv
type: spark.SparkDataset
file_format: csv
load_args:
header: True
inferSchema: True
save_args:
sep: ','
header: True
mode: overwrite

preprocessed_companies:
filepath: data/02_intermediate/preprocessed_companies.csv
type: spark.SparkDataset
type: spark.SparkDatasetV2
file_format: csv
load_args:
header: True
@@ -97,7 +65,7 @@ preprocessed_companies:

preprocessed_shuttles:
filepath: data/02_intermediate/preprocessed_shuttles.csv
type: spark.SparkDataset
type: spark.SparkDatasetV2
file_format: csv
load_args:
header: True
@@ -109,7 +77,7 @@ preprocessed_shuttles:

preprocessed_reviews:
filepath: data/02_intermediate/preprocessed_reviews.csv
type: spark.SparkDataset
type: spark.SparkDatasetV2
file_format: csv
load_args:
header: True
@@ -121,7 +89,7 @@ preprocessed_reviews:

model_input_table@spark:
filepath: data/03_primary/model_input_table.parquet
type: spark.SparkDataset
type: spark.SparkDatasetV2
file_format: parquet
load_args:
header: True
@@ -139,28 +107,3 @@ regressor:
type: pickle.PickleDataset
filepath: data/06_models/regressor.pickle
versioned: true

shuttle_passenger_capacity_plot_exp:
type: plotly.PlotlyDataset
filepath: data/08_reporting/shuttle_passenger_capacity_plot_exp.json
versioned: true
plotly_args:
type: bar
fig:
x: shuttle_type
y: passenger_capacity
orientation: h
layout:
xaxis_title: Shuttles
yaxis_title: Average passenger capacity
title: Shuttle Passenger capacity

shuttle_passenger_capacity_plot_go:
type: plotly.JSONDataset
filepath: data/08_reporting/shuttle_passenger_capacity_plot_go.json
versioned: true

dummy_confusion_matrix:
type: matplotlib.MatplotlibDataset
filepath: data/08_reporting/dummy_confusion_matrix.png
versioned: true

This file was deleted.

@@ -12,7 +12,11 @@ dependencies = [
"jupyterlab>=3.0",
"notebook",
"kedro[jupyter]~={{ cookiecutter.kedro_version }}",
"kedro-datasets[pandas-csvdataset, pandas-exceldataset, pandas-parquetdataset, spark-sparkdataset, plotly-plotlydataset, plotly-jsondataset, matplotlib-matplotlibdataset]>=3.0",
"kedro-datasets[pandas-csvdataset, pandas-exceldataset, pandas-parquetdataset, plotly-plotlydataset, plotly-jsondataset, matplotlib-matplotlibdataset]>=3.0",
# Temporarily override the spark dataset with a GitHub install
"kedro-datasets @ git+https://github.com/kedro-org/kedro-plugins.git@main#subdirectory=kedro-datasets",
# Temporarily install the spark extras manually
"pyspark>=2.2,<5.0",
"kedro-viz>=6.7.0",
"scikit-learn~=1.5.1",
"seaborn~=0.12.1",
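Since this pins kedro-datasets to a Git ref, anyone trying the starter before the next kedro-plugins release can install the override directly — a sketch using the same PEP 508 direct reference:

pip install "kedro-datasets @ git+https://github.com/kedro-org/kedro-plugins.git@main#subdirectory=kedro-datasets"

Note that pip resolves @main at install time, so installs are not reproducible until this moves back to a PyPI pin.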
@@ -2,7 +2,11 @@ ipython>=8.10
jupyterlab>=3.0
notebook
kedro[jupyter]~={{ cookiecutter.kedro_version }}
kedro-datasets[pandas-csvdataset, pandas-exceldataset, pandas-parquetdataset, spark-sparkdataset, plotly-plotlydataset, plotly-jsondataset, matplotlib-matplotlibdataset]>=3.0
kedro-datasets[pandas-csvdataset, pandas-exceldataset, pandas-parquetdataset, plotly-plotlydataset, plotly-jsondataset, matplotlib-matplotlibdataset]>=3.0
# Temporary (until the next kedro-plugins release): install SparkDatasetV2 from the main branch (overrides the PyPI package) and pyspark.
git+https://github.com/kedro-org/kedro-plugins.git@main#subdirectory=kedro-datasets
pyspark>=2.2,<5.0

kedro-viz>=6.7.0
scikit-learn~=1.5.1
seaborn~=0.12.1

This file was deleted.

@@ -1,28 +1,24 @@
import pandas as pd
from pyspark.sql import Column
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import DoubleType


def _is_true(x: Column) -> Column:
def _is_true(x: pd.Series) -> pd.Series:
return x == "t"


def _parse_percentage(x: Column) -> Column:
x = regexp_replace(x, "%", "")
x = x.cast("float") / 100
def _parse_percentage(x: pd.Series) -> pd.Series:
x = x.str.replace("%", "")
x = x.astype(float) / 100
return x


def _parse_money(x: Column) -> Column:
x = regexp_replace(x, "[$£€]", "")
x = regexp_replace(x, ",", "")
x = x.cast(DoubleType())
def _parse_money(x: pd.Series) -> pd.Series:
# Strip currency symbols (incl. £ and €, as the Spark version did) and thousands separators.
x = x.str.replace(r"[$£€]", "", regex=True)
x = x.str.replace(",", "")
x = x.astype(float)
return x


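# Illustrative check of the pandas helpers above (not part of the pipeline;
# the toy Series are made up):
# >>> _parse_money(pd.Series(["£1,400.00", "$500"])).tolist()
# [1400.0, 500.0]
# >>> _parse_percentage(pd.Series(["95%"])).tolist()
# [0.95]
# >>> _is_true(pd.Series(["t", "f"])).tolist()
# [True, False]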
def preprocess_companies(companies: SparkDataFrame) -> tuple[SparkDataFrame, dict]:
def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
"""Preprocesses the data for companies.

Args:
@@ -31,21 +27,12 @@ def preprocess_companies(companies: SparkDataFrame) -> tuple[SparkDataFrame, dic
Preprocessed data, with `company_rating` converted to a float and
`iata_approved` converted to boolean.
"""
companies = companies.withColumn("iata_approved", _is_true(companies.iata_approved))
companies = companies.withColumn("company_rating", _parse_percentage(companies.company_rating))

# Drop columns that aren't used for model training
companies = companies.drop('company_location', 'total_fleet_count')
companies["iata_approved"] = _is_true(companies["iata_approved"])
companies["company_rating"] = _parse_percentage(companies["company_rating"])
return companies


def load_shuttles_to_csv(shuttles: pd.DataFrame) -> pd.DataFrame:
"""Load shuttles to csv because it's not possible to load excel directly into spark.
"""
return shuttles


def preprocess_shuttles(shuttles: SparkDataFrame) -> SparkDataFrame:
def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
"""Preprocesses the data for shuttles.

Args:
@@ -54,19 +41,25 @@ def preprocess_shuttles(shuttles: SparkDataFrame) -> SparkDataFrame:
Preprocessed data, with `price` converted to a float and `d_check_complete`,
`moon_clearance_complete` converted to boolean.
"""
shuttles = shuttles.withColumn("d_check_complete", _is_true(shuttles.d_check_complete))
shuttles = shuttles.withColumn("moon_clearance_complete", _is_true(shuttles.moon_clearance_complete))
shuttles = shuttles.withColumn("price", _parse_money(shuttles.price))

# Drop columns that aren't used for model training
shuttles = shuttles.drop('shuttle_location', 'engine_type', 'engine_vendor', 'cancellation_policy')
shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
shuttles["price"] = _parse_money(shuttles["price"])
return shuttles


def preprocess_reviews(reviews: SparkDataFrame) -> SparkDataFrame:
def preprocess_reviews(reviews: pd.DataFrame) -> pd.DataFrame:
# Drop columns that aren't used for model training
reviews = reviews.drop('review_scores_comfort', 'review_scores_amenities', 'review_scores_trip', 'review_scores_crew', 'review_scores_location', 'review_scores_price', 'number_of_reviews', 'reviews_per_month')
return reviews
cols_to_drop = [
'review_scores_comfort',
'review_scores_amenities',
'review_scores_trip',
'review_scores_crew',
'review_scores_location',
'review_scores_price',
'number_of_reviews',
'reviews_per_month',
]
return reviews.drop(columns=cols_to_drop, errors="ignore")


def create_model_input_table(
Expand Up @@ -2,7 +2,6 @@

from .nodes import (
create_model_input_table,
load_shuttles_to_csv,
preprocess_companies,
preprocess_reviews,
preprocess_shuttles,
@@ -12,12 +11,6 @@
def create_pipeline(**kwargs) -> Pipeline:
return Pipeline(
[
Node(
func=load_shuttles_to_csv,
inputs="shuttles_excel",
outputs="shuttles@csv",
name="load_shuttles_to_csv_node",
),
Node(
func=preprocess_companies,
inputs="companies",
@@ -26,7 +19,7 @@ def create_pipeline(**kwargs) -> Pipeline:
),
Node(
func=preprocess_shuttles,
inputs="shuttles@spark",
inputs="shuttles",
outputs="preprocessed_shuttles",
name="preprocess_shuttles_node",
),

This file was deleted.

This file was deleted.
