angzarr_client.router

Unified Router — public surface for handler registration and dispatch.

Exceptions

BuildError

Raised when the builder cannot produce a valid runtime router.

DispatchError

gRPC-compatible dispatch error carrying a StatusCode.

Classes

Router

Fluent builder that collects handler factories for the unified router.

ProcessManagerResponse

Return value of a process-manager @handles method.

RejectionHandlerResponse

Return shape of a saga @rejected handler method.

SagaHandlerResponse

Return value of a saga @handles method.

CommandHandlerRouter

Runtime router dispatching commands to registered @command_handler instances.

ProcessManagerRouter

Runtime router dispatching events to registered @process_manager instances.

ProjectorRouter

Runtime router fanning events out to registered @projector instances.

SagaRouter

Runtime router dispatching events to registered @saga instances.

UpcasterRouter

Runtime router transforming events through registered @upcaster instances.

Functions

applies(→ Callable[[F], F])

Register a method as a state applier for event_type.

command_handler(→ Callable[[T], T])

Mark a class as a command handler (aggregate) for domain.

handles(→ Callable[[F], F])

Register a method as a dispatch target for message_type.

handles_fact(→ Callable[[F], F])

Register a method as a fact-event handler for event_type.

process_manager(→ Callable[[T], T])

Mark a class as a process manager.

projector(→ Callable[[T], T])

Mark a class as a projector consuming events from domains.

rejected(→ Callable[[F], F])

Register a method as a compensation handler for command rejections.

saga(→ Callable[[T], T])

Mark a class as a saga translating events from source to commands for target.

state_factory(→ F)

Mark a method as the state factory for this instance.

upcaster(→ Callable[[T], T])

Mark a class as an upcaster transforming events in domain.

upcasts(→ Callable[[F], F])

Register a method as a transformation from from_type to to_type.

Package Contents

exception angzarr_client.router.BuildError(message: str, *, code: str = '', details: collections.abc.Mapping[str, Any] | None = None)

Bases: Exception

Raised when the builder cannot produce a valid runtime router.

Audit #72: carries the structural error shape from audit #59 — SCREAMING_SNAKE code (programmatic dispatch / cucumber assertions), static message (cross-language equality), and details for runtime context (router name, conflicting kinds, duplicate (domain, type_url), handler class, field name, etc.). Mirrors Rust router::handler::BuildError which carries an ErrorDetail per variant.

Construction sites pass constants from angzarr_client.error_codes: codes.DUPLICATE_COMMAND_HANDLER etc. for the code, messages.DUPLICATE_COMMAND_HANDLER etc. for the static message, detail keys from keys for the details map.
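The error shape described above (stable code, static message, runtime context in details) can be sketched with a stand-in class. This is a minimal illustration that only mirrors the documented constructor signature; `SketchBuildError`, the constant values, and the `"domain"` detail key are hypothetical and are not taken from angzarr_client.error_codes.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


class SketchBuildError(Exception):
    """Stand-in mirroring the documented BuildError signature."""

    def __init__(self, message: str, *, code: str = "",
                 details: Mapping[str, Any] | None = None):
        super().__init__(message)
        self.message = message            # static text, no interpolation
        self.code = code                  # SCREAMING_SNAKE identifier
        self.details = dict(details or {})  # runtime context


# Hypothetical constants; the real values live in the library.
CODE_DUPLICATE = "DUPLICATE_COMMAND_HANDLER"
MSG_DUPLICATE = "duplicate command handler"

try:
    raise SketchBuildError(MSG_DUPLICATE, code=CODE_DUPLICATE,
                           details={"domain": "player"})
except SketchBuildError as err:
    # Callers branch on the code; the message stays identical across runs.
    caught = (err.code, err.message, err.details)
```

Because the message never changes, tests can compare it for equality, while anything request-specific is read from details.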

message
code = ''
details: dict[str, str]

class angzarr_client.router.Router(name: str)

Fluent builder that collects handler factories for the unified router.

Usage:

router = (
    Router("agg-service")
    .with_handler(Player, lambda: Player(db_pool))
    .with_handler(Hand, lambda: Hand(rng))
    .build()
)

.build() returns a typed runtime router (CommandHandlerRouter, SagaRouter, ProcessManagerRouter, or ProjectorRouter). Mixed handler kinds are rejected at build time (R4).

Handlers are registered as (cls, factory) pairs so the router can invoke factory() per dispatch call to obtain a fresh (or pooled) instance. This keeps handler state isolated per request and makes the router safe to share across threads.

name

with_handler(cls: type, factory: Callable[[], Any]) → Router

Register a handler class together with a zero-arg factory.

cls must carry one of the five kind decorators (@command_handler, @saga, @process_manager, @projector, @upcaster). Registering an undecorated class raises BuildError.

factory is any Callable[[], Any]: a lambda, a bound method on a DI container, a functools.partial, a pool-checkout closure, or a callable returning a shared singleton. The router never inspects the factory body; it only calls it and uses whatever it returns.

Factories are invoked inside dispatch(), so their latency is on the request path. When constructing a handler is expensive (it opens a DB connection, reads a config file, performs I/O, does non-trivial computation, or holds resources that must be released), prefer a factory that checks an instance out of a pool or closes over a pre-built instance, rather than one that constructs a fresh instance on every call.
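The factory styles listed above can be sketched with the standard library alone. This is an illustration under stated assumptions: `Player`, `db_pool`, and the factory names are hypothetical stand-ins, and the class is left undecorated so the snippet runs without angzarr_client.

```python
import functools
import queue


class Player:
    """Hypothetical handler class; a real one would carry a kind decorator."""

    def __init__(self, db_pool):
        self.db_pool = db_pool


db_pool = object()  # placeholder for a real connection pool

# 1. Lambda: builds a fresh instance on every call.
fresh_factory = lambda: Player(db_pool)

# 2. functools.partial: same effect without a lambda.
partial_factory = functools.partial(Player, db_pool)

# 3. Closure over a pre-built instance: construction cost is paid once.
shared = Player(db_pool)
singleton_factory = lambda: shared

# 4. Pool checkout: hand out pre-built instances from a queue.
pool = queue.SimpleQueue()
pool.put(Player(db_pool))


def pooled_factory():
    # Blocks until an instance is available. Returning the instance to
    # the pool after use is left to the caller in this sketch.
    return pool.get()
```

Styles 3 and 4 move construction off the request path, at the price of sharing an instance between calls, so any per-request state on the handler must be reset or avoided.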

build() → Any

Produce a typed runtime router.

Empty → BuildError. Mixed kinds → BuildError. Homogeneous → CommandHandlerRouter / SagaRouter / ProcessManagerRouter / ProjectorRouter per the shared kind.
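The three outcomes above amount to a small decision rule. The following is a toy model of that rule, not the library's implementation; `ToyBuildError`, `resolve_kind`, and the string kind labels are hypothetical.

```python
class ToyBuildError(Exception):
    """Stand-in for BuildError in this sketch."""


def resolve_kind(kinds):
    """Return the single shared kind, or raise if the input is empty or mixed."""
    unique = set(kinds)
    if not unique:
        raise ToyBuildError("no handlers registered")
    if len(unique) > 1:
        raise ToyBuildError("mixed handler kinds")
    return unique.pop()
```

For example, `resolve_kind(["saga", "saga"])` yields `"saga"`, while an empty list or `["saga", "projector"]` raises.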

angzarr_client.router.applies(event_type: type) → Callable[[F], F]

Register a method as a state applier for event_type.

Appliers are invoked during state rebuild, walking the prior event book and mutating the instance’s state in place.
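State rebuild as described above can be pictured as a fold over prior events. This sketch uses plain dataclasses and a hand-written lookup table in place of @applies registration; `Account`, `Deposited`, `Withdrawn`, `APPLIERS`, and `rebuild` are all hypothetical names, and skipping events that have no applier is an assumption of the sketch.

```python
from dataclasses import dataclass


@dataclass
class Deposited:
    amount: int


@dataclass
class Withdrawn:
    amount: int


class Account:
    def __init__(self):
        self.balance = 0

    def on_deposited(self, event):
        self.balance += event.amount

    def on_withdrawn(self, event):
        self.balance -= event.amount

    # Hypothetical table standing in for decorator-based registration.
    APPLIERS = {Deposited: on_deposited, Withdrawn: on_withdrawn}


def rebuild(instance, events):
    """Walk the events in order, mutating the instance's state in place."""
    for event in events:
        applier = instance.APPLIERS.get(type(event))
        if applier is not None:  # assumption: unknown events are skipped
            applier(instance, event)
    return instance
```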

angzarr_client.router.command_handler(*, domain: str, state: type, supports_replay: bool = False) → Callable[[T], T]

Mark a class as a command handler (aggregate) for domain.

state is the aggregate’s state type. The class must either provide a @state_factory method or rely on calling state() with no arguments as the default factory (enforced in a later round).

supports_replay (default False) opts the aggregate into the coordinator’s Replay RPC, used for MERGE_COMMUTATIVE conflict detection. When True, the framework auto-implements replay using the @applies methods to rebuild state from a snapshot + events; the state type must be a proto Message (carry a DESCRIPTOR). When False the gRPC adapter returns UNIMPLEMENTED for Replay requests — the coordinator degrades to MERGE_STRICT semantics. Audit #45.

Fact handling is opt-in via the @handles_fact(EventType) method decorator. Aggregates with no @handles_fact methods get UNIMPLEMENTED from the gRPC adapter for HandleFact; the coordinator falls back to pass-through-persist (per the proto’s Optional contract).

angzarr_client.router.handles(message_type: type) → Callable[[F], F]

Register a method as a dispatch target for message_type.

For command handlers this is the command type; for sagas / process managers / projectors it is the event type. Dispatch routes by proto full-name match.
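Routing by full name can be illustrated with a lookup table keyed on a string. This is a toy sketch: `FULL_NAME`, `ROUTES`, `PlayerHandler`, and `dispatch` are hypothetical, and the class attribute stands in for the name that a protobuf message exposes as `DESCRIPTOR.full_name`.

```python
class CreatePlayer:
    FULL_NAME = "examples.CreatePlayer"  # hypothetical proto full name


class RenamePlayer:
    FULL_NAME = "examples.RenamePlayer"  # hypothetical, has no route below


class PlayerHandler:
    def create(self, command):
        return "created"

    # Hypothetical table standing in for @handles registration.
    ROUTES = {"examples.CreatePlayer": create}


def dispatch(handler, message):
    """Pick the method whose registered full name matches the message's."""
    method = handler.ROUTES.get(type(message).FULL_NAME)
    if method is None:
        raise LookupError("no handler for message type")
    return method(handler, message)
```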

angzarr_client.router.handles_fact(event_type: type) → Callable[[F], F]

Register a method as a fact-event handler for event_type.

Audit #45. Triggered when the coordinator dispatches a fact (an external reality, e.g. a payment confirmation from a third party) via the HandleFact RPC. The method receives (event, state) after state has been rebuilt from prior events; it returns the events to persist (or None for pure side-effects).

Aggregates with at least one @handles_fact method opt into the HandleFact RPC. Aggregates with none get UNIMPLEMENTED from the framework’s gRPC adapter — the coordinator then falls back to pass-through-persist per the proto’s Optional contract.

Distinct from @handles (which dispatches commands and returns events), @handles_fact is fact-specific: facts cannot be rejected, and the method’s return shape is just events-to-persist.

angzarr_client.router.process_manager(*, name: str, pm_domain: str, sources: list[str], targets: list[str], state: type, sync_targets: list[str] | None = None) → Callable[[T], T]

Mark a class as a process manager.

pm_domain is the PM’s own state-storage domain; sources lists incoming event domains; targets lists downstream command domains.

Audit #74: sync_targets declares the subset of targets the PM ever addresses with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED) — those are the targets whose coordinator must be reachable for readiness. Default None (no sync targets) — every command goes through the async bus and the readiness supervisor will not probe any target’s coordinator.

Raises ValueError if sync_targets contains a domain that is not in targets.
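The subset rule for sync_targets can be sketched as a standalone check. This is an illustration of the documented constraint only; `check_sync_targets` and its error text are hypothetical, not the decorator's actual code.

```python
def check_sync_targets(targets, sync_targets=None):
    """Return the sync targets as a list, or raise if any is not a target."""
    sync = list(sync_targets or [])  # None means no sync targets
    unknown = [domain for domain in sync if domain not in targets]
    if unknown:
        raise ValueError(f"sync_targets not in targets: {unknown}")
    return sync
```

With `targets=["inventory", "billing"]`, passing `sync_targets=["billing"]` is accepted, while `sync_targets=["shipping"]` raises ValueError.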

angzarr_client.router.projector(*, name: str, domains: list[str]) → Callable[[T], T]

Mark a class as a projector consuming events from domains.

angzarr_client.router.rejected(source_domain: str, command: str) → Callable[[F], F]

Register a method as a compensation handler for command rejections.

Triggered when a command originating from this component is rejected by the target aggregate. Together, source_domain and command identify the rejected command: they are the domain and command parts of its proto full-name suffix.

angzarr_client.router.saga(*, name: str, source: str, target: str, sync: bool = False) → Callable[[T], T]

Mark a class as a saga translating events from source to commands for target.

Audit #74: sync declares whether commands emitted to target ever use sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Default False — target commands flow through the async bus and the readiness supervisor will not probe target’s coordinator. Flip to True when the saga blocks on the downstream response.

angzarr_client.router.state_factory(fn: F) → F

Mark a method as the state factory for this instance.

Overrides the default factory (calling StateType()). Useful when state construction requires parameters or custom defaults.

angzarr_client.router.upcaster(*, name: str, domain: str) → Callable[[T], T]

Mark a class as an upcaster transforming events in domain.

Methods decorated with @upcasts(FromType, ToType) declare individual version-to-version transformations. An upcaster with zero @upcasts methods is allowed (passthrough).

angzarr_client.router.upcasts(from_type: type, to_type: type) → Callable[[F], F]

Register a method as a transformation from from_type to to_type.

The method must accept the old event and return the new one. Dispatch matches by exact proto type-URL on the incoming event; events without a registered transform pass through unchanged.
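The transform-or-pass-through behaviour can be sketched as follows. This toy version matches on the Python class rather than on a proto type-URL, and every name in it (`PlayerJoinedV1`, `PlayerJoinedV2`, `HandDealt`, `TRANSFORMS`, `upcast_all`) is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class PlayerJoinedV1:
    name: str


@dataclass
class PlayerJoinedV2:
    name: str
    seat: int


@dataclass
class HandDealt:
    cards: int


def v1_to_v2(old):
    # Accepts the old event and returns the new one; seat gets a default.
    return PlayerJoinedV2(name=old.name, seat=0)


# Hypothetical table standing in for @upcasts registration.
TRANSFORMS = {PlayerJoinedV1: v1_to_v2}


def upcast_all(events):
    """Transform events that have a registered transform; keep the rest."""
    result = []
    for event in events:
        transform = TRANSFORMS.get(type(event))
        result.append(transform(event) if transform else event)
    return result
```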

exception angzarr_client.router.DispatchError(code: grpc.StatusCode, details: str, *, error_code: str = '', extras: dict[str, Any] | None = None)

Bases: grpc.RpcError

gRPC-compatible dispatch error carrying a StatusCode.

Raised when a command cannot be routed (unknown type, unknown domain), when a request is malformed (missing command/page/cover), or for any other caller-facing violation. The gRPC servicer can read .code() and forward the status to the client.

Audit finding #59: message is a static string (no interpolation); runtime context like type URLs, domains, etc. ride in extras. The SCREAMING_SNAKE error_code is a stable cross-language identifier suitable for cucumber assertions.

error_code = ''
extras: dict[str, str]

code() → grpc.StatusCode

details() → str

class angzarr_client.router.ProcessManagerResponse(commands: list[angzarr_client.proto.angzarr.types_pb2.CommandBook] | None = None, process_events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None, facts: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None)

Return value of a process-manager @handles method.

  • commands: commands forwarded to target domains

  • process_events: list of EventBooks recorded into the PM’s own event stream (state). Multiple books concatenate into the wire’s single ProcessManagerHandleResponse.process_events (first non-empty book’s cover wins).

  • facts: facts injected into other aggregates without a command

Audit finding #56 (Option B — list[EventBook]): aligns with Rust’s Vec<EventBook> and matches the existing commands / facts list shapes for symmetry.
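The concatenation rule for process_events (multiple books merge into one, first non-empty book's cover wins) can be sketched with a toy book type. `ToyBook` and `merge_books` are hypothetical; the real EventBook is a protobuf message and the library's merge code is not shown here.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyBook:
    cover: Optional[str] = None
    pages: list = field(default_factory=list)


def merge_books(books):
    """Concatenate pages in order; take the cover of the first non-empty book."""
    merged = ToyBook()
    for book in books:
        if not book.pages:
            continue  # empty books contribute neither cover nor pages
        if not merged.pages:
            merged.cover = book.cover  # first non-empty book wins
        merged.pages.extend(book.pages)
    return merged
```

A consequence of this rule is that the covers of later books are dropped silently, so handlers that return several books should make sure they all address the same stream.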

commands = []
process_events = []
facts = []

class angzarr_client.router.RejectionHandlerResponse

Return shape of a saga @rejected handler method.

Carries compensation events to persist locally and an optional Notification to forward upstream. Mirrors Rust’s router::responses::RejectionHandlerResponse field-for-field.

Audit finding #56 (Option B — list[EventBook]): aligns with Rust’s Vec<EventBook>. Multiple books concatenate downstream; first non-empty book’s cover wins.

events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] = []

Events to persist to own state (compensation).

notification: angzarr_client.proto.angzarr.types_pb2.Notification | None = None

Notification to forward upstream (rejection propagation).

class angzarr_client.router.SagaHandlerResponse(commands: list[angzarr_client.proto.angzarr.types_pb2.CommandBook] | None = None, events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None)

Return value of a saga @handles method.

Carries commands to forward to target domains and/or facts (events) to inject into other aggregates. Both are optional.

commands = []
events = []

class angzarr_client.router.CommandHandlerRouter(factories: list[Factory])

Bases: _BuiltRouterBase, Generic[S]

Runtime router dispatching commands to registered @command_handler instances.

property name: str

Domain this router serves (read from the first registered handler’s @command_handler(domain=...) metadata).

@command_handler has no name attribute, so the domain is the natural identifier. Mirrors Rust’s CommandHandlerRouter::name() (runtime.rs:625-633).

dispatch(request)

Route a ContextualCommand to the matching @handles method.

supports_handle_fact() → bool

Audit #45: True if any registered handler declares at least one @handles_fact method. The gRPC adapter uses this as the gate for the HandleFact RPC — False → return UNIMPLEMENTED without entering dispatch.

Implemented as pure metadata read on the registered classes; no factory invocation, no per-request work.
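A metadata-only capability check of this kind can be sketched by scanning class attributes for a marker, without ever constructing a handler. The marker attribute `_is_fact_handler`, the `marks_fact` decorator, and both example classes are hypothetical; the library's actual metadata layout is not documented here.

```python
def marks_fact(fn):
    """Hypothetical decorator that tags a method as a fact handler."""
    fn._is_fact_handler = True
    return fn


class WithFacts:
    @marks_fact
    def on_payment(self, event, state):
        return None


class WithoutFacts:
    def handle(self, command, state):
        return None


def supports_handle_fact(classes):
    """True if any class defines at least one tagged method.

    Only attributes defined directly on each class are inspected, and
    no instance is created, so the check does no per-request work.
    """
    return any(
        getattr(member, "_is_fact_handler", False)
        for cls in classes
        for member in vars(cls).values()
    )
```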

supports_replay() → bool

Audit #45: True if any registered handler opted in via @command_handler(supports_replay=True). The gRPC adapter uses this as the gate for the Replay RPC.

Replay is a generic state-rebuild operation — the framework implements it from the existing @applies machinery, so the opt-in is just an explicit acknowledgement that the aggregate’s state type is proto-serializable (replay round-trips state via Any).

dispatch_fact(request)

Route a FactRequest to matching @handles_fact methods.

Caller is responsible for gating via supports_handle_fact() before invoking this — the gRPC adapter does so.

dispatch_replay(request)

Replay events on top of a base snapshot to compute final state.

Caller is responsible for gating via supports_replay() before invoking this — the gRPC adapter does so.

class angzarr_client.router.ProcessManagerRouter(factories: list[Factory])

Bases: _BuiltRouterBase, Generic[S]

Runtime router dispatching events to registered @process_manager instances.

dispatch(request)

Route a ProcessManagerHandleRequest to matching handlers.

output_domains() → list[str]

Domains this router emits commands to.

Read from each registered handler class’s __angzarr_meta__. Default is empty — only sagas (target) and process managers (targets) override.

sync_output_domains() → list[str]

Audit #74: subset of output_domains() that the registered handlers ever address with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Drives readiness probing — only sync targets need their coordinator reachable for traffic to be safe. Default empty; saga / PM routers override.

has_async_outputs() → bool

Audit #74: True if any registered handler emits to at least one async target — i.e. ever publishes through the async bus rather than synchronously calling a downstream coordinator. Drives whether the readiness supervisor includes a BusProbe. Default False (CH / Projector / Upcaster never cross-emit).
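The relationship between the three readiness queries can be sketched over plain dictionaries. This is a simplified model: the `targets` / `sync_targets` dictionary layout is hypothetical, and treating a target as async exactly when it is absent from `sync_targets` is an assumption of this sketch, since a real target may be addressed in both modes.

```python
def _unique(items):
    """Drop duplicates while keeping first-seen order."""
    return list(dict.fromkeys(items))


def output_domains(metas):
    return _unique(d for m in metas for d in m["targets"])


def sync_output_domains(metas):
    return _unique(d for m in metas for d in m["sync_targets"])


def has_async_outputs(metas):
    # Assumption: a target counts as async when it is not listed as sync.
    return any(set(m["targets"]) - set(m["sync_targets"]) for m in metas)


# Two hypothetical process managers registered on one router.
metas = [
    {"targets": ["inventory", "billing"], "sync_targets": ["billing"]},
    {"targets": ["billing"], "sync_targets": ["billing"]},
]
```

Under this model the readiness supervisor would probe only the billing coordinator, and would add a bus probe because inventory is reached asynchronously.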

class angzarr_client.router.ProjectorRouter(factories: list[Factory])

Bases: _BuiltRouterBase

Runtime router fanning events out to registered @projector instances.

dispatch(events)

Fan out each event in the book to matching @handles methods.

class angzarr_client.router.SagaRouter(factories: list[Factory])

Bases: _BuiltRouterBase

Runtime router dispatching events to registered @saga instances.

dispatch(request)

Route a SagaHandleRequest to all matching @handles methods.

output_domains() → list[str]

Domains this router emits commands to.

Read from each registered handler class’s __angzarr_meta__. Default is empty — only sagas (target) and process managers (targets) override.

sync_output_domains() → list[str]

Audit #74: subset of output_domains() that the registered handlers ever address with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Drives readiness probing — only sync targets need their coordinator reachable for traffic to be safe. Default empty; saga / PM routers override.

has_async_outputs() → bool

Audit #74: True if any registered handler emits to at least one async target — i.e. ever publishes through the async bus rather than synchronously calling a downstream coordinator. Drives whether the readiness supervisor includes a BusProbe. Default False (CH / Projector / Upcaster never cross-emit).

class angzarr_client.router.UpcasterRouter(factories: list[Factory])

Bases: _BuiltRouterBase

Runtime router transforming events through registered @upcaster instances.

dispatch(request)

Transform each event in request.events via matching @upcasts methods.