Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
f427898
sketch out sync codecs + threadpool
d-v-b Feb 18, 2026
dbdc3d4
Merge branch 'main' into perf/faster-codecs
d-v-b Feb 18, 2026
65d1230
fix perf regressions
d-v-b Feb 19, 2026
e24fe7e
Merge branch 'perf/faster-codecs' of https://github.com/d-v-b/zarr-py…
d-v-b Feb 19, 2026
f979eaa
add partial encode / decode
d-v-b Feb 19, 2026
a934899
add sync hotpath
d-v-b Feb 19, 2026
b53ac3e
add comments and documentation
d-v-b Feb 19, 2026
73ac845
refactor sharding to allow sync
d-v-b Feb 19, 2026
aeecda8
fix array spec propagation
d-v-b Feb 19, 2026
69172fb
fix countingdict tests
d-v-b Feb 19, 2026
28d0def
update design doc
d-v-b Feb 19, 2026
f8e39e6
dynamic pool allocation
d-v-b Feb 19, 2026
b388911
default to 1 itemsize for data types that don't declare it
d-v-b Feb 19, 2026
7e29ef3
Merge branch 'main' into perf/faster-codecs
d-v-b Feb 19, 2026
00dde0b
Merge branch 'main' into perf/faster-codecs
d-v-b Feb 19, 2026
9d77ca5
remove extra codec pipeline
d-v-b Feb 19, 2026
88a4875
remove garbage
d-v-b Feb 19, 2026
284e5e2
lint
d-v-b Feb 19, 2026
b1b876a
use protocols for new sync behavior
d-v-b Feb 20, 2026
6996284
remove batch size parameter; add changelog entry
d-v-b Feb 20, 2026
204dda1
prune dead code, make protocols useful
d-v-b Feb 20, 2026
e9db616
restore batch size but it's only there for warnings
d-v-b Feb 20, 2026
01e1f73
fix type hints, prevent thread pool leakage, make codec pipeline intr…
d-v-b Feb 20, 2026
fbde3af
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b Feb 20, 2026
11534b0
restore old comments / docstrings
d-v-b Feb 20, 2026
b40d53a
simplify threadpool management
d-v-b Feb 20, 2026
83c1dc1
use isinstance instead of explicit list of codec names
d-v-b Feb 21, 2026
e8a0cc6
consolidate thread pool configuration
d-v-b Feb 21, 2026
9a1d5eb
Merge branch 'main' of https://github.com/zarr-developers/zarr-python…
d-v-b Feb 21, 2026
9071954
Merge remote-tracking branch 'origin/main' into perf/smarter-codecs
d-v-b Feb 25, 2026
3297e0d
execute performance improvement plan
d-v-b Feb 25, 2026
0766289
Merge branch 'main' into perf/smarter-codecs
d-v-b Feb 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions changes/3715.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Added several performance optimizations to chunk encoding and decoding. Low-latency stores that do not benefit from
`async` operations can now implement synchronous I/O methods, which are used when available during chunk processing.
Similarly, codecs can implement a synchronous API that is used, when available, during chunk processing.
These changes remove unnecessary interactions with the event loop.

The synchronous chunk processing path optionally uses a thread pool to parallelize codec work across chunks.
The pool is skipped for single-chunk operations and for pipelines that only contain cheap codecs (e.g. endian
swap, transpose, checksum).

Use of the thread pool can be disabled in the global configuration. The minimum and maximum number of
threads in the pool can also be set via the configuration.
252 changes: 250 additions & 2 deletions src/zarr/abc/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from abc import abstractmethod
from collections.abc import Mapping
from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable

from typing_extensions import ReadOnly, TypedDict

Expand All @@ -19,7 +20,7 @@
from zarr.core.array_spec import ArraySpec
from zarr.core.chunk_grids import ChunkGrid
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType
from zarr.core.indexing import SelectorTuple
from zarr.core.indexing import ChunkProjection, SelectorTuple
from zarr.core.metadata import ArrayMetadata

__all__ = [
Expand All @@ -32,6 +33,8 @@
"CodecInput",
"CodecOutput",
"CodecPipeline",
"PreparedWrite",
"SupportsSyncCodec",
]

CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
Expand Down Expand Up @@ -59,6 +62,19 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]:
"""The widest type of JSON-like input that could specify a codec."""


