angzarr_client

Angzarr Python client library for gRPC services.

Exceptions

ClientError

Base class for client errors.

CommandRejectedError

Command was rejected due to business rule violation.

ConnectionError

Failed to establish connection to the server.

GRPCError

gRPC error from the server.

InvalidArgumentError

Invalid argument provided by caller.

InvalidTimestampError

Failed to parse timestamp.

TransportError

Transport-level error.

BuildError

Raised when the builder cannot produce a valid runtime router.

DispatchError

gRPC-compatible dispatch error carrying a StatusCode.

Classes

CommandBuilder

Fluent builder for constructing and executing commands.

QueryBuilder

Fluent builder for constructing and executing queries.

CommandHandlerClient

Client for the CommandHandlerCoordinatorService.

DomainClient

Combined client for command handler, query, and speculative operations on a single domain.

QueryClient

Client for the EventQueryService.

SpeculativeClient

Client for speculative operations across coordinator services.

TransportMode

Transport mode for gRPC connections.

CompensationContext

Extracted context from a rejection Notification.

DelegationOptions

Options struct for delegate_to_framework / pm_delegate_to_framework.

PMRevocationResponse

Result from PM compensation helpers.

Destinations

Context for saga/PM handlers providing access to destination sequences.

ExponentialBackoffRetry

Retries with exponential backoff and optional jitter.

RetryPolicy

Strategy for retrying failed operations.

CommandHandlerRouter

Runtime router dispatching commands to registered @command_handler instances.

ProcessManagerResponse

Return value of a process-manager @handles method.

ProcessManagerRouter

Runtime router dispatching events to registered @process_manager instances.

ProjectorRouter

Runtime router fanning events out to registered @projector instances.

RejectionHandlerResponse

Return shape of a saga @rejected handler method.

Router

Fluent builder that collects handler factories for the unified router.

SagaHandlerResponse

Return value of a saga @handles method.

SagaRouter

Runtime router dispatching events to registered @saga instances.

UpcasterRouter

Runtime router transforming events through registered @upcaster instances.

CommandHandlerGrpc

gRPC adapter for a unified CommandHandlerRouter.

ProcessManagerGrpc

gRPC adapter for a unified ProcessManagerRouter.

ProjectorGrpc

gRPC adapter for a unified ProjectorRouter.

SagaGrpc

gRPC adapter for a unified SagaRouter.

UpcasterGrpc

gRPC adapter for a unified UpcasterRouter.

ServerConfig

Configuration for a gRPC server.

ScenarioContext

Shared context for BDD test scenarios.

CommandBook

Wrapper for the CommandBook proto.

CommandPage

Wrapper for the CommandPage proto.

CommandResponse

Wrapper for the CommandResponse proto.

Cover

Wrapper for the Cover proto.

CoverBearer

Shared accessors for proto types that carry a Cover field.

EventBook

Wrapper for the EventBook proto.

EventPage

Wrapper for the EventPage proto.

Query

Wrapper for the Query proto.

Wrapped

Interface every angzarr wrapper implements.

Functions

resolve_ch_endpoint(→ str)

Resolve domain to command handler coordinator endpoint.

delegate_to_framework(...)

Create a response that delegates compensation to the framework.

emit_compensation_events(...)

Create a response containing compensation events.

is_notification(→ bool)

Check if a type URL refers to a rejection Notification.

pm_delegate_to_framework(→ PMRevocationResponse)

Create a PM response that delegates compensation to the framework.

pm_emit_compensation_events(→ PMRevocationResponse)

Create a PM response containing compensation events.

correlated_metadata(→ list[tuple[str, str]])

Build gRPC metadata carrying the x-correlation-id header.

destination_map(→ dict[str, ...)

Build a map from root UUID hex to EventBook for destination lookup.

full_type_url_for(→ str)

Get the full type URL for a message class.

implicit_edition(→ angzarr_client.proto.angzarr.Edition)

Create an edition with the given name but no divergences.

now(→ google.protobuf.timestamp_pb2.Timestamp)

Return the current time as a protobuf Timestamp.

parse_timestamp(→ google.protobuf.timestamp_pb2.Timestamp)

Parse an RFC3339 timestamp string.

proto_to_uuid(→ uuid.UUID)

Convert a proto UUID to Python UUID.

proto_uuid_to_hex(→ str)

Convert a proto UUID to hex string format.

type_name_from_url(→ str)

Extract the wire-format type name from a type URL.

type_url(→ str)

Construct a full type URL from a fully-qualified message type name.

type_url_matches(→ bool)

Check if a type URL matches the given fully-qualified type name.

uuid_to_proto(→ angzarr_client.proto.angzarr.UUID)

Convert a Python UUID to a proto UUID.

cart_root(→ uuid.UUID)

Compute a deterministic root UUID for a cart aggregate.

compute_root(→ uuid.UUID)

Compute a deterministic root UUID from domain and business key.

customer_root(→ uuid.UUID)

Compute a deterministic root UUID for a customer aggregate.

fulfillment_root(→ uuid.UUID)

Compute a deterministic root UUID for a fulfillment aggregate.

inventory_product_root(→ uuid.UUID)

Generate a deterministic UUID for an inventory product aggregate.

inventory_root(→ uuid.UUID)

Compute a deterministic root UUID for an inventory aggregate.

order_root(→ uuid.UUID)

Compute a deterministic root UUID for an order aggregate.

product_root(→ uuid.UUID)

Compute a deterministic root UUID for a product aggregate.

to_proto_bytes(→ bytes)

Convert a uuid.UUID to 16-byte proto representation.

default_retry_policy(→ RetryPolicy)

Return the standard retry policy matching Rust's backoff config.

applies(→ Callable[[F], F])

Register a method as a state applier for event_type.

command_handler(→ Callable[[T], T])

Mark a class as a command handler (aggregate) for domain.

handles(→ Callable[[F], F])

Register a method as a dispatch target for message_type.

handles_fact(→ Callable[[F], F])

Register a method as a fact-event handler for event_type.

process_manager(→ Callable[[T], T])

Mark a class as a process manager.

projector(→ Callable[[T], T])

Mark a class as a projector consuming events from domains.

rejected(→ Callable[[F], F])

Register a method as a compensation handler for command rejections.

saga(→ Callable[[T], T])

Mark a class as a saga translating events from source to commands

state_factory(→ F)

Mark a method as the state factory for this instance.

upcaster(→ Callable[[T], T])

Mark a class as an upcaster transforming events in domain.

upcasts(→ Callable[[F], F])

Register a method as a transformation from from_type to to_type.

cleanup_socket(→ None)

Clean up a UDS socket file.

configure_logging(→ None)

Configure structlog with JSON rendering and ISO timestamps.

create_server(→ ServerHandle)

Create an async gRPC server with health checking wired up.

get_transport_config(→ tuple[str, str])

Get transport configuration from environment.

resolve_bind_address(→ str)

Compute the TCP bind address.

run_command_handler_server(→ None)

Run a command handler gRPC server.

run_process_manager_server(→ None)

Run a process manager gRPC server.

run_projector_server(→ None)

Run a projector gRPC server.

run_saga_server(→ None)

Run a saga gRPC server.

run_server(→ None)

Run an async gRPC server until termination.

run_upcaster_server(→ None)

Run an upcaster gRPC server.

make_command_book(...)

Create a CommandBook with a single command.

make_command_page(...)

Create a CommandPage.

make_cover(→ angzarr_client.proto.angzarr.Cover)

Create a Cover from domain and root bytes.

make_event_book(→ angzarr_client.proto.angzarr.EventBook)

Create an EventBook.

make_event_page(→ angzarr_client.proto.angzarr.EventPage)

Create an EventPage.

make_timestamp(→ google.protobuf.timestamp_pb2.Timestamp)

Create a timestamp for now.

uuid_for(→ bytes)

Generate a deterministic 16-byte UUID from a name.

uuid_obj_for(→ uuid.UUID)

Generate a deterministic UUID object from a name.

uuid_str_for(→ str)

Generate a deterministic UUID string from a name.

testing_pack_event(→ google.protobuf.any_pb2.Any)

Pack a protobuf message into an Any with the canonical type URL.

require_exists(→ None)

Require that an aggregate exists (caller-supplied predicate).

require_non_negative(→ None)

Require that a value is zero or greater.

require_not_empty(→ None)

Require that a sequence has at least one element.

require_not_empty_str(→ None)

Require that a string is not empty.

require_not_exists(→ None)

Require that an aggregate does NOT exist (caller-supplied predicate).

require_positive(→ None)

Require that a value is greater than zero.

require_status(→ None)

Require that the current status matches the expected value.

require_status_not(→ None)

Require that the current status is NOT the forbidden value.

Package Contents

class angzarr_client.CommandBuilder(client: angzarr_client.client.CommandHandlerClient, domain: str, root: uuid.UUID)

Fluent builder for constructing and executing commands.

Audit #67: root is required. The only path to an auto-generated UUID v4 is via command_new(), which materializes the UUID and passes it explicitly to the constructor. Aggregate roots are always client-assigned across all six languages (audit #20 convention).

with_correlation_id(id: str) CommandBuilder

Set the correlation ID for request tracing.

with_sequence(seq: int) CommandBuilder

Set the expected sequence number for optimistic locking.

with_merge_strategy(strategy: angzarr_client.proto.angzarr.MergeStrategy) CommandBuilder

Set the merge strategy for conflict resolution.

Parameters:

strategy – MERGE_COMMUTATIVE (default) or MERGE_STRICT.

with_command(type_url: str, message: google.protobuf.message.Message) CommandBuilder

Set the command type URL and message.

build() angzarr_client.proto.angzarr.CommandBook

Build the CommandBook without executing.

execute(sync_mode: angzarr_client.proto.angzarr.SyncMode = SyncMode.SYNC_MODE_ASYNC) angzarr_client.proto.angzarr.CommandResponse

Build and execute the command.

Parameters:

sync_mode – Execution mode (ASYNC, SIMPLE, or CASCADE). Defaults to ASYNC for fire-and-forget behavior.

class angzarr_client.QueryBuilder(client: angzarr_client.client.QueryClient, domain: str, root: uuid.UUID | None = None)

Fluent builder for constructing and executing queries.

by_correlation_id(id: str) QueryBuilder

Query by correlation ID instead of root.

edition(edition: str) QueryBuilder

Query events from a specific edition.

with_edition

range(lower: int) QueryBuilder

Query a range of sequences from lower (inclusive).

Last-selection-wins: clears any previously-set temporal selection so chained calls like .as_of_sequence(10).range(5) produce a Query with only the range. Mirrors Rust’s builder.rs:165 single-slot semantics (PARITY_AUDIT.md finding #23).
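The last-selection-wins rule can be pictured with a small stand-in builder. This is a sketch, not the library's implementation: SketchQueryBuilder and its single _selection slot are hypothetical names, and the real builder produces a Query proto rather than a dict.

```python
from __future__ import annotations


class SketchQueryBuilder:
    """Hypothetical stand-in showing single-slot temporal selection."""

    def __init__(self) -> None:
        # One slot holds whichever selection was set most recently.
        self._selection: tuple[str, object] | None = None

    def range(self, lower: int) -> "SketchQueryBuilder":
        self._selection = ("range", (lower, None))
        return self

    def range_to(self, lower: int, upper: int) -> "SketchQueryBuilder":
        self._selection = ("range", (lower, upper))
        return self

    def as_of_sequence(self, seq: int) -> "SketchQueryBuilder":
        self._selection = ("as_of_sequence", seq)
        return self

    def build(self) -> dict:
        # A dict keeps the sketch stdlib-only; the real build() returns a Query.
        if self._selection is None:
            return {}
        kind, value = self._selection
        return {kind: value}


# The later range(5) call overwrites the earlier as_of_sequence(10).
query = SketchQueryBuilder().as_of_sequence(10).range(5).build()
print(query)  # prints {'range': (5, None)}
```

Because every setter writes to the same slot, the order of chained calls decides the outcome and no combination of selections can end up in one query.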

range_to(lower: int, upper: int) QueryBuilder

Query a range of sequences with upper bound (inclusive).

Last-selection-wins (see range()).

as_of_sequence(seq: int) QueryBuilder

Query state as of a specific sequence number.

Last-selection-wins: clears any previously-set range selection.

as_of_time(rfc3339: str) QueryBuilder

Query state as of a specific timestamp (RFC3339 format).

Last-selection-wins: clears any previously-set range selection.

Audit finding #34 (Option B, raise immediately): a malformed rfc3339 string raises InvalidTimestampError synchronously rather than deferring to build(). Previously the failure was captured into a sticky _err field that survived subsequent last-selection-wins setters, so qb.as_of_time("bad").as_of_sequence(5).build() raised the stale parse error. This mirrors Rust’s as_of_time(...) -> Result<Self> signature, where the bad call short-circuits at the call site.
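A minimal sketch of the raise-immediately behavior, under stated assumptions: InvalidTimestampSketch and SketchTimeQuery are hypothetical stand-ins, and datetime.fromisoformat only approximates RFC3339 parsing (the library's own parser is not shown in this reference).

```python
from __future__ import annotations

from datetime import datetime


class InvalidTimestampSketch(ValueError):
    """Hypothetical stand-in for InvalidTimestampError."""


class SketchTimeQuery:
    """Hypothetical builder that validates the timestamp at the call site."""

    def __init__(self) -> None:
        self._selection: tuple[str, object] | None = None

    def as_of_time(self, rfc3339: str) -> "SketchTimeQuery":
        try:
            # Approximation: fromisoformat is not a full RFC3339 parser.
            parsed = datetime.fromisoformat(rfc3339.replace("Z", "+00:00"))
        except ValueError as exc:
            # Raise now; nothing is stored, so no stale error can reach build().
            raise InvalidTimestampSketch(rfc3339) from exc
        self._selection = ("as_of_time", parsed)
        return self

    def as_of_sequence(self, seq: int) -> "SketchTimeQuery":
        self._selection = ("as_of_sequence", seq)
        return self

    def build(self) -> dict:
        return dict([self._selection]) if self._selection else {}


qb = SketchTimeQuery()
try:
    qb.as_of_time("bad")
except InvalidTimestampSketch:
    pass  # the bad call short-circuits here

# The builder is still usable and carries no sticky error.
result = qb.as_of_sequence(5).build()
```

Since the failed call never writes to the builder, a later setter followed by build() succeeds instead of surfacing the earlier parse error.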

build() angzarr_client.proto.angzarr.Query

Build the Query without executing.

get_event_book() angzarr_client.proto.angzarr.EventBook

Execute the query and return a single EventBook.

get_events() list[angzarr_client.proto.angzarr.EventBook]

Execute the query and return all matching EventBooks.

get_pages() list[angzarr_client.proto.angzarr.EventPage]

Execute the query and return just the event pages.

class angzarr_client.CommandHandlerClient(channel: grpc.Channel, owns_channel: bool = True)

Client for the CommandHandlerCoordinatorService.

classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) CommandHandlerClient

Connect to a command handler coordinator at the given endpoint.

Parameters:
  • endpoint – The gRPC endpoint (host:port or UDS path).

  • retry – Optional retry policy. Defaults to exponential backoff.

classmethod from_channel(channel: grpc.Channel) CommandHandlerClient

Create a client from a caller-managed channel.

The returned client will not close the channel when close() is called; the caller retains ownership.

classmethod from_env(env_var: str, default: str) CommandHandlerClient

Connect using an environment variable with fallback.

classmethod from_stub(stub) CommandHandlerClient

Compose a CommandHandlerClient from a pre-built gRPC stub.

See QueryClient.from_stub() for the test-only seam used to inject tests/_fakes.py::RecordingStub. P1.12.e / finding #24.

handle(command: angzarr_client.proto.angzarr.CommandBook, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command with default async mode.

This is a convenience method that wraps the command in a CommandRequest with SYNC_MODE_ASYNC (fire-and-forget) and CASCADE_ERROR_FAIL_FAST.

Parameters:
  • command – The command to execute.

  • timeout – Optional per-call deadline in seconds.

handle_command(request: angzarr_client.proto.angzarr.CommandRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command with the specified sync mode.

Parameters:
  • request – The command request.

  • timeout – Optional per-call deadline in seconds.

Audit #69 stage (b): attaches x-correlation-id metadata from the canonical request.command.cover.correlation_id.

handle_sync_speculative(request: angzarr_client.proto.angzarr.SpeculateCommandHandlerRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command speculatively against temporal state (no persistence).

Parameters:
  • request – The speculative command request.

  • timeout – Optional per-call deadline in seconds.

command(domain: str, root: uuid.UUID) angzarr_client.builder.CommandBuilder

Start building a command for an existing aggregate.

command_new(domain: str) angzarr_client.builder.CommandBuilder

Start building a command for a new aggregate.

Materializes a fresh UUID v4 client-side and passes it explicitly to CommandBuilder. Audit #67: root is always client-assigned, no path exists to skip it.
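The relationship between command() and command_new() can be sketched as follows. SketchCommandBuilder and the two module-level functions are hypothetical stand-ins that only model how the root reaches the builder; the real methods live on the client and also pass the client itself.

```python
import uuid


class SketchCommandBuilder:
    """Hypothetical stand-in: root has no default, so it cannot be omitted."""

    def __init__(self, domain: str, root: uuid.UUID) -> None:
        self.domain = domain
        self.root = root


def command(domain: str, root: uuid.UUID) -> SketchCommandBuilder:
    # Existing aggregate: the caller already knows the root.
    return SketchCommandBuilder(domain, root)


def command_new(domain: str) -> SketchCommandBuilder:
    # New aggregate: the UUID v4 is generated client-side and passed explicitly.
    return SketchCommandBuilder(domain, uuid.uuid4())


builder = command_new("player")
existing = command("player", builder.root)
```

Keeping root a required constructor argument means the only way to get a generated identifier is through the explicit command_new() path.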

close() None

Close the underlying channel if this client owns it.
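The ownership rule shared by from_channel() and close() can be illustrated with a stand-in. FakeChannel and SketchClient are hypothetical; the sketch assumes only what is documented above, namely an owns_channel flag that from_channel() sets to False.

```python
class FakeChannel:
    """Hypothetical channel double that counts close() calls."""

    def __init__(self) -> None:
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1


class SketchClient:
    """Hypothetical client modelling the documented owns_channel flag."""

    def __init__(self, channel: FakeChannel, owns_channel: bool = True) -> None:
        self._channel = channel
        self._owns_channel = owns_channel

    @classmethod
    def from_channel(cls, channel: FakeChannel) -> "SketchClient":
        # Caller-managed channel: the client must not close it.
        return cls(channel, owns_channel=False)

    def close(self) -> None:
        if self._owns_channel:
            self._channel.close()


owned = FakeChannel()
SketchClient(owned).close()  # client owns the channel, so it is closed

shared = FakeChannel()
SketchClient.from_channel(shared).close()  # caller keeps ownership, nothing closed
```

This is what lets several clients share one caller-managed channel without one client's close() tearing it down for the others.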

class angzarr_client.DomainClient(channel: grpc.Channel, owns_channel: bool = True)

Combined client for command handler, query, and speculative operations on a single domain.

command_handler
query
speculative

classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) DomainClient

Connect to a domain’s coordinator at the given endpoint.

Parameters:
  • endpoint – The gRPC endpoint (host:port or UDS path).

  • retry – Optional retry policy. Defaults to exponential backoff.

classmethod from_channel(channel: grpc.Channel) DomainClient

Create a client from a caller-managed channel.

The returned client will not close the channel when close() is called; the caller retains ownership.

classmethod from_clients(command_handler: CommandHandlerClient, query: QueryClient, speculative: SpeculativeClient) DomainClient

Compose a DomainClient from already-constructed wrapped clients.

Bypasses channel construction — the wrapped clients hold their own channels (or are duck-typed test doubles). The DomainClient does not own the channel; close() is a no-op for the composed instance.

Used by cucumber/unit tests to inject recording mocks without spinning up a gRPC server (PARITY_AUDIT.md plan item P1.12.e / finding #24).

classmethod for_domain(domain: str, mode: TransportMode | None = None) DomainClient

Connect to a domain’s command handler coordinator.

Resolves the domain name to the appropriate endpoint based on transport mode.

Parameters:
  • domain – Domain name (e.g., “player”, “table”)

  • mode – Transport mode (standalone=UDS, distributed=K8s DNS). If None, detected from ANGZARR_MODE env var.

Returns:

DomainClient connected to the domain’s command handler coordinator.

Examples

# Auto-detect mode from ANGZARR_MODE env var
player = DomainClient.for_domain("player")

# Explicitly use standalone mode (Unix Domain Sockets)
player = DomainClient.for_domain("player", TransportMode.STANDALONE)

# Explicitly use distributed mode (K8s DNS)
player = DomainClient.for_domain("player", TransportMode.DISTRIBUTED)

classmethod from_env(env_var: str, default: str) DomainClient

Connect using an environment variable with fallback.

execute(command: angzarr_client.proto.angzarr.CommandBook, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command with default async mode (fire-and-forget).

execute_with_mode(command: angzarr_client.proto.angzarr.CommandBook, sync_mode: angzarr_client.proto.angzarr.SyncMode, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command with the specified sync mode.

Parameters:
  • command – The command to execute.

  • sync_mode – Execution mode (ASYNC, SIMPLE, or CASCADE).

  • timeout – Optional per-call deadline in seconds.

get_event_book(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) angzarr_client.proto.angzarr.EventBook

Retrieve a single EventBook for the query — unary RPC.

Delegates to the underlying QueryClient. Mirrors Rust’s DomainClient::get_event_book.

get_events(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) list[angzarr_client.proto.angzarr.EventBook]

Retrieve all matching EventBooks for the query — streaming RPC.

Delegates to the underlying QueryClient. Mirrors Rust’s DomainClient::get_events.

close() None

Close the underlying channel if this client owns it.

class angzarr_client.QueryClient(channel: grpc.Channel, owns_channel: bool = True)

Client for the EventQueryService.

classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) QueryClient

Connect to an event query service at the given endpoint.

Parameters:
  • endpoint – The gRPC endpoint (host:port or UDS path).

  • retry – Optional retry policy. Defaults to exponential backoff.

classmethod from_channel(channel: grpc.Channel) QueryClient

Create a client from a caller-managed channel.

The returned client will not close the channel when close() is called; the caller retains ownership.

classmethod from_env(env_var: str, default: str) QueryClient

Connect using an environment variable with fallback.

classmethod from_stub(stub) QueryClient

Compose a QueryClient from a pre-built gRPC stub.

Bypasses channel construction so tests can inject a fake stub (see tests/_fakes.py::RecordingStub) without opening a socket. The returned client does not own a channel; close() is a no-op (PARITY_AUDIT.md plan item P1.12.e / finding #24).

get_event_book(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) angzarr_client.proto.angzarr.EventBook

Retrieve a single EventBook for the query.

Parameters:
  • query – The query specification.

  • timeout – Optional per-call deadline in seconds.

Audit #69 stage (b): attaches x-correlation-id metadata from query.cover.correlation_id so OTel exporters / mesh sidecars can filter on it without decoding the body. Send-only.

get_events(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) list[angzarr_client.proto.angzarr.EventBook]

Retrieve all EventBooks matching the query.

Parameters:
  • query – The query specification.

  • timeout – Optional per-call deadline in seconds.

query(domain: str, root: uuid.UUID) angzarr_client.builder.QueryBuilder

Start building a query for a specific aggregate.

query_domain(domain: str) angzarr_client.builder.QueryBuilder

Start building a query by domain only (use with by_correlation_id).

close() None

Close the underlying channel if this client owns it.

class angzarr_client.SpeculativeClient(channel: grpc.Channel, owns_channel: bool = True)

Client for speculative operations across coordinator services.

Speculative execution runs commands/events against temporal state without persistence. Each coordinator service now provides its own speculative method.

classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) SpeculativeClient

Connect to coordinator services at the given endpoint.

Parameters:
  • endpoint – The gRPC endpoint (host:port or UDS path).

  • retry – Optional retry policy. Defaults to exponential backoff.

classmethod from_channel(channel: grpc.Channel) SpeculativeClient

Create a client from a caller-managed channel.

The returned client will not close the channel when close() is called; the caller retains ownership.

classmethod from_env(env_var: str, default: str) SpeculativeClient

Connect using an environment variable with fallback.

classmethod from_stubs(command_handler_stub, saga_stub, projector_stub, pm_stub) SpeculativeClient

Compose a SpeculativeClient from pre-built gRPC stubs.

Bypasses channel construction; the four stubs back the four speculative methods (command_handler, saga, projector, process_manager). Test-only seam — see tests/_fakes.py::RecordingStub. P1.12.e / finding #24.

command_handler(request: angzarr_client.proto.angzarr.SpeculateCommandHandlerRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse

Execute a command speculatively against temporal state.

Audit #69 stage (b): canonical correlation_id at request.command.cover.correlation_id.

projector(request: angzarr_client.proto.angzarr.SpeculateProjectorRequest, timeout: float | None = None) angzarr_client.proto.angzarr.Projection

Speculatively execute a projector against events.

Audit #69 stage (b): canonical correlation_id at request.events.cover.correlation_id.

saga(request: angzarr_client.proto.angzarr.SpeculateSagaRequest, timeout: float | None = None) angzarr_client.proto.angzarr.SagaResponse

Speculatively execute a saga against events.

Audit #69 stage (b): canonical correlation_id at request.request.source.cover.correlation_id.

process_manager(request: angzarr_client.proto.angzarr.SpeculatePmRequest, timeout: float | None = None) angzarr_client.proto.angzarr.ProcessManagerHandleResponse

Speculatively execute a process manager.

Audit #69 stage (b): canonical correlation_id at request.request.trigger.cover.correlation_id.

close() None

Close the underlying channel if this client owns it.

class angzarr_client.TransportMode(*args, **kwds)

Bases: enum.Enum

Transport mode for gRPC connections.

STANDALONE = 'standalone'
DISTRIBUTED = 'distributed'

angzarr_client.resolve_ch_endpoint(domain: str, mode: TransportMode | None = None, *, uds_base: str = '/tmp/angzarr', namespace: str = 'angzarr', port: int = 1310) str

Resolve domain to command handler coordinator endpoint.

Parameters:
  • domain – The domain name (e.g., “player”, “table”, “hand”)

  • mode – Transport mode. If None, detected from ANGZARR_MODE env var.

  • uds_base – Base path for Unix Domain Sockets (standalone mode)

  • namespace – Kubernetes namespace (distributed mode)

  • port – gRPC port (distributed mode)

Returns:

Endpoint string suitable for _create_channel. For the domain “player” with default settings:

  • Standalone: /tmp/angzarr/ch-player.sock

  • Distributed: ch-player.angzarr.svc:1310

Environment Variables:

  • ANGZARR_MODE: “standalone” or “distributed” (default: “distributed”)

  • ANGZARR_UDS_BASE: Override uds_base (default: /tmp/angzarr)

  • ANGZARR_NAMESPACE: Override namespace (default: angzarr)

  • ANGZARR_CH_PORT: Override port (default: 1310)
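The resolution rules above can be approximated in a few lines. This is a sketch inferred from the two documented endpoint shapes, not the library's code: Mode, resolve_ch_endpoint_sketch, and the environ parameter are hypothetical, and letting environment variables take precedence over the keyword arguments is an assumption based on the word "Override".

```python
import os
from enum import Enum


class Mode(Enum):
    """Stand-in for TransportMode, using the documented values."""

    STANDALONE = "standalone"
    DISTRIBUTED = "distributed"


def resolve_ch_endpoint_sketch(
    domain,
    mode=None,
    *,
    uds_base="/tmp/angzarr",
    namespace="angzarr",
    port=1310,
    environ=None,  # hypothetical hook so the sketch can run without touching os.environ
):
    env = os.environ if environ is None else environ
    if mode is None:
        # Documented default when ANGZARR_MODE is unset: "distributed".
        mode = Mode(env.get("ANGZARR_MODE", "distributed"))
    if mode is Mode.STANDALONE:
        # Assumed precedence: the environment variable wins over the argument.
        base = env.get("ANGZARR_UDS_BASE", uds_base)
        return f"{base}/ch-{domain}.sock"
    ns = env.get("ANGZARR_NAMESPACE", namespace)
    resolved_port = int(env.get("ANGZARR_CH_PORT", port))
    return f"ch-{domain}.{ns}.svc:{resolved_port}"


print(resolve_ch_endpoint_sketch("player", Mode.STANDALONE, environ={}))
print(resolve_ch_endpoint_sketch("player", environ={}))
```

With an empty environment the two calls reproduce the standalone and distributed endpoint strings listed under Returns.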

class angzarr_client.CompensationContext

Extracted context from a rejection Notification.

Provides easy access to compensation-relevant fields. Source aggregate info is extracted from the rejected command’s angzarr_deferred header.

source_event_sequence: int

Sequence of the event that triggered the saga/PM flow.

rejection_reason: str

Why the command was rejected.

rejected_command: angzarr_client.proto.angzarr.types_pb2.CommandBook | None

The command that was rejected (if available).

source_aggregate: angzarr_client.proto.angzarr.types_pb2.Cover | None

Cover of the aggregate that triggered the flow.

classmethod from_notification(notification: angzarr_client.proto.angzarr.types_pb2.Notification) CompensationContext

Extract compensation context from a Notification.

Source aggregate info is extracted from the rejected command’s angzarr_deferred header, which is always set by the framework for saga/PM-produced commands.

Parameters:

notification – The notification containing the RejectionNotification payload.

Returns:

CompensationContext with extracted fields.

property rejected_command_type: str | None

Get the type URL of the rejected command, if available.

property source_domain: str | None

Get the domain of the source aggregate, if available.

property source_root: bytes | None

Get the root UUID bytes of the source aggregate, if available.

property dispatch_key: str

Build a dispatch key for routing rejection handlers.

Returns:

A key in format “domain/command” or empty string.

class angzarr_client.DelegationOptions

Options struct for delegate_to_framework / pm_delegate_to_framework.

Audit #65 / #66: collapses the previously-divergent function shapes (Rust two-function split, Python kwargs) into a single shared options type. Cross-language symmetric — Python DelegationOptions mirrors Rust compensation::DelegationOptions field-for-field with the same defaults.

Field defaults match the previous Python kwargs and Rust’s basic delegate_to_framework (which hardcoded emit_system_event = true and the rest false).

emit_system_event: bool = True

Emit SagaCompensationFailed to the fallback domain.

send_to_dead_letter: bool = False

Move the failed event to the dead-letter queue.

escalate: bool = False

Mark for operator intervention.

abort: bool = False

Stop the saga entirely without retry.
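The four flags and their defaults can be modelled with a dataclass. This is a sketch only: whether DelegationOptions is actually a (frozen) dataclass is an assumption, and DelegationOptionsSketch and delegate_sketch are hypothetical names; the real delegate_to_framework returns a BusinessResponse proto rather than a dict.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, replace


@dataclass(frozen=True)
class DelegationOptionsSketch:
    """Hypothetical mirror of the documented fields and defaults."""

    emit_system_event: bool = True
    send_to_dead_letter: bool = False
    escalate: bool = False
    abort: bool = False


def delegate_sketch(reason: str, options: DelegationOptionsSketch | None = None) -> dict:
    # Passing None falls back to the documented defaults.
    opts = options if options is not None else DelegationOptionsSketch()
    return {"reason": reason, **asdict(opts)}


default_flags = delegate_sketch("no custom compensation")
escalated = delegate_sketch(
    "needs an operator",
    replace(DelegationOptionsSketch(), escalate=True, send_to_dead_letter=True),
)
```

A single options type keeps the call shape identical whether one flag or all four are overridden.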

class angzarr_client.PMRevocationResponse

Result from PM compensation helpers.

Named type matching Go’s PMRevocationResponse, replacing raw tuples for better discoverability and documentation.

Audit finding #70: revocation is required (no default), matching Rust’s compensation.rs:188-194 revocation: RevocationResponse field. The factory helpers pm_delegate_to_framework and pm_emit_compensation_events always construct one, so no caller is broken; direct user-code construction without it raises TypeError at construction time, surfacing the missing field loudly. Symmetric to audit-#56 (tighten ambiguous Optionals when the framework always populates them).

revocation: angzarr_client.proto.angzarr.command_handler_pb2.RevocationResponse

Framework action flags. Required.

process_events: angzarr_client.proto.angzarr.types_pb2.EventBook | None = None

PM events to persist (may be None when delegating).
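The effect of making revocation required can be shown with a stand-in. PMRevocationResponseSketch is hypothetical and assumes a dataclass-like type; the field types are replaced with Any so the sketch does not depend on the generated protos.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class PMRevocationResponseSketch:
    """Hypothetical stand-in: revocation has no default, process_events does."""

    revocation: Any
    process_events: Any | None = None


try:
    PMRevocationResponseSketch()  # missing the required field
except TypeError as exc:
    error = exc

# Supplying revocation is enough; process_events stays None when delegating.
delegating = PMRevocationResponseSketch(revocation={"emit_system_event": True})
```

The missing field fails at construction time rather than later, when the framework would read it.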

angzarr_client.delegate_to_framework(reason: str, options: DelegationOptions | None = None) angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse

Create a response that delegates compensation to the framework.

Use when the aggregate doesn’t have custom compensation logic for a saga. The framework will emit a SagaCompensationFailed event to the fallback domain.

Parameters:
  • reason – Human-readable explanation for the delegation.

  • options – Optional DelegationOptions struct overriding the framework defaults (emit_system_event=True and the rest False). Audit #65: replaces the previous keyword arguments for cross-language symmetry with Rust.

Returns:

BusinessResponse with revocation flags.

angzarr_client.emit_compensation_events(event_book: angzarr_client.proto.angzarr.types_pb2.EventBook) angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse

Create a response containing compensation events.

Use when the aggregate emits events to record compensation. The framework will persist these events and NOT emit a system event.

Parameters:

event_book – EventBook containing compensation events.

Returns:

BusinessResponse with events.

angzarr_client.is_notification(type_url: str) bool

Check if a type URL refers to a rejection Notification.

Cross-language alias for Rust’s is_notification(type_url). Useful for dispatch code that needs to branch on “is this a notification?” without unpacking the Any payload.

angzarr_client.pm_delegate_to_framework(reason: str, options: DelegationOptions | None = None) PMRevocationResponse

Create a PM response that delegates compensation to the framework.

Use when the PM doesn’t have custom compensation logic.

Parameters:
  • reason – Human-readable explanation for the delegation.

  • options – Optional DelegationOptions struct. Audit #66: replaces the previous emit_system_event kwarg with the shared options struct (same shape as delegate_to_framework()).

Returns:

PMRevocationResponse with no process events; compensation is delegated to the framework.

angzarr_client.pm_emit_compensation_events(process_events: angzarr_client.proto.angzarr.types_pb2.EventBook, also_emit_system_event: bool = False, reason: str = '') PMRevocationResponse

Create a PM response containing compensation events.

Use when the PM emits events to record the compensation in its state.

Parameters:
  • process_events – EventBook containing PM compensation events.

  • also_emit_system_event – Also emit SagaCompensationFailed.

  • reason – Reason for system event (if emitting).

Returns:

PMRevocationResponse with events and revocation flags.

class angzarr_client.Destinations(sequences: dict[str, int])

Context for saga/PM handlers providing access to destination sequences.

Sagas and PMs receive destination sequences (not full EventBooks) from the framework. This class provides a stamp_command() helper to set the correct sequence number on commands before they’re sent.

Why sequences only?

Sagas and PMs are translators/coordinators: they should NOT make business decisions based on destination state. If you need external information:

  1. Inject it as a fact

  2. Let aggregates decide (they validate commands)

  3. Use sync mode for immediate feedback on command results

sequences

Map from domain name to next sequence number.

Create a Destinations context from a sequence map.

Parameters:

sequences – Dict mapping domain name to next sequence number. Typically from proto’s destination_sequences field.

classmethod from_proto(destination_sequences: dict[str, int]) Destinations

Create from proto’s destination_sequences map.

Parameters:

destination_sequences – Map from ProcessManagerHandleRequest or SagaHandleRequest proto.

Returns:

Destinations instance with the sequences.

sequence_for(domain: str) int | None

Get the next sequence number for a destination domain.

Parameters:

domain – The target domain name.

Returns:

The next sequence number, or None if domain not found.

stamp_command(cmd: angzarr_client.proto.angzarr.types_pb2.CommandBook, domain: str) angzarr_client.proto.angzarr.types_pb2.CommandBook

Stamp a command with the correct sequence for its destination domain.

Modifies the command’s page headers in-place with the sequence number.

Parameters:
  • cmd – The CommandBook to stamp.

  • domain – The target domain (must be in destination_sequences).

Returns:

The same CommandBook (for chaining).

Raises:

InvalidArgumentError – If domain is not in destination_sequences. Carries code=MISSING_DESTINATION_SEQUENCE and details["domain"]=<domain> for cucumber assertions. Check your output_domains config when you see this. Audit #64.
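The lookup and stamping behavior described above can be sketched with plain dicts. DestinationsSketch and MissingDestinationSketch are hypothetical stand-ins; the real stamp_command() writes into the page headers of a CommandBook proto, which is modelled here as a dict with a "pages" list.

```python
from __future__ import annotations


class MissingDestinationSketch(ValueError):
    """Hypothetical stand-in for InvalidArgumentError with code and details."""

    def __init__(self, domain: str) -> None:
        super().__init__(f"no destination sequence for domain {domain!r}")
        self.code = "MISSING_DESTINATION_SEQUENCE"
        self.details = {"domain": domain}


class DestinationsSketch:
    """Hypothetical model of a domain-to-next-sequence map."""

    def __init__(self, sequences: dict[str, int]) -> None:
        self.sequences = dict(sequences)

    def sequence_for(self, domain: str) -> int | None:
        return self.sequences.get(domain)

    def has_domain(self, domain: str) -> bool:
        return domain in self.sequences

    def stamp_command(self, cmd: dict, domain: str) -> dict:
        seq = self.sequence_for(domain)
        if seq is None:
            raise MissingDestinationSketch(domain)
        # In-place update of every page, standing in for the page headers.
        for page in cmd["pages"]:
            page["sequence"] = seq
        return cmd  # same object, returned for chaining


destinations = DestinationsSketch({"table": 4})
cmd = {"pages": [{}, {}]}
stamped = destinations.stamp_command(cmd, "table")
```

Stamping an unknown domain raises instead of silently sending an unsequenced command, which matches the documented hint to check the output_domains configuration.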

static deferred_header(source_cover, source_seq: int) angzarr_client.proto.angzarr.types_pb2.PageHeader

Build a PageHeader carrying an AngzarrDeferredSequence.

Accepts either a Cover wrapper (the canonical post-B2 form passed to handlers by the framework) or a raw Cover proto. Internally the proto is what gets copied into the AngzarrDeferredSequence payload.

Use this on saga-produced commands so the framework can dedupe on (source.root, source_seq, target.root). AMQP at-least-once redelivery of the trigger event then becomes a no-op at the destination aggregate’s pipeline (cached events returned without re-invoking business logic), instead of relying on a business guard that surfaces as an idempotent-failure-shaped retry storm.

has_domain(domain: str) bool

Check if a destination domain is available.

Parameters:

domain – The domain name to check.

Returns:

True if the domain has a sequence entry.

property domains: list[str]

Get list of available destination domains.
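
The lookup behaviour documented for Destinations can be pictured with a small stand-in. This is a sketch only: DestinationsSketch, MissingDestinationSequence, and require_sequence are hypothetical names invented here, and the real class stamps a CommandBook proto rather than returning an integer.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class MissingDestinationSequence(ValueError):
    """Hypothetical stand-in for the InvalidArgumentError raised by stamp_command()."""

    def __init__(self, domain: str) -> None:
        super().__init__("missing destination sequence")
        self.code = "MISSING_DESTINATION_SEQUENCE"
        self.details = {"domain": domain}


@dataclass
class DestinationsSketch:
    """Stand-in mirroring the documented lookups; not the library class."""

    sequences: dict[str, int] = field(default_factory=dict)

    def sequence_for(self, domain: str) -> int | None:
        # None signals that the framework supplied no sequence for this domain.
        return self.sequences.get(domain)

    def has_domain(self, domain: str) -> bool:
        return domain in self.sequences

    @property
    def domains(self) -> list[str]:
        return list(self.sequences)

    def require_sequence(self, domain: str) -> int:
        # Hypothetical helper: the check stamp_command() is described as
        # performing before it writes the sequence into the page headers.
        seq = self.sequence_for(domain)
        if seq is None:
            raise MissingDestinationSequence(domain)
        return seq
```

A handler would typically call has_domain() first when a destination is optional, and let the error surface when a missing domain points to a configuration mistake.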

exception angzarr_client.ClientError(message: str, cause: Exception | None = None, *, code: str = '', details: collections.abc.Mapping[str, Any] | None = None)

Bases: Exception

Base class for client errors.

Predicate methods (is_not_found, is_precondition_failed, is_invalid_argument, is_connection_error) return False here and are overridden by subclasses. This lets callers write if err.is_not_found(): ... without casting.

Audit finding #59 fields:
  • message — static human-readable string

  • code — SCREAMING_SNAKE stable identifier

  • details — runtime context as dict[str, str]

message
code = ''
cause = None
details: dict[str, str]
is_not_found() bool

Return True if this is a NOT_FOUND error.

is_precondition_failed() bool

Return True if this is a FAILED_PRECONDITION error.

is_invalid_argument() bool

Return True if this is an INVALID_ARGUMENT error.

is_connection_error() bool

Return True if this is a connection or transport error.
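
The predicate pattern described above (defaults of False on the base class, overridden by subclasses) can be sketched with stand-in classes. BaseErrorSketch, NotFoundSketch, UnreachableSketch, and classify are hypothetical names used only for illustration; they are not part of angzarr_client.

```python
class BaseErrorSketch(Exception):
    """Stand-in base error: every predicate defaults to False."""

    def is_not_found(self) -> bool:
        return False

    def is_connection_error(self) -> bool:
        return False


class NotFoundSketch(BaseErrorSketch):
    def is_not_found(self) -> bool:
        return True


class UnreachableSketch(BaseErrorSketch):
    def is_connection_error(self) -> bool:
        return True


def classify(err: BaseErrorSketch) -> str:
    # Callers branch on predicates instead of isinstance checks or casts.
    if err.is_not_found():
        return "missing"
    if err.is_connection_error():
        return "retry-later"
    return "fatal"
```

Because the base class answers every predicate, a caller holding any error in the hierarchy can ask the question safely.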

exception angzarr_client.CommandRejectedError(message: str, status_code: str = 'FAILED_PRECONDITION', *, code: str = '', details: collections.abc.Mapping[str, Any] | None = None, cover: Any | None = None)

Bases: ClientError

Command was rejected due to business rule violation.

Status codes and retry semantics:
  • FAILED_PRECONDITION: state-based rejection; retryable after refreshing state.

  • INVALID_ARGUMENT: bad input; not retryable.

  • NOT_FOUND: aggregate does not exist; not retryable — refetching won’t help.

Audit finding #59: callers pass a static message, a SCREAMING_SNAKE code, and structured details kwargs. The factory methods (precondition_failed / invalid_argument / not_found) bind the appropriate status_code.

The cover attribute is the addressing envelope (domain, root, correlation_id, edition) of the command that produced this rejection. Handlers do not populate it; the router stamps it from the incoming CommandRequest at the dispatch boundary so every rejection is traceable to its originating workflow without each call site having to thread the context.

status_code = 'FAILED_PRECONDITION'
cover = None
static precondition_failed(code: str, message: str, details: collections.abc.Mapping[str, Any] | None = None) CommandRejectedError

Create a FAILED_PRECONDITION error for guard failures.

Parameters:
  • code – SCREAMING_SNAKE stable identifier (use a constant from angzarr_client.error_codes.codes).

  • message – Static human-readable message (no interpolation; use a constant from angzarr_client.error_codes.messages).

  • details – Structured runtime context — keys should be from angzarr_client.error_codes.keys.

static invalid_argument(code: str, message: str, details: collections.abc.Mapping[str, Any] | None = None) CommandRejectedError

Create an INVALID_ARGUMENT error for input validation failures.

static not_found(code: str, message: str, details: collections.abc.Mapping[str, Any] | None = None) CommandRejectedError

Create a NOT_FOUND error for missing-aggregate failures.

is_precondition_failed() bool

Return True if this is a FAILED_PRECONDITION error.

is_invalid_argument() bool

Return True if this is an INVALID_ARGUMENT error.

is_not_found() bool

Return True if this is a NOT_FOUND error.
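
The retry semantics listed for the status codes suggest a caller-side loop along these lines. This is a sketch under assumptions: RejectedSketch stands in for CommandRejectedError, and submit_with_refresh, send, and refresh are hypothetical callables supplied by the caller.

```python
class RejectedSketch(Exception):
    """Stand-in for a rejection carrying a status_code string."""

    def __init__(self, message: str, status_code: str = "FAILED_PRECONDITION") -> None:
        super().__init__(message)
        self.status_code = status_code


def submit_with_refresh(send, refresh, max_attempts: int = 3):
    """Retry only state-based rejections, refreshing state before each retry."""
    state = refresh()
    for attempt in range(1, max_attempts + 1):
        try:
            return send(state)
        except RejectedSketch as err:
            # INVALID_ARGUMENT and NOT_FOUND are documented as not retryable.
            if err.status_code != "FAILED_PRECONDITION" or attempt == max_attempts:
                raise
            state = refresh()
```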

exception angzarr_client.ConnectionError(message: str = 'connection failed', *, code: str = 'CONNECTION_FAILED', details: collections.abc.Mapping[str, Any] | None = None)

Bases: ClientError

Failed to establish connection to the server.

is_connection_error() bool

Return True if this is a connection or transport error.

exception angzarr_client.GRPCError(cause: grpc.RpcError, *, code: str = codes.GRPC_ERROR, details: collections.abc.Mapping[str, Any] | None = None)

Bases: ClientError

gRPC error from the server.

property grpc_code: grpc.StatusCode

Return the gRPC status code.

property grpc_details: str

Return the gRPC server-side details string.

status() grpc.RpcError

Return the underlying gRPC RpcError (status).

is_not_found() bool

Return True if this is a NOT_FOUND error.

is_precondition_failed() bool

Return True if this is a FAILED_PRECONDITION error.

is_invalid_argument() bool

Return True if this is an INVALID_ARGUMENT error.

is_connection_error() bool

Return True for UNAVAILABLE status.

exception angzarr_client.InvalidArgumentError(message: str, *, code: str = 'INVALID_ARGUMENT', details: collections.abc.Mapping[str, Any] | None = None)

Bases: ClientError

Invalid argument provided by caller.

is_invalid_argument() bool

Return True if this is an INVALID_ARGUMENT error.

exception angzarr_client.InvalidTimestampError(message: str = 'invalid timestamp', *, code: str = 'INVALID_TIMESTAMP', details: collections.abc.Mapping[str, Any] | None = None)

Bases: ClientError

Failed to parse timestamp.

exception angzarr_client.TransportError(cause: Exception, *, code: str = codes.TRANSPORT_ERROR, details: collections.abc.Mapping[str, Any] | None = None)

Bases: ClientError

Transport-level error.

is_connection_error() bool

Return True if this is a connection or transport error.

angzarr_client.CORRELATION_ID_HEADER = 'x-correlation-id'
angzarr_client.DEFAULT_EDITION = ''
angzarr_client.META_ANGZARR_DOMAIN = '_angzarr'
angzarr_client.PROJECTION_DOMAIN_PREFIX = '_projection'
angzarr_client.PROJECTION_TYPE_URL = 'angzarr_client.proto.angzarr.Projection'
angzarr_client.TYPE_URL_PREFIX = 'type.googleapis.com/'
angzarr_client.UNKNOWN_DOMAIN = 'unknown'
angzarr_client.WILDCARD_DOMAIN = '*'
angzarr_client.correlated_metadata(correlation_id: str) list[tuple[str, str]]

Build gRPC metadata carrying the x-correlation-id header.

Returns the list shape grpcio expects on stub calls. Empty correlation IDs produce an empty list — the header is skipped, never sent as a blank value. Audit #69.
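
A minimal stand-in for the behaviour described above, assuming the header name is the CORRELATION_ID_HEADER value listed earlier. correlated_metadata_sketch is a hypothetical name; the library function may differ in detail.

```python
from __future__ import annotations

CORRELATION_ID_HEADER = "x-correlation-id"


def correlated_metadata_sketch(correlation_id: str) -> list[tuple[str, str]]:
    # An empty ID yields no metadata at all, so the header is never sent blank.
    if not correlation_id:
        return []
    return [(CORRELATION_ID_HEADER, correlation_id)]
```

The returned list is the shape usually passed as the metadata argument of a gRPC stub call.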

angzarr_client.destination_map(destinations: list[angzarr_client.proto.angzarr.EventBook]) dict[str, angzarr_client.proto.angzarr.EventBook]

Build a map from root UUID hex to EventBook for destination lookup.

Used in multi-destination sagas to look up the correct EventBook by aggregate root when setting command sequences. Entries without a root are skipped.

angzarr_client.full_type_url
angzarr_client.full_type_url_for(msg_class: type[google.protobuf.message.Message]) str

Get the full type URL for a message class.

angzarr_client.implicit_edition(name: str) angzarr_client.proto.angzarr.Edition

Create an edition with the given name but no divergences.

angzarr_client.now() google.protobuf.timestamp_pb2.Timestamp

Return the current time as a protobuf Timestamp.

angzarr_client.parse_timestamp(rfc3339: str) google.protobuf.timestamp_pb2.Timestamp

Parse an RFC3339 timestamp string.

angzarr_client.proto_to_uuid(u: angzarr_client.proto.angzarr.UUID) uuid.UUID

Convert a proto UUID to Python UUID.

angzarr_client.proto_uuid_to_hex(u: angzarr_client.proto.angzarr.UUID | None) str

Convert a proto UUID to hex string format.

angzarr_client.type_name_from_url(type_url_str: str) str

Extract the wire-format type name from a type URL.

Returns the part after the last /. Inputs with no slash are returned unchanged.

angzarr_client.type_url(type_name: str) str

Construct a full type URL from a fully-qualified message type name.

angzarr_client.type_url_matches(type_url_str: str, type_name: str) bool

Check if a type URL matches the given fully-qualified type name.
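
The three type-URL helpers can be approximated with plain string handling. This sketch assumes the prefix is the TYPE_URL_PREFIX constant listed above and that matching compares the name after the last slash; the *_sketch functions are hypothetical stand-ins, and the library's matching rule may be stricter.

```python
TYPE_URL_PREFIX = "type.googleapis.com/"


def type_url_sketch(type_name: str) -> str:
    # Full URL = prefix + fully-qualified proto message name.
    return TYPE_URL_PREFIX + type_name


def type_name_from_url_sketch(type_url_str: str) -> str:
    # Part after the last "/"; inputs without a slash come back unchanged.
    return type_url_str.rsplit("/", 1)[-1]


def type_url_matches_sketch(type_url_str: str, type_name: str) -> bool:
    # Assumption: a match means the extracted name equals type_name.
    return type_name_from_url_sketch(type_url_str) == type_name
```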

angzarr_client.type_url_matches_exact
angzarr_client.uuid_to_proto(u: uuid.UUID) angzarr_client.proto.angzarr.UUID

Convert a Python UUID to a proto UUID.

angzarr_client.INVENTORY_PRODUCT_NAMESPACE
angzarr_client.cart_root(customer_id: str) uuid.UUID

Compute a deterministic root UUID for a cart aggregate.

angzarr_client.compute_root(domain: str, business_key: str) uuid.UUID

Compute a deterministic root UUID from domain and business key.

The UUID is derived from hash("angzarr" + domain + business_key) using the OID namespace, matching the Rust compute_root function.
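
One way to picture this derivation is a name-based UUID in the OID namespace. The sketch below assumes UUID version 5 and plain concatenation with no separator; neither detail is confirmed by the text above, so compute_root_sketch (a hypothetical name) may not reproduce the library's actual roots.

```python
import uuid


def compute_root_sketch(domain: str, business_key: str) -> uuid.UUID:
    # Assumption: the hashed name is "angzarr" + domain + business_key with no
    # separator, and the hash is the SHA-1 based UUID v5.
    name = "angzarr" + domain + business_key
    return uuid.uuid5(uuid.NAMESPACE_OID, name)
```

The property that matters for callers is determinism: the same domain and business key always map to the same aggregate root, so no lookup table is needed.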

angzarr_client.customer_root(email: str) uuid.UUID

Compute a deterministic root UUID for a customer aggregate.

angzarr_client.fulfillment_root(order_id: str) uuid.UUID

Compute a deterministic root UUID for a fulfillment aggregate.

angzarr_client.inventory_product_root(product_id: str) uuid.UUID

Generate a deterministic UUID for an inventory product aggregate.

Uses UUID v5 with INVENTORY_PRODUCT_NAMESPACE to ensure the same product_id always maps to the same inventory aggregate root.

angzarr_client.inventory_root(product_id: str) uuid.UUID

Compute a deterministic root UUID for an inventory aggregate.

angzarr_client.order_root(order_id: str) uuid.UUID

Compute a deterministic root UUID for an order aggregate.

angzarr_client.product_root(sku: str) uuid.UUID

Compute a deterministic root UUID for a product aggregate.

angzarr_client.to_proto_bytes(id: uuid.UUID) bytes

Convert a uuid.UUID to 16-byte proto representation.

class angzarr_client.ExponentialBackoffRetry(min_delay: float = 0.1, max_delay: float = 5.0, max_attempts: int = 10, jitter: bool = True, on_retry: Callable[[int, Exception], None] | None = None)

Bases: RetryPolicy

Retries with exponential backoff and optional jitter.

min_delay = 0.1
max_delay = 5.0
max_attempts = 10
jitter = True
on_retry = None
execute(operation: Callable[[], T]) T

Run the operation, retrying on failure according to the policy.

Returns the result of the first successful attempt. Raises the last exception if all attempts fail.
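
The documented parameters map onto a conventional backoff loop. The following is a sketch, not the library's implementation: retry_with_backoff is a hypothetical function, the doubling factor and the full-jitter strategy are assumptions, and the sleep parameter exists only so the loop can be exercised without waiting.

```python
import random
import time


def retry_with_backoff(
    operation,
    *,
    min_delay=0.1,
    max_delay=5.0,
    max_attempts=10,
    jitter=True,
    on_retry=None,
    sleep=time.sleep,
):
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as err:
            if attempt == max_attempts:
                raise  # all attempts failed: surface the last exception
            # Assumption: delay doubles per attempt and is capped at max_delay.
            delay = min(max_delay, min_delay * 2 ** (attempt - 1))
            if jitter:
                delay = random.uniform(0, delay)
            if on_retry is not None:
                on_retry(attempt, err)
            sleep(delay)
```

Jitter spreads retries from many clients over time, which helps avoid synchronized retry bursts against a recovering server.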

class angzarr_client.RetryPolicy

Bases: abc.ABC

Strategy for retrying failed operations.

abstractmethod execute(operation: Callable[[], T]) T

Run the operation, retrying on failure according to the policy.

Returns the result of the first successful attempt. Raises the last exception if all attempts fail.

angzarr_client.default_retry_policy() RetryPolicy

Return the standard retry policy matching Rust’s backoff config.

exception angzarr_client.BuildError(message: str, *, code: str = '', details: collections.abc.Mapping[str, Any] | None = None)

Bases: Exception

Raised when the builder cannot produce a valid runtime router.

Audit #72: carries the structural error shape from audit #59 — SCREAMING_SNAKE code (programmatic dispatch / cucumber assertions), static message (cross-language equality), and details for runtime context (router name, conflicting kinds, duplicate (domain, type_url), handler class, field name, etc.). Mirrors Rust router::handler::BuildError which carries an ErrorDetail per variant.

Construction sites pass constants from angzarr_client.error_codes: codes.DUPLICATE_COMMAND_HANDLER etc. for the code, messages.DUPLICATE_COMMAND_HANDLER etc. for the static message, detail keys from keys for the details map.

message
code = ''
details: dict[str, str]
class angzarr_client.CommandHandlerRouter(factories: list[Factory])

Bases: _BuiltRouterBase, Generic[S]

Runtime router dispatching commands to registered @command_handler instances.

property name: str

Domain this router serves (read from the first registered handler’s @command_handler(domain=...) metadata).

@command_handler has no name attribute, so the domain is the natural identifier. Mirrors Rust’s CommandHandlerRouter::name() (runtime.rs:625-633).

dispatch(request)

Route a ContextualCommand to the matching @handles method.

supports_handle_fact() bool

Audit #45: True if any registered handler declares at least one @handles_fact method. The gRPC adapter uses this as the gate for the HandleFact RPC — False → return UNIMPLEMENTED without entering dispatch.

Implemented as pure metadata read on the registered classes; no factory invocation, no per-request work.

supports_replay() bool

Audit #45: True if any registered handler opted in via @command_handler(supports_replay=True). The gRPC adapter uses this as the gate for the Replay RPC.

Replay is a generic state-rebuild operation — the framework implements it from the existing @applies machinery, so the opt-in is just an explicit acknowledgement that the aggregate’s state type is proto-serializable (replay round-trips state via Any).

dispatch_fact(request)

Route a FactRequest to matching @handles_fact methods.

Caller is responsible for gating via supports_handle_fact() before invoking this — the gRPC adapter does so.

dispatch_replay(request)

Replay events on top of a base snapshot to compute final state.

Caller is responsible for gating via supports_replay() before invoking this — the gRPC adapter does so.

exception angzarr_client.DispatchError(code: grpc.StatusCode, details: str, *, error_code: str = '', extras: dict[str, Any] | None = None)

Bases: grpc.RpcError

gRPC-compatible dispatch error carrying a StatusCode.

Raised when a command cannot be routed (unknown type, unknown domain), when a request is malformed (missing command/page/cover), or for any other caller-facing violation. The gRPC servicer can read .code() and forward the status to the client.

Audit finding #59: message is a static string (no interpolation); runtime context like type URLs, domains, etc. ride in extras. The SCREAMING_SNAKE error_code is a stable cross-language identifier suitable for cucumber assertions.

error_code = ''
extras: dict[str, str]
code() grpc.StatusCode
details() str
class angzarr_client.ProcessManagerResponse(commands: list[angzarr_client.proto.angzarr.types_pb2.CommandBook] | None = None, process_events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None, facts: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None)

Return value of a process-manager @handles method.

  • commands: commands forwarded to target domains

  • process_events: list of EventBooks recorded into the PM’s own event stream (state). Multiple books concatenate into the wire’s single ProcessManagerHandleResponse.process_events (first non-empty book’s cover wins).

  • facts: facts injected into other aggregates without a command

Audit finding #56 (Option B — list[EventBook]): aligns with Rust’s Vec<EventBook> and matches the existing commands / facts list shapes for symmetry.
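
The concatenation rule for process_events ("first non-empty book's cover wins") can be illustrated with a stand-in type. BookSketch and concatenate_books are hypothetical; real EventBooks are protobuf messages, and treating "non-empty" as "has at least one page" is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class BookSketch:
    """Hypothetical stand-in for an EventBook: a cover plus ordered pages."""

    cover: str | None = None
    pages: list[str] = field(default_factory=list)


def concatenate_books(books: list[BookSketch]) -> BookSketch:
    merged = BookSketch()
    cover_taken = False
    for book in books:
        if not book.pages:
            continue  # empty books contribute neither pages nor a cover
        if not cover_taken:
            merged.cover = book.cover  # first non-empty book's cover wins
            cover_taken = True
        merged.pages.extend(book.pages)
    return merged
```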

commands = []
process_events = []
facts = []
class angzarr_client.ProcessManagerRouter(factories: list[Factory])

Bases: _BuiltRouterBase, Generic[S]

Runtime router dispatching events to registered @process_manager instances.

dispatch(request)

Route a ProcessManagerHandleRequest to matching handlers.

output_domains() list[str]

Domains this router emits commands to.

Read from each registered handler class’s __angzarr_meta__. Default is empty — only sagas (target) and process managers (targets) override.

sync_output_domains() list[str]

Audit #74: subset of output_domains() that the registered handlers ever address with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Drives readiness probing — only sync targets need their coordinator reachable for traffic to be safe. Default empty; saga / PM routers override.

has_async_outputs() bool

Audit #74: True if any registered handler emits to at least one async target — i.e. ever publishes through the async bus rather than synchronously calling a downstream coordinator. Drives whether the readiness supervisor includes a BusProbe. Default False (CH / Projector / Upcaster never cross-emit).

class angzarr_client.ProjectorRouter(factories: list[Factory])

Bases: _BuiltRouterBase

Runtime router fanning events out to registered @projector instances.

dispatch(events)

Fan out each event in the book to matching @handles methods.

class angzarr_client.RejectionHandlerResponse

Return shape of a saga @rejected handler method.

Carries compensation events to persist locally and an optional Notification to forward upstream. Mirrors Rust’s router::responses::RejectionHandlerResponse field-for-field.

Audit finding #56 (Option B — list[EventBook]): aligns with Rust’s Vec<EventBook>. Multiple books concatenate downstream; first non-empty book’s cover wins.

events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] = []

Events to persist to own state (compensation).

notification: angzarr_client.proto.angzarr.types_pb2.Notification | None = None

Notification to forward upstream (rejection propagation).

class angzarr_client.Router(name: str)

Fluent builder that collects handler factories for the unified router.

Usage:

router = (
    Router("agg-service")
    .with_handler(Player, lambda: Player(db_pool))
    .with_handler(Hand, lambda: Hand(rng))
    .build()
)

.build() returns a typed runtime router (CommandHandlerRouter, SagaRouter, ProcessManagerRouter, or ProjectorRouter). Mixed handler kinds are rejected at build time (R4).

Handlers are registered as (cls, factory) pairs so the router can invoke factory() per dispatch call to obtain a fresh (or pooled) instance. This keeps handler state isolated per request and makes the router safe to share across threads.

name
with_handler(cls: type, factory: Callable[[], Any]) Router

Register a handler class together with a zero-arg factory.

cls must carry one of the five kind decorators (@command_handler, @saga, @process_manager, @projector, @upcaster). Registering an undecorated class raises BuildError.

factory is any Callable[[], Any]: a lambda, a bound method on a DI container, a functools.partial, a pool-checkout closure, or a callable returning a shared singleton. The router never inspects the factory body; it only calls it and uses whatever it returns.

Factories are invoked inside dispatch(), so their latency is on the request path. For handlers whose construction is expensive — opens a DB connection, reads a config file, performs I/O, does non-trivial computation, holds resources that must be released — prefer a pool checkout or close over a pre-built instance rather than constructing fresh on every call.
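
The factory shapes mentioned above differ mainly in how often construction happens. This sketch uses a hypothetical HandlerSketch class and plain callables; it does not touch the real Router, it only shows what each zero-argument factory returns when called.

```python
from functools import partial


class HandlerSketch:
    """Hypothetical handler whose constructor takes one dependency."""

    def __init__(self, dependency: str) -> None:
        self.dependency = dependency


# Fresh instance on every call: construction cost is paid per dispatch.
fresh_factory = lambda: HandlerSketch("db-pool")

# Same effect as the lambda, with the argument bound ahead of time.
partial_factory = partial(HandlerSketch, "db-pool")

# Closing over a pre-built instance: construction cost is paid once.
_shared = HandlerSketch("db-pool")


def shared_factory() -> HandlerSketch:
    return _shared
```

A shared instance avoids repeated construction but gives up per-request isolation, so it fits handlers that hold no mutable per-request state.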

build() Any

Produce a typed runtime router.

Empty → BuildError. Mixed kinds → BuildError. Homogeneous → CommandHandlerRouter / SagaRouter / ProcessManagerRouter / ProjectorRouter per the shared kind.

class angzarr_client.SagaHandlerResponse(commands: list[angzarr_client.proto.angzarr.types_pb2.CommandBook] | None = None, events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None)

Return value of a saga @handles method.

Carries commands to forward to target domains and/or facts (events) to inject into other aggregates. Both are optional.

commands = []
events = []
class angzarr_client.SagaRouter(factories: list[Factory])

Bases: _BuiltRouterBase

Runtime router dispatching events to registered @saga instances.

dispatch(request)

Route a SagaHandleRequest to all matching @handles methods.

output_domains() list[str]

Domains this router emits commands to.

Read from each registered handler class’s __angzarr_meta__. Default is empty — only sagas (target) and process managers (targets) override.

sync_output_domains() list[str]

Audit #74: subset of output_domains() that the registered handlers ever address with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Drives readiness probing — only sync targets need their coordinator reachable for traffic to be safe. Default empty; saga / PM routers override.

has_async_outputs() bool

Audit #74: True if any registered handler emits to at least one async target — i.e. ever publishes through the async bus rather than synchronously calling a downstream coordinator. Drives whether the readiness supervisor includes a BusProbe. Default False (CH / Projector / Upcaster never cross-emit).

class angzarr_client.UpcasterRouter(factories: list[Factory])

Bases: _BuiltRouterBase

Runtime router transforming events through registered @upcaster instances.

dispatch(request)

Transform each event in request.events via matching @upcasts methods.

angzarr_client.applies(event_type: type) Callable[[F], F]

Register a method as a state applier for event_type.

Appliers are invoked during state rebuild, walking the prior event book and mutating the instance’s state in place.

angzarr_client.command_handler(*, domain: str, state: type, supports_replay: bool = False) Callable[[T], T]

Mark a class as a command handler (aggregate) for domain.

The state type is the aggregate’s state type; the class must either provide a @state_factory method or rely on state() as the default factory (enforced in a later round).

supports_replay (default False) opts the aggregate into the coordinator’s Replay RPC, used for MERGE_COMMUTATIVE conflict detection. When True, the framework auto-implements replay using the @applies methods to rebuild state from a snapshot + events; the state type must be a proto Message (carry a DESCRIPTOR). When False the gRPC adapter returns UNIMPLEMENTED for Replay requests — the coordinator degrades to MERGE_STRICT semantics. Audit #45.

Fact handling is opt-in via the @handles_fact(EventType) method decorator. Aggregates with no @handles_fact methods get UNIMPLEMENTED from the gRPC adapter for HandleFact; the coordinator falls back to pass-through-persist (per the proto’s Optional contract).

angzarr_client.handles(message_type: type) Callable[[F], F]

Register a method as a dispatch target for message_type.

For command handlers this is the command type; for sagas / process managers / projectors it is the event type. Dispatch routes by proto full-name match.

angzarr_client.handles_fact(event_type: type) Callable[[F], F]

Register a method as a fact-event handler for event_type.

Audit #45. Triggered when the coordinator dispatches a fact (an external reality, e.g. a payment confirmation from a third party) via the HandleFact RPC. The method receives (event, state) after state has been rebuilt from prior events; it returns the events to persist (or None for pure side-effects).

Aggregates with at least one @handles_fact method opt into the HandleFact RPC. Aggregates with none get UNIMPLEMENTED from the framework’s gRPC adapter — the coordinator then falls back to pass-through-persist per the proto’s Optional contract.

Distinct from @handles (which dispatches commands and returns events), @handles_fact is fact-specific: facts cannot be rejected, and the method’s return shape is just events-to-persist.

angzarr_client.process_manager(*, name: str, pm_domain: str, sources: list[str], targets: list[str], state: type, sync_targets: list[str] | None = None) Callable[[T], T]

Mark a class as a process manager.

pm_domain is the PM’s own state-storage domain; sources lists incoming event domains; targets lists downstream command domains.

Audit #74: sync_targets declares the subset of targets the PM ever addresses with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED) — those are the targets whose coordinator must be reachable for readiness. Default None (no sync targets) — every command goes through the async bus and the readiness supervisor will not probe any target’s coordinator.

Raises ValueError if sync_targets contains a domain that is not in targets.
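
The subset check can be sketched as a standalone function. validate_sync_targets is a hypothetical name and the error message wording is invented; only the rule itself (every sync target must also appear in targets, and None means no sync targets) comes from the description above.

```python
from __future__ import annotations


def validate_sync_targets(
    targets: list[str], sync_targets: list[str] | None = None
) -> list[str]:
    resolved = list(sync_targets or [])  # None means no sync targets
    unknown = [domain for domain in resolved if domain not in targets]
    if unknown:
        raise ValueError(f"sync_targets not declared in targets: {unknown}")
    return resolved
```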

angzarr_client.projector(*, name: str, domains: list[str]) Callable[[T], T]

Mark a class as a projector consuming events from domains.

angzarr_client.rejected(source_domain: str, command: str) Callable[[F], F]

Register a method as a compensation handler for command rejections.

Triggered when a command originating from this component is rejected by the target aggregate. source_domain and command identify the rejected command’s proto full-name suffix split into domain/command parts.

angzarr_client.saga(*, name: str, source: str, target: str, sync: bool = False) Callable[[T], T]

Mark a class as a saga translating events from source to commands for target.

Audit #74: sync declares whether commands emitted to target ever use sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Default False — target commands flow through the async bus and the readiness supervisor will not probe target’s coordinator. Flip to True when the saga blocks on the downstream response.

angzarr_client.state_factory(fn: F) F

Mark a method as the state factory for this instance.

Overrides the default factory (calling StateType()). Useful when state construction requires parameters or custom defaults.

angzarr_client.upcaster(*, name: str, domain: str) Callable[[T], T]

Mark a class as an upcaster transforming events in domain.

Methods decorated with @upcasts(FromType, ToType) declare individual version-to-version transformations. An upcaster with zero @upcasts methods is allowed (passthrough).

angzarr_client.upcasts(from_type: type, to_type: type) Callable[[F], F]

Register a method as a transformation from from_type to to_type.

The method must accept the old event and return the new one. Dispatch matches by exact proto type-URL on the incoming event; events without a registered transform pass through unchanged.
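
The dispatch rule (exact type-URL match, passthrough otherwise) can be sketched with a dictionary of transforms. Events are modelled here as (type_url, payload) tuples and the names upcast_events and player_v1_to_v2 are hypothetical; the real router works on protobuf messages.

```python
def upcast_events(events, transforms):
    """Apply the transform registered for each event's exact type URL."""
    upcasted = []
    for type_url, payload in events:
        transform = transforms.get(type_url)
        if transform is None:
            upcasted.append((type_url, payload))  # no transform: pass through
        else:
            upcasted.append(transform(payload))
    return upcasted


def player_v1_to_v2(payload):
    # Hypothetical migration: V2 adds a "tier" field with a default value.
    return (
        "type.googleapis.com/examples.PlayerRegisteredV2",
        {**payload, "tier": "standard"},
    )
```

Order is preserved because each input event produces exactly one output event at the same position.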

class angzarr_client.CommandHandlerGrpc(router: Any)

Bases: angzarr_client.proto.angzarr.command_handler_pb2_grpc.CommandHandlerServiceServicer

gRPC adapter for a unified CommandHandlerRouter.

Audit #45: implements all three RPCs declared by the proto: Handle (always functional), HandleFact (gated on whether the aggregate declared any @handles_fact methods), and Replay (gated on whether the aggregate opted in via @command_handler(supports_replay=True)).

async Handle(request: angzarr_client.proto.angzarr.types_pb2.ContextualCommand, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse

Process command and return business response (events or revocation request)

async HandleFact(request: angzarr_client.proto.angzarr.command_handler_pb2.FactRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.types_pb2.EventBook

Process fact events - update aggregate state based on external realities. Optional: if unimplemented, facts are persisted as-is (pass-through).

async Replay(request: angzarr_client.proto.angzarr.command_handler_pb2.ReplayRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.command_handler_pb2.ReplayResponse

Replay events to compute state (for conflict detection). Optional: only needed if the aggregate supports MERGE_COMMUTATIVE.

class angzarr_client.ProcessManagerGrpc(router: Any)

Bases: angzarr_client.proto.angzarr.process_manager_pb2_grpc.ProcessManagerServiceServicer

gRPC adapter for a unified ProcessManagerRouter.

async Handle(request: angzarr_client.proto.angzarr.process_manager_pb2.ProcessManagerHandleRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.process_manager_pb2.ProcessManagerHandleResponse

Handle with trigger + process state. Returns commands for other aggregates and events for the PM’s own domain.

PMs do not rebuild destination aggregate state — they translate events into commands/facts and rely on destination_sequences for command stamping. See process manager design philosophy.

class angzarr_client.ProjectorGrpc(router: Any)

Bases: angzarr_client.proto.angzarr.projector_pb2_grpc.ProjectorServiceServicer

gRPC adapter for a unified ProjectorRouter.

async Handle(request: angzarr_client.proto.angzarr.types_pb2.EventBook, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.types_pb2.Projection

Async projection - projector should persist and return

async HandleSpeculative(request: angzarr_client.proto.angzarr.types_pb2.EventBook, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.types_pb2.Projection

Speculative processing - projector must avoid external side effects

class angzarr_client.SagaGrpc(router: Any)

Bases: angzarr_client.proto.angzarr.saga_pb2_grpc.SagaServiceServicer

gRPC adapter for a unified SagaRouter.

async Handle(request: angzarr_client.proto.angzarr.saga_pb2.SagaHandleRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.saga_pb2.SagaResponse

Translate source events into commands for target domains. Commands use angzarr_deferred — framework stamps explicit sequences on delivery.

class angzarr_client.UpcasterGrpc(router: Any)

Bases: angzarr_client.proto.angzarr.upcaster_pb2_grpc.UpcasterServiceServicer

gRPC adapter for a unified UpcasterRouter.

async Upcast(request: angzarr_client.proto.angzarr.upcaster_pb2.UpcastRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.upcaster_pb2.UpcastResponse

Transform events to the current version. Returns events in the same order, transformed where applicable.

angzarr_client.DEFAULT_BIND_HOST = '[::]'
angzarr_client.ENV_BIND_ADDRESS = 'ANGZARR_BIND_ADDRESS'
class angzarr_client.ServerConfig

Configuration for a gRPC server.

Cross-language alias for Rust’s ServerConfig { port, uds_path }.

port: int = 50052
uds_path: str | None = None
classmethod from_env(default_port: int = 50052) ServerConfig
angzarr_client.cleanup_socket(socket_path: str) None

Clean up a UDS socket file.

angzarr_client.configure_logging() None

Configure structlog with JSON rendering and ISO timestamps.

angzarr_client.create_server(add_servicer_func: collections.abc.Callable, servicer: object, service_name: str = '') ServerHandle

Create an async gRPC server with health checking wired up.

Audit #68: health reporting starts at NOT_SERVING for both the empty (overall) service name and any explicit service_name; the readiness supervisor flips it once every probe passes. Pre-#68 Python set SERVING immediately, which made K8s readiness vacuously true.

angzarr_client.get_transport_config() tuple[str, str]

Get transport configuration from environment.

Returns:

Tuple of (transport_type, address):
  • For TCP: ("tcp", "[::]:{port}"), overridable via ANGZARR_BIND_ADDRESS

  • For UDS: ("uds", "unix://{socket_path}")

angzarr_client.resolve_bind_address(default_port: int = 50052) str

Compute the TCP bind address.

Audit #77: returns ANGZARR_BIND_ADDRESS verbatim when set, otherwise composes [::]:{port} where port comes from the PORT env var or default_port.
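
The resolution order described above can be sketched as follows. resolve_bind_address_sketch is a hypothetical stand-in; the env parameter is added here for testability, and treating an empty variable as unset is an assumption.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def resolve_bind_address_sketch(
    default_port: int = 50052, env: Mapping[str, str] | None = None
) -> str:
    env = os.environ if env is None else env
    explicit = env.get("ANGZARR_BIND_ADDRESS")
    if explicit:
        return explicit  # returned verbatim, no validation or rewriting
    # Assumption: an empty PORT falls back to default_port.
    port = env.get("PORT") or str(default_port)
    return f"[::]:{port}"
```

Binding to [::] listens on all interfaces, so an explicit ANGZARR_BIND_ADDRESS is the way to restrict the server to a single address.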

angzarr_client.run_command_handler_server(router, domain: str = '', default_port: int = 50052) None

Run a command handler gRPC server.

angzarr_client.run_process_manager_server(router, domain: str = '', default_port: int = 50052) None

Run a process manager gRPC server.

angzarr_client.run_projector_server(router, domain: str = '', default_port: int = 50052) None

Run a projector gRPC server.

angzarr_client.run_saga_server(router, domain: str = '', default_port: int = 50052) None

Run a saga gRPC server.

angzarr_client.run_server(add_servicer_func: collections.abc.Callable, servicer: object, service_name: str = '', domain: str = '', default_port: str = '50052', sync_output_domains: list[str] | None = None, has_async_outputs: bool = False, logger=None) None

Run an async gRPC server until termination.

Sync entry point — runs the asyncio event loop internally so callers can keep using the same blocking shape they had before.
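
The "sync entry point that owns the event loop" shape can be illustrated with a minimal sketch. The names serve_until_done and run_blocking are hypothetical and no gRPC server is started; the coroutine only stands in for "start the server and wait for termination".

```python
import asyncio


async def serve_until_done(ticks: int) -> int:
    """Hypothetical async body standing in for the real server lifecycle."""
    served = 0
    for _ in range(ticks):
        # Yield control to the event loop, as a real server would while waiting.
        await asyncio.sleep(0)
        served += 1
    return served


def run_blocking(ticks: int = 3) -> int:
    """Blocking wrapper: creates and tears down the event loop itself,
    so callers never interact with asyncio directly."""
    return asyncio.run(serve_until_done(ticks))
```

A caller simply invokes run_blocking() from ordinary synchronous code, which is the property the description attributes to run_server.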

angzarr_client.run_upcaster_server(router, domain: str = '', default_port: int = 50052) None

Run an upcaster gRPC server.

angzarr_client.DEFAULT_TEST_NAMESPACE
class angzarr_client.ScenarioContext

Shared context for BDD test scenarios.

Tracks the current aggregate, event history, command results, and rebuilt state across Given/When/Then steps.

domain

Current aggregate domain being tested

root

Current aggregate root as bytes

events

List of packed events (ProtoAny) in history

result

Last command handler result (event or tuple of events)

error

Last CommandRejectedError if command was rejected

state

Rebuilt aggregate state after applying events

Example

    ctx = ScenarioContext()
    ctx.domain = "player"
    ctx.root = uuid_for("player-alice")

    # Given player registered
    ctx.add_event(PlayerRegistered(email="alice@test.com"))

    # When deposit funds
    try:
        ctx.result = handler.handle(deposit_cmd, ctx.event_book())
    except CommandRejectedError as e:
        ctx.error = e

    # Then balance updated
    assert ctx.result.new_balance == 100

domain: str = ''
root: bytes = b''
events: list = []
result: Any = None
error: angzarr_client.errors.CommandRejectedError | None = None
state: Any = None
event_book() angzarr_client.proto.angzarr.EventBook

Build EventBook from accumulated events.

Creates an EventBook with proper sequencing from the events added via add_event().

Returns:

EventBook with cover, pages, and next_sequence set

add_event(event_msg)

Add an event to history.

Packs the event message (deriving the type URL from event_msg.DESCRIPTOR.full_name) and appends to the event list.

Audit finding #47: the previous type_url_prefix arg is dropped — the canonical URL is derived from the message descriptor.

Parameters:

event_msg – The protobuf event message to add

clear_events()

Clear all events from history.

clear_result()

Clear the last result and error.

reset()

Reset context to initial state.

angzarr_client.make_command_book(cover: angzarr_client.proto.angzarr.Cover, command: google.protobuf.any_pb2.Any, sequence: int = 0) angzarr_client.proto.angzarr.CommandBook

Create a CommandBook with a single command.

Parameters:
  • cover – The Cover identifying the target aggregate

  • command – The packed command (ProtoAny)

  • sequence – The command sequence number (defaults to 0)

Returns:

CommandBook proto with one page

angzarr_client.make_command_page(sequence: int, command: google.protobuf.any_pb2.Any) angzarr_client.proto.angzarr.CommandPage

Create a CommandPage.

Parameters:
  • sequence – The command sequence number

  • command – The packed command (ProtoAny)

Returns:

CommandPage proto

angzarr_client.make_cover(domain: str, root: bytes, correlation_id: str = '') angzarr_client.proto.angzarr.Cover

Create a Cover from domain and root bytes.

Parameters:
  • domain – The aggregate domain name

  • root – The aggregate root as 16 bytes

  • correlation_id – Optional correlation ID for cross-domain tracking

Returns:

Cover proto with domain and root set

angzarr_client.make_event_book(cover: angzarr_client.proto.angzarr.Cover, pages: list[angzarr_client.proto.angzarr.EventPage] = None, next_sequence: int = None) angzarr_client.proto.angzarr.EventBook

Create an EventBook.

Parameters:
  • cover – The Cover identifying the aggregate

  • pages – List of EventPages (defaults to empty)

  • next_sequence – Next sequence number (defaults to len(pages))

Returns:

EventBook proto
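
The parameter defaults listed above (empty pages, next_sequence falling back to len(pages)) can be sketched with plain dataclasses. PageSketch, BookSketch and make_event_book_sketch are hypothetical stand-ins for the proto types and the helper; the cover argument is omitted to keep the sketch self-contained.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class PageSketch:
    """Stand-in for an event page: a sequence number plus an opaque payload."""
    sequence: int
    event: bytes


@dataclass
class BookSketch:
    """Stand-in for an event book."""
    pages: List[PageSketch] = field(default_factory=list)
    next_sequence: int = 0


def make_event_book_sketch(
    pages: Optional[List[PageSketch]] = None,
    next_sequence: Optional[int] = None,
) -> BookSketch:
    # None means "no pages supplied"; copy the list so the caller's list is not shared.
    pages = list(pages) if pages is not None else []
    # When the caller does not pin next_sequence, derive it from the page count.
    if next_sequence is None:
        next_sequence = len(pages)
    return BookSketch(pages=pages, next_sequence=next_sequence)
```

Using None as the sentinel for both parameters avoids a shared mutable default and lets an explicit next_sequence of 0 be distinguished from "not provided".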

angzarr_client.make_event_page(sequence: int, event: google.protobuf.any_pb2.Any) angzarr_client.proto.angzarr.EventPage

Create an EventPage.

Parameters:
  • sequence – The event sequence number

  • event – The packed event (ProtoAny)

Returns:

EventPage proto

angzarr_client.make_timestamp() google.protobuf.timestamp_pb2.Timestamp

Create a timestamp for now.

Alias for angzarr_client.helpers.now() for backwards compatibility.

Returns:

Current time as protobuf Timestamp

angzarr_client.uuid_for(name: str, namespace: uuid.UUID = DEFAULT_TEST_NAMESPACE) bytes

Generate a deterministic 16-byte UUID from a name.

The same name always generates the same UUID within a namespace. Returns bytes suitable for use as aggregate root IDs.

Parameters:
  • name – A string identifier (e.g., “player-alice”, “table-1”)

  • namespace – UUID namespace for generation (defaults to test namespace)

Returns:

16-byte UUID as bytes

Example

    root = uuid_for("player-alice")
    assert len(root) == 16
    assert uuid_for("player-alice") == root  # deterministic

angzarr_client.uuid_obj_for(name: str, namespace: uuid.UUID = DEFAULT_TEST_NAMESPACE) uuid.UUID

Generate a deterministic UUID object from a name.

Parameters:
  • name – A string identifier

  • namespace – UUID namespace for generation

Returns:

UUID object

Example

    id_obj = uuid_obj_for("player-alice")
    assert id_obj.bytes == uuid_for("player-alice")

angzarr_client.uuid_str_for(name: str, namespace: uuid.UUID = DEFAULT_TEST_NAMESPACE) str

Generate a deterministic UUID string from a name.

Parameters:
  • name – A string identifier

  • namespace – UUID namespace for generation

Returns:

UUID as standard string format (8-4-4-4-12)

Example

    id_str = uuid_str_for("player-alice")
    assert "-" in id_str  # standard UUID format
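
The relationship between the three helpers (bytes, UUID object, string) can be sketched with the standard uuid module. This assumes name-based version 5 UUIDs, which the descriptions above do not confirm, and uses uuid.NAMESPACE_DNS as a stand-in because the value of DEFAULT_TEST_NAMESPACE is not shown here. All *_sketch names are hypothetical.

```python
import uuid

# Assumption: stand-in namespace; the real DEFAULT_TEST_NAMESPACE value is not documented here.
SKETCH_NAMESPACE = uuid.NAMESPACE_DNS


def uuid_obj_sketch(name: str, namespace: uuid.UUID = SKETCH_NAMESPACE) -> uuid.UUID:
    # Assumption: a version 5 (SHA-1, name-based) UUID gives the deterministic
    # "same name, same namespace, same UUID" behavior described above.
    return uuid.uuid5(namespace, name)


def uuid_bytes_sketch(name: str, namespace: uuid.UUID = SKETCH_NAMESPACE) -> bytes:
    # 16 raw bytes, the shape expected for an aggregate root.
    return uuid_obj_sketch(name, namespace).bytes


def uuid_str_sketch(name: str, namespace: uuid.UUID = SKETCH_NAMESPACE) -> str:
    # Canonical 8-4-4-4-12 hexadecimal form.
    return str(uuid_obj_sketch(name, namespace))
```

Deriving the bytes and string forms from a single object-producing function keeps the three representations consistent by construction.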

angzarr_client.testing_pack_event(msg: google.protobuf.message.Message) google.protobuf.any_pb2.Any

Pack a protobuf message into an Any with the canonical type URL.

The type URL is derived from the message’s descriptor (msg.DESCRIPTOR.full_name) prefixed with the standard type.googleapis.com/ — per the google.protobuf.Any spec.

Audit finding #47 (Option C — drop the second arg, derive name from the message): mirrors Rust’s testing::builders::pack_event<M>(msg). Removes the previous type_url_prefix parameter (which was a typo-prone footgun and diverged in semantics from the Rust 2nd-arg convention).

Returns:

ProtoAny containing the packed message.

angzarr_client.require_exists(exists: bool, context: str) None

Require that an aggregate exists (caller-supplied predicate).

Audit #60: takes a precomputed predicate boolean (e.g. state.is_some() / bool(state.id)) so the callsite is explicit about what “exists” means. Mirrors Rust’s validation.rs:41 require_exists(exists: bool, context: &str).

Raises NOT_FOUND — not retryable, since refetching events cannot change the outcome. For empty-string checks, use require_not_empty_str().

angzarr_client.require_non_negative(value: _Numeric, field_name: str) None

Require that a value is zero or greater.

angzarr_client.require_not_empty(items: collections.abc.Sequence[Any], field_name: str) None

Require that a sequence has at least one element.

angzarr_client.require_not_empty_str(value: str, field_name: str) None

Require that a string is not empty.

angzarr_client.require_not_exists(exists: bool, context: str) None

Require that an aggregate does NOT exist (caller-supplied predicate).

Audit #60: takes a precomputed predicate boolean. Mirrors Rust’s validation.rs:54 require_not_exists(exists: bool, context: &str).

angzarr_client.require_positive(value: _Numeric, field_name: str) None

Require that a value is greater than zero.

angzarr_client.require_status(actual: str, expected: str, context: str) None

Require that the current status matches the expected value.

angzarr_client.require_status_not(actual: str, forbidden: str, context: str) None

Require that the current status is NOT the forbidden value.
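
The caller-supplied predicate style of these validators can be illustrated with a small stand-in. RejectedSketch, the *_sketch functions and guard_deposit are hypothetical. NOT_FOUND for require_exists comes from the description above; the INVALID_ARGUMENT code and the message wording are placeholders, since the real codes and messages are not documented here.

```python
from typing import Optional


class RejectedSketch(Exception):
    """Hypothetical stand-in for the library's rejection error."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(f"{code}: {message}")
        self.code = code


def require_exists_sketch(exists: bool, context: str) -> None:
    # The caller decides what "exists" means and passes the resulting boolean.
    if not exists:
        raise RejectedSketch("NOT_FOUND", f"{context} does not exist")


def require_positive_sketch(value: float, field_name: str) -> None:
    # Placeholder status code: the real one is an assumption.
    if value <= 0:
        raise RejectedSketch("INVALID_ARGUMENT", f"{field_name} must be greater than zero")


def guard_deposit(state: Optional[dict], amount: int) -> None:
    """Example guard sequence at the top of a hypothetical command handler."""
    require_exists_sketch(state is not None, "player")
    require_positive_sketch(amount, "amount")
```

Because the predicate is computed at the call site (state is not None here), the guard reads the same whether "exists" means a non-null state, a non-empty identifier, or something else.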

class angzarr_client.CommandBook(proto: angzarr_client.proto.angzarr.CommandBook)

Bases: CoverBearer

Wrapper for the CommandBook proto.

proto() angzarr_client.proto.angzarr.CommandBook

Return the wrapped proto message.

cover() Cover

The wrapped cover (always present; default-instance if not set).

See EventBook.cover() for the rationale.

pages() list[CommandPage]

All command pages, wrapped.

first_command() CommandPage | None

First command page, or None when empty.

command_sequence() int

Sequence number of the first command page (0 when empty).

merge_strategy() angzarr_client.proto.angzarr.MergeStrategy

Merge strategy of the first page; defaults to commutative.

class angzarr_client.CommandPage(proto: angzarr_client.proto.angzarr.CommandPage)

Bases: Wrapped

Wrapper for the CommandPage proto.

proto() angzarr_client.proto.angzarr.CommandPage

Return the wrapped proto message.

sequence_num() int

Explicit sequence number, or 0 if not set.

header() angzarr_client.proto.angzarr.PageHeader | None

The page header proto, or None if missing.

is_deferred() bool

True if this page carries a deferred-sequence header.

type_url() str | None

Command payload’s type URL, or None if missing.

payload() bytes | None

Raw command payload bytes, or None if missing.

merge_strategy() angzarr_client.proto.angzarr.MergeStrategy

Per-page merge strategy.

class angzarr_client.CommandResponse(proto: angzarr_client.proto.angzarr.CommandResponse)

Bases: Wrapped

Wrapper for the CommandResponse proto.

proto() angzarr_client.proto.angzarr.CommandResponse

Return the wrapped proto message.

events_book() EventBook | None

The wrapped events EventBook, or None if not set.

events() list[EventPage]

All event pages from the response, wrapped.

class angzarr_client.Cover(proto: angzarr_client.proto.angzarr.Cover)

Bases: CoverBearer

Wrapper for the Cover proto.

proto() angzarr_client.proto.angzarr.Cover

Return the wrapped proto message.

class angzarr_client.CoverBearer

Bases: Wrapped

Shared accessors for proto types that carry a Cover field.

Subclasses set self._proto and override _cover_proto() to return the embedded Cover (or None if missing). The default implementation assumes self._proto is itself a Cover — only Cover relies on the default; the others override.

domain() str

Get the domain, falling back to UNKNOWN_DOMAIN.

correlation_id() str

Get the correlation_id, or empty string if missing.

has_correlation_id() bool

True if a non-empty correlation_id is present.

root_uuid() uuid.UUID | None

Extract the root UUID, or None if missing/malformed.

root_id_hex() str

Root UUID as hex, or empty string if missing.

edition() str | None

Edition name, or None when missing/empty.

routing_key() str

Bus routing key (currently the domain).

cache_key() str

Cache key derived from edition + domain + root.
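
The override pattern described for CoverBearer (a default _cover_proto() that treats the wrapped proto as the cover, with other wrappers overriding it) can be sketched as follows. Every class here is a hypothetical stand-in built on dataclasses rather than real protos, and the value of UNKNOWN_DOMAIN_SKETCH is a placeholder because the real UNKNOWN_DOMAIN constant is not shown.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional

UNKNOWN_DOMAIN_SKETCH = "unknown"  # placeholder value, not the library constant


@dataclass
class CoverProtoSketch:
    domain: str = ""
    correlation_id: str = ""


@dataclass
class BookProtoSketch:
    cover: Optional[CoverProtoSketch] = None


class WrappedSketch(ABC):
    @abstractmethod
    def proto(self):
        """Return the wrapped message."""


class CoverBearerSketch(WrappedSketch):
    def __init__(self, proto) -> None:
        self._proto = proto

    def proto(self):
        return self._proto

    def _cover_proto(self) -> Optional[CoverProtoSketch]:
        # Default: the wrapped message is itself the cover.
        return self._proto

    def domain(self) -> str:
        cover = self._cover_proto()
        if cover is None or not cover.domain:
            return UNKNOWN_DOMAIN_SKETCH
        return cover.domain

    def has_correlation_id(self) -> bool:
        cover = self._cover_proto()
        return cover is not None and bool(cover.correlation_id)


class CoverSketch(CoverBearerSketch):
    """Relies on the default _cover_proto()."""


class EventBookWrapperSketch(CoverBearerSketch):
    def _cover_proto(self) -> Optional[CoverProtoSketch]:
        # Override: the cover is an embedded field and may be missing.
        return self._proto.cover
```

With this layout the shared accessors are written once, and each wrapper only states where its cover lives.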

class angzarr_client.EventBook(proto: angzarr_client.proto.angzarr.EventBook)

Bases: CoverBearer

Wrapper for the EventBook proto.

proto() angzarr_client.proto.angzarr.EventBook

Return the wrapped proto message.

cover() Cover

The wrapped cover.

Always returns a Cover — when the underlying proto has no cover field set, the wrapper is built around the proto’s default-instance cover, so accessors like .domain() still work (returning the canonical empty responses, e.g. UNKNOWN_DOMAIN).

next_sequence() int

Framework-precomputed next sequence number.

is_empty() bool

True if there are no event pages.

pages() list[EventPage]

All event pages, wrapped.

first_page() EventPage | None

First event page, or None when empty.

last_page() EventPage | None

Last event page, or None when empty.

class angzarr_client.EventPage(proto: angzarr_client.proto.angzarr.EventPage)

Bases: Wrapped

Wrapper for the EventPage proto.

proto() angzarr_client.proto.angzarr.EventPage

Return the wrapped proto message.

sequence_num() int

Explicit sequence number, or 0 if not set.

header() angzarr_client.proto.angzarr.PageHeader | None

The page header proto, or None if missing.

is_deferred() bool

True if this page carries a deferred-sequence header.

type_url() str | None

Event payload’s type URL, or None if missing.

payload() bytes | None

Raw event payload bytes, or None if missing.

decode_typed(msg_class: type[T]) T | None

Decode the event payload into msg_class, exact-match on type URL.

Returns None on missing event, type mismatch, or decode failure.
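
The "exact-match on type URL, None on any failure" contract can be sketched without protobuf. Everything below is a hypothetical stand-in: PlayerRegisteredSketch replaces a generated message class, its FULL_NAME attribute plays the role of DESCRIPTOR.full_name, and UTF-8 decoding stands in for proto parsing. The type.googleapis.com/ prefix is the standard prefix for google.protobuf.Any type URLs.

```python
from dataclasses import dataclass
from typing import Optional

TYPE_URL_PREFIX = "type.googleapis.com/"


@dataclass
class PlayerRegisteredSketch:
    # Class attribute (not a dataclass field): stands in for DESCRIPTOR.full_name.
    FULL_NAME = "examples.PlayerRegistered"
    email: str = ""

    @classmethod
    def from_bytes(cls, data: bytes) -> "PlayerRegisteredSketch":
        # Stand-in for proto parsing; raises UnicodeDecodeError on invalid input.
        return cls(email=data.decode("utf-8"))


def decode_typed_sketch(type_url: Optional[str], payload: Optional[bytes], msg_class):
    # Missing event: nothing to decode.
    if type_url is None or payload is None:
        return None
    # Exact match only: a URL that merely starts or ends with the name is rejected.
    if type_url != TYPE_URL_PREFIX + msg_class.FULL_NAME:
        return None
    try:
        return msg_class.from_bytes(payload)
    except ValueError:
        # Decode failure (UnicodeDecodeError is a ValueError) also yields None.
        return None
```

Returning None for all three failure modes keeps call sites to a single "if decoded is None" check, at the cost of not distinguishing a type mismatch from a corrupt payload.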

class angzarr_client.Query(proto: angzarr_client.proto.angzarr.Query)

Bases: CoverBearer

Wrapper for the Query proto.

proto() angzarr_client.proto.angzarr.Query

Return the wrapped proto message.

cover() Cover

The wrapped cover (always present; default-instance if not set).

See EventBook.cover() for the rationale.

class angzarr_client.Wrapped

Bases: abc.ABC

Interface every angzarr wrapper implements.

Wrappers expose two surfaces:

  1. Method-style accessors for common needs (e.g. Cover.domain()).

  2. The raw proto for everything else, via proto().

Cross-language note: proto() is the documented escape hatch for callers that want direct proto field access (e.g. for serialization, for fields that don’t have a wrapper accessor, for proto-specific methods like HasField). Other languages will provide an equivalent method (e.g. Java Cover.proto()).

abstractmethod proto()

Return the wrapped proto message.