angzarr_client.client

Client implementations for Angzarr gRPC services.

Classes

TransportMode

Transport mode for gRPC connections.

QueryClient

Client for the EventQueryService.

CommandHandlerClient

Client for the CommandHandlerCoordinatorService.

SpeculativeClient

Client for speculative operations across coordinator services.

DomainClient

Combined client for command handler, query, and speculative operations on a single domain.

Functions

resolve_ch_endpoint(→ str)

Resolve domain to command handler coordinator endpoint.

Module Contents

class angzarr_client.client.TransportMode(*args, **kwds)

Bases: enum.Enum

Transport mode for gRPC connections.

STANDALONE = 'standalone'
DISTRIBUTED = 'distributed'
angzarr_client.client.resolve_ch_endpoint(domain: str, mode: TransportMode | None = None, *, uds_base: str = '/tmp/angzarr', namespace: str = 'angzarr', port: int = 1310) str

Resolve domain to command handler coordinator endpoint.

Parameters:
  • domain – The domain name (e.g., “player”, “table”, “hand”)

  • mode – Transport mode. If None, detected from ANGZARR_MODE env var.

  • uds_base – Base path for Unix Domain Sockets (standalone mode)

  • namespace – Kubernetes namespace (distributed mode)

  • port – gRPC port (distributed mode)

Returns:

  • Standalone: /tmp/angzarr/ch-player.sock

  • Distributed: ch-player.angzarr.svc:1310

Return type:

Endpoint string suitable for _create_channel

Environment Variables:

ANGZARR_MODE: “standalone” or “distributed” (default: “distributed”) ANGZARR_UDS_BASE: Override uds_base (default: /tmp/angzarr) ANGZARR_NAMESPACE: Override namespace (default: angzarr) ANGZARR_CH_PORT: Override port (default: 1310)

class angzarr_client.client.QueryClient(channel: grpc.Channel, owns_channel: bool = True)

Client for the EventQueryService.

classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) QueryClient

Connect to an event query service at the given endpoint.

Parameters:
  • endpoint – The gRPC endpoint (host:port or UDS path).

  • retry – Optional retry policy. Defaults to exponential backoff.

classmethod from_channel(channel: grpc.Channel) QueryClient

Create a client from a caller-managed channel.

The returned client will not close the channel when close() is called; the caller retains ownership.

classmethod from_env(env_var: str, default: str) QueryClient

Connect using an environment variable with fallback.

classmethod from_stub(stub) QueryClient

Compose a QueryClient from a pre-built gRPC stub.

Bypasses channel construction so tests can inject a fake stub (see tests/_fakes.py::RecordingStub) without opening a socket. The returned client does not own a channel; close() is a no-op (PARITY_AUDIT.md plan item P1.12.e / finding #24).

get_event_book(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) angzarr_client.proto.angzarr.EventBook

Retrieve a single EventBook for the query.

Parameters:
  • query – The query specification.

  • timeout – Optional per-call deadline in seconds.

Audit #69 stage (b): attaches x-correlation-id metadata from query.cover.correlation_id so OTel exporters / mesh sidecars can filter on it without decoding the body. Send-only.

get_events(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) list[angzarr_client.proto.angzarr.EventBook]

Retrieve all EventBooks matching the query.

Parameters:
  • query – The query specification.

  • timeout – Optional per-call deadline in seconds.

query(domain: str, root: uuid.UUID) angzarr_client.builder.QueryBuilder

Start building a query for a specific aggregate.

query_domain(domain: str) angzarr_client.builder.QueryBuilder

Start building a query by domain only (use with by_correlation_id).

close() None

Close the underlying channel if this client owns it.

class angzarr_client.client.CommandHandlerClient(channel: grpc.Channel, owns_channel: bool = True)

Client for the CommandHandlerCoordinatorService.

classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) CommandHandlerClient

Connect to a command handler coordinator at the given endpoint.

Parameters:
  • endpoint – The gRPC endpoint (host:port or UDS path).

  • retry – Optional retry policy. Defaults to exponential backoff.

classmethod from_channel(channel: grpc.Channel) CommandHandlerClient

Create a client from a caller-managed channel.

The returned client will not close the channel when close() is called; the caller retains ownership.

classmethod from_env(env_var: str, default: str) CommandHandlerClient

Connect using an environment variable with fallback.

classmethod from_stub(stub) CommandHandlerClient

Compose a CommandHandlerClient from a pre-built gRPC stub.

See QueryClient.from_stub() for the test-only seam used to inject tests/_fakes.py::RecordingStub. P1.12.e / finding #24.

