Skip to main content
Attesta’s challenge system is pluggable. While the built-in challenges (Confirm, Quiz, Teach-Back, Multi-Party) cover common scenarios, you can build custom challenges for specialized verification workflows — TOTP/2FA, manager escalation, biometric confirmation, or domain-specific knowledge checks. This guide covers implementing the ChallengeProtocol, wiring custom challenges into the challenge map, and testing them.

The ChallengeProtocol

Every challenge must implement two members:
MemberSignatureDescription
presentasync present(ctx: ActionContext, risk: RiskAssessment) -> ChallengeResultPresent the challenge and return the outcome
challenge_type@property challenge_type -> ChallengeTypeThe type identifier for this challenge
from attesta import (
    ChallengeProtocol, ActionContext, RiskAssessment,
    ChallengeResult, ChallengeType,
)

class MyChallenge:
    @property
    def challenge_type(self) -> ChallengeType:
        return ChallengeType.CONFIRM  # reuse an existing type

    async def present(
        self, ctx: ActionContext, risk: RiskAssessment
    ) -> ChallengeResult:
        # Your verification logic here
        return ChallengeResult(
            passed=True,
            challenge_type=self.challenge_type,
        )

# Verify protocol compliance
assert isinstance(MyChallenge(), ChallengeProtocol)
The present() method must be async. Even if your challenge implementation is synchronous, declare it as async def or the protocol check will fail.

Example: TOTP/2FA Challenge

A time-based one-time password challenge that requires the operator to enter a code from their authenticator app.
1

Install Dependencies

pip install attesta pyotp
2

Implement the Challenge

import asyncio
import time
import pyotp
from attesta import (
    ActionContext, RiskAssessment,
    ChallengeResult, ChallengeType,
)

class TOTPChallenge:
    """Two-factor authentication challenge using TOTP codes.

    The operator must provide a valid TOTP code from their
    authenticator app before the action is approved.
    """

    def __init__(
        self,
        secret_provider,
        max_attempts: int = 3,
        code_valid_window: int = 1,
    ):
        """
        Parameters
        ----------
        secret_provider:
            Callable that accepts an operator/agent ID and returns
            their TOTP secret. E.g., a database lookup function.
        max_attempts:
            Maximum number of code entry attempts before denial.
        code_valid_window:
            Number of 30-second windows to accept (1 = current + 1 previous).
        """
        self.secret_provider = secret_provider
        self.max_attempts = max_attempts
        self.code_valid_window = code_valid_window

    @property
    def challenge_type(self) -> ChallengeType:
        return ChallengeType.CONFIRM

    async def present(
        self, ctx: ActionContext, risk: RiskAssessment
    ) -> ChallengeResult:
        start = time.monotonic()

        # Resolve the operator's TOTP secret
        operator_id = ctx.agent_id or "default"
        try:
            secret = await self._get_secret(operator_id)
        except Exception:
            return ChallengeResult(
                passed=False,
                challenge_type=self.challenge_type,
                response_time_seconds=time.monotonic() - start,
                details={"reason": "TOTP secret not found"},
            )

        totp = pyotp.TOTP(secret)

        # Display the challenge prompt
        print(f"\n  {'=' * 50}")
        print(f"  2FA VERIFICATION REQUIRED")
        print(f"  {'=' * 50}")
        print(f"  Action: {ctx.function_name}")
        print(f"  Risk: {risk.level.value.upper()} ({risk.score:.2f})")
        print(f"  Enter your authenticator code:")

        # Allow multiple attempts
        loop = asyncio.get_running_loop()
        for attempt in range(1, self.max_attempts + 1):
            code = await loop.run_in_executor(
                None,
                lambda: input(
                    f"  Code (attempt {attempt}/{self.max_attempts}): "
                ).strip(),
            )

            if totp.verify(code, valid_window=self.code_valid_window):
                elapsed = time.monotonic() - start
                return ChallengeResult(
                    passed=True,
                    challenge_type=self.challenge_type,
                    response_time_seconds=elapsed,
                    details={
                        "method": "totp",
                        "attempts": attempt,
                    },
                )

            if attempt < self.max_attempts:
                print(f"  Invalid code. Try again.")

        elapsed = time.monotonic() - start
        return ChallengeResult(
            passed=False,
            challenge_type=self.challenge_type,
            response_time_seconds=elapsed,
            details={
                "method": "totp",
                "attempts": self.max_attempts,
                "reason": "max attempts exceeded",
            },
        )

    async def _get_secret(self, operator_id: str) -> str:
        """Retrieve the TOTP secret for the given operator."""
        if asyncio.iscoroutinefunction(self.secret_provider):
            return await self.secret_provider(operator_id)
        return self.secret_provider(operator_id)
