diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index c0256c17a..7dd2ff473 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import json +import mimetypes import os import time import traceback @@ -170,6 +171,28 @@ def read_file_first(self, sample): if self.is_first_op: self.read_file(sample) + def convert_to_dj(self, sample): + filepath = sample[self.filepath_key] + mime_type, _ = mimetypes.guess_type(filepath) + file_type = None + if mime_type: + file_type = mime_type.split('/')[0] + if file_type == "text": + return self.read_file(sample) + elif file_type == "image": + sample["text"] = "" + sample["data"] = b"" + sample["images"] = [filepath] + elif file_type == "audio": + sample["text"] = "" + sample["data"] = b"" + sample["audios"] = [filepath] + elif file_type == "video": + sample["text"] = "" + sample["data"] = b"" + sample["videos"] = [filepath] + return sample + @staticmethod def save_file_and_db(sample): if FileExporter().execute(sample): @@ -447,16 +470,17 @@ def execute(self, sample: Dict[str, Any]): try: start = time.time() + save_path = "" if file_type in self.text_support_ext: sample, save_path = self.get_textfile_handler(sample) elif file_type in self.data_support_ext: sample, save_path = self.get_datafile_handler(sample) elif file_type in self.medical_support_ext: sample, save_path = self.get_medicalfile_handler(sample) - else: - return False if sample[self.text_key] == '' and sample[self.data_key] == b'': + if sample.get("executor") == "datajuicer": + return True sample[self.filesize_key] = "0" return False @@ -567,7 +591,7 @@ def _get_from_text(self, sample: Dict[str, Any]) -> Dict[str, Any]: return sample def _get_from_text_or_data(self, sample: Dict[str, Any]) -> Dict[str, Any]: - if sample[self.data_key] is not None and sample[self.data_key] != b'' and sample[self.data_key] 
!= "": + if sample.get(self.data_key) is not None and sample[self.data_key] != b'' and sample[self.data_key] != "": return self._get_from_data(sample) else: return self._get_from_text(sample) diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_executor.py b/runtime/python-executor/datamate/wrappers/data_juicer_executor.py index 6d345f4bb..4a97152d1 100644 --- a/runtime/python-executor/datamate/wrappers/data_juicer_executor.py +++ b/runtime/python-executor/datamate/wrappers/data_juicer_executor.py @@ -86,6 +86,7 @@ def __init__(self, cfg = None, meta = None): def add_column(self, batch): batch_size = len(batch["filePath"]) batch["execute_status"] = [SUCCESS_STATUS] * batch_size + batch["executor"] = ["datajuicer"] * batch_size batch[Fields.instance_id] = [self.cfg.instance_id] * batch_size batch[Fields.export_path] = [self.cfg.export_path] * batch_size return batch @@ -102,7 +103,7 @@ def run(self): dataset = self.load_dataset() logger.info('Read data...') - dataset = dataset.map(FileExporter().read_file, num_cpus=0.05) + dataset = dataset.map(FileExporter().convert_to_dj, num_cpus=0.05) # 保存原始数据文件ID集合,用于后续过滤数据检测 original_file_ids = set(dataset.unique("fileId")) @@ -118,11 +119,11 @@ def run(self): dj_config = self.client.init_config(self.dataset_path, self.export_path, self.cfg.process) result_path = self.client.execute_config(dj_config) - processed_dataset = self.load_dataset(result_path) + processed_dataset = self.load_dj_dataset(result_path) processed_dataset = processed_dataset.map_batches(self.add_column, num_cpus=0.05) processed_dataset = processed_dataset.map(FileExporter().save_file_and_db, num_cpus=0.05) - for _ in processed_dataset.iter_batches(): - pass + + processed_dataset = processed_dataset.materialize() # 特殊处理:识别被过滤的数据 if processed_dataset.count() == 0: diff --git a/runtime/python-executor/datamate/wrappers/executor.py b/runtime/python-executor/datamate/wrappers/executor.py index 4b40948f7..478f47fa9 100644 --- 
a/runtime/python-executor/datamate/wrappers/executor.py +++ b/runtime/python-executor/datamate/wrappers/executor.py @@ -1,5 +1,8 @@ import json +import os +import shutil import time +from pathlib import Path from typing import Dict from datamate.common.utils.file_scanner import FileScanner @@ -54,6 +57,35 @@ def load_meta(self, line): meta["dataset_id"] = self.cfg.dataset_id return meta + def load_dj_meta(self, line): + meta = json.loads(line) + filepath = "" + file = "" + if meta.get("images"): + if isinstance(meta["images"], list): + filepath = meta["images"][0] + file = Path(filepath) + del meta["images"] + elif meta.get("audios"): + if isinstance(meta["audios"], list): + filepath = meta["audios"][0] + file = Path(filepath) + del meta["audios"] + elif meta.get("videos"): + if isinstance(meta["videos"], list): + filepath = meta["videos"][0] + file = Path(filepath) + del meta["videos"] + if filepath and file: + filename = f"{Path(meta['fileName']).stem}{file.suffix}" + meta["fileName"] = filename + meta["filePath"] = f"/dataset/{self.cfg.dataset_id}/{filename}" + meta["fileType"] = file.suffix[1:] + meta["fileSize"] = file.stat().st_size + os.makedirs(f"/dataset/{self.cfg.dataset_id}", exist_ok=True) + shutil.move(filepath, f"/dataset/{self.cfg.dataset_id}/{filename}") + return {k: v for k, v in meta.items() if not (isinstance(k, str) and k.startswith('_'))} + def run(self): pass @@ -78,6 +110,27 @@ def load_dataset(self, jsonl_file_path = None): return dataset + def load_dj_dataset(self, jsonl_file_path = None): + retry = 0 + dataset = None + if jsonl_file_path is None: + jsonl_file_path = self.cfg.dataset_path + while True: + if check_valid_path(jsonl_file_path): + with open(jsonl_file_path, "r", encoding='utf-8') as meta: + lines = meta.readlines() + dataset = ray.data.from_items([self.load_dj_meta(line) for line in lines]) + break + if retry < 5: + retry += 1 + time.sleep(retry) + continue + else: + logger.error(f"can not load dataset from dataset_path") + 
raise RuntimeError(f"Load dataset Failed!, dataset_path: {self.cfg.dataset_path}.") + + return dataset + def update_db(self, status): task_info = TaskInfoPersistence() task_info.update_result(self.cfg.dataset_id, self.cfg.instance_id, status)