angzarr_client.client
Client implementations for Angzarr gRPC services.
Classes
TransportMode | Transport mode for gRPC connections.
QueryClient | Client for the EventQueryService.
CommandHandlerClient | Client for the CommandHandlerCoordinatorService.
SpeculativeClient | Client for speculative operations across coordinator services.
DomainClient | Combined client for command handler, query, and speculative operations on a single domain.
Functions
resolve_ch_endpoint | Resolve domain to command handler coordinator endpoint.
Module Contents
- class angzarr_client.client.TransportMode(*args, **kwds)
Bases: enum.Enum
Transport mode for gRPC connections.
- STANDALONE = 'standalone'
- DISTRIBUTED = 'distributed'
- angzarr_client.client.resolve_ch_endpoint(domain: str, mode: TransportMode | None = None, *, uds_base: str = '/tmp/angzarr', namespace: str = 'angzarr', port: int = 1310) → str
Resolve domain to command handler coordinator endpoint.
- Parameters:
domain – The domain name (e.g., “player”, “table”, “hand”)
mode – Transport mode. If None, detected from ANGZARR_MODE env var.
uds_base – Base path for Unix Domain Sockets (standalone mode)
namespace – Kubernetes namespace (distributed mode)
port – gRPC port (distributed mode)
- Returns:
Endpoint string suitable for _create_channel.
Standalone: /tmp/angzarr/ch-player.sock
Distributed: ch-player.angzarr.svc:1310
- Return type:
str
- Environment Variables:
ANGZARR_MODE: “standalone” or “distributed” (default: “distributed”)
ANGZARR_UDS_BASE: Override uds_base (default: /tmp/angzarr)
ANGZARR_NAMESPACE: Override namespace (default: angzarr)
ANGZARR_CH_PORT: Override port (default: 1310)
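The resolution rule described above can be sketched in a few lines. This is an illustrative re-implementation inferred from the documented example endpoints, not the library's source. `Mode`, `sketch_resolve_ch_endpoint`, and the `environ` parameter are hypothetical names introduced here. The override variables (ANGZARR_UDS_BASE, ANGZARR_NAMESPACE, ANGZARR_CH_PORT) are left out because their precedence relative to explicit arguments is not documented on this page.

```python
import os
from enum import Enum
from typing import Mapping, Optional


class Mode(Enum):
    """Hypothetical stand-in for angzarr_client.client.TransportMode."""

    STANDALONE = "standalone"
    DISTRIBUTED = "distributed"


def sketch_resolve_ch_endpoint(
    domain: str,
    mode: Optional[Mode] = None,
    *,
    uds_base: str = "/tmp/angzarr",
    namespace: str = "angzarr",
    port: int = 1310,
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Map a domain name to an endpoint string (assumed format)."""
    # `environ` is a testing convenience added for this sketch only.
    env = os.environ if environ is None else environ
    if mode is None:
        # Documented default when ANGZARR_MODE is unset: "distributed".
        mode = Mode(env.get("ANGZARR_MODE", "distributed"))
    if mode is Mode.STANDALONE:
        # Unix Domain Socket path, e.g. /tmp/angzarr/ch-player.sock
        return f"{uds_base}/ch-{domain}.sock"
    # Kubernetes service DNS name, e.g. ch-player.angzarr.svc:1310
    return f"ch-{domain}.{namespace}.svc:{port}"


standalone = sketch_resolve_ch_endpoint("player", Mode.STANDALONE)
distributed = sketch_resolve_ch_endpoint("player", Mode.DISTRIBUTED)
from_env = sketch_resolve_ch_endpoint("table", environ={"ANGZARR_MODE": "standalone"})
```

In real code, call `resolve_ch_endpoint` directly; the sketch only makes the two endpoint shapes concrete.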
- class angzarr_client.client.QueryClient(channel: grpc.Channel, owns_channel: bool = True)
Client for the EventQueryService.
- classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) → QueryClient
Connect to an event query service at the given endpoint.
- Parameters:
endpoint – The gRPC endpoint (host:port or UDS path).
retry – Optional retry policy. Defaults to exponential backoff.
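For readers unfamiliar with the term, the following sketch shows what an exponential backoff schedule looks like in general. It does not reproduce `angzarr_client.retry.RetryPolicy`: the function name and the `base`, `factor`, and `cap` parameters are assumptions made for illustration, and the real policy's fields and defaults are not documented on this page.

```python
from typing import List


def backoff_delays(
    attempts: int,
    base: float = 0.5,
    factor: float = 2.0,
    cap: float = 4.0,
) -> List[float]:
    """Return the wait (in seconds) before each retry attempt.

    Each delay is `factor` times the previous one, starting at `base`
    and never exceeding `cap`. Jitter is omitted to keep the sketch short.
    """
    return [min(cap, base * factor ** attempt) for attempt in range(attempts)]


delays = backoff_delays(5)
```

Production retry policies usually add random jitter so that many clients do not retry in lockstep.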
- classmethod from_channel(channel: grpc.Channel) → QueryClient
Create a client from a caller-managed channel.
The returned client will not close the channel when close() is called; the caller retains ownership.
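The ownership rule can be illustrated with a small stand-alone sketch. `FakeChannel` and `OwnedChannelClient` are hypothetical stand-ins, not library classes; the sketch assumes only what the signatures above state, namely that the constructor takes an `owns_channel` flag and that `from_channel` produces a client that leaves the channel open.

```python
class FakeChannel:
    """Hypothetical stand-in for grpc.Channel that records close() calls."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class OwnedChannelClient:
    """Sketch of a client that closes its channel only when it owns it."""

    def __init__(self, channel: FakeChannel, owns_channel: bool = True) -> None:
        self._channel = channel
        self._owns_channel = owns_channel

    @classmethod
    def from_channel(cls, channel: FakeChannel) -> "OwnedChannelClient":
        # The caller created the channel, so the caller keeps ownership.
        return cls(channel, owns_channel=False)

    def close(self) -> None:
        if self._owns_channel:
            self._channel.close()


shared = FakeChannel()
borrowed_client = OwnedChannelClient.from_channel(shared)
borrowed_client.close()  # leaves `shared` open

private = FakeChannel()
owning_client = OwnedChannelClient(private)
owning_client.close()  # closes `private`
```

This split lets several clients share one channel while a single owner decides when to tear it down.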
- classmethod from_env(env_var: str, default: str) → QueryClient
Connect using an environment variable with fallback.
- classmethod from_stub(stub) → QueryClient
Compose a QueryClient from a pre-built gRPC stub.
Bypasses channel construction so tests can inject a fake stub (see tests/_fakes.py::RecordingStub) without opening a socket. The returned client does not own a channel; close() is a no-op (PARITY_AUDIT.md plan item P1.12.e / finding #24).
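A rough sketch of the stub-injection seam follows. `RecordingStubSketch` and `StubBackedClient` are hypothetical classes written for this example; they are not the contents of tests/_fakes.py, and the RPC method name `GetEventBook` is an assumption about the generated stub rather than something this page documents.

```python
from typing import Any, List, Optional, Tuple


class RecordingStubSketch:
    """Hypothetical fake stub: records each call and returns a canned reply."""

    def __init__(self, reply: Any) -> None:
        self.reply = reply
        self.calls: List[Tuple[Any, Optional[float]]] = []

    def GetEventBook(self, request: Any, timeout: Optional[float] = None) -> Any:
        # The method name is assumed; a real stub exposes whatever the proto defines.
        self.calls.append((request, timeout))
        return self.reply


class StubBackedClient:
    """Sketch of a client composed directly from a stub, with no channel."""

    def __init__(self, stub: Any) -> None:
        self._stub = stub

    @classmethod
    def from_stub(cls, stub: Any) -> "StubBackedClient":
        return cls(stub)

    def get_event_book(self, query: Any, timeout: Optional[float] = None) -> Any:
        return self._stub.GetEventBook(query, timeout=timeout)

    def close(self) -> None:
        # No channel is owned, so there is nothing to close.
        pass


stub = RecordingStubSketch(reply="canned-event-book")
client = StubBackedClient.from_stub(stub)
result = client.get_event_book("some-query", timeout=1.5)
client.close()
```

A test can then assert on `stub.calls` to check what the client sent, without a server or socket.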
- get_event_book(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) → angzarr_client.proto.angzarr.EventBook
Retrieve a single EventBook for the query.
- Parameters:
query – The query specification.
timeout – Optional per-call deadline in seconds.
Audit #69 stage (b): attaches x-correlation-id metadata from query.cover.correlation_id so OTel exporters / mesh sidecars can filter on it without decoding the body. Send-only.
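As a sketch of what attaching this metadata might look like: gRPC Python represents call metadata as a sequence of (key, value) tuples passed through the `metadata=` keyword of a stub call. The helper name `correlation_metadata` is hypothetical, the `SimpleNamespace` objects stand in for the real `Query` message, and skipping the header when the id is empty is an assumption rather than documented behavior.

```python
from types import SimpleNamespace
from typing import Tuple

Metadata = Tuple[Tuple[str, str], ...]


def correlation_metadata(correlation_id: str) -> Metadata:
    """Build gRPC call metadata carrying the correlation id.

    Assumption: an empty correlation id produces no header at all.
    """
    if not correlation_id:
        return ()
    # gRPC metadata keys must be lowercase.
    return (("x-correlation-id", correlation_id),)


# Stand-in for a Query whose cover carries a correlation id.
query = SimpleNamespace(cover=SimpleNamespace(correlation_id="order-42"))
metadata = correlation_metadata(query.cover.correlation_id)

# Stand-in for a Query without a correlation id.
anonymous = SimpleNamespace(cover=SimpleNamespace(correlation_id=""))
no_metadata = correlation_metadata(anonymous.cover.correlation_id)
```

Because the id travels as a header, infrastructure such as sidecars can read it without parsing the protobuf body.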
- get_events(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) → list[angzarr_client.proto.angzarr.EventBook]
Retrieve all EventBooks matching the query.
- Parameters:
query – The query specification.
timeout – Optional per-call deadline in seconds.
- query(domain: str, root: uuid.UUID) → angzarr_client.builder.QueryBuilder
Start building a query for a specific aggregate.
- query_domain(domain: str) → angzarr_client.builder.QueryBuilder
Start building a query by domain only (use with by_correlation_id).
- class angzarr_client.client.CommandHandlerClient(channel: grpc.Channel, owns_channel: bool = True)
Client for the CommandHandlerCoordinatorService.
- classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) → CommandHandlerClient
Connect to a command handler coordinator at the given endpoint.
- Parameters:
endpoint – The gRPC endpoint (host:port or UDS path).
retry – Optional retry policy. Defaults to exponential backoff.
- classmethod from_channel(channel: grpc.Channel) → CommandHandlerClient
Create a client from a caller-managed channel.
The returned client will not close the channel when close() is called; the caller retains ownership.
- classmethod from_env(env_var: str, default: str) → CommandHandlerClient
Connect using an environment variable with fallback.
- classmethod from_stub(stub) → CommandHandlerClient
Compose a CommandHandlerClient from a pre-built gRPC stub.
See QueryClient.from_stub() for the test-only seam used to inject tests/_fakes.py::RecordingStub. P1.12.e / finding #24.
- handle(command: angzarr_client.proto.angzarr.CommandBook, timeout: float | None = None) → angzarr_client.proto.angzarr.CommandResponse
Execute a command with default async mode.
This is a convenience method that wraps the command in a CommandRequest with SYNC_MODE_ASYNC (fire-and-forget) and CASCADE_ERROR_FAIL_FAST.
- Parameters:
command – The command to execute.
timeout – Optional per-call deadline in seconds.
- handle_command(request: angzarr_client.proto.angzarr.CommandRequest, timeout: float | None = None) → angzarr_client.proto.angzarr.CommandResponse
Execute a command with the specified sync mode.
- Parameters:
request – The command request.
timeout – Optional per-call deadline in seconds.
Audit #69 stage (b): attaches x-correlation-id metadata from the canonical request.command.cover.correlation_id.
- handle_sync_speculative(request: angzarr_client.proto.angzarr.SpeculateCommandHandlerRequest, timeout: float | None = None) → angzarr_client.proto.angzarr.CommandResponse
Execute a command speculatively against temporal state (no persistence).
- Parameters:
request – The speculative command request.
timeout – Optional per-call deadline in seconds.
- command(domain: str, root: uuid.UUID) → angzarr_client.builder.CommandBuilder
Start building a command for an existing aggregate.
- command_new(domain: str) → angzarr_client.builder.CommandBuilder
Start building a command for a new aggregate.
Materializes a fresh UUID v4 client-side and passes it explicitly to CommandBuilder. Audit #67: root is always client-assigned; no path exists to skip it.
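The client-assigned root can be sketched with the standard library alone. `CommandBuilderSketch` and `command_new_sketch` are hypothetical stand-ins for `CommandBuilder` and `command_new`; the only behavior taken from the text above is that a version 4 UUID is generated on the client and handed to the builder explicitly.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class CommandBuilderSketch:
    """Hypothetical stand-in for angzarr_client.builder.CommandBuilder."""

    domain: str
    root: uuid.UUID  # always supplied by the caller, never left unset


def command_new_sketch(domain: str) -> CommandBuilderSketch:
    """Start a command for a new aggregate with a freshly generated root."""
    # uuid.uuid4() produces a random (version 4) UUID on the client side.
    return CommandBuilderSketch(domain=domain, root=uuid.uuid4())


first = command_new_sketch("player")
second = command_new_sketch("player")
```

Generating the root on the client means the caller knows the new aggregate's identity before the command is sent.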
- class angzarr_client.client.SpeculativeClient(channel: grpc.Channel, owns_channel: bool = True)
Client for speculative operations across coordinator services.
Speculative execution runs commands/events against temporal state without persistence. Each coordinator service now provides its own speculative method.
- classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) → SpeculativeClient
Connect to coordinator services at the given endpoint.
- Parameters:
endpoint – The gRPC endpoint (host:port or UDS path).
retry – Optional retry policy. Defaults to exponential backoff.
- classmethod from_channel(channel: grpc.Channel) → SpeculativeClient
Create a client from a caller-managed channel.
The returned client will not close the channel when close() is called; the caller retains ownership.
- classmethod from_env(env_var: str, default: str) → SpeculativeClient
Connect using an environment variable with fallback.
- classmethod from_stubs(command_handler_stub, saga_stub, projector_stub, pm_stub) → SpeculativeClient
Compose a SpeculativeClient from pre-built gRPC stubs.
Bypasses channel construction; the four stubs back the four speculative methods (command_handler, saga, projector, process_manager). Test-only seam — see tests/_fakes.py::RecordingStub. P1.12.e / finding #24.
- command_handler(request: angzarr_client.proto.angzarr.SpeculateCommandHandlerRequest, timeout: float | None = None) → angzarr_client.proto.angzarr.CommandResponse
Execute a command speculatively against temporal state.
Audit #69 stage (b): canonical correlation_id at request.command.cover.correlation_id.
- projector(request: angzarr_client.proto.angzarr.SpeculateProjectorRequest, timeout: float | None = None) → angzarr_client.proto.angzarr.Projection
Speculatively execute a projector against events.
Audit #69 stage (b): canonical correlation_id at request.events.cover.correlation_id.
- saga(request: angzarr_client.proto.angzarr.SpeculateSagaRequest, timeout: float | None = None) → angzarr_client.proto.angzarr.SagaResponse
Speculatively execute a saga against events.
Audit #69 stage (b): canonical correlation_id at request.request.source.cover.correlation_id.
- class angzarr_client.client.DomainClient(channel: grpc.Channel, owns_channel: bool = True)
Combined client for command handler, query, and speculative operations on a single domain.
- command_handler
- query
- speculative
- classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) → DomainClient
Connect to a domain’s coordinator at the given endpoint.
- Parameters:
endpoint – The gRPC endpoint (host:port or UDS path).
retry – Optional retry policy. Defaults to exponential backoff.
- classmethod from_channel(channel: grpc.Channel) → DomainClient
Create a client from a caller-managed channel.
The returned client will not close the channel when close() is called; the caller retains ownership.
- classmethod from_clients(command_handler: CommandHandlerClient, query: QueryClient, speculative: SpeculativeClient) → DomainClient
Compose a DomainClient from already-constructed wrapped clients.
Bypasses channel construction: the wrapped clients hold their own channels (or are duck-typed test doubles). The DomainClient does not own the channel; close() is a no-op for the composed instance.
Used by cucumber/unit tests to inject recording mocks without spinning up a gRPC server (PARITY_AUDIT.md plan item P1.12.e / finding #24).
- classmethod for_domain(domain: str, mode: TransportMode | None = None) → DomainClient
Connect to a domain’s command handler coordinator.
Resolves the domain name to the appropriate endpoint based on transport mode.
- Parameters:
domain – Domain name (e.g., “player”, “table”)
mode – Transport mode (standalone=UDS, distributed=K8s DNS). If None, detected from ANGZARR_MODE env var.
- Returns:
DomainClient connected to the domain’s command handler coordinator.
Examples
# Auto-detect mode from ANGZARR_MODE env var
player = DomainClient.for_domain("player")
# Explicitly use standalone mode (Unix Domain Sockets)
player = DomainClient.for_domain("player", TransportMode.STANDALONE)
# Explicitly use distributed mode (K8s DNS)
player = DomainClient.for_domain("player", TransportMode.DISTRIBUTED)
- classmethod from_env(env_var: str, default: str) → DomainClient
Connect using an environment variable with fallback.
- execute(command: angzarr_client.proto.angzarr.CommandBook, timeout: float | None = None) → angzarr_client.proto.angzarr.CommandResponse
Execute a command with default async mode (fire-and-forget).
- execute_with_mode(command: angzarr_client.proto.angzarr.CommandBook, sync_mode: angzarr_client.proto.angzarr.SyncMode, timeout: float | None = None) → angzarr_client.proto.angzarr.CommandResponse
Execute a command with the specified sync mode.
- Parameters:
command – The command to execute.
sync_mode – Execution mode (ASYNC, SIMPLE, or CASCADE).
timeout – Optional per-call deadline in seconds.
- get_event_book(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) → angzarr_client.proto.angzarr.EventBook
Retrieve a single EventBook for the query (unary RPC).
Delegates to the underlying QueryClient. Mirrors Rust’s DomainClient::get_event_book.
- get_events(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) → list[angzarr_client.proto.angzarr.EventBook]
Retrieve all matching EventBooks for the query (streaming RPC).
Delegates to the underlying QueryClient. Mirrors Rust’s DomainClient::get_events.