3

Wire It Into the Challenge Map

Custom challenges integrate with Attesta through the renderer. The simplest approach is to create a renderer that delegates to your challenge:
from attesta import (
    Attesta, RiskLevel, ChallengeType,
    ActionContext, RiskAssessment, ChallengeResult, Verdict,
)
from attesta.renderers.terminal import TerminalRenderer

class TOTPRenderer(TerminalRenderer):
    """Terminal renderer that adds TOTP verification for CRITICAL actions."""

    def __init__(self, totp_challenge: TOTPChallenge):
        super().__init__()
        self._totp = totp_challenge

    async def render_challenge(
        self,
        ctx: ActionContext,
        risk: RiskAssessment,
        challenge_type: ChallengeType,
    ) -> ChallengeResult:
        if challenge_type == ChallengeType.MULTI_PARTY:
            # Use TOTP for CRITICAL instead of multi-party
            return await self._totp.present(ctx, risk)
        # Fall back to default terminal rendering for other types
        return await super().render_challenge(ctx, risk, challenge_type)

# Usage
totp = TOTPChallenge(
    secret_provider=lambda op_id: "JBSWY3DPEHPK3PXP",  # demo secret
)

attesta = Attesta(renderer=TOTPRenderer(totp))

@attesta.gate(risk="critical")
def delete_production_database(db_name: str) -> str:
    return f"Deleted {db_name}"

Example: Manager Approval Challenge

A challenge that escalates the approval to a designated manager via an external notification system.
import asyncio
import time
import uuid
from attesta import (
    ActionContext, RiskAssessment,
    ChallengeResult, ChallengeType,
)

class ManagerApprovalChallenge:
    """Requires explicit approval from a designated manager.

    Sends a notification to the manager and polls for their response.
    The original operator cannot approve their own action -- a different
    person must respond.
    """

    def __init__(
        self,
        notification_service,
        manager_resolver,
        timeout_seconds: float = 3600.0,
        poll_interval: float = 5.0,
    ):
        """
        Parameters
        ----------
        notification_service:
            Object with an async `send(manager_id, message, request_id)` method.
        manager_resolver:
            Callable that takes an agent_id and returns the manager's ID.
        timeout_seconds:
            How long to wait for manager response before denying.
        poll_interval:
            How often to check for a response (seconds).
        """
        self.notifier = notification_service
        self.manager_resolver = manager_resolver
        self.timeout = timeout_seconds
        self.poll_interval = poll_interval

    @property
    def challenge_type(self) -> ChallengeType:
        return ChallengeType.MULTI_PARTY

    async def present(
        self, ctx: ActionContext, risk: RiskAssessment
    ) -> ChallengeResult:
        start = time.monotonic()
        request_id = uuid.uuid4().hex

        # Resolve the manager for this agent
        agent_id = ctx.agent_id or "unknown"
        manager_id = self.manager_resolver(agent_id)

        # Send notification to manager
        message = (
            f"Approval required for: {ctx.function_name}\n"
            f"Agent: {agent_id}\n"
            f"Risk: {risk.level.value.upper()} ({risk.score:.2f})\n"
            f"Call: {ctx.description}\n"
            f"Environment: {ctx.environment}"
        )
        await self.notifier.send(manager_id, message, request_id)

        print(f"\n  Waiting for manager ({manager_id}) approval...")
        print(f"  Request ID: {request_id}")
        print(f"  Timeout: {self.timeout:.0f}s")

        # Poll for response
        deadline = asyncio.get_event_loop().time() + self.timeout
        while asyncio.get_event_loop().time() < deadline:
            response = await self.notifier.check_response(request_id)
            if response is not None:
                elapsed = time.monotonic() - start
                approved = response.get("verdict") == "approved"
                responder = response.get("responder_id", manager_id)

                # Verify that the responder is not the original agent
                if responder == agent_id:
                    return ChallengeResult(
                        passed=False,
                        challenge_type=self.challenge_type,
                        responder=responder,
                        response_time_seconds=elapsed,
                        details={"reason": "self-approval not allowed"},
                    )

                return ChallengeResult(
                    passed=approved,
                    challenge_type=self.challenge_type,
                    responder=responder,
                    response_time_seconds=elapsed,
                    details={"manager_id": manager_id},
                )

            await asyncio.sleep(self.poll_interval)

        # Timed out
        elapsed = time.monotonic() - start
        return ChallengeResult(
            passed=False,
            challenge_type=self.challenge_type,
            responder="timeout",
            response_time_seconds=elapsed,
            details={"reason": "manager did not respond in time"},
        )

