From 24874036497f90e33b982f351f0558d89d6acffb Mon Sep 17 00:00:00 2001 From: Janick Martinez Esturo Date: Thu, 2 Apr 2026 08:36:49 +0200 Subject: [PATCH] =?UTF-8?q?feat(data):=20migrate=20zarr=20v2=20=E2=86=92?= =?UTF-8?q?=20v3=20(3.1.6)=20with=20zero-dead-space=20itar=20writes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrate ncore data layer from zarr-python 2.x to zarr-python 3.1.6. New .zarr.itar files are written in zarr format 3; existing v2 .zarr.itar files remain readable (backwards compatible). Core changes: - Rewrite IndexedTarStore as zarr3 Store ABC implementation with async methods, byte range support, and consolidated metadata intercept - All zarr.json writes are deferred in memory and flushed once on close(), guaranteeing zero dead space in tar archives - Root zarr.json is intercepted at flush time and compressed as zarr.cbor.xz (CBOR+LZMA consolidated metadata format) - Transparent fallback for legacy v2 .zmetadata.cbor.xz on read - Update components.py to zarr3 APIs (create_array, attrs.update, group.members/groups, LocalStore, consolidated metadata) Cleanup: - Drop Python 3.8 support: remove toolchain, pip targets, lockfiles - Remove sys.version_info guards in types.py, base.py, transformations.py - Simplify dataclass decorators to unconditional slots=True, kw_only=True Dependencies: - zarr>=3.1.6, cbor2>=5.9.0, python_requires>=3.11 - Lockfile regeneration blocked by numpy<2 / torch constraint conflict --- MODULE.bazel | 11 +- deps/pip/BUILD.bazel | 15 - deps/pip/requirements_3_8.in | 25 - deps/pip/requirements_3_8.txt | 281 ---------- deps/pip/requirements_ncore.in | 13 +- ncore/BUILD.bazel | 10 +- ncore/impl/common/transformations.py | 4 +- ncore/impl/data/stores.py | 721 +++++++++++++++++++------- ncore/impl/data/stores_test.py | 109 ++-- ncore/impl/data/types.py | 23 +- ncore/impl/data/v4/components.py | 109 ++-- ncore/impl/data/v4/components_test.py | 13 +- ncore/impl/data_converter/base.py | 5 +- pyproject.toml | 2 +- 14 files changed, 660 insertions(+), 681 deletions(-) delete mode 100644 deps/pip/requirements_3_8.in delete mode 100644 deps/pip/requirements_3_8.txt diff --git a/MODULE.bazel b/MODULE.bazel index a1bc74b1..20575385 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -51,10 +51,7 @@ python.toolchain( is_default = True, python_version = "3.11", ) -python.toolchain( - python_version = "3.8", -) -use_repo(python, "python_3_11", "python_3_8", "python_versions") +use_repo(python, "python_3_11", "python_versions") pip = use_extension("@rules_python//python/extensions:pip.bzl", "pip") pip.parse( @@ -63,12 +60,6 @@ pip.parse( python_version = "3.11", requirements_lock = "//deps/pip:requirements_3_11.txt", ) -pip.parse( - experimental_index_url = "https://pypi.org/simple", # download wheels via bazel downloader instead of pip - hub_name = "ncore_pip_deps", - python_version = "3.8", - requirements_lock = "//deps/pip:requirements_3_8.txt", -) use_repo(pip, "ncore_pip_deps") ## rules_multitool (for ruff formatter) diff --git a/deps/pip/BUILD.bazel b/deps/pip/BUILD.bazel index fd49864d..ac3450fe 100644 --- a/deps/pip/BUILD.bazel +++ b/deps/pip/BUILD.bazel @@ -19,7 +19,6 @@ load("@rules_uv//uv:pip.bzl", "pip_compile") # Export lock files for use as constraints by other modules exports_files([ "requirements_3_11.txt", - "requirements_3_8.txt", ]) # Check that our compiled pip requirements are up-to-date @@ -48,25 +47,11 @@ pip_compile( requirements_txt = "requirements_3_11.txt", ) -pip_compile( - name = 
"requirements_3_8", - size = "medium", - args = PIP_COMPILE_ARGS, - data = [ - ":requirements_ncore.in", - ":requirements_tests.in", - ], - py3_runtime = "@python_3_8//:py3_runtime", - requirements_in = "requirements_3_8.in", - requirements_txt = "requirements_3_8.txt", -) - # Single command to update all requirements files (no order dependency) multirun( name = "update_all_requirements", commands = [ ":requirements_3_11", - ":requirements_3_8", ], keep_going = False, # Fail on error ) diff --git a/deps/pip/requirements_3_8.in b/deps/pip/requirements_3_8.in deleted file mode 100644 index e5c33b6d..00000000 --- a/deps/pip/requirements_3_8.in +++ /dev/null @@ -1,25 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Import common dependencies --r requirements_tests.in --r requirements_ncore.in - -# Public API dependencies for 3.8: intentionally mimic restrictive environments using very old numpy versions -numpy<1.20 # particularly older versions that don't ship numpy.typing type hints -zarr<=2.13.3 # more recent versions require numpy>=1.20 - ---find-links https://download.pytorch.org/whl/torch_stable.html -torch==1.12.1+cu116 diff --git a/deps/pip/requirements_3_8.txt b/deps/pip/requirements_3_8.txt deleted file mode 100644 index fc006fa4..00000000 --- a/deps/pip/requirements_3_8.txt +++ /dev/null @@ -1,281 +0,0 @@ -# This file was autogenerated by uv via the following command: -# bazel run @@//deps/pip:requirements_3_8 ---index-url https://pypi.org/simple ---find-links https://download.pytorch.org/whl/torch_stable.html - -asciitree==0.3.3 \ - --hash=sha256:4aa4b9b649f85e3fcb343363d97564aa1fb62e249677f2e18a96765145cc0f6e - # via zarr -cbor2==5.4.6 \ - --hash=sha256:0b956f19e93ba3180c336282cd1b6665631f2d3a196a9c19b29a833bf979e7a4 \ - --hash=sha256:0bd12c54a48949d11f5ffc2fa27f5df1b4754111f5207453e5fae3512ebb3cab \ - --hash=sha256:0d2b926b024d3a1549b819bc82fdc387062bbd977b0299dd5fa5e0ea3267b98b \ - --hash=sha256:1618d16e310f7ffed141762b0ff5d8bb6b53ad449406115cc465bf04213cefcf \ - --hash=sha256:181ac494091d1f9c5bb373cd85514ce1eb967a8cf3ec298e8dfa8878aa823956 \ - --hash=sha256:1835536e76ea16e88c934aac5e369ba9f93d495b01e5fa2d93f0b4986b89146d \ - --hash=sha256:1c12c0ab78f5bc290b08a79152a8621822415836a86f8f4b50dadba371736fda \ - --hash=sha256:24144822f8d2b0156f4cda9427f071f969c18683ffed39663dc86bc0a75ae4dd \ - --hash=sha256:309fffbb7f561d67f02095d4b9657b73c9220558701c997e9bfcfbca2696e927 \ - --hash=sha256:3316f09a77af85e7772ecfdd693b0f450678a60b1aee641bac319289757e3fa0 \ - --hash=sha256:3545b16f9f0d5f34d4c99052829c3726020a07be34c99c250d0df87418f02954 \ - --hash=sha256:39452c799453f5bf33281ffc0752c620b8bfa0b7c13070b87d370257a1311976 \ - --hash=sha256:3950be57a1698086cf26d8710b4e5a637b65133c5b1f9eec23967d4089d8cfed \ - --hash=sha256:456cdff668a50a52fdb8aa6d0742511e43ed46d6a5b463dba80a5a720fa0d320 \ - 
--hash=sha256:4b9f3924da0e460a93b3674c7e71020dd6c9e9f17400a34e52a88c0af2dcd2aa \ - --hash=sha256:4bbbdb2e3ef274865dc3f279aae109b5d94f4654aea3c72c479fb37e4a1e7ed7 \ - --hash=sha256:4ce1a2c272ba8523a55ea2f1d66e3464e89fa0e37c9a3d786a919fe64e68dbd7 \ - --hash=sha256:56dfa030cd3d67e5b6701d3067923f2f61536a8ffb1b45be14775d1e866b59ae \ - --hash=sha256:6709d97695205cd08255363b54afa035306d5302b7b5e38308c8ff5a47e60f2a \ - --hash=sha256:6e1b5aee920b6a2f737aa12e2b54de3826b09f885a7ce402db84216343368140 \ - --hash=sha256:6f9c702bee2954fffdfa3de95a5af1a6b1c5f155e39490353d5654d83bb05bb9 \ - --hash=sha256:78304df140b9e13b93bcbb2aecee64c9aaa9f1cadbd45f043b5e7b93cc2f21a2 \ - --hash=sha256:79e048e623846d60d735bb350263e8fdd36cb6195d7f1a2b57eacd573d9c0b33 \ - --hash=sha256:7bbd3470eb685325398023e335be896b74f61b014896604ed45049a7b7b6d8ac \ - --hash=sha256:80ac8ba450c7a41c5afe5f7e503d3092442ed75393e1de162b0bf0d97edf7c7f \ - --hash=sha256:9394ca49ecdf0957924e45d09a4026482d184a465a047f60c4044eb464c43de9 \ - --hash=sha256:94f844d0e232aca061a86dd6ff191e47ba0389ddd34acb784ad9a41594dc99a4 \ - --hash=sha256:96087fa5336ebfc94465c0768cd5de0fcf9af3840d2cf0ce32f5767855f1a293 \ - --hash=sha256:b893500db0fe033e570c3adc956af6eefc57e280026bd2d86fd53da9f1e594d7 \ - --hash=sha256:c285a2cb2c04004bfead93df89d92a0cef1874ad337d0cb5ea53c2c31e97bfdb \ - --hash=sha256:d2984a488f350aee1d54fa9cb8c6a3c1f1f5b268abbc91161e47185de4d829f3 \ - --hash=sha256:d54bd840b4fe34f097b8665fc0692c7dd175349e53976be6c5de4433b970daa4 \ - --hash=sha256:db9eb582fce972f0fa429d8159b7891ff8deccb7affc4995090afc61ce0d328a \ - --hash=sha256:e5094562dfe3e5583202b93ef7ca5082c2ba5571accb2c4412d27b7d0ba8a563 \ - --hash=sha256:e73ca40dd3c7210ff776acff9869ddc9ff67bae7c425b58e5715dcf55275163f \ - --hash=sha256:ff95b33e5482313a74648ca3620c9328e9f30ecfa034df040b828e476597d352 - # via -r deps/pip/requirements_ncore.in -dataclasses-json==0.5.14 \ - --hash=sha256:5ec6fed642adb1dbdb4182badb01e0861badfd8fda82e3b67f44b2d1e9d10d21 \ - --hash=sha256:d82896a94c992ffaf689cd1fafc180164e2abdd415b8f94a7f78586af5886236 - # via -r deps/pip/requirements_ncore.in -entrypoints==0.4 \ - --hash=sha256:b706eddaa9218a19ebcd67b56818f05bb27589b1ca9e8d797b74affad4ccacd4 \ - --hash=sha256:f174b5ff827504fd3cd97cc3f8649f3693f51538c7e4bdf3ef002c8429d42f9f - # via numcodecs -exceptiongroup==1.1.3 \ - --hash=sha256:097acd85d473d75af5bb98e41b61ff7fe35efe6675e4f9370ec6ec5126d160e9 \ - --hash=sha256:343280667a4585d195ca1cf9cef84a4e178c4b6cf2274caef9859782b567d5e3 - # via pytest -fasteners==0.18 \ - --hash=sha256:1d4caf5f8db57b0e4107d94fd5a1d02510a450dced6ca77d1839064c1bacf20c \ - --hash=sha256:cb7c13ef91e0c7e4fe4af38ecaf6b904ec3f5ce0dda06d34924b6b74b869d953 - # via zarr -fsspec==2025.3.0 \ - --hash=sha256:a935fd1ea872591f2b5148907d103488fc523295e6c64b835cfad8c3eca44972 \ - --hash=sha256:efb87af3efa9103f94ca91a7f8cb7a4df91af9f74fc106c9c7ea0efd7277c1b3 - # via universal-pathlib -iniconfig==2.0.0 \ - --hash=sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3 \ - --hash=sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374 - # via pytest -marshmallow==3.20.1 \ - --hash=sha256:5d2371bbe42000f2b3fb5eaa065224df7d8f8597bc19a1bbfa5bfe7fba8da889 \ - --hash=sha256:684939db93e80ad3561392f47be0230743131560a41c5110684c16e21ade0a5c - # via dataclasses-json -mypy-extensions==1.0.0 \ - --hash=sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d \ - --hash=sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782 - # via typing-inspect -numcodecs==0.11.0 \ 
- --hash=sha256:0c240858bf29e0ff254b1db60430e8b2658b8c8328b684f80033289d94807a7c \ - --hash=sha256:0fabc7dfdf64a9555bf8a34911e05b415793c67a1377207dc79cd96342291fa1 \ - --hash=sha256:11596b71267417425ea8afb407477a67d684f434c8b07b1dd59c25a97d5c3ccb \ - --hash=sha256:32697785b786bb0039d3feeaabdc10f25eda6c149700cde954653aaa47637832 \ - --hash=sha256:694dc2e80b1f169b7deb14bdd0a04b20e5f17ef32cb0f81b71ab690406ec6bd9 \ - --hash=sha256:6c058b321de84a1729299b0eae4d652b2e48ea1ca7f9df0da65cb13470e635eb \ - --hash=sha256:7dae3f5678f247336c84e7315a0c59a4fec7c33eb7db72d78ff5c776479a812e \ - --hash=sha256:8c2f36b21162c6ebccc05d3fe896f86b91dcf8709946809f730cc23a37f8234d \ - --hash=sha256:bd05cdb853c7bcfde2efc809a9df2c5e205b96f70405b810e5788b45d0d81f73 \ - --hash=sha256:bf3925eeb37aed0e6c04d7fb9614133a3c8426dc77f8bda54c99c601a44b3bd3 \ - --hash=sha256:c0bc116752be45b4f9dca4315e5a2b4185e3b46f68c997dbb84aef334ceb5a1d \ - --hash=sha256:c27dfca402f69fbfa01c46fb572086e77f38121192160cc8ed1177dc30702c52 \ - --hash=sha256:ee5bda16e9d26a7a39fc20b6c1cec23b4debc314df5cfae3ed505149c2eeafc4 - # via zarr -numpy==1.19.5 \ - --hash=sha256:012426a41bc9ab63bb158635aecccc7610e3eff5d31d1eb43bc099debc979d94 \ - --hash=sha256:06fab248a088e439402141ea04f0fffb203723148f6ee791e9c75b3e9e82f080 \ - --hash=sha256:0eef32ca3132a48e43f6a0f5a82cb508f22ce5a3d6f67a8329c81c8e226d3f6e \ - --hash=sha256:1ded4fce9cfaaf24e7a0ab51b7a87be9038ea1ace7f34b841fe3b6894c721d1c \ - --hash=sha256:2e55195bc1c6b705bfd8ad6f288b38b11b1af32f3c8289d6c50d47f950c12e76 \ - --hash=sha256:2ea52bd92ab9f768cc64a4c3ef8f4b2580a17af0a5436f6126b08efbd1838371 \ - --hash=sha256:36674959eed6957e61f11c912f71e78857a8d0604171dfd9ce9ad5cbf41c511c \ - --hash=sha256:384ec0463d1c2671170901994aeb6dce126de0a95ccc3976c43b0038a37329c2 \ - --hash=sha256:39b70c19ec771805081578cc936bbe95336798b7edf4732ed102e7a43ec5c07a \ - --hash=sha256:400580cbd3cff6ffa6293df2278c75aef2d58d8d93d3c5614cd67981dae68ceb \ - --hash=sha256:43d4c81d5ffdff6bae58d66a3cd7f54a7acd9a0e7b18d97abb255defc09e3140 \ - --hash=sha256:50a4a0ad0111cc1b71fa32dedd05fa239f7fb5a43a40663269bb5dc7877cfd28 \ - --hash=sha256:603aa0706be710eea8884af807b1b3bc9fb2e49b9f4da439e76000f3b3c6ff0f \ - --hash=sha256:6149a185cece5ee78d1d196938b2a8f9d09f5a5ebfbba66969302a778d5ddd1d \ - --hash=sha256:759e4095edc3c1b3ac031f34d9459fa781777a93ccc633a472a5468587a190ff \ - --hash=sha256:7fb43004bce0ca31d8f13a6eb5e943fa73371381e53f7074ed21a4cb786c32f8 \ - --hash=sha256:811daee36a58dc79cf3d8bdd4a490e4277d0e4b7d103a001a4e73ddb48e7e6aa \ - --hash=sha256:8b5e972b43c8fc27d56550b4120fe6257fdc15f9301914380b27f74856299fea \ - --hash=sha256:99abf4f353c3d1a0c7a5f27699482c987cf663b1eac20db59b8c7b061eabd7fc \ - --hash=sha256:a0d53e51a6cb6f0d9082decb7a4cb6dfb33055308c4c44f53103c073f649af73 \ - --hash=sha256:a12ff4c8ddfee61f90a1633a4c4afd3f7bcb32b11c52026c92a12e1325922d0d \ - --hash=sha256:a4646724fba402aa7504cd48b4b50e783296b5e10a524c7a6da62e4a8ac9698d \ - --hash=sha256:a76f502430dd98d7546e1ea2250a7360c065a5fdea52b2dffe8ae7180909b6f4 \ - --hash=sha256:a9d17f2be3b427fbb2bce61e596cf555d6f8a56c222bd2ca148baeeb5e5c783c \ - --hash=sha256:ab83f24d5c52d60dbc8cd0528759532736b56db58adaa7b5f1f76ad551416a1e \ - --hash=sha256:aeb9ed923be74e659984e321f609b9ba54a48354bfd168d21a2b072ed1e833ea \ - --hash=sha256:c843b3f50d1ab7361ca4f0b3639bf691569493a56808a0b0c54a051d260b7dbd \ - --hash=sha256:cae865b1cae1ec2663d8ea56ef6ff185bad091a5e33ebbadd98de2cfa3fa668f \ - --hash=sha256:cc6bd4fd593cb261332568485e20a0712883cf631f6f5e8e86a52caa8b2b50ff \ - 
--hash=sha256:cf2402002d3d9f91c8b01e66fbb436a4ed01c6498fffed0e4c7566da1d40ee1e \ - --hash=sha256:d051ec1c64b85ecc69531e1137bb9751c6830772ee5c1c426dbcfe98ef5788d7 \ - --hash=sha256:d6631f2e867676b13026e2846180e2c13c1e11289d67da08d71cacb2cd93d4aa \ - --hash=sha256:dbd18bcf4889b720ba13a27ec2f2aac1981bd41203b3a3b27ba7a33f88ae4827 \ - --hash=sha256:df609c82f18c5b9f6cb97271f03315ff0dbe481a2a02e56aeb1b1a985ce38e60 - # via - # -r deps/pip/requirements_3_8.in - # -r deps/pip/requirements_ncore.in - # numcodecs - # opencv-python-headless - # scipy - # zarr -opencv-python-headless==4.13.0.92 \ - --hash=sha256:0525a3d2c0b46c611e2130b5fdebc94cf404845d8fa64d2f3a3b679572a5bd22 \ - --hash=sha256:0bd48544f77c68b2941392fcdf9bcd2b9cdf00e98cb8c29b2455d194763cf99e \ - --hash=sha256:1a7d040ac656c11b8c38677cc8cccdc149f98535089dbe5b081e80a4e5903209 \ - --hash=sha256:3e0a6f0a37994ec6ce5f59e936be21d5d6384a4556f2d2da9c2f9c5dc948394c \ - --hash=sha256:5c8cfc8e87ed452b5cecb9419473ee5560a989859fe1d10d1ce11ae87b09a2cb \ - --hash=sha256:77a82fe35ddcec0f62c15f2ba8a12ecc2ed4207c17b0902c7a3151ae29f37fb6 \ - --hash=sha256:a7cf08e5b191f4ebb530791acc0825a7986e0d0dee2a3c491184bd8599848a4b \ - --hash=sha256:eb60e36b237b1ebd40a912da5384b348df8ed534f6f644d8e0b4f103e272ba7d - # via -r deps/pip/requirements_tests.in -packaging==23.1 \ - --hash=sha256:994793af429502c4ea2ebf6bf664629d07c1a9fe974af92966e4b8d2df7edc61 \ - --hash=sha256:a392980d2b6cffa644431898be54b0045151319d1e7ec34f0cfed48767dd334f - # via - # marshmallow - # pytest -parameterized==0.9.0 \ - --hash=sha256:4e0758e3d41bea3bbd05ec14fc2c24736723f243b28d702081aef438c9372b1b \ - --hash=sha256:7fc905272cefa4f364c1a3429cbbe9c0f98b793988efb5bf90aac80f08db09b1 - # via -r deps/pip/requirements_tests.in -pillow==10.0.0 \ - --hash=sha256:00e65f5e822decd501e374b0650146063fbb30a7264b4d2744bdd7b913e0cab5 \ - --hash=sha256:040586f7d37b34547153fa383f7f9aed68b738992380ac911447bb78f2abe530 \ - --hash=sha256:0b6eb5502f45a60a3f411c63187db83a3d3107887ad0d036c13ce836f8a36f1d \ - --hash=sha256:1ce91b6ec08d866b14413d3f0bbdea7e24dfdc8e59f562bb77bc3fe60b6144ca \ - --hash=sha256:1f62406a884ae75fb2f818694469519fb685cc7eaff05d3451a9ebe55c646891 \ - --hash=sha256:22c10cc517668d44b211717fd9775799ccec4124b9a7f7b3635fc5386e584992 \ - --hash=sha256:3400aae60685b06bb96f99a21e1ada7bc7a413d5f49bce739828ecd9391bb8f7 \ - --hash=sha256:349930d6e9c685c089284b013478d6f76e3a534e36ddfa912cde493f235372f3 \ - --hash=sha256:368ab3dfb5f49e312231b6f27b8820c823652b7cd29cfbd34090565a015e99ba \ - --hash=sha256:38250a349b6b390ee6047a62c086d3817ac69022c127f8a5dc058c31ccef17f3 \ - --hash=sha256:3a684105f7c32488f7153905a4e3015a3b6c7182e106fe3c37fbb5ef3e6994c3 \ - --hash=sha256:3a82c40d706d9aa9734289740ce26460a11aeec2d9c79b7af87bb35f0073c12f \ - --hash=sha256:3b08d4cc24f471b2c8ca24ec060abf4bebc6b144cb89cba638c720546b1cf538 \ - --hash=sha256:3ed64f9ca2f0a95411e88a4efbd7a29e5ce2cea36072c53dd9d26d9c76f753b3 \ - --hash=sha256:3f07ea8d2f827d7d2a49ecf1639ec02d75ffd1b88dcc5b3a61bbb37a8759ad8d \ - --hash=sha256:520f2a520dc040512699f20fa1c363eed506e94248d71f85412b625026f6142c \ - --hash=sha256:5c6e3df6bdd396749bafd45314871b3d0af81ff935b2d188385e970052091017 \ - --hash=sha256:608bfdee0d57cf297d32bcbb3c728dc1da0907519d1784962c5f0c68bb93e5a3 \ - --hash=sha256:685ac03cc4ed5ebc15ad5c23bc555d68a87777586d970c2c3e216619a5476223 \ - --hash=sha256:76de421f9c326da8f43d690110f0e79fe3ad1e54be811545d7d91898b4c8493e \ - --hash=sha256:76edb0a1fa2b4745fb0c99fb9fb98f8b180a1bbceb8be49b087e0b21867e77d3 \ - 
--hash=sha256:7be600823e4c8631b74e4a0d38384c73f680e6105a7d3c6824fcf226c178c7e6 \ - --hash=sha256:81ff539a12457809666fef6624684c008e00ff6bf455b4b89fd00a140eecd640 \ - --hash=sha256:88af2003543cc40c80f6fca01411892ec52b11021b3dc22ec3bc9d5afd1c5334 \ - --hash=sha256:8c11160913e3dd06c8ffdb5f233a4f254cb449f4dfc0f8f4549eda9e542c93d1 \ - --hash=sha256:8f8182b523b2289f7c415f589118228d30ac8c355baa2f3194ced084dac2dbba \ - --hash=sha256:9211e7ad69d7c9401cfc0e23d49b69ca65ddd898976d660a2fa5904e3d7a9baa \ - --hash=sha256:92be919bbc9f7d09f7ae343c38f5bb21c973d2576c1d45600fce4b74bafa7ac0 \ - --hash=sha256:9c82b5b3e043c7af0d95792d0d20ccf68f61a1fec6b3530e718b688422727396 \ - --hash=sha256:9f7c16705f44e0504a3a2a14197c1f0b32a95731d251777dcb060aa83022cb2d \ - --hash=sha256:9fb218c8a12e51d7ead2a7c9e101a04982237d4855716af2e9499306728fb485 \ - --hash=sha256:a74ba0c356aaa3bb8e3eb79606a87669e7ec6444be352870623025d75a14a2bf \ - --hash=sha256:b4f69b3700201b80bb82c3a97d5e9254084f6dd5fb5b16fc1a7b974260f89f43 \ - --hash=sha256:bc2ec7c7b5d66b8ec9ce9f720dbb5fa4bace0f545acd34870eff4a369b44bf37 \ - --hash=sha256:c189af0545965fa8d3b9613cfdb0cd37f9d71349e0f7750e1fd704648d475ed2 \ - --hash=sha256:c1fbe7621c167ecaa38ad29643d77a9ce7311583761abf7836e1510c580bf3dd \ - --hash=sha256:c7cf14a27b0d6adfaebb3ae4153f1e516df54e47e42dcc073d7b3d76111a8d86 \ - --hash=sha256:c9f72a021fbb792ce98306ffb0c348b3c9cb967dce0f12a49aa4c3d3fdefa967 \ - --hash=sha256:cd25d2a9d2b36fcb318882481367956d2cf91329f6892fe5d385c346c0649629 \ - --hash=sha256:ce543ed15570eedbb85df19b0a1a7314a9c8141a36ce089c0a894adbfccb4568 \ - --hash=sha256:ce7b031a6fc11365970e6a5686d7ba8c63e4c1cf1ea143811acbb524295eabed \ - --hash=sha256:d35e3c8d9b1268cbf5d3670285feb3528f6680420eafe35cccc686b73c1e330f \ - --hash=sha256:d50b6aec14bc737742ca96e85d6d0a5f9bfbded018264b3b70ff9d8c33485551 \ - --hash=sha256:d5d0dae4cfd56969d23d94dc8e89fb6a217be461c69090768227beb8ed28c0a3 \ - --hash=sha256:d5db32e2a6ccbb3d34d87c87b432959e0db29755727afb37290e10f6e8e62614 \ - --hash=sha256:d72e2ecc68a942e8cf9739619b7f408cc7b272b279b56b2c83c6123fcfa5cdff \ - --hash=sha256:d737a602fbd82afd892ca746392401b634e278cb65d55c4b7a8f48e9ef8d008d \ - --hash=sha256:d80cf684b541685fccdd84c485b31ce73fc5c9b5d7523bf1394ce134a60c6883 \ - --hash=sha256:db24668940f82321e746773a4bc617bfac06ec831e5c88b643f91f122a785684 \ - --hash=sha256:dbc02381779d412145331789b40cc7b11fdf449e5d94f6bc0b080db0a56ea3f0 \ - --hash=sha256:dffe31a7f47b603318c609f378ebcd57f1554a3a6a8effbc59c3c69f804296de \ - --hash=sha256:edf4392b77bdc81f36e92d3a07a5cd072f90253197f4a52a55a8cec48a12483b \ - --hash=sha256:efe8c0681042536e0d06c11f48cebe759707c9e9abf880ee213541c5b46c5bf3 \ - --hash=sha256:f31f9fdbfecb042d046f9d91270a0ba28368a723302786c0009ee9b9f1f60199 \ - --hash=sha256:f88a0b92277de8e3ca715a0d79d68dc82807457dae3ab8699c758f07c20b3c51 \ - --hash=sha256:faaf07ea35355b01a35cb442dd950d8f1bb5b040a7787791a535de13db15ed90 - # via -r deps/pip/requirements_ncore.in -pluggy==1.3.0 \ - --hash=sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12 \ - --hash=sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7 - # via pytest -pytest==7.4.0 \ - --hash=sha256:78bf16451a2eb8c7a2ea98e32dc119fd2aa758f1d5d66dbf0a59d69a3969df32 \ - --hash=sha256:b4bf8c45bd59934ed84001ad51e11b4ee40d40a1229d2c79f9c592b0a3f6bd8a - # via -r deps/pip/requirements_tests.in -scipy==1.10.1 \ - --hash=sha256:049a8bbf0ad95277ffba9b3b7d23e5369cc39e66406d60422c8cfef40ccc8415 \ - --hash=sha256:07c3457ce0b3ad5124f98a86533106b643dd811dd61b548e78cf4c8786652f6f \ - 
--hash=sha256:0f1564ea217e82c1bbe75ddf7285ba0709ecd503f048cb1236ae9995f64217bd \ - --hash=sha256:1553b5dcddd64ba9a0d95355e63fe6c3fc303a8fd77c7bc91e77d61363f7433f \ - --hash=sha256:15a35c4242ec5f292c3dd364a7c71a61be87a3d4ddcc693372813c0b73c9af1d \ - --hash=sha256:1b4735d6c28aad3cdcf52117e0e91d6b39acd4272f3f5cd9907c24ee931ad601 \ - --hash=sha256:2cf9dfb80a7b4589ba4c40ce7588986d6d5cebc5457cad2c2880f6bc2d42f3a5 \ - --hash=sha256:39becb03541f9e58243f4197584286e339029e8908c46f7221abeea4b749fa88 \ - --hash=sha256:43b8e0bcb877faf0abfb613d51026cd5cc78918e9530e375727bf0625c82788f \ - --hash=sha256:4b3f429188c66603a1a5c549fb414e4d3bdc2a24792e061ffbd607d3d75fd84e \ - --hash=sha256:4c0ff64b06b10e35215abce517252b375e580a6125fd5fdf6421b98efbefb2d2 \ - --hash=sha256:51af417a000d2dbe1ec6c372dfe688e041a7084da4fdd350aeb139bd3fb55353 \ - --hash=sha256:5678f88c68ea866ed9ebe3a989091088553ba12c6090244fdae3e467b1139c35 \ - --hash=sha256:79c8e5a6c6ffaf3a2262ef1be1e108a035cf4f05c14df56057b64acc5bebffb6 \ - --hash=sha256:7ff7f37b1bf4417baca958d254e8e2875d0cc23aaadbe65b3d5b3077b0eb23ea \ - --hash=sha256:aaea0a6be54462ec027de54fca511540980d1e9eea68b2d5c1dbfe084797be35 \ - --hash=sha256:bce5869c8d68cf383ce240e44c1d9ae7c06078a9396df68ce88a1230f93a30c1 \ - --hash=sha256:cd9f1027ff30d90618914a64ca9b1a77a431159df0e2a195d8a9e8a04c78abf9 \ - --hash=sha256:d925fa1c81b772882aa55bcc10bf88324dadb66ff85d548c71515f6689c6dac5 \ - --hash=sha256:e7354fd7527a4b0377ce55f286805b34e8c54b91be865bac273f527e1b839019 \ - --hash=sha256:fae8a7b898c42dffe3f7361c40d5952b6bf32d10c4569098d276b4c547905ee1 - # via -r deps/pip/requirements_ncore.in -tomli==2.0.1 \ - --hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \ - --hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f - # via pytest -torch==1.12.1+cu116 \ - --hash=sha256:7725420dabebfcaf44984edce3283eea91f98f0f7d5874bc68c7a164bd8126e3 \ - --hash=sha256:832effad8b21109700323a5aa137a2e4bdea711dac3d8491ff542f798dab0101 \ - --hash=sha256:84f031e4ee25d95368d7531aa58e79da9808d3fa53b4b363ea03a2450b6fd0af \ - --hash=sha256:b6bc31244aa2818929fbb30c483c221df471e9d856e805c5a1ff72b131ae9e7b \ - --hash=sha256:b8e8906e770bcad12e67c269e1bcdd7661a8abd96519a4ba643e86440bbcc1bf \ - --hash=sha256:bca5a77071d7eb901beb775648b125e6d9279f231d1f23e56530b5a189df8975 \ - --hash=sha256:dda312901220895087cc83d3665464a3dc171d04460c61c31af463efbfb54896 \ - --hash=sha256:fc9b4786ec54be67eaa8b0c7c9999e2f4ae2b89a1c18e41de1515a190440c691 - # via - # -r deps/pip/requirements_3_8.in - # -r deps/pip/requirements_ncore.in -typing-extensions==4.7.1 \ - --hash=sha256:440d5dd3af93b060174bf433bccd69b0babc3b15b1a8dca43789fd7f61514b36 \ - --hash=sha256:b75ddc264f0ba5615db7ba217daeb99701ad295353c45f9e95963337ceeeffb2 - # via - # -r deps/pip/requirements_ncore.in - # torch - # typing-inspect -typing-inspect==0.9.0 \ - --hash=sha256:9ee6fc59062311ef8547596ab6b955e1b8aa46242d854bfc78f4f6b0eff35f9f \ - --hash=sha256:b23fc42ff6f6ef6954e4852c1fb512cdd18dbea03134f91f856a95ccc9461f78 - # via dataclasses-json -universal-pathlib==0.2.6 \ - --hash=sha256:50817aaeaa9f4163cb1e76f5bdf84207fa05ce728b23fd779479b3462e5430ac \ - --hash=sha256:700dec2b58ef34b87998513de6d2ae153b22f083197dfafb8544744edabd1b18 - # via -r deps/pip/requirements_ncore.in -zarr==2.13.3 \ - --hash=sha256:883305e8ded972e25992269b0355436f11d7057b2943d278bf33cdcd2debfe2d \ - --hash=sha256:db24b090616c638f65e33a6bc5d956d642221182961515ccbc28b17fb0d0b48c - # via - # -r deps/pip/requirements_3_8.in - # -r deps/pip/requirements_ncore.in 
diff --git a/deps/pip/requirements_ncore.in b/deps/pip/requirements_ncore.in index 45a67d43..4a78668e 100644 --- a/deps/pip/requirements_ncore.in +++ b/deps/pip/requirements_ncore.in @@ -18,18 +18,15 @@ numpy dataclasses_json>=0.2.12 # deserialization fails with earlier versions pillow -# zarr version rationales: -# - 2.17.0 contains crucial perf improvements ["Cache result of FSStore._fsspec_installed()" https://github.com/zarr-developers/zarr-python/pull/1581] -# - 2.12.0 is the last version supporting python3.8 with old numpy versions, which we still want to support for now -# - we can't switch to zarr 3 yet unconditionally as it's not available for python3.8 anymore -zarr>=2.12.0,<3.0.0 +# zarr version rationale: +# - 3.1.6 is the zarr3 release used for the zarr v2→v3 migration +# - Python 3.8 support was dropped; zarr3 requires Python >=3.11 +zarr>=3.1.6 # cbor2 version rationale: # - 5.9.0 fixes CWE-674 / CVE-2026-26209 (uncontrolled recursion DoS) # - 5.8.0 fixes CVE-2025-68131 (shared value info disclosure) -# - 5.8.0+ dropped Python 3.8 support; last 3.8-compatible version is 5.7.0 -cbor2>=5.9.0; python_version>="3.9" -cbor2; python_version<"3.9" +cbor2>=5.9.0 scipy torch typing-extensions diff --git a/ncore/BUILD.bazel b/ncore/BUILD.bazel index 4dc01400..7595bd6f 100644 --- a/ncore/BUILD.bazel +++ b/ncore/BUILD.bazel @@ -48,18 +48,16 @@ py_wheel( homepage = "https://github.com/NVIDIA/ncore", license = "Apache-2.0", platform = "any", - python_requires = ">=3.8", + python_requires = ">=3.11", python_tag = "py3", requires = [ "numpy", # dataclasses_json version restriction rationale: deserialization fails with earlier versions "dataclasses_json>=0.2.12", "pillow", - # zarr version restrictions / suggestions rationale: - # - 2.17.0 contains crucial perf improvements ["Cache result of FSStore._fsspec_installed()" https://github.com/zarr-developers/zarr-python/pull/1581] - # - 2.12.0 is the last version supporting python3.8 with old numpy versions, which we still want to support for now - # - we can't switch to zarr 3 yet unconditionally as it's not available for python3.8 anymore - "zarr>=2.12.0,<3.0.0", + # zarr version rationale: + # - 3.1.6 is the zarr3 release used for the zarr v2→v3 migration + "zarr>=3.1.6", "cbor2", "scipy", "torch", diff --git a/ncore/impl/common/transformations.py b/ncore/impl/common/transformations.py index a6877591..30431266 100644 --- a/ncore/impl/common/transformations.py +++ b/ncore/impl/common/transformations.py @@ -15,8 +15,6 @@ from __future__ import annotations -import sys - from dataclasses import dataclass from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union @@ -62,7 +60,7 @@ def time_bounds(timestamps_us: List[int], seek_sec: Optional[float], duration_se return start_timestamp_us, end_timestamp_us -@dataclass(**({"slots": True, "frozen": True} if sys.version_info >= (3, 10) else {"frozen": True})) +@dataclass(slots=True, frozen=True) class HalfClosedInterval: """Represents a half closed interval [start, stop) of integers""" diff --git a/ncore/impl/data/stores.py b/ncore/impl/data/stores.py index 37791596..345d9c68 100644 --- a/ncore/impl/data/stores.py +++ b/ncore/impl/data/stores.py @@ -13,35 +13,63 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""Zarr store implementations and utilities for ncore data storage. 
+ +Backwards Compatibility with v2 .zarr.itar Files +================================================= + +Files written by ncore versions prior to the zarr3 migration use zarr format v2 +with custom compressed consolidated metadata stored under the ``.zmetadata.cbor.xz`` +key (CBOR-encoded, LZMA-compressed). + +The :class:`IndexedTarStore` transparently handles reading these legacy files: + +- **Non-consolidated reads** (``zarr.open``): zarr3 auto-detects format v2 and reads + ``.zarray`` / ``.zgroup`` / ``.zattrs`` keys directly from the tar index. +- **Consolidated reads** (``zarr.open_consolidated``): The store intercepts requests + for ``zarr.json`` and checks for compressed consolidated metadata: + + 1. ``zarr.cbor.xz`` (new v3 format) -- decompress CBOR+LZMA, return as JSON + 2. ``.zmetadata.cbor.xz`` (old v2 format) -- decompress, return v2 metadata dict + +- :func:`open_store` provides a fallback chain: if consolidated read fails, falls back + to non-consolidated ``zarr.open()``. + +New files are written in zarr format v3 with native zarr3 consolidation, +compressed via the same CBOR+LZMA scheme into ``zarr.cbor.xz``. +""" + from __future__ import annotations import io +import json import logging import lzma import os import struct import tarfile +import threading +from collections.abc import AsyncIterator, Iterable from dataclasses import dataclass, field from enum import IntEnum, auto, unique from pathlib import Path -from threading import RLock -from typing import IO, Any, Dict, Iterator, Literal, NamedTuple, Union +from typing import IO, Any, Dict, Literal, NamedTuple, Union import cbor2 import zarr -from numcodecs import compat +from zarr.abc.store import ByteRequest, OffsetByteRequest, RangeByteRequest, Store, SuffixByteRequest +from zarr.core.buffer import Buffer, BufferPrototype + from upath import UPath -from zarr._storage.store import Store -from zarr.util import json_loads _logger = logging.getLogger(__name__) class IndexedTarStore(Store): - """A zarr store over *indexed* tar files + """A zarr store over *indexed* tar files. Parameters ---------- @@ -57,13 +85,32 @@ class also supports the context manager protocol, which ensures the ``close()`` method is called on leaving the context, e.g.:: >>> with IndexedTarStore('data/array.itar', mode='w') as store: - ... z = zarr.zeros((10, 10), chunks=(5, 5), store=store) - ... z[...] = 42 + ... z = zarr.open_group(store=store, mode='w', zarr_format=3) + ... z.create_array('data', data=np.zeros((10, 10))) ... # no need to call store.close() + Thread safety + ------------- + A ``threading.RLock`` protects the shared tar file handle. This is needed because: + + 1. zarr3's async event loop runs store coroutines sequentially (no lock needed for + that alone), but ``reload_resources()`` may be called from user threads + concurrently with zarr reads, creating a race on the shared file handle. + 2. If ``asyncio.to_thread`` is adopted in the future for non-blocking I/O, + concurrent thread access becomes real. The RLock handles this already. 
""" - _erasable = False + supports_writes: bool = True + supports_deletes: bool = False + supports_partial_writes: bool = False + supports_listing: bool = True + + itar_path: Path | UPath + + _tar_file: tarfile.TarFile + _index: TarRecordIndex + _lock: threading.RLock + _deferred_zarr_json: Dict[str, bytes] @dataclass class TarRecord: @@ -80,156 +127,455 @@ class TarRecordIndex: def __init__(self, itar_path: Union[str, Path, UPath], mode: Literal["r", "w"] = "r"): if mode not in ["r", "w"]: - raise ValueError("TarRecordIndex: only r/w modes supported") - - # store properties - self.mode = mode + raise ValueError("IndexedTarStore: only r/w modes supported") - # Current understanding is that tarfile module in stdlib is not thread-safe, - # and so locking is required for both read and write. However, this has not - # been investigated in detail, perhaps no lock is needed if mode='r'. - self.mutex = RLock() + super().__init__(read_only=mode == "r") - # convert str / Path to absolute UPath uncondtionally + # store properties itar_upath = UPath(itar_path) if itar_upath.protocol == "": # use UPath-internal `file://` protocol for local files itar_upath = UPath("file://" + str(itar_upath)) - self.itar_upath = itar_upath.absolute() + self.itar_path = itar_upath.absolute() + self._mode = mode - # open file object and tar file (require file to be both writeable and readable when writing) - self.tar_file_object: IO[Any] - if self.mode == "r": - self.tar_file_object = self.itar_upath.open("rb") - else: - # universal_path for Python 3.8 (<=0.2.6) doesn't expose a - # write/read mode in it's static type-hints, although "wb+" is still accepted - # if the FS supports it, so ignore type-checker here - self.tar_file_object = self.itar_upath.open("wb+") # type: ignore[call-overload] + def _sync_open(self) -> None: + """Eagerly open the tar file and load/initialize the index.""" + if self._is_open: + raise ValueError("store is already open") - self.tar_file = tarfile.TarFile(fileobj=self.tar_file_object, mode=self.mode) + self._lock = threading.RLock() + self._deferred_zarr_json = {} - # init / load index table - if self.mode == "r": - self.index = self._load_tar_index(self.tar_file_object) + # Open file object and tar file (require file to be both writeable and readable when writing) + self._tar_file_object: IO[Any] + if self._mode == "r": + self._tar_file_object = self.itar_path.open("rb") else: - self.index = self.TarRecordIndex() - - def __delitem__(self, _: str): - raise NotImplementedError("Deleting items is not supported") - - def __iter__(self) -> Iterator[str]: - with self.mutex: - return iter(self.index.records.keys()) - - def __len__(self) -> int: - with self.mutex: - return len(self.index.records) + self._tar_file_object = self.itar_path.open("wb+") # type: ignore[call-overload] - def __contains__(self, item: object) -> bool: - with self.mutex: - return item in self.index.records + self._tar_file = tarfile.TarFile(fileobj=self._tar_file_object, mode=self._mode) - def __getitem__(self, item: str) -> bytes: - with self.mutex: - # Query index for file record - record = self.index.records[item] # raises KeyError if not in archive - - # Remember current tar file position - current_position = self.tar_file_object.tell() + # init / load index table + if self._mode == "r": + self._index = self._load_tar_index(self._tar_file_object) + else: + self._index = self.TarRecordIndex() + + self._is_open = True + + async def _open(self) -> None: + self._sync_open() + + def _ensure_open_sync(self) -> None: + """Lazily 
open the store on first I/O if not already open (sync version). + + This is used by sync internal methods (_get, _set) that may be called + before the async _open() has been awaited (e.g., from close() or reload_resources()). + """ + if not self._is_open: + self._sync_open() + + def __eq__(self, other: object) -> bool: + return isinstance(other, type(self)) and self.itar_path == other.itar_path + + # ------------------------------------------------------------------------- + # Read operations + # ------------------------------------------------------------------------- + + @staticmethod + def _apply_byte_range_to_bytes(data: bytes, byte_range: ByteRequest) -> bytes: + """Apply a :class:`ByteRequest` to an in-memory ``bytes`` object.""" + if isinstance(byte_range, RangeByteRequest): + return data[byte_range.start : byte_range.end] + elif isinstance(byte_range, OffsetByteRequest): + return data[byte_range.offset :] + elif isinstance(byte_range, SuffixByteRequest): + return data[-byte_range.suffix :] if byte_range.suffix > 0 else b"" + else: + raise TypeError(f"Unexpected byte_range type, got {type(byte_range)}.") + + def _get( + self, + key: str, + prototype: BufferPrototype, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + """Synchronous get implementation. Must be called under ``self._lock``. + + Checks the in-memory ``_deferred_zarr_json`` overlay first for buffered + ``zarr.json`` writes that have not yet been flushed to the tar archive + (see :meth:`_set` for details). + + Then handles transparent decompression of consolidated metadata: + + - When ``key == "zarr.json"`` and a ``zarr.cbor.xz`` record exists, the + compressed consolidated metadata is decompressed (LZMA -> CBOR -> JSON) + and returned. This is the path for **new v3 itar files**. + - When ``key == "zarr.json"`` and a ``.zmetadata.cbor.xz`` record exists, + the **legacy v2 compressed consolidated metadata** is decompressed and + returned as-is (the v2 metadata dict). This enables backwards-compatible + reading of old v2 itar files via ``zarr.open_consolidated()``. + """ + self._ensure_open_sync() + + # Check the deferred zarr.json overlay first (write-back buffer for + # child node metadata that zarr3 may rewrite multiple times). 
+ if key in self._deferred_zarr_json: + value = self._deferred_zarr_json[key] + if byte_range is not None: + value = self._apply_byte_range_to_bytes(value, byte_range) + return prototype.buffer.from_bytes(value) + + try: + # Handle consolidated metadata intercept for zarr.json + consolidated_metadata = False + legacy_v2_consolidated = False + + if key == "zarr.json": + if (record := self._index.records.get("zarr.cbor.xz")) is not None: + # New v3 compressed consolidated metadata + consolidated_metadata = True + assert byte_range is None, "Byte range not supported for consolidated metadata" + elif (record := self._index.records.get(".zmetadata.cbor.xz")) is not None: + # Legacy v2 compressed consolidated metadata -- backwards compat + legacy_v2_consolidated = True + assert byte_range is None, "Byte range not supported for consolidated metadata" + else: + # Regular key lookup + record = self._index.records[key] + else: + # Regular key lookup + record = self._index.records[key] + except KeyError: + return None + + fileobj = self._tar_file_object + + # Remember current tar file position + current_position = fileobj.tell() + + # Read the value depending on the byte_range + if byte_range is None: + fileobj.seek(record.offset_data) + value = fileobj.read(record.size) + elif isinstance(byte_range, RangeByteRequest): + fileobj.seek(record.offset_data + byte_range.start) + value = fileobj.read(byte_range.end - byte_range.start) + elif isinstance(byte_range, OffsetByteRequest): + fileobj.seek(record.offset_data + byte_range.offset) + value = fileobj.read(record.size - byte_range.offset) + elif isinstance(byte_range, SuffixByteRequest): + fileobj.seek(max(0, record.offset_data + record.size - byte_range.suffix)) + value = fileobj.read(byte_range.suffix) + else: + raise TypeError(f"Unexpected byte_range, got {byte_range}.") + + if consolidated_metadata: + # Decompress new v3 compressed consolidated metadata (LZMA -> CBOR -> JSON) + meta = cbor2.loads(lzma.LZMADecompressor().decompress(value)) + + consolidated_format = meta.get("zarr_consolidated_format", None) + if consolidated_format != 1: + raise zarr.errors.MetadataError( + "unsupported zarr consolidated metadata format: %s" % consolidated_format + ) + + # Return the inner metadata dict as JSON bytes (expected zarr.json format) + value = json.dumps(meta["metadata"]).encode("utf-8") + + elif legacy_v2_consolidated: + # Decompress legacy v2 compressed consolidated metadata + # and return the raw metadata dict -- zarr.open_consolidated() will + # fail to parse this as v3 metadata, triggering the fallback in open_store() + meta = cbor2.loads(lzma.LZMADecompressor().decompress(value)) + + consolidated_format = meta.get("zarr_consolidated_format", None) + if consolidated_format != 1: + raise zarr.errors.MetadataError( + "unsupported zarr consolidated metadata format: %s" % consolidated_format + ) + + # Return raw v2 metadata as JSON -- this won't parse as v3 consolidated + # metadata, but the fallback in open_store() handles this correctly + value = json.dumps(meta["metadata"]).encode("utf-8") + + ret = prototype.buffer.from_bytes(value) + + # Return tar file to previous location + fileobj.seek(current_position) + + return ret + + async def get( + self, + key: str, + prototype: BufferPrototype, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + assert isinstance(key, str) + + with self._lock: + return self._get(key, prototype=prototype, byte_range=byte_range) + + async def get_partial_values( + self, + prototype: BufferPrototype, + 
key_ranges: Iterable[tuple[str, ByteRequest | None]], + ) -> list[Buffer | None]: + out = [] + + with self._lock: + for key, byte_range in key_ranges: + out.append(self._get(key, prototype=prototype, byte_range=byte_range)) + + return out + + async def exists(self, key: str) -> bool: + self._ensure_open_sync() + with self._lock: + return key in self._deferred_zarr_json or key in self._index.records + + # ------------------------------------------------------------------------- + # Write operations + # ------------------------------------------------------------------------- + + def _set(self, key: str, value: Buffer) -> None: + """Synchronous set implementation. Must be called under ``self._lock``. + + Deferred-write strategy for ``zarr.json`` keys + ----------------------------------------------- + **All** ``zarr.json`` writes (both first-write and subsequent overwrites) + are buffered in memory (``_deferred_zarr_json``) and only the final + version is materialized to the tar when the store is closed (see + :meth:`close` and :meth:`_flush_deferred`). + + This guarantees **zero dead space** in the tar archive regardless of how + many times zarr3 rewrites a node's metadata during a session. zarr3 + writes a ``zarr.json`` once on ``create_group()`` / ``create_array()`` + and then again every time ``group.attrs.update()`` is called -- deferring + the first write as well means neither version reaches the tar until + close, when only the final version is written. + + The **root** ``zarr.json`` is also deferred. During + :meth:`_flush_deferred`, it is intercepted and compressed as + ``zarr.cbor.xz`` (CBOR+LZMA) -- the consolidated-metadata format used + by ncore itar files. + + All other (non ``zarr.json``) keys remain strictly write-once. + """ + self._ensure_open_sync() + + # --- zarr.json keys: always defer (first-write OR overwrite) ---------- + if key == "zarr.json" or key.endswith("/zarr.json"): + _logger.debug(f"IndexedTarStore: deferring write of {key}") + self._deferred_zarr_json[key] = value.to_bytes() + return + + # --- Non zarr.json keys: write-once ----------------------------------- + key_exists = key in self._index.records or key in self._deferred_zarr_json + if key_exists: + raise ValueError( + f"{key} already exists and is not a zarr.json metadata key; " + f"overwriting non-metadata keys is not supported in itar format" + ) - # Read the value - self.tar_file_object.seek(record.offset_data) - value = self.tar_file_object.read(record.size) + value_bytes: bytes = value.to_bytes() + self._write_to_tar(key, value_bytes) - # Return tar file to previous location - self.tar_file_object.seek(current_position) + async def set(self, key: str, value: Buffer) -> None: + self._check_writable() + self._ensure_open_sync() - return value + assert isinstance(key, str) - def __setitem__(self, item: str, value): - if self.mode != "w": - raise zarr.errors.ReadOnlyError + if not isinstance(value, Buffer): + raise TypeError( + f"IndexedTarStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead." 
+ ) - with self.mutex: - if item in self.index.records: - raise ValueError(f"{item} already exists, update is not supported") + with self._lock: + self._set(key, value) + + async def set_partial_values(self, key_start_values: Iterable[tuple[str, int, bytes]]) -> None: + raise NotImplementedError + + async def delete(self, key: str) -> None: + # Only raise if the key actually exists -- allows zarr APIs to avoid overhead + self._check_writable() + if await self.exists(key): + raise NotImplementedError("Deleting items is not supported by IndexedTarStore") + + # ------------------------------------------------------------------------- + # Listing operations + # ------------------------------------------------------------------------- + + async def list(self) -> AsyncIterator[str]: + self._ensure_open_sync() + with self._lock: + # Yield keys from the tar index + for key in self._index.records.keys(): + yield key + # Yield deferred zarr.json keys not yet in the tar index + for key in self._deferred_zarr_json: + if key not in self._index.records: + yield key + + async def list_prefix(self, prefix: str) -> AsyncIterator[str]: + async for key in self.list(): + if key.startswith(prefix): + yield key + + async def list_dir(self, prefix: str) -> AsyncIterator[str]: + prefix = prefix.rstrip("/") + + self._ensure_open_sync() + # Merge keys from tar index and deferred buffer + all_keys = set(self._index.records.keys()) | set(self._deferred_zarr_json.keys()) + seen: set[str] = set() + + if prefix == "": + for key in all_keys: + top = key.split("/")[0] + if top not in seen: + seen.add(top) + yield top + else: + for key in all_keys: + if key.startswith(prefix + "/") and key.strip("/") != prefix: + k = key.removeprefix(prefix + "/").split("/")[0] + if k not in seen: + seen.add(k) + yield k + + # ------------------------------------------------------------------------- + # Lifecycle + # ------------------------------------------------------------------------- + + def _flush_deferred(self) -> None: + """Write all deferred ``zarr.json`` entries to the tar archive. + + Must be called under ``self._lock`` and BEFORE ``self._tar_file.close()`` + (which finalizes the tar with two empty 512-byte blocks). + + Because ALL ``zarr.json`` writes are deferred (both first-write and + overwrites), each key is written to the tar exactly **once** -- the + final version. This guarantees zero dead space in the archive. + + The root ``zarr.json`` receives special treatment: instead of writing + it as-is, its content is intercepted and compressed as ``zarr.cbor.xz`` + (CBOR + LZMA) -- the consolidated-metadata format used by ncore itar + files. The plain ``zarr.json`` key is NOT written to the tar for the + root; only ``zarr.cbor.xz`` is. 
+ """ + if not self._deferred_zarr_json: + return + + # Handle root zarr.json consolidation intercept: compress as zarr.cbor.xz + root_zarr_json = self._deferred_zarr_json.pop("zarr.json", None) + if root_zarr_json is not None: + consolidated = { + "zarr_consolidated_format": 1, + "metadata": json.loads(root_zarr_json), + } + + with io.BytesIO() as buffer: + with lzma.open(buffer, "wb") as lzma_file: + cbor2.dump(consolidated, lzma_file) + compressed_bytes = buffer.getvalue() + + # Write zarr.cbor.xz to the tar (not zarr.json) + self._write_to_tar("zarr.cbor.xz", compressed_bytes) + + # Write remaining (non-root) zarr.json entries + for key, value_bytes in self._deferred_zarr_json.items(): + self._write_to_tar(key, value_bytes) + + _logger.debug( + "IndexedTarStore: flushed %d deferred zarr.json entries%s", + len(self._deferred_zarr_json) + (1 if root_zarr_json is not None else 0), + " (including root zarr.json → zarr.cbor.xz)" if root_zarr_json is not None else "", + ) + self._deferred_zarr_json.clear() - value_bytes: bytes = compat.ensure_bytes(value) - value_size: int = len(value_bytes) + def _write_to_tar(self, key: str, value_bytes: bytes) -> None: + """Append a single key/value pair to the tar archive and update the index. - # Remember current tar file position, which is the start of the header - header_start_position = self.tar_file_object.tell() + Must be called under ``self._lock``. + """ + value_size = len(value_bytes) - # Store value in tar-file (will pre-pend a potentially *multi*-block header depending on item path-lengths) - tarinfo = tarfile.TarInfo(item) - tarinfo.size = value_size + # Current position is the start of the new header + header_start = self._tar_file_object.tell() - self.tar_file.addfile(tarinfo, fileobj=io.BytesIO(value_bytes)) + tarinfo = tarfile.TarInfo(key) + tarinfo.size = value_size + self._tar_file.addfile(tarinfo, fileobj=io.BytesIO(value_bytes)) - # End position after writing both header and payload - end_position = self.tar_file_object.tell() + end_position = self._tar_file_object.tell() - # Determine the effective value's payload size as a multiple of blocksize - payload_size = value_size - if remainder := payload_size % tarfile.BLOCKSIZE: - payload_size += tarfile.BLOCKSIZE - remainder + # Compute effective payload size (rounded up to block boundary) + payload_size = value_size + if remainder := payload_size % tarfile.BLOCKSIZE: + payload_size += tarfile.BLOCKSIZE - remainder - # Determine the effective header-size (can be multiple blocks for long path names) - header_size = end_position - header_start_position - payload_size + header_size = end_position - header_start - payload_size - # Construct record from reconstructed size-information - record = self.TarRecord( - # Effective start of the data in the tar file (current tar file position + header-size) - header_start_position + header_size, - # Length of the data - value_size, - ) + # Update the index to point to the newly written data + self._index.records[key] = self.TarRecord( + offset_data=header_start + header_size, + size=value_size, + ) - self.index.records[item] = record + def close(self) -> None: + """Needs to be called after finishing updating the store. - def __enter__(self): - return self + Flushes deferred ``zarr.json`` writes, saves the tar index (if + writing), closes the tar file and underlying file object, and marks the + store as closed via the zarr3 lifecycle. 
+ """ + if self._is_open: + with self._lock: + if self._mode == "w": + # Flush deferred zarr.json overwrites BEFORE closing the + # tar (which appends two finishing 512-byte blocks). + self._flush_deferred() - def __exit__(self, exc_type, exc_value, traceback): - self.close() + # Closing the tar file appends two finishing blocks to the end of the file + # if in write mode, but doesn't close the internal file object yet + self._tar_file.close() - def close(self): - """Needs to be called after finishing updating the store""" - with self.mutex: - # Closing the tar file appends two finishing blocks to the end of the file - # if in write mode, but doesn't close the internal file object yet - self.tar_file.close() + if self._mode == "w": + # Add index if writing + self._save_tar_index(self._tar_file_object, self._index) - if self.mode == "w": - # Add index if writing - self._save_tar_index(self.tar_file_object, self.index) + self._tar_file_object.close() - self.tar_file_object.close() + super().close() - def reload_resources(self): + def reload_resources(self) -> None: """Reloads the tar file object *only* - useful to re-initialize the store in multi-process 'fork()' settings""" - with self.mutex: - # get current tar file path and seek positions, and close file object - current_position = self.tar_file_object.tell() - self.tar_file_object.close() + with self._lock: + # get current seek positions and close file object + current_position = self._tar_file_object.tell() + self._tar_file_object.close() # reload file object (require file to be both writeable and readable when writing) - if self.mode == "r": - self.tar_file_object = self.itar_upath.open("rb") + if self._mode == "r": + self._tar_file_object = self.itar_path.open("rb") else: - # universal_path for Python 3.8 (<=0.2.6) doesn't expose a - # write/read mode in it's static type-hints, although "wb+" is still accepted - # if the FS supports it, so ignore type-checker here - self.tar_file_object = self.itar_upath.open("wb+") # type: ignore[call-overload] + self._tar_file_object = self.itar_path.open("wb+") # type: ignore[call-overload] - self.tar_file.fileobj = self.tar_file_object + self._tar_file.fileobj = self._tar_file_object # seek to previous position - self.tar_file_object.seek(current_position) + self._tar_file_object.seek(current_position) + + # ------------------------------------------------------------------------- + # Index serialization + # ------------------------------------------------------------------------- - # Methods / constants for storing index header and payload INDEX_HEADER_MAGIC = b"itar" # Index header binary format @@ -291,10 +637,10 @@ def _load_tar_index(cls, tar_file_object: IO[Any]) -> TarRecordIndex: return cls.TarRecordIndex({item: cls.TarRecord(offset_datas[i], sizes[i]) for i, item in enumerate(items)}) @classmethod - def _save_tar_index(cls, tar_file_object: IO[Any], index: TarRecordIndex): + def _save_tar_index(cls, tar_file_object: IO[Any], index: TarRecordIndex) -> None: """Saves a tar record index at the end of a tar file object (needs to be finalized / have two empty blocks appended already)""" - def fill_block(): + def fill_block() -> None: # Fill up block with zeros _, remainder = divmod(tar_file_object.tell(), tarfile.BLOCKSIZE) if remainder > 0: @@ -346,87 +692,64 @@ def fill_block(): fill_block() -def consolidate_compressed_metadata(store: zarr.storage.BaseStore, metadata_key=".zmetadata.cbor.xz"): - """Consolidate all metadata for groups and arrays within the given store - into a single 
compressed cbor resource and put it under the given key. - - See Also - -------- - zarr.consolidate_metadata - """ - store = zarr.storage.normalize_store_arg(store, mode="w") - - version = store._store_version - - if version == 2: - - def is_zarr_key(key): - return key.endswith(".zarray") or key.endswith(".zgroup") or key.endswith(".zattrs") - - else: - raise NotImplementedError("Only supporting V2 stores") +def open_store( + store: Store, open_consolidated: bool, mode: str = "r", **kwargs: Any +) -> zarr.Group: + """Open a zarr group from a store, with optional consolidated metadata. - # Collect all meta-data - out = { - "zarr_consolidated_format": 1, - "metadata": {key: json_loads(store[key]) for key in store if is_zarr_key(key)}, - } + Implements a backwards-compatible interface that handles both new v3 itar files + and legacy v2 itar files: - with io.BytesIO() as metadata_buffer: - # Compress meta-data to in-memory buffer - with lzma.open(metadata_buffer, "wb") as lzma_file: - cbor2.dump(out, lzma_file) + 1. If ``open_consolidated=True``, tries ``zarr.open_consolidated()`` first. + For v3 files, this succeeds because the store intercepts ``zarr.json`` and + decompresses ``zarr.cbor.xz``. For v2 files, this may fail because the + decompressed metadata is in v2 format. + 2. On failure, falls back to ``zarr.open()`` which auto-detects format v2 + and reads the individual metadata keys directly. + 3. If ``open_consolidated=False``, uses ``zarr.open()`` directly. - store[metadata_key] = metadata_buffer.getvalue() - - -class ConsolidatedCompressedMetadataStore(zarr.storage.ConsolidatedMetadataStore): - """A layer over other storage, where the metadata has been consolidated into a single compressed key.""" - - # Overwrite constructor to perform decompression of metadata - def __init__(self, store: zarr.storage.StoreLike, metadata_key=".zmetadata.cbor.xz"): - self.store = Store._ensure_store(store) - - # retrieve consolidated metadata - meta = cbor2.loads(lzma.LZMADecompressor().decompress(self.store[metadata_key])) - - # check format of consolidated metadata - consolidated_format = meta.get("zarr_consolidated_format", None) - if consolidated_format != 1: - raise zarr.MetadataError("unsupported zarr consolidated metadata format: %s" % consolidated_format) - - # decode metadata - self.meta_store: zarr.storage.Store = zarr.KVStore(meta["metadata"]) - - -def open_compressed_consolidated( - store: zarr.storage.StoreLike, metadata_key=".zmetadata.cbor.xz", mode="r+", **kwargs -) -> zarr.hierarchy.Group: - """Open group using metadata previously consolidated and compressed into a single key. - - See Also - -------- - consolidate_compressed_metadata - zarr.open_consolidated + Parameters + ---------- + store : zarr.abc.store.Store + The store to open. + open_consolidated : bool + If True, attempt to use consolidated metadata for faster loading. + mode : str + Access mode ('r' or 'r+'). + **kwargs + Additional keyword arguments passed to ``zarr.open()`` / ``zarr.open_consolidated()``. + + Returns + ------- + zarr.Group + The opened zarr group. + """ + try: + if open_consolidated: + return zarr.open_consolidated(store=store, mode=mode, **kwargs) + else: + return zarr.open(store=store, mode=mode, **kwargs) + except Exception as e: + # Fall back to regular opening if consolidated read fails. + # This handles legacy v2 itar files whose compressed consolidated metadata + # cannot be parsed as v3 consolidated metadata. + _logger.warning(f"Failed to open consolidated store: {e}. 
Falling back to regular store opening.") + return zarr.open(store=store, mode=mode, **kwargs) + + +def lz4_codecs() -> list: + """Default zarr3 compressors for LZ4 Blosc compression. + + Produces compact, quickly-decodable chunks suitable for both local and remote + storage. Used as the default compressor pipeline for ncore data arrays. + + Returns + ------- + list + A list containing a single ``BloscCodec`` configured for LZ4 compression + with bitshuffle, suitable for the ``compressors`` argument of + ``group.create_array()``. """ + from zarr.codecs import BloscCodec - # normalize parameters - zarr_version = kwargs.get("zarr_version") - store = zarr.storage.normalize_store_arg( - store, storage_options=kwargs.get("storage_options"), mode=mode, zarr_version=zarr_version - ) - if mode not in {"r", "r+"}: - raise ValueError("invalid mode, expected either 'r' or 'r+'; found {!r}".format(mode)) - - path = kwargs.pop("path", None) - if store._store_version == 2: - ConsolidatedStoreClass = ConsolidatedCompressedMetadataStore - else: - raise NotImplementedError("Only supporting V2 stores") - - # setup metadata store - meta_store = ConsolidatedStoreClass(store, metadata_key=metadata_key) - - # pass through - chunk_store = kwargs.pop("chunk_store", None) or store - return zarr.convenience.open(store=meta_store, chunk_store=chunk_store, mode=mode, path=path, **kwargs) + return [BloscCodec(cname="lz4", clevel=5, shuffle="bitshuffle")] diff --git a/ncore/impl/data/stores_test.py b/ncore/impl/data/stores_test.py index 0e87f980..4a8f822c 100644 --- a/ncore/impl/data/stores_test.py +++ b/ncore/impl/data/stores_test.py @@ -20,7 +20,44 @@ import parameterized import zarr -from .stores import IndexedTarStore, consolidate_compressed_metadata, open_compressed_consolidated +from .stores import IndexedTarStore, open_store, lz4_codecs + + +def create_array(group: zarr.Group, name: str, data: np.ndarray, attributes: dict | None = None) -> zarr.Array: + """Helper function to create an array in a group with given name and data.""" + return group.create_array(name=name, data=data, attributes=attributes or {}, compressors=lz4_codecs()) + + +def copy_group(source: zarr.Group, target: zarr.Group) -> None: + """Helper function to recursively copy a zarr group to another group.""" + new_group = target.create_group(source.basename, attributes=source.attrs.asdict()) + + for key in source.keys(): + child = source[key] + if isinstance(child, zarr.Array): + create_array(new_group, key, child[()], child.attrs.asdict()) + elif isinstance(child, zarr.Group): + copy_group(child, new_group) + else: + raise TypeError(f"Unsupported type for key {key}: {type(child)}") + + +def copy_store(source_store: zarr.abc.store.Store, target_store: zarr.abc.store.Store) -> None: + """Helper function to copy a zarr store to another store. + + zarr3 removed ``zarr.copy_store()``, so this is a group-level replacement. 
+ """ + source = zarr.open(store=source_store, mode="r") + target = zarr.create_group(store=target_store, zarr_format=3, attributes=source.attrs.asdict(), overwrite=True) + + for key in source.keys(): + child = source[key] + if isinstance(child, zarr.Array): + create_array(target, key, child[()], child.attrs.asdict()) + elif isinstance(child, zarr.Group): + copy_group(child, target) + else: + raise TypeError(f"Unsupported type for key {key}: {type(child)}") class TestIndexedTarStore(unittest.TestCase): @@ -28,18 +65,23 @@ class TestIndexedTarStore(unittest.TestCase): def setUp(self): # Fill a reference group with an in-memory store - self.g_ref = zarr.open(store=zarr.MemoryStore()) - self.g_ref.create_dataset("foo", data=np.random.rand(3, 3, 3)) - self.g_ref.attrs.update({"some": "thing"}) - self.g_ref.require_group("subgroup").create_dataset("foo", data=np.random.rand(5, 5, 5)) + self.g_ref: zarr.Group = zarr.open_group( + store=zarr.storage.MemoryStore(), mode="w", zarr_format=3, attributes={"some": "thing"} + ) + create_array(self.g_ref, "foo", np.random.rand(3, 3, 3)) + sub = self.g_ref.require_group("subgroup") + create_array(sub, "foo", np.random.rand(5, 5, 5), {"some": "other thing"}) - def check_with_reference(self, group): + def check_with_reference(self, group: zarr.Group) -> None: """Verifies all values of a group against the reference""" self.assertIsNone(np.testing.assert_array_equal(self.g_ref["foo"][()], group["foo"][()])) self.assertIsNone( np.testing.assert_array_equal(self.g_ref["subgroup"]["foo"][()], group["subgroup"]["foo"][()]) ) self.assertDictEqual(self.g_ref.attrs.asdict(), group.attrs.asdict()) + self.assertDictEqual(self.g_ref["foo"].attrs.asdict(), group["foo"].attrs.asdict()) + self.assertDictEqual(self.g_ref["subgroup"].attrs.asdict(), group["subgroup"].attrs.asdict()) + self.assertDictEqual(self.g_ref["subgroup"]["foo"].attrs.asdict(), group["subgroup"]["foo"].attrs.asdict()) def test_reserialization(self): """Make sure storing / loading of regular zarr data to .itar files works correctly""" @@ -47,7 +89,7 @@ def test_reserialization(self): # re-serialize to .itar archive with tempfile.NamedTemporaryFile(suffix=".itar") as f: with IndexedTarStore(f.name, mode="w") as s_itar_out: # closes file on exit - zarr.copy_store(self.g_ref.store, s_itar_out) + copy_store(self.g_ref.store, s_itar_out) # reload store from file store = IndexedTarStore(f.name) @@ -60,20 +102,26 @@ def test_reserialization(self): store.reload_resources() self.check_with_reference(g_reload) - def test_compressed_consolidated(self): - """Make sure compressed consolidated meta data is stored/loaded correctly""" + @parameterized.parameterized.expand( + [ + False, + True, + ] + ) + def test_consolidated(self, open_consolidated: bool): + """Make sure consolidated meta data is stored/loaded correctly""" - # serialize to .itar archive (will also serialize compressed-consolidated meta-data) + # serialize to .itar archive (will also serialize consolidated meta-data) with tempfile.NamedTemporaryFile(suffix=".itar") as f: with IndexedTarStore(f.name, mode="w") as s_itar_out: # closes file on exit - zarr.copy_store(self.g_ref.store, s_itar_out) + copy_store(self.g_ref.store, s_itar_out) - # consolidate compress meta-data - consolidate_compressed_metadata(s_itar_out) + # consolidate meta-data (triggers the store's zarr.json -> zarr.cbor.xz intercept) + zarr.consolidate_metadata(s_itar_out) - # reload store from file with compressed consolidated meta-data + # reload store from file with consolidated 
meta-data store = IndexedTarStore(f.name) - g_reload = open_compressed_consolidated(store=store, mode="r") + g_reload = open_store(store=store, open_consolidated=open_consolidated, mode="r") # check all data was correctly serialized / deserialized self.check_with_reference(g_reload) @@ -82,33 +130,6 @@ def test_compressed_consolidated(self): store.reload_resources() self.check_with_reference(g_reload) - @parameterized.parameterized.expand( - [ - ( - "not-compressed_consolidate", - False, - ), - ( - "compressed_consolidate", - True, - ), - ] - ) - def test_empty(self, _, compressed_consolidate: bool): - """Verify edge case of serialization of empty store is possible without errors""" - with tempfile.NamedTemporaryFile(suffix=".itar") as f: - with IndexedTarStore(f.name, mode="w") as s_itar_out: # closes file on exit - # Don't write any zarr data (still serializes empty tar / seek tables) - - if compressed_consolidate: - consolidate_compressed_metadata(s_itar_out) - - with IndexedTarStore(f.name) as s_itar_in: - # Loading store should work without errors - # But loading a non-existing group should then fail - with self.assertRaises(zarr.errors.PathNotFoundError): - if compressed_consolidate: - open_compressed_consolidated(s_itar_in, mode="r") - else: - zarr.open(store=s_itar_in, mode="r") +if __name__ == "__main__": + unittest.main() diff --git a/ncore/impl/data/types.py b/ncore/impl/data/types.py index e7074ce0..f158de77 100644 --- a/ncore/impl/data/types.py +++ b/ncore/impl/data/types.py @@ -17,13 +17,12 @@ import dataclasses import io -import sys from abc import ABC, abstractmethod from dataclasses import dataclass, replace from enum import IntEnum, auto, unique from functools import lru_cache -from typing import TYPE_CHECKING, Dict, List, Literal, Mapping, Optional, Protocol, Tuple, TypeVar, Union +from typing import TYPE_CHECKING, Dict, List, Literal, Mapping, Optional, Protocol, Self, Tuple, Union import dataclasses_json import numpy as np @@ -37,13 +36,6 @@ from ncore.impl.data import util -if sys.version_info >= (3, 11): - # Older python versions have issues with type-hints for nested types in - # combination with typing.get_type_hints() (used by, e.g., 'dataclasses_json') - # - alias these globally as a workaround - from typing import Self - - ## JSON-like structures JsonLike = Union[ @@ -131,9 +123,6 @@ def __post_init__(self): # Represents the collection of all concrete external distortion types ConcreteExternalDistortionParametersUnion = Union[BivariateWindshieldModelParameters] -# Self type-var for camera model parameters consistent with PEP 673 but compatible with Python < 3.11 -CameraModelParametersSelf = TypeVar("CameraModelParametersSelf", bound="CameraModelParameters") - @dataclass class CameraModelParameters(ABC): @@ -150,11 +139,11 @@ class CameraModelParameters(ABC): @abstractmethod def transform( - self: CameraModelParametersSelf, + self, image_domain_scale: Union[float, Tuple[float, float]], image_domain_offset: Tuple[float, float] = (0.0, 0.0), new_resolution: Optional[Tuple[int, int]] = None, - ) -> CameraModelParametersSelf: + ) -> Self: """ Applies a transformation to camera model parameter @@ -346,12 +335,6 @@ def transform( ) -if sys.version_info <= (3, 9): - # Older python versions have issues with type-hints for nested types in - # combination with typing.get_type_hints() (used by, e.g., 'dataclasses_json') - # - alias these globally as a workaround - PolynomialType = FThetaCameraModelParameters.PolynomialType - @dataclass class 
OpenCVPinholeCameraModelParameters(CameraModelParameters, dataclasses_json.DataClassJsonMixin): diff --git a/ncore/impl/data/v4/components.py b/ncore/impl/data/v4/components.py index 4e347620..7191ba99 100644 --- a/ncore/impl/data/v4/components.py +++ b/ncore/impl/data/v4/components.py @@ -19,7 +19,6 @@ import io import json import logging -import sys from abc import ABC, abstractmethod from dataclasses import dataclass @@ -32,6 +31,7 @@ List, Literal, Optional, + Self, Tuple, Type, TypeVar, @@ -45,9 +45,8 @@ import zarr import zarr.storage -from numcodecs import Blosc +from zarr.abc.store import Store from upath import UPath -from zarr._storage.store import Store from ncore.impl.common.transformations import HalfClosedInterval from ncore.impl.common.util import MD5Hasher @@ -57,12 +56,6 @@ if TYPE_CHECKING: import numpy.typing as npt # type: ignore[import-not-found] -if sys.version_info >= (3, 11): - # Older python versions have issues with type-hints for nested types in - # combination with typing.get_type_hints() (used by, e.g., 'dataclasses_json') - # - alias these globally as a workaround - from typing import Self - VERSION = "v4" @@ -193,18 +186,17 @@ def get_base_group(self, component_group_name: Optional[str]) -> zarr.Group: store_path = self._output_dir_path / f"{self._store_base_name}.{store_name}.zarr.itar" store = stores.IndexedTarStore(store_path, mode="w") elif self._store_type == "directory": - # directory-based zarr stores ..zarr.zarr + # directory-based zarr stores ..zarr store_path = self._output_dir_path / f"{self._store_base_name}.{store_name}.zarr" - store = zarr.storage.DirectoryStore(store_path) + store = zarr.storage.LocalStore(store_path) else: raise ValueError(f"Unknown store type {self._store_type}") - # Create root group in store - root_group = zarr.group(store=store) - - # Store dataset associated meta-data to root - root_group.attrs.put( - { + # Create root group in store with sequence metadata as attributes + root_group = zarr.create_group( + store=store, + zarr_format=3, + attributes={ "sequence_id": self._sequence_id, "sequence_timestamp_interval_us": { "start": self._sequence_timestamp_interval_us.start, @@ -213,7 +205,7 @@ def get_base_group(self, component_group_name: Optional[str]) -> zarr.Group: "generic_meta_data": self._generic_meta_data, "version": VERSION, "component_group_name": component_group_name, - } + }, ) # Create store / base-group mapping @@ -236,7 +228,10 @@ def finalize(self) -> List[UPath]: for root_group, store_path in self._stores_rootgroups.values(): store = root_group.store - stores.consolidate_compressed_metadata(store) + # Consolidate metadata into the store. For IndexedTarStore this + # triggers the store-level intercept that compresses the consolidated + # zarr.json into zarr.cbor.xz (CBOR + LZMA). 
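+            # For directory-based LocalStore stores, the same call writes zarr's
+            # standard consolidated metadata without any extra compression.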
+ zarr.consolidate_metadata(store) # Finish writing all files store.close() @@ -278,7 +273,7 @@ def register_component_writer( } # Store meta-data - component_group.attrs.put(meta_data) + component_group.attrs.update(meta_data) self._component_writers[component_id] = ( component_writer := component_writer_type(component_group, self._sequence_timestamp_interval_us) @@ -362,10 +357,10 @@ def thread_load_component_store(component_store_upath: UPath) -> Tuple[zarr.Grou component_store = stores.IndexedTarStore(component_store_upath, mode="r") else: - component_store = zarr.storage.DirectoryStore(component_store_upath) + component_store = zarr.storage.LocalStore(component_store_upath) component_root = ( - stores.open_compressed_consolidated(store=component_store, mode="r") + stores.open_store(store=component_store, open_consolidated=open_consolidated, mode="r") if open_consolidated else zarr.open(store=component_store, mode="r") ) @@ -424,16 +419,9 @@ def thread_load_component_store(component_store_upath: UPath) -> Tuple[zarr.Grou def reload_resources(self) -> None: """Trigger a reload of each itar store - useful to re-initialize file objects in multi-process settings""" - component_store: Union[zarr.Group, stores.ConsolidatedCompressedMetadataStore] - for component_store, _ in self._component_stores.values(): - # unwind one layer of possible consolidated metadata store - if isinstance( - compressed_consolidated_store := component_store.store, stores.ConsolidatedCompressedMetadataStore - ): - component_store = compressed_consolidated_store - - if isinstance(store := component_store.store, stores.IndexedTarStore): - store.reload_resources() + for component_root, _ in self._component_stores.values(): + if isinstance(component_root.store, stores.IndexedTarStore): + component_root.store.reload_resources() @property def sequence_id(self) -> str: @@ -464,7 +452,7 @@ def open_component_readers( continue # instantiate a reader for each of the components - for component_instance_name, component_group in component_group.items(): + for component_instance_name, component_group in component_group.groups(): assert component_instance_name not in ret, ( f"Component instance {component_instance_name} encountered multiple times" ) @@ -484,10 +472,10 @@ def get_sequence_meta(self) -> SequenceMeta: component_stores_info: List[SequenceMeta.ComponentStoreMeta] = [] for component_root_group, component_store_path in self._component_stores.values(): components: Dict[str, Dict[str, SequenceMeta.ComponentInstanceMeta]] = {} - for component_name, component in component_root_group.items(): + for component_name, component in component_root_group.groups(): # collect component names and instances component_instances: Dict[str, SequenceMeta.ComponentInstanceMeta] = {} - for component_instance_name, component_instance in component.items(): + for component_instance_name, component_instance in component.groups(): component_instance_attrs = component_instance.attrs component_instances[component_instance_name] = SequenceMeta.ComponentInstanceMeta( version=component_instance_attrs["component_version"], @@ -628,8 +616,8 @@ def __init__(self, component_group: zarr.Group, sequence_timestamp_interval_us: def finalize(self): """Actually store the json-encoded pose data""" - self._group.create_group("static_poses").attrs.put(self.data["static_poses"]) - self._group.create_group("dynamic_poses").attrs.put(self.data["dynamic_poses"]) + self._group.create_group("static_poses").attrs.update(self.data["static_poses"]) + 
self._group.create_group("dynamic_poses").attrs.update(self.data["dynamic_poses"]) def store_static_pose( self, @@ -813,7 +801,7 @@ def store_camera_intrinsics( meta_data = types.encode_camera_model_parameters(camera_model_parameters) - self._cameras_group.create_group(camera_id).attrs.put(meta_data) + self._cameras_group.create_group(camera_id).attrs.update(meta_data) return self @@ -828,7 +816,7 @@ def store_lidar_intrinsics( # Prepare meta-data containing the serialization of the mandatory lidar model meta_data = types.encode_lidar_model_parameters(lidar_model_parameters) - self._lidars_group.create_group(lidar_id).attrs.put(meta_data) + self._lidars_group.create_group(lidar_id).attrs.update(meta_data) return self @@ -892,7 +880,7 @@ def store_camera_masks( """Store camera-associated masks""" # Store mask names - (camera_grp := self._cameras_group.create_group(camera_id)).attrs.put( + (camera_grp := self._cameras_group.create_group(camera_id)).attrs.update( {"mask_names": list(mask_images.keys())} ) @@ -902,9 +890,10 @@ def store_camera_masks( FORMAT = "png" mask_image.save(buffer, format=FORMAT, optimize=True) # encodes as png # store mask data (uncompressed, as already encoded) - camera_grp.create_dataset(mask_name, data=np.asarray(buffer.getvalue()), compressor=None).attrs[ - "format" - ] = FORMAT + mask_arr = camera_grp.create_array( + mask_name, data=np.asarray(buffer.getvalue()), compressors=() + ) + mask_arr.attrs["format"] = FORMAT return self @@ -971,7 +960,7 @@ def finalize(self): ) # Store as meta-data of frames group - self._frames_group.attrs.put({"frames_timestamps_us": frames_timestamps_us.tolist()}) + self._frames_group.attrs.update({"frames_timestamps_us": frames_timestamps_us.tolist()}) def _get_frame_group( self, @@ -1017,15 +1006,15 @@ def _store_base_frame( self._frames_timestamps_us[frame_timestamps_us[1].item()] = frame_timestamps_us[0].item() # Store additional generic frame data and meta-data (not dimension / dtype checked) - (frame_generic_data_group := frame_group.create_group("generic_data")).attrs.put(generic_meta_data) + (frame_generic_data_group := frame_group.create_group("generic_data")).attrs.update(generic_meta_data) for name, value in generic_data.items(): - frame_generic_data_group.create_dataset( + frame_generic_data_group.create_array( name, data=value, # we are not accessing sub-ranges, so disable chunking chunks=value.shape, # use compression that is fast to decode on modern hardware - compressor=Blosc(cname="lz4", clevel=5, shuffle=Blosc.BITSHUFFLE), + compressors=stores.lz4_codecs(), ) return frame_group @@ -1112,21 +1101,21 @@ def _store_frame_ray_bundle( ) -> None: ## Initialize ray bundle group frame_group = self._get_frame_group(frame_timestamps_us) - (ray_bundle_group := frame_group.create_group("ray_bundle")).attrs.put({"n_rays": n_rays}) + (ray_bundle_group := frame_group.create_group("ray_bundle")).attrs.update({"n_rays": n_rays}) # Store per-ray data for name, (ray_data_data, chunks) in ray_data.items(): assert len(ray_data_data) == n_rays, f"{name} doesn't have required ray count" - ray_bundle_group.create_dataset( + ray_bundle_group.create_array( name, data=ray_data_data, chunks=chunks, # use compression that is fast to decode on modern hardware - compressor=Blosc(cname="lz4", clevel=5, shuffle=Blosc.BITSHUFFLE), + compressors=stores.lz4_codecs(), ) ## Initialize ray bundle returns group - (ray_bundle_returns_group := frame_group.create_group("ray_bundle_returns")).attrs.put({"n_returns": n_returns}) + (ray_bundle_returns_group := 
frame_group.create_group("ray_bundle_returns")).attrs.update({"n_returns": n_returns}) # Store per-return data absent_mask = None @@ -1159,12 +1148,12 @@ def _store_frame_ray_bundle( # validate absent mask consistency assert np.array_equal(absent_mask, local_absent_mask), f"Inconsistent NaN masks in return data {name}" - ray_bundle_returns_group.create_dataset( + ray_bundle_returns_group.create_array( name, data=return_data_data, chunks=chunks, # use compression that is fast to decode on modern hardware - compressor=Blosc(cname="lz4", clevel=5, shuffle=Blosc.BITSHUFFLE), + compressors=stores.lz4_codecs(), ) if absent_mask is None: @@ -1173,14 +1162,15 @@ def _store_frame_ray_bundle( valid_mask_packed = np.packbits(~absent_mask) - frame_group.create_dataset( + valid_mask_arr = frame_group.create_array( "ray_bundle_returns_valid_mask_packed", data=valid_mask_packed, # we are not accessing sub-ranges, so disable chunking chunks=valid_mask_packed.shape, # use compression that is fast to decode on modern hardware - compressor=Blosc(cname="lz4", clevel=5, shuffle=Blosc.BITSHUFFLE), - ).attrs.put({"n_returns": n_returns, "n_rays": n_rays}) + compressors=stores.lz4_codecs(), + ) + valid_mask_arr.attrs.update({"n_returns": n_returns, "n_rays": n_rays}) class BaseRayBundleSensorComponentReader(BaseSensorComponentReader): @@ -1295,9 +1285,10 @@ def store_frame( frame_group = self._store_base_frame(frame_timestamps_us, generic_data, generic_meta_data) # Store image data (uncompressed, as already encoded) - frame_group.create_dataset("image", data=np.asarray(image_binary_data), compressor=None).attrs["format"] = ( - image_format + image_arr = frame_group.create_array( + "image", data=np.asarray(image_binary_data), compressors=() ) + image_arr.attrs["format"] = image_format return self @@ -1574,7 +1565,7 @@ def store_observations( ) obs_dict_list.append(obs.to_dict()) - self._group.create_group("cuboids").attrs.put({"cuboid_track_observations": obs_dict_list}) + self._group.create_group("cuboids").attrs.update({"cuboid_track_observations": obs_dict_list}) return self diff --git a/ncore/impl/data/v4/components_test.py b/ncore/impl/data/v4/components_test.py index bd179f06..ee00dc97 100644 --- a/ncore/impl/data/v4/components_test.py +++ b/ncore/impl/data/v4/components_test.py @@ -1179,15 +1179,14 @@ def finalize(self): timestamps_array = np.array(self.timestamps, dtype=np.uint64) # Store as zarr arrays - self._group.create_dataset( + # Note: zarr3 infers dtype from data; passing both is an error. 
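+        # (timestamps_array is already constructed as np.uint64 above, so the
+        # stored dtype is unchanged.)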
+ self._group.create_array( "velocities", data=velocities_array, - dtype=velocities_array.dtype, ) - self._group.create_dataset( + self._group.create_array( "timestamps_us", data=timestamps_array, - dtype=np.uint64, ) class Reader(ComponentReader): @@ -1365,9 +1364,9 @@ def finalize(self): accelerations_array = np.stack(self.accelerations) timestamps_array = np.array(self.timestamps, dtype=np.uint64) - self._group.create_dataset("velocities", data=velocities_array) - self._group.create_dataset("accelerations", data=accelerations_array) # NEW - self._group.create_dataset("timestamps_us", data=timestamps_array) + self._group.create_array("velocities", data=velocities_array) + self._group.create_array("accelerations", data=accelerations_array) # NEW + self._group.create_array("timestamps_us", data=timestamps_array) # Create a backward-compatible reader that can read both v1 and v2 class VelocityComponentBackwardCompatibleReader(ComponentReader): diff --git a/ncore/impl/data_converter/base.py b/ncore/impl/data_converter/base.py index 7a692bf6..4eb71505 100644 --- a/ncore/impl/data_converter/base.py +++ b/ncore/impl/data_converter/base.py @@ -16,7 +16,6 @@ from __future__ import annotations import logging -import sys from abc import ABC, abstractmethod from dataclasses import dataclass @@ -25,7 +24,7 @@ from upath import UPath -@dataclass(**({"slots": True, "kw_only": True} if sys.version_info >= (3, 10) else {})) +@dataclass(slots=True, kw_only=True) class BaseDataConverterConfig: """Generic data converter parameters""" @@ -155,7 +154,7 @@ def convert_sequence(self, sequence_id: str) -> None: pass -@dataclass(**({"slots": True, "kw_only": True} if sys.version_info >= (3, 10) else {})) +@dataclass(slots=True, kw_only=True) class FileBasedDataConverterConfig(BaseDataConverterConfig): """Config for converters that read from a local root directory. diff --git a/pyproject.toml b/pyproject.toml index 09d50935..9b861602 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ # limitations under the License. [tool.ruff] -target-version = "py310" +target-version = "py311" line-length = 120 exclude = [