Improve large scale training file lists with distributed approach#26
Conversation
…memory Single-rank directory walk with chunked bcast (1M files per broadcast), hash-based sharding (adler32), epoch-dependent reshard via MPI alltoall, SharedFileList backed by POSIX shared memory, worker pre-warming, and fix for cross-epoch cache invalidation in persistent workers. Key changes: - Only rank 0 walks the filesystem; files streamed to all ranks in chunks - Each rank keeps files where adler32(path+epoch_salt) % comm_size == rank - SharedFileList stores paths in /dev/shm (139B pickle vs 67MB per worker) - alltoall reshard each epoch so files migrate between ranks (when shuffle=ON) - Workers pre-warmed before epoch 1 timing via iter()+next() - _localfs_ensure_cached always re-reads (no stale cache with persistent_workers) - allreduce(MIN) alignment prevents barrier deadlocks from uneven shards - Timing logs for listing, sharding, and resharding Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Elements to strictly review and check before merging:
|
|
I will work on reviewing these changes this weekend, and attempt to test on Object storage as well. |
|
One more note. It appears that a recent change to the upstream DLIO code may have been attempting to do something similar to this fix. We would examine it as well to determine if it is complimentary and can be applied along with this PR, if it supersedes this PR, or if this PR supersedes it. The original PR was argonne-lcf#341: argonne-lcf#341 Commit ID: |
russfellows
left a comment
There was a problem hiding this comment.
This should address several open issues, and seems like a good solution that is minimally invasive. Will approve and merge.
|
@FileSystemGuy , My "Claude" bot tried this code and it seems to work well. Let's merge this if you're OK with it as the starting point for fixing about 3 or 4 open issues. |
… unit tests DLIOSampler.__init__ now wraps ConfigArguments.get_instance() in try/except so that tests running without MPI (no mpirun, no mpi4py init) fall back cleanly to the non-pre-sharded path instead of raising an exception. In real distributed runs files_pre_sharded=True is only set after MPI is fully initialized, so the fallback is always correct in production.
|
Note: When I said "this code seems to work well." I meant AFTER I applied the above fixes with hash ID ab033c3. These are required in order for the unit tests to pass, as otherwise there are several failures. |
|
I've not been able to do a truly in-depth review of the logic, but I don't see anything that sticks out as questionable or risky. The core question since we're changing the I/O pattern (for the better!) is whether we're changing it correctly or not. On the other hand, what this does is so much better than we had before that it's hard to say that we shouldn't pull the trigger on merging this and then keep looking and/or patching after that. |
…3 object storage, epoch-2+ AU, TFRecord via s3dlio (#27) * feat(listing): add skip_listing for deterministic file URI generation (issue #472) For DLIO-generated datasets, file URIs follow a known naming pattern: {file_prefix}_{index:0N}_of_{num_files}.{format} When skip_listing=True, each rank independently generates its own round-robin shard using this convention — zero S3 API calls, zero MPI communication for the listing phase. This eliminates multi-hour S3 listing for large flat datasets: - 50M files at 100ms/page × 50K pages = ~5000s (83 min) of listing - With skip_listing=True: ~5-10s of Python string generation - 100-500× speedup for retinanet-scale workloads (issue #472) Also supports subfoldered layouts: subfolder index computed as str(file_index % num_subfolders).zfill(nd_sf) Usage: ++workload.dataset.skip_listing=True Default: False (backward compatible, listing behavior unchanged) * feat(listing): add sampling validation for skip_listing (issue #472) When skip_listing=True, rank 0 now verifies that the deterministically generated file URIs actually exist before training begins. Validation checks: - The first file (index 0) - The last file (index num_files - 1) - Every listing_validation_interval-th file (default: every 1,000th) If any sampled file is missing, an informative exception is raised directing the user to either fix the file prefix/format or set skip_listing=False to fall back to directory listing. New config fields: listing_validation_interval: int = 1000 Set to 0 to disable validation entirely. New storage methods: ObjStoreLibStorage.file_exists(uri) -- uses s3dlio.exists() or stat_object() FileStorage.file_exists(id) -- uses os.path.isfile() For 50M files with interval=1000: 50,001 HEAD requests vs 50,000 listing pages -- same order of magnitude but fully parallel-capable and verifies bounds and uniform sampling of the dataset. * feat(listing): add progress output during skip_listing validation Validation now emits three kinds of log lines on rank 0: 1. Header before any checks: skip_listing [train]: validating 50,001 of 50,136,788 files (first, last, every 1,000) via HEAD requests ... 2. Progress every ~10% of checks (at least every 500, no more than every 100): skip_listing [train]: 5,000/50,001 checked (10%) — 483 checks/s — ETA 93s — 0 failed so far 3. Final summary on success: skip_listing [train]: validation complete — all 50,001 samples exist (103.6s, 483 checks/s); 12,534,197 URIs ready for rank 0 (50,136,788 total across all ranks) On failure the exception now includes elapsed time and shows the first 3 missing URIs for diagnosis. * fix(storage): fix BytesView incompatibility and S3 epoch-2+ cache bypass Issue #451: MinioWriter.write() called .encode() on data which fails for s3dlio BytesView objects (and any buffer-protocol type that is not str). Replace data.encode() with bytes(data), which works for bytes, bytearray, memoryview, BytesView, and any object implementing the buffer protocol. Issue #464 (object storage gap): _s3_ensure_cached() had the same cache- guard bug (if filename not in self._object_cache) that PR #26 fixed for _localfs_ensure_cached. With persistent_workers=True still set on the iterable dataset paths, a cached byte count from epoch 1 would survive to epoch 2+ and _prefetch() would never be called — producing zero S3 traffic and invalid AU measurements. Remove the guard so every epoch always issues a real GET. * fix(mpi): allreduce_min/alltoall safe in child-process and single-rank contexts When TorchIterableDatasetSimple.__iter__ runs with num_workers=0, it calls worker_init(0) directly in the main process, which deserializes ConfigArguments via pickle.loads → __setstate__ → DLIOMPI.reset() + set_parent_values(). This leaves the DLIOMPI singleton in CHILD_INITIALIZED state for the remainder of the epoch. After the epoch, reconfigure() → get_global_map_index() → allreduce_min() then calls comm(), which raises 'called in a child process'. Fix: allreduce_min() and alltoall() now short-circuit when state is CHILD_INITIALIZED or mpi_size <= 1. For child processes, MPI collectives are impossible and returning the local value is always correct (rank 0 owns the authoritative state). For single-rank runs, no allreduce is needed at all. Also fix: _S3_EXTENDED missing definition in test_s3dlio_object_store.py. Removes hardcoded fallback IP from _endpoint() — now skips if AWS_ENDPOINT_URL is not set rather than silently using a real server address. * fix(mpi): repair CHILD_INITIALIZED state in initialize(); fix TFRecord datagen validation utility.py: DLIOMPI.initialize() now repairs CHILD_INITIALIZED state when MPI is actually running (main process had its state corrupted by a DataLoader worker_init deserialization in the num_workers=0 path). Previously this raised 'called in a child process' when the second S3 test called initialize() after the first test left the state dirty. config.py: TFRecord+PyTorch validation guard now only fires when a data loader is actually used (do_train or do_eval). Previously it fired unconditionally, rejecting datagen-only runs (generate_data=True, train=False, evaluation=False) even though no data loading occurs during generation. test_s3dlio_object_store.py: add evaluation=False override to TFRecord datagen test so the eval phase does not attempt to load TFRecords via pytorch. * feat(tfrecord): full generate+read via s3dlio; fix routing and read_index - reader/reader_factory.py: route TFRECORD to TFRecordReaderS3Iterable whenever storage_library=s3dlio, regardless of storage_type. s3dlio handles both s3:// and file:// URIs, so the old check of storage_type in (S3, AISTORE) was both too narrow and too broad. - reader/tfrecord_reader_s3_iterable.py: fix read_index() to call FormatReader.read_index() directly instead of super(), which was resolving to NPYReader.read_index() -> _localfs_ensure_cached(), causing FileNotFoundError when reading from S3/object URIs. Add FormatReader import. Clarify class docstring. - reader/_s3_iterable_mixin.py: storage_library now defaults to 's3dlio' instead of raising ValueError when not set in YAML, consistent with how data generation defaults. - utils/config.py: pytorch+tfrecord restriction for train/eval now checks storage_library != 's3dlio' rather than storage_type not in (S3, AISTORE). TFRecord reading with pytorch is supported exclusively through s3dlio's TFRecordReaderS3Iterable. - tests/test_s3dlio_object_store.py: rename test_s3dlio_tfrecord_datagen to test_s3dlio_tfrecord_datagen_and_read; add full read phase. Fix stale comments (S3Storage uses tf.io.gfile, not boto3). Remove botocore from logger noise-suppression list. - docs: remove stale boto3 references in two analysis docs. All 130 unit tests pass. Live S3 tests: 2 passed (npy, tfrecord). * docs: add skip-listing.md — operational guide for skip_listing feature Documents skip_listing and listing_validation_interval: - What the problem is (S3 listing latency for large datasets) - How skip_listing works (deterministic URI generation) - Whether the listing phase is scored (it is NOT — in initialize(), before stats.start_run() in run()) - Configuration: Hydra CLI overrides, no YAML changes required - Worked examples for direct dlio_benchmark and via mlp-storage - Comparability guidance for benchmark submissions
This pull request introduces major improvements to the file sharding and data loading logic in the DLIO benchmark, focusing on scalable, balanced file assignment for distributed training. The changes implement a streaming, round-robin file sharding mechanism performed by rank 0, with chunked communication to all ranks, and update the data loader and configuration to support these new workflows. Additional enhancements include more accurate worker initialization, improved cache handling, and new configuration options for sharding and file listing.
Distributed File Sharding and Loader Workflow Improvements
main.py)files_pre_shardedandlisting_threadsconfiguration options: The newfiles_pre_shardedflag signals when file lists are already partitioned per rank, disabling further sample-level sharding and affecting step calculations.listing_threadscontrols parallelism for subfolder listing. (utils/config.py)Data Loader and Worker Management Enhancements
files_pre_shardedflag, aligning sample counts across ranks and bypassing redundant sharding logic. (torch_data_loader.py)refresh_argsmethod ensures that resharded file lists are propagated to new worker processes. (main.py,torch_data_loader.py) [1] [2] [3]main.py) [1] [2] [3] [4]I/O and Caching Behavior Fixes
_local_fs_iterable_mixin.py)These changes collectively make the benchmark more robust, scalable, and accurate in distributed environments, especially for large datasets and high concurrency.