Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
359 changes: 357 additions & 2 deletions src/zarr/abc/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from abc import abstractmethod
from collections.abc import Mapping
from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable

from typing_extensions import ReadOnly, TypedDict

Expand All @@ -19,7 +20,7 @@
from zarr.core.array_spec import ArraySpec
from zarr.core.chunk_grids import ChunkGrid
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType
from zarr.core.indexing import SelectorTuple
from zarr.core.indexing import ChunkProjection, SelectorTuple
from zarr.core.metadata import ArrayMetadata

__all__ = [
Expand All @@ -32,6 +33,9 @@
"CodecInput",
"CodecOutput",
"CodecPipeline",
"PreparedWrite",
"SupportsChunkCodec",
"SupportsSyncCodec",
]

CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
Expand Down Expand Up @@ -59,6 +63,36 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]:
"""The widest type of JSON-like input that could specify a codec."""


@runtime_checkable
class SupportsSyncCodec(Protocol):
    """Protocol for codecs that support synchronous encode/decode.

    Codecs implementing this protocol provide ``_decode_sync`` and ``_encode_sync``
    methods that perform encoding/decoding without requiring an async event loop.

    Because this protocol is ``@runtime_checkable``, ``isinstance`` checks verify
    only that the two methods exist on the object — not their signatures or
    return types.
    """

    def _decode_sync(
        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
    ) -> NDBuffer | Buffer: ...

    # NOTE(review): returning ``None`` presumably means "nothing to store"
    # (e.g. an all-fill-value chunk) — confirm against pipeline callers.
    def _encode_sync(
        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
    ) -> NDBuffer | Buffer | None: ...


class SupportsChunkCodec(Protocol):
    """Protocol for objects that can decode/encode whole chunks synchronously.

    [`ChunkTransform`][zarr.core.codec_pipeline.ChunkTransform] satisfies this protocol.

    Unlike ``SupportsSyncCodec`` this protocol is *not* ``@runtime_checkable``;
    it is intended for static (structural) typing only.
    """

    # The ArraySpec describing the chunks this codec chain operates on;
    # consumers read e.g. ``array_spec.prototype`` to pick a buffer prototype.
    array_spec: ArraySpec

    def decode_chunk(self, chunk_bytes: Buffer) -> NDBuffer: ...

    # ``None`` return indicates an empty chunk with nothing to store.
    # NOTE(review): inferred from how ArrayBytesCodec.finalize_write treats
    # ``None`` (key deletion) — confirm against ChunkTransform.
    def encode_chunk(self, chunk_array: NDBuffer) -> Buffer | None: ...


