From 2b64daa6aeadb73925aaea9d5c80ab28f23b9141 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 25 Feb 2026 18:33:03 -0500 Subject: [PATCH 1/6] add sync methods to codecs --- src/zarr/abc/codec.py | 20 ++++++++++++++++++- src/zarr/codecs/blosc.py | 27 ++++++++++++++++---------- src/zarr/codecs/bytes.py | 18 ++++++++++++++++-- src/zarr/codecs/crc32c_.py | 18 ++++++++++++++++-- src/zarr/codecs/gzip.py | 27 ++++++++++++++++++++------ src/zarr/codecs/transpose.py | 20 ++++++++++++++++--- src/zarr/codecs/vlen_utf8.py | 37 +++++++++++++++++++++++++++++++----- src/zarr/codecs/zstd.py | 24 ++++++++++++++++------- 8 files changed, 155 insertions(+), 36 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index d41c457b4e..3ec5ec522b 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -2,7 +2,7 @@ from abc import abstractmethod from collections.abc import Mapping -from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar +from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable from typing_extensions import ReadOnly, TypedDict @@ -32,6 +32,7 @@ "CodecInput", "CodecOutput", "CodecPipeline", + "SupportsSyncCodec", ] CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer) @@ -59,6 +60,23 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]: """The widest type of JSON-like input that could specify a codec.""" +@runtime_checkable +class SupportsSyncCodec(Protocol): + """Protocol for codecs that support synchronous encode/decode. + + Codecs implementing this protocol provide ``_decode_sync`` and ``_encode_sync`` + methods that perform encoding/decoding without requiring an async event loop. + """ + + def _decode_sync( + self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec + ) -> NDBuffer | Buffer: ... + + def _encode_sync( + self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec + ) -> NDBuffer | Buffer | None: ... 
+ + class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]): """Generic base class for codecs. diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index 5b91cfa005..d05731d640 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -299,13 +299,27 @@ def _blosc_codec(self) -> Blosc: config_dict["typesize"] = self.typesize return Blosc.from_config(config_dict) + def _decode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return as_numpy_array_wrapper(self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) + + def _encode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return chunk_spec.prototype.buffer.from_bytes( + self._blosc_codec.encode(chunk_bytes.as_numpy_array()) ) async def _encode_single( @@ -313,14 +327,7 @@ async def _encode_single( chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - # Since blosc only support host memory, we convert the input and output of the encoding - # between numpy array and buffer - return await asyncio.to_thread( - lambda chunk: chunk_spec.prototype.buffer.from_bytes( - self._blosc_codec.encode(chunk.as_numpy_array()) - ), - chunk_bytes, - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index 1fbdeef497..86bb354fb5 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -65,7 +65,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: ) return self - async def _decode_single( + def 
_decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -88,7 +88,14 @@ async def _decode_single( ) return chunk_array - async def _encode_single( + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + def _encode_sync( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -109,5 +116,12 @@ async def _encode_single( nd_array = nd_array.ravel().view(dtype="B") return chunk_spec.prototype.buffer.from_array_like(nd_array) + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_array, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length diff --git a/src/zarr/codecs/crc32c_.py b/src/zarr/codecs/crc32c_.py index 9536d0d558..ebe2ac8f7a 100644 --- a/src/zarr/codecs/crc32c_.py +++ b/src/zarr/codecs/crc32c_.py @@ -31,7 +31,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: def to_dict(self) -> dict[str, JSON]: return {"name": "crc32c"} - async def _decode_single( + def _decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -51,7 +51,14 @@ async def _decode_single( ) return chunk_spec.prototype.buffer.from_array_like(inner_bytes) - async def _encode_single( + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + def _encode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -64,5 +71,12 @@ async def _encode_single( # Append the checksum (as bytes) to the data return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("B"))) + async def _encode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_bytes, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: 
return input_byte_length + 4 diff --git a/src/zarr/codecs/gzip.py b/src/zarr/codecs/gzip.py index 610ca9dadd..b8591748f7 100644 --- a/src/zarr/codecs/gzip.py +++ b/src/zarr/codecs/gzip.py @@ -2,6 +2,7 @@ import asyncio from dataclasses import dataclass +from functools import cached_property from typing import TYPE_CHECKING from numcodecs.gzip import GZip @@ -48,23 +49,37 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: def to_dict(self) -> dict[str, JSON]: return {"name": "gzip", "configuration": {"level": self.level}} + @cached_property + def _gzip_codec(self) -> GZip: + return GZip(self.level) + + def _decode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return as_numpy_array_wrapper(self._gzip_codec.decode, chunk_bytes, chunk_spec.prototype) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) + + def _encode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return as_numpy_array_wrapper(self._gzip_codec.encode, chunk_bytes, chunk_spec.prototype) async def _encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - return await asyncio.to_thread( - as_numpy_array_wrapper, GZip(self.level).encode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size( self, diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index a8570b6e8f..609448a59c 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -95,20 +95,34 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: prototype=chunk_spec.prototype, ) - async def _decode_single( + def _decode_sync( self, chunk_array: NDBuffer, chunk_spec: 
ArraySpec, ) -> NDBuffer: - inverse_order = np.argsort(self.order) + inverse_order = tuple(int(i) for i in np.argsort(self.order)) return chunk_array.transpose(inverse_order) - async def _encode_single( + async def _decode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_array, chunk_spec) + + def _encode_sync( self, chunk_array: NDBuffer, _chunk_spec: ArraySpec, ) -> NDBuffer | None: return chunk_array.transpose(self.order) + async def _encode_single( + self, + chunk_array: NDBuffer, + _chunk_spec: ArraySpec, + ) -> NDBuffer | None: + return self._encode_sync(chunk_array, _chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length diff --git a/src/zarr/codecs/vlen_utf8.py b/src/zarr/codecs/vlen_utf8.py index fb1fb76126..a10cb7c335 100644 --- a/src/zarr/codecs/vlen_utf8.py +++ b/src/zarr/codecs/vlen_utf8.py @@ -40,8 +40,7 @@ def to_dict(self) -> dict[str, JSON]: def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return self - # TODO: expand the tests for this function - async def _decode_single( + def _decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -55,7 +54,14 @@ async def _decode_single( as_string_dtype = decoded.astype(chunk_spec.dtype.to_native_dtype(), copy=False) return chunk_spec.prototype.nd_buffer.from_numpy_array(as_string_dtype) - async def _encode_single( + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + def _encode_sync( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -65,6 +71,13 @@ async def _encode_single( _vlen_utf8_codec.encode(chunk_array.as_numpy_array()) ) + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_array, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, 
_chunk_spec: ArraySpec) -> int: # what is input_byte_length for an object dtype? raise NotImplementedError("compute_encoded_size is not implemented for VLen codecs") @@ -86,7 +99,7 @@ def to_dict(self) -> dict[str, JSON]: def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return self - async def _decode_single( + def _decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -99,7 +112,14 @@ async def _decode_single( decoded = _reshape_view(decoded, chunk_spec.shape) return chunk_spec.prototype.nd_buffer.from_numpy_array(decoded) - async def _encode_single( + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + def _encode_sync( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -109,6 +129,13 @@ async def _encode_single( _vlen_bytes_codec.encode(chunk_array.as_numpy_array()) ) + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_array, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: # what is input_byte_length for an object dtype? 
raise NotImplementedError("compute_encoded_size is not implemented for VLen codecs") diff --git a/src/zarr/codecs/zstd.py b/src/zarr/codecs/zstd.py index 27cc9a7777..f93c25a3c7 100644 --- a/src/zarr/codecs/zstd.py +++ b/src/zarr/codecs/zstd.py @@ -38,7 +38,7 @@ def parse_checksum(data: JSON) -> bool: class ZstdCodec(BytesBytesCodec): """zstd codec""" - is_fixed_size = True + is_fixed_size = False level: int = 0 checksum: bool = False @@ -71,23 +71,33 @@ def _zstd_codec(self) -> Zstd: config_dict = {"level": self.level, "checksum": self.checksum} return Zstd.from_config(config_dict) + def _decode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return as_numpy_array_wrapper(self._zstd_codec.decode, chunk_bytes, chunk_spec.prototype) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._zstd_codec.decode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) + + def _encode_sync( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return as_numpy_array_wrapper(self._zstd_codec.encode, chunk_bytes, chunk_spec.prototype) async def _encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._zstd_codec.encode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError From cd4efb0a1ed611d083e650b87e3de44cc39f5a46 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 25 Feb 2026 19:09:51 -0500 Subject: [PATCH 2/6] add CodecChain dataclass and sync codec tests Introduces CodecChain, a frozen dataclass that chains array-array, array-bytes, and bytes-bytes codecs with synchronous encode/decode 
methods. Pure compute only -- no IO, no threading, no batching. Also adds sync roundtrip tests for individual codecs (blosc, gzip, zstd, crc32c, bytes, transpose, vlen) and CodecChain integration tests. Co-Authored-By: Claude Opus 4.6 --- src/zarr/core/codec_pipeline.py | 133 +++++++++++++++++++++++- tests/test_codecs/test_blosc.py | 29 +++++- tests/test_codecs/test_crc32c.py | 33 ++++++ tests/test_codecs/test_endian.py | 29 ++++++ tests/test_codecs/test_gzip.py | 28 +++++ tests/test_codecs/test_transpose.py | 28 +++++ tests/test_codecs/test_vlen.py | 11 +- tests/test_codecs/test_zstd.py | 32 ++++++ tests/test_sync_codec_pipeline.py | 156 ++++++++++++++++++++++++++++ 9 files changed, 474 insertions(+), 5 deletions(-) create mode 100644 tests/test_codecs/test_crc32c.py create mode 100644 tests/test_sync_codec_pipeline.py diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index fd557ac43e..7425ec30f6 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -1,8 +1,8 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from itertools import islice, pairwise -from typing import TYPE_CHECKING, Any, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar, cast from warnings import warn from zarr.abc.codec import ( @@ -13,6 +13,7 @@ BytesBytesCodec, Codec, CodecPipeline, + SupportsSyncCodec, ) from zarr.core.common import concurrent_map from zarr.core.config import config @@ -68,6 +69,134 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any: return fill_value +@dataclass(frozen=True) +class CodecChain: + """Lightweight codec chain: array-array -> array-bytes -> bytes-bytes. + + Pure compute only -- no IO methods, no threading, no batching. + """ + + array_array_codecs: tuple[ArrayArrayCodec, ...] + array_bytes_codec: ArrayBytesCodec + bytes_bytes_codecs: tuple[BytesBytesCodec, ...] 
+ + _all_sync: bool = field(default=False, init=False, repr=False, compare=False) + + def __post_init__(self) -> None: + object.__setattr__( + self, + "_all_sync", + all(isinstance(c, SupportsSyncCodec) for c in self), + ) + + def __iter__(self) -> Iterator[Codec]: + yield from self.array_array_codecs + yield self.array_bytes_codec + yield from self.bytes_bytes_codecs + + @classmethod + def from_codecs(cls, codecs: Iterable[Codec]) -> CodecChain: + aa, ab, bb = codecs_from_list(list(codecs)) + return cls(array_array_codecs=aa, array_bytes_codec=ab, bytes_bytes_codecs=bb) + + def resolve_metadata_chain( + self, chunk_spec: ArraySpec + ) -> tuple[ + list[tuple[ArrayArrayCodec, ArraySpec]], + tuple[ArrayBytesCodec, ArraySpec], + list[tuple[BytesBytesCodec, ArraySpec]], + ]: + """Resolve metadata through the codec chain for a single chunk_spec.""" + aa_codecs_with_spec: list[tuple[ArrayArrayCodec, ArraySpec]] = [] + spec = chunk_spec + for aa_codec in self.array_array_codecs: + aa_codecs_with_spec.append((aa_codec, spec)) + spec = aa_codec.resolve_metadata(spec) + + ab_codec_with_spec = (self.array_bytes_codec, spec) + spec = self.array_bytes_codec.resolve_metadata(spec) + + bb_codecs_with_spec: list[tuple[BytesBytesCodec, ArraySpec]] = [] + for bb_codec in self.bytes_bytes_codecs: + bb_codecs_with_spec.append((bb_codec, spec)) + spec = bb_codec.resolve_metadata(spec) + + return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec) + + def decode_chunk( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + aa_chain: Iterable[tuple[ArrayArrayCodec, ArraySpec]] | None = None, + ab_pair: tuple[ArrayBytesCodec, ArraySpec] | None = None, + bb_chain: Iterable[tuple[BytesBytesCodec, ArraySpec]] | None = None, + ) -> NDBuffer: + """Decode a single chunk through the full codec chain, synchronously. + + Pure compute -- no IO. Only callable when all codecs support sync. 
+ + The optional ``aa_chain``, ``ab_pair``, ``bb_chain`` parameters allow + pre-resolved metadata to be reused across many chunks with the same spec. + If not provided, ``resolve_metadata_chain`` is called internally. + """ + if aa_chain is None or ab_pair is None or bb_chain is None: + aa_chain, ab_pair, bb_chain = self.resolve_metadata_chain(chunk_spec) + + bb_out: Any = chunk_bytes + for bb_codec, spec in reversed(list(bb_chain)): + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, spec) + + ab_codec, ab_spec = ab_pair + ab_out: Any = cast("SupportsSyncCodec", ab_codec)._decode_sync(bb_out, ab_spec) + + for aa_codec, spec in reversed(list(aa_chain)): + ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) + + return ab_out # type: ignore[no-any-return] + + def encode_chunk( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + """Encode a single chunk through the full codec chain, synchronously. + + Pure compute -- no IO. Only callable when all codecs support sync. 
+ """ + spec = chunk_spec + aa_out: Any = chunk_array + + for aa_codec in self.array_array_codecs: + if aa_out is None: + return None + aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) + spec = aa_codec.resolve_metadata(spec) + + if aa_out is None: + return None + bb_out: Any = cast("SupportsSyncCodec", self.array_bytes_codec)._encode_sync(aa_out, spec) + spec = self.array_bytes_codec.resolve_metadata(spec) + + for bb_codec in self.bytes_bytes_codecs: + if bb_out is None: + return None + bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, spec) + spec = bb_codec.resolve_metadata(spec) + + return bb_out # type: ignore[no-any-return] + + def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: + for codec in self: + byte_length = codec.compute_encoded_size(byte_length, array_spec) + array_spec = codec.resolve_metadata(array_spec) + return byte_length + + def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: + for codec in self: + chunk_spec = codec.resolve_metadata(chunk_spec) + return chunk_spec + + @dataclass(frozen=True) class BatchedCodecPipeline(CodecPipeline): """Default codec pipeline. 
diff --git a/tests/test_codecs/test_blosc.py b/tests/test_codecs/test_blosc.py index 6f4821f8b1..0201beb8de 100644 --- a/tests/test_codecs/test_blosc.py +++ b/tests/test_codecs/test_blosc.py @@ -6,11 +6,12 @@ from packaging.version import Version import zarr +from zarr.abc.codec import SupportsSyncCodec from zarr.codecs import BloscCodec from zarr.codecs.blosc import BloscShuffle, Shuffle -from zarr.core.array_spec import ArraySpec +from zarr.core.array_spec import ArrayConfig, ArraySpec from zarr.core.buffer import default_buffer_prototype -from zarr.core.dtype import UInt16 +from zarr.core.dtype import UInt16, get_data_type_from_native_dtype from zarr.storage import MemoryStore, StorePath @@ -110,3 +111,27 @@ async def test_typesize() -> None: else: expected_size = 10216 assert size == expected_size, msg + + +def test_blosc_codec_supports_sync() -> None: + assert isinstance(BloscCodec(), SupportsSyncCodec) + + +def test_blosc_codec_sync_roundtrip() -> None: + codec = BloscCodec(typesize=8) + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) diff --git a/tests/test_codecs/test_crc32c.py b/tests/test_codecs/test_crc32c.py new file mode 100644 index 0000000000..3ab1070f60 --- /dev/null +++ b/tests/test_codecs/test_crc32c.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import numpy as np + +from zarr.abc.codec import SupportsSyncCodec +from zarr.codecs.crc32c_ import Crc32cCodec +from zarr.core.array_spec import ArrayConfig, 
ArraySpec +from zarr.core.buffer import default_buffer_prototype +from zarr.core.dtype import get_data_type_from_native_dtype + + +def test_crc32c_codec_supports_sync() -> None: + assert isinstance(Crc32cCodec(), SupportsSyncCodec) + + +def test_crc32c_codec_sync_roundtrip() -> None: + codec = Crc32cCodec() + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) diff --git a/tests/test_codecs/test_endian.py b/tests/test_codecs/test_endian.py index ab64afb1b8..c505cee828 100644 --- a/tests/test_codecs/test_endian.py +++ b/tests/test_codecs/test_endian.py @@ -4,8 +4,12 @@ import pytest import zarr +from zarr.abc.codec import SupportsSyncCodec from zarr.abc.store import Store from zarr.codecs import BytesCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import NDBuffer, default_buffer_prototype +from zarr.core.dtype import get_data_type_from_native_dtype from zarr.storage import StorePath from .test_codecs import _AsyncArrayProxy @@ -33,6 +37,31 @@ async def test_endian(store: Store, endian: Literal["big", "little"]) -> None: assert np.array_equal(data, readback_data) +def test_bytes_codec_supports_sync() -> None: + assert isinstance(BytesCodec(), SupportsSyncCodec) + + +def test_bytes_codec_sync_roundtrip() -> None: + codec = BytesCodec() + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + 
fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + nd_buf: NDBuffer = default_buffer_prototype().nd_buffer.from_numpy_array(arr) + + codec = codec.evolve_from_array_spec(spec) + + encoded = codec._encode_sync(nd_buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + @pytest.mark.filterwarnings("ignore:The endianness of the requested serializer") @pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) @pytest.mark.parametrize("dtype_input_endian", [">u2", " None: a[:, :] = data assert np.array_equal(data, a[:, :]) + + +def test_gzip_codec_supports_sync() -> None: + assert isinstance(GzipCodec(), SupportsSyncCodec) + + +def test_gzip_codec_sync_roundtrip() -> None: + codec = GzipCodec(level=1) + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) diff --git a/tests/test_codecs/test_transpose.py b/tests/test_codecs/test_transpose.py index 06ec668ad3..949bb72a62 100644 --- a/tests/test_codecs/test_transpose.py +++ b/tests/test_codecs/test_transpose.py @@ -3,9 +3,13 @@ import zarr from zarr import AsyncArray, config +from zarr.abc.codec import SupportsSyncCodec from zarr.abc.store import Store from zarr.codecs import TransposeCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import NDBuffer, 
default_buffer_prototype from zarr.core.common import MemoryOrder +from zarr.core.dtype import get_data_type_from_native_dtype from zarr.storage import StorePath from .test_codecs import _AsyncArrayProxy @@ -93,3 +97,27 @@ def test_transpose_invalid( chunk_key_encoding={"name": "v2", "separator": "."}, filters=[TransposeCodec(order=order)], # type: ignore[arg-type] ) + + +def test_transpose_codec_supports_sync() -> None: + assert isinstance(TransposeCodec(order=(0, 1)), SupportsSyncCodec) + + +def test_transpose_codec_sync_roundtrip() -> None: + codec = TransposeCodec(order=(1, 0)) + arr = np.arange(12, dtype="float64").reshape(3, 4) + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + nd_buf: NDBuffer = default_buffer_prototype().nd_buffer.from_numpy_array(arr) + + encoded = codec._encode_sync(nd_buf, spec) + assert encoded is not None + resolved_spec = codec.resolve_metadata(spec) + decoded = codec._decode_sync(encoded, resolved_spec) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) diff --git a/tests/test_codecs/test_vlen.py b/tests/test_codecs/test_vlen.py index cf0905daca..f3445824b3 100644 --- a/tests/test_codecs/test_vlen.py +++ b/tests/test_codecs/test_vlen.py @@ -5,9 +5,10 @@ import zarr from zarr import Array -from zarr.abc.codec import Codec +from zarr.abc.codec import Codec, SupportsSyncCodec from zarr.abc.store import Store from zarr.codecs import ZstdCodec +from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec from zarr.core.dtype import get_data_type_from_native_dtype from zarr.core.dtype.npy.string import _NUMPY_SUPPORTS_VLEN_STRING from zarr.core.metadata.v3 import ArrayV3Metadata @@ -62,3 +63,11 @@ def test_vlen_string( assert np.array_equal(data, b[:, :]) assert b.metadata.data_type == get_data_type_from_native_dtype(data.dtype) 
assert a.dtype == data.dtype + + +def test_vlen_utf8_codec_supports_sync() -> None: + assert isinstance(VLenUTF8Codec(), SupportsSyncCodec) + + +def test_vlen_bytes_codec_supports_sync() -> None: + assert isinstance(VLenBytesCodec(), SupportsSyncCodec) diff --git a/tests/test_codecs/test_zstd.py b/tests/test_codecs/test_zstd.py index 6068f53443..68297e4d94 100644 --- a/tests/test_codecs/test_zstd.py +++ b/tests/test_codecs/test_zstd.py @@ -2,8 +2,12 @@ import pytest import zarr +from zarr.abc.codec import SupportsSyncCodec from zarr.abc.store import Store from zarr.codecs import ZstdCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import default_buffer_prototype +from zarr.core.dtype import get_data_type_from_native_dtype from zarr.storage import StorePath @@ -23,3 +27,31 @@ def test_zstd(store: Store, checksum: bool) -> None: a[:, :] = data assert np.array_equal(data, a[:, :]) + + +def test_zstd_codec_supports_sync() -> None: + assert isinstance(ZstdCodec(), SupportsSyncCodec) + + +def test_zstd_is_not_fixed_size() -> None: + assert ZstdCodec.is_fixed_size is False + + +def test_zstd_codec_sync_roundtrip() -> None: + codec = ZstdCodec(level=1) + arr = np.arange(100, dtype="float64") + zdtype = get_data_type_from_native_dtype(arr.dtype) + spec = ArraySpec( + shape=arr.shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) diff --git a/tests/test_sync_codec_pipeline.py b/tests/test_sync_codec_pipeline.py new file mode 100644 index 0000000000..23fa28cb04 --- /dev/null +++ b/tests/test_sync_codec_pipeline.py @@ -0,0 +1,156 @@ 
+from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import numpy as np +import pytest + +from zarr.codecs.bytes import BytesCodec +from zarr.codecs.gzip import GzipCodec +from zarr.codecs.transpose import TransposeCodec +from zarr.codecs.zstd import ZstdCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import NDBuffer, default_buffer_prototype +from zarr.core.dtype import get_data_type_from_native_dtype + +if TYPE_CHECKING: + from zarr.abc.codec import Codec + + +def _make_array_spec(shape: tuple[int, ...], dtype: np.dtype[np.generic]) -> ArraySpec: + zdtype = get_data_type_from_native_dtype(dtype) + return ArraySpec( + shape=shape, + dtype=zdtype, + fill_value=zdtype.cast_scalar(0), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + + +def _make_nd_buffer(arr: np.ndarray[Any, np.dtype[Any]]) -> NDBuffer: + return default_buffer_prototype().nd_buffer.from_numpy_array(arr) + + +class TestCodecChain: + def test_from_codecs_bytes_only(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec()]) + assert chain.array_array_codecs == () + assert isinstance(chain.array_bytes_codec, BytesCodec) + assert chain.bytes_bytes_codecs == () + assert chain._all_sync is True + + def test_from_codecs_with_compression(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec(), GzipCodec()]) + assert isinstance(chain.array_bytes_codec, BytesCodec) + assert len(chain.bytes_bytes_codecs) == 1 + assert isinstance(chain.bytes_bytes_codecs[0], GzipCodec) + assert chain._all_sync is True + + def test_from_codecs_with_transpose(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec()]) + assert len(chain.array_array_codecs) == 1 + assert isinstance(chain.array_array_codecs[0], 
TransposeCodec) + assert isinstance(chain.array_bytes_codec, BytesCodec) + assert chain._all_sync is True + + def test_from_codecs_full_chain(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()]) + assert len(chain.array_array_codecs) == 1 + assert isinstance(chain.array_bytes_codec, BytesCodec) + assert len(chain.bytes_bytes_codecs) == 1 + assert chain._all_sync is True + + def test_iter(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + codecs: list[Codec] = [TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec()] + chain = CodecChain.from_codecs(codecs) + assert list(chain) == codecs + + def test_frozen(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec()]) + with pytest.raises(AttributeError): + chain.array_bytes_codec = BytesCodec() # type: ignore[misc] + + def test_encode_decode_roundtrip_bytes_only(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec()]) + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + nd_buf = _make_nd_buffer(arr) + + encoded = chain_evolved.encode_chunk(nd_buf, spec) + assert encoded is not None + decoded = chain_evolved.decode_chunk(encoded, spec) + assert decoded is not None + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_encode_decode_roundtrip_with_compression(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([BytesCodec(), GzipCodec(level=1)]) + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + nd_buf = _make_nd_buffer(arr) + + encoded = 
chain_evolved.encode_chunk(nd_buf, spec) + assert encoded is not None + decoded = chain_evolved.decode_chunk(encoded, spec) + assert decoded is not None + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_encode_decode_roundtrip_with_transpose(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs( + [TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)] + ) + arr = np.arange(12, dtype="float64").reshape(3, 4) + spec = _make_array_spec(arr.shape, arr.dtype) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + nd_buf = _make_nd_buffer(arr) + + encoded = chain_evolved.encode_chunk(nd_buf, spec) + assert encoded is not None + decoded = chain_evolved.decode_chunk(encoded, spec) + assert decoded is not None + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + def test_resolve_metadata_chain(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec()]) + arr = np.zeros((3, 4), dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + + aa_chain, ab_pair, bb_chain = chain_evolved.resolve_metadata_chain(spec) + assert len(aa_chain) == 1 + assert aa_chain[0][1].shape == (3, 4) # spec before transpose + _ab_codec, ab_spec = ab_pair + assert ab_spec.shape == (4, 3) # spec after transpose + assert len(bb_chain) == 1 + + def test_resolve_metadata(self) -> None: + from zarr.core.codec_pipeline import CodecChain + + chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec()]) + spec = _make_array_spec((3, 4), np.dtype("float64")) + chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + resolved = chain_evolved.resolve_metadata(spec) + # After transpose (1,0) + bytes, shape should reflect the transpose + 
assert resolved.shape == (4, 3) From 41b7a6ad78361de0018cc8031e0280810924d680 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 25 Feb 2026 20:34:34 -0500 Subject: [PATCH 3/6] refactor codecchain --- src/zarr/core/codec_pipeline.py | 114 ++++++++++----------------- tests/test_sync_codec_pipeline.py | 123 ++++++------------------------ 2 files changed, 65 insertions(+), 172 deletions(-) diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 7425ec30f6..9c0dd292ed 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -69,87 +69,67 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any: return fill_value -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) class CodecChain: - """Lightweight codec chain: array-array -> array-bytes -> bytes-bytes. + """Codec chain with pre-resolved metadata specs. - Pure compute only -- no IO methods, no threading, no batching. + Constructed from an iterable of codecs and a chunk ArraySpec. + Resolves each codec against the spec so that encode/decode can + run without re-resolving. Pure compute only -- no IO, no threading, + no batching. """ - array_array_codecs: tuple[ArrayArrayCodec, ...] - array_bytes_codec: ArrayBytesCodec - bytes_bytes_codecs: tuple[BytesBytesCodec, ...] + codecs: tuple[Codec, ...] + chunk_spec: ArraySpec - _all_sync: bool = field(default=False, init=False, repr=False, compare=False) + _aa_codecs: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = field( + init=False, repr=False, compare=False + ) + _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False) + _ab_spec: ArraySpec = field(init=False, repr=False, compare=False) + _bb_codecs: tuple[BytesBytesCodec, ...] 
= field(init=False, repr=False, compare=False) + _all_sync: bool = field(init=False, repr=False, compare=False) def __post_init__(self) -> None: - object.__setattr__( - self, - "_all_sync", - all(isinstance(c, SupportsSyncCodec) for c in self), - ) - - def __iter__(self) -> Iterator[Codec]: - yield from self.array_array_codecs - yield self.array_bytes_codec - yield from self.bytes_bytes_codecs + aa, ab, bb = codecs_from_list(list(self.codecs)) - @classmethod - def from_codecs(cls, codecs: Iterable[Codec]) -> CodecChain: - aa, ab, bb = codecs_from_list(list(codecs)) - return cls(array_array_codecs=aa, array_bytes_codec=ab, bytes_bytes_codecs=bb) - - def resolve_metadata_chain( - self, chunk_spec: ArraySpec - ) -> tuple[ - list[tuple[ArrayArrayCodec, ArraySpec]], - tuple[ArrayBytesCodec, ArraySpec], - list[tuple[BytesBytesCodec, ArraySpec]], - ]: - """Resolve metadata through the codec chain for a single chunk_spec.""" - aa_codecs_with_spec: list[tuple[ArrayArrayCodec, ArraySpec]] = [] - spec = chunk_spec - for aa_codec in self.array_array_codecs: - aa_codecs_with_spec.append((aa_codec, spec)) + aa_pairs: list[tuple[ArrayArrayCodec, ArraySpec]] = [] + spec = self.chunk_spec + for aa_codec in aa: + aa_pairs.append((aa_codec, spec)) spec = aa_codec.resolve_metadata(spec) - ab_codec_with_spec = (self.array_bytes_codec, spec) - spec = self.array_bytes_codec.resolve_metadata(spec) + object.__setattr__(self, "_aa_codecs", tuple(aa_pairs)) + object.__setattr__(self, "_ab_codec", ab) + object.__setattr__(self, "_ab_spec", spec) - bb_codecs_with_spec: list[tuple[BytesBytesCodec, ArraySpec]] = [] - for bb_codec in self.bytes_bytes_codecs: - bb_codecs_with_spec.append((bb_codec, spec)) - spec = bb_codec.resolve_metadata(spec) + object.__setattr__(self, "_bb_codecs", bb) - return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec) + object.__setattr__( + self, + "_all_sync", + all(isinstance(c, SupportsSyncCodec) for c in self.codecs), + ) + + @property + def 
all_sync(self) -> bool: + return self._all_sync def decode_chunk( self, chunk_bytes: Buffer, - chunk_spec: ArraySpec, - aa_chain: Iterable[tuple[ArrayArrayCodec, ArraySpec]] | None = None, - ab_pair: tuple[ArrayBytesCodec, ArraySpec] | None = None, - bb_chain: Iterable[tuple[BytesBytesCodec, ArraySpec]] | None = None, ) -> NDBuffer: """Decode a single chunk through the full codec chain, synchronously. Pure compute -- no IO. Only callable when all codecs support sync. - - The optional ``aa_chain``, ``ab_pair``, ``bb_chain`` parameters allow - pre-resolved metadata to be reused across many chunks with the same spec. - If not provided, ``resolve_metadata_chain`` is called internally. """ - if aa_chain is None or ab_pair is None or bb_chain is None: - aa_chain, ab_pair, bb_chain = self.resolve_metadata_chain(chunk_spec) - bb_out: Any = chunk_bytes - for bb_codec, spec in reversed(list(bb_chain)): - bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, spec) + for bb_codec in reversed(self._bb_codecs): + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self.chunk_spec) - ab_codec, ab_spec = ab_pair - ab_out: Any = cast("SupportsSyncCodec", ab_codec)._decode_sync(bb_out, ab_spec) + ab_out: Any = cast("SupportsSyncCodec", self._ab_codec)._decode_sync(bb_out, self._ab_spec) - for aa_codec, spec in reversed(list(aa_chain)): + for aa_codec, spec in reversed(self._aa_codecs): ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) return ab_out # type: ignore[no-any-return] @@ -157,45 +137,35 @@ def decode_chunk( def encode_chunk( self, chunk_array: NDBuffer, - chunk_spec: ArraySpec, ) -> Buffer | None: """Encode a single chunk through the full codec chain, synchronously. Pure compute -- no IO. Only callable when all codecs support sync. 
""" - spec = chunk_spec aa_out: Any = chunk_array - for aa_codec in self.array_array_codecs: + for aa_codec, spec in self._aa_codecs: if aa_out is None: return None aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) - spec = aa_codec.resolve_metadata(spec) if aa_out is None: return None - bb_out: Any = cast("SupportsSyncCodec", self.array_bytes_codec)._encode_sync(aa_out, spec) - spec = self.array_bytes_codec.resolve_metadata(spec) + bb_out: Any = cast("SupportsSyncCodec", self._ab_codec)._encode_sync(aa_out, self._ab_spec) - for bb_codec in self.bytes_bytes_codecs: + for bb_codec in self._bb_codecs: if bb_out is None: return None - bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, spec) - spec = bb_codec.resolve_metadata(spec) + bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self.chunk_spec) return bb_out # type: ignore[no-any-return] def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: - for codec in self: + for codec in self.codecs: byte_length = codec.compute_encoded_size(byte_length, array_spec) array_spec = codec.resolve_metadata(array_spec) return byte_length - def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: - for codec in self: - chunk_spec = codec.resolve_metadata(chunk_spec) - return chunk_spec - @dataclass(frozen=True) class BatchedCodecPipeline(CodecPipeline): diff --git a/tests/test_sync_codec_pipeline.py b/tests/test_sync_codec_pipeline.py index 23fa28cb04..192479dc59 100644 --- a/tests/test_sync_codec_pipeline.py +++ b/tests/test_sync_codec_pipeline.py @@ -1,9 +1,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import Any import numpy as np -import pytest from zarr.codecs.bytes import BytesCodec from zarr.codecs.gzip import GzipCodec @@ -11,11 +10,9 @@ from zarr.codecs.zstd import ZstdCodec from zarr.core.array_spec import ArrayConfig, ArraySpec from zarr.core.buffer import NDBuffer, default_buffer_prototype 
+from zarr.core.codec_pipeline import CodecChain from zarr.core.dtype import get_data_type_from_native_dtype -if TYPE_CHECKING: - from zarr.abc.codec import Codec - def _make_array_spec(shape: tuple[int, ...], dtype: np.dtype[np.generic]) -> ArraySpec: zdtype = get_data_type_from_native_dtype(dtype) @@ -33,124 +30,50 @@ def _make_nd_buffer(arr: np.ndarray[Any, np.dtype[Any]]) -> NDBuffer: class TestCodecChain: - def test_from_codecs_bytes_only(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec()]) - assert chain.array_array_codecs == () - assert isinstance(chain.array_bytes_codec, BytesCodec) - assert chain.bytes_bytes_codecs == () - assert chain._all_sync is True - - def test_from_codecs_with_compression(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec(), GzipCodec()]) - assert isinstance(chain.array_bytes_codec, BytesCodec) - assert len(chain.bytes_bytes_codecs) == 1 - assert isinstance(chain.bytes_bytes_codecs[0], GzipCodec) - assert chain._all_sync is True - - def test_from_codecs_with_transpose(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec()]) - assert len(chain.array_array_codecs) == 1 - assert isinstance(chain.array_array_codecs[0], TransposeCodec) - assert isinstance(chain.array_bytes_codec, BytesCodec) - assert chain._all_sync is True + def test_all_sync(self) -> None: + spec = _make_array_spec((100,), np.dtype("float64")) + chain = CodecChain((BytesCodec(),), spec) + assert chain.all_sync is True - def test_from_codecs_full_chain(self) -> None: - from zarr.core.codec_pipeline import CodecChain + def test_all_sync_with_compression(self) -> None: + spec = _make_array_spec((100,), np.dtype("float64")) + chain = CodecChain((BytesCodec(), GzipCodec()), spec) + assert chain.all_sync is True - chain = 
CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()]) - assert len(chain.array_array_codecs) == 1 - assert isinstance(chain.array_bytes_codec, BytesCodec) - assert len(chain.bytes_bytes_codecs) == 1 - assert chain._all_sync is True - - def test_iter(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - codecs: list[Codec] = [TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec()] - chain = CodecChain.from_codecs(codecs) - assert list(chain) == codecs - - def test_frozen(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec()]) - with pytest.raises(AttributeError): - chain.array_bytes_codec = BytesCodec() # type: ignore[misc] + def test_all_sync_full_chain(self) -> None: + spec = _make_array_spec((3, 4), np.dtype("float64")) + chain = CodecChain((TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec()), spec) + assert chain.all_sync is True def test_encode_decode_roundtrip_bytes_only(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec()]) arr = np.arange(100, dtype="float64") spec = _make_array_spec(arr.shape, arr.dtype) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + chain = CodecChain((BytesCodec(),), spec) nd_buf = _make_nd_buffer(arr) - encoded = chain_evolved.encode_chunk(nd_buf, spec) + encoded = chain.encode_chunk(nd_buf) assert encoded is not None - decoded = chain_evolved.decode_chunk(encoded, spec) - assert decoded is not None + decoded = chain.decode_chunk(encoded) np.testing.assert_array_equal(arr, decoded.as_numpy_array()) def test_encode_decode_roundtrip_with_compression(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([BytesCodec(), GzipCodec(level=1)]) arr = np.arange(100, dtype="float64") spec = _make_array_spec(arr.shape, arr.dtype) - chain_evolved = 
CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + chain = CodecChain((BytesCodec(), GzipCodec(level=1)), spec) nd_buf = _make_nd_buffer(arr) - encoded = chain_evolved.encode_chunk(nd_buf, spec) + encoded = chain.encode_chunk(nd_buf) assert encoded is not None - decoded = chain_evolved.decode_chunk(encoded, spec) - assert decoded is not None + decoded = chain.decode_chunk(encoded) np.testing.assert_array_equal(arr, decoded.as_numpy_array()) def test_encode_decode_roundtrip_with_transpose(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs( - [TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)] - ) arr = np.arange(12, dtype="float64").reshape(3, 4) spec = _make_array_spec(arr.shape, arr.dtype) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) + chain = CodecChain((TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)), spec) nd_buf = _make_nd_buffer(arr) - encoded = chain_evolved.encode_chunk(nd_buf, spec) + encoded = chain.encode_chunk(nd_buf) assert encoded is not None - decoded = chain_evolved.decode_chunk(encoded, spec) - assert decoded is not None + decoded = chain.decode_chunk(encoded) np.testing.assert_array_equal(arr, decoded.as_numpy_array()) - - def test_resolve_metadata_chain(self) -> None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec()]) - arr = np.zeros((3, 4), dtype="float64") - spec = _make_array_spec(arr.shape, arr.dtype) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) - - aa_chain, ab_pair, bb_chain = chain_evolved.resolve_metadata_chain(spec) - assert len(aa_chain) == 1 - assert aa_chain[0][1].shape == (3, 4) # spec before transpose - _ab_codec, ab_spec = ab_pair - assert ab_spec.shape == (4, 3) # spec after transpose - assert len(bb_chain) == 1 - - def test_resolve_metadata(self) -> 
None: - from zarr.core.codec_pipeline import CodecChain - - chain = CodecChain.from_codecs([TransposeCodec(order=(1, 0)), BytesCodec()]) - spec = _make_array_spec((3, 4), np.dtype("float64")) - chain_evolved = CodecChain.from_codecs([c.evolve_from_array_spec(spec) for c in chain]) - resolved = chain_evolved.resolve_metadata(spec) - # After transpose (1,0) + bytes, shape should reflect the transpose - assert resolved.shape == (4, 3) From 5a2a884e249277086fe6d706b25881b13b331f2d Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 25 Feb 2026 20:50:19 -0500 Subject: [PATCH 4/6] separate codecs and specs --- src/zarr/core/codec_pipeline.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 9c0dd292ed..4412ffa705 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -75,35 +75,37 @@ class CodecChain: Constructed from an iterable of codecs and a chunk ArraySpec. Resolves each codec against the spec so that encode/decode can - run without re-resolving. Pure compute only -- no IO, no threading, - no batching. + run without re-resolving. """ codecs: tuple[Codec, ...] chunk_spec: ArraySpec - _aa_codecs: tuple[tuple[ArrayArrayCodec, ArraySpec], ...] = field( - init=False, repr=False, compare=False - ) + _aa_codecs: tuple[ArrayArrayCodec, ...] = field(init=False, repr=False, compare=False) + _aa_specs: tuple[ArraySpec, ...] = field(init=False, repr=False, compare=False) _ab_codec: ArrayBytesCodec = field(init=False, repr=False, compare=False) _ab_spec: ArraySpec = field(init=False, repr=False, compare=False) _bb_codecs: tuple[BytesBytesCodec, ...] 
= field(init=False, repr=False, compare=False) + _bb_spec: ArraySpec = field(init=False, repr=False, compare=False) _all_sync: bool = field(init=False, repr=False, compare=False) def __post_init__(self) -> None: aa, ab, bb = codecs_from_list(list(self.codecs)) - aa_pairs: list[tuple[ArrayArrayCodec, ArraySpec]] = [] + aa_specs: list[ArraySpec] = [] spec = self.chunk_spec for aa_codec in aa: - aa_pairs.append((aa_codec, spec)) + aa_specs.append(spec) spec = aa_codec.resolve_metadata(spec) - object.__setattr__(self, "_aa_codecs", tuple(aa_pairs)) + object.__setattr__(self, "_aa_codecs", aa) + object.__setattr__(self, "_aa_specs", tuple(aa_specs)) object.__setattr__(self, "_ab_codec", ab) object.__setattr__(self, "_ab_spec", spec) + spec = ab.resolve_metadata(spec) object.__setattr__(self, "_bb_codecs", bb) + object.__setattr__(self, "_bb_spec", spec) object.__setattr__( self, @@ -125,11 +127,11 @@ def decode_chunk( """ bb_out: Any = chunk_bytes for bb_codec in reversed(self._bb_codecs): - bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self.chunk_spec) + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, self._bb_spec) ab_out: Any = cast("SupportsSyncCodec", self._ab_codec)._decode_sync(bb_out, self._ab_spec) - for aa_codec, spec in reversed(self._aa_codecs): + for aa_codec, spec in zip(reversed(self._aa_codecs), reversed(self._aa_specs), strict=True): ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) return ab_out # type: ignore[no-any-return] @@ -144,7 +146,7 @@ def encode_chunk( """ aa_out: Any = chunk_array - for aa_codec, spec in self._aa_codecs: + for aa_codec, spec in zip(self._aa_codecs, self._aa_specs, strict=True): if aa_out is None: return None aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) @@ -156,7 +158,7 @@ def encode_chunk( for bb_codec in self._bb_codecs: if bb_out is None: return None - bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self.chunk_spec) + 
bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, self._bb_spec) return bb_out # type: ignore[no-any-return] From 4e262b17d120dd0ac9ac2e83a0e576d3e4876038 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Thu, 26 Feb 2026 11:05:55 -0500 Subject: [PATCH 5/6] add synchronous methods to stores --- src/zarr/abc/store.py | 44 +++++++++++- src/zarr/storage/_common.py | 27 +++++++ src/zarr/storage/_local.py | 69 ++++++++++++++++++ src/zarr/storage/_memory.py | 64 ++++++++++++++++- src/zarr/testing/store.py | 136 +++++++++++++++++++++++++++++++++++- tests/test_indexing.py | 17 +++++ 6 files changed, 354 insertions(+), 3 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 87df89a683..1d321f5fc3 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -16,7 +16,17 @@ from zarr.core.buffer import Buffer, BufferPrototype -__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"] +__all__ = [ + "ByteGetter", + "ByteSetter", + "Store", + "SupportsDeleteSync", + "SupportsGetSync", + "SupportsSetRangeSync", + "SupportsSetSync", + "SupportsSyncStore", + "set_or_delete", +] @dataclass @@ -700,6 +710,38 @@ async def delete(self) -> None: ... async def set_if_not_exists(self, default: Buffer) -> None: ... +@runtime_checkable +class SupportsGetSync(Protocol): + def get_sync( + self, + key: str, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: ... + + +@runtime_checkable +class SupportsSetSync(Protocol): + def set_sync(self, key: str, value: Buffer) -> None: ... + + +@runtime_checkable +class SupportsSetRangeSync(Protocol): + def set_range_sync(self, key: str, value: Buffer, start: int) -> None: ... + + +@runtime_checkable +class SupportsDeleteSync(Protocol): + def delete_sync(self, key: str) -> None: ... + + +@runtime_checkable +class SupportsSyncStore( + SupportsGetSync, SupportsSetSync, SupportsSetRangeSync, SupportsDeleteSync, Protocol +): ... 
+ + async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None: """Set or delete a value in a byte setter diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py index 4bea04f024..c14aa1a37d 100644 --- a/src/zarr/storage/_common.py +++ b/src/zarr/storage/_common.py @@ -228,6 +228,33 @@ async def is_empty(self) -> bool: """ return await self.store.is_empty(self.path) + # ------------------------------------------------------------------- + # Synchronous IO delegation + # ------------------------------------------------------------------- + + def get_sync( + self, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + """Synchronous read — delegates to ``self.store.get_sync(self.path, ...)``.""" + if prototype is None: + prototype = default_buffer_prototype() + return self.store.get_sync(self.path, prototype=prototype, byte_range=byte_range) # type: ignore[attr-defined, no-any-return] + + def set_sync(self, value: Buffer) -> None: + """Synchronous write — delegates to ``self.store.set_sync(self.path, value)``.""" + self.store.set_sync(self.path, value) # type: ignore[attr-defined] + + def set_range_sync(self, value: Buffer, start: int) -> None: + """Synchronous byte-range write.""" + self.store.set_range_sync(self.path, value, start) # type: ignore[attr-defined] + + def delete_sync(self) -> None: + """Synchronous delete — delegates to ``self.store.delete_sync(self.path)``.""" + self.store.delete_sync(self.path) # type: ignore[attr-defined] + def __truediv__(self, other: str) -> StorePath: """Combine this store path with another path""" return self.__class__(self.store, _dereference_path(self.path, other)) diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index 80233a112d..dc05ff67a7 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -85,6 +85,19 @@ def _put(path: Path, value: Buffer, exclusive: bool = False) -> int: return 
f.write(view) +def _put_range(path: Path, value: Buffer, start: int) -> None: + view = value.as_buffer_like() + file_size = path.stat().st_size + if start + len(view) > file_size: + raise ValueError( + f"set_range would write beyond the end of the stored value: " + f"start={start}, len(value)={len(view)}, stored size={file_size}" + ) + with path.open("r+b") as f: + f.seek(start) + f.write(view) + + class LocalStore(Store): """ Store for the local file system. @@ -187,6 +200,62 @@ def __repr__(self) -> str: def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and self.root == other.root + # ------------------------------------------------------------------- + # Synchronous store methods + # ------------------------------------------------------------------- + + def _ensure_open_sync(self) -> None: + if not self._is_open: + if not self.read_only: + self.root.mkdir(parents=True, exist_ok=True) + if not self.root.exists(): + raise FileNotFoundError(f"{self.root} does not exist") + self._is_open = True + + def get_sync( + self, + key: str, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + if prototype is None: + prototype = default_buffer_prototype() + self._ensure_open_sync() + assert isinstance(key, str) + path = self.root / key + try: + return _get(path, prototype, byte_range) + except (FileNotFoundError, IsADirectoryError, NotADirectoryError): + return None + + def set_sync(self, key: str, value: Buffer) -> None: + self._ensure_open_sync() + self._check_writable() + assert isinstance(key, str) + if not isinstance(value, Buffer): + raise TypeError( + f"LocalStore.set(): `value` must be a Buffer instance. " + f"Got an instance of {type(value)} instead." 
+ ) + path = self.root / key + _put(path, value) + + def set_range_sync(self, key: str, value: Buffer, start: int) -> None: + self._ensure_open_sync() + self._check_writable() + path = self.root / key + _put_range(path, value, start) + + def delete_sync(self, key: str) -> None: + self._ensure_open_sync() + self._check_writable() + path = self.root / key + if path.is_dir(): + shutil.rmtree(path) + else: + path.unlink(missing_ok=True) + async def get( self, key: str, diff --git a/src/zarr/storage/_memory.py b/src/zarr/storage/_memory.py index e6f9b7a512..168f3e8890 100644 --- a/src/zarr/storage/_memory.py +++ b/src/zarr/storage/_memory.py @@ -77,6 +77,49 @@ def __eq__(self, other: object) -> bool: and self.read_only == other.read_only ) + # ------------------------------------------------------------------- + # Synchronous store methods + # ------------------------------------------------------------------- + + def get_sync( + self, + key: str, + *, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + if prototype is None: + prototype = default_buffer_prototype() + if not self._is_open: + self._is_open = True + assert isinstance(key, str) + try: + value = self._store_dict[key] + start, stop = _normalize_byte_range_index(value, byte_range) + return prototype.buffer.from_buffer(value[start:stop]) + except KeyError: + return None + + def set_sync(self, key: str, value: Buffer) -> None: + self._check_writable() + if not self._is_open: + self._is_open = True + assert isinstance(key, str) + if not isinstance(value, Buffer): + raise TypeError( + f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead." 
+ ) + self._store_dict[key] = value + + def delete_sync(self, key: str) -> None: + self._check_writable() + if not self._is_open: + self._is_open = True + try: + del self._store_dict[key] + except KeyError: + logger.debug("Key %s does not exist.", key) + async def get( self, key: str, @@ -122,7 +165,6 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None raise TypeError( f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead." ) - if byte_range is not None: buf = self._store_dict[key] buf[byte_range[0] : byte_range[1]] = value @@ -130,6 +172,26 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None else: self._store_dict[key] = value + def _set_range_impl(self, key: str, value: Buffer, start: int) -> None: + buf = self._store_dict[key] + target = buf.as_numpy_array() + if start + len(value) > len(target): + raise ValueError( + f"set_range would write beyond the end of the stored value: " + f"start={start}, len(value)={len(value)}, stored size={len(target)}" + ) + if not target.flags.writeable: + target = target.copy() + self._store_dict[key] = buf.__class__(target) + target[start : start + len(value)] = value.as_numpy_array() + + def set_range_sync(self, key: str, value: Buffer, start: int) -> None: + """Synchronous byte-range write.""" + self._check_writable() + if not self._is_open: + self._is_open = True + self._set_range_impl(key, value, start) + async def set_if_not_exists(self, key: str, value: Buffer) -> None: # docstring inherited self._check_writable() diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index 1b8e85ed98..bb60cc371f 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -6,12 +6,13 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Generic, Self, TypeVar +import numpy as np + from zarr.storage import WrapperStore if TYPE_CHECKING: from typing import Any - from zarr.abc.store import ByteRequest 
@staticmethod
def _require_get_sync(store: S) -> SupportsGetSync:
    """Return *store* narrowed to :class:`SupportsGetSync`; skip the test otherwise."""
    if isinstance(store, SupportsGetSync):
        return store  # type: ignore[unreachable]
    pytest.skip("store does not implement SupportsGetSync")
def test_get_sync(self, store: S) -> None:
    """Round-trip: a value written via the async path is readable via ``get_sync``."""
    sync_store = self._require_get_sync(store)
    key = "sync_get"
    payload = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
    sync(self.set(store, key, payload))
    fetched = sync_store.get_sync(key)
    assert fetched is not None
    assert_bytes_equal(fetched, payload)
def test_set_range_sync_preserves_other_bytes(self, store: S) -> None:
    """A ranged write must touch only ``[start, start + len)`` and leave the rest intact."""
    writer = self._require_set_sync(store)
    patcher = self._require_set_range_sync(store)
    reader = self._require_get_sync(store)
    proto = default_buffer_prototype()
    original = np.arange(100, dtype="uint8")
    key = "range_preserve"
    writer.set_sync(key, proto.buffer.from_array_like(original))
    overlay = np.full(10, 255, dtype="uint8")
    patcher.set_range_sync(key, proto.buffer.from_array_like(overlay), 50)
    stored = reader.get_sync(key)
    assert stored is not None
    expected = original.copy()
    expected[50:60] = 255
    np.testing.assert_array_equal(
        np.frombuffer(stored.to_bytes(), dtype="uint8"), expected
    )
def get_sync(
    self,
    key: str,
    *,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """Record the read (keyed by the path with its first segment stripped), then delegate."""
    _, _, suffix = key.partition("/")
    self.counter["__getitem__", suffix] += 1
    return super().get_sync(key, prototype=prototype, byte_range=byte_range)
def __post_init__(self) -> None:
    """Resolve the codec chain once so encode/decode need no per-call resolution.

    Walks the ArrayArrayCodec prefix, recording each codec together with the
    ArraySpec at its input boundary, then captures the ArrayBytesCodec, the
    spec that feeds it, and the BytesBytesCodec suffix. Also precomputes
    whether every codec supports the synchronous path.
    """
    aa, ab, bb = codecs_from_list(list(self.codecs))
    # Accumulate in a list to avoid quadratic tuple re-concatenation.
    layer_acc: list[tuple[ArrayArrayCodec, ArraySpec]] = []
    spec = self.array_spec
    for aa_codec in aa:
        layer_acc.append((aa_codec, spec))
        spec = aa_codec.resolve_metadata(spec)
    self.layers = tuple(layer_acc)
    self._ab_codec = ab
    # Spec reaching the ArrayBytesCodec, i.e. after all array-array layers.
    self._ab_spec = spec
    self._bb_codecs = bb
    # True iff every codec implements SupportsSyncCodec (_encode_sync/_decode_sync).
    self._all_sync = all(isinstance(c, SupportsSyncCodec) for c in self.codecs)
def test_all_sync_with_compression(self) -> None:
    """A bytes codec followed by a compressor should still be fully sync-capable."""
    transform = ChunkTransform(
        codecs=(BytesCodec(), GzipCodec()),
        array_spec=_make_array_spec((100,), np.dtype("float64")),
    )
    assert transform.all_sync is True
def test_layers_with_transpose(self) -> None:
    """A leading transpose becomes a single layer holding the outermost spec."""
    outer_spec = _make_array_spec((3, 4), np.dtype("float64"))
    aa_codec = TransposeCodec(order=(1, 0))
    transform = ChunkTransform(
        codecs=(aa_codec, BytesCodec(), ZstdCodec()), array_spec=outer_spec
    )
    assert len(transform.layers) == 1
    layer_codec, layer_spec = transform.layers[0]
    assert layer_codec is aa_codec
    assert layer_spec is outer_spec
-> None: arr = np.arange(100, dtype="float64") spec = _make_array_spec(arr.shape, arr.dtype) - chain = CodecChain((BytesCodec(), GzipCodec(level=1)), spec) + chain = ChunkTransform(codecs=(BytesCodec(), GzipCodec(level=1)), array_spec=spec) nd_buf = _make_nd_buffer(arr) encoded = chain.encode_chunk(nd_buf) @@ -70,7 +98,10 @@ def test_encode_decode_roundtrip_with_compression(self) -> None: def test_encode_decode_roundtrip_with_transpose(self) -> None: arr = np.arange(12, dtype="float64").reshape(3, 4) spec = _make_array_spec(arr.shape, arr.dtype) - chain = CodecChain((TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)), spec) + chain = ChunkTransform( + codecs=(TransposeCodec(order=(1, 0)), BytesCodec(), ZstdCodec(level=1)), + array_spec=spec, + ) nd_buf = _make_nd_buffer(arr) encoded = chain.encode_chunk(nd_buf)