angzarr_client.compensation¶
Compensation flow helpers for saga/PM rejection handling.
This module provides helpers for implementing compensation logic in aggregates and process managers when saga/PM commands are rejected.
- Usage in Aggregate:
from angzarr_client import Aggregate, handles from angzarr_client.compensation import (
CompensationContext, delegate_to_framework, emit_compensation_events,
)
- class OrderAggregate(Aggregate[OrderState]):
- def handle_revocation(self, notification):
ctx = CompensationContext.from_notification(notification)
# Option 1: Emit compensation events based on rejected command type if ctx.rejected_command_type == “type.googleapis.com/orders.FulfillOrder”:
- event = OrderCancelled(
order_id=self.order_id, reason=f”Fulfillment failed: {ctx.rejection_reason}”,
) self._apply_and_record(event) return emit_compensation_events(self.event_book())
# Option 2: Delegate to framework return delegate_to_framework(
reason=f”No custom compensation for {ctx.rejected_command_type}”
)
- Usage in ProcessManager:
from angzarr_client.process_manager import ProcessManager, handles from angzarr_client.compensation import (
CompensationContext, pm_delegate_to_framework, pm_emit_compensation_events,
)
- class OrderWorkflowPM(ProcessManager[WorkflowState]):
- def handle_revocation(self, notification):
ctx = CompensationContext.from_notification(notification)
# Record failure in PM state event = WorkflowStepFailed(
source_domain=ctx.source_domain, reason=ctx.rejection_reason,
) self._apply_and_record(event)
# Return PM events + framework response return pm_emit_compensation_events(
process_events=self.process_events(), also_emit_system_event=True,
)
Classes¶
Extracted context from a rejection Notification. |
|
Options struct for |
|
Result from PM compensation helpers. |
Functions¶
Create a response that delegates compensation to the framework. |
|
Create a response containing compensation events. |
|
|
Create a PM response that delegates compensation to the framework. |
|
Create a PM response containing compensation events. |
|
Check if a type URL refers to a rejection Notification. |
Module Contents¶
- class angzarr_client.compensation.CompensationContext¶
Extracted context from a rejection Notification.
Provides easy access to compensation-relevant fields. Source aggregate info is extracted from the rejected command’s angzarr_deferred header.
- rejected_command: angzarr_client.proto.angzarr.types_pb2.CommandBook | None¶
The command that was rejected (if available).
- source_aggregate: angzarr_client.proto.angzarr.types_pb2.Cover | None¶
Cover of the aggregate that triggered the flow.
- classmethod from_notification(notification: angzarr_client.proto.angzarr.types_pb2.Notification) CompensationContext¶
Extract compensation context from a Notification.
Source aggregate info is extracted from the rejected command’s angzarr_deferred header, which is always set by the framework for saga/PM-produced commands.
- Parameters:
notification – The notification containing RejectionNotification payload.
- Returns:
CompensationContext with extracted fields.
- class angzarr_client.compensation.DelegationOptions¶
Options struct for
delegate_to_framework/pm_delegate_to_framework.Audit #65 / #66: collapses the previously-divergent function shapes (Rust two-function split, Python kwargs) into a single shared options type. Cross-language symmetric — Python
DelegationOptionsmirrors Rustcompensation::DelegationOptionsfield-for-field with the same defaults.Field defaults match the previous Python kwargs and Rust’s basic
delegate_to_framework(which hardcodedemit_system_event = trueand the rest false).
- angzarr_client.compensation.delegate_to_framework(reason: str, options: DelegationOptions | None = None) angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse¶
Create a response that delegates compensation to the framework.
Use when the aggregate doesn’t have custom compensation logic for a saga. The framework will emit a SagaCompensationFailed event to the fallback domain.
- Parameters:
reason – Human-readable explanation for the delegation.
options – Optional
DelegationOptionsstruct overriding the framework defaults (emit_system_event=Trueand the restFalse). Audit #65: replaces the previous keyword arguments for cross-language symmetry with Rust.
- Returns:
BusinessResponse with revocation flags.
- angzarr_client.compensation.emit_compensation_events(event_book: angzarr_client.proto.angzarr.types_pb2.EventBook) angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse¶
Create a response containing compensation events.
Use when the aggregate emits events to record compensation. The framework will persist these events and NOT emit a system event.
- Parameters:
event_book – EventBook containing compensation events.
- Returns:
BusinessResponse with events.
- class angzarr_client.compensation.PMRevocationResponse¶
Result from PM compensation helpers.
Named type matching Go’s PMRevocationResponse, replacing raw tuples for better discoverability and documentation.
Audit finding #70:
revocationis required (no default), matching Rust’scompensation.rs:188-194revocation: RevocationResponsefield. The factory helperspm_delegate_to_frameworkandpm_emit_compensation_eventsalways construct one, so no caller is broken; direct user-code construction without it raisesTypeErrorat construction time, surfacing the missing field loudly. Symmetric to audit-#56 (tighten ambiguous Optionals when the framework always populates them).- revocation: angzarr_client.proto.angzarr.command_handler_pb2.RevocationResponse¶
Framework action flags. Required.
- angzarr_client.compensation.pm_delegate_to_framework(reason: str, options: DelegationOptions | None = None) PMRevocationResponse¶
Create a PM response that delegates compensation to the framework.
Use when the PM doesn’t have custom compensation logic.
- Parameters:
reason – Human-readable explanation for the delegation.
options – Optional
DelegationOptionsstruct. Audit #66: replaces the previousemit_system_eventkwarg with the shared options struct (same shape asdelegate_to_framework()).
- Returns:
PMRevocationResponse with no process events, delegate to framework.
- angzarr_client.compensation.pm_emit_compensation_events(process_events: angzarr_client.proto.angzarr.types_pb2.EventBook, also_emit_system_event: bool = False, reason: str = '') PMRevocationResponse¶
Create a PM response containing compensation events.
Use when the PM emits events to record the compensation in its state.
- Parameters:
process_events – EventBook containing PM compensation events.
also_emit_system_event – Also emit SagaCompensationFailed.
reason – Reason for system event (if emitting).
- Returns:
PMRevocationResponse with events and revocation flags.