angzarr_client.compensation

Compensation flow helpers for saga/PM rejection handling.

This module provides helpers for implementing compensation logic in aggregates and process managers when saga/PM commands are rejected.

Usage in Aggregate:

    from angzarr_client import Aggregate, handles
    from angzarr_client.compensation import (
        CompensationContext,
        delegate_to_framework,
        emit_compensation_events,
    )

    class OrderAggregate(Aggregate[OrderState]):
        def handle_revocation(self, notification):
            ctx = CompensationContext.from_notification(notification)

            # Option 1: Emit compensation events based on rejected command type
            if ctx.rejected_command_type == "type.googleapis.com/orders.FulfillOrder":
                event = OrderCancelled(
                    order_id=self.order_id,
                    reason=f"Fulfillment failed: {ctx.rejection_reason}",
                )
                self._apply_and_record(event)
                return emit_compensation_events(self.event_book())

            # Option 2: Delegate to framework
            return delegate_to_framework(
                reason=f"No custom compensation for {ctx.rejected_command_type}"
            )

Usage in ProcessManager:

    from angzarr_client.process_manager import ProcessManager, handles
    from angzarr_client.compensation import (
        CompensationContext,
        pm_delegate_to_framework,
        pm_emit_compensation_events,
    )

    class OrderWorkflowPM(ProcessManager[WorkflowState]):
        def handle_revocation(self, notification):
            ctx = CompensationContext.from_notification(notification)

            # Record failure in PM state
            event = WorkflowStepFailed(
                source_domain=ctx.source_domain,
                reason=ctx.rejection_reason,
            )
            self._apply_and_record(event)

            # Return PM events + framework response
            return pm_emit_compensation_events(
                process_events=self.process_events(),
                also_emit_system_event=True,
            )

Classes

CompensationContext

Extracted context from a rejection Notification.

DelegationOptions

Options struct for delegate_to_framework / pm_delegate_to_framework.

PMRevocationResponse

Result from PM compensation helpers.

Functions

delegate_to_framework(...)

Create a response that delegates compensation to the framework.

emit_compensation_events(...)

Create a response containing compensation events.

pm_delegate_to_framework(→ PMRevocationResponse)

Create a PM response that delegates compensation to the framework.

pm_emit_compensation_events(→ PMRevocationResponse)

Create a PM response containing compensation events.

is_notification(→ bool)

Check if a type URL refers to a rejection Notification.

Module Contents

class angzarr_client.compensation.CompensationContext

Extracted context from a rejection Notification.

Provides easy access to compensation-relevant fields. Source aggregate info is extracted from the rejected command’s angzarr_deferred header.

source_event_sequence: int

Sequence of the event that triggered the saga/PM flow.

rejection_reason: str

Why the command was rejected.

rejected_command: angzarr_client.proto.angzarr.types_pb2.CommandBook | None

The command that was rejected (if available).

source_aggregate: angzarr_client.proto.angzarr.types_pb2.Cover | None

Cover of the aggregate that triggered the flow.

classmethod from_notification(notification: angzarr_client.proto.angzarr.types_pb2.Notification) → CompensationContext

Extract compensation context from a Notification.

Source aggregate info is extracted from the rejected command’s angzarr_deferred header, which is always set by the framework for saga/PM-produced commands.

Parameters:

notification – The notification containing a RejectionNotification payload.

Returns:

CompensationContext with extracted fields.

property rejected_command_type: str | None

Get the type URL of the rejected command, if available.

property source_domain: str | None

Get the domain of the source aggregate, if available.

property source_root: bytes | None

Get the root UUID bytes of the source aggregate, if available.

property dispatch_key: str

Build a dispatch key for routing rejection handlers.

Returns:

A key in the format “domain/command”, or an empty string.
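A key of this shape lends itself to a lookup table of rejection handlers. The sketch below illustrates the idea with plain strings only. The `route_rejection` helper, the handler functions, and the example key `orders/FulfillOrder` are hypothetical and not part of angzarr_client. The exact form of the command part of the key is an assumption, as is treating an empty key as "no specific handler".

```python
from typing import Callable, Dict

# Hypothetical handler type: takes a rejection reason, returns an outcome label.
Handler = Callable[[str], str]


def on_fulfill_rejected(reason: str) -> str:
    # Stand-in for custom compensation logic.
    return f"cancel order: {reason}"


def fallback(reason: str) -> str:
    # Stand-in for delegating compensation to the framework.
    return f"delegate: {reason}"


# Keys follow the documented "domain/command" shape; the entries are illustrative.
HANDLERS: Dict[str, Handler] = {
    "orders/FulfillOrder": on_fulfill_rejected,
}


def route_rejection(dispatch_key: str, reason: str) -> str:
    """Look up a handler by dispatch key, using the fallback when none matches."""
    handler = HANDLERS.get(dispatch_key, fallback)
    return handler(reason)
```

In a real handler, the key would come from `ctx.dispatch_key`. Because an empty string never matches a registered entry, it falls through to the fallback.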

class angzarr_client.compensation.DelegationOptions

Options struct for delegate_to_framework / pm_delegate_to_framework.

Audit #65 / #66: collapses the previously divergent function shapes (Rust two-function split, Python kwargs) into a single shared options type. The type is symmetric across languages: Python DelegationOptions mirrors Rust compensation::DelegationOptions field-for-field, with the same defaults.

Field defaults match the previous Python kwargs and Rust’s basic delegate_to_framework (which hardcoded emit_system_event = true and the rest false).

emit_system_event: bool = True

Emit SagaCompensationFailed to the fallback domain.

send_to_dead_letter: bool = False

Move the failed event to the dead-letter queue.

escalate: bool = False

Mark for operator intervention.

abort: bool = False

Stop the saga entirely without retry.
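The four fields and their defaults can be pictured as a small dataclass. The following is a local stand-in, not the library's class: the name `DelegationOptionsSketch` is hypothetical, and whether the real type is a dataclass or is frozen is an assumption. Only the field names and default values come from the documentation above.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class DelegationOptionsSketch:
    """Local stand-in mirroring the documented fields and defaults."""

    emit_system_event: bool = True     # emit SagaCompensationFailed
    send_to_dead_letter: bool = False  # move the failed event to the DLQ
    escalate: bool = False             # mark for operator intervention
    abort: bool = False                # stop the saga without retry


# Documented defaults: only emit_system_event is set.
defaults = DelegationOptionsSketch()

# Override selected flags while keeping the remaining defaults.
escalated = replace(defaults, escalate=True, send_to_dead_letter=True)
```

With the real type, the options object would be passed as the second argument, as in `delegate_to_framework(reason, options)`. Passing `None` leaves the framework defaults in place.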

angzarr_client.compensation.delegate_to_framework(reason: str, options: DelegationOptions | None = None) → angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse

Create a response that delegates compensation to the framework.

Use when the aggregate has no custom compensation logic for a saga. With the default options, the framework emits a SagaCompensationFailed event to the fallback domain.

Parameters:
  • reason – Human-readable explanation for the delegation.

  • options – Optional DelegationOptions struct overriding the framework defaults (emit_system_event=True and the rest False). Audit #65: replaces the previous keyword arguments for cross-language symmetry with Rust.

Returns:

BusinessResponse with revocation flags.

angzarr_client.compensation.emit_compensation_events(event_book: angzarr_client.proto.angzarr.types_pb2.EventBook) → angzarr_client.proto.angzarr.command_handler_pb2.BusinessResponse

Create a response containing compensation events.

Use when the aggregate emits events to record compensation. The framework will persist these events and NOT emit a system event.

Parameters:

event_book – EventBook containing compensation events.

Returns:

BusinessResponse with events.

class angzarr_client.compensation.PMRevocationResponse

Result from PM compensation helpers.

Named type matching Go’s PMRevocationResponse, replacing raw tuples for better discoverability and documentation.

Audit finding #70: revocation is required (no default), matching Rust’s compensation.rs:188-194 revocation: RevocationResponse field. The factory helpers pm_delegate_to_framework and pm_emit_compensation_events always construct one, so no caller is broken; direct user-code construction without it raises TypeError at construction time, surfacing the missing field loudly. Symmetric to audit-#56 (tighten ambiguous Optionals when the framework always populates them).

revocation: angzarr_client.proto.angzarr.command_handler_pb2.RevocationResponse

Framework action flags. Required.

process_events: angzarr_client.proto.angzarr.types_pb2.EventBook | None = None

PM events to persist (may be None when delegating).
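The effect of a required field without a default can be seen with an ordinary dataclass. This is a stand-in for illustration only: `PMRevocationResponseSketch` and the dict used as a revocation value are hypothetical, and the real class may be implemented differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PMRevocationResponseSketch:
    """Local stand-in: one required field, one optional field."""

    revocation: object                      # required, no default
    process_events: Optional[object] = None  # may be None when delegating


def try_construct_without_revocation() -> str:
    """Return the name of the exception raised when revocation is omitted."""
    try:
        PMRevocationResponseSketch()  # type: ignore[call-arg]
    except TypeError as exc:
        return type(exc).__name__
    return "constructed"


# Supplying the required field succeeds; process_events defaults to None.
ok = PMRevocationResponseSketch(revocation={"emit_system_event": True})
```

Omitting the required argument fails when the object is constructed, not later when the response is consumed. This matches the behaviour the audit note describes.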

angzarr_client.compensation.pm_delegate_to_framework(reason: str, options: DelegationOptions | None = None) → PMRevocationResponse

Create a PM response that delegates compensation to the framework.

Use when the PM doesn’t have custom compensation logic.

Parameters:
  • reason – Human-readable explanation for the delegation.

  • options – Optional DelegationOptions struct. Audit #66: replaces the previous emit_system_event kwarg with the shared options struct (same shape as delegate_to_framework()).

Returns:

PMRevocationResponse with no process events; compensation is delegated to the framework.

angzarr_client.compensation.pm_emit_compensation_events(process_events: angzarr_client.proto.angzarr.types_pb2.EventBook, also_emit_system_event: bool = False, reason: str = '') → PMRevocationResponse

Create a PM response containing compensation events.

Use when the PM emits events to record the compensation in its state.

Parameters:
  • process_events – EventBook containing PM compensation events.

  • also_emit_system_event – Also emit SagaCompensationFailed.

  • reason – Reason for system event (if emitting).

Returns:

PMRevocationResponse with events and revocation flags.

angzarr_client.compensation.is_notification(type_url: str) → bool

Check if a type URL refers to a rejection Notification.

Cross-language alias for Rust’s is_notification(type_url). Useful for dispatch code that needs to branch on “is this a notification?” without unpacking the Any payload.
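A check of this kind can work on the type URL string alone. The sketch below shows one plausible shape and is not the library's implementation. The function name `looks_like_notification` is hypothetical, and the message name `angzarr.Notification` is an assumption inferred from the `angzarr_client.proto.angzarr.types_pb2` module path.

```python
# ASSUMPTION: fully qualified protobuf name of the Notification message.
ASSUMED_NOTIFICATION_NAME = "angzarr.Notification"


def looks_like_notification(type_url: str) -> bool:
    """Compare the message name after the last '/' in an Any type URL."""
    # Any type URLs have the form "<host>/<fully.qualified.MessageName>".
    return type_url.rsplit("/", 1)[-1] == ASSUMED_NOTIFICATION_NAME
```

Dispatch code can branch on the result before deciding whether to unpack the payload at all.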