From 2b64daa6aeadb73925aaea9d5c80ab28f23b9141 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 25 Feb 2026 18:33:03 -0500 Subject: [PATCH 01/10] add sync methods to codecs --- src/zarr/abc/codec.py | 20 ++++++++++++++++++- src/zarr/codecs/blosc.py | 27 ++++++++++++++++---------- src/zarr/codecs/bytes.py | 18 ++++++++++++++++-- src/zarr/codecs/crc32c_.py | 18 ++++++++++++++++-- src/zarr/codecs/gzip.py | 27 ++++++++++++++++++++------ src/zarr/codecs/transpose.py | 20 ++++++++++++++++--- src/zarr/codecs/vlen_utf8.py | 37 +++++++++++++++++++++++++++++++----- src/zarr/codecs/zstd.py | 24 ++++++++++++++++------- 8 files changed, 155 insertions(+), 36 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index d41c457b4e..3ec5ec522b 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -2,7 +2,7 @@ from abc import abstractmethod from collections.abc import Mapping -from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar +from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable from typing_extensions import ReadOnly, TypedDict @@ -32,6 +32,7 @@ "CodecInput", "CodecOutput", "CodecPipeline", + "SupportsSyncCodec", ] CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer) @@ -59,6 +60,23 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]: """The widest type of JSON-like input that could specify a codec.""" +@runtime_checkable +class SupportsSyncCodec(Protocol): + """Protocol for codecs that support synchronous encode/decode. + + Codecs implementing this protocol provide ``_decode_sync`` and ``_encode_sync`` + methods that perform encoding/decoding without requiring an async event loop. + """ + + def _decode_sync( + self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec + ) -> NDBuffer | Buffer: ... + + def _encode_sync( + self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec + ) -> NDBuffer | Buffer | None: ... + + class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]): """Generic base class for codecs. diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index 5b91cfa005..d05731d640 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -299,13 +299,27 @@ def _blosc_codec(self) -> Blosc: config_dict["typesize"] = self.typesize return Blosc.from_config(config_dict) + def _decode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return as_numpy_array_wrapper(self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) + + def _encode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return chunk_spec.prototype.buffer.from_bytes( + self._blosc_codec.encode(chunk_bytes.as_numpy_array()) ) async def _encode_single( @@ -313,14 +327,7 @@ async def _encode_single( chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - # Since blosc only support host memory, we convert the input and output of the encoding - # between numpy array and buffer - return await asyncio.to_thread( - lambda chunk: chunk_spec.prototype.buffer.from_bytes( - self._blosc_codec.encode(chunk.as_numpy_array()) - ), - chunk_bytes, - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index 1fbdeef497..86bb354fb5 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -65,7 +65,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: ) return self - async def _decode_single( + def _decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -88,7 +88,14 @@ async def _decode_single( ) return chunk_array - async def _encode_single( + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + def _encode_sync( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -109,5 +116,12 @@ async def _encode_single( nd_array = nd_array.ravel().view(dtype="B") return chunk_spec.prototype.buffer.from_array_like(nd_array) + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_array, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length diff --git a/src/zarr/codecs/crc32c_.py b/src/zarr/codecs/crc32c_.py index 9536d0d558..ebe2ac8f7a 100644 --- a/src/zarr/codecs/crc32c_.py +++ b/src/zarr/codecs/crc32c_.py @@ -31,7 +31,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: def to_dict(self) -> dict[str, JSON]: return {"name": "crc32c"} - async def _decode_single( + def _decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -51,7 +51,14 @@ async def _decode_single( ) return chunk_spec.prototype.buffer.from_array_like(inner_bytes) - async def _encode_single( + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + def _encode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -64,5 +71,12 @@ async def _encode_single( # Append the checksum (as bytes) to the data return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("B"))) + async def _encode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_bytes, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length + 4 diff --git a/src/zarr/codecs/gzip.py b/src/zarr/codecs/gzip.py index 610ca9dadd..b8591748f7 100644 --- a/src/zarr/codecs/gzip.py +++ b/src/zarr/codecs/gzip.py @@ -2,6 +2,7 @@ import asyncio from dataclasses import dataclass +from functools import cached_property from typing import TYPE_CHECKING from numcodecs.gzip import GZip @@ -48,23 +49,37 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: def to_dict(self) -> dict[str, JSON]: return {"name": "gzip", "configuration": {"level": self.level}} + @cached_property + def _gzip_codec(self) -> GZip: + return GZip(self.level) + + def _decode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return as_numpy_array_wrapper(self._gzip_codec.decode, chunk_bytes, chunk_spec.prototype) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) + + def _encode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return as_numpy_array_wrapper(self._gzip_codec.encode, chunk_bytes, chunk_spec.prototype) async def _encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - return await asyncio.to_thread( - as_numpy_array_wrapper, GZip(self.level).encode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size( self, diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index a8570b6e8f..609448a59c 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -95,20 +95,34 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: prototype=chunk_spec.prototype, ) - async def _decode_single( + def _decode_sync( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, ) -> NDBuffer: - inverse_order = np.argsort(self.order) + inverse_order = tuple(int(i) for i in np.argsort(self.order)) return chunk_array.transpose(inverse_order) - async def _encode_single( + async def _decode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_array, chunk_spec) + + def _encode_sync( self, chunk_array: NDBuffer, _chunk_spec: ArraySpec, ) -> NDBuffer | None: return chunk_array.transpose(self.order) + async def _encode_single( + self, + chunk_array: NDBuffer, + _chunk_spec: ArraySpec, + ) -> NDBuffer | None: + return self._encode_sync(chunk_array, _chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length diff --git a/src/zarr/codecs/vlen_utf8.py b/src/zarr/codecs/vlen_utf8.py index fb1fb76126..a10cb7c335 100644 --- a/src/zarr/codecs/vlen_utf8.py +++ b/src/zarr/codecs/vlen_utf8.py @@ -40,8 +40,7 @@ def to_dict(self) -> dict[str, JSON]: def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return self - # TODO: expand the tests for this function - async def _decode_single( + def _decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -55,7 +54,14 @@ async def _decode_single( as_string_dtype = decoded.astype(chunk_spec.dtype.to_native_dtype(), copy=False) return chunk_spec.prototype.nd_buffer.from_numpy_array(as_string_dtype) - async def _encode_single( + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + def _encode_sync( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -65,6 +71,13 @@ async def _encode_single( _vlen_utf8_codec.encode(chunk_array.as_numpy_array()) ) + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_array, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: # what is input_byte_length for an object dtype? raise NotImplementedError("compute_encoded_size is not implemented for VLen codecs") @@ -86,7 +99,7 @@ def to_dict(self) -> dict[str, JSON]: def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return self - async def _decode_single( + def _decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -99,7 +112,14 @@ async def _decode_single( decoded = _reshape_view(decoded, chunk_spec.shape) return chunk_spec.prototype.nd_buffer.from_numpy_array(decoded) - async def _encode_single( + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + def _encode_sync( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -109,6 +129,13 @@ async def _encode_single( _vlen_bytes_codec.encode(chunk_array.as_numpy_array()) ) + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_array, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: # what is input_byte_length for an object dtype? raise NotImplementedError("compute_encoded_size is not implemented for VLen codecs") diff --git a/src/zarr/codecs/zstd.py b/src/zarr/codecs/zstd.py index 27cc9a7777..f93c25a3c7 100644 --- a/src/zarr/codecs/zstd.py +++ b/src/zarr/codecs/zstd.py @@ -38,7 +38,7 @@ def parse_checksum(data: JSON) -> bool: class ZstdCodec(BytesBytesCodec): """zstd codec""" - is_fixed_size = True + is_fixed_size = False level: int = 0 checksum: bool = False @@ -71,23 +71,33 @@ def _zstd_codec(self) -> Zstd: config_dict = {"level": self.level, "checksum": self.checksum} return Zstd.from_config(config_dict) + def _decode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return as_numpy_array_wrapper(self._zstd_codec.decode, chunk_bytes, chunk_spec.prototype) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._zstd_codec.decode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) + + def _encode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return as_numpy_array_wrapper(self._zstd_codec.encode, chunk_bytes, chunk_spec.prototype) async def _encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._zstd_codec.encode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError From cd4efb0a1ed611d083e650b87e3de44cc39f5a46 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 25 Feb 2026 19:09:51 -0500 Subject: [PATCH 02/10] add CodecChain dataclass and sync codec tests Introduces CodecChain, a frozen dataclass that chains array-array, array-bytes, and bytes-bytes codecs with synchronous encode/decode methods. Pure compute only -- no IO, no threading, no batching. Also adds sync roundtrip tests for individual codecs (blosc, gzip, zstd, crc32c, bytes, transpose, vlen) and CodecChain integration tests. Co-Authored-By: Claude Opus 4.6 --- src/zarr/core/codec_pipeline.py | 133 +++++++++++++++++++++++- tests/test_codecs/test_blosc.py | 29 +++++- tests/test_codecs/test_crc32c.py | 33 ++++++ tests/test_codecs/test_endian.py | 29 ++++++ tests/test_codecs/test_gzip.py | 28 +++++ tests/test_codecs/test_transpose.py | 28 +++++ tests/test_codecs/test_vlen.py | 11 +- tests/test_codecs/test_zstd.py | 32 ++++++ tests/test_sync_codec_pipeline.py | 156 ++++++++++++++++++++++++++++ 9 files changed, 474 insertions(+), 5 deletions(-) create mode 100644 tests/test_codecs/test_crc32c.py create mode 100644 tests/test_sync_codec_pipeline.py diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index fd557ac43e..7425ec30f6 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -1,8 +1,8 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from itertools import islice, pairwise -from typing import TYPE_CHECKING, Any, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar, cast from warnings import warn from zarr.abc.codec import ( @@ -13,6 +13,7 @@ BytesBytesCodec, Codec, CodecPipeline, + SupportsSyncCodec, ) from zarr.core.common import concurrent_map from zarr.core.config import config @@ -68,6 +69,134 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any: return fill_value +@dataclass(frozen=True) +class CodecChain: + """Lightweight codec chain: array-array -> array-bytes -> bytes-bytes. + + Pure compute only -- no IO methods, no threading, no batching. + """ + + array_array_codecs: tuple[ArrayArrayCodec, ...] + array_bytes_codec: ArrayBytesCodec + bytes_bytes_codecs: tuple[BytesBytesCodec, ...] + + _all_sync: bool = field(default=False, init=False, repr=False, compare=False) + + def __post_init__(self) -> None: + object.__setattr__( + self, + "_all_sync", + all(isinstance(c, SupportsSyncCodec) for c in self), + ) + + def __iter__(self) -> Iterator[Codec]: + yield from self.array_array_codecs + yield self.array_bytes_codec + yield from self.bytes_bytes_codecs + + @classmethod + def from_codecs(cls, codecs: Iterable[Codec]) -> CodecChain: + aa, ab, bb = codecs_from_list(list(codecs)) + return cls(array_array_codecs=aa, array_bytes_codec=ab, bytes_bytes_codecs=bb) + + def resolve_metadata_chain( + self, chunk_spec: ArraySpec + ) -> tuple[ + list[tuple[ArrayArrayCodec, ArraySpec]], + tuple[ArrayBytesCodec, ArraySpec], + list[tuple[BytesBytesCodec, ArraySpec]], + ]: + """Resolve metadata through the codec chain for a single chunk_spec.""" + aa_codecs_with_spec: list[tuple[ArrayArrayCodec, ArraySpec]] = [] + spec = chunk_spec + for aa_codec in self.array_array_codecs: + aa_codecs_with_spec.append((aa_codec, spec)) + spec = aa_codec.resolve_metadata(spec) + + ab_codec_with_spec = (self.array_bytes_codec, spec) + spec = self.array_bytes_codec.resolve_metadata(spec) + + bb_codecs_with_spec: list[tuple[BytesBytesCodec, ArraySpec]] = [] + for bb_codec in self.bytes_bytes_codecs: + bb_codecs_with_spec.append((bb_codec, spec)) + spec = bb_codec.resolve_metadata(spec) + + return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec) + + def decode_chunk( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + aa_chain: Iterable[tuple[ArrayArrayCodec, ArraySpec]] | None = None, + ab_pair: tuple[ArrayBytesCodec, ArraySpec] | None = None, + bb_chain: Iterable[tuple[BytesBytesCodec, ArraySpec]] | None = None, + ) -> NDBuffer: + """Decode a single chunk through the full codec chain, synchronously. + + Pure compute -- no IO. Only callable when all codecs support sync. + + The optional ``aa_chain``, ``ab_pair``, ``bb_chain`` parameters allow + pre-resolved metadata to be reused across many chunks with the same spec. + If not provided, ``resolve_metadata_chain`` is called internally. + """ + if aa_chain is None or ab_pair is None or bb_chain is None: + aa_chain, ab_pair, bb_chain = self.resolve_metadata_chain(chunk_spec) + + bb_out: Any = chunk_bytes + for bb_codec, spec in reversed(list(bb_chain)): + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, spec) + + ab_codec, ab_spec = ab_pair + ab_out: Any = cast("SupportsSyncCodec", ab_codec)._decode_sync(bb_out, ab_spec) + + for aa_codec, spec in reversed(list(aa_chain)): + ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) + + return ab_out # type: ignore[no-any-return] + + def encode_chunk( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + """Encode a single chunk through the full codec chain, synchronously. + + Pure compute -- no IO. Only callable when all codecs support sync. + """ + spec = chunk_spec + aa_out: Any = chunk_array + + for aa_codec in self.array_array_codecs: + if aa_out is None: + return None + aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) + spec = aa_codec.resolve_metadata(spec) + + if aa_out is None: + return None + bb_out: Any = cast("SupportsSyncCodec", self.array_bytes_codec)._encode_sync(aa_out, spec) + spec = self.array_bytes_codec.resolve_metadata(spec) + + for bb_codec in self.bytes_bytes_codecs: + if bb_out is None: + return None + bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, spec) + spec = bb_codec.resolve_metadata(spec) + + return bb_out # type: ignore[no-any-return] + + def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: + for codec in self: + byte_length = codec.compute_encoded_size(byte_length, array_spec) + array_spec = codec.resolve_metadata(array_spec) + return byte_length + + def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: + for codec in self: + chunk_spec = codec.resolve_metadata(chunk_spec) + return chunk_spec + + @dataclass(frozen=True) class BatchedCodecPipeline(CodecPipeline): """Default codec pipeline. diff --git a/tests/test_codecs/test_blosc.py b/tests/test_codecs/test_blosc.py index 6f4821f8b1..0201beb8de 100644 --- a/tests/test_codecs/test_blosc.py +++ b/tests/test_codecs/test_blosc.py @@ -6,11 +6,12 @@ from packaging.version import Version import zarr +from zarr.abc.codec import SupportsSyncCodec from zarr.codecs import BloscCodec from zarr.codecs.blosc import BloscShuffle, Shuffle -from zarr.core.array_spec import ArraySpec +from zarr.core.array_spec import ArrayConfig, ArraySpec from zarr.core.buffer import default_buffer_prototype -from zarr.core.dtype import UInt16 +from zarr.core.dtype import UInt16, get_data_type_from_native_dtype from zarr.storage import MemoryStore, StorePath @@ -110,3 +111,27 @@ async def test_typesize() -> None: else: expected_size = 10216 assert size == expected_size, msg + + +def test_blosc_codec_supports_sync() -> None: + assert isinstance(BloscCodec(), SupportsSyncCodec) + + +def test_blosc_codec_sync_roundtrip() -> None: + codec = BloscCodec(typesize=8) + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) diff --git a/tests/test_codecs/test_crc32c.py b/tests/test_codecs/test_crc32c.py new file mode 100644 index 0000000000..3ab1070f60 --- /dev/null +++ b/tests/test_codecs/test_crc32c.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import numpy as np + +from zarr.abc.codec import SupportsSyncCodec +from zarr.codecs.crc32c_ import Crc32cCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import default_buffer_prototype +from zarr.core.dtype import get_data_type_from_native_dtype + + +def test_crc32c_codec_supports_sync() -> None: + assert isinstance(Crc32cCodec(), SupportsSyncCodec) + + +def test_crc32c_codec_sync_roundtrip() -> None: + codec = Crc32cCodec() + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) diff --git a/tests/test_codecs/test_endian.py b/tests/test_codecs/test_endian.py index ab64afb1b8..c505cee828 100644 --- a/tests/test_codecs/test_endian.py +++ b/tests/test_codecs/test_endian.py @@ -4,8 +4,12 @@ import pytest import zarr +from zarr.abc.codec import SupportsSyncCodec from zarr.abc.store import Store from zarr.codecs import BytesCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import NDBuffer, default_buffer_prototype +from zarr.core.dtype import get_data_type_from_native_dtype from zarr.storage import StorePath from .test_codecs import _AsyncArrayProxy @@ -33,6 +37,31 @@ async def test_endian(store: Store, endian: Literal["big", "little"]) -> None: assert np.array_equal(data, readback_data) +def test_bytes_codec_supports_sync() -> None: + assert isinstance(BytesCodec(), SupportsSyncCodec) + + +def test_bytes_codec_sync_roundtrip() -> None: + codec = BytesCodec() + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + nd_buf: NDBuffer = default_buffer_prototype().nd_buffer.from_numpy_array(arr) + + codec = codec.evolve_from_array_spec(spec) + + encoded = codec._encode_sync(nd_buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + @pytest.mark.filterwarnings("ignore:The endianness of the requested serializer") @pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) @pytest.mark.parametrize("dtype_input_endian", [">u2", " None: a[:, :] = data assert np.array_equal(data, a[:, :]) + + +def test_gzip_codec_supports_sync() -> None: + assert isinstance(GzipCodec(), SupportsSyncCodec) + + +def test_gzip_codec_sync_roundtrip() -> None: + codec = GzipCodec(level=1) + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) diff --git a/tests/test_codecs/test_transpose.py b/tests/test_codecs/test_transpose.py index 06ec668ad3..949bb72a62 100644 --- a/tests/test_codecs/test_transpose.py +++ b/tests/test_codecs/test_transpose.py @@ -3,9 +3,13 @@ import zarr from zarr import AsyncArray, config +from zarr.abc.codec import SupportsSyncCodec from zarr.abc.store import Store from zarr.codecs import TransposeCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import NDBuffer, default_buffer_prototype from zarr.core.common import MemoryOrder +from zarr.core.dtype import get_data_type_from_native_dtype from zarr.storage import StorePath from .test_codecs import _AsyncArrayProxy @@ -93,3 +97,27 @@ def test_transpose_invalid( chunk_key_encoding={"name": "v2", "separator": "."}, filters=[TransposeCodec(order=order)], # type: ignore[arg-type] ) + + +def test_transpose_codec_supports_sync() -> None: + assert isinstance(TransposeCodec(order=(0, 1)), SupportsSyncCodec) + + +def test_transpose_codec_sync_roundtrip() -> None: + codec = TransposeCodec(order=(1, 0)) + arr = np.arange(12, dtype="float64").reshape(3, 4) + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + nd_buf: NDBuffer = default_buffer_prototype().nd_buffer.from_numpy_array(arr) + + encoded = codec._encode_sync(nd_buf, spec) + assert encoded is not None + resolved_spec = codec.resolve_metadata(spec) + decoded = codec._decode_sync(encoded, resolved_spec) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) diff --git a/tests/test_codecs/test_vlen.py b/tests/test_codecs/test_vlen.py index cf0905daca..f3445824b3 100644 --- a/tests/test_codecs/test_vlen.py +++ b/tests/test_codecs/test_vlen.py @@ -5,9 +5,10 @@ import zarr from zarr import Array -from zarr.abc.codec import Codec +from zarr.abc.codec import Codec, SupportsSyncCodec from zarr.abc.store import Store from zarr.codecs import ZstdCodec +from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec from zarr.core.dtype import get_data_type_from_native_dtype from zarr.core.dtype.npy.string import _NUMPY_SUPPORTS_VLEN_STRING from zarr.core.metadata.v3 import ArrayV3Metadata @@ -62,3 +63,11 @@ def test_vlen_string( assert np.array_equal(data, b[:, :]) assert b.metadata.data_type == get_data_type_from_native_dtype(data.dtype) assert a.dtype == data.dtype + + +def test_vlen_utf8_codec_supports_sync() -> None: + assert isinstance(VLenUTF8Codec(), SupportsSyncCodec) + + +def test_vlen_bytes_codec_supports_sync() -> None: + assert isinstance(VLenBytesCodec(), SupportsSyncCodec) diff --git a/tests/test_codecs/test_zstd.py b/tests/test_codecs/test_zstd.py index 6068f53443..68297e4d94 100644 --- a/tests/test_codecs/test_zstd.py +++ b/tests/test_codecs/test_zstd.py @@ -2,8 +2,12 @@ import pytest import zarr +from zarr.abc.codec import SupportsSyncCodec from zarr.abc.store import Store from zarr.codecs import ZstdCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import default_buffer_prototype +from zarr.core.dtype import get_data_type_from_native_dtype from zarr.storage import StorePath @@ -23,3 +27,31 @@ def test_zstd(store: Store, checksum: bool) -> None: a[:, :] = data assert np.array_equal(data, a[:, :]) + + +def test_zstd_codec_supports_sync() -> None: + assert isinstance(ZstdCodec(), SupportsSyncCodec) + + +def test_zstd_is_not_fixed_size() -> None: + assert ZstdCodec.is_fixed_size is False + + +def test_zstd_codec_sync_roundtrip() -> None: + codec = ZstdCodec(level=1) + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) diff --git a/tests/test_sync_codec_pipeline.py b/tests/test_sync_codec_pipeline.py new file mode 100644 index 0000000000..23fa28cb04 --- /dev/null +++ b/tests/test_sync_codec_pipeline.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import numpy as np +import pytest + +from zarr.codecs.bytes import BytesCodec +from zarr.codecs.gzip import GzipCodec +from zarr.codecs.transpose import TransposeCodec +from zarr.codecs.zstd import ZstdCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import NDBuffer, default_buffer_prototype +from zarr.core.dtype import get_data_type_from_native_dtype + +if TYPE_CHECKING: + from zarr.abc.codec import Codec + + +def _make_array_spec(shape: tuple[int, ...], dtype: np.dtype[np.generic]) -> ArraySpec: + zdtype = get_data_type_from_native_dtype(dtype) + return ArraySpec( + shape=shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + + +def _make_nd_buffer(arr: np.ndarray[Any, np.dtype[Any]]) -> NDBuffer: + return default_buffer_prototype().nd_buffer.from_numpy_array(arr) + + +class TestCodecChain: + def test_from_codecs_bytes_only(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec()]) + assert chain.array_array_codecs == () + assert isinstance(chain.array_bytes_codec, BytesCodec) + assert chain.bytes_bytes_codecs == () + assert chain._all_sync is True + + def test_from_codecs_with_compression(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec(), GzipCodec()]) + assert isinstance(chain.array_bytes_codec, BytesCodec) + assert len(chain.bytes_bytes_codecs) == 1 + assert isinstance(chain.bytes_bytes_codecs[0], GzipCodec) + assert chain._all_sync is True + + def test_from_codecs_with_transpose(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec()]) + assert len(chain.array_array_codecs) == 1 + assert isinstance(chain.array_array_codecs[0], TransposeCodec) + assert isinstance(chain.array_bytes_codec, BytesCodec) + assert chain._all_sync is True + + def test_from_codecs_full_chain(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()]) + assert len(chain.array_array_codecs) == 1 + assert isinstance(chain.array_bytes_codec, BytesCodec) + assert len(chain.bytes_bytes_codecs) == 1 + assert chain._all_sync is True + + def test_iter(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + codecs: list[Codec] = [TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec()] + chain = CodecChain.from_codecs(codecs) + assert list(chain) == codecs + + def test_frozen(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec()]) + with pytest.raises(AttributeError): + chain.array_bytes_codec = BytesCodec() # type: ignore[misc] + + def test_encode_decode_roundtrip_bytes_only(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec()]) + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + nd_buf = _make_nd_buffer(arr) + + encoded = chain_evolved.encode_chunk(nd_buf, spec) + assert encoded is not None + decoded = chain_evolved.decode_chunk(encoded, spec) + assert decoded is not None + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_encode_decode_roundtrip_with_compression(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec(), GzipCodec(level=1)]) + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + nd_buf = _make_nd_buffer(arr) + + encoded = chain_evolved.encode_chunk(nd_buf, spec) + assert encoded is not None + decoded = chain_evolved.decode_chunk(encoded, spec) + assert decoded is not None + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_encode_decode_roundtrip_with_transpose(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs( + [TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)] + ) + arr = np.arange(12, dtype="float64").reshape(3, 4) + spec = _make_array_spec(arr.shape, arr.dtype) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + nd_buf = _make_nd_buffer(arr) + + encoded = chain_evolved.encode_chunk(nd_buf, spec) + assert encoded is not None + decoded = chain_evolved.decode_chunk(encoded, spec) + assert decoded is not None + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_resolve_metadata_chain(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec()]) + arr = np.zeros((3, 4), dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + + aa_chain, ab_pair, bb_chain = chain_evolved.resolve_metadata_chain(spec) + assert len(aa_chain) == 1 + assert aa_chain[0][1].shape == (3, 4) # spec before transpose + _ab_codec, ab_spec = ab_pair + assert ab_spec.shape == (4, 3) # spec after transpose + assert len(bb_chain) == 1 + + def test_resolve_metadata(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec()]) + spec = _make_array_spec((3, 4), np.dtype("float64")) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + resolved = chain_evolved.resolve_metadata(spec) + # After transpose (1,0) + bytes, shape should reflect the transpose + assert resolved.shape == (4, 3) From 41b7a6ad78361de0018cc8031e0280810924d680 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 25 Feb 2026 20:34:34 -0500 Subject: [PATCH 03/10] refactor codecchain --- src/zarr/core/codec_pipeline.py | 114 ++++++++++----------------- tests/test_sync_codec_pipeline.py | 123 ++++++------------------------ 2 files changed, 65 insertions(+), 172 deletions(-) diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 7425ec30f6..9c0dd292ed 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -69,87 +69,67 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any: return fill_value -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) class CodecChain: - """Lightweight codec chain: array-array -> array-bytes -> bytes-bytes. + """Codec chain with pre-resolved metadata specs. - Pure compute only -- no IO methods, no threading, no batching. + Constructed from an iterable of codecs and a chunk ArraySpec. + Resolves each codec against the spec so that encode/decode can + run without re-resolving. Pure compute only -- no IO, no threading, + no batching. """ - array_array_codecs: tuple[ArrayArrayCodec, ...] - array_bytes_codec: ArrayBytesCodec - bytes_bytes_codecs: tuple[BytesBytesCodec, ...] + codecs: tuple[Codec, ...] + chunk_spec: ArraySpec - _all_sync: bool = field(default=False, init=False, repr=False, compare=False) + _aa_codecs: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = field( + init=False, repr=False, compare=False + ) + _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False) + _ab_spec: ArraySpec = field(init=False, repr=False, compare=False) + _bb_codecs: tuple[BytesBytesCodec, ...] = field(init=False, repr=False, compare=False) + _all_sync: bool = field(init=False, repr=False, compare=False) def __post_init__(self) -> None: - object.__setattr__( - self, - "_all_sync", - all(isinstance(c, SupportsSyncCodec) for c in self), - ) - - def __iter__(self) -> Iterator[Codec]: - yield from self.array_array_codecs - yield self.array_bytes_codec - yield from self.bytes_bytes_codecs + aa, ab, bb = codecs_from_list(list(self.codecs)) - @classmethod - def from_codecs(cls, codecs: Iterable[Codec]) -> CodecChain: - aa, ab, bb = codecs_from_list(list(codecs)) - return cls(array_array_codecs=aa, array_bytes_codec=ab, bytes_bytes_codecs=bb) - - def resolve_metadata_chain( - self, chunk_spec: ArraySpec - ) -> tuple[ - list[tuple[ArrayArrayCodec, ArraySpec]], - tuple[ArrayBytesCodec, ArraySpec], - list[tuple[BytesBytesCodec, ArraySpec]], - ]: - """Resolve metadata through the codec chain for a single chunk_spec.""" - aa_codecs_with_spec: list[tuple[ArrayArrayCodec, ArraySpec]] = [] - spec = chunk_spec - for aa_codec in self.array_array_codecs: - aa_codecs_with_spec.append((aa_codec, spec)) + aa_pairs: list[tuple[ArrayArrayCodec, ArraySpec]] = [] + spec = self.chunk_spec + for aa_codec in aa: + aa_pairs.append((aa_codec, spec)) spec = aa_codec.resolve_metadata(spec) - ab_codec_with_spec = (self.array_bytes_codec, spec) - spec = self.array_bytes_codec.resolve_metadata(spec) + object.__setattr__(self, "_aa_codecs", tuple(aa_pairs)) + object.__setattr__(self, "_ab_codec", ab) + object.__setattr__(self, "_ab_spec", spec) - bb_codecs_with_spec: list[tuple[BytesBytesCodec, ArraySpec]] = [] - for bb_codec in self.bytes_bytes_codecs: - bb_codecs_with_spec.append((bb_codec, spec)) - spec = bb_codec.resolve_metadata(spec) + object.__setattr__(self, "_bb_codecs", bb) - return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec) + object.__setattr__( + self, + "_all_sync", + all(isinstance(c, SupportsSyncCodec) for c in self.codecs), + ) + + @property + def all_sync(self) -> bool: + return self._all_sync def decode_chunk( self, chunk_bytes: Buffer, - chunk_spec: ArraySpec, - aa_chain: Iterable[tuple[ArrayArrayCodec, ArraySpec]] | None = None, - ab_pair: tuple[ArrayBytesCodec, ArraySpec] | None = None, - bb_chain: Iterable[tuple[BytesBytesCodec, ArraySpec]] | None = None, ) -> NDBuffer: """Decode a single chunk through the full codec chain, synchronously. Pure compute -- no IO. Only callable when all codecs support sync. - - The optional ``aa_chain``, ``ab_pair``, ``bb_chain`` parameters allow - pre-resolved metadata to be reused across many chunks with the same spec. - If not provided, ``resolve_metadata_chain`` is called internally. """ - if aa_chain is None or ab_pair is None or bb_chain is None: - aa_chain, ab_pair, bb_chain = self.resolve_metadata_chain(chunk_spec) - bb_out: Any = chunk_bytes - for bb_codec, spec in reversed(list(bb_chain)): - bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, spec) + for bb_codec in reversed(self._bb_codecs): + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self.chunk_spec) - ab_codec, ab_spec = ab_pair - ab_out: Any = cast("SupportsSyncCodec", ab_codec)._decode_sync(bb_out, ab_spec) + ab_out: Any = cast("SupportsSyncCodec", self._ab_codec)._decode_sync(bb_out, self._ab_spec) - for aa_codec, spec in reversed(list(aa_chain)): + for aa_codec, spec in reversed(self._aa_codecs): ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) return ab_out # type: ignore[no-any-return] @@ -157,45 +137,35 @@ def decode_chunk( def encode_chunk( self, chunk_array: NDBuffer, - chunk_spec: ArraySpec, ) -> Buffer | None: """Encode a single chunk through the full codec chain, synchronously. Pure compute -- no IO. Only callable when all codecs support sync. """ - spec = chunk_spec aa_out: Any = chunk_array - for aa_codec in self.array_array_codecs: + for aa_codec, spec in self._aa_codecs: if aa_out is None: return None aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) - spec = aa_codec.resolve_metadata(spec) if aa_out is None: return None - bb_out: Any = cast("SupportsSyncCodec", self.array_bytes_codec)._encode_sync(aa_out, spec) - spec = self.array_bytes_codec.resolve_metadata(spec) + bb_out: Any = cast("SupportsSyncCodec", self._ab_codec)._encode_sync(aa_out, self._ab_spec) - for bb_codec in self.bytes_bytes_codecs: + for bb_codec in self._bb_codecs: if bb_out is None: return None - bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, spec) - spec = bb_codec.resolve_metadata(spec) + bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self.chunk_spec) return bb_out # type: ignore[no-any-return] def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: - for codec in self: + for codec in self.codecs: byte_length = codec.compute_encoded_size(byte_length, array_spec) array_spec = codec.resolve_metadata(array_spec) return byte_length - def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: - for codec in self: - chunk_spec = codec.resolve_metadata(chunk_spec) - return chunk_spec - @dataclass(frozen=True) class BatchedCodecPipeline(CodecPipeline): diff --git a/tests/test_sync_codec_pipeline.py b/tests/test_sync_codec_pipeline.py index 23fa28cb04..192479dc59 100644 --- a/tests/test_sync_codec_pipeline.py +++ b/tests/test_sync_codec_pipeline.py @@ -1,9 +1,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import Any import numpy as np -import pytest from zarr.codecs.bytes import BytesCodec from zarr.codecs.gzip import GzipCodec @@ -11,11 +10,9 @@ from zarr.codecs.zstd import ZstdCodec from zarr.core.array_spec import ArrayConfig, ArraySpec from zarr.core.buffer import NDBuffer, default_buffer_prototype +from zarr.core.codec_pipeline import CodecChain from zarr.core.dtype import get_data_type_from_native_dtype -if TYPE_CHECKING: - from zarr.abc.codec import Codec - def _make_array_spec(shape: tuple[int, ...], dtype: np.dtype[np.generic]) -> ArraySpec: zdtype = get_data_type_from_native_dtype(dtype) @@ -33,124 +30,50 @@ def _make_nd_buffer(arr: np.ndarray[Any, np.dtype[Any]]) -> NDBuffer: class TestCodecChain: - def test_from_codecs_bytes_only(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec()]) - assert chain.array_array_codecs == () - assert isinstance(chain.array_bytes_codec, BytesCodec) - assert chain.bytes_bytes_codecs == () - assert chain._all_sync is True - - def test_from_codecs_with_compression(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec(), GzipCodec()]) - assert isinstance(chain.array_bytes_codec, BytesCodec) - assert len(chain.bytes_bytes_codecs) == 1 - assert isinstance(chain.bytes_bytes_codecs[0], GzipCodec) - assert chain._all_sync is True - - def test_from_codecs_with_transpose(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec()]) - assert len(chain.array_array_codecs) == 1 - assert isinstance(chain.array_array_codecs[0], TransposeCodec) - assert isinstance(chain.array_bytes_codec, BytesCodec) - assert chain._all_sync is True + def test_all_sync(self) -> None: + spec = _make_array_spec((100,), np.dtype("float64")) + chain = CodecChain((BytesCodec(),), spec) + assert chain.all_sync is True - def test_from_codecs_full_chain(self) -> None: - from zarr.core.codec_pipeline import CodecChain + def test_all_sync_with_compression(self) -> None: + spec = _make_array_spec((100,), np.dtype("float64")) + chain = CodecChain((BytesCodec(), GzipCodec()), spec) + assert chain.all_sync is True - chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()]) - assert len(chain.array_array_codecs) == 1 - assert isinstance(chain.array_bytes_codec, BytesCodec) - assert len(chain.bytes_bytes_codecs) == 1 - assert chain._all_sync is True - - def test_iter(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - codecs: list[Codec] = [TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec()] - chain = CodecChain.from_codecs(codecs) - assert list(chain) == codecs - - def test_frozen(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec()]) - with pytest.raises(AttributeError): - chain.array_bytes_codec = BytesCodec() # type: ignore[misc] + def test_all_sync_full_chain(self) -> None: + spec = _make_array_spec((3, 4), np.dtype("float64")) + chain = CodecChain((TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()), spec) + assert chain.all_sync is True def test_encode_decode_roundtrip_bytes_only(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec()]) arr = np.arange(100, dtype="float64") spec = _make_array_spec(arr.shape, arr.dtype) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + chain = CodecChain((BytesCodec(),), spec) nd_buf = _make_nd_buffer(arr) - encoded = chain_evolved.encode_chunk(nd_buf, spec) + encoded = chain.encode_chunk(nd_buf) assert encoded is not None - decoded = chain_evolved.decode_chunk(encoded, spec) - assert decoded is not None + decoded = chain.decode_chunk(encoded) np.testing.assert_array_equal(arr, decoded.as_numpy_array()) def test_encode_decode_roundtrip_with_compression(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec(), GzipCodec(level=1)]) arr = np.arange(100, dtype="float64") spec = _make_array_spec(arr.shape, arr.dtype) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + chain = CodecChain((BytesCodec(), GzipCodec(level=1)), spec) nd_buf = _make_nd_buffer(arr) - encoded = chain_evolved.encode_chunk(nd_buf, spec) + encoded = chain.encode_chunk(nd_buf) assert encoded is not None - decoded = chain_evolved.decode_chunk(encoded, spec) - assert decoded is not None + decoded = chain.decode_chunk(encoded) np.testing.assert_array_equal(arr, decoded.as_numpy_array()) def test_encode_decode_roundtrip_with_transpose(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs( - [TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)] - ) arr = np.arange(12, dtype="float64").reshape(3, 4) spec = _make_array_spec(arr.shape, arr.dtype) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + chain = CodecChain((TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)), spec) nd_buf = _make_nd_buffer(arr) - encoded = chain_evolved.encode_chunk(nd_buf, spec) + encoded = chain.encode_chunk(nd_buf) assert encoded is not None - decoded = chain_evolved.decode_chunk(encoded, spec) - assert decoded is not None + decoded = chain.decode_chunk(encoded) np.testing.assert_array_equal(arr, decoded.as_numpy_array()) - - def test_resolve_metadata_chain(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec()]) - arr = np.zeros((3, 4), dtype="float64") - spec = _make_array_spec(arr.shape, arr.dtype) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) - - aa_chain, ab_pair, bb_chain = chain_evolved.resolve_metadata_chain(spec) - assert len(aa_chain) == 1 - assert aa_chain[0][1].shape == (3, 4) # spec before transpose - _ab_codec, ab_spec = ab_pair - assert ab_spec.shape == (4, 3) # spec after transpose - assert len(bb_chain) == 1 - - def test_resolve_metadata(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec()]) - spec = _make_array_spec((3, 4), np.dtype("float64")) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) - resolved = chain_evolved.resolve_metadata(spec) - # After transpose (1,0) + bytes, shape should reflect the transpose - assert resolved.shape == (4, 3) From 5a2a884e249277086fe6d706b25881b13b331f2d Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 25 Feb 2026 20:50:19 -0500 Subject: [PATCH 04/10] separate codecs and specs --- src/zarr/core/codec_pipeline.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 9c0dd292ed..4412ffa705 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -75,35 +75,37 @@ class CodecChain: Constructed from an iterable of codecs and a chunk ArraySpec. Resolves each codec against the spec so that encode/decode can - run without re-resolving. Pure compute only -- no IO, no threading, - no batching. + run without re-resolving. """ codecs: tuple[Codec, ...] chunk_spec: ArraySpec - _aa_codecs: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = field( - init=False, repr=False, compare=False - ) + _aa_codecs: tuple[ArrayArrayCodec, ...] = field(init=False, repr=False, compare=False) + _aa_specs: tuple[ArraySpec, ...] = field(init=False, repr=False, compare=False) _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False) _ab_spec: ArraySpec = field(init=False, repr=False, compare=False) _bb_codecs: tuple[BytesBytesCodec, ...] = field(init=False, repr=False, compare=False) + _bb_spec: ArraySpec = field(init=False, repr=False, compare=False) _all_sync: bool = field(init=False, repr=False, compare=False) def __post_init__(self) -> None: aa, ab, bb = codecs_from_list(list(self.codecs)) - aa_pairs: list[tuple[ArrayArrayCodec, ArraySpec]] = [] + aa_specs: list[ArraySpec] = [] spec = self.chunk_spec for aa_codec in aa: - aa_pairs.append((aa_codec, spec)) + aa_specs.append(spec) spec = aa_codec.resolve_metadata(spec) - object.__setattr__(self, "_aa_codecs", tuple(aa_pairs)) + object.__setattr__(self, "_aa_codecs", aa) + object.__setattr__(self, "_aa_specs", tuple(aa_specs)) object.__setattr__(self, "_ab_codec", ab) object.__setattr__(self, "_ab_spec", spec) + spec = ab.resolve_metadata(spec) object.__setattr__(self, "_bb_codecs", bb) + object.__setattr__(self, "_bb_spec", spec) object.__setattr__( self, @@ -125,11 +127,11 @@ def decode_chunk( """ bb_out: Any = chunk_bytes for bb_codec in reversed(self._bb_codecs): - bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self.chunk_spec) + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self._bb_spec) ab_out: Any = cast("SupportsSyncCodec", self._ab_codec)._decode_sync(bb_out, self._ab_spec) - for aa_codec, spec in reversed(self._aa_codecs): + for aa_codec, spec in zip(reversed(self._aa_codecs), reversed(self._aa_specs), strict=True): ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) return ab_out # type: ignore[no-any-return] @@ -144,7 +146,7 @@ def encode_chunk( """ aa_out: Any = chunk_array - for aa_codec, spec in self._aa_codecs: + for aa_codec, spec in zip(self._aa_codecs, self._aa_specs, strict=True): if aa_out is None: return None aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) @@ -156,7 +158,7 @@ def encode_chunk( for bb_codec in self._bb_codecs: if bb_out is None: return None - bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self.chunk_spec) + bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self._bb_spec) return bb_out # type: ignore[no-any-return] From 4e262b17d120dd0ac9ac2e83a0e576d3e4876038 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Thu, 26 Feb 2026 11:05:55 -0500 Subject: [PATCH 05/10] add synchronous methods to stores --- src/zarr/abc/store.py | 44 +++++++++++- src/zarr/storage/_common.py | 27 +++++++ src/zarr/storage/_local.py | 69 ++++++++++++++++++ src/zarr/storage/_memory.py | 64 ++++++++++++++++- src/zarr/testing/store.py | 136 +++++++++++++++++++++++++++++++++++- tests/test_indexing.py | 17 +++++ 6 files changed, 354 insertions(+), 3 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 87df89a683..1d321f5fc3 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -16,7 +16,17 @@ from zarr.core.buffer import Buffer, BufferPrototype -__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"] +__all__ = [ + "ByteGetter", + "ByteSetter", + "Store", + "SupportsDeleteSync", + "SupportsGetSync", + "SupportsSetRangeSync", + "SupportsSetSync", + "SupportsSyncStore", + "set_or_delete", +] @dataclass @@ -700,6 +710,38 @@ async def delete(self) -> None: ... async def set_if_not_exists(self, default: Buffer) -> None: ... +@runtime_checkable +class SupportsGetSync(Protocol): + def get_sync( + self, + key: str, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: ... + + +@runtime_checkable +class SupportsSetSync(Protocol): + def set_sync(self, key: str, value: Buffer) -> None: ... + + +@runtime_checkable +class SupportsSetRangeSync(Protocol): + def set_range_sync(self, key: str, value: Buffer, start: int) -> None: ... + + +@runtime_checkable +class SupportsDeleteSync(Protocol): + def delete_sync(self, key: str) -> None: ... + + +@runtime_checkable +class SupportsSyncStore( + SupportsGetSync, SupportsSetSync, SupportsSetRangeSync, SupportsDeleteSync, Protocol +): ... + + async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None: """Set or delete a value in a byte setter diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py index 4bea04f024..c14aa1a37d 100644 --- a/src/zarr/storage/_common.py +++ b/src/zarr/storage/_common.py @@ -228,6 +228,33 @@ async def is_empty(self) -> bool: """ return await self.store.is_empty(self.path) + # ------------------------------------------------------------------- + # Synchronous IO delegation + # ------------------------------------------------------------------- + + def get_sync( + self, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + """Synchronous read — delegates to ``self.store.get_sync(self.path, ...)``.""" + if prototype is None: + prototype = default_buffer_prototype() + return self.store.get_sync(self.path, prototype=prototype, byte_range=byte_range) # type: ignore[attr-defined, no-any-return] + + def set_sync(self, value: Buffer) -> None: + """Synchronous write — delegates to ``self.store.set_sync(self.path, value)``.""" + self.store.set_sync(self.path, value) # type: ignore[attr-defined] + + def set_range_sync(self, value: Buffer, start: int) -> None: + """Synchronous byte-range write.""" + self.store.set_range_sync(self.path, value, start) # type: ignore[attr-defined] + + def delete_sync(self) -> None: + """Synchronous delete — delegates to ``self.store.delete_sync(self.path)``.""" + self.store.delete_sync(self.path) # type: ignore[attr-defined] + def __truediv__(self, other: str) -> StorePath: """Combine this store path with another path""" return self.__class__(self.store, _dereference_path(self.path, other)) diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index 80233a112d..dc05ff67a7 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -85,6 +85,19 @@ def _put(path: Path, value: Buffer, exclusive: bool = False) -> int: return f.write(view) +def _put_range(path: Path, value: Buffer, start: int) -> None: + view = value.as_buffer_like() + file_size = path.stat().st_size + if start + len(view) > file_size: + raise ValueError( + f"set_range would write beyond the end of the stored value: " + f"start={start}, len(value)={len(view)}, stored size={file_size}" + ) + with path.open("r+b") as f: + f.seek(start) + f.write(view) + + class LocalStore(Store): """ Store for the local file system. @@ -187,6 +200,62 @@ def __repr__(self) -> str: def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and self.root == other.root + # ------------------------------------------------------------------- + # Synchronous store methods + # ------------------------------------------------------------------- + + def _ensure_open_sync(self) -> None: + if not self._is_open: + if not self.read_only: + self.root.mkdir(parents=True, exist_ok=True) + if not self.root.exists(): + raise FileNotFoundError(f"{self.root} does not exist") + self._is_open = True + + def get_sync( + self, + key: str, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + if prototype is None: + prototype = default_buffer_prototype() + self._ensure_open_sync() + assert isinstance(key, str) + path = self.root / key + try: + return _get(path, prototype, byte_range) + except (FileNotFoundError, IsADirectoryError, NotADirectoryError): + return None + + def set_sync(self, key: str, value: Buffer) -> None: + self._ensure_open_sync() + self._check_writable() + assert isinstance(key, str) + if not isinstance(value, Buffer): + raise TypeError( + f"LocalStore.set(): `value` must be a Buffer instance. " + f"Got an instance of {type(value)} instead." + ) + path = self.root / key + _put(path, value) + + def set_range_sync(self, key: str, value: Buffer, start: int) -> None: + self._ensure_open_sync() + self._check_writable() + path = self.root / key + _put_range(path, value, start) + + def delete_sync(self, key: str) -> None: + self._ensure_open_sync() + self._check_writable() + path = self.root / key + if path.is_dir(): + shutil.rmtree(path) + else: + path.unlink(missing_ok=True) + async def get( self, key: str, diff --git a/src/zarr/storage/_memory.py b/src/zarr/storage/_memory.py index e6f9b7a512..168f3e8890 100644 --- a/src/zarr/storage/_memory.py +++ b/src/zarr/storage/_memory.py @@ -77,6 +77,49 @@ def __eq__(self, other: object) -> bool: and self.read_only == other.read_only ) + # ------------------------------------------------------------------- + # Synchronous store methods + # ------------------------------------------------------------------- + + def get_sync( + self, + key: str, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + if prototype is None: + prototype = default_buffer_prototype() + if not self._is_open: + self._is_open = True + assert isinstance(key, str) + try: + value = self._store_dict[key] + start, stop = _normalize_byte_range_index(value, byte_range) + return prototype.buffer.from_buffer(value[start:stop]) + except KeyError: + return None + + def set_sync(self, key: str, value: Buffer) -> None: + self._check_writable() + if not self._is_open: + self._is_open = True + assert isinstance(key, str) + if not isinstance(value, Buffer): + raise TypeError( + f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead." + ) + self._store_dict[key] = value + + def delete_sync(self, key: str) -> None: + self._check_writable() + if not self._is_open: + self._is_open = True + try: + del self._store_dict[key] + except KeyError: + logger.debug("Key %s does not exist.", key) + async def get( self, key: str, @@ -122,7 +165,6 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None raise TypeError( f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead." ) - if byte_range is not None: buf = self._store_dict[key] buf[byte_range[0] : byte_range[1]] = value @@ -130,6 +172,26 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None else: self._store_dict[key] = value + def _set_range_impl(self, key: str, value: Buffer, start: int) -> None: + buf = self._store_dict[key] + target = buf.as_numpy_array() + if start + len(value) > len(target): + raise ValueError( + f"set_range would write beyond the end of the stored value: " + f"start={start}, len(value)={len(value)}, stored size={len(target)}" + ) + if not target.flags.writeable: + target = target.copy() + self._store_dict[key] = buf.__class__(target) + target[start : start + len(value)] = value.as_numpy_array() + + def set_range_sync(self, key: str, value: Buffer, start: int) -> None: + """Synchronous byte-range write.""" + self._check_writable() + if not self._is_open: + self._is_open = True + self._set_range_impl(key, value, start) + async def set_if_not_exists(self, key: str, value: Buffer) -> None: # docstring inherited self._check_writable() diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index 1b8e85ed98..bb60cc371f 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -6,12 +6,13 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Generic, Self, TypeVar +import numpy as np + from zarr.storage import WrapperStore if TYPE_CHECKING: from typing import Any - from zarr.abc.store import ByteRequest from zarr.core.buffer.core import BufferPrototype import pytest @@ -22,6 +23,10 @@ RangeByteRequest, Store, SuffixByteRequest, + SupportsDeleteSync, + SupportsGetSync, + SupportsSetRangeSync, + SupportsSetSync, ) from zarr.core.buffer import Buffer, default_buffer_prototype from zarr.core.sync import _collect_aiterator, sync @@ -39,6 +44,34 @@ class StoreTests(Generic[S, B]): store_cls: type[S] buffer_cls: type[B] + @staticmethod + def _require_get_sync(store: S) -> SupportsGetSync: + """Skip unless *store* implements :class:`SupportsGetSync`.""" + if not isinstance(store, SupportsGetSync): + pytest.skip("store does not implement SupportsGetSync") + return store # type: ignore[unreachable] + + @staticmethod + def _require_set_sync(store: S) -> SupportsSetSync: + """Skip unless *store* implements :class:`SupportsSetSync`.""" + if not isinstance(store, SupportsSetSync): + pytest.skip("store does not implement SupportsSetSync") + return store # type: ignore[unreachable] + + @staticmethod + def _require_set_range_sync(store: S) -> SupportsSetRangeSync: + """Skip unless *store* implements :class:`SupportsSetRangeSync`.""" + if not isinstance(store, SupportsSetRangeSync): + pytest.skip("store does not implement SupportsSetRangeSync") + return store # type: ignore[unreachable] + + @staticmethod + def _require_delete_sync(store: S) -> SupportsDeleteSync: + """Skip unless *store* implements :class:`SupportsDeleteSync`.""" + if not isinstance(store, SupportsDeleteSync): + pytest.skip("store does not implement SupportsDeleteSync") + return store # type: ignore[unreachable] + @abstractmethod async def set(self, store: S, key: str, value: Buffer) -> None: """ @@ -579,6 +612,107 @@ def test_get_json_sync(self, store: S) -> None: sync(self.set(store, key, self.buffer_cls.from_bytes(data_bytes))) assert store._get_json_sync(key, prototype=default_buffer_prototype()) == data + # ------------------------------------------------------------------- + # Synchronous store methods (SupportsSyncStore protocol) + # ------------------------------------------------------------------- + + def test_get_sync(self, store: S) -> None: + getter = self._require_get_sync(store) + data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04") + key = "sync_get" + sync(self.set(store, key, data_buf)) + result = getter.get_sync(key) + assert result is not None + assert_bytes_equal(result, data_buf) + + def test_get_sync_missing(self, store: S) -> None: + getter = self._require_get_sync(store) + result = getter.get_sync("nonexistent") + assert result is None + + def test_set_sync(self, store: S) -> None: + setter = self._require_set_sync(store) + data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04") + key = "sync_set" + setter.set_sync(key, data_buf) + result = sync(self.get(store, key)) + assert_bytes_equal(result, data_buf) + + def test_delete_sync(self, store: S) -> None: + setter = self._require_set_sync(store) + deleter = self._require_delete_sync(store) + getter = self._require_get_sync(store) + if not store.supports_deletes: + pytest.skip("store does not support deletes") + data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04") + key = "sync_delete" + setter.set_sync(key, data_buf) + deleter.delete_sync(key) + result = getter.get_sync(key) + assert result is None + + def test_delete_sync_missing(self, store: S) -> None: + deleter = self._require_delete_sync(store) + if not store.supports_deletes: + pytest.skip("store does not support deletes") + # should not raise + deleter.delete_sync("nonexistent_sync") + + # ------------------------------------------------------------------- + # set_range (sync only — set_range is exclusively a sync-path API) + # ------------------------------------------------------------------- + + def test_set_range_sync(self, store: S) -> None: + setter = self._require_set_sync(store) + ranger = self._require_set_range_sync(store) + getter = self._require_get_sync(store) + data_buf = self.buffer_cls.from_bytes(b"hello world") + key = "range_sync_key" + setter.set_sync(key, data_buf) + patch = default_buffer_prototype().buffer.from_bytes(b"WORLD") + ranger.set_range_sync(key, patch, 6) + result = getter.get_sync(key) + assert result is not None + assert result.to_bytes() == b"hello WORLD" + + def test_set_range_sync_preserves_other_bytes(self, store: S) -> None: + setter = self._require_set_sync(store) + ranger = self._require_set_range_sync(store) + getter = self._require_get_sync(store) + data = np.arange(100, dtype="uint8") + data_buf = default_buffer_prototype().buffer.from_array_like(data) + key = "range_preserve" + setter.set_sync(key, data_buf) + patch = np.full(10, 255, dtype="uint8") + patch_buf = default_buffer_prototype().buffer.from_array_like(patch) + ranger.set_range_sync(key, patch_buf, 50) + result = getter.get_sync(key) + assert result is not None + result_arr = np.frombuffer(result.to_bytes(), dtype="uint8") + expected = data.copy() + expected[50:60] = 255 + np.testing.assert_array_equal(result_arr, expected) + + def test_set_range_sync_beyond_end_raises(self, store: S) -> None: + setter = self._require_set_sync(store) + ranger = self._require_set_range_sync(store) + data_buf = self.buffer_cls.from_bytes(b"hello") + key = "range_oob" + setter.set_sync(key, data_buf) + patch = default_buffer_prototype().buffer.from_bytes(b"world!") + with pytest.raises(ValueError, match="set_range would write beyond"): + ranger.set_range_sync(key, patch, 0) + + def test_set_range_sync_start_beyond_end_raises(self, store: S) -> None: + setter = self._require_set_sync(store) + ranger = self._require_set_range_sync(store) + data_buf = self.buffer_cls.from_bytes(b"hello") + key = "range_oob2" + setter.set_sync(key, data_buf) + patch = default_buffer_prototype().buffer.from_bytes(b"x") + with pytest.raises(ValueError, match="set_range would write beyond"): + ranger.set_range_sync(key, patch, 10) + class LatencyStore(WrapperStore[Store]): """ diff --git a/tests/test_indexing.py b/tests/test_indexing.py index c0bf7dd270..9c734fb0c3 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -34,6 +34,7 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator + from zarr.abc.store import ByteRequest from zarr.core.buffer import BufferPrototype from zarr.core.buffer.core import Buffer @@ -83,6 +84,22 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None self.counter["__setitem__", key_suffix] += 1 return await super().set(key, value, byte_range) + def get_sync( + self, + key: str, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + key_suffix = "/".join(key.split("/")[1:]) + self.counter["__getitem__", key_suffix] += 1 + return super().get_sync(key, prototype=prototype, byte_range=byte_range) + + def set_sync(self, key: str, value: Buffer) -> None: + key_suffix = "/".join(key.split("/")[1:]) + self.counter["__setitem__", key_suffix] += 1 + return super().set_sync(key, value) + def test_normalize_integer_selection() -> None: assert 1 == normalize_integer_selection(1, 100) From 71a780b157164667c7684a2c0485b77d2afc530f Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 27 Feb 2026 14:40:52 -0500 Subject: [PATCH 06/10] chunktransform --- src/zarr/core/codec_pipeline.py | 66 +++++++++++++++++-------------- tests/test_sync_codec_pipeline.py | 47 ++++++++++++++++++---- 2 files changed, 75 insertions(+), 38 deletions(-) diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 4412ffa705..0c3cccb1d9 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -69,49 +69,55 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any: return fill_value -@dataclass(frozen=True, slots=True) -class CodecChain: - """Codec chain with pre-resolved metadata specs. +@dataclass(slots=True, kw_only=True) +class ChunkTransform: + """A stored chunk, modeled as a layered array. - Constructed from an iterable of codecs and a chunk ArraySpec. - Resolves each codec against the spec so that encode/decode can - run without re-resolving. + Each layer corresponds to one ArrayArrayCodec and the ArraySpec + at its input boundary. ``layers[0]`` is the outermost (user-visible) + transform; after the last layer comes the ArrayBytesCodec. + + The chunk's ``shape`` and ``dtype`` reflect the representation + **after** all ArrayArrayCodec layers have been applied — i.e. the + spec that feeds the ArrayBytesCodec. """ codecs: tuple[Codec, ...] - chunk_spec: ArraySpec + array_spec: ArraySpec - _aa_codecs: tuple[ArrayArrayCodec, ...] = field(init=False, repr=False, compare=False) - _aa_specs: tuple[ArraySpec, ...] = field(init=False, repr=False, compare=False) + # Each element is (ArrayArrayCodec, input_spec_for_that_codec). + layers: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = field( + init=False, repr=False, compare=False + ) _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False) _ab_spec: ArraySpec = field(init=False, repr=False, compare=False) _bb_codecs: tuple[BytesBytesCodec, ...] = field(init=False, repr=False, compare=False) - _bb_spec: ArraySpec = field(init=False, repr=False, compare=False) _all_sync: bool = field(init=False, repr=False, compare=False) def __post_init__(self) -> None: aa, ab, bb = codecs_from_list(list(self.codecs)) - aa_specs: list[ArraySpec] = [] - spec = self.chunk_spec + layers: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = () + spec = self.array_spec for aa_codec in aa: - aa_specs.append(spec) + layers = (*layers, (aa_codec, spec)) spec = aa_codec.resolve_metadata(spec) - object.__setattr__(self, "_aa_codecs", aa) - object.__setattr__(self, "_aa_specs", tuple(aa_specs)) - object.__setattr__(self, "_ab_codec", ab) - object.__setattr__(self, "_ab_spec", spec) + self.layers = layers + self._ab_codec = ab + self._ab_spec = spec + self._bb_codecs = bb + self._all_sync = all(isinstance(c, SupportsSyncCodec) for c in self.codecs) - spec = ab.resolve_metadata(spec) - object.__setattr__(self, "_bb_codecs", bb) - object.__setattr__(self, "_bb_spec", spec) + @property + def shape(self) -> tuple[int, ...]: + """Shape after all ArrayArrayCodec layers (input to the ArrayBytesCodec).""" + return self._ab_spec.shape - object.__setattr__( - self, - "_all_sync", - all(isinstance(c, SupportsSyncCodec) for c in self.codecs), - ) + @property + def dtype(self) -> ZDType[TBaseDType, TBaseScalar]: + """Dtype after all ArrayArrayCodec layers (input to the ArrayBytesCodec).""" + return self._ab_spec.dtype @property def all_sync(self) -> bool: @@ -127,11 +133,11 @@ def decode_chunk( """ bb_out: Any = chunk_bytes for bb_codec in reversed(self._bb_codecs): - bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self._bb_spec) + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self._ab_spec) ab_out: Any = cast("SupportsSyncCodec", self._ab_codec)._decode_sync(bb_out, self._ab_spec) - for aa_codec, spec in zip(reversed(self._aa_codecs), reversed(self._aa_specs), strict=True): + for aa_codec, spec in reversed(self.layers): ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) return ab_out # type: ignore[no-any-return] @@ -146,7 +152,7 @@ def encode_chunk( """ aa_out: Any = chunk_array - for aa_codec, spec in zip(self._aa_codecs, self._aa_specs, strict=True): + for aa_codec, spec in self.layers: if aa_out is None: return None aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) @@ -158,7 +164,7 @@ def encode_chunk( for bb_codec in self._bb_codecs: if bb_out is None: return None - bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self._bb_spec) + bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self._ab_spec) return bb_out # type: ignore[no-any-return] @@ -369,7 +375,7 @@ async def read_batch( out[out_selection] = fill_value_or_default(chunk_spec) else: chunk_bytes_batch = await concurrent_map( - [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info], + [(byte_getter, chunk_spec.prototype) for byte_getter, chunk_spec, *_ in batch_info], lambda byte_getter, prototype: byte_getter.get(prototype), config.get("async.concurrency"), ) diff --git a/tests/test_sync_codec_pipeline.py b/tests/test_sync_codec_pipeline.py index 192479dc59..e9d05dcec6 100644 --- a/tests/test_sync_codec_pipeline.py +++ b/tests/test_sync_codec_pipeline.py @@ -10,7 +10,7 @@ from zarr.codecs.zstd import ZstdCodec from zarr.core.array_spec import ArrayConfig, ArraySpec from zarr.core.buffer import NDBuffer, default_buffer_prototype -from zarr.core.codec_pipeline import CodecChain +from zarr.core.codec_pipeline import ChunkTransform from zarr.core.dtype import get_data_type_from_native_dtype @@ -29,26 +29,28 @@ def _make_nd_buffer(arr: np.ndarray[Any, np.dtype[Any]]) -> NDBuffer: return default_buffer_prototype().nd_buffer.from_numpy_array(arr) -class TestCodecChain: +class TestChunkTransform: def test_all_sync(self) -> None: spec = _make_array_spec((100,), np.dtype("float64")) - chain = CodecChain((BytesCodec(),), spec) + chain = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) assert chain.all_sync is True def test_all_sync_with_compression(self) -> None: spec = _make_array_spec((100,), np.dtype("float64")) - chain = CodecChain((BytesCodec(), GzipCodec()), spec) + chain = ChunkTransform(codecs=(BytesCodec(), GzipCodec()), array_spec=spec) assert chain.all_sync is True def test_all_sync_full_chain(self) -> None: spec = _make_array_spec((3, 4), np.dtype("float64")) - chain = CodecChain((TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()), spec) + chain = ChunkTransform( + codecs=(TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()), array_spec=spec + ) assert chain.all_sync is True def test_encode_decode_roundtrip_bytes_only(self) -> None: arr = np.arange(100, dtype="float64") spec = _make_array_spec(arr.shape, arr.dtype) - chain = CodecChain((BytesCodec(),), spec) + chain = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) nd_buf = _make_nd_buffer(arr) encoded = chain.encode_chunk(nd_buf) @@ -56,10 +58,36 @@ def test_encode_decode_roundtrip_bytes_only(self) -> None: decoded = chain.decode_chunk(encoded) np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + def test_layers_no_aa_codecs(self) -> None: + spec = _make_array_spec((100,), np.dtype("float64")) + chunk = ChunkTransform(codecs=(BytesCodec(), GzipCodec()), array_spec=spec) + assert chunk.layers == () + + def test_layers_with_transpose(self) -> None: + spec = _make_array_spec((3, 4), np.dtype("float64")) + transpose = TransposeCodec(order=(1, 0)) + chunk = ChunkTransform(codecs=(transpose, BytesCodec(), ZstdCodec()), array_spec=spec) + assert len(chunk.layers) == 1 + assert chunk.layers[0][0] is transpose + assert chunk.layers[0][1] is spec + + def test_shape_dtype_no_aa_codecs(self) -> None: + spec = _make_array_spec((100,), np.dtype("float64")) + chunk = ChunkTransform(codecs=(BytesCodec(),), array_spec=spec) + assert chunk.shape == (100,) + assert chunk.dtype == spec.dtype + + def test_shape_dtype_with_transpose(self) -> None: + spec = _make_array_spec((3, 4), np.dtype("float64")) + chunk = ChunkTransform(codecs=(TransposeCodec(order=(1, 0)), BytesCodec()), array_spec=spec) + # After transpose (1,0), shape (3,4) becomes (4,3) + assert chunk.shape == (4, 3) + assert chunk.dtype == spec.dtype + def test_encode_decode_roundtrip_with_compression(self) -> None: arr = np.arange(100, dtype="float64") spec = _make_array_spec(arr.shape, arr.dtype) - chain = CodecChain((BytesCodec(), GzipCodec(level=1)), spec) + chain = ChunkTransform(codecs=(BytesCodec(), GzipCodec(level=1)), array_spec=spec) nd_buf = _make_nd_buffer(arr) encoded = chain.encode_chunk(nd_buf) @@ -70,7 +98,10 @@ def test_encode_decode_roundtrip_with_compression(self) -> None: def test_encode_decode_roundtrip_with_transpose(self) -> None: arr = np.arange(12, dtype="float64").reshape(3, 4) spec = _make_array_spec(arr.shape, arr.dtype) - chain = CodecChain((TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)), spec) + chain = ChunkTransform( + codecs=(TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)), + array_spec=spec, + ) nd_buf = _make_nd_buffer(arr) encoded = chain.encode_chunk(nd_buf) From c2131e0358ed005982bb4e620a126ed173260a90 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Thu, 26 Feb 2026 17:51:33 -0500 Subject: [PATCH 07/10] add prepared write logic --- src/zarr/abc/codec.py | 183 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 181 insertions(+), 2 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 3ec5ec522b..30189e4cbb 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -2,7 +2,8 @@ from abc import abstractmethod from collections.abc import Mapping -from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable from typing_extensions import ReadOnly, TypedDict @@ -19,7 +20,7 @@ from zarr.core.array_spec import ArraySpec from zarr.core.chunk_grids import ChunkGrid from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType - from zarr.core.indexing import SelectorTuple + from zarr.core.indexing import ChunkProjection, SelectorTuple from zarr.core.metadata import ArrayMetadata __all__ = [ @@ -32,6 +33,7 @@ "CodecInput", "CodecOutput", "CodecPipeline", + "PreparedWrite", "SupportsSyncCodec", ] @@ -204,9 +206,186 @@ class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]): """Base class for array-to-array codecs.""" +@dataclass +class PreparedWrite: + """Result of ``prepare_write``: existing encoded chunk bytes + selection info.""" + + chunk_dict: dict[tuple[int, ...], Buffer | None] + inner_codec_chain: Any # CodecChain — typed as Any to avoid circular import + inner_chunk_spec: ArraySpec + indexer: list[ChunkProjection] + value_selection: SelectorTuple | None = None + write_full_shard: bool = True + is_complete_shard: bool = False + shard_data: NDBuffer | None = field(default=None) + + class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]): """Base class for array-to-bytes codecs.""" + @property + def inner_codec_chain(self) -> Any: + """The codec chain for decoding inner chunks after deserialization. + + Returns ``None`` by default — the pipeline should use its own codec chain. + ``ShardingCodec`` overrides to return its inner codec chain. + """ + return None + + def deserialize( + self, raw: Buffer | None, chunk_spec: ArraySpec + ) -> dict[tuple[int, ...], Buffer | None]: + """Unpack stored bytes into per-inner-chunk buffers. + + Default: single chunk keyed at ``(0,)``. + ``ShardingCodec`` overrides to decode the shard index and slice the + blob into per-chunk buffers. + """ + return {(0,): raw} + + def serialize( + self, + chunk_dict: dict[tuple[int, ...], Buffer | None], + chunk_spec: ArraySpec, + ) -> Buffer | None: + """Pack per-inner-chunk buffers into a storage blob. + + Default: return the single chunk's bytes (or ``None`` if absent). + ``ShardingCodec`` overrides to concatenate chunks and build an index. + Returns ``None`` when all chunks are empty (caller should delete the key). + """ + return chunk_dict.get((0,)) + + # ------------------------------------------------------------------ + # prepare / finalize — sync + # ------------------------------------------------------------------ + + def prepare_read_sync( + self, + byte_getter: Any, + chunk_spec: ArraySpec, + chunk_selection: SelectorTuple, + codec_chain: Any, + aa_chain: Any, + ab_pair: Any, + bb_chain: Any, + ) -> NDBuffer | None: + """Sync IO + full decode for the selected region.""" + raw = byte_getter.get_sync(prototype=chunk_spec.prototype) + chunk_array: NDBuffer | None = codec_chain.decode_chunk( + raw, chunk_spec, aa_chain, ab_pair, bb_chain + ) + if chunk_array is not None: + return chunk_array[chunk_selection] + return None + + def prepare_write_sync( + self, + byte_setter: Any, + chunk_spec: ArraySpec, + chunk_selection: SelectorTuple, + out_selection: SelectorTuple, + replace: bool, + codec_chain: Any, + ) -> PreparedWrite: + """Sync IO + deserialize. Returns a :class:`PreparedWrite`.""" + existing: Buffer | None = None + if not replace: + existing = byte_setter.get_sync(prototype=chunk_spec.prototype) + chunk_dict = self.deserialize(existing, chunk_spec) + inner_chain = self.inner_codec_chain or codec_chain + return PreparedWrite( + chunk_dict=chunk_dict, + inner_codec_chain=inner_chain, + inner_chunk_spec=chunk_spec, + indexer=[ + ( # type: ignore[list-item] + (0,), + chunk_selection, + out_selection, + replace, + ) + ], + ) + + def finalize_write_sync( + self, + prepared: PreparedWrite, + chunk_spec: ArraySpec, + byte_setter: Any, + ) -> None: + """Serialize the prepared *chunk_dict* and write to store.""" + blob = self.serialize(prepared.chunk_dict, chunk_spec) + if blob is None: + byte_setter.delete_sync() + else: + byte_setter.set_sync(blob) + + # ------------------------------------------------------------------ + # prepare / finalize — async + # ------------------------------------------------------------------ + + async def prepare_read( + self, + byte_getter: Any, + chunk_spec: ArraySpec, + chunk_selection: SelectorTuple, + codec_chain: Any, + aa_chain: Any, + ab_pair: Any, + bb_chain: Any, + ) -> NDBuffer | None: + """Async IO + full decode for the selected region.""" + raw = await byte_getter.get(prototype=chunk_spec.prototype) + chunk_array: NDBuffer | None = codec_chain.decode_chunk( + raw, chunk_spec, aa_chain, ab_pair, bb_chain + ) + if chunk_array is not None: + return chunk_array[chunk_selection] + return None + + async def prepare_write( + self, + byte_setter: Any, + chunk_spec: ArraySpec, + chunk_selection: SelectorTuple, + out_selection: SelectorTuple, + replace: bool, + codec_chain: Any, + ) -> PreparedWrite: + """Async IO + deserialize. Returns a :class:`PreparedWrite`.""" + existing: Buffer | None = None + if not replace: + existing = await byte_setter.get(prototype=chunk_spec.prototype) + chunk_dict = self.deserialize(existing, chunk_spec) + inner_chain = self.inner_codec_chain or codec_chain + return PreparedWrite( + chunk_dict=chunk_dict, + inner_codec_chain=inner_chain, + inner_chunk_spec=chunk_spec, + indexer=[ + ( # type: ignore[list-item] + (0,), + chunk_selection, + out_selection, + replace, + ) + ], + ) + + async def finalize_write( + self, + prepared: PreparedWrite, + chunk_spec: ArraySpec, + byte_setter: Any, + ) -> None: + """Async version of :meth:`finalize_write_sync`.""" + blob = self.serialize(prepared.chunk_dict, chunk_spec) + if blob is None: + await byte_setter.delete() + else: + await byte_setter.set(blob) + class BytesBytesCodec(BaseCodec[Buffer, Buffer]): """Base class for bytes-to-bytes codecs.""" From 4305dc0ee3c33ccfa387e025203f9a2025230909 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Thu, 26 Feb 2026 23:42:49 -0500 Subject: [PATCH 08/10] add prepared write semantics --- src/zarr/abc/codec.py | 267 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 228 insertions(+), 39 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 30189e4cbb..6d7eabd6cf 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -34,6 +34,7 @@ "CodecOutput", "CodecPipeline", "PreparedWrite", + "SupportsChunkCodec", "SupportsSyncCodec", ] @@ -79,6 +80,17 @@ def _encode_sync( ) -> NDBuffer | Buffer | None: ... +class SupportsChunkCodec(Protocol): + """Protocol for objects that can decode/encode whole chunks synchronously. + + [`CodecChain`][zarr.core.codec_pipeline.CodecChain] satisfies this protocol. + """ + + def decode_chunk(self, chunk_bytes: Buffer) -> NDBuffer: ... + + def encode_chunk(self, chunk_array: NDBuffer) -> Buffer | None: ... + + class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]): """Generic base class for codecs. @@ -208,10 +220,37 @@ class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]): @dataclass class PreparedWrite: - """Result of ``prepare_write``: existing encoded chunk bytes + selection info.""" + """Result of the prepare phase of a write operation. + + Carries deserialized chunk data and selection metadata between + [`prepare_write`][zarr.abc.codec.ArrayBytesCodec.prepare_write] (or + [`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync]) + and [`finalize_write`][zarr.abc.codec.ArrayBytesCodec.finalize_write] (or + [`finalize_write_sync`][zarr.abc.codec.ArrayBytesCodec.finalize_write_sync]). + + Attributes + ---------- + chunk_dict : dict[tuple[int, ...], Buffer | None] + Per-inner-chunk buffers keyed by chunk coordinates. + inner_codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] for + decoding/encoding inner chunks. + inner_chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for inner chunks. + indexer : list[ChunkProjection] + Mapping from inner-chunk coordinates to value/output selections. + value_selection : SelectorTuple | None + Outer ``out_selection`` for sharding. Unused by the base implementation. + write_full_shard : bool + Whether the full shard blob will be written. Unused by the base implementation. + is_complete_shard : bool + Fast-path flag for complete shard writes. Unused by the base implementation. + shard_data : NDBuffer | None + Full shard value for complete writes. Unused by the base implementation. + """ chunk_dict: dict[tuple[int, ...], Buffer | None] - inner_codec_chain: Any # CodecChain — typed as Any to avoid circular import + inner_codec_chain: SupportsChunkCodec inner_chunk_spec: ArraySpec indexer: list[ChunkProjection] value_selection: SelectorTuple | None = None @@ -224,11 +263,18 @@ class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]): """Base class for array-to-bytes codecs.""" @property - def inner_codec_chain(self) -> Any: + def inner_codec_chain(self) -> SupportsChunkCodec | None: """The codec chain for decoding inner chunks after deserialization. - Returns ``None`` by default — the pipeline should use its own codec chain. - ``ShardingCodec`` overrides to return its inner codec chain. + Returns ``None`` by default, meaning the pipeline should use its own + codec chain. ``ShardingCodec`` overrides this to return its inner + codec chain. + + Returns + ------- + SupportsChunkCodec or None + A [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] instance, + or ``None``. """ return None @@ -237,9 +283,22 @@ def deserialize( ) -> dict[tuple[int, ...], Buffer | None]: """Unpack stored bytes into per-inner-chunk buffers. - Default: single chunk keyed at ``(0,)``. - ``ShardingCodec`` overrides to decode the shard index and slice the - blob into per-chunk buffers. + The default implementation returns a single-entry dict keyed at + ``(0,)``. ``ShardingCodec`` overrides this to decode the shard index + and split the blob into per-chunk buffers. + + Parameters + ---------- + raw : Buffer or None + The raw bytes read from the store, or ``None`` if the key was + absent. + chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + + Returns + ------- + dict[tuple[int, ...], Buffer | None] + Mapping from inner-chunk coordinates to their encoded bytes. """ return {(0,): raw} @@ -250,9 +309,22 @@ def serialize( ) -> Buffer | None: """Pack per-inner-chunk buffers into a storage blob. - Default: return the single chunk's bytes (or ``None`` if absent). - ``ShardingCodec`` overrides to concatenate chunks and build an index. - Returns ``None`` when all chunks are empty (caller should delete the key). + The default implementation returns the single entry at ``(0,)``. + ``ShardingCodec`` overrides this to concatenate chunks and build a + shard index. + + Parameters + ---------- + chunk_dict : dict[tuple[int, ...], Buffer | None] + Mapping from inner-chunk coordinates to their encoded bytes. + chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + + Returns + ------- + Buffer or None + The serialized blob, or ``None`` when all chunks are empty + (the caller should delete the key). """ return chunk_dict.get((0,)) @@ -265,19 +337,35 @@ def prepare_read_sync( byte_getter: Any, chunk_spec: ArraySpec, chunk_selection: SelectorTuple, - codec_chain: Any, - aa_chain: Any, - ab_pair: Any, - bb_chain: Any, + codec_chain: SupportsChunkCodec, ) -> NDBuffer | None: - """Sync IO + full decode for the selected region.""" + """Read a chunk from the store synchronously, decode it, and + return the selected region. + + Parameters + ---------- + byte_getter : Any + An object supporting ``get_sync`` (e.g. + [`StorePath`][zarr.storage._common.StorePath]). + chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + chunk_selection : SelectorTuple + Selection within the decoded chunk array. + codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to + decode the chunk. + + Returns + ------- + NDBuffer or None + The decoded chunk data at *chunk_selection*, or ``None`` if the + chunk does not exist in the store. + """ raw = byte_getter.get_sync(prototype=chunk_spec.prototype) - chunk_array: NDBuffer | None = codec_chain.decode_chunk( - raw, chunk_spec, aa_chain, ab_pair, bb_chain - ) - if chunk_array is not None: - return chunk_array[chunk_selection] - return None + if raw is None: + return None + chunk_array = codec_chain.decode_chunk(raw) + return chunk_array[chunk_selection] def prepare_write_sync( self, @@ -286,9 +374,39 @@ def prepare_write_sync( chunk_selection: SelectorTuple, out_selection: SelectorTuple, replace: bool, - codec_chain: Any, + codec_chain: SupportsChunkCodec, ) -> PreparedWrite: - """Sync IO + deserialize. Returns a :class:`PreparedWrite`.""" + """Prepare a synchronous write by optionally reading existing data. + + When *replace* is ``False``, the existing chunk bytes are fetched + from the store so they can be merged with the new data. When + *replace* is ``True``, the fetch is skipped. + + Parameters + ---------- + byte_setter : Any + An object supporting ``get_sync`` and ``set_sync`` (e.g. + [`StorePath`][zarr.storage._common.StorePath]). + chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + chunk_selection : SelectorTuple + Selection within the chunk being written. + out_selection : SelectorTuple + Corresponding selection within the source value array. + replace : bool + If ``True``, the write replaces all data in the chunk and no + read-modify-write is needed. If ``False``, existing data is + fetched first. + codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to + decode/encode the chunk. + + Returns + ------- + PreparedWrite + A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the + deserialized chunk data and selection metadata. + """ existing: Buffer | None = None if not replace: existing = byte_setter.get_sync(prototype=chunk_spec.prototype) @@ -314,7 +432,22 @@ def finalize_write_sync( chunk_spec: ArraySpec, byte_setter: Any, ) -> None: - """Serialize the prepared *chunk_dict* and write to store.""" + """Serialize the prepared chunk data and write it to the store. + + If serialization produces ``None`` (all chunks empty), the key is + deleted instead. + + Parameters + ---------- + prepared : PreparedWrite + The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by + [`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync]. + chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + byte_setter : Any + An object supporting ``set_sync`` and ``delete_sync`` (e.g. + [`StorePath`][zarr.storage._common.StorePath]). + """ blob = self.serialize(prepared.chunk_dict, chunk_spec) if blob is None: byte_setter.delete_sync() @@ -330,19 +463,35 @@ async def prepare_read( byte_getter: Any, chunk_spec: ArraySpec, chunk_selection: SelectorTuple, - codec_chain: Any, - aa_chain: Any, - ab_pair: Any, - bb_chain: Any, + codec_chain: SupportsChunkCodec, ) -> NDBuffer | None: - """Async IO + full decode for the selected region.""" + """Async variant of + [`prepare_read_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_read_sync]. + + Parameters + ---------- + byte_getter : Any + An object supporting ``get`` (e.g. + [`StorePath`][zarr.storage._common.StorePath]). + chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + chunk_selection : SelectorTuple + Selection within the decoded chunk array. + codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to + decode the chunk. + + Returns + ------- + NDBuffer or None + The decoded chunk data at *chunk_selection*, or ``None`` if the + chunk does not exist in the store. + """ raw = await byte_getter.get(prototype=chunk_spec.prototype) - chunk_array: NDBuffer | None = codec_chain.decode_chunk( - raw, chunk_spec, aa_chain, ab_pair, bb_chain - ) - if chunk_array is not None: - return chunk_array[chunk_selection] - return None + if raw is None: + return None + chunk_array = codec_chain.decode_chunk(raw) + return chunk_array[chunk_selection] async def prepare_write( self, @@ -351,9 +500,36 @@ async def prepare_write( chunk_selection: SelectorTuple, out_selection: SelectorTuple, replace: bool, - codec_chain: Any, + codec_chain: SupportsChunkCodec, ) -> PreparedWrite: - """Async IO + deserialize. Returns a :class:`PreparedWrite`.""" + """Async variant of + [`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync]. + + Parameters + ---------- + byte_setter : Any + An object supporting ``get`` and ``set`` (e.g. + [`StorePath`][zarr.storage._common.StorePath]). + chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + chunk_selection : SelectorTuple + Selection within the chunk being written. + out_selection : SelectorTuple + Corresponding selection within the source value array. + replace : bool + If ``True``, the write replaces all data in the chunk and no + read-modify-write is needed. If ``False``, existing data is + fetched first. + codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to + decode/encode the chunk. + + Returns + ------- + PreparedWrite + A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the + deserialized chunk data and selection metadata. + """ existing: Buffer | None = None if not replace: existing = await byte_setter.get(prototype=chunk_spec.prototype) @@ -379,7 +555,20 @@ async def finalize_write( chunk_spec: ArraySpec, byte_setter: Any, ) -> None: - """Async version of :meth:`finalize_write_sync`.""" + """Async variant of + [`finalize_write_sync`][zarr.abc.codec.ArrayBytesCodec.finalize_write_sync]. + + Parameters + ---------- + prepared : PreparedWrite + The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by + [`prepare_write`][zarr.abc.codec.ArrayBytesCodec.prepare_write]. + chunk_spec : ArraySpec + The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + byte_setter : Any + An object supporting ``set`` and ``delete`` (e.g. + [`StorePath`][zarr.storage._common.StorePath]). + """ blob = self.serialize(prepared.chunk_dict, chunk_spec) if blob is None: await byte_setter.delete() From 292779799e1e9960e3c39e13299bca59d86fcfd1 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 27 Feb 2026 09:01:08 -0500 Subject: [PATCH 09/10] simplify preparedwrite --- src/zarr/abc/codec.py | 35 +---------------------------------- 1 file changed, 1 insertion(+), 34 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 6d7eabd6cf..fc86d0edc0 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -2,7 +2,7 @@ from abc import abstractmethod from collections.abc import Mapping -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable from typing_extensions import ReadOnly, TypedDict @@ -232,31 +232,12 @@ class PreparedWrite: ---------- chunk_dict : dict[tuple[int, ...], Buffer | None] Per-inner-chunk buffers keyed by chunk coordinates. - inner_codec_chain : SupportsChunkCodec - The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] for - decoding/encoding inner chunks. - inner_chunk_spec : ArraySpec - The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for inner chunks. indexer : list[ChunkProjection] Mapping from inner-chunk coordinates to value/output selections. - value_selection : SelectorTuple | None - Outer ``out_selection`` for sharding. Unused by the base implementation. - write_full_shard : bool - Whether the full shard blob will be written. Unused by the base implementation. - is_complete_shard : bool - Fast-path flag for complete shard writes. Unused by the base implementation. - shard_data : NDBuffer | None - Full shard value for complete writes. Unused by the base implementation. """ chunk_dict: dict[tuple[int, ...], Buffer | None] - inner_codec_chain: SupportsChunkCodec - inner_chunk_spec: ArraySpec indexer: list[ChunkProjection] - value_selection: SelectorTuple | None = None - write_full_shard: bool = True - is_complete_shard: bool = False - shard_data: NDBuffer | None = field(default=None) class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]): @@ -374,7 +355,6 @@ def prepare_write_sync( chunk_selection: SelectorTuple, out_selection: SelectorTuple, replace: bool, - codec_chain: SupportsChunkCodec, ) -> PreparedWrite: """Prepare a synchronous write by optionally reading existing data. @@ -397,9 +377,6 @@ def prepare_write_sync( If ``True``, the write replaces all data in the chunk and no read-modify-write is needed. If ``False``, existing data is fetched first. - codec_chain : SupportsChunkCodec - The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to - decode/encode the chunk. Returns ------- @@ -411,11 +388,8 @@ def prepare_write_sync( if not replace: existing = byte_setter.get_sync(prototype=chunk_spec.prototype) chunk_dict = self.deserialize(existing, chunk_spec) - inner_chain = self.inner_codec_chain or codec_chain return PreparedWrite( chunk_dict=chunk_dict, - inner_codec_chain=inner_chain, - inner_chunk_spec=chunk_spec, indexer=[ ( # type: ignore[list-item] (0,), @@ -500,7 +474,6 @@ async def prepare_write( chunk_selection: SelectorTuple, out_selection: SelectorTuple, replace: bool, - codec_chain: SupportsChunkCodec, ) -> PreparedWrite: """Async variant of [`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync]. @@ -520,9 +493,6 @@ async def prepare_write( If ``True``, the write replaces all data in the chunk and no read-modify-write is needed. If ``False``, existing data is fetched first. - codec_chain : SupportsChunkCodec - The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to - decode/encode the chunk. Returns ------- @@ -534,11 +504,8 @@ async def prepare_write( if not replace: existing = await byte_setter.get(prototype=chunk_spec.prototype) chunk_dict = self.deserialize(existing, chunk_spec) - inner_chain = self.inner_codec_chain or codec_chain return PreparedWrite( chunk_dict=chunk_dict, - inner_codec_chain=inner_chain, - inner_chunk_spec=chunk_spec, indexer=[ ( # type: ignore[list-item] (0,), From b387aeb78c57a2a98c892816b0bb6ef6982b319b Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 27 Feb 2026 15:53:09 -0500 Subject: [PATCH 10/10] use chunktransform --- src/zarr/abc/codec.py | 52 ++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index fc86d0edc0..d5d673e1c7 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -83,9 +83,11 @@ def _encode_sync( class SupportsChunkCodec(Protocol): """Protocol for objects that can decode/encode whole chunks synchronously. - [`CodecChain`][zarr.core.codec_pipeline.CodecChain] satisfies this protocol. + [`ChunkTransform`][zarr.core.codec_pipeline.ChunkTransform] satisfies this protocol. """ + array_spec: ArraySpec + def decode_chunk(self, chunk_bytes: Buffer) -> NDBuffer: ... def encode_chunk(self, chunk_array: NDBuffer) -> Buffer | None: ... @@ -316,7 +318,6 @@ def serialize( def prepare_read_sync( self, byte_getter: Any, - chunk_spec: ArraySpec, chunk_selection: SelectorTuple, codec_chain: SupportsChunkCodec, ) -> NDBuffer | None: @@ -328,13 +329,11 @@ def prepare_read_sync( byte_getter : Any An object supporting ``get_sync`` (e.g. [`StorePath`][zarr.storage._common.StorePath]). - chunk_spec : ArraySpec - The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. chunk_selection : SelectorTuple Selection within the decoded chunk array. codec_chain : SupportsChunkCodec The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to - decode the chunk. + decode the chunk. Must carry an ``array_spec`` attribute. Returns ------- @@ -342,7 +341,7 @@ def prepare_read_sync( The decoded chunk data at *chunk_selection*, or ``None`` if the chunk does not exist in the store. """ - raw = byte_getter.get_sync(prototype=chunk_spec.prototype) + raw = byte_getter.get_sync(prototype=codec_chain.array_spec.prototype) if raw is None: return None chunk_array = codec_chain.decode_chunk(raw) @@ -351,7 +350,7 @@ def prepare_read_sync( def prepare_write_sync( self, byte_setter: Any, - chunk_spec: ArraySpec, + codec_chain: SupportsChunkCodec, chunk_selection: SelectorTuple, out_selection: SelectorTuple, replace: bool, @@ -367,8 +366,9 @@ def prepare_write_sync( byte_setter : Any An object supporting ``get_sync`` and ``set_sync`` (e.g. [`StorePath`][zarr.storage._common.StorePath]). - chunk_spec : ArraySpec - The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] + carrying the ``array_spec`` for the chunk. chunk_selection : SelectorTuple Selection within the chunk being written. out_selection : SelectorTuple @@ -384,6 +384,7 @@ def prepare_write_sync( A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the deserialized chunk data and selection metadata. """ + chunk_spec = codec_chain.array_spec existing: Buffer | None = None if not replace: existing = byte_setter.get_sync(prototype=chunk_spec.prototype) @@ -403,7 +404,7 @@ def prepare_write_sync( def finalize_write_sync( self, prepared: PreparedWrite, - chunk_spec: ArraySpec, + codec_chain: SupportsChunkCodec, byte_setter: Any, ) -> None: """Serialize the prepared chunk data and write it to the store. @@ -416,13 +417,14 @@ def finalize_write_sync( prepared : PreparedWrite The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by [`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync]. - chunk_spec : ArraySpec - The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] + carrying the ``array_spec`` for the chunk. byte_setter : Any An object supporting ``set_sync`` and ``delete_sync`` (e.g. [`StorePath`][zarr.storage._common.StorePath]). """ - blob = self.serialize(prepared.chunk_dict, chunk_spec) + blob = self.serialize(prepared.chunk_dict, codec_chain.array_spec) if blob is None: byte_setter.delete_sync() else: @@ -435,7 +437,6 @@ def finalize_write_sync( async def prepare_read( self, byte_getter: Any, - chunk_spec: ArraySpec, chunk_selection: SelectorTuple, codec_chain: SupportsChunkCodec, ) -> NDBuffer | None: @@ -447,13 +448,11 @@ async def prepare_read( byte_getter : Any An object supporting ``get`` (e.g. [`StorePath`][zarr.storage._common.StorePath]). - chunk_spec : ArraySpec - The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. chunk_selection : SelectorTuple Selection within the decoded chunk array. codec_chain : SupportsChunkCodec The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used to - decode the chunk. + decode the chunk. Must carry an ``array_spec`` attribute. Returns ------- @@ -461,7 +460,7 @@ async def prepare_read( The decoded chunk data at *chunk_selection*, or ``None`` if the chunk does not exist in the store. """ - raw = await byte_getter.get(prototype=chunk_spec.prototype) + raw = await byte_getter.get(prototype=codec_chain.array_spec.prototype) if raw is None: return None chunk_array = codec_chain.decode_chunk(raw) @@ -470,7 +469,7 @@ async def prepare_read( async def prepare_write( self, byte_setter: Any, - chunk_spec: ArraySpec, + codec_chain: SupportsChunkCodec, chunk_selection: SelectorTuple, out_selection: SelectorTuple, replace: bool, @@ -483,8 +482,9 @@ async def prepare_write( byte_setter : Any An object supporting ``get`` and ``set`` (e.g. [`StorePath`][zarr.storage._common.StorePath]). - chunk_spec : ArraySpec - The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] + carrying the ``array_spec`` for the chunk. chunk_selection : SelectorTuple Selection within the chunk being written. out_selection : SelectorTuple @@ -500,6 +500,7 @@ async def prepare_write( A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the deserialized chunk data and selection metadata. """ + chunk_spec = codec_chain.array_spec existing: Buffer | None = None if not replace: existing = await byte_setter.get(prototype=chunk_spec.prototype) @@ -519,7 +520,7 @@ async def prepare_write( async def finalize_write( self, prepared: PreparedWrite, - chunk_spec: ArraySpec, + codec_chain: SupportsChunkCodec, byte_setter: Any, ) -> None: """Async variant of @@ -530,13 +531,14 @@ async def finalize_write( prepared : PreparedWrite The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by [`prepare_write`][zarr.abc.codec.ArrayBytesCodec.prepare_write]. - chunk_spec : ArraySpec - The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk. + codec_chain : SupportsChunkCodec + The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] + carrying the ``array_spec`` for the chunk. byte_setter : Any An object supporting ``set`` and ``delete`` (e.g. [`StorePath`][zarr.storage._common.StorePath]). """ - blob = self.serialize(prepared.chunk_dict, chunk_spec) + blob = self.serialize(prepared.chunk_dict, codec_chain.array_spec) if blob is None: await byte_setter.delete() else: