Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions runtime/python-executor/datamate/core/base_op.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-

import json
import mimetypes
import os
import time
import traceback
Expand Down Expand Up @@ -170,6 +171,28 @@ def read_file_first(self, sample):
if self.is_first_op:
self.read_file(sample)

def convert_to_dj(self, sample):
    """Prepare a sample for the data-juicer pipeline.

    Text files are read inline via ``read_file``; image/audio/video
    samples get an empty text/data payload plus a modality-specific
    path list. Samples of any other (or unguessable) type are returned
    unchanged.
    """
    filepath = sample[self.filepath_key]
    mime_type, _ = mimetypes.guess_type(filepath)
    category = mime_type.split('/')[0] if mime_type else None

    if category == "text":
        return self.read_file(sample)

    # Map the mime major type to the data-juicer media column name.
    media_fields = {"image": "images", "audio": "audios", "video": "videos"}
    field = media_fields.get(category)
    if field is not None:
        sample["text"] = ""
        sample["data"] = b""
        sample[field] = [filepath]
    return sample

@staticmethod
def save_file_and_db(sample):
if FileExporter().execute(sample):
Expand Down Expand Up @@ -447,16 +470,17 @@ def execute(self, sample: Dict[str, Any]):

try:
start = time.time()
save_path = ""
if file_type in self.text_support_ext:
sample, save_path = self.get_textfile_handler(sample)
elif file_type in self.data_support_ext:
sample, save_path = self.get_datafile_handler(sample)
elif file_type in self.medical_support_ext:
sample, save_path = self.get_medicalfile_handler(sample)
else:
return False

if sample[self.text_key] == '' and sample[self.data_key] == b'':
if sample.get("executor") == "datajuicer":
return True
sample[self.filesize_key] = "0"
return False

Expand Down Expand Up @@ -567,7 +591,7 @@ def _get_from_text(self, sample: Dict[str, Any]) -> Dict[str, Any]:
return sample

def _get_from_text_or_data(self, sample: Dict[str, Any]) -> Dict[str, Any]:
if sample[self.data_key] is not None and sample[self.data_key] != b'' and sample[self.data_key] != "":
if sample.get(self.data_key) is not None and sample[self.data_key] != b'' and sample[self.data_key] != "":
return self._get_from_data(sample)
else:
return self._get_from_text(sample)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def __init__(self, cfg = None, meta = None):
def add_column(self, batch):
    """Attach bookkeeping columns to every row of the batch.

    Adds a success status, the executor tag, and the configured
    instance id / export path, each replicated across the batch.
    """
    n_rows = len(batch["filePath"])
    extra_columns = {
        "execute_status": SUCCESS_STATUS,
        "executor": "datajuicer",
        Fields.instance_id: self.cfg.instance_id,
        Fields.export_path: self.cfg.export_path,
    }
    for column, value in extra_columns.items():
        batch[column] = [value] * n_rows
    return batch
Expand All @@ -102,7 +103,7 @@ def run(self):
dataset = self.load_dataset()

logger.info('Read data...')
dataset = dataset.map(FileExporter().read_file, num_cpus=0.05)
dataset = dataset.map(FileExporter().convert_to_dj, num_cpus=0.05)

# 保存原始数据文件ID集合,用于后续过滤数据检测
original_file_ids = set(dataset.unique("fileId"))
Expand All @@ -118,11 +119,11 @@ def run(self):
dj_config = self.client.init_config(self.dataset_path, self.export_path, self.cfg.process)
result_path = self.client.execute_config(dj_config)

processed_dataset = self.load_dataset(result_path)
processed_dataset = self.load_dj_dataset(result_path)
processed_dataset = processed_dataset.map_batches(self.add_column, num_cpus=0.05)
processed_dataset = processed_dataset.map(FileExporter().save_file_and_db, num_cpus=0.05)
for _ in processed_dataset.iter_batches():
pass

processed_dataset = processed_dataset.materialize()

