diff --git a/cbc.yaml b/cbc.yaml index 4df19d8..a3d61ab 100644 --- a/cbc.yaml +++ b/cbc.yaml @@ -36,6 +36,18 @@ entrypoints: normalized_docs_PG_USER: null description: Database Output, containing bib_id aswell as the normalized text type: pg_table + normalized_overwritten_file_output: + config: + normalized_overwritten_file_output_BUCKET_NAME: null + normalized_overwritten_file_output_FILE_EXT: bib + normalized_overwritten_file_output_FILE_NAME: null + normalized_overwritten_file_output_FILE_PATH: null + normalized_overwritten_file_output_S3_ACCESS_KEY: null + normalized_overwritten_file_output_S3_HOST: null + normalized_overwritten_file_output_S3_PORT: null + normalized_overwritten_file_output_S3_SECRET_KEY: null + description: The File Input Overwritten with the normalized output + type: file preprocess_txt_file: description: Entrypoint to preprocess a .txt file envs: @@ -69,4 +81,16 @@ entrypoints: normalized_docs_PG_USER: null description: Database Output, containing bib_id aswell as the normalized text type: pg_table + normalized_overwritten_file_output: + config: + normalized_overwritten_file_output_BUCKET_NAME: null + normalized_overwritten_file_output_FILE_EXT: txt + normalized_overwritten_file_output_FILE_NAME: null + normalized_overwritten_file_output_FILE_PATH: null + normalized_overwritten_file_output_S3_ACCESS_KEY: null + normalized_overwritten_file_output_S3_HOST: null + normalized_overwritten_file_output_S3_PORT: null + normalized_overwritten_file_output_S3_SECRET_KEY: null + description: The File Input Overwritten with the normalized output + type: file name: Language-Preprocessing diff --git a/main.py b/main.py index b1ec208..24e3350 100644 --- a/main.py +++ b/main.py @@ -1,21 +1,22 @@ -import hashlib import logging - import pandas as pd -from preprocessing.core import Preprocessor -from preprocessing.loader import BibLoader, TxtLoader -from preprocessing.models import DocumentRecord, PreprocessedDocument +from sqlalchemy import create_engine + +from pathlib import Path +from typing import List from scystream.sdk.core import entrypoint from scystream.sdk.env.settings import ( EnvSettings, - FileSettings, InputSettings, OutputSettings, + FileSettings, PostgresSettings, ) from scystream.sdk.file_handling.s3_manager import S3Operations -from sqlalchemy import create_engine -from sqlalchemy.sql import quoted_name + +from preprocessing.core import Preprocessor +from preprocessing.loader import TxtLoader, BibLoader +from preprocessing.models import DocumentRecord, PreprocessedDocument logging.basicConfig( level=logging.INFO, @@ -24,25 +25,15 @@ logger = logging.getLogger(__name__) -def _normalize_table_name(table_name: str) -> str: - max_length = 63 - if len(table_name) <= max_length: - return table_name - digest = hashlib.sha1(table_name.encode("utf-8")).hexdigest()[:10] - prefix_length = max_length - len(digest) - 1 - return f"{table_name[:prefix_length]}_{digest}" - - -def _resolve_db_table(settings: PostgresSettings) -> str: - normalized_name = _normalize_table_name(settings.DB_TABLE) - settings.DB_TABLE = normalized_name - return normalized_name - - class NormalizedDocsOutput(PostgresSettings, OutputSettings): __identifier__ = "normalized_docs" +class NormalizedTXTOutput(FileSettings, OutputSettings): + __identifier__ = "normalized_overwritten_file_output" + FILE_EXT: str = "txt" + + class TXTFileInput(FileSettings, InputSettings): __identifier__ = "txt_file" FILE_EXT: str = "txt" @@ -55,6 +46,11 @@ class BIBFileInput(FileSettings, InputSettings): SELECTED_ATTRIBUTE: str = "Abstract" +class NormalizedBIBOutput(FileSettings, OutputSettings): + __identifier__ = "normalized_overwritten_file_output" + FILE_EXT: str = "bib" + + class PreprocessTXT(EnvSettings): LANGUAGE: str = "en" FILTER_STOPWORDS: bool = True @@ -67,6 +63,7 @@ class PreprocessTXT(EnvSettings): txt_input: TXTFileInput normalized_docs_output: NormalizedDocsOutput + normalized_overwritten_file_output: NormalizedTXTOutput class PreprocessBIB(EnvSettings): @@ -81,44 +78,38 @@ class PreprocessBIB(EnvSettings): bib_input: BIBFileInput normalized_docs_output: NormalizedDocsOutput + normalized_overwritten_file_output: NormalizedBIBOutput def _write_preprocessed_docs_to_postgres( - preprocessed_ouput: list[PreprocessedDocument], - settings: PostgresSettings, + preprocessed_ouput: List[PreprocessedDocument], settings: PostgresSettings ): - resolved_table_name = _resolve_db_table(settings) df = pd.DataFrame( - [ - { - "doc_id": d.doc_id, - "tokens": d.tokens, - } - for d in preprocessed_ouput - ], + [{"doc_id": d.doc_id, "tokens": d.tokens} for d in preprocessed_ouput] ) logger.info( "Writing %s processed documents to DB table '%s'…", len(df), - resolved_table_name, + settings.DB_TABLE, ) engine = create_engine( f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}" f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/", ) - - table_name = quoted_name(resolved_table_name, quote=True) - df.to_sql(table_name, engine, if_exists="replace", index=False) + df.to_sql(settings.DB_TABLE, engine, if_exists="replace", index=False) logger.info( - "Successfully stored normalized documents into '%s'.", - resolved_table_name, + f"Successfully stored normalized documents into '{settings.DB_TABLE}'." ) -def _preprocess_and_store(documents: list[DocumentRecord], settings): - """Shared preprocessing logic for TXT and BIB.""" +def _preprocess_and_store( + documents: List[DocumentRecord], + overwrite_callback, + settings, +) -> List[PreprocessedDocument]: + logger.info(f"Starting preprocessing with {len(documents)} documents") pre = Preprocessor( @@ -134,30 +125,49 @@ def _preprocess_and_store(documents: list[DocumentRecord], settings): result = pre.generate_normalized_output() _write_preprocessed_docs_to_postgres( - result, - settings.normalized_docs_output, + result, settings.normalized_docs_output + ) + + # Overwrite file using injected behavior + export_path = Path( + f"output.{settings.normalized_overwritten_file_output.FILE_EXT}" + ) + overwrite_callback(result, export_path) + + S3Operations.upload( + settings.normalized_overwritten_file_output, export_path ) logger.info("Preprocessing completed successfully.") + return result @entrypoint(PreprocessTXT) def preprocess_txt_file(settings): - logger.info("Downloading TXT input from S3...") + logger.info("Downloading TXT file...") S3Operations.download(settings.txt_input, settings.TXT_DOWNLOAD_PATH) - texts = TxtLoader.load(settings.TXT_DOWNLOAD_PATH) + documents = TxtLoader.load(settings.TXT_DOWNLOAD_PATH) - _preprocess_and_store(texts, settings) + _preprocess_and_store( + documents=documents, + overwrite_callback=TxtLoader.overwrite_with_results, + settings=settings, + ) @entrypoint(PreprocessBIB) def preprocess_bib_file(settings): - logger.info("Downloading BIB input from S3...") + logger.info("Downloading BIB file...") S3Operations.download(settings.bib_input, settings.BIB_DOWNLOAD_PATH) - texts = BibLoader.load( - settings.BIB_DOWNLOAD_PATH, + loader = BibLoader( + file_path=settings.BIB_DOWNLOAD_PATH, attribute=settings.bib_input.SELECTED_ATTRIBUTE, ) - _preprocess_and_store(texts, settings) + + _preprocess_and_store( + documents=loader.document_records, + overwrite_callback=loader.overwrite_with_results, + settings=settings, + ) diff --git a/preprocessing/core.py b/preprocessing/core.py index f819839..b2fe866 100644 --- a/preprocessing/core.py +++ b/preprocessing/core.py @@ -5,10 +5,7 @@ from nltk.stem.porter import PorterStemmer from preprocessing.models import PreprocessedDocument, DocumentRecord -LANG_TO_SPACY_MODELS = { - "en": "en_core_web_sm", - "de": "de_core_news_sm" -} +LANG_TO_SPACY_MODELS = {"en": "en_core_web_sm", "de": "de_core_news_sm"} logger = logging.getLogger(__name__) @@ -45,12 +42,11 @@ def __init__( self.documents: List[DocumentRecord] = [] def filter_tokens( - self, - tokens: list[spacy.tokens.Token], - filter_stopwords: bool = False + self, tokens: list[spacy.tokens.Token], filter_stopwords: bool = False ) -> list[spacy.tokens.Token]: return [ - t for t in tokens + t + for t in tokens if t.is_alpha and (not filter_stopwords or not t.is_stop) and len(t.text) > 2 @@ -80,20 +76,17 @@ def generate_normalized_output(self) -> List[PreprocessedDocument]: if self.use_ngrams and self.ngram_min > 1: for n in range(self.ngram_min, self.ngram_max + 1): for i in range(len(normalized) - n + 1): - ngram = " ".join(normalized[i:i+n]) + ngram = " ".join(normalized[i:i + n]) # fmt: off doc_terms.append(ngram) - processed_docs.append(PreprocessedDocument( - doc_id=record.doc_id, - tokens=doc_terms - )) + processed_docs.append( + PreprocessedDocument(doc_id=record.doc_id, tokens=doc_terms) + ) return processed_docs def normalize_token( - self, - token: spacy.tokens.Token, - porter: PorterStemmer + self, token: spacy.tokens.Token, porter: PorterStemmer ): """Apply lemma or stem normalization.""" word = token.text.lower() if not token.text.isupper() else token.text diff --git a/preprocessing/loader.py b/preprocessing/loader.py index ecb5193..d34e36c 100644 --- a/preprocessing/loader.py +++ b/preprocessing/loader.py @@ -1,8 +1,10 @@ import logging import re import bibtexparser +from typing import List +from pathlib import Path -from preprocessing.models import DocumentRecord +from preprocessing.models import DocumentRecord, PreprocessedDocument logger = logging.getLogger(__name__) @@ -12,15 +14,10 @@ def normalize_text(text: str) -> str: return "" text = re.sub(r"\\[a-zA-Z]+\{([^}]*)\}", r"\1", text) - text = re.sub(r"\\[a-zA-Z]+", "", text) - text = re.sub(r"[{}]", "", text) - - text = re.sub(r'\\"([a-zA-Z])', r'\1', text) - + text = re.sub(r'\\"([a-zA-Z])', r"\1", text) text = re.sub(r"\\'", "", text) - text = re.sub(r"\s+", " ", text) return text.strip() @@ -33,41 +30,84 @@ def load(file_path: str) -> list[DocumentRecord]: lines = f.readlines() return [ - DocumentRecord( - doc_id=str(i), - text=normalize_text(line) - ) + DocumentRecord(doc_id=str(i), text=normalize_text(line)) for i, line in enumerate(lines, start=1) ] + @staticmethod + def overwrite_with_results( + preprocessed_docs: List[PreprocessedDocument], + export_path: Path, + ) -> None: + logger.info("Writing preprocessed TXT file...") + + output_path = Path.cwd() / export_path.name + + # Ensure correct order (IDs are numeric strings) + sorted_docs = sorted(preprocessed_docs, key=lambda d: int(d.doc_id)) + + with open(output_path, "w", encoding="utf-8") as f: + for doc in sorted_docs: + line = " ".join(doc.tokens) + f.write(line + "\n") + + logger.info(f"TXT file successfully written to: {output_path}") + class BibLoader: - @staticmethod - def load(file_path: str, attribute: str) -> list[DocumentRecord]: + def __init__(self, file_path: str, attribute: str): logger.info(f"Loading BIB file (attribute={attribute})...") with open(file_path, "r", encoding="utf-8") as f: - bib_database = bibtexparser.load(f) - - results = [] - attribute_lower = attribute.lower() - - for entry in bib_database.entries: - bib_id = ( - entry.get("id") - or entry.get("ID") - or entry.get("citekey") - or entry.get("entrykey") - or entry.get("Unique-ID") - or "UNKNOWN_ID" - ) - - raw_value = entry.get(attribute_lower, "") + self.bib_db = bibtexparser.load(f) + + self.file_path = file_path + self.attribute = attribute.lower() + + self.document_records = self._build_document_records() + + @staticmethod + def _extract_bib_id(entry: dict) -> str: + return ( + entry.get("id") + or entry.get("ID") + or entry.get("citekey") + or entry.get("entrykey") + or entry.get("Unique-ID") + or "UNKNOWN_ID" + ) + + def _build_document_records(self) -> List[DocumentRecord]: + records = [] + + for entry in self.bib_db.entries: + bib_id = self._extract_bib_id(entry) + raw_value = entry.get(self.attribute, "") normalized = normalize_text(raw_value) - results.append(DocumentRecord( - doc_id=bib_id, - text=normalized - )) + records.append(DocumentRecord(doc_id=bib_id, text=normalized)) + + return records + + def overwrite_with_results( + self, preprocessed_docs: List[PreprocessedDocument], export_path: Path + ) -> None: + logger.info("Overwriting input documents with preprocessed text...") + + output_path = Path.cwd() / export_path.name + + preprocessed_dict = {doc.doc_id: doc for doc in preprocessed_docs} + + for entry in self.bib_db.entries: + bib_id = self._extract_bib_id(entry) + preprocessed = preprocessed_dict.get(bib_id) + + if not preprocessed: + continue + + entry[self.attribute] = " ".join(preprocessed.tokens) + + with open(output_path, "w", encoding="utf-8") as f: + bibtexparser.dump(self.bib_db, f) - return results + logger.info(f"BIB file successfully written to: {output_path}") diff --git a/test/test_full.py b/test/test_full.py index 6a04e61..7bee867 100644 --- a/test/test_full.py +++ b/test/test_full.py @@ -15,6 +15,9 @@ PG_USER = "postgres" PG_PASS = "postgres" +INPUT_FILE_NAME = "input" +OUTPUT_FILE_NAME = "output" + def parse_pg_array(arr: str) -> list[str]: # Convert Postgres literal → Python list @@ -55,15 +58,13 @@ def s3_minio(): def test_full_bib(s3_minio): - input_file_name = "input" - - bib_path = Path(__file__).parent / "files" / f"{input_file_name}.bib" + bib_path = Path(__file__).parent / "files" / f"{INPUT_FILE_NAME}.bib" bib_bytes = bib_path.read_bytes() # Upload to MinIO s3_minio.put_object( Bucket=BUCKET_NAME, - Key=f"{input_file_name}.bib", + Key=f"{INPUT_FILE_NAME}.bib", Body=bib_bytes ) @@ -79,7 +80,7 @@ def test_full_bib(s3_minio): "bib_file_S3_SECRET_KEY": MINIO_PWD, "bib_file_BUCKET_NAME": BUCKET_NAME, "bib_file_FILE_PATH": "", - "bib_file_FILE_NAME": input_file_name, + "bib_file_FILE_NAME": INPUT_FILE_NAME, "bib_file_SELECTED_ATTRIBUTE": "abstract", # PostgreSQL output @@ -88,6 +89,14 @@ def test_full_bib(s3_minio): "normalized_docs_PG_USER": PG_USER, "normalized_docs_PG_PASS": PG_PASS, "normalized_docs_DB_TABLE": "normalized_docs_bib", + + "normalized_overwritten_file_output_S3_HOST": "http://127.0.0.1", + "normalized_overwritten_file_output_S3_PORT": "9000", + "normalized_overwritten_file_output_S3_ACCESS_KEY": MINIO_USER, + "normalized_overwritten_file_output_S3_SECRET_KEY": MINIO_PWD, + "normalized_overwritten_file_output_BUCKET_NAME": BUCKET_NAME, + "normalized_overwritten_file_output_FILE_PATH": "", + "normalized_overwritten_file_output_FILE_NAME": OUTPUT_FILE_NAME } for k, v in env.items(): @@ -122,17 +131,17 @@ def test_full_bib(s3_minio): assert isinstance(df.iloc[0]["tokens"], list) assert all(isinstance(t, str) for t in df.iloc[0]["tokens"]) + # TODO: Test overwritten file upload -def test_full_txt(s3_minio): - input_file_name = "input" - txt_path = Path(__file__).parent / "files" / f"{input_file_name}.txt" +def test_full_txt(s3_minio): + txt_path = Path(__file__).parent / "files" / f"{INPUT_FILE_NAME}.txt" txt_bytes = txt_path.read_bytes() # Upload input to MinIO s3_minio.put_object( Bucket=BUCKET_NAME, - Key=f"{input_file_name}.txt", + Key=f"{INPUT_FILE_NAME}.txt", Body=txt_bytes ) @@ -146,7 +155,7 @@ def test_full_txt(s3_minio): "txt_file_S3_SECRET_KEY": MINIO_PWD, "txt_file_BUCKET_NAME": BUCKET_NAME, "txt_file_FILE_PATH": "", - "txt_file_FILE_NAME": input_file_name, + "txt_file_FILE_NAME": INPUT_FILE_NAME, # Postgres output "normalized_docs_PG_HOST": "localhost", @@ -154,6 +163,14 @@ def test_full_txt(s3_minio): "normalized_docs_PG_USER": PG_USER, "normalized_docs_PG_PASS": PG_PASS, "normalized_docs_DB_TABLE": "normalized_docs_txt", + + "normalized_overwritten_file_output_S3_HOST": "http://127.0.0.1", + "normalized_overwritten_file_output_S3_PORT": "9000", + "normalized_overwritten_file_output_S3_ACCESS_KEY": MINIO_USER, + "normalized_overwritten_file_output_S3_SECRET_KEY": MINIO_PWD, + "normalized_overwritten_file_output_BUCKET_NAME": BUCKET_NAME, + "normalized_overwritten_file_output_FILE_PATH": "", + "normalized_overwritten_file_output_FILE_NAME": OUTPUT_FILE_NAME } for k, v in env.items(): @@ -177,3 +194,5 @@ def test_full_txt(s3_minio): assert isinstance(df.iloc[0]["tokens"], list) assert all(isinstance(t, str) for t in df.iloc[0]["tokens"]) + + # TODO: Test overwritten file upload diff --git a/test/test_loaders.py b/test/test_loaders.py index c55827c..c3575ce 100644 --- a/test/test_loaders.py +++ b/test/test_loaders.py @@ -13,7 +13,6 @@ def test_txt_loader_reads_and_normalizes(): result = TxtLoader.load(fname) os.unlink(fname) - # Expect list of DocumentRecord assert len(result) == 2 assert isinstance(result[0], DocumentRecord) @@ -33,11 +32,13 @@ def test_bib_loader_extracts_attribute(): } """ - with tempfile.NamedTemporaryFile("w+", delete=False) as f: + with tempfile.NamedTemporaryFile("w+", delete=False, suffix=".bib") as f: f.write(bib_content) fname = f.name - result = BibLoader.load(fname, "abstract") + loader = BibLoader(file_path=fname, attribute="abstract") + + result = loader.document_records os.unlink(fname) assert len(result) == 1 @@ -45,7 +46,7 @@ def test_bib_loader_extracts_attribute(): record = result[0] assert isinstance(record, DocumentRecord) - # ID taken from bib entry key: "@article{a,..." + # ID taken from entry key assert record.doc_id == "a" # Normalized abstract text