class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
"""Generic base class for codecs.

Expand Down Expand Up @@ -186,9 +220,330 @@ class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]):
"""Base class for array-to-array codecs."""


@dataclass
class PreparedWrite:
    """Result of the prepare phase of a write operation.

    Carries deserialized chunk data and selection metadata between
    [`prepare_write`][zarr.abc.codec.ArrayBytesCodec.prepare_write] (or
    [`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync])
    and [`finalize_write`][zarr.abc.codec.ArrayBytesCodec.finalize_write] (or
    [`finalize_write_sync`][zarr.abc.codec.ArrayBytesCodec.finalize_write_sync]).

    Attributes
    ----------
    chunk_dict : dict[tuple[int, ...], Buffer | None]
        Per-inner-chunk buffers keyed by chunk coordinates. For non-sharded
        codecs there is a single entry keyed ``(0,)``; ``None`` values mark
        absent/empty chunks.
    indexer : list[ChunkProjection]
        Mapping from inner-chunk coordinates to value/output selections.
    """

    # Mutable state: finalize_write serializes whatever is in this dict,
    # so callers update entries in place between prepare and finalize.
    chunk_dict: dict[tuple[int, ...], Buffer | None]
    indexer: list[ChunkProjection]


class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]):
    """Base class for array-to-bytes codecs."""

    @property
    def inner_codec_chain(self) -> SupportsChunkCodec | None:
        """Codec chain used to decode inner chunks after deserialization.

        The base implementation yields ``None``, which tells the pipeline to
        fall back to its own codec chain. ``ShardingCodec`` overrides this
        property to expose its inner codec chain instead.

        Returns
        -------
        SupportsChunkCodec or None
            A [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
            instance, or ``None``.
        """
        return None

    def deserialize(
        self, raw: Buffer | None, chunk_spec: ArraySpec
    ) -> dict[tuple[int, ...], Buffer | None]:
        """Unpack stored bytes into per-inner-chunk buffers.

        A non-sharded codec treats the whole blob as one chunk, so the base
        implementation wraps it in a single-entry dict keyed ``(0,)``.
        ``ShardingCodec`` overrides this to decode the shard index and split
        the blob into per-chunk buffers.

        Parameters
        ----------
        raw : Buffer or None
            The raw bytes read from the store, or ``None`` if the key was
            absent.
        chunk_spec : ArraySpec
            The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.

        Returns
        -------
        dict[tuple[int, ...], Buffer | None]
            Mapping from inner-chunk coordinates to their encoded bytes.
        """
        # One blob == one chunk; a None value simply flows through.
        return {(0,): raw}

    def serialize(
        self,
        chunk_dict: dict[tuple[int, ...], Buffer | None],
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """Pack per-inner-chunk buffers into a storage blob.

        The base implementation simply extracts the single entry keyed
        ``(0,)``. ``ShardingCodec`` overrides this to concatenate chunks and
        build a shard index.

        Parameters
        ----------
        chunk_dict : dict[tuple[int, ...], Buffer | None]
            Mapping from inner-chunk coordinates to their encoded bytes.
        chunk_spec : ArraySpec
            The [`ArraySpec`][zarr.core.array_spec.ArraySpec] for the chunk.

        Returns
        -------
        Buffer or None
            The serialized blob, or ``None`` when all chunks are empty
            (the caller should delete the key).
        """
        # .get() keeps the "missing key" and "None value" cases equivalent.
        return chunk_dict.get((0,))

    # ------------------------------------------------------------------
    # prepare / finalize — sync
    # ------------------------------------------------------------------

    def prepare_read_sync(
        self,
        byte_getter: Any,
        chunk_selection: SelectorTuple,
        codec_chain: SupportsChunkCodec,
    ) -> NDBuffer | None:
        """Read a chunk from the store synchronously, decode it, and return
        the selected region.

        Parameters
        ----------
        byte_getter : Any
            An object supporting ``get_sync`` (e.g.
            [`StorePath`][zarr.storage._common.StorePath]).
        chunk_selection : SelectorTuple
            Selection within the decoded chunk array.
        codec_chain : SupportsChunkCodec
            The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used
            to decode the chunk. Must carry an ``array_spec`` attribute.

        Returns
        -------
        NDBuffer or None
            The decoded chunk data at *chunk_selection*, or ``None`` if the
            chunk does not exist in the store.
        """
        stored = byte_getter.get_sync(prototype=codec_chain.array_spec.prototype)
        if stored is None:
            # Missing key — caller falls back to the fill value.
            return None
        decoded = codec_chain.decode_chunk(stored)
        return decoded[chunk_selection]

    def prepare_write_sync(
        self,
        byte_setter: Any,
        codec_chain: SupportsChunkCodec,
        chunk_selection: SelectorTuple,
        out_selection: SelectorTuple,
        replace: bool,
    ) -> PreparedWrite:
        """Prepare a synchronous write by optionally reading existing data.

        When *replace* is ``False``, the existing chunk bytes are fetched
        from the store so they can be merged with the new data; when it is
        ``True`` the fetch is skipped entirely.

        Parameters
        ----------
        byte_setter : Any
            An object supporting ``get_sync`` and ``set_sync`` (e.g.
            [`StorePath`][zarr.storage._common.StorePath]).
        codec_chain : SupportsChunkCodec
            The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
            carrying the ``array_spec`` for the chunk.
        chunk_selection : SelectorTuple
            Selection within the chunk being written.
        out_selection : SelectorTuple
            Corresponding selection within the source value array.
        replace : bool
            If ``True``, the write replaces all data in the chunk and no
            read-modify-write is needed. If ``False``, existing data is
            fetched first.

        Returns
        -------
        PreparedWrite
            A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the
            deserialized chunk data and selection metadata.
        """
        spec = codec_chain.array_spec
        # Only a partial write needs the prior bytes (read-modify-write).
        prior: Buffer | None = (
            None if replace else byte_setter.get_sync(prototype=spec.prototype)
        )
        # Single inner chunk at (0,), mirroring deserialize()'s key.
        projection = ((0,), chunk_selection, out_selection, replace)
        return PreparedWrite(
            chunk_dict=self.deserialize(prior, spec),
            indexer=[projection],  # type: ignore[list-item]
        )

    def finalize_write_sync(
        self,
        prepared: PreparedWrite,
        codec_chain: SupportsChunkCodec,
        byte_setter: Any,
    ) -> None:
        """Serialize the prepared chunk data and write it to the store.

        If serialization produces ``None`` (all chunks empty), the key is
        deleted instead.

        Parameters
        ----------
        prepared : PreparedWrite
            The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by
            [`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync].
        codec_chain : SupportsChunkCodec
            The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
            carrying the ``array_spec`` for the chunk.
        byte_setter : Any
            An object supporting ``set_sync`` and ``delete_sync`` (e.g.
            [`StorePath`][zarr.storage._common.StorePath]).
        """
        payload = self.serialize(prepared.chunk_dict, codec_chain.array_spec)
        if payload is not None:
            byte_setter.set_sync(payload)
        else:
            # Empty chunk: remove the key rather than storing an empty blob.
            byte_setter.delete_sync()

    # ------------------------------------------------------------------
    # prepare / finalize — async
    # ------------------------------------------------------------------

    async def prepare_read(
        self,
        byte_getter: Any,
        chunk_selection: SelectorTuple,
        codec_chain: SupportsChunkCodec,
    ) -> NDBuffer | None:
        """Async variant of
        [`prepare_read_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_read_sync].

        Parameters
        ----------
        byte_getter : Any
            An object supporting ``get`` (e.g.
            [`StorePath`][zarr.storage._common.StorePath]).
        chunk_selection : SelectorTuple
            Selection within the decoded chunk array.
        codec_chain : SupportsChunkCodec
            The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec] used
            to decode the chunk. Must carry an ``array_spec`` attribute.

        Returns
        -------
        NDBuffer or None
            The decoded chunk data at *chunk_selection*, or ``None`` if the
            chunk does not exist in the store.
        """
        stored = await byte_getter.get(prototype=codec_chain.array_spec.prototype)
        if stored is None:
            # Missing key — caller falls back to the fill value.
            return None
        decoded = codec_chain.decode_chunk(stored)
        return decoded[chunk_selection]

    async def prepare_write(
        self,
        byte_setter: Any,
        codec_chain: SupportsChunkCodec,
        chunk_selection: SelectorTuple,
        out_selection: SelectorTuple,
        replace: bool,
    ) -> PreparedWrite:
        """Async variant of
        [`prepare_write_sync`][zarr.abc.codec.ArrayBytesCodec.prepare_write_sync].

        Parameters
        ----------
        byte_setter : Any
            An object supporting ``get`` and ``set`` (e.g.
            [`StorePath`][zarr.storage._common.StorePath]).
        codec_chain : SupportsChunkCodec
            The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
            carrying the ``array_spec`` for the chunk.
        chunk_selection : SelectorTuple
            Selection within the chunk being written.
        out_selection : SelectorTuple
            Corresponding selection within the source value array.
        replace : bool
            If ``True``, the write replaces all data in the chunk and no
            read-modify-write is needed. If ``False``, existing data is
            fetched first.

        Returns
        -------
        PreparedWrite
            A [`PreparedWrite`][zarr.abc.codec.PreparedWrite] carrying the
            deserialized chunk data and selection metadata.
        """
        spec = codec_chain.array_spec
        # Only a partial write needs the prior bytes (read-modify-write).
        prior: Buffer | None = (
            None if replace else await byte_setter.get(prototype=spec.prototype)
        )
        # Single inner chunk at (0,), mirroring deserialize()'s key.
        projection = ((0,), chunk_selection, out_selection, replace)
        return PreparedWrite(
            chunk_dict=self.deserialize(prior, spec),
            indexer=[projection],  # type: ignore[list-item]
        )

    async def finalize_write(
        self,
        prepared: PreparedWrite,
        codec_chain: SupportsChunkCodec,
        byte_setter: Any,
    ) -> None:
        """Async variant of
        [`finalize_write_sync`][zarr.abc.codec.ArrayBytesCodec.finalize_write_sync].

        Parameters
        ----------
        prepared : PreparedWrite
            The [`PreparedWrite`][zarr.abc.codec.PreparedWrite] returned by
            [`prepare_write`][zarr.abc.codec.ArrayBytesCodec.prepare_write].
        codec_chain : SupportsChunkCodec
            The [`SupportsChunkCodec`][zarr.abc.codec.SupportsChunkCodec]
            carrying the ``array_spec`` for the chunk.
        byte_setter : Any
            An object supporting ``set`` and ``delete`` (e.g.
            [`StorePath`][zarr.storage._common.StorePath]).
        """
        payload = self.serialize(prepared.chunk_dict, codec_chain.array_spec)
        if payload is not None:
            await byte_setter.set(payload)
        else:
            # Empty chunk: remove the key rather than storing an empty blob.
            await byte_setter.delete()


class BytesBytesCodec(BaseCodec[Buffer, Buffer]):
"""Base class for bytes-to-bytes codecs."""
Expand Down
Loading
Loading