Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cbc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ entrypoints:
normalized_docs_PG_USER: null
description: Database output, containing the bib_id as well as the normalized text
type: pg_table
normalized_overwritten_file_output:
config:
normalized_overwritten_file_output_BUCKET_NAME: null
normalized_overwritten_file_output_FILE_EXT: bib
normalized_overwritten_file_output_FILE_NAME: null
normalized_overwritten_file_output_FILE_PATH: null
normalized_overwritten_file_output_S3_ACCESS_KEY: null
normalized_overwritten_file_output_S3_HOST: null
normalized_overwritten_file_output_S3_PORT: null
normalized_overwritten_file_output_S3_SECRET_KEY: null
description: The file input, overwritten with the normalized output
type: file
preprocess_txt_file:
description: Entrypoint to preprocess a .txt file
envs:
Expand Down Expand Up @@ -69,4 +81,16 @@ entrypoints:
normalized_docs_PG_USER: null
description: Database output, containing the bib_id as well as the normalized text
type: pg_table
normalized_overwritten_file_output:
config:
normalized_overwritten_file_output_BUCKET_NAME: null
normalized_overwritten_file_output_FILE_EXT: txt
normalized_overwritten_file_output_FILE_NAME: null
normalized_overwritten_file_output_FILE_PATH: null
normalized_overwritten_file_output_S3_ACCESS_KEY: null
normalized_overwritten_file_output_S3_HOST: null
normalized_overwritten_file_output_S3_PORT: null
normalized_overwritten_file_output_S3_SECRET_KEY: null
description: The file input, overwritten with the normalized output
type: file
name: Language-Preprocessing
110 changes: 60 additions & 50 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import hashlib
import logging

import pandas as pd
from preprocessing.core import Preprocessor
from preprocessing.loader import BibLoader, TxtLoader
from preprocessing.models import DocumentRecord, PreprocessedDocument
from sqlalchemy import create_engine

from pathlib import Path
from typing import List
from scystream.sdk.core import entrypoint
from scystream.sdk.env.settings import (
EnvSettings,
FileSettings,
InputSettings,
OutputSettings,
FileSettings,
PostgresSettings,
)
from scystream.sdk.file_handling.s3_manager import S3Operations
from sqlalchemy import create_engine
from sqlalchemy.sql import quoted_name

from preprocessing.core import Preprocessor
from preprocessing.loader import TxtLoader, BibLoader
from preprocessing.models import DocumentRecord, PreprocessedDocument

logging.basicConfig(
level=logging.INFO,
Expand All @@ -24,25 +25,15 @@
logger = logging.getLogger(__name__)


def _normalize_table_name(table_name: str) -> str:
max_length = 63
if len(table_name) <= max_length:
return table_name
digest = hashlib.sha1(table_name.encode("utf-8")).hexdigest()[:10]
prefix_length = max_length - len(digest) - 1
return f"{table_name[:prefix_length]}_{digest}"


def _resolve_db_table(settings: PostgresSettings) -> str:
    """Normalize the configured table name in place and return it.

    Mutates ``settings.DB_TABLE`` so that all later reads of the setting
    (logging, ``to_sql``) see the length-limited name, and returns the
    same value for convenience.
    """
    normalized_name = _normalize_table_name(settings.DB_TABLE)
    settings.DB_TABLE = normalized_name
    return normalized_name


class NormalizedDocsOutput(PostgresSettings, OutputSettings):
    """Postgres output target for the normalized documents table."""

    __identifier__ = "normalized_docs"

class NormalizedTXTOutput(FileSettings, OutputSettings):
    """S3 file output that overwrites the TXT input with normalized text."""

    __identifier__ = "normalized_overwritten_file_output"
    # Default extension for the exported file; matches the TXT entrypoint.
    FILE_EXT: str = "txt"

class TXTFileInput(FileSettings, InputSettings):
__identifier__ = "txt_file"
FILE_EXT: str = "txt"
Expand All @@ -55,6 +46,11 @@ class BIBFileInput(FileSettings, InputSettings):
SELECTED_ATTRIBUTE: str = "Abstract"


class NormalizedBIBOutput(FileSettings, OutputSettings):
    """S3 file output that overwrites the BIB input with normalized text."""

    __identifier__ = "normalized_overwritten_file_output"
    # Default extension for the exported file; matches the BIB entrypoint.
    FILE_EXT: str = "bib"

class PreprocessTXT(EnvSettings):
LANGUAGE: str = "en"
FILTER_STOPWORDS: bool = True
Expand All @@ -67,6 +63,7 @@ class PreprocessTXT(EnvSettings):

txt_input: TXTFileInput
normalized_docs_output: NormalizedDocsOutput
normalized_overwritten_file_output: NormalizedTXTOutput


class PreprocessBIB(EnvSettings):
Expand All @@ -81,44 +78,38 @@ class PreprocessBIB(EnvSettings):

bib_input: BIBFileInput
normalized_docs_output: NormalizedDocsOutput
normalized_overwritten_file_output: NormalizedBIBOutput


def _write_preprocessed_docs_to_postgres(
preprocessed_ouput: list[PreprocessedDocument],
settings: PostgresSettings,
preprocessed_ouput: List[PreprocessedDocument], settings: PostgresSettings
):
resolved_table_name = _resolve_db_table(settings)
df = pd.DataFrame(
[
{
"doc_id": d.doc_id,
"tokens": d.tokens,
}
for d in preprocessed_ouput
],
[{"doc_id": d.doc_id, "tokens": d.tokens} for d in preprocessed_ouput]
)

logger.info(
"Writing %s processed documents to DB table '%s'…",
len(df),
resolved_table_name,
settings.DB_TABLE,
)
engine = create_engine(
f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}"
f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/",
)