@runtime_checkable
class SupportsSyncCodec(Protocol):
    """Structural type for codecs that offer a blocking encode/decode API.

    A codec matching this protocol can be driven entirely on the calling
    thread via ``_decode_sync`` / ``_encode_sync``, without touching the
    event loop.
    """

    def _decode_sync(
        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
    ) -> NDBuffer | Buffer:
        """Decode ``chunk_data`` synchronously according to ``chunk_spec``."""
        ...

    def _encode_sync(
        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
    ) -> NDBuffer | Buffer | None:
        """Encode ``chunk_data`` synchronously; None signals an empty result."""
        ...


class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
"""Generic base class for codecs.

Expand Down Expand Up @@ -186,9 +202,188 @@ class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]):
"""Base class for array-to-array codecs."""


def _is_complete_selection(selection: Any, shape: tuple[int, ...]) -> bool:
"""Check whether a chunk selection covers the entire chunk shape."""
if not isinstance(selection, tuple):
selection = (selection,)
for sel, dim_len in zip(selection, shape, strict=False):
if isinstance(sel, int):
if dim_len != 1:
return False
elif isinstance(sel, slice):
start, stop, step = sel.indices(dim_len)
if not (start == 0 and stop == dim_len and step == 1):
return False
else:
return False
return True


@dataclass
class PreparedWrite:
    """Result of prepare_write: existing encoded chunk bytes + selection info.

    Carries everything the pipeline needs to decode the existing chunk(s),
    merge in the new data, re-encode, and hand back to finalize_write.
    """

    # Per-inner-chunk encoded bytes keyed by chunk coordinate; None = chunk absent.
    chunk_dict: dict[tuple[int, ...], Buffer | None]
    # Codec chain used to decode/encode the inner chunks
    # (see ArrayBytesCodec.inner_codec_chain).
    inner_codec_chain: Any  # CodecChain
    # Spec describing a single inner chunk.
    inner_chunk_spec: ArraySpec
    # One (chunk_coords, chunk_selection, out_selection, is_complete)
    # projection per inner chunk touched by the write.
    indexer: list[ChunkProjection]
    value_selection: SelectorTuple | None = None
    # If not None, slice value with this before using inner out_selections.
    # For sharding: the outer out_selection from batch_info.
    # For non-sharded: None (inner out_selection IS the outer out_selection).
    write_full_shard: bool = True
    # True when the entire shard blob will be written from scratch (either
    # because the shard doesn't exist yet or because the selection is complete).
    # Used by ShardingCodec.finalize_write to decide between set vs set_range.
    is_complete_shard: bool = False
    # True when the outer selection covers the entire shard. When True,
    # the indexer is empty and finalize_write receives the shard value
    # via shard_data. The codec then encodes the full shard in one shot
    # rather than iterating over individual inner chunks.
    shard_data: NDBuffer | None = None
    # The full shard value for complete-selection writes. Set by the pipeline
    # when is_complete_shard is True, before calling finalize_write.


class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]):
    """Base class for array-to-bytes codecs."""

    @property
    def inner_codec_chain(self) -> Any:
        """Codec chain for decoding inner chunks after deserialization.

        The base codec has no inner structure and returns None, which tells
        the pipeline to fall back to its own codec_chain. ShardingCodec
        overrides this to expose its inner chunk chain.
        """
        return None

    def deserialize(
        self, raw: Buffer | None, chunk_spec: ArraySpec
    ) -> dict[tuple[int, ...], Buffer | None]:
        """Pure compute: unpack stored bytes into per-inner-chunk buffers.

        The base codec has exactly one inner chunk, keyed by ``(0,)``.
        ShardingCodec overrides this to decode the shard index and slice
        the blob into per-chunk buffers.
        """
        return {(0,): raw}

    def serialize(
        self, chunk_dict: dict[tuple[int, ...], Buffer | None], chunk_spec: ArraySpec
    ) -> Buffer | None:
        """Pure compute: pack per-inner-chunk buffers into one storage blob.

        The base codec simply returns the single chunk's bytes, or None when
        that chunk is absent (the caller deletes the key in that case).
        ShardingCodec overrides this to concatenate chunks + build an index.
        """
        return chunk_dict.get((0,))

    def prepare_read_sync(
        self,
        byte_getter: Any,
        chunk_spec: ArraySpec,
        chunk_selection: SelectorTuple,
        codec_chain: Any,
        aa_chain: Any,
        ab_pair: Any,
        bb_chain: Any,
    ) -> NDBuffer | None:
        """Blocking IO + full decode; returns the selected sub-array, or None."""
        stored = byte_getter.get_sync(prototype=chunk_spec.prototype)
        decoded: NDBuffer | None = codec_chain.decode_chunk(
            stored, chunk_spec, aa_chain, ab_pair, bb_chain
        )
        if decoded is None:
            return None
        return decoded[chunk_selection]

    def prepare_write_sync(
        self,
        byte_setter: Any,
        chunk_spec: ArraySpec,
        chunk_selection: SelectorTuple,
        out_selection: SelectorTuple,
        codec_chain: Any,
    ) -> PreparedWrite:
        """Blocking IO + deserialize; the pipeline then decodes/merges/encodes."""
        covers_all = _is_complete_selection(chunk_selection, chunk_spec.shape)
        # Only fetch existing bytes when part of the chunk survives this write.
        prior: Buffer | None = (
            None if covers_all else byte_setter.get_sync(prototype=chunk_spec.prototype)
        )
        return PreparedWrite(
            chunk_dict=self.deserialize(prior, chunk_spec),
            inner_codec_chain=self.inner_codec_chain or codec_chain,
            inner_chunk_spec=chunk_spec,
            indexer=[((0,), chunk_selection, out_selection, covers_all)],  # type: ignore[list-item]
        )

    async def prepare_read(
        self,
        byte_getter: Any,
        chunk_spec: ArraySpec,
        chunk_selection: SelectorTuple,
        codec_chain: Any,
        aa_chain: Any,
        ab_pair: Any,
        bb_chain: Any,
    ) -> NDBuffer | None:
        """Async IO + full decode; returns the selected sub-array, or None."""
        stored = await byte_getter.get(prototype=chunk_spec.prototype)
        decoded: NDBuffer | None = codec_chain.decode_chunk(
            stored, chunk_spec, aa_chain, ab_pair, bb_chain
        )
        if decoded is None:
            return None
        return decoded[chunk_selection]

    async def prepare_write(
        self,
        byte_setter: Any,
        chunk_spec: ArraySpec,
        chunk_selection: SelectorTuple,
        out_selection: SelectorTuple,
        codec_chain: Any,
    ) -> PreparedWrite:
        """Async IO + deserialize; the pipeline then decodes/merges/encodes."""
        covers_all = _is_complete_selection(chunk_selection, chunk_spec.shape)
        # Only fetch existing bytes when part of the chunk survives this write.
        prior: Buffer | None = (
            None if covers_all else await byte_setter.get(prototype=chunk_spec.prototype)
        )
        return PreparedWrite(
            chunk_dict=self.deserialize(prior, chunk_spec),
            inner_codec_chain=self.inner_codec_chain or codec_chain,
            inner_chunk_spec=chunk_spec,
            indexer=[((0,), chunk_selection, out_selection, covers_all)],  # type: ignore[list-item]
        )

    def finalize_write_sync(
        self, prepared: PreparedWrite, chunk_spec: ArraySpec, byte_setter: Any
    ) -> None:
        """Serialize the prepared chunk_dict and persist it (blocking).

        A None payload means all inner chunks are empty, so the key is
        deleted rather than written. ShardingCodec overrides this for
        byte-range writes when its inner codecs are fixed-size.
        """
        payload = self.serialize(prepared.chunk_dict, chunk_spec)
        if payload is not None:
            byte_setter.set_sync(payload)
        else:
            byte_setter.delete_sync()

    async def finalize_write(
        self, prepared: PreparedWrite, chunk_spec: ArraySpec, byte_setter: Any
    ) -> None:
        """Async version of finalize_write_sync."""
        payload = self.serialize(prepared.chunk_dict, chunk_spec)
        if payload is not None:
            await byte_setter.set(payload)
        else:
            await byte_setter.delete()


class BytesBytesCodec(BaseCodec[Buffer, Buffer]):
"""Base class for bytes-to-bytes codecs."""
Expand Down Expand Up @@ -459,6 +654,59 @@ async def write(
"""
...

