angzarr_client.router¶
Unified Router — public surface for handler registration and dispatch.
Submodules¶
Exceptions¶
Raised when the builder cannot produce a valid runtime router. |
|
gRPC-compatible dispatch error carrying a StatusCode. |
Classes¶
Fluent builder that collects handler factories for the unified router. |
|
Return value of a process-manager |
|
Return shape of a saga |
|
Return value of a saga |
|
Runtime router dispatching commands to registered @command_handler instances. |
|
Runtime router dispatching events to registered @process_manager instances. |
|
Runtime router fanning events out to registered @projector instances. |
|
Runtime router dispatching events to registered @saga instances. |
|
Runtime router transforming events through registered @upcaster instances. |
Functions¶
|
Register a method as a state applier for |
|
Mark a class as a command handler (aggregate) for |
|
Register a method as a dispatch target for |
|
Register a method as a fact-event handler for |
|
Mark a class as a process manager. |
|
Mark a class as a projector consuming events from |
|
Register a method as a compensation handler for command rejections. |
|
Mark a class as a saga translating events from |
|
Mark a method as the state factory for this instance. |
|
Mark a class as an upcaster transforming events in |
|
Register a method as a transformation from |
Package Contents¶
- exception angzarr_client.router.BuildError(message: str, *, code: str = '', details: collections.abc.Mapping[str, Any] | None = None)¶
Bases:
ExceptionRaised when the builder cannot produce a valid runtime router.
Audit #72: carries the structural error shape from audit #59 — SCREAMING_SNAKE
code(programmatic dispatch / cucumber assertions), staticmessage(cross-language equality), anddetailsfor runtime context (router name, conflicting kinds, duplicate (domain, type_url), handler class, field name, etc.). Mirrors Rustrouter::handler::BuildErrorwhich carries anErrorDetailper variant.Construction sites pass constants from
angzarr_client.error_codes:codes.DUPLICATE_COMMAND_HANDLERetc. for the code,messages.DUPLICATE_COMMAND_HANDLERetc. for the static message, detail keys fromkeysfor the details map.Initialize self. See help(type(self)) for accurate signature.
- message¶
- code = ''¶
- class angzarr_client.router.Router(name: str)¶
Fluent builder that collects handler factories for the unified router.
Usage:
Router("agg-service") .with_handler(Player, lambda: Player(db_pool)) .with_handler(Hand, lambda: Hand(rng)) .build()
.build()returns a typed runtime router (CommandHandlerRouter,SagaRouter,ProcessManagerRouter, orProjectorRouter). Mixed handler kinds are rejected at build time (R4).Handlers are registered as
(cls, factory)pairs so the router can invokefactory()per dispatch call to obtain a fresh (or pooled) instance. This keeps handler state isolated per request and makes the router safe to share across threads.- name¶
- with_handler(cls: type, factory: Callable[[], Any]) Router¶
Register a handler class together with a zero-arg factory.
clsmust carry one of the five kind decorators (@command_handler,@saga,@process_manager,@projector,@upcaster). Registering an undecorated class raisesBuildError.factoryis anyCallable[[], Any]: a lambda, a bound method on a DI container, afunctools.partial, a pool-checkout closure, or a callable returning a shared singleton. The router never inspects the factory body; it only calls it and uses whatever it returns.Factories are invoked inside
dispatch(), so their latency is on the request path. For handlers whose construction is expensive — opens a DB connection, reads a config file, performs I/O, does non-trivial computation, holds resources that must be released — prefer a pool checkout or close over a pre-built instance rather than constructing fresh on every call.
- build() Any¶
Produce a typed runtime router.
Empty →
BuildError. Mixed kinds →BuildError. Homogeneous →CommandHandlerRouter/SagaRouter/ProcessManagerRouter/ProjectorRouterper the shared kind.
- angzarr_client.router.applies(event_type: type) Callable[[F], F]¶
Register a method as a state applier for
event_type.Appliers are invoked during state rebuild, walking the prior event book and mutating the instance’s state in place.
- angzarr_client.router.command_handler(*, domain: str, state: type, supports_replay: bool = False) Callable[[T], T]¶
Mark a class as a command handler (aggregate) for
domain.The
statetype is the aggregate’s state type; the class must either provide a@state_factorymethod or rely onstate()as the default factory (enforced in a later round).supports_replay(defaultFalse) opts the aggregate into the coordinator’sReplayRPC, used forMERGE_COMMUTATIVEconflict detection. WhenTrue, the framework auto-implements replay using the@appliesmethods to rebuild state from a snapshot + events; the state type must be a proto Message (carry aDESCRIPTOR). WhenFalsethe gRPC adapter returnsUNIMPLEMENTEDforReplayrequests — the coordinator degrades toMERGE_STRICTsemantics. Audit #45.Fact handling is opt-in via the
@handles_fact(EventType)method decorator. Aggregates with no@handles_factmethods getUNIMPLEMENTEDfrom the gRPC adapter forHandleFact; the coordinator falls back to pass-through-persist (per the proto’s Optional contract).
- angzarr_client.router.handles(message_type: type) Callable[[F], F]¶
Register a method as a dispatch target for
message_type.For command handlers this is the command type; for sagas / process managers / projectors it is the event type. Dispatch routes by proto full-name match.
- angzarr_client.router.handles_fact(event_type: type) Callable[[F], F]¶
Register a method as a fact-event handler for
event_type.Audit #45. Triggered when the coordinator dispatches a fact (an external reality, e.g. a payment confirmation from a third party) via the
HandleFactRPC. The method receives(event, state)after state has been rebuilt from prior events; it returns the events to persist (orNonefor pure side-effects).Aggregates with at least one
@handles_factmethod opt into theHandleFactRPC. Aggregates with none getUNIMPLEMENTEDfrom the framework’s gRPC adapter — the coordinator then falls back to pass-through-persist per the proto’s Optional contract.Distinct from
@handles(which dispatches commands and returns events),@handles_factis fact-specific: facts cannot be rejected, and the method’s return shape is just events-to-persist.
- angzarr_client.router.process_manager(*, name: str, pm_domain: str, sources: list[str], targets: list[str], state: type, sync_targets: list[str] | None = None) Callable[[T], T]¶
Mark a class as a process manager.
pm_domainis the PM’s own state-storage domain;sourceslists incoming event domains;targetslists downstream command domains.Audit #74:
sync_targetsdeclares the subset oftargetsthe PM ever addresses with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED) — those are the targets whose coordinator must be reachable for readiness. DefaultNone(no sync targets) — every command goes through the async bus and the readiness supervisor will not probe any target’s coordinator.Raises
ValueErrorifsync_targetscontains a domain that is not intargets.
- angzarr_client.router.projector(*, name: str, domains: list[str]) Callable[[T], T]¶
Mark a class as a projector consuming events from
domains.
- angzarr_client.router.rejected(source_domain: str, command: str) Callable[[F], F]¶
Register a method as a compensation handler for command rejections.
Triggered when a command originating from this component is rejected by the target aggregate.
source_domainandcommandidentify the rejected command’s proto full-name suffix split into domain/command parts.
- angzarr_client.router.saga(*, name: str, source: str, target: str, sync: bool = False) Callable[[T], T]¶
Mark a class as a saga translating events from
sourceto commands fortarget.Audit #74:
syncdeclares whether commands emitted totargetever use sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). DefaultFalse— target commands flow through the async bus and the readiness supervisor will not probetarget’s coordinator. Flip toTruewhen the saga blocks on the downstream response.
- angzarr_client.router.state_factory(fn: F) F¶
Mark a method as the state factory for this instance.
Overrides the default factory (calling
StateType()). Useful when state construction requires parameters or custom defaults.
- angzarr_client.router.upcaster(*, name: str, domain: str) Callable[[T], T]¶
Mark a class as an upcaster transforming events in
domain.Methods decorated with
@upcasts(FromType, ToType)declare individual version-to-version transformations. An upcaster with zero@upcastsmethods is allowed (passthrough).
- angzarr_client.router.upcasts(from_type: type, to_type: type) Callable[[F], F]¶
Register a method as a transformation from
from_typetoto_type.The method must accept the old event and return the new one. Dispatch matches by exact proto type-URL on the incoming event; events without a registered transform pass through unchanged.
- exception angzarr_client.router.DispatchError(code: grpc.StatusCode, details: str, *, error_code: str = '', extras: dict[str, Any] | None = None)¶
Bases:
grpc.RpcErrorgRPC-compatible dispatch error carrying a StatusCode.
Raised when a command cannot be routed (unknown type, unknown domain), when a request is malformed (missing command/page/cover), or for any other caller-facing violation. The gRPC servicer can read
.code()and forward the status to the client.Audit finding #59:
messageis a static string (no interpolation); runtime context like type URLs, domains, etc. ride inextras. The SCREAMING_SNAKEerror_codeis a stable cross-language identifier suitable for cucumber assertions.Initialize self. See help(type(self)) for accurate signature.
- error_code = ''¶
- code() grpc.StatusCode¶
- class angzarr_client.router.ProcessManagerResponse(commands: list[angzarr_client.proto.angzarr.types_pb2.CommandBook] | None = None, process_events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None, facts: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None)¶
Return value of a process-manager
@handlesmethod.commands: commands forwarded to target domainsprocess_events: list of EventBooks recorded into the PM’s own event stream (state). Multiple books concatenate into the wire’s singleProcessManagerHandleResponse.process_events(first non-empty book’s cover wins).facts: facts injected into other aggregates without a command
Audit finding #56 (Option B — list[EventBook]): aligns with Rust’s
Vec<EventBook>and matches the existingcommands/factslist shapes for symmetry.- commands = []¶
- process_events = []¶
- facts = []¶
- class angzarr_client.router.RejectionHandlerResponse¶
Return shape of a saga
@rejectedhandler method.Carries compensation events to persist locally and an optional
Notificationto forward upstream. Mirrors Rust’srouter::responses::RejectionHandlerResponsefield-for-field.Audit finding #56 (Option B — list[EventBook]): aligns with Rust’s
Vec<EventBook>. Multiple books concatenate downstream; first non-empty book’s cover wins.
- class angzarr_client.router.SagaHandlerResponse(commands: list[angzarr_client.proto.angzarr.types_pb2.CommandBook] | None = None, events: list[angzarr_client.proto.angzarr.types_pb2.EventBook] | None = None)¶
Return value of a saga
@handlesmethod.Carries commands to forward to target domains and/or facts (events) to inject into other aggregates. Both are optional.
- commands = []¶
- events = []¶
- class angzarr_client.router.CommandHandlerRouter(factories: list[Factory])¶
Bases:
_BuiltRouterBase,Generic[S]Runtime router dispatching commands to registered @command_handler instances.
- property name: str¶
Domain this router serves (read from the first registered handler’s
@command_handler(domain=...)metadata).@command_handlerhas nonameattribute, so the domain is the natural identifier. Mirrors Rust’sCommandHandlerRouter::name()(runtime.rs:625-633).
- dispatch(request)¶
Route a ContextualCommand to the matching @handles method.
- supports_handle_fact() bool¶
Audit #45: True if any registered handler declares at least one
@handles_factmethod. The gRPC adapter uses this as the gate for theHandleFactRPC — False → returnUNIMPLEMENTEDwithout entering dispatch.Implemented as pure metadata read on the registered classes; no factory invocation, no per-request work.
- supports_replay() bool¶
Audit #45: True if any registered handler opted in via
@command_handler(supports_replay=True). The gRPC adapter uses this as the gate for theReplayRPC.Replay is a generic state-rebuild operation — the framework implements it from the existing
@appliesmachinery, so the opt-in is just an explicit acknowledgement that the aggregate’s state type is proto-serializable (replay round-trips state viaAny).
- dispatch_fact(request)¶
Route a FactRequest to matching @handles_fact methods.
Caller is responsible for gating via
supports_handle_fact()before invoking this — the gRPC adapter does so.
- dispatch_replay(request)¶
Replay events on top of a base snapshot to compute final state.
Caller is responsible for gating via
supports_replay()before invoking this — the gRPC adapter does so.
- class angzarr_client.router.ProcessManagerRouter(factories: list[Factory])¶
Bases:
_BuiltRouterBase,Generic[S]Runtime router dispatching events to registered @process_manager instances.
- dispatch(request)¶
Route a ProcessManagerHandleRequest to matching handlers.
- output_domains() list[str]¶
Domains this router emits commands to.
Read from each registered handler class’s
__angzarr_meta__. Default is empty — only sagas (target) and process managers (targets) override.
- sync_output_domains() list[str]¶
Audit #74: subset of
output_domains()that the registered handlers ever address with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Drives readiness probing — only sync targets need their coordinator reachable for traffic to be safe. Default empty; saga / PM routers override.
- has_async_outputs() bool¶
Audit #74:
Trueif any registered handler emits to at least one async target — i.e. ever publishes through the async bus rather than synchronously calling a downstream coordinator. Drives whether the readiness supervisor includes aBusProbe. DefaultFalse(CH / Projector / Upcaster never cross-emit).
- class angzarr_client.router.ProjectorRouter(factories: list[Factory])¶
Bases:
_BuiltRouterBaseRuntime router fanning events out to registered @projector instances.
- dispatch(events)¶
Fan out each event in the book to matching @handles methods.
- class angzarr_client.router.SagaRouter(factories: list[Factory])¶
Bases:
_BuiltRouterBaseRuntime router dispatching events to registered @saga instances.
- dispatch(request)¶
Route a SagaHandleRequest to all matching @handles methods.
- output_domains() list[str]¶
Domains this router emits commands to.
Read from each registered handler class’s
__angzarr_meta__. Default is empty — only sagas (target) and process managers (targets) override.
- sync_output_domains() list[str]¶
Audit #74: subset of
output_domains()that the registered handlers ever address with sync mode (SIMPLE / CASCADE / DECISION / ISOLATED). Drives readiness probing — only sync targets need their coordinator reachable for traffic to be safe. Default empty; saga / PM routers override.
- has_async_outputs() bool¶
Audit #74:
Trueif any registered handler emits to at least one async target — i.e. ever publishes through the async bus rather than synchronously calling a downstream coordinator. Drives whether the readiness supervisor includes aBusProbe. DefaultFalse(CH / Projector / Upcaster never cross-emit).