table_name = quoted_name(resolved_table_name, quote=True)
df.to_sql(table_name, engine, if_exists="replace", index=False)
df.to_sql(settings.DB_TABLE, engine, if_exists="replace", index=False)

logger.info(
"Successfully stored normalized documents into '%s'.",
resolved_table_name,
f"Successfully stored normalized documents into '{settings.DB_TABLE}'."
)


def _preprocess_and_store(documents: list[DocumentRecord], settings):
"""Shared preprocessing logic for TXT and BIB."""
def _preprocess_and_store(
documents: List[DocumentRecord],
overwrite_callback,
settings,
) -> List[PreprocessedDocument]:

logger.info(f"Starting preprocessing with {len(documents)} documents")

pre = Preprocessor(
Expand All @@ -134,30 +125,49 @@ def _preprocess_and_store(documents: list[DocumentRecord], settings):
result = pre.generate_normalized_output()

_write_preprocessed_docs_to_postgres(
result,
settings.normalized_docs_output,
result, settings.normalized_docs_output
)

# Overwrite file using injected behavior
export_path = Path(
f"output.{settings.normalized_overwritten_file_output.FILE_EXT}"
)
overwrite_callback(result, export_path)

S3Operations.upload(
settings.normalized_overwritten_file_output, export_path
)

logger.info("Preprocessing completed successfully.")
return result


@entrypoint(PreprocessTXT)
def preprocess_txt_file(settings):
logger.info("Downloading TXT input from S3...")
logger.info("Downloading TXT file...")
S3Operations.download(settings.txt_input, settings.TXT_DOWNLOAD_PATH)

texts = TxtLoader.load(settings.TXT_DOWNLOAD_PATH)
documents = TxtLoader.load(settings.TXT_DOWNLOAD_PATH)

_preprocess_and_store(texts, settings)
_preprocess_and_store(
documents=documents,
overwrite_callback=TxtLoader.overwrite_with_results,
settings=settings,
)


@entrypoint(PreprocessBIB)
def preprocess_bib_file(settings):
logger.info("Downloading BIB input from S3...")
logger.info("Downloading BIB file...")
S3Operations.download(settings.bib_input, settings.BIB_DOWNLOAD_PATH)

texts = BibLoader.load(
settings.BIB_DOWNLOAD_PATH,
loader = BibLoader(
file_path=settings.BIB_DOWNLOAD_PATH,
attribute=settings.bib_input.SELECTED_ATTRIBUTE,
)
_preprocess_and_store(texts, settings)

_preprocess_and_store(
documents=loader.document_records,
overwrite_callback=loader.overwrite_with_results,
settings=settings,
)
25 changes: 9 additions & 16 deletions preprocessing/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
from nltk.stem.porter import PorterStemmer
from preprocessing.models import PreprocessedDocument, DocumentRecord

LANG_TO_SPACY_MODELS = {
"en": "en_core_web_sm",
"de": "de_core_news_sm"
}
LANG_TO_SPACY_MODELS = {"en": "en_core_web_sm", "de": "de_core_news_sm"}
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -45,12 +42,11 @@ def __init__(
self.documents: List[DocumentRecord] = []

def filter_tokens(
self,
tokens: list[spacy.tokens.Token],
filter_stopwords: bool = False
self, tokens: list[spacy.tokens.Token], filter_stopwords: bool = False
) -> list[spacy.tokens.Token]:
return [
t for t in tokens
t
for t in tokens
if t.is_alpha
and (not filter_stopwords or not t.is_stop)
and len(t.text) > 2
Expand Down Expand Up @@ -80,20 +76,17 @@ def generate_normalized_output(self) -> List[PreprocessedDocument]:
if self.use_ngrams and self.ngram_min > 1:
for n in range(self.ngram_min, self.ngram_max + 1):
for i in range(len(normalized) - n + 1):
ngram = " ".join(normalized[i:i+n])
ngram = " ".join(normalized[i:i + n]) # fmt: off
doc_terms.append(ngram)

processed_docs.append(PreprocessedDocument(
doc_id=record.doc_id,
tokens=doc_terms
))
processed_docs.append(
PreprocessedDocument(doc_id=record.doc_id, tokens=doc_terms)
)

return processed_docs

def normalize_token(
self,
token: spacy.tokens.Token,
porter: PorterStemmer
self, token: spacy.tokens.Token, porter: PorterStemmer
):
"""Apply lemma or stem normalization."""
word = token.text.lower() if not token.text.isupper() else token.text
Expand Down
Loading
Loading