# -------------------------------------------------------------------
# Fully synchronous read/write (opt-in)
#
# When a CodecPipeline subclass can run the entire read/write path
# (store IO + codec compute + buffer scatter) without touching the
# event loop, it overrides these methods and sets supports_sync_io
# to True. This lets Array selection methods bypass sync() entirely.
#
# The default implementations raise NotImplementedError.
# BatchedCodecPipeline overrides these when all codecs support sync.
# -------------------------------------------------------------------

@property
def supports_sync_io(self) -> bool:
    """Whether this pipeline can run read/write entirely on the calling thread.

    The base pipeline reports False. A subclass returns True only when every
    codec implements ``SupportsSyncCodec`` and the ``read_sync``/``write_sync``
    overrides are provided. ``Array._can_use_sync_path()`` consults this flag
    to decide whether the ``sync()`` event-loop bridge can be bypassed.
    """
    return False

def read_sync(
    self,
    batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
    out: NDBuffer,
    drop_axes: tuple[int, ...] = (),
) -> None:
    """Synchronous read: fetch bytes from store, decode, scatter into out.

    Runs entirely on the calling thread and is only usable when
    ``supports_sync_io`` is True; ``_get_selection_sync`` in ``array.py``
    calls it when the sync bypass is active.

    Raises
    ------
    NotImplementedError
        Always, in the base class — subclasses advertising sync IO override it.
    """
    raise NotImplementedError

