Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/zarr/abc/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from abc import abstractmethod
from collections.abc import Mapping
from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable

from typing_extensions import ReadOnly, TypedDict

Expand Down Expand Up @@ -32,6 +32,7 @@
"CodecInput",
"CodecOutput",
"CodecPipeline",
"SupportsSyncCodec",
]

CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
Expand Down Expand Up @@ -59,6 +60,23 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]:
"""The widest type of JSON-like input that could specify a codec."""


@runtime_checkable
class SupportsSyncCodec(Protocol):
    """Protocol for codecs that support synchronous encode/decode.

    Codecs implementing this protocol provide ``_decode_sync`` and ``_encode_sync``
    methods that perform encoding/decoding without requiring an async event loop.

    ``@runtime_checkable`` allows ``isinstance(obj, SupportsSyncCodec)`` checks,
    which verify only that the method names exist, not their signatures.
    """

    # Decode one chunk synchronously; takes either a raw or an nd buffer,
    # matching the widest codec input/output types used in this module.
    def _decode_sync(
        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
    ) -> NDBuffer | Buffer: ...

    # Encode one chunk synchronously. The ``None`` in the return type mirrors
    # the async ``_encode_single`` convention elsewhere in this package —
    # presumably "nothing to store"; confirm against the pipeline's handling.
    def _encode_sync(
        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
    ) -> NDBuffer | Buffer | None: ...


class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
"""Generic base class for codecs.

Expand Down
44 changes: 43 additions & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@

from zarr.core.buffer import Buffer, BufferPrototype

__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"]
__all__ = [
"ByteGetter",
"ByteSetter",
"Store",
"SupportsDeleteSync",
"SupportsGetSync",
"SupportsSetRangeSync",
"SupportsSetSync",
"SupportsSyncStore",
"set_or_delete",
]


@dataclass
Expand Down Expand Up @@ -700,6 +710,38 @@ async def delete(self) -> None: ...
async def set_if_not_exists(self, default: Buffer) -> None: ...


@runtime_checkable
class SupportsGetSync(Protocol):
    """Structural protocol for stores that offer a blocking ``get_sync`` read.

    ``get_sync`` fetches the value stored under ``key``, optionally restricted
    to ``byte_range`` and materialized via ``prototype``'s buffer class. The
    ``Buffer | None`` return presumably means ``None`` for a missing key —
    confirm against concrete store implementations.
    """

    def get_sync(
        self,
        key: str,
        *,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None: ...


@runtime_checkable
class SupportsSetSync(Protocol):
    """Structural protocol for stores that offer a blocking ``set_sync`` write.

    ``set_sync`` stores ``value`` under ``key``, replacing any existing entry
    (exact overwrite semantics are defined by the implementing store).
    """

    def set_sync(self, key: str, value: Buffer) -> None: ...


@runtime_checkable
class SupportsSetRangeSync(Protocol):
    """Structural protocol for stores that can write a byte range in place.

    ``set_range_sync`` writes ``value`` into the entry at ``key`` beginning at
    byte offset ``start``, without replacing the whole value.
    """

    def set_range_sync(self, key: str, value: Buffer, start: int) -> None: ...


@runtime_checkable
class SupportsDeleteSync(Protocol):
    """Structural protocol for stores that offer a blocking ``delete_sync``.

    ``delete_sync`` removes the entry stored under ``key``. Behavior for a
    missing key is implementation-defined — confirm against concrete stores.
    """

    def delete_sync(self, key: str) -> None: ...


@runtime_checkable
class SupportsSyncStore(
    SupportsGetSync, SupportsSetSync, SupportsSetRangeSync, SupportsDeleteSync, Protocol
):
    """Union protocol: a store supporting the full synchronous API.

    Combines get/set/set-range/delete sync protocols, so an ``isinstance``
    check against this class requires all four methods to be present.
    """

    ...


async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None:
"""Set or delete a value in a byte setter

Expand Down
27 changes: 17 additions & 10 deletions src/zarr/codecs/blosc.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,28 +299,35 @@ def _blosc_codec(self) -> Blosc:
config_dict["typesize"] = self.typesize
return Blosc.from_config(config_dict)

