
Add streaming backend for convert_and_aggregate #497

Open
FabianHofmann wants to merge 7 commits into refac/type-annotation from perf/conversion

Conversation

@FabianHofmann
Contributor

Closes # (if applicable).

Changes proposed in this Pull Request

Add a chunk-based streaming execution backend for convert_and_aggregate that processes weather-to-energy conversions one time-chunk at a time, reducing peak memory for large cutouts.

Key changes:

  • atlite/aggregate.py: Extract shared helpers (resolve_matrix, normalize_aggregate_time, reduce_time, wrap_matrix_result, finalize_aggregated_result) used by both dask and streaming paths.
  • atlite/streaming.py: New streaming backend that reads storage-aligned chunks eagerly, applies the convert function, and either multiplies by a sparse matrix or accumulates into a temporal reducer — never materialising the full (time, y, x) grid.
  • atlite/convert.py: New backend parameter ("auto", "dask", "streaming") on convert_and_aggregate; body slimmed down using shared helpers.
  • atlite/cutout.py: Auto-detect storage-aligned chunk sizes when opening existing cutouts.
  • Benchmarks: On the default Europe-scale solar profile (6.2 GB, 50 clusters), streaming runs single-threaded in ~111s vs dask (4 distributed workers) in ~100s — comparable throughput with significantly lower peak memory.
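The per-chunk accumulation behind the streaming backend can be sketched roughly as follows. This is a minimal illustration with invented names (`streaming_convert`, `read_chunk`, and the signatures are hypothetical, not the PR's actual API):

```python
import numpy as np
from scipy import sparse


def streaming_convert(read_chunk, n_time, chunk_len, convert, matrix):
    """Hypothetical sketch: apply `convert` one time chunk at a time and
    accumulate the sparse aggregation product, never holding the full
    (time, y, x) array in memory."""
    out = []
    for start in range(0, n_time, chunk_len):
        stop = min(start + chunk_len, n_time)
        raw = read_chunk(start, stop)               # (t, y, x) for this chunk only
        converted = convert(raw)                    # e.g. weather -> capacity factor
        flat = converted.reshape(stop - start, -1)  # (t, y*x)
        out.append((matrix @ flat.T).T)             # (t, n_clusters)
    return np.concatenate(out, axis=0)


# toy usage: identity convert, 2 "clusters" over a 3x2 grid, 5 time steps
grid = np.arange(5 * 3 * 2, dtype=float).reshape(5, 3, 2)
matrix = sparse.random(2, 6, density=0.5, random_state=0, format="csr")
res = streaming_convert(lambda a, b: grid[a:b], 5, 2, lambda x: x, matrix)
```

Only a `(chunk_len, y, x)` slab is ever resident at once, which is what keeps the peak memory footprint low.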

Checklist

  • Code changes are sufficiently documented; i.e. new functions contain docstrings and further explanations may be given in doc.
  • Unit tests for new features were added (if applicable).
  • Newly introduced dependencies are added to environment.yaml, environment_docs.yaml and setup.py (if applicable).
  • A note for the release notes doc/release_notes.rst of the upcoming release is included.
  • I consent to the release of this PR's code under the MIT license.

FabianHofmann and others added 2 commits April 2, 2026 10:02
…on helpers

Extract resolve_matrix, normalize_aggregate_time, reduce_time, wrap_matrix_result
and finalize_aggregated_result into aggregate.py. Add streaming.py for chunk-based
conversion with reduced peak memory. Support storage-aligned chunking in Cutout.
@coroa
Member

coroa commented Apr 5, 2026

Why is this necessary? Didn't the dask path already process each chunk separately? We lose multiprocessing capabilities with this.

FabianHofmann and others added 3 commits April 5, 2026 21:56
…, thread pool

Replace multiprocessing.Pool(spawn) with ThreadPoolExecutor sharing a
RasterCache that pre-reads rasters into memory. Add fast_isin (LUT),
fast_dilation (distance transform), and per-shape cached availability.
Includes numpy-style docstrings and 13 new tests for all new functions.
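The `fast_isin` (LUT) mentioned in the commit message points at a lookup-table membership test. A minimal sketch of that technique, assuming small non-negative integer raster codes (name and signature here are illustrative, not the PR's actual code):

```python
import numpy as np


def fast_isin(arr, values, max_code=None):
    """Illustrative LUT-based isin for small non-negative integer codes
    (e.g. land-cover class codes): build a boolean table once and answer
    membership with a single fancy-indexing pass, instead of np.isin's
    sort/search machinery."""
    if max_code is None:
        max_code = int(arr.max())
    lut = np.zeros(max_code + 1, dtype=bool)
    lut[np.asarray(values)] = True
    return lut[arr]


# toy usage on a small raster of class codes
codes = np.array([[0, 3, 7], [2, 3, 0]])
mask = fast_isin(codes, [3, 7])
```

The trade-off is memory proportional to the largest code value, which is cheap for typical raster class codes.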
@FabianHofmann
Contributor Author

It is mostly overhead from multiple dask processes. I'll post a detailed benchmark here; in summary: pypsa-eur currently takes 23 min for all availability matrices and renewable profiles, which I always found suspicious. With these changes it takes 4-5 min in total. The main inefficiency on master and pypsa-eur is that dask chunks are not aligned with the chunks of the stored, compressed netcdf cutout. Each dask chunk (100 time steps) therefore has to decompress the much larger stored chunk (2760 time steps), leading to redundant decompressions. The streaming backend auto-aligns with the stored chunks to avoid this IO overhead.

The other part of the story is that dask generally adds significant overhead. While you can still speed up a single conversion with dask parallelization using aligned chunks, a one-core computation per conversion is just faster when running multiple conversions in parallel, as we do in pypsa-eur. Probably because we save the time spent spawning processes and parallelize at the workflow level instead.
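The decompression redundancy described above can be illustrated with a toy model (not atlite code): assume every read of a dask chunk decompresses each stored netcdf chunk it overlaps.

```python
def redundant_decompressions(n_time, stored_chunk, dask_chunk):
    """Illustrative model: count stored-chunk decompressions when reading
    n_time steps in dask_chunk-sized pieces, assuming each read must
    decompress every stored chunk it overlaps."""
    reads = 0
    for start in range(0, n_time, dask_chunk):
        stop = min(start + dask_chunk, n_time)
        first = start // stored_chunk        # first stored chunk touched
        last = (stop - 1) // stored_chunk    # last stored chunk touched
        reads += last - first + 1
    return reads


# the numbers quoted above: a year of hourly data (8760 steps),
# stored chunks of 2760 steps, dask chunks of 100 steps
misaligned = redundant_decompressions(8760, 2760, 100)   # 91 decompressions
aligned = redundant_decompressions(8760, 2760, 2760)     # 4: one per stored chunk
```

Misaligned reads decompress each stored chunk dozens of times; aligning the read chunks to the stored chunks brings this down to one decompression per stored chunk.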

…o exclude outside-geometry pixels from reproject averaging
@coroa
Member

coroa commented Apr 7, 2026

Sounds like this chunk/storage alignment would be a gain we would also want for the dask path and maybe we could experiment with the threading scheduler of dask: https://docs.dask.org/en/stable/scheduling.html#local-threads
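For reference, switching dask to the local threads scheduler mentioned here is a one-line config change (a sketch; whether it closes the gap in this benchmark would need measuring):

```python
import dask

# run dask tasks in a thread pool within one process
# instead of spawning distributed workers
dask.config.set(scheduler="threads")

# or scoped to a single computation:
# with dask.config.set(scheduler="threads"):
#     result = lazy_array.sum().compute()
```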

@FabianHofmann
Contributor Author

FabianHofmann commented Apr 7, 2026

Sounds like this chunk/storage alignment would be a gain we would also want for the dask path and maybe we could experiment with the threading scheduler of dask: https://docs.dask.org/en/stable/scheduling.html#local-threads

Yes, likely there is a gain from that as well, but this would be done solely on the pypsa-eur side when setting the dask args, and it is often a bit cumbersome trying to tweak these. At the moment I lean towards using the straightforward numpy-based calculation on chunks, as the gain is significant. There is no memory overhead even for finer resolutions, btw.

@coroa
Member

coroa commented Apr 7, 2026

Yes, but you are adding another layer of complexity that is non-trivial and needs to be maintained, and I am not convinced that the same gains could not be achieved with much simpler tweaking of the dask backend.