def write_sync(
    self,
    batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
    value: NDBuffer,
    drop_axes: tuple[int, ...] = (),
) -> None:
    """Synchronous write: gather from value, encode, persist to store.

    Runs entirely on the calling thread and is only usable when
    ``supports_sync_io`` is True; ``_set_selection_sync`` in ``array.py``
    calls it when the sync bypass is active.

    Raises
    ------
    NotImplementedError
        Always, in the base class — subclasses advertising sync IO override it.
    """
    raise NotImplementedError


async def _batching_helper(
func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
Expand Down
24 changes: 23 additions & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

from zarr.core.buffer import Buffer, BufferPrototype

__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
__all__ = [
"ByteGetter",
"ByteSetter",
"Store",
"set_or_delete",
]


@dataclass
Expand Down Expand Up @@ -466,6 +471,21 @@ async def set(self, key: str, value: Buffer) -> None:
"""
...

async def set_range(self, key: str, value: Buffer, start: int) -> None:
    """Overwrite part of an existing value, starting ``start`` bytes in.

    Parameters
    ----------
    key : str
        Key whose stored value is modified; the key must already exist.
    value : Buffer
        Bytes to write. ``start + len(value)`` must not exceed the current
        size of the stored value.
    start : int
        Byte offset at which writing begins.

    Raises
    ------
    NotImplementedError
        In the base class; stores supporting range writes override this.
    """
    raise NotImplementedError(f"{type(self).__name__} does not support set_range")

async def set_if_not_exists(self, key: str, value: Buffer) -> None:
"""
Store a key to ``value`` if the key is not already present.
Expand Down Expand Up @@ -695,6 +715,8 @@ async def get(

async def set(self, value: Buffer) -> None: ...

async def set_range(self, value: Buffer, start: int) -> None: ...

async def delete(self) -> None: ...

async def set_if_not_exists(self, default: Buffer) -> None: ...
Expand Down
9 changes: 7 additions & 2 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ async def open(
is_v3_array = zarr_format == 3 and _metadata_dict.get("node_type") == "array"
if is_v3_array or zarr_format == 2:
return AsyncArray(
store_path=store_path, metadata=_metadata_dict, config=kwargs.get("config")
store_path=store_path,
metadata=_metadata_dict,
config=kwargs.get("config"),
)
except (AssertionError, FileNotFoundError, NodeTypeValidationError):
pass
Expand Down Expand Up @@ -1279,7 +1281,10 @@ async def open_array(
_warn_write_empty_chunks_kwarg()

try:
return await AsyncArray.open(store_path, zarr_format=zarr_format)
return await AsyncArray.open(
store_path,
zarr_format=zarr_format,
)
except FileNotFoundError as err:
if not store_path.read_only and mode in _CREATE_MODES:
overwrite = _infer_overwrite(mode)
Expand Down
Loading
Loading