Skip to main content
The EventBus allows external code to subscribe to lifecycle events emitted during the Attesta approval pipeline. Events are fired at key points — risk scoring, trust computation, challenge issuance, approval decisions, and audit logging — enabling integrations like webhooks, metrics, and custom logging.

EventType

The EventType enum defines all lifecycle events emitted during the approval pipeline.

Import

from attesta.events import EventType

Values

MemberValueDescription
RISK_SCORED"risk_scored"Emitted after the risk scorer produces a score.
TRUST_COMPUTED"trust_computed"Emitted after the trust engine computes a trust score.
CHALLENGE_ISSUED"challenge_issued"Emitted when a challenge is presented to the operator.
CHALLENGE_COMPLETED"challenge_completed"Emitted when the operator completes a challenge.
APPROVED"approved"Emitted when an action is approved.
DENIED"denied"Emitted when an action is denied.
ESCALATED"escalated"Emitted when timeout policy escalates an action.
AUDIT_LOGGED"audit_logged"Emitted after an entry is written to the audit log.

Event

A single lifecycle event emitted by the pipeline.

Import

from attesta.events import Event

Fields

FieldTypeDefaultDescription
typeEventType(required)The kind of lifecycle event.
timestampfloattime.time()Unix timestamp when the event was created.
datadict[str, Any]{}Arbitrary payload data associated with the event.

Example

from attesta.events import Event, EventType

event = Event(
    type=EventType.APPROVED,
    data={"agent_id": "deploy-bot", "action": "deploy"},
)
print(event.type)       # EventType.APPROVED
print(event.timestamp)  # 1718456789.123
print(event.data)       # {"agent_id": "deploy-bot", "action": "deploy"}

EventBus

Publish/subscribe event bus for pipeline lifecycle events. Thread-safe. Supports both sync and async handlers. Errors in handlers are caught and logged — they never break the pipeline.

Import

from attesta.events import EventBus

Constructor

bus = EventBus()
The EventBus takes no constructor arguments. All state is managed internally.

on()

Subscribe a synchronous callback to an event type. Can be used as a decorator or called directly.
from attesta.events import EventBus, EventType

bus = EventBus()

# As a decorator
@bus.on(EventType.APPROVED)
def on_approved(event):
    print(f"Action approved: {event.data}")

# Direct registration
def on_denied(event):
    print(f"Action denied: {event.data}")

bus.on(EventType.DENIED, on_denied)

Signature

def on(self, event_type: EventType, callback: EventHandler | None = None) -> Callable
ParameterTypeDescription
event_typeEventTypeThe event type to subscribe to.
callbackEventHandler | NoneThe handler function. If None, returns a decorator.

async_on()

Subscribe an async callback to an event type. Works identically to on() but for async handlers.
@bus.async_on(EventType.RISK_SCORED)
async def on_risk_scored(event):
    await send_metric("risk_score", event.data.get("score"))

Signature

def async_on(self, event_type: EventType, callback: Callable | None = None) -> Callable
ParameterTypeDescription
event_typeEventTypeThe event type to subscribe to.
callbackCallable | NoneThe async handler function. If None, returns a decorator.

off()

Remove a callback from an event type’s subscription list.
bus.off(EventType.APPROVED, on_approved)

Signature

def off(self, event_type: EventType, callback: EventHandler) -> None
ParameterTypeDescription
event_typeEventTypeThe event type to unsubscribe from.
callbackEventHandlerThe handler to remove.

emit()

Emit an event to all registered synchronous handlers. Errors in handlers are caught and logged.
from attesta.events import Event, EventType

bus.emit(Event(type=EventType.APPROVED, data={"agent_id": "deploy-bot"}))

Signature

def emit(self, event: Event) -> None

async_emit()

Emit an event to both sync and async handlers. Sync handlers are called first, then async handlers are awaited.
await bus.async_emit(Event(type=EventType.APPROVED, data={"agent_id": "deploy-bot"}))

Signature

async def async_emit(self, event: Event) -> None

clear()

Remove all handlers for all event types.
bus.clear()

Integration with Attesta

Pass an EventBus to the Attesta constructor to receive pipeline lifecycle events:
from attesta import Attesta
from attesta.events import EventBus, EventType

bus = EventBus()

@bus.on(EventType.APPROVED)
def on_approved(event):
    print(f"Approved: {event.data}")

@bus.on(EventType.DENIED)
def on_denied(event):
    print(f"Denied: {event.data}")

attesta = Attesta(event_bus=bus)
The event bus is also the foundation for webhook notifications. The WebhookDispatcher subscribes to an EventBus and sends HTTP POST requests for configured event types.

Webhooks

Send HTTP notifications for pipeline events

Audit Trail

Every event is also recorded in the audit log