def _decode_sync(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer:
    """Decompress ``chunk_bytes`` with Blosc on the calling thread.

    ``as_numpy_array_wrapper`` hands the numcodecs Blosc codec a NumPy view of
    the input and rewraps the decoded bytes using ``chunk_spec.prototype``.
    """
    return as_numpy_array_wrapper(self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)

def _encode_sync(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer | None:
    """Compress ``chunk_bytes`` with Blosc on the calling thread.

    Blosc only supports host memory, so the input is converted to a NumPy
    array and the compressed result is rewrapped in a buffer built by
    ``chunk_spec.prototype.buffer``.
    """
    return chunk_spec.prototype.buffer.from_bytes(
        self._blosc_codec.encode(chunk_bytes.as_numpy_array())
    )

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
# Since blosc only support host memory, we convert the input and output of the encoding
# between numpy array and buffer
return await asyncio.to_thread(
lambda chunk: chunk_spec.prototype.buffer.from_bytes(
self._blosc_codec.encode(chunk.as_numpy_array())
),
chunk_bytes,
)
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)

def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
raise NotImplementedError
18 changes: 16 additions & 2 deletions src/zarr/codecs/bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
)
return self

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -88,7 +88,14 @@ async def _decode_single(
)
return chunk_array

async def _encode_single(
async def _decode_single(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> NDBuffer:
    """Async decode entry point; runs ``_decode_sync`` inline.

    Note: no thread offload here (unlike the compression codecs) — the sync
    implementation executes directly on the event loop's thread.
    """
    return self._decode_sync(chunk_bytes, chunk_spec)

def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
Expand All @@ -109,5 +116,12 @@ async def _encode_single(
nd_array = nd_array.ravel().view(dtype="B")
return chunk_spec.prototype.buffer.from_array_like(nd_array)

async def _encode_single(
    self,
    chunk_array: NDBuffer,
    chunk_spec: ArraySpec,
) -> Buffer | None:
    """Async encode entry point; runs ``_encode_sync`` inline.

    Note: no thread offload — the sync implementation executes directly on
    the event loop's thread.
    """
    return self._encode_sync(chunk_array, chunk_spec)

def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
    """Encoded size equals input size: this codec does not change byte length."""
    return input_byte_length
18 changes: 16 additions & 2 deletions src/zarr/codecs/crc32c_.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
def to_dict(self) -> dict[str, JSON]:
return {"name": "crc32c"}

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -51,7 +51,14 @@ async def _decode_single(
)
return chunk_spec.prototype.buffer.from_array_like(inner_bytes)

async def _encode_single(
async def _decode_single(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer:
    """Async decode entry point; delegates to ``_decode_sync`` inline (no
    thread offload — checksum verification runs on the calling thread)."""
    return self._decode_sync(chunk_bytes, chunk_spec)

def _encode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -64,5 +71,12 @@ async def _encode_single(
# Append the checksum (as bytes) to the data
return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("B")))

async def _encode_single(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer | None:
    """Async encode entry point; delegates to ``_encode_sync`` inline (no
    thread offload — checksum computation runs on the calling thread)."""
    return self._encode_sync(chunk_bytes, chunk_spec)

def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
    """Input size plus 4 bytes for the trailing CRC32C checksum."""
    return input_byte_length + 4
27 changes: 21 additions & 6 deletions src/zarr/codecs/gzip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING

from numcodecs.gzip import GZip
Expand Down Expand Up @@ -48,23 +49,37 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
def to_dict(self) -> dict[str, JSON]:
return {"name": "gzip", "configuration": {"level": self.level}}

@cached_property
def _gzip_codec(self) -> GZip:
    """numcodecs ``GZip`` instance for this codec's level, built once per
    codec instance and cached (avoids reconstructing it on every chunk)."""
    return GZip(self.level)

def _decode_sync(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer:
    """Gunzip ``chunk_bytes`` on the calling thread.

    ``as_numpy_array_wrapper`` hands the numcodecs GZip codec a NumPy view and
    rewraps the decoded bytes using ``chunk_spec.prototype``.
    """
    return as_numpy_array_wrapper(self._gzip_codec.decode, chunk_bytes, chunk_spec.prototype)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype
)
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)

def _encode_sync(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer | None:
    """Gzip-compress ``chunk_bytes`` on the calling thread.

    ``as_numpy_array_wrapper`` hands the numcodecs GZip codec a NumPy view and
    rewraps the compressed bytes using ``chunk_spec.prototype``.
    """
    return as_numpy_array_wrapper(self._gzip_codec.encode, chunk_bytes, chunk_spec.prototype)

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return await asyncio.to_thread(
as_numpy_array_wrapper, GZip(self.level).encode, chunk_bytes, chunk_spec.prototype
)
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)

def compute_encoded_size(
self,
Expand Down
20 changes: 17 additions & 3 deletions src/zarr/codecs/transpose.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,34 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
prototype=chunk_spec.prototype,
)

async def _decode_single(
def _decode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
inverse_order = np.argsort(self.order)
inverse_order = tuple(int(i) for i in np.argsort(self.order))
return chunk_array.transpose(inverse_order)

async def _encode_single(
async def _decode_single(
    self,
    chunk_array: NDBuffer,
    chunk_spec: ArraySpec,
) -> NDBuffer:
    """Async decode entry point; applies the inverse transpose inline via
    ``_decode_sync`` (pure metadata/axis work, so no thread offload)."""
    return self._decode_sync(chunk_array, chunk_spec)

def _encode_sync(
    self,
    chunk_array: NDBuffer,
    _chunk_spec: ArraySpec,
) -> NDBuffer | None:
    """Apply the configured axis permutation ``self.order`` to the chunk.

    The chunk spec is unused (underscore-prefixed): the permutation is fully
    determined by the codec's own configuration.
    """
    return chunk_array.transpose(self.order)

async def _encode_single(
    self,
    chunk_array: NDBuffer,
    _chunk_spec: ArraySpec,
) -> NDBuffer | None:
    """Async encode entry point; applies the transpose inline via
    ``_encode_sync`` (no thread offload)."""
    return self._encode_sync(chunk_array, _chunk_spec)

def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
    """Transposition rearranges elements only; byte length is unchanged."""
    return input_byte_length
37 changes: 32 additions & 5 deletions src/zarr/codecs/vlen_utf8.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ def to_dict(self) -> dict[str, JSON]:
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
return self

# TODO: expand the tests for this function
async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -55,7 +54,14 @@ async def _decode_single(
as_string_dtype = decoded.astype(chunk_spec.dtype.to_native_dtype(), copy=False)
return chunk_spec.prototype.nd_buffer.from_numpy_array(as_string_dtype)

async def _encode_single(
async def _decode_single(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> NDBuffer:
    """Async decode entry point; runs the variable-length UTF-8 decode inline
    via ``_decode_sync`` (no thread offload)."""
    return self._decode_sync(chunk_bytes, chunk_spec)

def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
Expand All @@ -65,6 +71,13 @@ async def _encode_single(
_vlen_utf8_codec.encode(chunk_array.as_numpy_array())
)

async def _encode_single(
    self,
    chunk_array: NDBuffer,
    chunk_spec: ArraySpec,
) -> Buffer | None:
    """Async encode entry point; runs the variable-length UTF-8 encode inline
    via ``_encode_sync`` (no thread offload)."""
    return self._encode_sync(chunk_array, chunk_spec)

def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
    """Unsupported: the encoded size of variable-length strings cannot be
    derived from the input byte length alone."""
    # what is input_byte_length for an object dtype?
    raise NotImplementedError("compute_encoded_size is not implemented for VLen codecs")
Expand All @@ -86,7 +99,7 @@ def to_dict(self) -> dict[str, JSON]:
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
return self

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -99,7 +112,14 @@ async def _decode_single(
decoded = _reshape_view(decoded, chunk_spec.shape)
return chunk_spec.prototype.nd_buffer.from_numpy_array(decoded)

async def _encode_single(
async def _decode_single(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> NDBuffer:
    """Async decode entry point; runs the variable-length bytes decode inline
    via ``_decode_sync`` (no thread offload)."""
    return self._decode_sync(chunk_bytes, chunk_spec)

def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
Expand All @@ -109,6 +129,13 @@ async def _encode_single(
_vlen_bytes_codec.encode(chunk_array.as_numpy_array())
)

async def _encode_single(
    self,
    chunk_array: NDBuffer,
    chunk_spec: ArraySpec,
) -> Buffer | None:
    """Async encode entry point; runs the variable-length bytes encode inline
    via ``_encode_sync`` (no thread offload)."""
    return self._encode_sync(chunk_array, chunk_spec)

def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
    """Unsupported: the encoded size of variable-length byte strings cannot be
    derived from the input byte length alone."""
    # what is input_byte_length for an object dtype?
    raise NotImplementedError("compute_encoded_size is not implemented for VLen codecs")
Loading
Loading