Example: Domain Knowledge Challenge

A challenge that asks domain-specific questions using templates from a DomainProfile.
import asyncio
import time
from attesta import (
    ActionContext, RiskAssessment,
    ChallengeResult, ChallengeType,
)
from attesta.domains.profile import DomainProfile

class DomainKnowledgeChallenge:
    """Asks domain-specific comprehension questions.

    Pulls question templates from a DomainProfile and requires the
    operator to demonstrate understanding of the domain implications.
    """

    def __init__(
        self,
        profile: DomainProfile,
        min_correct: int = 2,
        max_questions: int = 3,
    ):
        self.profile = profile
        self.min_correct = min_correct
        self.max_questions = max_questions

    @property
    def challenge_type(self) -> ChallengeType:
        return ChallengeType.QUIZ

    async def present(
        self, ctx: ActionContext, risk: RiskAssessment
    ) -> ChallengeResult:
        start = time.monotonic()

        # Get templates for this risk level
        templates = self.profile.get_templates_for_level(
            risk.level.value
        )[:self.max_questions]

        if not templates:
            # No domain questions available; pass through
            return ChallengeResult(
                passed=True,
                challenge_type=self.challenge_type,
                response_time_seconds=0.0,
                details={"reason": "no domain questions available"},
            )

        loop = asyncio.get_running_loop()
        correct = 0
        asked = 0

        for template in templates:
            # Format the question with context variables
            question = template.question_template.format(
                action=ctx.function_name,
                environment=ctx.environment,
            )

            print(f"\n  Q{asked + 1}: {question}")
            answer = await loop.run_in_executor(
                None, lambda: input("  > ").strip()
            )
            asked += 1

            # Check answer against hints (fuzzy keyword matching)
            answer_lower = answer.lower()
            if any(hint.lower() in answer_lower for hint in template.answer_hints):
                correct += 1
                print("  Correct.")
            else:
                print("  Incorrect.")

        elapsed = time.monotonic() - start
        passed = correct >= self.min_correct

        return ChallengeResult(
            passed=passed,
            challenge_type=self.challenge_type,
            response_time_seconds=elapsed,
            questions_asked=asked,
            questions_correct=correct,
            details={
                "domain": self.profile.name,
                "required_correct": self.min_correct,
            },
        )

Adding Custom Challenges to the Challenge Map

The challenge map in Attesta maps RiskLevel to ChallengeType. Since custom challenges implement the protocol and are presented through the renderer, the integration point is the renderer:
Override specific challenge types in a custom renderer:
from attesta.renderers.terminal import TerminalRenderer
from attesta import ChallengeType, ChallengeResult

