From 4e415550cdda389ccf7ab6cb85faf4451e5af84d Mon Sep 17 00:00:00 2001 From: Russ Fellows Date: Fri, 19 Jun 2026 12:34:12 -0600 Subject: [PATCH 1/8] feat(listing): add skip_listing for deterministic file URI generation (issue #472) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For DLIO-generated datasets, file URIs follow a known naming pattern: {file_prefix}_{index:0N}_of_{num_files}.{format} When skip_listing=True, each rank independently generates its own round-robin shard using this convention — zero S3 API calls, zero MPI communication for the listing phase. This eliminates multi-hour S3 listing for large flat datasets: - 50M files at 100ms/page × 50K pages = ~5000s (83 min) of listing - With skip_listing=True: ~5-10s of Python string generation - 100-500× speedup for retinanet-scale workloads (issue #472) Also supports subfoldered layouts: subfolder index computed as str(file_index % num_subfolders).zfill(nd_sf) Usage: ++workload.dataset.skip_listing=True Default: False (backward compatible, listing behavior unchanged) --- dlio_benchmark/main.py | 34 +++++++++++++++++++++++++++++++++- dlio_benchmark/utils/config.py | 8 ++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index 5d05de26..a52a2022 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -239,7 +239,39 @@ def _filter_round_robin(chunk, start_idx): if (start_idx + i) % self.comm_size == self.my_rank: my_files.append(fpath) - if num_subfolders > 0: + if self.args.skip_listing: + # ── Deterministic file list (skip S3 listing entirely) ─ + # Generate file URIs from DLIO's naming convention without + # any storage API calls or MPI communication. Each rank + # independently computes its own round-robin shard. 
+ # Convention: {file_prefix}_{index:0N}_of_{total}.{format} + # For subfoldered layouts: {subfolder}/{file_prefix}_{index:0N}_of_{total}.{format} + # where subfolder = str(index % num_subfolders).zfill(nd_sf) + num_files_expected = ( + self.num_files_train if dataset_type is DatasetType.TRAIN + else (self.num_files_eval if self.do_eval else 0) + ) + if num_files_expected > 0: + nd_f = len(str(num_files_expected)) + nd_sf = len(str(max(num_subfolders - 1, 0))) if num_subfolders > 0 else 0 + for idx in range(self.my_rank, num_files_expected, self.comm_size): + fname = f"{self.args.file_prefix}_{str(idx).zfill(nd_f)}_of_{num_files_expected}.{self.args.format}" + if num_subfolders > 0: + sf = str(idx % num_subfolders).zfill(nd_sf) + rel = os.path.join(sf, fname) + else: + rel = fname + uri = self.storage.get_uri( + os.path.join(self.args.data_folder, f"{dataset_type}", rel)) + my_files.append(uri) + global_count = num_files_expected + if self.my_rank == 0: + self.logger.output( + f"{utcnow()} skip_listing [{dataset_type}]: generated " + f"{len(my_files)} file URIs deterministically " + f"({global_count} total, no S3 API calls)") + + elif num_subfolders > 0: # ── Subfoldered layout: stream with chunked bcast ───── subfolder_names = None if self.my_rank == 0: diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index 35356229..ccacf7ca 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -223,6 +223,14 @@ class ConfigArguments: files_pre_sharded: bool = False # Number of threads rank 0 uses to list subfolders in parallel. listing_threads: int = 4 + # When True, skip S3/filesystem listing entirely and generate file URIs + # deterministically from DLIO's known naming convention: + # {file_prefix}_{index:0N}_of_{num_files}.{format} + # Each rank independently computes its own round-robin shard with zero + # network calls and zero MPI communication. 
Use this for DLIO-generated + # datasets where filenames are guaranteed to follow this pattern. + # Eliminates multi-hour S3 listing for large datasets (issue #472). + skip_listing: bool = False # derived fields required_samples: int = 1 From 62916c30abb321504cbae1406cb853e23df57e26 Mon Sep 17 00:00:00 2001 From: Russ Fellows Date: Fri, 19 Jun 2026 12:52:17 -0600 Subject: [PATCH 2/8] feat(listing): add sampling validation for skip_listing (issue #472) When skip_listing=True, rank 0 now verifies that the deterministically generated file URIs actually exist before training begins. Validation checks: - The first file (index 0) - The last file (index num_files - 1) - Every listing_validation_interval-th file (default: every 1,000th) If any sampled file is missing, an informative exception is raised directing the user to either fix the file prefix/format or set skip_listing=False to fall back to directory listing. New config fields: listing_validation_interval: int = 1000 Set to 0 to disable validation entirely. New storage methods: ObjStoreLibStorage.file_exists(uri) -- uses s3dlio.exists() or stat_object() FileStorage.file_exists(id) -- uses os.path.isfile() For 50M files with interval=1000: 50,001 HEAD requests vs 50,000 listing pages -- same order of magnitude but fully parallel-capable and verifies bounds and uniform sampling of the dataset. 
--- dlio_benchmark/main.py | 46 +++++++++++++++++++++++-- dlio_benchmark/storage/file_storage.py | 4 +++ dlio_benchmark/storage/obj_store_lib.py | 17 +++++++++ dlio_benchmark/utils/config.py | 5 +++ 4 files changed, 70 insertions(+), 2 deletions(-) diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index a52a2022..8b9fd3a3 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -265,11 +265,53 @@ def _filter_round_robin(chunk, start_idx): os.path.join(self.args.data_folder, f"{dataset_type}", rel)) my_files.append(uri) global_count = num_files_expected - if self.my_rank == 0: + # ── Sampling validation (rank 0 only) ───────────── + # Confirm the naming convention is correct by checking + # that a sample of files actually exists in storage. + # Always checks the first and last file, plus every + # listing_validation_interval-th file in between. + # If any check fails, raises an informative error. + if self.my_rank == 0 and num_files_expected > 0 and \ + self.args.listing_validation_interval > 0: + interval = self.args.listing_validation_interval + val_indices = sorted( + {0, num_files_expected - 1} | + set(range(0, num_files_expected, interval)) + ) + failed_uris = [] + t_val_start = time.time() + for vidx in val_indices: + vfname = f"{self.args.file_prefix}_{str(vidx).zfill(nd_f)}_of_{num_files_expected}.{self.args.format}" + if num_subfolders > 0: + vsf = str(vidx % num_subfolders).zfill(nd_sf) + vrel = os.path.join(vsf, vfname) + else: + vrel = vfname + vuri = self.storage.get_uri( + os.path.join(self.args.data_folder, f"{dataset_type}", vrel)) + if not self.storage.file_exists(vuri): + failed_uris.append(vuri) + t_val_end = time.time() + if failed_uris: + sample_shown = failed_uris[:3] + raise Exception( + f"skip_listing validation failed for {len(failed_uris)} of " + f"{len(val_indices)} sampled files in [{dataset_type}]. " + f"First failures: {sample_shown}. 
" + f"Ensure data was generated with DLIO's standard naming " + f"convention or set skip_listing=False to use directory " + f"listing instead.") + self.logger.output( + f"{utcnow()} skip_listing [{dataset_type}]: generated " + f"{len(my_files)} URIs deterministically ({global_count} total); " + f"validated {len(val_indices)} samples " + f"(first, last, every {interval}) in " + f"{t_val_end - t_val_start:.2f}s — all exist") + elif self.my_rank == 0: self.logger.output( f"{utcnow()} skip_listing [{dataset_type}]: generated " f"{len(my_files)} file URIs deterministically " - f"({global_count} total, no S3 API calls)") + f"({global_count} total, validation disabled)") elif num_subfolders > 0: # ── Subfoldered layout: stream with chunked bcast ───── diff --git a/dlio_benchmark/storage/file_storage.py b/dlio_benchmark/storage/file_storage.py index 59d580fb..600e7e99 100644 --- a/dlio_benchmark/storage/file_storage.py +++ b/dlio_benchmark/storage/file_storage.py @@ -107,6 +107,10 @@ def get_data(self, id, data, offset=None, length=None): def isfile(self, id): return os.path.isfile(id) + def file_exists(self, id): + """Return True if the local file exists.""" + return os.path.isfile(id) + def get_basename(self, id): return os.path.basename(id) diff --git a/dlio_benchmark/storage/obj_store_lib.py b/dlio_benchmark/storage/obj_store_lib.py index 903746d3..e4805c85 100644 --- a/dlio_benchmark/storage/obj_store_lib.py +++ b/dlio_benchmark/storage/obj_store_lib.py @@ -591,5 +591,22 @@ def list_objects(self, container_name, prefix=None): def isfile(self, id): return super().isfile(self.get_uri(id)) + def file_exists(self, id): + """Return True if the object exists in the store, False otherwise. + + Uses s3dlio.exists() for s3dlio backend (HEAD request), or + s3_client.stat_object() for s3torchconnector/minio backends. 
+ """ + uri = self.get_uri(id) + if self.storage_library == "s3dlio": + return self._s3dlio.exists(uri) + else: + bucket_name, object_key = self._normalize_object_key(uri) + try: + self.s3_client.stat_object(bucket_name, object_key) + return True + except Exception: + return False + def get_basename(self, id): return os.path.basename(id) diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index ccacf7ca..009c261a 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -231,6 +231,11 @@ class ConfigArguments: # datasets where filenames are guaranteed to follow this pattern. # Eliminates multi-hour S3 listing for large datasets (issue #472). skip_listing: bool = False + # When skip_listing=True, rank 0 verifies that a sample of the generated + # file URIs actually exist in storage before training begins. + # The first file, last file, and every N-th file are checked via HEAD + # (s3dlio.exists() / os.path.isfile()). Set to 0 to disable validation. + listing_validation_interval: int = 1000 # derived fields required_samples: int = 1 From dd2b0c626de52e3736ce500c3a7f8dc73947307c Mon Sep 17 00:00:00 2001 From: Russ Fellows Date: Fri, 19 Jun 2026 13:07:14 -0600 Subject: [PATCH 3/8] feat(listing): add progress output during skip_listing validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Validation now emits three kinds of log lines on rank 0: 1. Header before any checks: skip_listing [train]: validating 50,001 of 50,136,788 files (first, last, every 1,000) via HEAD requests ... 2. Progress every ~10% of checks (at least every 500, no more than every 100): skip_listing [train]: 5,000/50,001 checked (10%) — 483 checks/s — ETA 93s — 0 failed so far 3. 
Final summary on success: skip_listing [train]: validation complete — all 50,001 samples exist (103.6s, 483 checks/s); 12,534,197 URIs ready for rank 0 (50,136,788 total across all ranks) On failure the exception now includes elapsed time and shows the first 3 missing URIs for diagnosis. --- dlio_benchmark/main.py | 45 ++++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index 8b9fd3a3..e2c343c4 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -278,9 +278,18 @@ def _filter_round_robin(chunk, start_idx): {0, num_files_expected - 1} | set(range(0, num_files_expected, interval)) ) + n_checks = len(val_indices) + # ── Header: tell the user what is about to happen ── + self.logger.output( + f"{utcnow()} skip_listing [{dataset_type}]: validating " + f"{n_checks:,} of {num_files_expected:,} files " + f"(first, last, every {interval:,}) via HEAD requests ...") failed_uris = [] t_val_start = time.time() - for vidx in val_indices: + # Report progress every ~10 % of checks, but at least + # every 500 checks and no more often than every 100. 
+ progress_stride = max(100, min(500, n_checks // 10)) + for check_num, vidx in enumerate(val_indices): vfname = f"{self.args.file_prefix}_{str(vidx).zfill(nd_f)}_of_{num_files_expected}.{self.args.format}" if num_subfolders > 0: vsf = str(vidx % num_subfolders).zfill(nd_sf) @@ -291,27 +300,43 @@ def _filter_round_robin(chunk, start_idx): os.path.join(self.args.data_folder, f"{dataset_type}", vrel)) if not self.storage.file_exists(vuri): failed_uris.append(vuri) + # Periodic progress line (but not on the very first check) + if check_num > 0 and check_num % progress_stride == 0: + elapsed = time.time() - t_val_start + rate = check_num / elapsed if elapsed > 0 else 0 + pct = 100.0 * check_num / n_checks + eta = (n_checks - check_num) / rate if rate > 0 else 0 + self.logger.output( + f"{utcnow()} skip_listing [{dataset_type}]: " + f"{check_num:,}/{n_checks:,} checked " + f"({pct:.0f}%) — " + f"{rate:.0f} checks/s — " + f"ETA {eta:.0f}s — " + f"{len(failed_uris)} failed so far") t_val_end = time.time() + elapsed_total = t_val_end - t_val_start + rate_total = n_checks / elapsed_total if elapsed_total > 0 else 0 if failed_uris: sample_shown = failed_uris[:3] raise Exception( - f"skip_listing validation failed for {len(failed_uris)} of " - f"{len(val_indices)} sampled files in [{dataset_type}]. " + f"skip_listing validation failed: {len(failed_uris)} of " + f"{n_checks:,} sampled files missing in [{dataset_type}] " + f"after {elapsed_total:.1f}s. " f"First failures: {sample_shown}. 
" f"Ensure data was generated with DLIO's standard naming " f"convention or set skip_listing=False to use directory " f"listing instead.") self.logger.output( - f"{utcnow()} skip_listing [{dataset_type}]: generated " - f"{len(my_files)} URIs deterministically ({global_count} total); " - f"validated {len(val_indices)} samples " - f"(first, last, every {interval}) in " - f"{t_val_end - t_val_start:.2f}s — all exist") + f"{utcnow()} skip_listing [{dataset_type}]: validation complete — " + f"all {n_checks:,} samples exist " + f"({elapsed_total:.1f}s, {rate_total:.0f} checks/s); " + f"{len(my_files):,} URIs ready for rank 0 " + f"({global_count:,} total across all ranks)") elif self.my_rank == 0: self.logger.output( f"{utcnow()} skip_listing [{dataset_type}]: generated " - f"{len(my_files)} file URIs deterministically " - f"({global_count} total, validation disabled)") + f"{len(my_files):,} file URIs deterministically " + f"({global_count:,} total — validation disabled)") elif num_subfolders > 0: # ── Subfoldered layout: stream with chunked bcast ───── From dba0cafcb4079f52663dfe6e4e4b73dfc7ce6362 Mon Sep 17 00:00:00 2001 From: Russ Fellows Date: Fri, 19 Jun 2026 13:35:11 -0600 Subject: [PATCH 4/8] fix(storage): fix BytesView incompatibility and S3 epoch-2+ cache bypass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #451: MinioWriter.write() called .encode() on data which fails for s3dlio BytesView objects (and any buffer-protocol type that is not str). Replace data.encode() with bytes(data), which works for bytes, bytearray, memoryview, BytesView, and any object implementing the buffer protocol. Issue #464 (object storage gap): _s3_ensure_cached() had the same cache- guard bug (if filename not in self._object_cache) that PR #26 fixed for _localfs_ensure_cached. 
With persistent_workers=True still set on the iterable dataset paths, a cached byte count from epoch 1 would survive to epoch 2+ and _prefetch() would never be called — producing zero S3 traffic and invalid AU measurements. Remove the guard so every epoch always issues a real GET. --- dlio_benchmark/reader/_s3_iterable_mixin.py | 15 ++++++++++++--- dlio_benchmark/storage/obj_store_lib.py | 9 +++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/dlio_benchmark/reader/_s3_iterable_mixin.py b/dlio_benchmark/reader/_s3_iterable_mixin.py index 79f2b81d..85bfd9ce 100644 --- a/dlio_benchmark/reader/_s3_iterable_mixin.py +++ b/dlio_benchmark/reader/_s3_iterable_mixin.py @@ -562,9 +562,18 @@ def _s3_prefetch_all(self) -> None: self._object_cache = self._prefetch(obj_keys) def _s3_ensure_cached(self, filename: str) -> None: - """Fetch a single object on demand if it is not already in the cache.""" - if filename not in self._object_cache: - self._object_cache.update(self._prefetch([filename])) + """Fetch a single object on demand, always re-fetching from storage. + + The cache is intentionally NOT short-circuited so that every epoch + measures real I/O. With persistent_workers=True (still used on the + iterable dataset paths), reusing a cached byte count from a previous + epoch would skip the GET entirely in epochs 2+, producing invalid AU. + + This mirrors the fix applied to _localfs_ensure_cached in PR #26 — + that fix covered the local-filesystem map-style path but the identical + guard (``if filename not in self._object_cache``) was not removed here. 
+ """ + self._object_cache.update(self._prefetch([filename])) def finalize_s3_bytes(self) -> None: """ diff --git a/dlio_benchmark/storage/obj_store_lib.py b/dlio_benchmark/storage/obj_store_lib.py index e4805c85..443463b6 100644 --- a/dlio_benchmark/storage/obj_store_lib.py +++ b/dlio_benchmark/storage/obj_store_lib.py @@ -105,10 +105,15 @@ def __init__(self, client, bucket, obj_name): self.buffer = BytesIO() def write(self, data): - if isinstance(data, bytes): + if isinstance(data, (bytes, bytearray, memoryview)): self.buffer.write(data) else: - self.buffer.write(data.encode()) + # Handle buffer-protocol objects (e.g. s3dlio BytesView) that + # are not bytes but support the buffer protocol. bytes() works + # for BytesView, memoryview, bytearray, and any C-extension type + # that implements __buffer__. Calling .encode() on these fails + # with AttributeError — .encode() is a str-only method. + self.buffer.write(bytes(data)) def close(self): self.buffer.seek(0) From 7319379e59e01f30b73750e99a9abfba28e17c19 Mon Sep 17 00:00:00 2001 From: Russ Fellows Date: Fri, 19 Jun 2026 14:02:37 -0600 Subject: [PATCH 5/8] fix(mpi): allreduce_min/alltoall safe in child-process and single-rank contexts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When TorchIterableDatasetSimple.__iter__ runs with num_workers=0, it calls worker_init(0) directly in the main process, which deserializes ConfigArguments via pickle.loads → __setstate__ → DLIOMPI.reset() + set_parent_values(). This leaves the DLIOMPI singleton in CHILD_INITIALIZED state for the remainder of the epoch. After the epoch, reconfigure() → get_global_map_index() → allreduce_min() then calls comm(), which raises 'called in a child process'. Fix: allreduce_min() and alltoall() now short-circuit when state is CHILD_INITIALIZED or mpi_size <= 1. For child processes, MPI collectives are impossible and returning the local value is always correct (rank 0 owns the authoritative state). 
For single-rank runs, no allreduce is needed at all. Also fix: _S3_EXTENDED missing definition in test_s3dlio_object_store.py. Removes hardcoded fallback IP from _endpoint() — now skips if AWS_ENDPOINT_URL is not set rather than silently using a real server address. --- dlio_benchmark/utils/utility.py | 10 +++++++++- tests/test_s3dlio_object_store.py | 8 +++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/dlio_benchmark/utils/utility.py b/dlio_benchmark/utils/utility.py index a40bb1b5..7532f2d0 100644 --- a/dlio_benchmark/utils/utility.py +++ b/dlio_benchmark/utils/utility.py @@ -346,14 +346,22 @@ def reduce(self, num): return MPI.COMM_WORLD.allreduce(num, op=MPI.SUM) def allreduce_min(self, value): - from mpi4py import MPI if self.mpi_state == MPIState.UNINITIALIZED: raise Exception(f"method {self.classname()}.allreduce_min() called before initializing MPI") + # Single-rank or child-process (DataLoader worker): no collective needed. + # Child processes can never issue MPI collectives; returning the local + # value is correct for single-rank runs and safe for workers. + if self.mpi_state == MPIState.CHILD_INITIALIZED or self.mpi_size <= 1: + return value + from mpi4py import MPI return self.comm().allreduce(value, op=MPI.MIN) def alltoall(self, data): if self.mpi_state == MPIState.UNINITIALIZED: raise Exception(f"method {self.classname()}.alltoall() called before initializing MPI") + # Single-rank or child-process: identity operation. + if self.mpi_state == MPIState.CHILD_INITIALIZED or self.mpi_size <= 1: + return data return self.comm().alltoall(data) def finalize(self): diff --git a/tests/test_s3dlio_object_store.py b/tests/test_s3dlio_object_store.py index 88453cb2..779d6977 100644 --- a/tests/test_s3dlio_object_store.py +++ b/tests/test_s3dlio_object_store.py @@ -116,6 +116,9 @@ def _load_env_file(): # Using a value much shorter than TEST_TIMEOUT_SECONDS (600 s) so a hang is # caught quickly rather than after 10 minutes. 
_S3_TEST_TIMEOUT = int(os.environ.get("DLIO_S3_TEST_TIMEOUT", "120")) # seconds +# When DLIO_S3_EXTENDED=1, run all supported formats. Default: npy only +# (fastest smoke test — verifies the put+get cycle without exhausting time). +_S3_EXTENDED = os.environ.get("DLIO_S3_EXTENDED", "0") == "1" comm = MPI.COMM_WORLD _config_dir = os.path.dirname(dlio_benchmark.__file__) + "/configs/" @@ -126,7 +129,10 @@ def _load_env_file(): # ─── Helpers ────────────────────────────────────────────────────────────────── def _endpoint(): - return os.environ.get("AWS_ENDPOINT_URL", "https://172.16.1.40:9000") + ep = os.environ.get("AWS_ENDPOINT_URL") + if not ep: + pytest.skip("AWS_ENDPOINT_URL not set — cannot run live S3 tests") + return ep def _region(): From 31759a85e9a5e837c758a88145524d0d139d3165 Mon Sep 17 00:00:00 2001 From: Russ Fellows Date: Fri, 19 Jun 2026 14:06:30 -0600 Subject: [PATCH 6/8] fix(mpi): repair CHILD_INITIALIZED state in initialize(); fix TFRecord datagen validation utility.py: DLIOMPI.initialize() now repairs CHILD_INITIALIZED state when MPI is actually running (main process had its state corrupted by a DataLoader worker_init deserialization in the num_workers=0 path). Previously this raised 'called in a child process' when the second S3 test called initialize() after the first test left the state dirty. config.py: TFRecord+PyTorch validation guard now only fires when a data loader is actually used (do_train or do_eval). Previously it fired unconditionally, rejecting datagen-only runs (generate_data=True, train=False, evaluation=False) even though no data loading occurs during generation. test_s3dlio_object_store.py: add evaluation=False override to TFRecord datagen test so the eval phase does not attempt to load TFRecords via pytorch. 
--- dlio_benchmark/utils/config.py | 2 +- dlio_benchmark/utils/utility.py | 14 ++++++++++++++ tests/test_s3dlio_object_store.py | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index 009c261a..7f918a7b 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -379,7 +379,7 @@ def validate(self): if (self.do_profiling == True) and (self.profiler == Profiler('darshan')): if ('LD_PRELOAD' not in os.environ or os.environ["LD_PRELOAD"].find("libdarshan") == -1): raise Exception("Please set darshan runtime library in LD_PRELOAD") - if self.format is FormatType.TFRECORD and (self.data_loader is DataLoaderType.PYTORCH): + if self.format is FormatType.TFRECORD and (self.data_loader is DataLoaderType.PYTORCH) and (self.do_train or self.do_eval): raise Exception(f"{self.framework} support for tfrecord is not implemented for {self.data_loader}.") if (self.framework == FrameworkType.TENSORFLOW and self.data_loader == DataLoaderType.PYTORCH) or ( self.framework == FrameworkType.PYTORCH and self.data_loader == DataLoaderType.TENSORFLOW): diff --git a/dlio_benchmark/utils/utility.py b/dlio_benchmark/utils/utility.py index 7532f2d0..e56f1e04 100644 --- a/dlio_benchmark/utils/utility.py +++ b/dlio_benchmark/utils/utility.py @@ -222,6 +222,20 @@ def classname(cls): def initialize(self): from mpi4py import MPI + if self.mpi_state == MPIState.CHILD_INITIALIZED: + # The main process can end up in CHILD_INITIALIZED when + # TorchIterableDatasetSimple.__iter__ calls worker_init(0) directly + # in the main thread (num_workers=0 path). That deserializes + # ConfigArguments via pickle.loads → __setstate__ → DLIOMPI.reset() + # + set_parent_values(), leaving the singleton in CHILD_INITIALIZED. + # If MPI is actually running (MPI.Is_initialized()), we are the + # real MPI process — reset to UNINITIALIZED so initialization + # proceeds normally below. 
If MPI is not running, we truly are + # in a child process and must refuse. + if MPI.Is_initialized(): + self.mpi_state = MPIState.UNINITIALIZED + else: + raise Exception(f"method {self.classname()}.initialize() called in a child process") if self.mpi_state == MPIState.UNINITIALIZED: # MPI may have already been initialized by dlio_benchmark_test.py if not MPI.Is_initialized(): diff --git a/tests/test_s3dlio_object_store.py b/tests/test_s3dlio_object_store.py index 779d6977..3384f65b 100644 --- a/tests/test_s3dlio_object_store.py +++ b/tests/test_s3dlio_object_store.py @@ -382,6 +382,7 @@ def test_s3dlio_tfrecord_datagen(): cfg = compose(config_name="config", overrides=base + [ "++workload.workflow.generate_data=True", "++workload.workflow.train=False", + "++workload.workflow.evaluation=False", # TFRecord read requires TF, not pytorch "++workload.workflow.checkpoint=False", ]) _run_benchmark(OmegaConf.to_container(cfg["workload"], resolve=True), From 9bdbd38f850a6e6354f6e633d79ca5bd31f5bd59 Mon Sep 17 00:00:00 2001 From: Russ Fellows Date: Fri, 19 Jun 2026 14:46:50 -0600 Subject: [PATCH 7/8] feat(tfrecord): full generate+read via s3dlio; fix routing and read_index - reader/reader_factory.py: route TFRECORD to TFRecordReaderS3Iterable whenever storage_library=s3dlio, regardless of storage_type. s3dlio handles both s3:// and file:// URIs, so the old check of storage_type in (S3, AISTORE) was both too narrow and too broad. - reader/tfrecord_reader_s3_iterable.py: fix read_index() to call FormatReader.read_index() directly instead of super(), which was resolving to NPYReader.read_index() -> _localfs_ensure_cached(), causing FileNotFoundError when reading from S3/object URIs. Add FormatReader import. Clarify class docstring. - reader/_s3_iterable_mixin.py: storage_library now defaults to 's3dlio' instead of raising ValueError when not set in YAML, consistent with how data generation defaults. 
- utils/config.py: pytorch+tfrecord restriction for train/eval now checks storage_library != 's3dlio' rather than storage_type not in (S3, AISTORE). TFRecord reading with pytorch is supported exclusively through s3dlio's TFRecordReaderS3Iterable. - tests/test_s3dlio_object_store.py: rename test_s3dlio_tfrecord_datagen to test_s3dlio_tfrecord_datagen_and_read; add full read phase. Fix stale comments (S3Storage uses tf.io.gfile, not boto3). Remove botocore from logger noise-suppression list. - docs: remove stale boto3 references in two analysis docs. All 130 unit tests pass. Live S3 tests: 2 passed (npy, tfrecord). --- dlio_benchmark/reader/_s3_iterable_mixin.py | 12 ++--- dlio_benchmark/reader/reader_factory.py | 9 ++-- .../reader/tfrecord_reader_s3_iterable.py | 9 +++- dlio_benchmark/utils/config.py | 4 +- docs/DLIO-Object-Storage_Analysis.md | 7 +-- docs/DLRM-Parquet-S3-Throughput-Analysis.md | 2 +- tests/test_s3dlio_object_store.py | 45 ++++++++++++------- 7 files changed, 53 insertions(+), 35 deletions(-) diff --git a/dlio_benchmark/reader/_s3_iterable_mixin.py b/dlio_benchmark/reader/_s3_iterable_mixin.py index 85bfd9ce..62985118 100644 --- a/dlio_benchmark/reader/_s3_iterable_mixin.py +++ b/dlio_benchmark/reader/_s3_iterable_mixin.py @@ -103,15 +103,9 @@ def _s3_init(self, opts: dict) -> None: Raises ``ImportError`` immediately if the configured library is not installed, rather than deferring failure to the first I/O call. """ - # storage_library is REQUIRED — there is no default. Every object - # storage workload must explicitly declare which library to use. - self._storage_library: str = opts.get("storage_library") - if self._storage_library is None: - raise ValueError( - "storage_options['storage_library'] is required for S3 readers. " - "Add 'storage_library: ' under the 'storage:' section of " - "your workload YAML. Supported values: minio, s3dlio, s3torchconnector." - ) + # Default to s3dlio — consistent with how data is generated. 
Users can + # override by setting storage_library in storage_options. + self._storage_library: str = opts.get("storage_library") or "s3dlio" self._opts: dict = opts self._object_cache: dict = {} # obj_key → int (raw byte count only) self._minio_client = None # cached across epochs for TCP keep-alive diff --git a/dlio_benchmark/reader/reader_factory.py b/dlio_benchmark/reader/reader_factory.py index 2484ff8d..74ceb422 100644 --- a/dlio_benchmark/reader/reader_factory.py +++ b/dlio_benchmark/reader/reader_factory.py @@ -95,11 +95,10 @@ def get_reader(type, dataset_type, thread_index, epoch_number): elif type == FormatType.TFRECORD: if _args.odirect == True: raise Exception("O_DIRECT for %s format is not yet supported." %type) - elif _args.storage_type in (StorageType.S3, StorageType.AISTORE): - storage_library = (getattr(_args, "storage_options", {}) or {}).get("storage_library") - if storage_library in ("s3dlio", "s3torchconnector", "minio"): - from dlio_benchmark.reader.tfrecord_reader_s3_iterable import TFRecordReaderS3Iterable - return TFRecordReaderS3Iterable(dataset_type, thread_index, epoch_number) + elif (getattr(_args, "storage_options", {}) or {}).get("storage_library") == "s3dlio": + # s3dlio handles both s3:// and file:// URIs. + from dlio_benchmark.reader.tfrecord_reader_s3_iterable import TFRecordReaderS3Iterable + return TFRecordReaderS3Iterable(dataset_type, thread_index, epoch_number) if _args.data_loader == DataLoaderType.NATIVE_DALI: from dlio_benchmark.reader.dali_tfrecord_reader import DaliTFRecordReader return DaliTFRecordReader(dataset_type, thread_index, epoch_number) diff --git a/dlio_benchmark/reader/tfrecord_reader_s3_iterable.py b/dlio_benchmark/reader/tfrecord_reader_s3_iterable.py index 8eb9cfac..ff3b5574 100644 --- a/dlio_benchmark/reader/tfrecord_reader_s3_iterable.py +++ b/dlio_benchmark/reader/tfrecord_reader_s3_iterable.py @@ -28,6 +28,7 @@ """ # Copyright (c) 2025, UChicago Argonne, LLC. Apache 2.0 License. 
from dlio_benchmark.common.constants import MODULE_DATA_READER +from dlio_benchmark.reader.reader_handler import FormatReader from dlio_benchmark.reader.npy_reader import NPYReader from dlio_benchmark.reader._s3_iterable_mixin import _S3IterableMixin from dlio_benchmark.utils.utility import Profile, utcnow @@ -52,6 +53,10 @@ class TFRecordReaderS3Iterable(NPYReader, _S3IterableMixin): _object_cache[filename] holds an int (byte count), same pattern as all other S3 iterable readers. + + Note: read_index() calls FormatReader.read_index() directly to bypass + NPYReader._localfs_ensure_cached() which would attempt a local filesystem + read on an S3 URI. """ @dlp.log_init @@ -119,7 +124,9 @@ def read_index(self, image_idx, step): filename, _ = self.global_index_map[image_idx] self._s3_ensure_cached(filename) dlp.update(step=step) - return super().read_index(image_idx, step) + # Call FormatReader.read_index() directly — skips NPYReader.read_index() + # which would invoke _localfs_ensure_cached() on an S3 URI and fail. + return FormatReader.read_index(self, image_idx, step) @dlp.log def finalize(self): diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index 7f918a7b..de59117e 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -380,7 +380,9 @@ def validate(self): if ('LD_PRELOAD' not in os.environ or os.environ["LD_PRELOAD"].find("libdarshan") == -1): raise Exception("Please set darshan runtime library in LD_PRELOAD") if self.format is FormatType.TFRECORD and (self.data_loader is DataLoaderType.PYTORCH) and (self.do_train or self.do_eval): - raise Exception(f"{self.framework} support for tfrecord is not implemented for {self.data_loader}.") + # TFRecordReaderS3Iterable handles pytorch+tfrecord via s3dlio (s3:// and file://). 
+ if (self.storage_options or {}).get("storage_library") != "s3dlio": + raise Exception(f"{self.framework} support for tfrecord is not implemented for {self.data_loader}.") if (self.framework == FrameworkType.TENSORFLOW and self.data_loader == DataLoaderType.PYTORCH) or ( self.framework == FrameworkType.PYTORCH and self.data_loader == DataLoaderType.TENSORFLOW): raise Exception("Imcompatible between framework and data_loader setup.") diff --git a/docs/DLIO-Object-Storage_Analysis.md b/docs/DLIO-Object-Storage_Analysis.md index 3e0f823f..1b2d8e82 100644 --- a/docs/DLIO-Object-Storage_Analysis.md +++ b/docs/DLIO-Object-Storage_Analysis.md @@ -83,10 +83,11 @@ However, this code path is **only triggered by the TFDataLoader**, which calls ` ## 6. One Actual Issue Found (Not Timing-Related) -The `configs/dlio/workload/unet3d_h100_s3dlio.yaml` file still contains the hardcoded endpoint and personal paths that were cleaned from `tests/object-store/`. Specifically: +The `configs/dlio/workload/unet3d_h100_s3dlio.yaml` file still contains a hardcoded +endpoint and personal paths that were cleaned from `tests/object-store/`. Specifically: -- `endpoint_url: http://172.16.1.40:9000` -- `source /home/eval/Documents/Code/mlp-storage/.env` in the comments +- `endpoint_url: http://<redacted-host>:9000` +- A local filesystem path in the comments This was outside the scope of the previous cleanup pass and is a separate issue from timing correctness. diff --git a/docs/DLRM-Parquet-S3-Throughput-Analysis.md b/docs/DLRM-Parquet-S3-Throughput-Analysis.md index edfa4c94..f3f474ba 100644 --- a/docs/DLRM-Parquet-S3-Throughput-Analysis.md +++ b/docs/DLRM-Parquet-S3-Throughput-Analysis.md @@ -185,7 +185,7 @@ This is the only path to full 10 GiB/s with the existing per-file/per-row-group **Pros:** - Clean separation: s3dlio stays format-agnostic -- Can be used without s3dlio (e.g. with boto3 backend) +- Can be used without s3dlio (e.g.
with a different S3 backend) - Easier to publish independently **Cons:** diff --git a/tests/test_s3dlio_object_store.py b/tests/test_s3dlio_object_store.py index 3384f65b..29cd4aab 100644 --- a/tests/test_s3dlio_object_store.py +++ b/tests/test_s3dlio_object_store.py @@ -30,10 +30,8 @@ Formats tested -------------- -npy, npz, hdf5, csv, parquet, jpeg, png -TFRecord: generate-only (put) test included; read phase excluded because -reading TFRecords requires framework=tensorflow which routes through -S3Storage (bare AWS SDK), not ObjStoreLibStorage (s3dlio). +npy, npz, hdf5, csv, parquet, jpeg, png, tfrecord (full generate + read cycle). +All formats use s3dlio for both write and read. """ import os @@ -84,7 +82,7 @@ def _load_env_file(): force=True, # override any earlier basicConfig from conftest or dlio imports ) # Keep noisy third-party loggers at INFO level. -for _noisy in ("urllib3", "botocore", "s3transfer", "filelock", "hydra"): +for _noisy in ("urllib3", "s3transfer", "filelock", "hydra"): logging.getLogger(_noisy).setLevel(logging.WARNING) # ─── Object-storage opt-in gate ────────────────────────────────────────────── @@ -254,7 +252,8 @@ def _base_overrides(bucket: str, prefix: str, fmt: str, _FORMATS = ["npy"] if not _S3_EXTENDED else ["npy", "npz", "hdf5", "csv", "parquet", "jpeg", "png"] # TFRecord excluded: reading requires framework=tensorflow which routes through -# S3Storage (bare boto3), not ObjStoreLibStorage (s3dlio). Generate-only test +# S3Storage (tf.io.gfile), not ObjStoreLibStorage (s3dlio). Generate-only test +# (TFRecord full datagen+read tested separately in test_s3dlio_tfrecord_datagen_and_read). # for TFRecord is covered by test_s3dlio_tfrecord_datagen below. 
@@ -350,17 +349,17 @@ def test_s3dlio_datagen_and_read(fmt): shutil.rmtree(_DLIO_TEST_OUTPUT_DIR, ignore_errors=True) -# ─── TFRecord: generate-only (put) test ─────────────────────────────────────── +# ─── TFRecord: full generate + read test ───────────────────────────────────── @pytest.mark.timeout(_S3_TEST_TIMEOUT, method="thread") -def test_s3dlio_tfrecord_datagen(): +def test_s3dlio_tfrecord_datagen_and_read(): """ - Put-only test for TFRecord format. + Full generate + read test for TFRecord format using s3dlio. - TFRecord generation works with framework=pytorch (uses TFRecordGenerator to - write objects via s3dlio). Reading TFRecords requires tf.data and - framework=tensorflow, which routes through S3Storage (boto3), not - ObjStoreLibStorage (s3dlio) — so no read phase is included here. + TFRecord generation writes objects via s3dlio (TFRecordGenerator + put_data). + Reading uses TFRecordReaderS3Iterable which fetches raw bytes via s3dlio + get_many() — no tensorflow/protobuf decoding required. Both phases use + framework=pytorch so no tensorflow installation is needed. """ DLIOMPI.get_instance().initialize() @@ -382,7 +381,7 @@ def test_s3dlio_tfrecord_datagen(): cfg = compose(config_name="config", overrides=base + [ "++workload.workflow.generate_data=True", "++workload.workflow.train=False", - "++workload.workflow.evaluation=False", # TFRecord read requires TF, not pytorch + "++workload.workflow.evaluation=False", "++workload.workflow.checkpoint=False", ]) _run_benchmark(OmegaConf.to_container(cfg["workload"], resolve=True), @@ -406,7 +405,23 @@ def test_s3dlio_tfrecord_datagen(): f"found {len(found_valid)}: {found_valid}" ) - log.info("test_s3dlio_tfrecord_datagen PASSED — put confirmed") + log.info("tfrecord datagen PASSED — now running read phase ...") + + # Read phase: TFRecordReaderS3Iterable fetches objects via s3dlio. + # Uses files_pre_sharded=True so DLIO does not attempt to re-list S3. 
+ with initialize_config_dir(version_base=None, config_dir=_config_dir): + cfg = compose(config_name="config", overrides=base + [ + "++workload.workflow.generate_data=False", + "++workload.workflow.train=True", + "++workload.workflow.evaluation=False", + "++workload.workflow.checkpoint=False", + "++workload.dataset.files_pre_sharded=True", + "++workload.train.epochs=1", + ]) + _run_benchmark(OmegaConf.to_container(cfg["workload"], resolve=True), + phase="train", verify=True) + + log.info("test_s3dlio_tfrecord_datagen_and_read PASSED — put + get confirmed") finally: if comm.rank == 0: From 9091a4c398b3dd607fddf26a5f29425b725d5f40 Mon Sep 17 00:00:00 2001 From: Russ Fellows Date: Fri, 19 Jun 2026 16:46:07 -0600 Subject: [PATCH 8/8] =?UTF-8?q?docs:=20add=20skip-listing.md=20=E2=80=94?= =?UTF-8?q?=20operational=20guide=20for=20skip=5Flisting=20feature?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Documents skip_listing and listing_validation_interval: - What the problem is (S3 listing latency for large datasets) - How skip_listing works (deterministic URI generation) - Whether the listing phase is scored (it is NOT — in initialize(), before stats.start_run() in run()) - Configuration: Hydra CLI overrides, no YAML changes required - Worked examples for direct dlio_benchmark and via mlp-storage - Comparability guidance for benchmark submissions --- docs/skip-listing.md | 185 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 docs/skip-listing.md diff --git a/docs/skip-listing.md b/docs/skip-listing.md new file mode 100644 index 00000000..3a0e4c3e --- /dev/null +++ b/docs/skip-listing.md @@ -0,0 +1,185 @@ +# skip_listing — Fast File Discovery for Large Object-Storage Datasets + +**Fixes:** [mlcommons/DLIO_local_changes issue #472](https://github.com/mlcommons/DLIO_local_changes/issues/472) + +--- + +## Problem + +On S3-compatible object storage, DLIO discovers dataset files by 
calling +`list_objects_v2` (or equivalent). For a 50-million-file dataset, this +requires 50,000 paginated API calls (1,000 keys/page). At 100 ms/page on a +WAN-connected or busy S3 endpoint, this takes **83 minutes** and has been +observed in the field to take **12+ hours**. + +PR #26 reduced the amplification from N×ranks to a single rank-0 listing, but +rank 0 still paginates every key. + +`skip_listing` eliminates the listing entirely. + +--- + +## How it works + +When `skip_listing=True`, each MPI rank independently reconstructs its own +file-URI shard from DLIO's standard file-naming convention: + +``` +{file_prefix}_{index:0Nd}_of_{num_files}.{format} +``` + +For sub-foldered layouts: + +``` +{subfolder}/{file_prefix}_{index:0Nd}_of_{num_files}.{format} + where subfolder = str(index % num_subfolders).zfill(nd_sf) +``` + +Every rank generates only its own round-robin slice +(`index % comm_size == my_rank`) with **zero S3 API calls** and **zero MPI +communication**. + +An optional sampling validation step (rank 0 only) then confirms that the +generated names exist in storage by checking a small fraction of URIs via HEAD requests. + +### Is the listing phase timed / scored? + +**No.** File discovery runs inside `DLIOBenchmark.initialize()`, which +completes before `DLIOBenchmark.run()` is called. The scored measurement window (AU, +throughput) opens at `stats.start_run()` — the very first line of `run()`. +`skip_listing` does not affect AU or throughput scores. It only affects +**total wall-clock job time** and **cold-start latency**. + +--- + +## Configuration + +`skip_listing` is a `dataset:` field. You do **not** need to modify any YAML +workload file.
+ +### Option A — Hydra CLI override (recommended, no YAML changes) + +Pass the override directly on the `dlio_benchmark` command line: + +```bash +dlio_benchmark workload=unet3d_a100 \ + ++workload.dataset.skip_listing=True \ + ++workload.dataset.listing_validation_interval=1000 \ + --config-dir /path/to/configs +``` + +### Option B — Environment variable (not supported natively by DLIO) + +DLIO uses Hydra for configuration; there is no environment-variable shim for +individual dataset fields. Use the `++workload.dataset.*` CLI syntax shown +above, or configure your orchestration layer to inject the override (mlp-storage +does this automatically — see below). + +### Option C — Add to your own workload YAML override file + +If you maintain a site-specific YAML overlay, add: + +```yaml +dataset: + skip_listing: true + listing_validation_interval: 1000 # 0 = disable validation +``` + +--- + +## Parameters + +### `dataset.skip_listing` (bool, default: `False`) + +When `True`, DLIO generates file URIs from the dataset naming convention instead +of calling the storage listing API. + +**Requirement:** Dataset filenames must follow DLIO's standard convention +(`{prefix}_{index:0N}_of_{total}.{format}`). All data generated by +`dlio_benchmark` with `workflow.generate_data=True` satisfies this +automatically. Externally generated datasets with different naming **must** +use `skip_listing=False`. + +**Default is `False` in DLIO itself.** When calling DLIO via mlp-storage with +`--object` mode, mlp-storage injects `skip_listing=True` automatically (see +[mlp-storage object storage guide](../../mlp-storage/docs/OBJECT_STORAGE_GUIDE.md)). + +### `dataset.listing_validation_interval` (int, default: `1000`) + +When `skip_listing=True`, rank 0 validates that a sample of the generated URIs +actually exist in storage before training begins. Always checks the first and +last file, plus every N-th file in between, via HEAD requests. 
+ +- Set to `0` to **disable validation entirely** (fastest startup; no safety net). +- Set to `1` to check every single file (full coverage, but one HEAD request per file — much slower than a paginated listing). +- Default `1000` checks ~50,000 files for a 50M-file dataset in ~100 seconds. + +Progress is printed to the log at ~10% intervals: + +``` +skip_listing [train]: validating 50,001 of 50,000,000 files + (first, last, every 1,000) via HEAD requests ... +5,000/50,001 checked (10%) — 483 checks/s — ETA 93s — 0 failed so far +skip_listing [train]: validation complete — all 50,001 samples exist + (103.6s, 483 checks/s); 781,250 URIs ready for rank 0 +``` + +--- + +## Worked examples + +### Direct dlio_benchmark — S3, 50M files, skip listing + +```bash +mpirun -n 64 dlio_benchmark workload=unet3d_a100 \ + ++workload.storage.storage_type=s3 \ + ++workload.storage.storage_root=my-bucket \ + ++workload.storage.storage_options.storage_library=s3dlio \ + ++workload.dataset.num_files_train=50000000 \ + ++workload.dataset.skip_listing=True \ + ++workload.dataset.listing_validation_interval=1000 \ + ++workload.workflow.generate_data=False \ + ++workload.workflow.train=True \ + --config-dir /path/to/mlp-storage/configs/dlio +``` + +### Disable validation for faster startup (trusted dataset) + +```bash +dlio_benchmark workload=unet3d_a100 \ + ++workload.dataset.skip_listing=True \ + ++workload.dataset.listing_validation_interval=0 \ + ... +``` + +### Opt out of skip_listing when using mlp-storage + +If your dataset was not generated by DLIO and does not follow the standard +naming convention, pass `--params dataset.skip_listing=False` to override the +mlp-storage default: + +```bash +uv run mlpstorage training run \ + --model unet3d --object ...
\ + --params dataset.skip_listing=False +``` + +--- + +## Comparability of benchmark scores + +Because the listing phase is entirely within `initialize()` and the scored +measurement window only opens in `run()`, **AU and throughput scores are +directly comparable** between: + +- A run with `skip_listing=True` (fast startup) +- A run with `skip_listing=False` (slow listing) +- A run on local filesystem (instant `os.listdir()`) + +All three measure the same thing: how fast the storage system delivers bytes +during training. The listing phase measures object-store metadata performance, +which is a separate concern. + +For submission comparability, mlp-storage enables `skip_listing=True` by +default for all `--object` runs so that no submission is penalised by slow +metadata APIs.