handle(command: angzarr_client.proto.angzarr.CommandBook, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command with default async mode.

This is a convenience method that wraps the command in a CommandRequest with SYNC_MODE_ASYNC (fire-and-forget) and CASCADE_ERROR_FAIL_FAST.

Parameters:
  • command – The command to execute.

  • timeout – Optional per-call deadline in seconds.

handle_command(request: angzarr_client.proto.angzarr.CommandRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command with the specified sync mode.

Parameters:
  • request – The command request.

  • timeout – Optional per-call deadline in seconds.

Audit #69 stage (b): attaches x-correlation-id metadata from the canonical request.command.cover.correlation_id.

handle_sync_speculative(request: angzarr_client.proto.angzarr.SpeculateCommandHandlerRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command speculatively against temporal state (no persistence).

Parameters:
  • request – The speculative command request.

  • timeout – Optional per-call deadline in seconds.

command(domain: str, root: uuid.UUID) angzarr_client.builder.CommandBuilder

Start building a command for an existing aggregate.

command_new(domain: str) angzarr_client.builder.CommandBuilder

Start building a command for a new aggregate.

Materializes a fresh UUID v4 client-side and passes it explicitly to CommandBuilder. Audit #67: root is always client-assigned, no path exists to skip it.

close() None

Close the underlying channel if this client owns it.

class angzarr_client.client.SpeculativeClient(channel: grpc.Channel, owns_channel: bool = True)

Client for speculative operations across coordinator services.

Speculative execution runs commands/events against temporal state without persistence. Each coordinator service now provides its own speculative method.

classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) SpeculativeClient

Connect to coordinator services at the given endpoint.

Parameters:
  • endpoint – The gRPC endpoint (host:port or UDS path).

  • retry – Optional retry policy. Defaults to exponential backoff.

classmethod from_channel(channel: grpc.Channel) SpeculativeClient

Create a client from a caller-managed channel.

The returned client will not close the channel when close() is called; the caller retains ownership.

classmethod from_env(env_var: str, default: str) SpeculativeClient

Connect using an environment variable with fallback.

classmethod from_stubs(command_handler_stub, saga_stub, projector_stub, pm_stub) SpeculativeClient

Compose a SpeculativeClient from pre-built gRPC stubs.

Bypasses channel construction; the four stubs back the four speculative methods (command_handler, saga, projector, process_manager). Test-only seam — see tests/_fakes.py::RecordingStub. P1.12.e / finding #24.

command_handler(request: angzarr_client.proto.angzarr.SpeculateCommandHandlerRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command speculatively against temporal state.

Audit #69 stage (b): canonical correlation_id at request.command.cover.correlation_id.

projector(request: angzarr_client.proto.angzarr.SpeculateProjectorRequest, timeout: float | None = None) angzarr_client.proto.angzarr.Projection

Speculatively execute a projector against events.

Audit #69 stage (b): canonical correlation_id at request.events.cover.correlation_id.

saga(request: angzarr_client.proto.angzarr.SpeculateSagaRequest, timeout: float | None = None) angzarr_client.proto.angzarr.SagaResponse

Speculatively execute a saga against events.

Audit #69 stage (b): canonical correlation_id at request.request.source.cover.correlation_id.

process_manager(request: angzarr_client.proto.angzarr.SpeculatePmRequest, timeout: float | None = None) angzarr_client.proto.angzarr.ProcessManagerHandleResponse

Speculatively execute a process manager.

Audit #69 stage (b): canonical correlation_id at request.request.trigger.cover.correlation_id.

close() None

Close the underlying channel if this client owns it.

class angzarr_client.client.DomainClient(channel: grpc.Channel, owns_channel: bool = True)

Combined client for command handler, query, and speculative operations on a single domain.

command_handler
query
speculative
classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) DomainClient

Connect to a domain’s coordinator at the given endpoint.

Parameters:
  • endpoint – The gRPC endpoint (host:port or UDS path).

  • retry – Optional retry policy. Defaults to exponential backoff.

classmethod from_channel(channel: grpc.Channel) DomainClient

Create a client from a caller-managed channel.

The returned client will not close the channel when close() is called; the caller retains ownership.

classmethod from_clients(command_handler: CommandHandlerClient, query: QueryClient, speculative: SpeculativeClient) DomainClient

Compose a DomainClient from already-constructed wrapped clients.

Bypasses channel construction — the wrapped clients hold their own channels (or are duck-typed test doubles). The DomainClient does not own the channel; close() is a no-op for the composed instance.

Used by cucumber/unit tests to inject recording mocks without spinning up a gRPC server (PARITY_AUDIT.md plan item P1.12.e / finding #24).

classmethod for_domain(domain: str, mode: TransportMode | None = None) DomainClient

Connect to a domain’s command handler coordinator.

Resolves the domain name to the appropriate endpoint based on transport mode.

Parameters:
  • domain – Domain name (e.g., “player”, “table”)

  • mode – Transport mode (standalone=UDS, distributed=K8s DNS). If None, detected from ANGZARR_MODE env var.

Returns:

DomainClient connected to the domain’s command handler coordinator.

Examples

# Auto-detect mode from ANGZARR_MODE env var player = DomainClient.for_domain(“player”)

# Explicitly use standalone mode (Unix Domain Sockets) player = DomainClient.for_domain(“player”, TransportMode.STANDALONE)

# Explicitly use distributed mode (K8s DNS) player = DomainClient.for_domain(“player”, TransportMode.DISTRIBUTED)

classmethod from_env(env_var: str, default: str) DomainClient

Connect using an environment variable with fallback.

execute(command: angzarr_client.proto.angzarr.CommandBook, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command with default async mode (fire-and-forget).

execute_with_mode(command: angzarr_client.proto.angzarr.CommandBook, sync_mode: angzarr_client.proto.angzarr.SyncMode, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command with the specified sync mode.

Parameters:
  • command – The command to execute.

  • sync_mode – Execution mode (ASYNC, SIMPLE, or CASCADE).

  • timeout – Optional per-call deadline in seconds.

get_event_book(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) angzarr_client.proto.angzarr.EventBook

Retrieve a single EventBook for the query — unary RPC.

Delegates to the underlying QueryClient. Mirrors Rust’s DomainClient::get_event_book.

get_events(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) list[angzarr_client.proto.angzarr.EventBook]

Retrieve all matching EventBooks for the query — streaming RPC.

Delegates to the underlying QueryClient. Mirrors Rust’s DomainClient::get_events.

close() None

Close the underlying channel if this client owns it.