class CustomRenderer(TerminalRenderer):
    def __init__(self, custom_challenges: dict):
        super().__init__()
        self._custom = custom_challenges

    async def render_challenge(self, ctx, risk, challenge_type):
        if challenge_type in self._custom:
            challenge = self._custom[challenge_type]
            return await challenge.present(ctx, risk)
        return await super().render_challenge(ctx, risk, challenge_type)

# Use TOTP for HIGH, manager approval for CRITICAL
renderer = CustomRenderer({
    ChallengeType.QUIZ: TOTPChallenge(secret_provider=...),
    ChallengeType.MULTI_PARTY: ManagerApprovalChallenge(
        notification_service=...,
        manager_resolver=...,
    ),
})

Testing Custom Challenges

import pytest
from attesta import (
    ActionContext, RiskAssessment, RiskLevel,
    ChallengeProtocol, ChallengeType,
)

def test_totp_challenge_protocol():
    """Verify TOTP challenge satisfies the ChallengeProtocol."""
    challenge = TOTPChallenge(
        secret_provider=lambda _: "JBSWY3DPEHPK3PXP"
    )
    assert isinstance(challenge, ChallengeProtocol)
    assert challenge.challenge_type == ChallengeType.CONFIRM

@pytest.mark.asyncio
async def test_totp_challenge_with_valid_code(monkeypatch):
    """Test that a valid TOTP code passes the challenge."""
    import pyotp
    secret = "JBSWY3DPEHPK3PXP"
    totp = pyotp.TOTP(secret)
    valid_code = totp.now()

    challenge = TOTPChallenge(
        secret_provider=lambda _: secret
    )

    ctx = ActionContext(
        function_name="delete_user",
        agent_id="test-operator",
    )
    risk = RiskAssessment(
        score=0.9,
        level=RiskLevel.CRITICAL,
    )

    # Mock the input() call to return the valid code
    monkeypatch.setattr("builtins.input", lambda _: valid_code)
    result = await challenge.present(ctx, risk)
    assert result.passed is True
    assert result.details["method"] == "totp"

@pytest.mark.asyncio
async def test_totp_challenge_max_attempts(monkeypatch):
    """Test that exceeding max attempts fails the challenge."""
    challenge = TOTPChallenge(
        secret_provider=lambda _: "JBSWY3DPEHPK3PXP",
        max_attempts=2,
    )

    ctx = ActionContext(function_name="deploy", agent_id="test")
    risk = RiskAssessment(score=0.9, level=RiskLevel.CRITICAL)

    # Always return an invalid code
    monkeypatch.setattr("builtins.input", lambda _: "000000")
    result = await challenge.present(ctx, risk)
    assert result.passed is False
    assert result.details["attempts"] == 2

@pytest.mark.asyncio
async def test_manager_self_approval_blocked():
    """Manager challenge should block self-approval."""
    # Mock notifier that responds with the same agent ID
    class MockNotifier:
        async def send(self, *args): pass
        async def check_response(self, request_id):
            return {"verdict": "approved", "responder_id": "agent-1"}

    challenge = ManagerApprovalChallenge(
        notification_service=MockNotifier(),
        manager_resolver=lambda _: "manager-1",
    )

    ctx = ActionContext(
        function_name="deploy",
        agent_id="agent-1",  # same as responder
    )
    risk = RiskAssessment(score=0.9, level=RiskLevel.CRITICAL)

    result = await challenge.present(ctx, risk)
    assert result.passed is False
    assert result.details["reason"] == "self-approval not allowed"

Best Practices

Always Return ChallengeResult

Never raise exceptions from present(). Return ChallengeResult(passed=False, ...) with a descriptive details dict on failure.

Enforce Timeouts

Every challenge that waits for external input should have a configurable timeout that defaults to denial.

Record Metadata

Use the details dict on ChallengeResult to record verification method, attempt counts, responder identity, and failure reasons for the audit trail.

Prevent Self-Approval

For multi-party or escalation challenges, verify that the responder is not the same entity that initiated the action.

Challenge System

How Attesta selects and presents challenges

Protocols

Full ChallengeProtocol specification