angzarr_client¶
Angzarr Python client library for gRPC services.
Submodules¶
- angzarr_client.builder
- angzarr_client.client
- angzarr_client.compensation
- angzarr_client.destinations
- angzarr_client.error_codes
- angzarr_client.errors
- angzarr_client.helpers
- angzarr_client.identity
- angzarr_client.readiness
- angzarr_client.retry
- angzarr_client.router
- angzarr_client.server
- angzarr_client.testing
- angzarr_client.validation
- angzarr_client.wrappers
Attributes¶
Exceptions¶
- Base class for client errors.
- Command was rejected due to business rule violation.
- Failed to establish connection to the server.
- gRPC error from the server.
- Invalid argument provided by caller.
- Failed to parse timestamp.
- Transport-level error.
- Raised when the builder cannot produce a valid runtime router.
- gRPC-compatible dispatch error carrying a StatusCode.
Classes¶
- Fluent builder for constructing and executing commands.
- Fluent builder for constructing and executing queries.
- Client for the CommandHandlerCoordinatorService.
- Combined client for command handler, query, and speculative operations on a single domain.
- Client for the EventQueryService.
- Client for speculative operations across coordinator services.
- Transport mode for gRPC connections.
- Extracted context from a rejection Notification.
- Options struct for
- Result from PM compensation helpers.
- Context for saga/PM handlers providing access to destination sequences.
- Retries with exponential backoff and optional jitter.
- Strategy for retrying failed operations.
- Runtime router dispatching commands to registered @command_handler instances.
- Return value of a process-manager
- Runtime router dispatching events to registered @process_manager instances.
- Runtime router fanning events out to registered @projector instances.
- Return shape of a saga
- Fluent builder that collects handler factories for the unified router.
- Return value of a saga
- Runtime router dispatching events to registered @saga instances.
- Runtime router transforming events through registered @upcaster instances.
- gRPC adapter for a unified
- gRPC adapter for a unified
- gRPC adapter for a unified
- gRPC adapter for a unified
- gRPC adapter for a unified
- Configuration for a gRPC server.
- Shared context for BDD test scenarios.
- Wrapper for the
- Wrapper for the
- Wrapper for the
- Wrapper for the
- Shared accessors for proto types that carry a
- Wrapper for the
- Wrapper for the
- Wrapper for the
- Interface every angzarr wrapper implements.
Functions¶
- Resolve domain to command handler coordinator endpoint.
- Create a response that delegates compensation to the framework.
- Create a response containing compensation events.
- Check if a type URL refers to a rejection Notification.
- Create a PM response that delegates compensation to the framework.
- Create a PM response containing compensation events.
- Build gRPC metadata carrying the
- Build a map from root UUID hex to EventBook for destination lookup.
- Get the full type URL for a message class.
- Create an edition with the given name but no divergences.
- Return the current time as a protobuf Timestamp.
- Parse an RFC3339 timestamp string.
- Convert a proto UUID to Python UUID.
- Convert a proto UUID to hex string format.
- Extract the wire-format type name from a type URL.
- Construct a full type URL from a fully-qualified message type name.
- Check if a type URL matches the given fully-qualified type name.
- Convert a Python UUID to a proto UUID.
- Compute a deterministic root UUID for a cart aggregate.
- Compute a deterministic root UUID from domain and business key.
- Compute a deterministic root UUID for a customer aggregate.
- Compute a deterministic root UUID for a fulfillment aggregate.
- Generate a deterministic UUID for an inventory product aggregate.
- Compute a deterministic root UUID for an inventory aggregate.
- Compute a deterministic root UUID for an order aggregate.
- Compute a deterministic root UUID for a product aggregate.
- Convert a uuid.UUID to 16-byte proto representation.
- Return the standard retry policy matching Rust's backoff config.
- Register a method as a state applier for
- Mark a class as a command handler (aggregate) for
- Register a method as a dispatch target for
- Register a method as a fact-event handler for
- Mark a class as a process manager.
- Mark a class as a projector consuming events from
- Register a method as a compensation handler for command rejections.
- Mark a class as a saga translating events from
- Mark a method as the state factory for this instance.
- Mark a class as an upcaster transforming events in
- Register a method as a transformation from
- Clean up a UDS socket file.
- Configure structlog with JSON rendering and ISO timestamps.
- Create an async gRPC server with health checking wired up.
- Get transport configuration from environment.
- Compute the TCP bind address.
- Run a command handler gRPC server.
- Run a process manager gRPC server.
- Run a projector gRPC server.
- Run a saga gRPC server.
- Run an async gRPC server until termination.
- Run an upcaster gRPC server.
- Create a CommandBook with a single command.
- Create a CommandPage.
- Create a Cover from domain and root bytes.
- Create an EventBook.
- Create an EventPage.
- Create a timestamp for now.
- Generate a deterministic 16-byte UUID from a name.
- Generate a deterministic UUID object from a name.
- Generate a deterministic UUID string from a name.
- Pack a protobuf message into an Any with the canonical type URL.
- Require that an aggregate exists (caller-supplied predicate).
- Require that a value is zero or greater.
- Require that a sequence has at least one element.
- Require that a string is not empty.
- Require that an aggregate does NOT exist (caller-supplied predicate).
- Require that a value is greater than zero.
- Require that the current status matches the expected value.
- Require that the current status is NOT the forbidden value.
Package Contents¶
- class angzarr_client.CommandBuilder(client: angzarr_client.client.CommandHandlerClient, domain: str, root: uuid.UUID)¶
Fluent builder for constructing and executing commands.
Audit #67: root is required. The only path to an auto-generated UUID v4 is via command_new(), which materializes the UUID and passes it explicitly to the constructor. Aggregate roots are always client-assigned across all six languages (audit #20 convention).
- with_correlation_id(id: str) CommandBuilder¶
Set the correlation ID for request tracing.
- with_sequence(seq: int) CommandBuilder¶
Set the expected sequence number for optimistic locking.
- with_merge_strategy(strategy: angzarr_client.proto.angzarr.MergeStrategy) CommandBuilder¶
Set the merge strategy for conflict resolution.
- Parameters:
strategy – MERGE_COMMUTATIVE (default) or MERGE_STRICT.
- with_command(type_url: str, message: google.protobuf.message.Message) CommandBuilder¶
Set the command type URL and message.
- build() angzarr_client.proto.angzarr.CommandBook¶
Build the CommandBook without executing.
- execute(sync_mode: angzarr_client.proto.angzarr.SyncMode = SyncMode.SYNC_MODE_ASYNC) angzarr_client.proto.angzarr.CommandResponse¶
Build and execute the command.
- Parameters:
sync_mode – Execution mode (ASYNC, SIMPLE, or CASCADE). Defaults to ASYNC for fire-and-forget behavior.
- class angzarr_client.QueryBuilder(client: angzarr_client.client.QueryClient, domain: str, root: uuid.UUID | None = None)¶
Fluent builder for constructing and executing queries.
- by_correlation_id(id: str) QueryBuilder¶
Query by correlation ID instead of root.
- edition(edition: str) QueryBuilder¶
Query events from a specific edition.
- with_edition¶
- range(lower: int) QueryBuilder¶
Query a range of sequences from lower (inclusive).
Last-selection-wins: clears any previously-set temporal selection so chained calls like .as_of_sequence(10).range(5) produce a Query with only the range. Mirrors Rust’s builder.rs:165 single-slot semantics (PARITY_AUDIT.md finding #23).
- range_to(lower: int, upper: int) QueryBuilder¶
Query a range of sequences with upper bound (inclusive).
Last-selection-wins (see range()).
- as_of_sequence(seq: int) QueryBuilder¶
Query state as of a specific sequence number.
Last-selection-wins: clears any previously-set range selection.
- as_of_time(rfc3339: str) QueryBuilder¶
Query state as of a specific timestamp (RFC3339 format).
Last-selection-wins: clears any previously-set range selection.
Audit finding #34 (Option B, raise immediately): a malformed rfc3339 string raises InvalidTimestampError synchronously rather than deferring to build(). Previously the failure was captured into a sticky _err field that survived subsequent last-call-wins setters, making qb.as_of_time("bad").as_of_sequence(5).build() raise the stale parse error. Mirrors Rust’s as_of_time(...) -> Result<Self> signature, where the bad call short-circuits at the call site.
- build() angzarr_client.proto.angzarr.Query¶
Build the Query without executing.
- get_event_book() angzarr_client.proto.angzarr.EventBook¶
Execute the query and return a single EventBook.
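The last-selection-wins and raise-immediately behaviour described for QueryBuilder can be illustrated with a small stand-in. This is a minimal sketch, not the library's implementation: SketchQueryBuilder, its selection field, and the tuple encoding are hypothetical names chosen for illustration, and the timestamp check uses the standard library rather than the library's own parser.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Tuple


class InvalidTimestampError(ValueError):
    """Stand-in for the library's InvalidTimestampError (hypothetical)."""


@dataclass
class SketchQueryBuilder:
    """Hypothetical stand-in: a single slot holds the current selection."""

    domain: str
    selection: Optional[Tuple] = None  # overwritten by every selection setter

    def range(self, lower: int) -> "SketchQueryBuilder":
        self.selection = ("range", lower, None)
        return self

    def range_to(self, lower: int, upper: int) -> "SketchQueryBuilder":
        self.selection = ("range", lower, upper)
        return self

    def as_of_sequence(self, seq: int) -> "SketchQueryBuilder":
        self.selection = ("as_of_sequence", seq)
        return self

    def as_of_time(self, rfc3339: str) -> "SketchQueryBuilder":
        try:
            parsed = datetime.fromisoformat(rfc3339.replace("Z", "+00:00"))
        except ValueError as exc:
            # Raise at the call site; no sticky error is stored on the builder.
            raise InvalidTimestampError(rfc3339) from exc
        self.selection = ("as_of_time", parsed)
        return self

    def build(self) -> dict:
        return {"domain": self.domain, "selection": self.selection}
```

Because each setter overwrites the same slot, a chain such as as_of_sequence(10).range(5) leaves only the range, and a failed as_of_time() call leaves nothing behind for a later build() to trip over.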
- class angzarr_client.CommandHandlerClient(channel: grpc.Channel, owns_channel: bool = True)¶
Client for the CommandHandlerCoordinatorService.
- classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) CommandHandlerClient¶
Connect to a command handler coordinator at the given endpoint.
- Parameters:
endpoint – The gRPC endpoint (host:port or UDS path).
retry – Optional retry policy. Defaults to exponential backoff.
- classmethod from_channel(channel: grpc.Channel) CommandHandlerClient¶
Create a client from a caller-managed channel.
The returned client will not close the channel when close() is called; the caller retains ownership.
- classmethod from_env(env_var: str, default: str) CommandHandlerClient¶
Connect using an environment variable with fallback.
- classmethod from_stub(stub) CommandHandlerClient¶
Compose a CommandHandlerClient from a pre-built gRPC stub.
See QueryClient.from_stub() for the test-only seam used to inject tests/_fakes.py::RecordingStub. P1.12.e / finding #24.
- handle(command: angzarr_client.proto.angzarr.CommandBook, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse¶
Execute a command with default async mode.
This is a convenience method that wraps the command in a CommandRequest with SYNC_MODE_ASYNC (fire-and-forget) and CASCADE_ERROR_FAIL_FAST.
- Parameters:
command – The command to execute.
timeout – Optional per-call deadline in seconds.
- handle_command(request: angzarr_client.proto.angzarr.CommandRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse¶
Execute a command with the specified sync mode.
- Parameters:
request – The command request.
timeout – Optional per-call deadline in seconds.
Audit #69 stage (b): attaches x-correlation-id metadata from the canonical request.command.cover.correlation_id.
- handle_sync_speculative(request: angzarr_client.proto.angzarr.SpeculateCommandHandlerRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse¶
Execute a command speculatively against temporal state (no persistence).
- Parameters:
request – The speculative command request.
timeout – Optional per-call deadline in seconds.
- command(domain: str, root: uuid.UUID) angzarr_client.builder.CommandBuilder¶
Start building a command for an existing aggregate.
- command_new(domain: str) angzarr_client.builder.CommandBuilder¶
Start building a command for a new aggregate.
Materializes a fresh UUID v4 client-side and passes it explicitly to CommandBuilder. Audit #67: root is always client-assigned, no path exists to skip it.
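The channel-ownership rule shared by connect() and from_channel() can be sketched with stand-ins. FakeChannel and SketchClient below are hypothetical and are not part of angzarr_client. The sketch also assumes that a client created with owns_channel=True closes its channel on close(), which the reference implies but does not state outright.

```python
class FakeChannel:
    """Hypothetical stand-in for grpc.Channel that records close() calls."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class SketchClient:
    """Hypothetical client showing the owns_channel flag in isolation."""

    def __init__(self, channel: FakeChannel, owns_channel: bool = True) -> None:
        self._channel = channel
        self._owns_channel = owns_channel

    @classmethod
    def from_channel(cls, channel: FakeChannel) -> "SketchClient":
        # Caller-managed channel: the client must leave it open on close().
        return cls(channel, owns_channel=False)

    def close(self) -> None:
        # Assumption: only an owned channel is closed by the client.
        if self._owns_channel:
            self._channel.close()
```

Sharing one channel across several clients through from_channel() then leaves exactly one owner responsible for closing it.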
- class angzarr_client.DomainClient(channel: grpc.Channel, owns_channel: bool = True)¶
Combined client for command handler, query, and speculative operations on a single domain.
- command_handler¶
- query¶
- speculative¶
- classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) DomainClient¶
Connect to a domain’s coordinator at the given endpoint.
- Parameters:
endpoint – The gRPC endpoint (host:port or UDS path).
retry – Optional retry policy. Defaults to exponential backoff.
- classmethod from_channel(channel: grpc.Channel) DomainClient¶
Create a client from a caller-managed channel.
The returned client will not close the channel when close() is called; the caller retains ownership.
- classmethod from_clients(command_handler: CommandHandlerClient, query: QueryClient, speculative: SpeculativeClient) DomainClient¶
Compose a DomainClient from already-constructed wrapped clients.
Bypasses channel construction: the wrapped clients hold their own channels (or are duck-typed test doubles). The DomainClient does not own the channel; close() is a no-op for the composed instance.
Used by cucumber/unit tests to inject recording mocks without spinning up a gRPC server (PARITY_AUDIT.md plan item P1.12.e / finding #24).
- classmethod for_domain(domain: str, mode: TransportMode | None = None) DomainClient¶
Connect to a domain’s command handler coordinator.
Resolves the domain name to the appropriate endpoint based on transport mode.
- Parameters:
domain – Domain name (e.g., “player”, “table”)
mode – Transport mode (standalone=UDS, distributed=K8s DNS). If None, detected from ANGZARR_MODE env var.
- Returns:
DomainClient connected to the domain’s command handler coordinator.
Examples
# Auto-detect mode from ANGZARR_MODE env var
player = DomainClient.for_domain("player")
# Explicitly use standalone mode (Unix Domain Sockets)
player = DomainClient.for_domain("player", TransportMode.STANDALONE)
# Explicitly use distributed mode (K8s DNS)
player = DomainClient.for_domain("player", TransportMode.DISTRIBUTED)
- classmethod from_env(env_var: str, default: str) DomainClient¶
Connect using an environment variable with fallback.
- execute(command: angzarr_client.proto.angzarr.CommandBook, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse¶
Execute a command with default async mode (fire-and-forget).
- execute_with_mode(command: angzarr_client.proto.angzarr.CommandBook, sync_mode: angzarr_client.proto.angzarr.SyncMode, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse¶
Execute a command with the specified sync mode.
- Parameters:
command – The command to execute.
sync_mode – Execution mode (ASYNC, SIMPLE, or CASCADE).
timeout – Optional per-call deadline in seconds.
- get_event_book(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) angzarr_client.proto.angzarr.EventBook¶
Retrieve a single EventBook for the query — unary RPC.
Delegates to the underlying QueryClient. Mirrors Rust’s DomainClient::get_event_book.
- get_events(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) list[angzarr_client.proto.angzarr.EventBook]¶
Retrieve all matching EventBooks for the query — streaming RPC.
Delegates to the underlying QueryClient. Mirrors Rust’s DomainClient::get_events.
- class angzarr_client.QueryClient(channel: grpc.Channel, owns_channel: bool = True)¶
Client for the EventQueryService.
- classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) QueryClient¶
Connect to an event query service at the given endpoint.
- Parameters:
endpoint – The gRPC endpoint (host:port or UDS path).
retry – Optional retry policy. Defaults to exponential backoff.
- classmethod from_channel(channel: grpc.Channel) QueryClient¶
Create a client from a caller-managed channel.
The returned client will not close the channel when close() is called; the caller retains ownership.
- classmethod from_env(env_var: str, default: str) QueryClient¶
Connect using an environment variable with fallback.
- classmethod from_stub(stub) QueryClient¶
Compose a QueryClient from a pre-built gRPC stub.
Bypasses channel construction so tests can inject a fake stub (see tests/_fakes.py::RecordingStub) without opening a socket. The returned client does not own a channel; close() is a no-op (PARITY_AUDIT.md plan item P1.12.e / finding #24).
- get_event_book(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) angzarr_client.proto.angzarr.EventBook¶
Retrieve a single EventBook for the query.
- Parameters:
query – The query specification.
timeout – Optional per-call deadline in seconds.
Audit #69 stage (b): attaches x-correlation-id metadata from query.cover.correlation_id so OTel exporters / mesh sidecars can filter on it without decoding the body. Send-only.
- get_events(query: angzarr_client.proto.angzarr.Query, timeout: float | None = None) list[angzarr_client.proto.angzarr.EventBook]¶
Retrieve all EventBooks matching the query.
- Parameters:
query – The query specification.
timeout – Optional per-call deadline in seconds.
- query(domain: str, root: uuid.UUID) angzarr_client.builder.QueryBuilder¶
Start building a query for a specific aggregate.
- query_domain(domain: str) angzarr_client.builder.QueryBuilder¶
Start building a query by domain only (use with by_correlation_id).
- class angzarr_client.SpeculativeClient(channel: grpc.Channel, owns_channel: bool = True)¶
Client for speculative operations across coordinator services.
Speculative execution runs commands/events against temporal state without persistence. Each coordinator service now provides its own speculative method.
- classmethod connect(endpoint: str, retry: angzarr_client.retry.RetryPolicy | None = None) SpeculativeClient¶
Connect to coordinator services at the given endpoint.
- Parameters:
endpoint – The gRPC endpoint (host:port or UDS path).
retry – Optional retry policy. Defaults to exponential backoff.
- classmethod from_channel(channel: grpc.Channel) SpeculativeClient¶
Create a client from a caller-managed channel.
The returned client will not close the channel when close() is called; the caller retains ownership.
- classmethod from_env(env_var: str, default: str) SpeculativeClient¶
Connect using an environment variable with fallback.
- classmethod from_stubs(command_handler_stub, saga_stub, projector_stub, pm_stub) SpeculativeClient¶
Compose a SpeculativeClient from pre-built gRPC stubs.
Bypasses channel construction; the four stubs back the four speculative methods (command_handler, saga, projector, process_manager). Test-only seam — see tests/_fakes.py::RecordingStub. P1.12.e / finding #24.
- command_handler(request: angzarr_client.proto.angzarr.SpeculateCommandHandlerRequest, timeout: float | None = None) angzarr_client.proto.angzarr.CommandResponse¶
Execute a command speculatively against temporal state.
Audit #69 stage (b): canonical correlation_id at request.command.cover.correlation_id.
- projector(request: angzarr_client.proto.angzarr.SpeculateProjectorRequest, timeout: float | None = None) angzarr_client.proto.angzarr.Projection¶
Speculatively execute a projector against events.
Audit #69 stage (b): canonical correlation_id at request.events.cover.correlation_id.
- saga(request: angzarr_client.proto.angzarr.SpeculateSagaRequest, timeout: float | None = None) angzarr_client.proto.angzarr.SagaResponse¶
Speculatively execute a saga against events.
Audit #69 stage (b): canonical correlation_id at request.request.source.cover.correlation_id.
- class angzarr_client.TransportMode(*args, **kwds)¶
Bases: enum.Enum
Transport mode for gRPC connections.
- STANDALONE = 'standalone'¶
- DISTRIBUTED = 'distributed'¶
- angzarr_client.resolve_ch_endpoint(domain: str, mode: TransportMode | None = None, *, uds_base: str = '/tmp/angzarr', namespace: str = 'angzarr', port: int = 1310) str¶
Resolve domain to command handler coordinator endpoint.
- Parameters:
domain – The domain name (e.g., “player”, “table”, “hand”)
mode – Transport mode. If None, detected from ANGZARR_MODE env var.
uds_base – Base path for Unix Domain Sockets (standalone mode)
namespace – Kubernetes namespace (distributed mode)
port – gRPC port (distributed mode)
- Returns:
Endpoint string suitable for _create_channel.
Standalone: /tmp/angzarr/ch-player.sock
Distributed: ch-player.angzarr.svc:1310
- Environment Variables:
ANGZARR_MODE: “standalone” or “distributed” (default: “distributed”)
ANGZARR_UDS_BASE: Override uds_base (default: /tmp/angzarr)
ANGZARR_NAMESPACE: Override namespace (default: angzarr)
ANGZARR_CH_PORT: Override port (default: 1310)
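The resolution rules above can be sketched as a standalone function. This is an illustrative reconstruction from the documented defaults and example outputs, not the library's code: sketch_resolve_ch_endpoint is a hypothetical name, mode is passed as a plain string instead of TransportMode, and the assumption that environment variables take precedence over keyword arguments comes from the word "Override" in the list above.

```python
import os
from typing import Optional


def sketch_resolve_ch_endpoint(
    domain: str,
    mode: Optional[str] = None,
    *,
    uds_base: str = "/tmp/angzarr",
    namespace: str = "angzarr",
    port: int = 1310,
) -> str:
    """Hypothetical re-creation of the documented endpoint shapes."""
    env = os.environ
    # Mode falls back to ANGZARR_MODE, then to the documented default.
    resolved_mode = mode or env.get("ANGZARR_MODE", "distributed")
    if resolved_mode == "standalone":
        base = env.get("ANGZARR_UDS_BASE", uds_base)
        return f"{base}/ch-{domain}.sock"
    # Distributed mode: Kubernetes service DNS name plus gRPC port.
    ns = env.get("ANGZARR_NAMESPACE", namespace)
    resolved_port = int(env.get("ANGZARR_CH_PORT", port))
    return f"ch-{domain}.{ns}.svc:{resolved_port}"
```

With none of the four variables set, the two modes reproduce the example endpoints listed under Returns.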
- class angzarr_client.CompensationContext¶
Extracted context from a rejection Notification.
Provides easy access to compensation-relevant fields. Source aggregate info is extracted from the rejected command’s angzarr_deferred header.
- rejected_command: angzarr_client.proto.angzarr.types_pb2.CommandBook | None¶
The command that was rejected (if available).
- source_aggregate: angzarr_client.proto.angzarr.types_pb2.Cover | None¶
Cover of the aggregate that triggered the flow.
- classmethod from_notification(notification: angzarr_client.proto.angzarr.types_pb2.Notification) CompensationContext¶
Extract compensation context from a Notification.
Source aggregate info is extracted from the rejected command’s angzarr_deferred header, which is always set by the framework for saga/PM-produced commands.
- Parameters:
notification – The notification containing RejectionNotification payload.
- Returns:
CompensationContext with extracted fields.
- class angzarr_client.DelegationOptions¶
Options struct for delegate_to_framework / pm_delegate_to_framework.
Audit #65 / #66: collapses the previously divergent function shapes (Rust two-function split, Python kwargs) into a single shared options type. Cross-language symmetric: Python DelegationOptions mirrors Rust compensation::DelegationOptions field-for-field with the same defaults.
Field defaults match the previous Python kwargs and Rust’s basic delegate_to_framework (which hardcoded emit_system_event = true and the rest false).
- class angzarr_client.PMRevocationResponse¶
Result from PM compensation helpers.
Named type matching Go’s PMRevocationResponse, replacing raw tuples for better discoverability and documentation.
Audit finding #70: revocation is required (no default), matching Rust’s compensation.rs:188-194 revocation: RevocationResponse field. The factory helpers pm_delegate_to_framework and pm_emit_compensation_events always construct one, so no caller is broken; direct user-code construction without it raises TypeError at construction time, surfacing the missing field loudly. Symmetric to audit-#56 (tighten ambiguous Optionals when the framework always populates them).
- revocation: angzarr_client.proto.angzarr.command_handler_pb2.RevocationResponse¶
Framework action flags. Required.
- angzarr_client.delegate_to_framework(reason: str, options: DelegationOptions | None = None) angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse¶
Create a response that delegates compensation to the framework.
Use when the aggregate doesn’t have custom compensation logic for a saga. The framework will emit a SagaCompensationFailed event to the fallback domain.
- Parameters:
reason – Human-readable explanation for the delegation.
options – Optional DelegationOptions struct overriding the framework defaults (emit_system_event=True and the rest False). Audit #65: replaces the previous keyword arguments for cross-language symmetry with Rust.
- Returns:
BusinessResponse with revocation flags.
- angzarr_client.emit_compensation_events(event_book: angzarr_client.proto.angzarr.types_pb2.EventBook) angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse¶
Create a response containing compensation events.
Use when the aggregate emits events to record compensation. The framework will persist these events and NOT emit a system event.
- Parameters:
event_book – EventBook containing compensation events.
- Returns:
BusinessResponse with events.
- angzarr_client.is_notification(type_url: str) bool¶
Check if a type URL refers to a rejection Notification.
Cross-language alias for Rust’s is_notification(type_url). Useful for dispatch code that needs to branch on “is this a notification?” without unpacking the Any payload.
- angzarr_client.pm_delegate_to_framework(reason: str, options: DelegationOptions | None = None) PMRevocationResponse¶
Create a PM response that delegates compensation to the framework.
Use when the PM doesn’t have custom compensation logic.
- Parameters:
reason – Human-readable explanation for the delegation.
options – Optional DelegationOptions struct. Audit #66: replaces the previous emit_system_event kwarg with the shared options struct (same shape as delegate_to_framework()).
- Returns:
PMRevocationResponse with no process events, delegate to framework.
- angzarr_client.pm_emit_compensation_events(process_events: angzarr_client.proto.angzarr.types_pb2.EventBook, also_emit_system_event: bool = False, reason: str = '') PMRevocationResponse¶
Create a PM response containing compensation events.
Use when the PM emits events to record the compensation in its state.
- Parameters:
process_events – EventBook containing PM compensation events.
also_emit_system_event – Also emit SagaCompensationFailed.
reason – Reason for system event (if emitting).
- Returns:
PMRevocationResponse with events and revocation flags.
- class angzarr_client.Destinations(sequences: dict[str, int])¶
Context for saga/PM handlers providing access to destination sequences.
Sagas and PMs receive destination sequences (not full EventBooks) from the framework. This class provides a stamp_command() helper to set the correct sequence number on commands before they’re sent.
Why sequences only?¶
Sagas and PMs are translators/coordinators; they should NOT make business decisions based on destination state. If you need external information:
1. Inject it as a fact
2. Let aggregates decide (they validate commands)
3. Use sync mode for immediate feedback on command results
- sequences¶
Map from domain name to next sequence number.
Create a Destinations context from a sequence map.
- Parameters:
sequences – Dict mapping domain name to next sequence number. Typically from proto’s destination_sequences field.
- classmethod from_proto(destination_sequences: dict[str, int]) Destinations¶
Create from proto’s destination_sequences map.
- Parameters:
destination_sequences – Map from ProcessManagerHandleRequest or SagaHandleRequest proto.
- Returns:
Destinations instance with the sequences.
- sequence_for(domain: str) int | None¶
Get the next sequence number for a destination domain.
- Parameters:
domain – The target domain name.
- Returns:
The next sequence number, or None if domain not found.
- stamp_command(cmd: angzarr_client.proto.angzarr.types_pb2.CommandBook, domain: str) angzarr_client.proto.angzarr.types_pb2.CommandBook¶
Stamp a command with the correct sequence for its destination domain.
Modifies the command’s page headers in-place with the sequence number.
- Parameters:
cmd – The CommandBook to stamp.
domain – The target domain (must be in destination_sequences).
- Returns:
The same CommandBook (for chaining).
- Raises:
InvalidArgumentError – If domain is not in destination_sequences. Carries code=MISSING_DESTINATION_SEQUENCE and details["domain"]=<domain> for cucumber assertions. Check your output_domains config when you see this. Audit #64.
- static deferred_header(source_cover, source_seq: int) angzarr_client.proto.angzarr.types_pb2.PageHeader¶
Build a PageHeader carrying an AngzarrDeferredSequence.
Accepts either a Cover wrapper (the canonical post-B2 form passed to handlers by the framework) or a raw Cover proto. Internally the proto is what gets copied into the AngzarrDeferredSequence payload.
Use this on saga-produced commands so the framework can dedupe on (source.root, source_seq, target.root). AMQP at-least-once redelivery of the trigger event then becomes a no-op at the destination aggregate’s pipeline (cached events returned without re-invoking business logic), instead of relying on a business guard that surfaces as an idempotent-failure-shaped retry storm.
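The sequence_for() and stamp_command() contract described for Destinations can be sketched with plain Python stand-ins. Everything below is hypothetical: SketchPage and SketchCommandBook replace the real protobuf types, the flat sequence field stands in for the page header, and InvalidArgumentError is a local class that only mirrors the documented code and details fields.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


class InvalidArgumentError(Exception):
    """Stand-in carrying the documented code and details fields."""

    def __init__(self, message: str, *, code: str, details: Dict[str, str]) -> None:
        super().__init__(message)
        self.code = code
        self.details = details


@dataclass
class SketchPage:
    sequence: Optional[int] = None  # stands in for the page header sequence


@dataclass
class SketchCommandBook:
    pages: List[SketchPage] = field(default_factory=list)


class SketchDestinations:
    """Hypothetical stand-in for Destinations."""

    def __init__(self, sequences: Dict[str, int]) -> None:
        self.sequences = dict(sequences)

    def sequence_for(self, domain: str) -> Optional[int]:
        # None signals that the domain is not a known destination.
        return self.sequences.get(domain)

    def stamp_command(self, cmd: SketchCommandBook, domain: str) -> SketchCommandBook:
        seq = self.sequence_for(domain)
        if seq is None:
            raise InvalidArgumentError(
                "no destination sequence for domain",
                code="MISSING_DESTINATION_SEQUENCE",
                details={"domain": domain},
            )
        for page in cmd.pages:
            page.sequence = seq  # modified in place
        return cmd  # same object, so calls can be chained
```

A handler would stamp each outgoing command with the sequence for its target domain before returning it, and a missing domain surfaces as an error rather than an unstamped command.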
- exception angzarr_client.ClientError(message: str, cause: Exception | None = None, *, code: str = '', details: collections.abc.Mapping[str, Any] | None = None)¶
Bases: Exception
Base class for client errors.
Predicate methods (is_not_found, is_precondition_failed, is_invalid_argument, is_connection_error) return False here and are overridden by subclasses. This lets callers write if err.is_not_found(): ... without casting.
- Audit finding #59 fields:
message: static human-readable string
code: SCREAMING_SNAKE stable identifier
details: runtime context as dict[str, str]
- message¶
- code = ''¶
- cause = None¶
- exception angzarr_client.CommandRejectedError(message: str, status_code: str = 'FAILED_PRECONDITION', *, code: str = '', details: collections.abc.Mapping[str, Any] | None = None, cover: Any | None = None)¶
Bases: ClientError
Command was rejected due to business rule violation.
- Status codes and retry semantics:
FAILED_PRECONDITION: state-based rejection; retryable after refreshing state.
INVALID_ARGUMENT: bad input; not retryable.
NOT_FOUND: aggregate does not exist; not retryable, since refetching won’t help.
Audit finding #59: callers pass a static message, a SCREAMING_SNAKE code, and structured details kwargs. The factory methods (precondition_failed / invalid_argument / not_found) bind the appropriate status_code.
The cover attribute is the addressing envelope (domain, root, correlation_id, edition) of the command that produced this rejection. Handlers do not populate it; the router stamps it from the incoming CommandRequest at the dispatch boundary so every rejection is traceable to its originating workflow without each call site having to thread the context.
- status_code = 'FAILED_PRECONDITION'¶
- cover = None¶
- static precondition_failed(code: str, message: str, details: collections.abc.Mapping[str, Any] | None = None) CommandRejectedError¶
Create a FAILED_PRECONDITION error for guard failures.
- Parameters:
code – SCREAMING_SNAKE stable identifier (use a constant from angzarr_client.error_codes.codes).
message – Static human-readable message (no interpolation; use a constant from angzarr_client.error_codes.messages).
details – Structured runtime context; keys should be from angzarr_client.error_codes.keys.
- static invalid_argument(code: str, message: str, details: collections.abc.Mapping[str, Any] | None = None) CommandRejectedError¶
Create an INVALID_ARGUMENT error for input validation failures.
- static not_found(code: str, message: str, details: collections.abc.Mapping[str, Any] | None = None) CommandRejectedError¶
Create a NOT_FOUND error for missing-aggregate failures.
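The way the three factory methods bind a status code, and the retry semantics listed for each code, can be sketched with a stand-in exception. SketchCommandRejected and is_retryable are hypothetical names, not library API, and the error codes used in any call are made up for illustration.

```python
from typing import Any, Mapping, Optional


class SketchCommandRejected(Exception):
    """Hypothetical stand-in for CommandRejectedError (not the library class)."""

    def __init__(
        self,
        message: str,
        status_code: str = "FAILED_PRECONDITION",
        *,
        code: str = "",
        details: Optional[Mapping[str, Any]] = None,
    ) -> None:
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.code = code
        self.details = dict(details or {})

    @staticmethod
    def precondition_failed(code, message, details=None):
        return SketchCommandRejected(message, "FAILED_PRECONDITION", code=code, details=details)

    @staticmethod
    def invalid_argument(code, message, details=None):
        return SketchCommandRejected(message, "INVALID_ARGUMENT", code=code, details=details)

    @staticmethod
    def not_found(code, message, details=None):
        return SketchCommandRejected(message, "NOT_FOUND", code=code, details=details)


def is_retryable(err: SketchCommandRejected) -> bool:
    # Per the documented semantics, only state-based rejections are worth
    # retrying, and only after the caller has refreshed its view of state.
    return err.status_code == "FAILED_PRECONDITION"
```

Keeping message static and putting runtime values in details means callers and tests can match on code without parsing strings.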
- exception angzarr_client.ConnectionError(message: str = 'connection failed', *, code: str = 'CONNECTION_FAILED', details: collections.abc.Mapping[str, Any] | None = None)¶
Bases: ClientError
Failed to establish connection to the server.
- exception angzarr_client.GRPCError(cause: grpc.RpcError, *, code: str = codes.GRPC_ERROR, details: collections.abc.Mapping[str, Any] | None = None)¶
Bases: ClientError
gRPC error from the server.
- property grpc_code: grpc.StatusCode¶
Return the gRPC status code.
- status() grpc.RpcError¶
Return the underlying gRPC RpcError (status).
- exception angzarr_client.InvalidArgumentError(message: str, *, code: str = 'INVALID_ARGUMENT', details: collections.abc.Mapping[str, Any] | None = None)¶
Bases: ClientError
Invalid argument provided by caller.
- exception angzarr_client.InvalidTimestampError(message: str = 'invalid timestamp', *, code: str = 'INVALID_TIMESTAMP', details: collections.abc.Mapping[str, Any] | None = None)¶
Bases: ClientError
Failed to parse timestamp.
- exception angzarr_client.TransportError(cause: Exception, *, code: str = codes.TRANSPORT_ERROR, details: collections.abc.Mapping[str, Any] | None = None)¶
Bases: ClientError
Transport-level error.
- angzarr_client.CORRELATION_ID_HEADER = 'x-correlation-id'¶
- angzarr_client.DEFAULT_EDITION = ''¶
- angzarr_client.META_ANGZARR_DOMAIN = '_angzarr'¶
- angzarr_client.PROJECTION_DOMAIN_PREFIX = '_projection'¶
- angzarr_client.PROJECTION_TYPE_URL = 'angzarr_client.proto.angzarr.Projection'¶
- angzarr_client.TYPE_URL_PREFIX = 'type.googleapis.com/'¶
- angzarr_client.UNKNOWN_DOMAIN = 'unknown'¶
- angzarr_client.WILDCARD_DOMAIN = '*'¶
Build gRPC metadata carrying the x-correlation-id header.
Returns the list shape grpcio expects on stub calls. Empty correlation IDs produce an empty list: the header is skipped, never sent as a blank value. Audit #69.
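The empty-ID rule can be sketched as follows. The function name correlation_metadata_sketch is hypothetical, and the list-of-tuples return shape is an assumption based on how grpcio accepts call metadata.

```python
CORRELATION_ID_HEADER = "x-correlation-id"  # value of the constant listed above


def correlation_metadata_sketch(correlation_id: str) -> list[tuple[str, str]]:
    """Return call metadata, or an empty list when there is no ID."""
    if not correlation_id:
        # Skip the header entirely rather than send a blank value.
        return []
    return [(CORRELATION_ID_HEADER, correlation_id)]
```

The result can be passed as the metadata argument of a stub call; an empty list adds no headers.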
- angzarr_client.destination_map(destinations: list[angzarr_client.proto.angzarr.EventBook]) dict[str, angzarr_client.proto.angzarr.EventBook]¶
Build a map from root UUID hex to EventBook for destination lookup.
Used in multi-destination sagas to look up the correct EventBook by aggregate root when setting command sequences. Entries without a root are skipped.
- angzarr_client.full_type_url¶
- angzarr_client.full_type_url_for(msg_class: type[google.protobuf.message.Message]) str¶
Get the full type URL for a message class.
- angzarr_client.implicit_edition(name: str) angzarr_client.proto.angzarr.Edition¶
Create an edition with the given name but no divergences.
- angzarr_client.now() google.protobuf.timestamp_pb2.Timestamp¶
Return the current time as a protobuf Timestamp.
- angzarr_client.parse_timestamp(rfc3339: str) google.protobuf.timestamp_pb2.Timestamp¶
Parse an RFC3339 timestamp string.
- angzarr_client.proto_to_uuid(u: angzarr_client.proto.angzarr.UUID) uuid.UUID¶
Convert a proto UUID to Python UUID.
- angzarr_client.proto_uuid_to_hex(u: angzarr_client.proto.angzarr.UUID | None) str¶
Convert a proto UUID to hex string format.
- angzarr_client.type_name_from_url(type_url_str: str) str¶
Extract the wire-format type name from a type URL.
Returns the part after the last /. Inputs with no slash are returned unchanged.
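The "part after the last slash" rule can be sketched with str.rpartition. The function name below is hypothetical and the body is an illustration of the documented behavior, not the library's source.

```python
TYPE_URL_PREFIX = "type.googleapis.com/"  # value of the constant listed above


def type_name_from_url_sketch(type_url_str: str) -> str:
    """Return the text after the last '/', or the input if there is none."""
    # rpartition yields ('', '', s) when the separator is absent,
    # so index 2 is the whole input in that case.
    return type_url_str.rpartition("/")[2]
```

For example, type_name_from_url_sketch(TYPE_URL_PREFIX + "examples.PlayerRegistered") yields "examples.PlayerRegistered".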
- angzarr_client.type_url(type_name: str) str¶
Construct a full type URL from a fully-qualified message type name.
- angzarr_client.type_url_matches(type_url_str: str, type_name: str) bool¶
Check if a type URL matches the given fully-qualified type name.
- angzarr_client.type_url_matches_exact¶
- angzarr_client.uuid_to_proto(u: uuid.UUID) angzarr_client.proto.angzarr.UUID¶
Convert a Python UUID to a proto UUID.
- angzarr_client.INVENTORY_PRODUCT_NAMESPACE¶
- angzarr_client.cart_root(customer_id: str) uuid.UUID¶
Compute a deterministic root UUID for a cart aggregate.
- angzarr_client.compute_root(domain: str, business_key: str) uuid.UUID¶
Compute a deterministic root UUID from domain and business key.
The UUID is derived from: hash(“angzarr” + domain + business_key) using the OID namespace, matching the Rust compute_root function.
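A sketch of the derivation described above, assuming that "hash" means a name-based UUID v5 and that the three parts are concatenated with no separator. Both are assumptions, so this sketch is not guaranteed to produce the same UUIDs as compute_root or its Rust counterpart.

```python
import uuid


def compute_root_sketch(domain: str, business_key: str) -> uuid.UUID:
    """Derive a deterministic UUID from a domain and a business key."""
    # Assumption: plain concatenation, hashed via UUID v5 in the OID namespace.
    return uuid.uuid5(uuid.NAMESPACE_OID, "angzarr" + domain + business_key)
```

The property that matters is determinism: the same (domain, business_key) pair always maps to the same root, so any service can compute an aggregate's root without a lookup.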
- angzarr_client.customer_root(email: str) uuid.UUID¶
Compute a deterministic root UUID for a customer aggregate.
- angzarr_client.fulfillment_root(order_id: str) uuid.UUID¶
Compute a deterministic root UUID for a fulfillment aggregate.
- angzarr_client.inventory_product_root(product_id: str) uuid.UUID¶
Generate a deterministic UUID for an inventory product aggregate.
Uses UUID v5 with INVENTORY_PRODUCT_NAMESPACE to ensure the same product_id always maps to the same inventory aggregate root.
- angzarr_client.inventory_root(product_id: str) uuid.UUID¶
Compute a deterministic root UUID for an inventory aggregate.
- angzarr_client.order_root(order_id: str) uuid.UUID¶
Compute a deterministic root UUID for an order aggregate.
- angzarr_client.product_root(sku: str) uuid.UUID¶
Compute a deterministic root UUID for a product aggregate.
- angzarr_client.to_proto_bytes(id: uuid.UUID) bytes¶
Convert a uuid.UUID to 16-byte proto representation.
- class angzarr_client.ExponentialBackoffRetry(min_delay: float = 0.1, max_delay: float = 5.0, max_attempts: int = 10, jitter: bool = True, on_retry: Callable[[int, Exception], None] | None = None)¶
Bases: RetryPolicy
Retries with exponential backoff and optional jitter.
- min_delay = 0.1¶
- max_delay = 5.0¶
- max_attempts = 10¶
- jitter = True¶
- on_retry = None¶
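The interaction of these parameters can be sketched as below. The doubling factor, the "full jitter" formula, and the names delay_for and retry_sketch are assumptions chosen for illustration; the library's actual schedule may differ.

```python
import random
import time
from collections.abc import Callable
from typing import Any, Optional


def delay_for(attempt: int, min_delay: float = 0.1, max_delay: float = 5.0,
              jitter: bool = True) -> float:
    """Delay before the retry that follows failed attempt number `attempt` (1-based)."""
    # Assumption: the delay doubles per attempt and is capped at max_delay.
    capped = min(max_delay, min_delay * 2 ** (attempt - 1))
    # Assumption: jitter draws uniformly from [0, capped].
    return random.uniform(0.0, capped) if jitter else capped


def retry_sketch(fn: Callable[[], Any], *, max_attempts: int = 10,
                 jitter: bool = True,
                 on_retry: Optional[Callable[[int, Exception], None]] = None,
                 sleep: Callable[[float], None] = time.sleep) -> Any:
    """Call fn until it succeeds or max_attempts is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            if on_retry is not None:
                on_retry(attempt, exc)
            sleep(delay_for(attempt, jitter=jitter))
```

Passing sleep as a parameter keeps the loop testable without real waiting.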
- angzarr_client.default_retry_policy() RetryPolicy¶
Return the standard retry policy matching Rust’s backoff config.
- exception angzarr_client.BuildError(message: str, *, code: str = '', details: collections.abc.Mapping[str, Any] | None = None)¶
Bases: Exception
Raised when the builder cannot produce a valid runtime router.
Audit #72: carries the structural error shape from audit #59: a SCREAMING_SNAKE code (programmatic dispatch / cucumber assertions), a static message (cross-language equality), and details for runtime context (router name, conflicting kinds, duplicate (domain, type_url), handler class, field name, etc.). Mirrors Rust router::handler::BuildError, which carries an ErrorDetail per variant.
Construction sites pass constants from angzarr_client.error_codes: codes.DUPLICATE_COMMAND_HANDLER etc. for the code, messages.DUPLICATE_COMMAND_HANDLER etc. for the static message, and detail keys from keys for the details map.
- message¶
- code = ''¶
- class angzarr_client.CommandHandlerRouter(factories: list[Factory])¶
Bases: _BuiltRouterBase, Generic[S]
Runtime router dispatching commands to registered @command_handler instances.
- property name: str¶
Domain this router serves (read from the first registered handler’s @command_handler(domain=...) metadata).
@command_handler has no name attribute, so the domain is the natural identifier. Mirrors Rust’s CommandHandlerRouter::name() (runtime.rs:625-633).
- dispatch(request)¶
Route a ContextualCommand to the matching @handles method.
- supports_handle_fact() bool¶
Audit #45: True if any registered handler declares at least one @handles_fact method. The gRPC adapter uses this as the gate for the HandleFact RPC: when False, it returns UNIMPLEMENTED without entering dispatch.
Implemented as a pure metadata read on the registered classes; no factory invocation, no per-request work.
- supports_replay() bool¶
Audit #45: True if any registered handler opted in via @command_handler(supports_replay=True). The gRPC adapter uses this as the gate for the Replay RPC.
Replay is a generic state-rebuild operation. The framework implements it from the existing @applies machinery, so the opt-in is just an explicit acknowledgement that the aggregate’s state type is proto-serializable (replay round-trips state via Any).
- dispatch_fact(request)¶
Route a FactRequest to matching @handles_fact methods.
Caller is responsible for gating via supports_handle_fact() before invoking this; the gRPC adapter does so.
- dispatch_replay(request)¶
Replay events on top of a base snapshot to compute final state.
Caller is responsible for gating via supports_replay() before invoking this; the gRPC adapter does so.
- exception angzarr_client.DispatchError(code: grpc.StatusCode, details: str, *, error_code: str = '', extras: dict[str, Any] | None = None)¶
Bases: grpc.RpcError
gRPC-compatible dispatch error carrying a StatusCode.
Raised when a command cannot be routed (unknown type, unknown domain), when a request is malformed (missing command/page/cover), or for any other caller-facing violation. The gRPC servicer can read .code() and forward the status to the client.
Audit finding #59: message is a static string (no interpolation); runtime context such as type URLs and domains rides in extras. The SCREAMING_SNAKE error_code is a stable cross-language identifier suitable for cucumber assertions.
- error_code = ''¶
- code() grpc.StatusCode¶
- class angzarr_client.ProcessManagerResponse(commands: list[angzarr_client.proto.angzarr.types_pb2.CommandBook] | None = None, process_events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None, facts: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None)¶
Return value of a process-manager @handles method.
commands: commands forwarded to target domains
process_events: list of EventBooks recorded into the PM’s own event stream (state). Multiple books concatenate into the wire’s single ProcessManagerHandleResponse.process_events (first non-empty book’s cover wins).
facts: facts injected into other aggregates without a command
Audit finding #56 (Option B: list[EventBook]): aligns with Rust’s Vec<EventBook> and matches the existing commands / facts list shapes for symmetry.
- commands = []¶
- process_events = []¶
- facts = []¶
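The "concatenate, first non-empty book's cover wins" rule for process_events can be sketched with stand-in types. BookSketch and merge_books are hypothetical, and treating "non-empty" as "has at least one page" is an assumption.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class BookSketch:
    """Stand-in for an EventBook: a cover label plus a list of pages."""
    cover: Optional[str] = None
    pages: list[str] = field(default_factory=list)


def merge_books(books: list[BookSketch]) -> BookSketch:
    """Fold several books into one, as the wire format carries a single book."""
    # Assumption: a book counts as non-empty when it has at least one page.
    non_empty = [book for book in books if book.pages]
    cover = non_empty[0].cover if non_empty else None
    pages = [page for book in non_empty for page in book.pages]
    return BookSketch(cover=cover, pages=pages)
```

Page order follows the order of the input list, so handlers that care about ordering should append their books accordingly.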
- class angzarr_client.ProcessManagerRouter(factories: list[Factory])¶
Bases: _BuiltRouterBase, Generic[S]
Runtime router dispatching events to registered @process_manager instances.
- dispatch(request)¶
Route a ProcessManagerHandleRequest to matching handlers.
- output_domains() list[str]¶
Domains this router emits commands to.
Read from each registered handler class’s __angzarr_meta__. Default is empty; only sagas (target) and process managers (targets) override.
- sync_output_domains() list[str]¶
Audit #74: subset of output_domains() that the registered handlers ever address with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Drives readiness probing: only sync targets need their coordinator reachable for traffic to be safe. Default empty; saga / PM routers override.
- has_async_outputs() bool¶
Audit #74: True if any registered handler emits to at least one async target, i.e. ever publishes through the async bus rather than synchronously calling a downstream coordinator. Drives whether the readiness supervisor includes a BusProbe. Default False (CH / Projector / Upcaster never cross-emit).
- class angzarr_client.ProjectorRouter(factories: list[Factory])¶
Bases: _BuiltRouterBase
Runtime router fanning events out to registered @projector instances.
- dispatch(events)¶
Fan out each event in the book to matching @handles methods.
- class angzarr_client.RejectionHandlerResponse¶
Return shape of a saga @rejected handler method.
Carries compensation events to persist locally and an optional Notification to forward upstream. Mirrors Rust’s router::responses::RejectionHandlerResponse field-for-field.
Audit finding #56 (Option B: list[EventBook]): aligns with Rust’s Vec<EventBook>. Multiple books concatenate downstream; first non-empty book’s cover wins.
- class angzarr_client.Router(name: str)¶
Fluent builder that collects handler factories for the unified router.
Usage:
router = (
    Router("agg-service")
    .with_handler(Player, lambda: Player(db_pool))
    .with_handler(Hand, lambda: Hand(rng))
    .build()
)
.build() returns a typed runtime router (CommandHandlerRouter, SagaRouter, ProcessManagerRouter, or ProjectorRouter). Mixed handler kinds are rejected at build time (R4).
Handlers are registered as (cls, factory) pairs so the router can invoke factory() per dispatch call to obtain a fresh (or pooled) instance. This keeps handler state isolated per request and makes the router safe to share across threads.
- name¶
- with_handler(cls: type, factory: Callable[[], Any]) Router¶
Register a handler class together with a zero-arg factory.
cls must carry one of the five kind decorators (@command_handler, @saga, @process_manager, @projector, @upcaster). Registering an undecorated class raises BuildError.
factory is any Callable[[], Any]: a lambda, a bound method on a DI container, a functools.partial, a pool-checkout closure, or a callable returning a shared singleton. The router never inspects the factory body; it only calls it and uses whatever it returns.
Factories are invoked inside dispatch(), so their latency is on the request path. For handlers whose construction is expensive (opening a DB connection, reading a config file, performing I/O, doing non-trivial computation, or holding resources that must be released), prefer a pool checkout, or close over a pre-built instance, rather than constructing a fresh one on every call.
- build() Any¶
Produce a typed runtime router.
Empty → BuildError. Mixed kinds → BuildError. Homogeneous → CommandHandlerRouter / SagaRouter / ProcessManagerRouter / ProjectorRouter per the shared kind.
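The factory shapes accepted by with_handler can be sketched without the router itself. HandlerSketch is a hypothetical handler class; each of the callables below is a zero-argument factory of the kind the router would call per dispatch.

```python
from functools import partial


class HandlerSketch:
    """Hypothetical handler whose constructor takes a resource name."""

    def __init__(self, resource: str = "primary") -> None:
        self.resource = resource


# Fresh instance on every call: simplest, but construction cost is on the request path.
fresh_factory = lambda: HandlerSketch()

# functools.partial binds constructor arguments ahead of time.
configured_factory = partial(HandlerSketch, resource="replica")

# Closure over a pre-built instance: construction happens once, at startup.
_shared = HandlerSketch(resource="shared")


def singleton_factory() -> HandlerSketch:
    return _shared
```

A shared instance trades isolation for speed, so it only suits handlers that hold no per-request state.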
- class angzarr_client.SagaHandlerResponse(commands: list[angzarr_client.proto.angzarr.types_pb2.CommandBook] | None = None, events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None)¶
Return value of a saga @handles method.
Carries commands to forward to target domains and/or facts (events) to inject into other aggregates. Both are optional.
- commands = []¶
- events = []¶
- class angzarr_client.SagaRouter(factories: list[Factory])¶
Bases: _BuiltRouterBase
Runtime router dispatching events to registered @saga instances.
- dispatch(request)¶
Route a SagaHandleRequest to all matching @handles methods.
- output_domains() list[str]¶
Domains this router emits commands to.
Read from each registered handler class’s __angzarr_meta__. Default is empty; only sagas (target) and process managers (targets) override.
- sync_output_domains() list[str]¶
Audit #74: subset of output_domains() that the registered handlers ever address with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Drives readiness probing: only sync targets need their coordinator reachable for traffic to be safe. Default empty; saga / PM routers override.
- has_async_outputs() bool¶
Audit #74: True if any registered handler emits to at least one async target, i.e. ever publishes through the async bus rather than synchronously calling a downstream coordinator. Drives whether the readiness supervisor includes a BusProbe. Default False (CH / Projector / Upcaster never cross-emit).
- class angzarr_client.UpcasterRouter(factories: list[Factory])¶
Bases: _BuiltRouterBase
Runtime router transforming events through registered @upcaster instances.
- dispatch(request)¶
Transform each event in request.events via matching @upcasts methods.
- angzarr_client.applies(event_type: type) Callable[[F], F]¶
Register a method as a state applier for event_type.
Appliers are invoked during state rebuild, walking the prior event book and mutating the instance’s state in place.
- angzarr_client.command_handler(*, domain: str, state: type, supports_replay: bool = False) Callable[[T], T]¶
Mark a class as a command handler (aggregate) for domain.
The state type is the aggregate’s state type; the class must either provide a @state_factory method or rely on state() as the default factory (enforced in a later round).
supports_replay (default False) opts the aggregate into the coordinator’s Replay RPC, used for MERGE_COMMUTATIVE conflict detection. When True, the framework auto-implements replay using the @applies methods to rebuild state from a snapshot + events; the state type must be a proto Message (carry a DESCRIPTOR). When False, the gRPC adapter returns UNIMPLEMENTED for Replay requests and the coordinator degrades to MERGE_STRICT semantics. Audit #45.
Fact handling is opt-in via the @handles_fact(EventType) method decorator. Aggregates with no @handles_fact methods get UNIMPLEMENTED from the gRPC adapter for HandleFact; the coordinator falls back to pass-through-persist (per the proto’s Optional contract).
- angzarr_client.handles(message_type: type) Callable[[F], F]¶
Register a method as a dispatch target for message_type.
For command handlers this is the command type; for sagas / process managers / projectors it is the event type. Dispatch routes by proto full-name match.
- angzarr_client.handles_fact(event_type: type) Callable[[F], F]¶
Register a method as a fact-event handler for event_type.
Audit #45. Triggered when the coordinator dispatches a fact (an external reality, e.g. a payment confirmation from a third party) via the HandleFact RPC. The method receives (event, state) after state has been rebuilt from prior events; it returns the events to persist (or None for pure side-effects).
Aggregates with at least one @handles_fact method opt into the HandleFact RPC. Aggregates with none get UNIMPLEMENTED from the framework’s gRPC adapter; the coordinator then falls back to pass-through-persist per the proto’s Optional contract.
Distinct from @handles (which dispatches commands and returns events), @handles_fact is fact-specific: facts cannot be rejected, and the method’s return shape is just events-to-persist.
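The opt-in-by-decorator mechanism can be sketched as a decorator that tags methods plus a check that reads the tags from the class without constructing it. The names handles_fact_sketch, supports_handle_fact_sketch, and the _fact_event_type attribute are hypothetical; the library stores its metadata in its own way.

```python
from collections.abc import Callable, Iterable
from typing import Any


def handles_fact_sketch(event_type: type) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Tag a method with the fact event type it handles."""
    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        fn._fact_event_type = event_type  # hypothetical marker attribute
        return fn
    return decorate


def supports_handle_fact_sketch(handler_classes: Iterable[type]) -> bool:
    """True if any class declares at least one tagged method."""
    # Reads class attributes only: no handler is instantiated.
    return any(
        hasattr(member, "_fact_event_type")
        for cls in handler_classes
        for member in vars(cls).values()
    )


class PaymentConfirmed:
    """Hypothetical fact event type."""


class Order:
    def __init__(self) -> None:
        raise RuntimeError("the metadata check never constructs a handler")

    @handles_fact_sketch(PaymentConfirmed)
    def on_payment(self, event: Any, state: Any) -> list:
        return []  # events to persist


class Cart:
    def add_item(self, cmd: Any, state: Any) -> list:
        return []
```

Because the check is a metadata read, it can run once at startup and gate the RPC with no per-request cost.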
- angzarr_client.process_manager(*, name: str, pm_domain: str, sources: list[str], targets: list[str], state: type, sync_targets: list[str] | None = None) Callable[[T], T]¶
Mark a class as a process manager.
pm_domain is the PM’s own state-storage domain; sources lists incoming event domains; targets lists downstream command domains.
Audit #74: sync_targets declares the subset of targets the PM ever addresses with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED); those are the targets whose coordinator must be reachable for readiness. Default None (no sync targets): every command goes through the async bus and the readiness supervisor will not probe any target’s coordinator.
Raises ValueError if sync_targets contains a domain that is not in targets.
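The subset rule for sync_targets can be sketched as a small validation step. validate_sync_targets is a hypothetical helper, and the error wording is illustrative only.

```python
from typing import Optional


def validate_sync_targets(targets: list[str],
                          sync_targets: Optional[list[str]] = None) -> list[str]:
    """Return the sync targets, rejecting any domain not listed in targets."""
    resolved = list(sync_targets or [])  # None means no sync targets
    unknown = [domain for domain in resolved if domain not in targets]
    if unknown:
        raise ValueError(f"sync_targets not in targets: {unknown}")
    return resolved
```

With targets=["inventory", "payment"], passing sync_targets=["payment"] is accepted, while sync_targets=["shipping"] raises ValueError.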
- angzarr_client.projector(*, name: str, domains: list[str]) Callable[[T], T]¶
Mark a class as a projector consuming events from domains.
- angzarr_client.rejected(source_domain: str, command: str) Callable[[F], F]¶
Register a method as a compensation handler for command rejections.
Triggered when a command originating from this component is rejected by the target aggregate. source_domain and command identify the rejected command’s proto full-name suffix split into domain/command parts.
- angzarr_client.saga(*, name: str, source: str, target: str, sync: bool = False) Callable[[T], T]¶
Mark a class as a saga translating events from source to commands for target.
Audit #74: sync declares whether commands emitted to target ever use sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Default False: target commands flow through the async bus and the readiness supervisor will not probe target’s coordinator. Flip to True when the saga blocks on the downstream response.
- angzarr_client.state_factory(fn: F) F¶
Mark a method as the state factory for this instance.
Overrides the default factory (calling StateType()). Useful when state construction requires parameters or custom defaults.
- angzarr_client.upcaster(*, name: str, domain: str) Callable[[T], T]¶
Mark a class as an upcaster transforming events in domain.
Methods decorated with @upcasts(FromType, ToType) declare individual version-to-version transformations. An upcaster with zero @upcasts methods is allowed (passthrough).
- angzarr_client.upcasts(from_type: type, to_type: type) Callable[[F], F]¶
Register a method as a transformation from from_type to to_type.
The method must accept the old event and return the new one. Dispatch matches by exact proto type-URL on the incoming event; events without a registered transform pass through unchanged.
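The match-or-pass-through dispatch can be sketched with a dictionary keyed by type URL. Events are modelled here as (type_url, payload) tuples, and upcast_all plus the example URLs are hypothetical stand-ins for the proto types.

```python
from collections.abc import Callable

EventSketch = tuple[str, dict]  # (type_url, payload) stand-in for a packed event


def upcast_all(events: list[EventSketch],
               transforms: dict[str, Callable[[dict], EventSketch]]) -> list[EventSketch]:
    """Apply the transform registered for each event's exact type URL, if any."""
    result = []
    for type_url, payload in events:
        transform = transforms.get(type_url)  # exact match only
        if transform is None:
            result.append((type_url, payload))  # no transform: pass through unchanged
        else:
            result.append(transform(payload))
    return result


def registered_v1_to_v2(payload: dict) -> EventSketch:
    # Hypothetical migration: v2 adds a 'tier' field with a default value.
    return ("type.googleapis.com/examples.PlayerRegisteredV2", {**payload, "tier": "basic"})
```

Output order matches input order, so downstream state rebuild sees the same sequence with only the shapes updated.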
- class angzarr_client.CommandHandlerGrpc(router: Any)¶
Bases: angzarr_client.proto.angzarr.command_handler_pb2_grpc.CommandHandlerServiceServicer
gRPC adapter for a unified CommandHandlerRouter.
Audit #45: implements all three RPCs declared by the proto: Handle (always functional), HandleFact (gated on whether the aggregate declared any @handles_fact methods), and Replay (gated on whether the aggregate opted in via @command_handler(supports_replay=True)).
- async Handle(request: angzarr_client.proto.angzarr.types_pb2.ContextualCommand, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse¶
Process command and return business response (events or revocation request).
- async HandleFact(request: angzarr_client.proto.angzarr.command_handler_pb2.FactRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.types_pb2.EventBook¶
Process fact events - update aggregate state based on external realities. Optional: if unimplemented, facts are persisted as-is (pass-through).
- async Replay(request: angzarr_client.proto.angzarr.command_handler_pb2.ReplayRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.command_handler_pb2.ReplayResponse¶
Replay events to compute state (for conflict detection). Optional: only needed if the aggregate supports MERGE_COMMUTATIVE.
- class angzarr_client.ProcessManagerGrpc(router: Any)¶
Bases: angzarr_client.proto.angzarr.process_manager_pb2_grpc.ProcessManagerServiceServicer
gRPC adapter for a unified ProcessManagerRouter.
- async Handle(request: angzarr_client.proto.angzarr.process_manager_pb2.ProcessManagerHandleRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.process_manager_pb2.ProcessManagerHandleResponse¶
Handle with trigger + process state. Returns commands for other aggregates and events for the PM’s own domain.
PMs do not rebuild destination aggregate state — they translate events into commands/facts and rely on destination_sequences for command stamping. See process manager design philosophy.
- class angzarr_client.ProjectorGrpc(router: Any)¶
Bases: angzarr_client.proto.angzarr.projector_pb2_grpc.ProjectorServiceServicer
gRPC adapter for a unified ProjectorRouter.
- async Handle(request: angzarr_client.proto.angzarr.types_pb2.EventBook, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.types_pb2.Projection¶
Async projection - projector should persist and return
- async HandleSpeculative(request: angzarr_client.proto.angzarr.types_pb2.EventBook, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.types_pb2.Projection¶
Speculative processing - projector must avoid external side effects
- class angzarr_client.SagaGrpc(router: Any)¶
Bases: angzarr_client.proto.angzarr.saga_pb2_grpc.SagaServiceServicer
gRPC adapter for a unified SagaRouter.
- async Handle(request: angzarr_client.proto.angzarr.saga_pb2.SagaHandleRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.saga_pb2.SagaResponse¶
Translate source events into commands for target domains. Commands use angzarr_deferred — framework stamps explicit sequences on delivery.
- class angzarr_client.UpcasterGrpc(router: Any)¶
Bases: angzarr_client.proto.angzarr.upcaster_pb2_grpc.UpcasterServiceServicer
gRPC adapter for a unified UpcasterRouter.
- async Upcast(request: angzarr_client.proto.angzarr.upcaster_pb2.UpcastRequest, context: grpc.aio.ServicerContext) angzarr_client.proto.angzarr.upcaster_pb2.UpcastResponse¶
Transform events to the current version. Returns events in the same order, transformed where applicable.
- angzarr_client.DEFAULT_BIND_HOST = '[::]'¶
- angzarr_client.ENV_BIND_ADDRESS = 'ANGZARR_BIND_ADDRESS'¶
- class angzarr_client.ServerConfig¶
Configuration for a gRPC server.
Cross-language alias for Rust’s ServerConfig { port, uds_path }.
- classmethod from_env(default_port: int = 50052) ServerConfig¶
- angzarr_client.configure_logging() None¶
Configure structlog with JSON rendering and ISO timestamps.
- angzarr_client.create_server(add_servicer_func: collections.abc.Callable, servicer: object, service_name: str = '') ServerHandle¶
Create an async gRPC server with health checking wired up.
Audit #68: health reporting starts at NOT_SERVING for both the empty (overall) service name and any explicit service_name; the readiness supervisor flips it once every probe passes. Pre-#68 Python set SERVING immediately, which made K8s readiness vacuously true.
- angzarr_client.get_transport_config() tuple[str, str]¶
Get transport configuration from environment.
- Returns:
Tuple of (transport_type, address).
For TCP: ("tcp", "[::]:{port}"), overridable via ANGZARR_BIND_ADDRESS.
For UDS: ("uds", "unix://{socket_path}").
- angzarr_client.resolve_bind_address(default_port: int = 50052) str¶
Compute the TCP bind address.
Audit #77: returns ANGZARR_BIND_ADDRESS verbatim when set, otherwise composes [::]:{port} where port comes from the PORT env var or default_port.
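The precedence described above can be sketched as follows. The function name is hypothetical, the env parameter is added here only to make the sketch testable, and treating an empty variable as unset is an assumption.

```python
import os
from collections.abc import Mapping


def resolve_bind_address_sketch(default_port: int = 50052,
                                env: Mapping[str, str] = os.environ) -> str:
    """Pick the TCP bind address from the environment."""
    explicit = env.get("ANGZARR_BIND_ADDRESS")
    if explicit:  # assumption: an empty string counts as unset
        return explicit  # returned verbatim, no validation or rewriting
    port = env.get("PORT") or default_port
    return f"[::]:{port}"
```

For example, with only PORT=6000 set the result is "[::]:6000", and with neither variable set it is "[::]:50052".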
- angzarr_client.run_command_handler_server(router, domain: str = '', default_port: int = 50052) None¶
Run a command handler gRPC server.
- angzarr_client.run_process_manager_server(router, domain: str = '', default_port: int = 50052) None¶
Run a process manager gRPC server.
- angzarr_client.run_projector_server(router, domain: str = '', default_port: int = 50052) None¶
Run a projector gRPC server.
- angzarr_client.run_saga_server(router, domain: str = '', default_port: int = 50052) None¶
Run a saga gRPC server.
- angzarr_client.run_server(add_servicer_func: collections.abc.Callable, servicer: object, service_name: str = '', domain: str = '', default_port: str = '50052', sync_output_domains: list[str] | None = None, has_async_outputs: bool = False, logger=None) None¶
Run an async gRPC server until termination.
Sync entry point — runs the asyncio event loop internally so callers can keep using the same blocking shape they had before.
- angzarr_client.run_upcaster_server(router, domain: str = '', default_port: int = 50052) None¶
Run an upcaster gRPC server.
- angzarr_client.DEFAULT_TEST_NAMESPACE¶
- class angzarr_client.ScenarioContext¶
Shared context for BDD test scenarios.
Tracks the current aggregate, event history, command results, and rebuilt state across Given/When/Then steps.
- domain¶
Current aggregate domain being tested
- root¶
Current aggregate root as bytes
- events¶
List of packed events (ProtoAny) in history
- result¶
Last command handler result (event or tuple of events)
- error¶
Last CommandRejectedError if command was rejected
- state¶
Rebuilt aggregate state after applying events
Example
ctx = ScenarioContext()
ctx.domain = "player"
ctx.root = uuid_for("player-alice")

# Given player registered
ctx.add_event(PlayerRegistered(email="alice@test.com"))

# When deposit funds
try:
    ctx.result = handler.handle(deposit_cmd, ctx.event_book())
except CommandRejectedError as e:
    ctx.error = e

# Then balance updated
assert ctx.result.new_balance == 100
- result: Any = None¶
- error: angzarr_client.errors.CommandRejectedError | None = None¶
- state: Any = None¶
- event_book() angzarr_client.proto.angzarr.EventBook¶
Build EventBook from accumulated events.
Creates an EventBook with proper sequencing from the events added via add_event().
- Returns:
EventBook with cover, pages, and next_sequence set
- add_event(event_msg)¶
Add an event to history.
Packs the event message (deriving the type URL from event_msg.DESCRIPTOR.full_name) and appends to the event list.
Audit finding #47: the previous type_url_prefix arg is dropped; the canonical URL is derived from the message descriptor.
- Parameters:
event_msg – The protobuf event message to add
- clear_events()¶
Clear all events from history.
- clear_result()¶
Clear the last result and error.
- reset()¶
Reset context to initial state.
- angzarr_client.make_command_book(cover: angzarr_client.proto.angzarr.Cover, command: google.protobuf.any_pb2.Any, sequence: int = 0) angzarr_client.proto.angzarr.CommandBook¶
Create a CommandBook with a single command.
- Parameters:
cover – The Cover identifying the target aggregate
command – The packed command (ProtoAny)
sequence – The command sequence number (defaults to 0)
- Returns:
CommandBook proto with one page
- angzarr_client.make_command_page(sequence: int, command: google.protobuf.any_pb2.Any) angzarr_client.proto.angzarr.CommandPage¶
Create a CommandPage.
- Parameters:
sequence – The command sequence number
command – The packed command (ProtoAny)
- Returns:
CommandPage proto
- angzarr_client.make_cover(domain: str, root: bytes, correlation_id: str = '') angzarr_client.proto.angzarr.Cover¶
Create a Cover from domain and root bytes.
- Parameters:
domain – The aggregate domain name
root – The aggregate root as 16 bytes
correlation_id – Optional correlation ID for cross-domain tracking
- Returns:
Cover proto with domain and root set
- angzarr_client.make_event_book(cover: angzarr_client.proto.angzarr.Cover, pages: list[angzarr_client.proto.angzarr.EventPage] = None, next_sequence: int = None) angzarr_client.proto.angzarr.EventBook¶
Create an EventBook.
- Parameters:
cover – The Cover identifying the aggregate
pages – List of EventPages (defaults to empty)
next_sequence – Next sequence number (defaults to len(pages))
- Returns:
EventBook proto
- angzarr_client.make_event_page(sequence: int, event: google.protobuf.any_pb2.Any) angzarr_client.proto.angzarr.EventPage¶
Create an EventPage.
- Parameters:
sequence – The event sequence number
event – The packed event (ProtoAny)
- Returns:
EventPage proto
- angzarr_client.make_timestamp() google.protobuf.timestamp_pb2.Timestamp¶
Create a timestamp for now.
Alias for angzarr_client.helpers.now() for backwards compatibility.
- Returns:
Current time as protobuf Timestamp
- angzarr_client.uuid_for(name: str, namespace: uuid.UUID = DEFAULT_TEST_NAMESPACE) bytes¶
Generate a deterministic 16-byte UUID from a name.
The same name always generates the same UUID within a namespace. Returns bytes suitable for use as aggregate root IDs.
- Parameters:
name – A string identifier (e.g., “player-alice”, “table-1”)
namespace – UUID namespace for generation (defaults to test namespace)
- Returns:
16-byte UUID as bytes
Example
root = uuid_for("player-alice")
assert len(root) == 16
assert uuid_for("player-alice") == root  # deterministic
- angzarr_client.uuid_obj_for(name: str, namespace: uuid.UUID = DEFAULT_TEST_NAMESPACE) uuid.UUID¶
Generate a deterministic UUID object from a name.
- Parameters:
name – A string identifier
namespace – UUID namespace for generation
- Returns:
UUID object
Example
id_obj = uuid_obj_for("player-alice")
assert id_obj.bytes == uuid_for("player-alice")
- angzarr_client.uuid_str_for(name: str, namespace: uuid.UUID = DEFAULT_TEST_NAMESPACE) str¶
Generate a deterministic UUID string from a name.
- Parameters:
name – A string identifier
namespace – UUID namespace for generation
- Returns:
UUID as standard string format (8-4-4-4-12)
Example
id_str = uuid_str_for("player-alice")
assert "-" in id_str  # standard UUID format
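How the three helpers relate can be sketched with the standard uuid module. This assumes name-based UUID v5 generation, which the descriptions above do not state, and NAMESPACE_SKETCH is a hypothetical namespace rather than the value of DEFAULT_TEST_NAMESPACE.

```python
import uuid

# Hypothetical namespace; the library's DEFAULT_TEST_NAMESPACE value is not shown here.
NAMESPACE_SKETCH = uuid.uuid5(uuid.NAMESPACE_DNS, "example.test")


def uuid_obj_for_sketch(name: str, namespace: uuid.UUID = NAMESPACE_SKETCH) -> uuid.UUID:
    """Deterministic UUID object for a name (assumption: UUID v5)."""
    return uuid.uuid5(namespace, name)


def uuid_for_sketch(name: str, namespace: uuid.UUID = NAMESPACE_SKETCH) -> bytes:
    """The same UUID as 16 raw bytes, suitable for an aggregate root field."""
    return uuid_obj_for_sketch(name, namespace).bytes


def uuid_str_for_sketch(name: str, namespace: uuid.UUID = NAMESPACE_SKETCH) -> str:
    """The same UUID in 8-4-4-4-12 text form."""
    return str(uuid_obj_for_sketch(name, namespace))
```

All three views describe one identifier, so a test can build a root from bytes and assert on the string form without storing either.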
- angzarr_client.testing_pack_event(msg: google.protobuf.message.Message) google.protobuf.any_pb2.Any¶
Pack a protobuf message into an Any with the canonical type URL.
The type URL is derived from the message’s descriptor (msg.DESCRIPTOR.full_name) prefixed with the standard type.googleapis.com/, per the google.protobuf.Any spec.
Audit finding #47 (Option C: drop the second arg, derive name from the message): mirrors Rust’s testing::builders::pack_event<M>(msg). Removes the previous type_url_prefix parameter (which was a typo-prone footgun and diverged in semantics from the Rust 2nd-arg convention).
- Returns:
ProtoAny containing the packed message.
- angzarr_client.require_exists(exists: bool, context: str) None¶
Require that an aggregate exists (caller-supplied predicate).
Audit #60: takes a precomputed predicate boolean (e.g. state.is_some() / bool(state.id)) so the callsite is explicit about what “exists” means. Mirrors Rust’s validation.rs:41 require_exists(exists: bool, context: &str).
Raises NOT_FOUND, which is not retryable, since refetching events cannot change the outcome. For empty-string checks, use require_not_empty_str().
- angzarr_client.require_non_negative(value: _Numeric, field_name: str) None¶
Require that a value is zero or greater.
- angzarr_client.require_not_empty(items: collections.abc.Sequence[Any], field_name: str) None¶
Require that a sequence has at least one element.
- angzarr_client.require_not_empty_str(value: str, field_name: str) None¶
Require that a string is not empty.
- angzarr_client.require_not_exists(exists: bool, context: str) None¶
Require that an aggregate does NOT exist (caller-supplied predicate).
Audit #60: takes a precomputed predicate boolean. Mirrors Rust’s validation.rs:54 require_not_exists(exists: bool, context: &str).
- angzarr_client.require_positive(value: _Numeric, field_name: str) None¶
Require that a value is greater than zero.
- angzarr_client.require_status(actual: str, expected: str, context: str) None¶
Require that the current status matches the expected value.
- angzarr_client.require_status_not(actual: str, forbidden: str, context: str) None¶
Require that the current status is NOT the forbidden value.
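The value and status guards listed above are typically stacked at the top of a handler. The sketch below shows that pattern with local stand-ins that copy the documented parameter lists; the function bodies, the RejectedSketchError type, the error messages, and validate_shipment are assumptions made for illustration.

```python
from collections.abc import Sequence
from typing import Any


class RejectedSketchError(Exception):
    """Hypothetical stand-in; the library's real error type is not shown here."""


# Local stand-ins with the documented parameter lists (bodies are assumptions).
def require_positive_sketch(value: float, field_name: str) -> None:
    if value <= 0:
        raise RejectedSketchError(f"{field_name} must be positive")


def require_not_empty_sketch(items: Sequence[Any], field_name: str) -> None:
    if len(items) == 0:
        raise RejectedSketchError(f"{field_name} must not be empty")


def require_status_sketch(actual: str, expected: str, context: str) -> None:
    if actual != expected:
        raise RejectedSketchError(context)


def require_status_not_sketch(actual: str, forbidden: str, context: str) -> None:
    if actual == forbidden:
        raise RejectedSketchError(context)


def validate_shipment(status: str, quantity: int, lines: list[str]) -> None:
    # Guards run top to bottom; the first failing one aborts the handler.
    require_status_not_sketch(status, "cancelled", "order is cancelled")
    require_status_sketch(status, "paid", "order must be paid before shipping")
    require_positive_sketch(quantity, "quantity")
    require_not_empty_sketch(lines, "lines")


validate_shipment("paid", 2, ["sku-1"])  # passes silently

try:
    validate_shipment("paid", 0, ["sku-1"])
except RejectedSketchError as err:
    print("rejected:", err)
```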
- class angzarr_client.CommandBook(proto: angzarr_client.proto.angzarr.CommandBook)¶
Bases: CoverBearer
Wrapper for the CommandBook proto.
- proto() angzarr_client.proto.angzarr.CommandBook¶
Return the wrapped proto message.
- cover() Cover¶
The wrapped cover (always present; default-instance if not set).
See EventBook.cover() for the rationale.
- pages() list[CommandPage]¶
All command pages, wrapped.
- first_command() CommandPage | None¶
First command page, or None when empty.
- merge_strategy() angzarr_client.proto.angzarr.MergeStrategy¶
Merge strategy of the first page; defaults to commutative.
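How first_command() and merge_strategy() relate can be sketched with plain dataclasses. This assumes the commutative default applies when the book has no pages; MergeStrategySketch (including its invented STRICT member), PageSketch, and BookSketch are hypothetical stand-ins, not the generated proto types.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class MergeStrategySketch(Enum):
    """Hypothetical enum; only 'commutative' is named in the docs."""
    COMMUTATIVE = "commutative"
    STRICT = "strict"  # invented second member, for contrast only


@dataclass
class PageSketch:
    merge_strategy: MergeStrategySketch


@dataclass
class BookSketch:
    pages: list[PageSketch] = field(default_factory=list)

    def first_command(self) -> Optional[PageSketch]:
        # None when the book is empty, mirroring the documented return type.
        return self.pages[0] if self.pages else None

    def merge_strategy(self) -> MergeStrategySketch:
        first = self.first_command()
        if first is None:
            # Assumed: an empty book falls back to commutative.
            return MergeStrategySketch.COMMUTATIVE
        return first.merge_strategy


empty = BookSketch()
strict = BookSketch(pages=[PageSketch(MergeStrategySketch.STRICT)])
print(empty.merge_strategy().value, strict.merge_strategy().value)
```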
- class angzarr_client.CommandPage(proto: angzarr_client.proto.angzarr.CommandPage)¶
Bases: Wrapped
Wrapper for the CommandPage proto.
- proto() angzarr_client.proto.angzarr.CommandPage¶
Return the wrapped proto message.
- merge_strategy() angzarr_client.proto.angzarr.MergeStrategy¶
Per-page merge strategy.
- class angzarr_client.CommandResponse(proto: angzarr_client.proto.angzarr.CommandResponse)¶
Bases: Wrapped
Wrapper for the CommandResponse proto.
- proto() angzarr_client.proto.angzarr.CommandResponse¶
Return the wrapped proto message.
- class angzarr_client.Cover(proto: angzarr_client.proto.angzarr.Cover)¶
Bases: CoverBearer
Wrapper for the Cover proto.
- proto() angzarr_client.proto.angzarr.Cover¶
Return the wrapped proto message.
- class angzarr_client.CoverBearer¶
Bases: Wrapped
Shared accessors for proto types that carry a Cover field.
Subclasses set self._proto and override _cover_proto() to return the embedded Cover (or None if missing). The default implementation assumes self._proto is itself a Cover; only Cover relies on the default, and the others override it.
- domain() str¶
Get the domain, falling back to UNKNOWN_DOMAIN.
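The override pattern described for CoverBearer can be sketched like this. All class names ending in Sketch are hypothetical, the proto types are replaced by dataclasses, and the value given to UNKNOWN_DOMAIN here is a placeholder because the real constant's value is not shown in this section.

```python
from dataclasses import dataclass
from typing import Optional

UNKNOWN_DOMAIN = "unknown"  # placeholder value, assumed for this sketch


@dataclass
class CoverProtoSketch:
    domain: str = ""


@dataclass
class EventBookProtoSketch:
    cover: Optional[CoverProtoSketch] = None


class CoverBearerSketch:
    def __init__(self, proto) -> None:
        self._proto = proto

    def _cover_proto(self) -> Optional[CoverProtoSketch]:
        # Default: the wrapped proto is itself a Cover.
        return self._proto

    def domain(self) -> str:
        cover = self._cover_proto()
        if cover is None or not cover.domain:
            return UNKNOWN_DOMAIN
        return cover.domain


class CoverSketch(CoverBearerSketch):
    """Relies on the default _cover_proto()."""


class EventBookSketch(CoverBearerSketch):
    def _cover_proto(self) -> Optional[CoverProtoSketch]:
        # Override: dig the embedded cover out of the wrapped book.
        return self._proto.cover


print(CoverSketch(CoverProtoSketch("orders")).domain())
print(EventBookSketch(EventBookProtoSketch()).domain())
```

Keeping the fallback in one base-class method means every cover-carrying wrapper reports a missing domain the same way.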
- class angzarr_client.EventBook(proto: angzarr_client.proto.angzarr.EventBook)¶
Bases: CoverBearer
Wrapper for the EventBook proto.
- proto() angzarr_client.proto.angzarr.EventBook¶
Return the wrapped proto message.
- cover() Cover¶
The wrapped cover.
Always returns a Cover: when the underlying proto has no cover field set, the wrapper is built around the proto’s default-instance cover, so accessors like .domain() still work (returning the canonical empty responses, e.g. UNKNOWN_DOMAIN).
- class angzarr_client.EventPage(proto: angzarr_client.proto.angzarr.EventPage)¶
Bases: Wrapped
Wrapper for the EventPage proto.
- proto() angzarr_client.proto.angzarr.EventPage¶
Return the wrapped proto message.
- class angzarr_client.Query(proto: angzarr_client.proto.angzarr.Query)¶
Bases: CoverBearer
Wrapper for the Query proto.
- proto() angzarr_client.proto.angzarr.Query¶
Return the wrapped proto message.
- cover() Cover¶
The wrapped cover (always present; default-instance if not set).
See EventBook.cover() for the rationale.
- class angzarr_client.Wrapped¶
Bases: abc.ABC
Interface every angzarr wrapper implements.
Wrappers expose two surfaces:
- Method-style accessors for common needs (e.g. Cover.domain()).
- The raw proto for everything else, via proto().
Cross-language note: proto() is the documented escape hatch for callers that want direct proto field access (e.g. for serialization, for fields that don’t have a wrapper accessor, for proto-specific methods like HasField). Other languages will provide an equivalent method (e.g. Java Cover.proto()).
- abstractmethod proto()¶
Return the wrapped proto message.
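The two surfaces can be illustrated with a small abstract base class. This is a sketch under stated assumptions: WrappedSketch, CoverSketch, and CoverProtoSketch are hypothetical, and correlation_id is an invented field standing in for "a field without a wrapper accessor".

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class CoverProtoSketch:
    """Stand-in for a proto message (hypothetical)."""
    domain: str = ""
    correlation_id: str = ""  # invented field with no wrapper accessor


class WrappedSketch(ABC):
    @abstractmethod
    def proto(self):
        """Return the wrapped proto message."""


class CoverSketch(WrappedSketch):
    def __init__(self, proto: CoverProtoSketch) -> None:
        self._proto = proto

    def proto(self) -> CoverProtoSketch:
        return self._proto

    def domain(self) -> str:
        # Method-style accessor for a common need.
        return self._proto.domain


cover = CoverSketch(CoverProtoSketch(domain="orders", correlation_id="c-42"))
print(cover.domain())                # accessor surface
print(cover.proto().correlation_id)  # escape hatch to the raw message

try:
    WrappedSketch()  # abstract: cannot be instantiated without proto()
except TypeError as err:
    print("cannot instantiate:", type(err).__name__)
```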