Changes from 5 commits
@@ -40,52 +40,20 @@
# https://docs.kedro.org/en/stable/data/data_catalog.html

companies:
  type: pandas.CSVDataset
  filepath: data/01_raw/companies.csv
Member: What about when the input data isn't coming from a local filesystem? I'd imagine that if you have massive data, you want all of it to be handled by Spark and not just the intermediate results.
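A rough sketch of what that could look like: spark.SparkDataset reads any Spark-supported URI, so pointing the raw layer at object storage should only be a catalog change. The bucket name and credentials key below are hypothetical, not part of this PR:

```yaml
companies:
  type: spark.SparkDataset
  filepath: s3a://example-bucket/01_raw/companies.csv  # hypothetical bucket
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  credentials: dev_s3  # assumed to be defined in conf/local/credentials.yml
```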

  type: spark.SparkDataset
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: ','
    header: True
    mode: overwrite

reviews:
  type: pandas.CSVDataset
  filepath: data/01_raw/reviews.csv
  type: spark.SparkDataset
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: ','
    header: True
    mode: overwrite

shuttles_excel:
shuttles:
  type: pandas.ExcelDataset
  filepath: data/01_raw/shuttles.xlsx

shuttles@csv:
  type: pandas.CSVDataset
  filepath: data/02_intermediate/shuttles.csv

shuttles@spark:
  filepath: data/02_intermediate/shuttles.csv
  type: spark.SparkDataset
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: ','
    header: True
    mode: overwrite

preprocessed_companies:
  filepath: data/02_intermediate/preprocessed_companies.csv
  type: spark.SparkDataset
  type: spark.SparkDatasetV2
  file_format: csv
  load_args:
    header: True
@@ -97,7 +65,7 @@ preprocessed_companies:

preprocessed_shuttles:
  filepath: data/02_intermediate/preprocessed_shuttles.csv
  type: spark.SparkDataset
  type: spark.SparkDatasetV2
  file_format: csv
  load_args:
    header: True
@@ -109,7 +77,7 @@ preprocessed_shuttles:

preprocessed_reviews:
  filepath: data/02_intermediate/preprocessed_reviews.csv
  type: spark.SparkDataset
  type: spark.SparkDatasetV2
  file_format: csv
  load_args:
    header: True
@@ -121,7 +89,7 @@ preprocessed_reviews:

model_input_table@spark:
  filepath: data/03_primary/model_input_table.parquet
  type: spark.SparkDataset
  type: spark.SparkDatasetV2
  file_format: parquet
  load_args:
    header: True
@@ -139,28 +107,3 @@ regressor:
  type: pickle.PickleDataset
  filepath: data/06_models/regressor.pickle
  versioned: true

shuttle_passenger_capacity_plot_exp:
  type: plotly.PlotlyDataset
  filepath: data/08_reporting/shuttle_passenger_capacity_plot_exp.json
  versioned: true
  plotly_args:
    type: bar
    fig:
      x: shuttle_type
      y: passenger_capacity
      orientation: h
    layout:
      xaxis_title: Shuttles
      yaxis_title: Average passenger capacity
      title: Shuttle Passenger capacity

shuttle_passenger_capacity_plot_go:
  type: plotly.JSONDataset
  filepath: data/08_reporting/shuttle_passenger_capacity_plot_go.json
  versioned: true

dummy_confusion_matrix:
  type: matplotlib.MatplotlibDataset
  filepath: data/08_reporting/dummy_confusion_matrix.png
  versioned: true
@@ -2,7 +2,11 @@ ipython>=8.10
jupyterlab>=3.0
notebook
kedro[jupyter]~={{ cookiecutter.kedro_version }}
kedro-datasets[pandas-csvdataset, pandas-exceldataset, pandas-parquetdataset, spark-sparkdataset, plotly-plotlydataset, plotly-jsondataset, matplotlib-matplotlibdataset]>=3.0
kedro-datasets[pandas-csvdataset, pandas-exceldataset, pandas-parquetdataset, plotly-plotlydataset, plotly-jsondataset, matplotlib-matplotlibdataset]>=3.0
# Temporary (until the next kedro-plugins release): install SparkDatasetV2 from the main branch (this overrides the PyPI package) and install pyspark.
git+https://github.com/kedro-org/kedro-plugins.git@main#subdirectory=kedro-datasets
pyspark>=2.2,<5.0

kedro-viz>=6.7.0
scikit-learn~=1.5.1
seaborn~=0.12.1
@@ -1,28 +1,24 @@
import pandas as pd
from pyspark.sql import Column
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import DoubleType


def _is_true(x: Column) -> Column:
def _is_true(x: pd.Series) -> pd.Series:
    return x == "t"


def _parse_percentage(x: Column) -> Column:
    x = regexp_replace(x, "%", "")
    x = x.cast("float") / 100
def _parse_percentage(x: pd.Series) -> pd.Series:
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: Column) -> Column:
    x = regexp_replace(x, "[$£€]", "")
    x = regexp_replace(x, ",", "")
    x = x.cast(DoubleType())
def _parse_money(x: pd.Series) -> pd.Series:
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: SparkDataFrame) -> tuple[SparkDataFrame, dict]:
def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
@@ -31,21 +27,12 @@ def preprocess_companies(companies: SparkDataFrame) -> tuple[SparkDataFrame, dict]:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies = companies.withColumn("iata_approved", _is_true(companies.iata_approved))
    companies = companies.withColumn("company_rating", _parse_percentage(companies.company_rating))

    # Drop columns that aren't used for model training
    companies = companies.drop('company_location', 'total_fleet_count')
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def load_shuttles_to_csv(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Load shuttles to CSV, because it's not possible to load Excel directly into Spark.
    """
    return shuttles


def preprocess_shuttles(shuttles: SparkDataFrame) -> SparkDataFrame:
def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
@@ -54,19 +41,25 @@ def preprocess_shuttles(shuttles: SparkDataFrame) -> SparkDataFrame:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles = shuttles.withColumn("d_check_complete", _is_true(shuttles.d_check_complete))
    shuttles = shuttles.withColumn("moon_clearance_complete", _is_true(shuttles.moon_clearance_complete))
    shuttles = shuttles.withColumn("price", _parse_money(shuttles.price))

    # Drop columns that aren't used for model training
    shuttles = shuttles.drop('shuttle_location', 'engine_type', 'engine_vendor', 'cancellation_policy')
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles


def preprocess_reviews(reviews: SparkDataFrame) -> SparkDataFrame:
def preprocess_reviews(reviews: pd.DataFrame) -> pd.DataFrame:
    # Drop columns that aren't used for model training
    reviews = reviews.drop('review_scores_comfort', 'review_scores_amenities', 'review_scores_trip', 'review_scores_crew', 'review_scores_location', 'review_scores_price', 'number_of_reviews', 'reviews_per_month')
    return reviews
    cols_to_drop = [
        'review_scores_comfort',
        'review_scores_amenities',
        'review_scores_trip',
        'review_scores_crew',
        'review_scores_location',
        'review_scores_price',
        'number_of_reviews',
        'reviews_per_month',
    ]
    return reviews.drop(columns=cols_to_drop, errors="ignore")


def create_model_input_table(
@@ -2,7 +2,6 @@

from .nodes import (
    create_model_input_table,
    load_shuttles_to_csv,
    preprocess_companies,
    preprocess_reviews,
    preprocess_shuttles,
@@ -12,12 +11,6 @@
def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            Node(
                func=load_shuttles_to_csv,
                inputs="shuttles_excel",
                outputs="shuttles@csv",
                name="load_shuttles_to_csv_node",
            ),
            Node(
                func=preprocess_companies,
                inputs="companies",
@@ -26,7 +19,7 @@ def create_pipeline(**kwargs) -> Pipeline:
            ),
            Node(
                func=preprocess_shuttles,
                inputs="shuttles@spark",
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),

This file was deleted.

This file was deleted.

This file was deleted.

@@ -3,10 +3,9 @@
https://docs.kedro.org/en/stable/kedro_project_setup/settings.html."""

# Instantiated project hooks.
from {{cookiecutter.python_package}}.hooks import SparkHooks # noqa: E402

# Hooks are executed in a Last-In-First-Out (LIFO) order.
HOOKS = (SparkHooks(),)
HOOKS = ()
Contributor: We can remove the hooks.py file now, right?
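For context, hooks.py only holds the starter's SparkHooks, which exists to build a SparkSession from spark.yml. It looks roughly like this (paraphrased from the pyspark starter, not copied from this PR):

```python
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialise a SparkSession using the config defined in spark.yml."""
        # Load the Spark configuration via the project's config loader
        parameters = context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())

        # Build (or fetch) the session shared by the whole run
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")
```

Since SparkDatasetV2 manages its own session, nothing else references this hook, so deleting the file should be safe.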

Contributor: Also, spark.yml is unnecessary for SparkDatasetV2; that file is only useful when using SparkHooks. Maybe we should remove it too, or add a comment that it's optional / for advanced configuration.
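For reference, spark.yml is just the flat key-value map of Spark options that SparkHooks feeds into SparkConf; the starter version looks roughly like this (exact keys may differ):

```yaml
# conf/base/spark.yml: only read by SparkHooks, so optional without it
spark.driver.maxResultSize: 3g
spark.scheduler.mode: FAIR
```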

Member (Author): Fully agree. I thought I had removed them; fixed now.


# Installed plugins for which to disable hook auto-registration.
# DISABLE_HOOKS_FOR_PLUGINS = ("kedro-viz",)