# 特殊处理:识别被过滤的数据
if processed_dataset.count() == 0:
Expand Down
53 changes: 53 additions & 0 deletions runtime/python-executor/datamate/wrappers/executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
import os
import shutil
import time
from pathlib import Path
from typing import Dict

from datamate.common.utils.file_scanner import FileScanner
Expand Down Expand Up @@ -54,6 +57,35 @@ def load_meta(self, line):
meta["dataset_id"] = self.cfg.dataset_id
return meta

def load_dj_meta(self, line):
    """Parse one jsonl line produced by data-juicer back into a meta dict.

    If the row carries a media column ("images"/"audios"/"videos"), the
    first media file is moved into the dataset directory and the meta's
    file fields (name/path/type/size) are rewritten to describe it.
    Leading-underscore keys (data-juicer internals) are stripped from the
    returned dict.

    :param line: one JSON-encoded line from the data-juicer result file.
    :return: the normalized meta dict, without underscore-prefixed keys.
    """
    meta = json.loads(line)
    filepath = ""
    file = None
    # Only the first media column present is honored (the original used an
    # exclusive if/elif chain over images -> audios -> videos).
    for media_key in ("images", "audios", "videos"):
        if meta.get(media_key):
            value = meta[media_key]
            if isinstance(value, list):
                filepath = value[0]
                file = Path(filepath)
            # Drop the data-juicer media column either way.
            del meta[media_key]
            break
    if filepath and file:
        # Keep the original stem but adopt the processed file's extension.
        # BUGFIX: the destination path previously used a garbled literal
        # instead of the computed filename, so the recorded filePath and
        # the shutil.move target now agree.
        filename = f"{Path(meta['fileName']).stem}{file.suffix}"
        dest_dir = f"/dataset/{self.cfg.dataset_id}"
        dest_path = f"{dest_dir}/{filename}"
        meta["fileName"] = filename
        meta["filePath"] = dest_path
        meta["fileType"] = file.suffix[1:]
        # stat() before the move, while the source path still exists.
        meta["fileSize"] = file.stat().st_size
        os.makedirs(dest_dir, exist_ok=True)
        shutil.move(filepath, dest_path)
    # Strip private (underscore-prefixed) bookkeeping keys.
    return {k: v for k, v in meta.items() if not (isinstance(k, str) and k.startswith('_'))}

def run(self):
    # Intentional no-op — presumably a hook that concrete executor
    # subclasses override; confirm against the rest of the class hierarchy.
    pass

Expand All @@ -78,6 +110,27 @@ def load_dataset(self, jsonl_file_path = None):

return dataset

def load_dj_dataset(self, jsonl_file_path = None):
    """Build a Ray dataset from a data-juicer result jsonl file.

    The result file may appear asynchronously, so the path is polled up
    to six times with a growing back-off (1..5 seconds between checks).

    :param jsonl_file_path: path to the result file; defaults to
        ``self.cfg.dataset_path`` when not given.
    :return: a Ray dataset of normalized meta dicts (via load_dj_meta).
    :raises RuntimeError: when the file never becomes available.
    """
    if jsonl_file_path is None:
        jsonl_file_path = self.cfg.dataset_path
    for attempt in range(6):
        if check_valid_path(jsonl_file_path):
            with open(jsonl_file_path, "r", encoding='utf-8') as result_file:
                lines = result_file.readlines()
            return ray.data.from_items([self.load_dj_meta(line) for line in lines])
        if attempt < 5:
            # Back off 1..5 seconds between attempts.
            time.sleep(attempt + 1)
    # BUGFIX: report the path actually probed — the original logged a
    # placeholder-free f-string and always named self.cfg.dataset_path,
    # even when an explicit path was passed in.
    logger.error(f"can not load dataset from {jsonl_file_path}")
    raise RuntimeError(f"Load dataset Failed!, dataset_path: {jsonl_file_path}.")

def update_db(self, status):
task_info = TaskInfoPersistence()
task_info.update_result(self.cfg.dataset_id, self.cfg.instance_id, status)
Expand Down
Loading