Skip to content

API Reference

Complete reference for all fastapi-topaz public exports.


Configuration

TopazConfig

Configuration for Topaz authorization. Create once at app startup, use to generate authorization dependencies.

PARAMETER DESCRIPTION
authorizer_options

Connection settings for Topaz authorizer

TYPE: AuthorizerOptions

policy_path_root

Root package name for policies (e.g., "myapp")

TYPE: str

identity_provider

Function to extract user identity from request

TYPE: Callable[[Request], Identity]

policy_instance_name

Name of policy instance to evaluate

TYPE: str

policy_instance_label

Label for policy instance (defaults to name)

TYPE: str | None DEFAULT: None

resource_context_provider

Function to provide additional context

TYPE: Callable[[Request], ResourceContext] | None DEFAULT: None

policy_path_normalizer

Optional callable to transform generated policy paths (e.g., replace hyphens with underscores for valid Rego identifiers)

TYPE: Callable[[str], str] | None DEFAULT: None

default_policy

Optional fallback policy path evaluated when no explicit policy file exists and no policy group matches. All requests still go through Topaz — this only changes which policy is evaluated.

TYPE: str | None DEFAULT: None

policy_groups

Optional ordered list of :class:PolicyGroup entries. The first group whose url_pattern matches the route template wins.

TYPE: list[PolicyGroup] | None DEFAULT: None

decision_cache

Optional cache for authorization decisions

TYPE: DecisionCache | None DEFAULT: None

max_concurrent_checks

Max concurrent authorization checks for bulk operations (default: 10)

TYPE: int DEFAULT: 10

circuit_breaker

Optional circuit breaker for graceful degradation

TYPE: CircuitBreaker | None DEFAULT: None

connection_pool

Optional connection pool for gRPC connection reuse

TYPE: ConnectionPool | None DEFAULT: None

audit_logger

Optional audit logger for authorization decisions

TYPE: AuditLogger | None DEFAULT: None

metrics

Optional Prometheus metrics collector

TYPE: PrometheusMetrics | None DEFAULT: None

tracing

Optional OpenTelemetry tracing

TYPE: OTelTracing | None DEFAULT: None

Source code in src/fastapi_topaz/config.py
def __init__(
    self,
    *,
    authorizer_options: AuthorizerOptions,
    policy_path_root: str,
    identity_provider: Callable[[Request], Identity],
    policy_instance_name: str,
    policy_instance_label: str | None = None,
    resource_context_provider: Callable[[Request], ResourceContext] | None = None,
    policy_path_normalizer: Callable[[str], str] | None = None,
    default_policy: str | None = None,
    policy_groups: list[PolicyGroup] | None = None,
    decision_cache: DecisionCache | None = None,
    max_concurrent_checks: int = 10,
    circuit_breaker: CircuitBreaker | None = None,
    connection_pool: ConnectionPool | None = None,
    audit_logger: AuditLogger | None = None,
    metrics: PrometheusMetrics | None = None,
    tracing: OTelTracing | None = None,
):
    self.authorizer_options = authorizer_options
    self.policy_path_root = policy_path_root
    self.identity_provider = identity_provider
    self.policy_instance_name = policy_instance_name
    self.policy_instance_label = policy_instance_label or policy_instance_name
    self.resource_context_provider = resource_context_provider
    self.policy_path_normalizer = policy_path_normalizer
    self.default_policy = default_policy
    self.policy_groups = tuple(policy_groups or [])
    self.decision_cache = decision_cache
    self.max_concurrent_checks = max_concurrent_checks
    self.circuit_breaker = circuit_breaker
    self.connection_pool = connection_pool
    self.audit_logger = audit_logger
    self.metrics = metrics
    self.tracing = tracing
    self._semaphore = asyncio.Semaphore(max_concurrent_checks)
    # Stale cache for circuit breaker fallback (stores entries beyond normal TTL)
    self._stale_cache: dict[str, tuple[bool, float]] = {}
    self._stale_cache_lock = asyncio.Lock()

    # Validate policy_groups regex patterns at config creation time
    if self.policy_groups:
        from ._policy import _compile_policy_groups

        _compile_policy_groups(self.policy_groups)  # raises ValueError on bad regex

    # Guard against empty string default_policy
    if default_policy is not None and not default_policy:
        raise ValueError("default_policy must be a non-empty string or None")

    # Configure connection pool with authorizer options
    if self.connection_pool:
        self.connection_pool.configure(authorizer_options)

check_decision async

check_decision(request: Request, policy_path: str, decision: str, resource_context: ResourceContext | None = None, source: str = 'dependency') -> bool

Check an authorization decision, using cache if available.

This is the core authorization check method that handles caching, circuit breaker logic, and can be used directly for custom authorization logic.

Source code in src/fastapi_topaz/config.py
async def check_decision(
    self,
    request: Request,
    policy_path: str,
    decision: str,
    resource_context: ResourceContext | None = None,
    source: str = "dependency",
) -> bool:
    """
    Check an authorization decision, using cache if available.

    This is the core authorization check method that handles caching,
    circuit breaker logic, and can be used directly for custom authorization logic.
    """
    identity = self.identity_provider(request)
    start_time = time.monotonic()
    cached_result = False
    result: bool = False
    span = None

    # Start tracing span
    if self.tracing:
        span = self.tracing.start_auth_span(
            source=source,
            check_type="policy",
            policy_path=policy_path,
            identity_value=identity.value if identity else None,
        )

    try:
        # Check fresh cache first
        identity_value = identity.value or ""
        if self.decision_cache:
            cached = await self.decision_cache.get(
                identity_value, policy_path, decision, resource_context
            )
            if cached is not None:
                logger.debug(f"Cache HIT: {policy_path}, decision={decision}")
                cached_result = True
                if self.metrics:
                    self.metrics.record_cache_hit(source)
                return cached
            else:
                if self.metrics:
                    self.metrics.record_cache_miss(source)

        # Check circuit breaker - should we attempt the call?
        if self.circuit_breaker:
            should_call = await self.circuit_breaker.should_allow_request()
            if not should_call:
                # Circuit is open, use fallback
                stale_cached = await self._get_stale_cached(
                    identity_value, policy_path, decision, resource_context
                )
                logger.warning(
                    f"Circuit OPEN, using fallback for {policy_path} "
                    f"(stale_cache={'hit' if stale_cached is not None else 'miss'})"
                )

                result = await self.circuit_breaker.get_fallback_decision(
                    request,
                    policy_path,
                    dict(resource_context) if resource_context else {},
                    stale_cached,
                    ConnectionError("Circuit breaker open"),
                )

                if self.metrics:
                    self.metrics.record_fallback(
                        "circuit_open",
                        stale_cached is not None,
                        "allowed" if result else "denied",
                    )

                if self.circuit_breaker.on_fallback:
                    try:
                        self.circuit_breaker.on_fallback(
                            request, policy_path, stale_cached, result
                        )
                    except Exception as e:
                        logger.error(f"Error in on_fallback callback: {e}")

                return result

        # Make the authorization call
        # Note: ConnectionPool cannot be used here because AuthorizerClient
        # binds identity at construction time and decisions() offers no
        # per-call identity override. Each request needs its own client.
        topaz_start = time.monotonic()
        client = self.create_client(request)
        decisions_result = await client.decisions(
            policy_path=policy_path,
            decisions=(decision,),
            policy_instance_name=self.policy_instance_name,
            policy_instance_label=self.policy_instance_label,
            resource_context=resource_context,
        )
        result = decisions_result.get(decision, False)
        topaz_latency = time.monotonic() - topaz_start

        if self.metrics:
            self.metrics.record_topaz_latency(topaz_latency)

        # Record success with circuit breaker
        if self.circuit_breaker:
            await self.circuit_breaker.record_success()

        # Cache the result
        if self.decision_cache:
            await self.decision_cache.set(
                identity_value, policy_path, decision, resource_context, result
            )

        # Store in stale cache for circuit breaker fallback
        await self._set_stale_cached(
            identity_value, policy_path, decision, resource_context, result
        )

        return result

    except Exception as e:
        if self.metrics:
            self.metrics.record_error(type(e).__name__)
        if self.tracing and span:
            self.tracing.record_error(span, e)
            span = None  # Don't end span twice

        # Check if this is a failure that should trip the circuit breaker
        if self.circuit_breaker and self.circuit_breaker.is_failure_exception(e):
            await self.circuit_breaker.record_failure(e)

            # Try fallback
            stale_cached = await self._get_stale_cached(
                identity_value, policy_path, decision, resource_context
            )
            logger.warning(
                f"Topaz call failed ({type(e).__name__}), using fallback for {policy_path}"
            )

            result = await self.circuit_breaker.get_fallback_decision(
                request,
                policy_path,
                dict(resource_context) if resource_context else {},
                stale_cached,
                e,
            )

            if self.metrics:
                self.metrics.record_fallback(
                    "error",
                    stale_cached is not None,
                    "allowed" if result else "denied",
                )

            if self.circuit_breaker.on_fallback:
                try:
                    self.circuit_breaker.on_fallback(
                        request, policy_path, stale_cached, result
                    )
                except Exception as cb_error:
                    logger.error(f"Error in on_fallback callback: {cb_error}")

            return result

        # Not a circuit breaker failure, re-raise
        raise

    finally:
        latency_seconds = time.monotonic() - start_time
        latency_ms = latency_seconds * 1000
        result_decision = "allowed" if result else "denied"

        # Record metrics
        if self.metrics:
            self.metrics.record_auth_request(
                source=source,
                decision=result_decision,
                check_type="policy",
                policy_path=policy_path,
            )
            self.metrics.record_latency(
                latency_seconds, source, cached_result, policy_path
            )

        # End tracing span
        if self.tracing and span:
            self.tracing.end_auth_span(
                span,
                decision=result_decision,
                cached=cached_result,
                latency_ms=latency_ms,
                resource_context=dict(resource_context) if resource_context else None,
            )

is_allowed async

is_allowed(request: Request, policy_path: str, resource_context: ResourceContext | None = None, decision: str = 'allowed') -> bool

Check if an action is allowed without raising an exception.

This is a non-raising alternative to require_policy_allowed that returns True/False instead of raising HTTPException(403). Useful for UI patterns where you need to check permissions without blocking (e.g., showing/hiding edit or delete buttons).

PARAMETER DESCRIPTION
request

The FastAPI request object

TYPE: Request

policy_path

Full policy path (e.g., "webapp.PUT.documents")

TYPE: str

resource_context

Optional resource context dict

TYPE: ResourceContext | None DEFAULT: None

decision

Decision to check (default: "allowed")

TYPE: str DEFAULT: 'allowed'

RETURNS DESCRIPTION
bool

True if allowed, False otherwise

Example
@app.get("/documents/{id}")
async def get_document(id: int, request: Request):
    doc = await fetch_document(id)
    can_edit = await config.is_allowed(
        request,
        policy_path="myapp.PUT.documents",
        resource_context={"id": str(id)},
    )
    return {"document": doc, "can_edit": can_edit}
Source code in src/fastapi_topaz/config.py
async def is_allowed(
    self,
    request: Request,
    policy_path: str,
    resource_context: ResourceContext | None = None,
    decision: str = "allowed",
) -> bool:
    """
    Check if an action is allowed without raising an exception.

    This is a non-raising alternative to require_policy_allowed that returns
    True/False instead of raising HTTPException(403). Useful for UI patterns
    where you need to check permissions without blocking (e.g., showing/hiding
    edit or delete buttons).

    Args:
        request: The FastAPI request object
        policy_path: Full policy path (e.g., "webapp.PUT.documents")
        resource_context: Optional resource context dict
        decision: Decision to check (default: "allowed")

    Returns:
        True if allowed, False otherwise

    Example:
        ```python
        @app.get("/documents/{id}")
        async def get_document(id: int, request: Request):
            doc = await fetch_document(id)
            can_edit = await config.is_allowed(
                request,
                policy_path="myapp.PUT.documents",
                resource_context={"id": str(id)},
            )
            return {"document": doc, "can_edit": can_edit}
        ```
    """
    ctx: ResourceContext = dict(resource_context) if resource_context else {}
    if self.resource_context_provider:
        ctx.update(self.resource_context_provider(request))
    if request.path_params:
        ctx.update(request.path_params)

    return await self.check_decision(request, policy_path, decision, ctx)

check_relation async

check_relation(request: Request, object_type: str, object_id: str, relation: str, subject_type: str = 'user') -> bool

Check a ReBAC relation without raising an exception.

This is a non-raising alternative to require_rebac_allowed that returns True/False instead of raising HTTPException(403). Useful for checking if a user has a specific relationship with an object.

PARAMETER DESCRIPTION
request

The FastAPI request object

TYPE: Request

object_type

Type of object (e.g., "document", "folder")

TYPE: str

object_id

ID of the object to check

TYPE: str

relation

Relation to check (e.g., "can_read", "can_write", "can_delete")

TYPE: str

subject_type

Subject type (default: "user")

TYPE: str DEFAULT: 'user'

RETURNS DESCRIPTION
bool

True if the relation exists, False otherwise

Example
@app.get("/documents/{id}")
async def get_document(id: int, request: Request):
    doc = await fetch_document(id)
    can_delete = await config.check_relation(
        request,
        object_type="document",
        object_id=str(id),
        relation="can_delete",
    )
    return {"document": doc, "can_delete": can_delete}
Source code in src/fastapi_topaz/config.py
async def check_relation(
    self,
    request: Request,
    object_type: str,
    object_id: str,
    relation: str,
    subject_type: str = "user",
) -> bool:
    """
    Check a ReBAC relation without raising an exception.

    This is a non-raising alternative to require_rebac_allowed that returns
    True/False instead of raising HTTPException(403). Useful for checking
    if a user has a specific relationship with an object.

    Args:
        request: The FastAPI request object
        object_type: Type of object (e.g., "document", "folder")
        object_id: ID of the object to check
        relation: Relation to check (e.g., "can_read", "can_write", "can_delete")
        subject_type: Subject type (default: "user")

    Returns:
        True if the relation exists, False otherwise

    Example:
        ```python
        @app.get("/documents/{id}")
        async def get_document(id: int, request: Request):
            doc = await fetch_document(id)
            can_delete = await config.check_relation(
                request,
                object_type="document",
                object_id=str(id),
                relation="can_delete",
            )
            return {"document": doc, "can_delete": can_delete}
        ```
    """
    resource_ctx: ResourceContext = {}
    if self.resource_context_provider:
        resource_ctx.update(self.resource_context_provider(request))

    resource_ctx.update({
        "object_type": object_type,
        "object_id": object_id,
        "relation": relation,
        "subject_type": subject_type,
    })

    policy_path = f"{self.policy_path_root}.check"
    return await self.check_decision(request, policy_path, "allowed", resource_ctx)

check_relations async

check_relations(request: Request, object_type: str, object_id: str, relations: list[str], subject_type: str = 'user') -> dict[str, bool]

Check multiple ReBAC relations at once without raising exceptions.

This method checks multiple relations concurrently and returns a dict mapping relation names to boolean results. Useful for fetching all permissions for an object in a single call (e.g., to populate a permissions object in an API response).

PARAMETER DESCRIPTION
request

The FastAPI request object

TYPE: Request

object_type

Type of object (e.g., "document", "folder")

TYPE: str

object_id

ID of the object to check

TYPE: str

relations

List of relations to check (e.g., ["can_read", "can_write", "can_delete"])

TYPE: list[str]

subject_type

Subject type (default: "user")

TYPE: str DEFAULT: 'user'

RETURNS DESCRIPTION
dict[str, bool]

Dict mapping relation names to boolean results

Example
@app.get("/documents/{id}")
async def get_document(id: int, request: Request):
    doc = await fetch_document(id)
    permissions = await config.check_relations(
        request,
        object_type="document",
        object_id=str(id),
        relations=["can_read", "can_write", "can_delete", "can_share"],
    )
    # permissions = {"can_read": True, "can_write": True, "can_delete": False, "can_share": False}
    return {"document": doc, "permissions": permissions}
Source code in src/fastapi_topaz/config.py
async def check_relations(
    self,
    request: Request,
    object_type: str,
    object_id: str,
    relations: list[str],
    subject_type: str = "user",
) -> dict[str, bool]:
    """
    Check multiple ReBAC relations at once without raising exceptions.

    This method checks multiple relations concurrently and returns a dict
    mapping relation names to boolean results. Useful for fetching all
    permissions for an object in a single call (e.g., to populate a
    permissions object in an API response).

    Args:
        request: The FastAPI request object
        object_type: Type of object (e.g., "document", "folder")
        object_id: ID of the object to check
        relations: List of relations to check (e.g., ["can_read", "can_write", "can_delete"])
        subject_type: Subject type (default: "user")

    Returns:
        Dict mapping relation names to boolean results

    Example:
        ```python
        @app.get("/documents/{id}")
        async def get_document(id: int, request: Request):
            doc = await fetch_document(id)
            permissions = await config.check_relations(
                request,
                object_type="document",
                object_id=str(id),
                relations=["can_read", "can_write", "can_delete", "can_share"],
            )
            # permissions = {"can_read": True, "can_write": True, "can_delete": False, "can_share": False}
            return {"document": doc, "permissions": permissions}
        ```
    """
    async def check_single_relation(rel: str) -> tuple[str, bool]:
        async with self._semaphore:
            result = await self.check_relation(
                request,
                object_type=object_type,
                object_id=object_id,
                relation=rel,
                subject_type=subject_type,
            )
        return rel, result

    results = await asyncio.gather(*[check_single_relation(rel) for rel in relations])
    return dict(results)

policy_path_for

policy_path_for(method: str, route_path: str) -> str

Generate the policy path for a given HTTP method and route path.

Useful for debugging, testing, or previewing what policy path will be generated for a given route.

PARAMETER DESCRIPTION
method

HTTP method (e.g., "GET", "POST")

TYPE: str

route_path

URL path pattern (e.g., "/documents/{id}")

TYPE: str

RETURNS DESCRIPTION
str

The policy path that would be used for authorization

Example

config.policy_path_for("GET", "/documents/{id}") "myapp.GET.documents.__id"

Source code in src/fastapi_topaz/config.py
def policy_path_for(self, method: str, route_path: str) -> str:
    """
    Generate the policy path for a given HTTP method and route path.

    Useful for debugging, testing, or previewing what policy path
    will be generated for a given route.

    Args:
        method: HTTP method (e.g., "GET", "POST")
        route_path: URL path pattern (e.g., "/documents/{id}")

    Returns:
        The policy path that would be used for authorization

    Example:
        >>> config.policy_path_for("GET", "/documents/{id}")
        "myapp.GET.documents.__id"
    """
    return _resolve_policy_path(
        self.policy_path_root, method, route_path, self.policy_path_normalizer
    )

create_client

create_client(request: Request) -> AuthorizerClient

Create a Topaz authorizer client with identity from request.

Source code in src/fastapi_topaz/config.py
def create_client(self, request: Request) -> AuthorizerClient:
    """Create a Topaz authorizer client with identity from request."""
    identity = self.identity_provider(request)
    return AuthorizerClient(identity=identity, options=self.authorizer_options)

DecisionCache

Simple in-memory TTL cache for authorization decisions.

PARAMETER DESCRIPTION
ttl_seconds

Time-to-live for cache entries (default: 60 seconds)

TYPE: float DEFAULT: 60.0

max_size

Maximum number of entries to cache (default: 1000)

TYPE: int DEFAULT: 1000

get async

get(identity_value: str, policy_path: str, decision: str, resource_context: ResourceContext | None) -> bool | None

Get a cached decision, or None if not cached or expired.

Source code in src/fastapi_topaz/cache.py
async def get(
    self,
    identity_value: str,
    policy_path: str,
    decision: str,
    resource_context: ResourceContext | None,
) -> bool | None:
    """Get a cached decision, or None if not cached or expired."""
    key = self._make_key(identity_value, policy_path, decision, resource_context)
    async with self._lock:
        entry = self._cache.get(key)
        if entry is None:
            return None
        if time.monotonic() > entry.expires_at:
            del self._cache[key]
            return None
        # Re-insert to mark as recently used (LRU)
        del self._cache[key]
        self._cache[key] = entry
        return entry.value

set async

set(identity_value: str, policy_path: str, decision: str, resource_context: ResourceContext | None, value: bool) -> None

Cache a decision.

Source code in src/fastapi_topaz/cache.py
async def set(
    self,
    identity_value: str,
    policy_path: str,
    decision: str,
    resource_context: ResourceContext | None,
    value: bool,
) -> None:
    """Cache a decision."""
    key = self._make_key(identity_value, policy_path, decision, resource_context)
    async with self._lock:
        # Evict oldest entries if cache is full
        if len(self._cache) >= self.max_size:
            # Remove expired entries first
            now = time.monotonic()
            expired = [k for k, v in self._cache.items() if v.expires_at < now]
            for k in expired:
                del self._cache[k]
            # If still full, remove oldest 10%
            if len(self._cache) >= self.max_size:
                to_remove = list(self._cache.keys())[: self.max_size // 10]
                for k in to_remove:
                    del self._cache[k]

        self._cache[key] = CacheEntry(
            value=value,
            expires_at=time.monotonic() + self.ttl_seconds,
        )

clear async

clear() -> None

Clear all cached entries.

Source code in src/fastapi_topaz/cache.py
async def clear(self) -> None:
    """Clear all cached entries."""
    async with self._lock:
        self._cache.clear()

Authorization Dependencies

require_policy_allowed

Async dependency that raises HTTPException(403) if policy denies access.

PARAMETER DESCRIPTION
config

Topaz configuration

TYPE: TopazConfig

policy_path

Full policy path (e.g., "webapp.POST.api.documents")

TYPE: str

decision

Decision to check (default: "allowed")

TYPE: str DEFAULT: 'allowed'

resource_context

Optional resource context dict

TYPE: ResourceContext | None DEFAULT: None

RETURNS DESCRIPTION
Callable[[Request], Awaitable[None]]

Async dependency function for FastAPI

Example
@router.post("/documents")
async def create_document(
    _: None = Depends(require_policy_allowed(topaz_config, "webapp.POST.api.documents")),
):
    ...
Source code in src/fastapi_topaz/dependencies.py
def require_policy_allowed(
    config: TopazConfig,
    policy_path: str,
    decision: str = "allowed",
    resource_context: ResourceContext | None = None,
) -> Callable[[Request], Awaitable[None]]:
    """
    Async dependency that raises HTTPException(403) if policy denies access.

    Args:
        config: Topaz configuration
        policy_path: Full policy path (e.g., "webapp.POST.api.documents")
        decision: Decision to check (default: "allowed")
        resource_context: Optional resource context dict

    Returns:
        Async dependency function for FastAPI

    Example:
        ```python
        @router.post("/documents")
        async def create_document(
            _: None = Depends(require_policy_allowed(topaz_config, "webapp.POST.api.documents")),
        ):
            ...
        ```
    """

    async def dependency(request: Request) -> None:
        identity = config.identity_provider(request)

        ctx: ResourceContext = dict(resource_context) if resource_context else {}
        if config.resource_context_provider:
            ctx.update(config.resource_context_provider(request))

        # Add path params to context
        if request.path_params:
            ctx.update(request.path_params)

        logger.info(
            f"Authorization check: path={policy_path}, decision={decision}, "
            f"identity_type={identity.type}, identity_value={identity.value}"
        )
        logger.debug(f"Resource context: {ctx}")

        allowed = await config.check_decision(request, policy_path, decision, ctx)

        logger.info(f"Authorization result: policy={policy_path}, allowed={allowed}")

        if not allowed:
            logger.warning(
                f"Access DENIED: path={policy_path}, identity={identity.value}, "
                f"context={ctx}"
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Access denied: {policy_path}",
            )

        logger.info(f"Access GRANTED: path={policy_path}, identity={identity.value}")

    return dependency

require_policy_auto

Async dependency that auto-generates policy path from route and raises HTTPException(403) if denied.

The policy path is automatically derived from the HTTP method and route path pattern: - GET /documents -> {root}.GET.documents - POST /documents -> {root}.POST.documents - GET /documents/{id} -> {root}.GET.documents.__id - PUT /users/{user_id}/docs/{doc_id} -> {root}.PUT.users.__user_id.docs.__doc_id

PARAMETER DESCRIPTION
config

Topaz configuration

TYPE: TopazConfig

decision

Decision to check (default: "allowed")

TYPE: str DEFAULT: 'allowed'

resource_context

Optional resource context dict

TYPE: ResourceContext | None DEFAULT: None

RETURNS DESCRIPTION
Callable[[Request], Awaitable[None]]

Async dependency function for FastAPI

Example
@router.get("/documents/{id}")
async def get_document(
    id: int,
    _: None = Depends(require_policy_auto(topaz_config)),
):
    # Policy path auto-generated as "myapp.GET.documents.__id"
    ...
Source code in src/fastapi_topaz/dependencies.py
def require_policy_auto(
    config: TopazConfig,
    decision: str = "allowed",
    resource_context: ResourceContext | None = None,
) -> Callable[[Request], Awaitable[None]]:
    """
    Async dependency that auto-generates policy path from route and raises HTTPException(403) if denied.

    The policy path is automatically derived from the HTTP method and route path pattern:
    - GET /documents -> {root}.GET.documents
    - POST /documents -> {root}.POST.documents
    - GET /documents/{id} -> {root}.GET.documents.__id
    - PUT /users/{user_id}/docs/{doc_id} -> {root}.PUT.users.__user_id.docs.__doc_id

    Args:
        config: Topaz configuration
        decision: Decision to check (default: "allowed")
        resource_context: Optional resource context dict

    Returns:
        Async dependency function for FastAPI

    Example:
        ```python
        @router.get("/documents/{id}")
        async def get_document(
            id: int,
            _: None = Depends(require_policy_auto(topaz_config)),
        ):
            # Policy path auto-generated as "myapp.GET.documents.__id"
            ...
        ```
    """

    async def dependency(request: Request) -> None:
        # Extract route path pattern from FastAPI's routing
        route = request.scope.get("route")
        if route is None:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Unable to determine route for policy path auto-resolution",
            )

        route_path = route.path
        method = request.method

        # Generate policy path
        policy_path = _resolve_policy_path(
            config.policy_path_root, method, route_path,
            config.policy_path_normalizer,
        )

        identity = config.identity_provider(request)

        ctx: ResourceContext = dict(resource_context) if resource_context else {}
        if config.resource_context_provider:
            ctx.update(config.resource_context_provider(request))

        # Add path params to context
        if request.path_params:
            ctx.update(request.path_params)

        logger.info(
            f"Authorization check (auto): path={policy_path}, decision={decision}, "
            f"identity_type={identity.type}, identity_value={identity.value}"
        )
        logger.debug(f"Resource context: {ctx}")

        allowed = await config.check_decision(request, policy_path, decision, ctx)

        logger.info(f"Authorization result: policy={policy_path}, allowed={allowed}")

        if not allowed:
            logger.warning(
                f"Access DENIED: path={policy_path}, identity={identity.value}, "
                f"context={ctx}"
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Access denied: {policy_path}",
            )

        logger.info(f"Access GRANTED: path={policy_path}, identity={identity.value}")

    return dependency

require_rebac_allowed

Async dependency that raises HTTPException(403) if ReBAC check fails.

PARAMETER DESCRIPTION
config

Topaz configuration

TYPE: TopazConfig

object_type

Type of object (e.g., "document", "folder")

TYPE: str

relation

Relation to check (e.g., "can_write", "can_delete")

TYPE: str

object_id

Static ID, callable to extract from request, or None (uses path param "id")

TYPE: str | Callable[[Request], str] | None DEFAULT: None

subject_type

Subject type (default: "user")

TYPE: str DEFAULT: 'user'

RETURNS DESCRIPTION
Callable[[Request], Awaitable[None]]

Async dependency function for FastAPI

Example
@router.put("/documents/{id}")
async def update_document(
    id: int,
    _: None = Depends(require_rebac_allowed(topaz_config, "document", "can_write")),
):
    ...
Source code in src/fastapi_topaz/dependencies.py
def require_rebac_allowed(
    config: TopazConfig,
    object_type: str,
    relation: str,
    object_id: str | Callable[[Request], str] | None = None,
    subject_type: str = "user",
) -> Callable[[Request], Awaitable[None]]:
    """
    Async dependency that raises HTTPException(403) if ReBAC check fails.

    Args:
        config: Topaz configuration
        object_type: Type of object (e.g., "document", "folder")
        relation: Relation to check (e.g., "can_write", "can_delete")
        object_id: Static ID, callable to extract from request, or None (uses path param "id")
        subject_type: Subject type (default: "user")

    Returns:
        Async dependency function for FastAPI

    Example:
        ```python
        @router.put("/documents/{id}")
        async def update_document(
            id: int,
            _: None = Depends(require_rebac_allowed(topaz_config, "document", "can_write")),
        ):
            ...
        ```
    """

    async def dependency(request: Request) -> None:
        # Resolve object_id
        if callable(object_id):
            obj_id = object_id(request)
        elif object_id is not None:
            obj_id = object_id
        else:
            # Default: extract from path params
            obj_id = str(request.path_params.get("id", ""))

        # Start with resource context from provider (includes document data, user info, etc.)
        resource_ctx: ResourceContext = {}
        if config.resource_context_provider:
            resource_ctx.update(config.resource_context_provider(request))

        # Add ReBAC-specific fields
        resource_ctx.update({
            "object_type": object_type,
            "object_id": obj_id,
            "relation": relation,
            "subject_type": subject_type,
        })

        policy_path = f"{config.policy_path_root}.check"

        allowed = await config.check_decision(request, policy_path, "allowed", resource_ctx)

        if not allowed:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Access denied: {relation} on {object_type}:{obj_id}",
            )

    return dependency

get_authorized_resource

Async dependency that fetches a resource and checks authorization. Returns resource or raises 403/404.

PARAMETER DESCRIPTION
config

Topaz configuration

TYPE: TopazConfig

resource_fetcher

Function that takes (request) and returns resource or None

TYPE: Callable[[Request], T | None]

object_type

Type of object (e.g., "document")

TYPE: str

relation

Relation to check (e.g., "can_write")

TYPE: str

object_id

Static ID, callable, or None (uses path param "id")

TYPE: str | Callable[[Request], str] | None DEFAULT: None

subject_type

Subject type (default: "user")

TYPE: str DEFAULT: 'user'

RETURNS DESCRIPTION
Callable[[Request], Awaitable[T]]

Async dependency function that returns the authorized resource

Example
def fetch_document(request: Request, db: Session) -> Document | None:
    doc_id = request.path_params["id"]
    return db.query(Document).filter(Document.id == doc_id).first()

@router.put("/documents/{id}")
async def update_document(
    document: Document = Depends(
        get_authorized_resource(topaz_config, fetch_document, "document", "can_write")
    ),
):
    # document is pre-fetched and authorized
    ...
Source code in src/fastapi_topaz/dependencies.py
def get_authorized_resource(
    config: TopazConfig,
    resource_fetcher: Callable[[Request], T | None],
    object_type: str,
    relation: str,
    object_id: str | Callable[[Request], str] | None = None,
    subject_type: str = "user",
) -> Callable[[Request], Awaitable[T]]:
    """
    Async dependency that fetches a resource and checks authorization.
    Returns resource or raises 403/404.

    Args:
        config: Topaz configuration
        resource_fetcher: Function that takes (request) and returns resource or None
        object_type: Type of object (e.g., "document")
        relation: Relation to check (e.g., "can_write")
        object_id: Static ID, callable, or None (uses path param "id")
        subject_type: Subject type (default: "user")

    Returns:
        Async dependency function that returns the authorized resource

    Example:
        ```python
        def fetch_document(request: Request, db: Session) -> Document | None:
            doc_id = request.path_params["id"]
            return db.query(Document).filter(Document.id == doc_id).first()

        @router.put("/documents/{id}")
        async def update_document(
            document: Document = Depends(
                get_authorized_resource(topaz_config, fetch_document, "document", "can_write")
            ),
        ):
            # document is pre-fetched and authorized
            ...
        ```
    """

    async def dependency(request: Request) -> T:
        # First fetch the resource
        resource = resource_fetcher(request)

        if resource is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"{object_type.capitalize()} not found",
            )

        # Resolve object_id
        if callable(object_id):
            obj_id = object_id(request)
        elif object_id is not None:
            obj_id = object_id
        else:
            obj_id = str(request.path_params.get("id", ""))

        # Check authorization
        resource_ctx: ResourceContext = {
            "object_type": object_type,
            "object_id": obj_id,
            "relation": relation,
            "subject_type": subject_type,
        }

        policy_path = f"{config.policy_path_root}.check"

        allowed = await config.check_decision(request, policy_path, "allowed", resource_ctx)

        if not allowed:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Access denied: {relation} on {object_type}:{obj_id}",
            )

        return resource

    return dependency

filter_authorized_resources

Async dependency that returns an async filter function to remove unauthorized resources.

Uses concurrent authorization checks (controlled by config.max_concurrent_checks) and caching (if config.decision_cache is set) for optimal performance.

PARAMETER DESCRIPTION
config

Topaz configuration

TYPE: TopazConfig

object_type

Type of object (e.g., "document")

TYPE: str

relation

Relation to check (e.g., "can_read")

TYPE: str

id_extractor

Function to extract ID from resource object

TYPE: Callable[[Any], str] DEFAULT: lambda obj: str(getattr(obj, 'id', ''))

subject_type

Subject type (default: "user")

TYPE: str DEFAULT: 'user'

RETURNS DESCRIPTION
Callable[[Request], Awaitable[Callable[[list[T]], Awaitable[list[T]]]]]

Async dependency that returns an async filter function

Example
@router.get("/documents")
async def list_documents(
    filter_fn: Callable = Depends(
        filter_authorized_resources(topaz_config, "document", "can_read")
    ),
    db: Session = Depends(get_db),
):
    all_docs = db.query(Document).all()
    authorized_docs = await filter_fn(all_docs)
    return authorized_docs
Source code in src/fastapi_topaz/dependencies.py
def filter_authorized_resources(
    config: TopazConfig,
    object_type: str,
    relation: str,
    id_extractor: Callable[[Any], str] = lambda obj: str(getattr(obj, "id", "")),
    subject_type: str = "user",
) -> Callable[[Request], Awaitable[Callable[[list[T]], Awaitable[list[T]]]]]:
    """
    Async dependency that returns an async filter function to remove unauthorized resources.

    Uses concurrent authorization checks (controlled by config.max_concurrent_checks)
    and caching (if config.decision_cache is set) for optimal performance.

    Args:
        config: Topaz configuration
        object_type: Type of object (e.g., "document")
        relation: Relation to check (e.g., "can_read")
        id_extractor: Function to extract ID from resource object
        subject_type: Subject type (default: "user")

    Returns:
        Async dependency that returns an async filter function

    Example:
        ```python
        @router.get("/documents")
        async def list_documents(
            filter_fn: Callable = Depends(
                filter_authorized_resources(topaz_config, "document", "can_read")
            ),
            db: Session = Depends(get_db),
        ):
            all_docs = db.query(Document).all()
            authorized_docs = await filter_fn(all_docs)
            return authorized_docs
        ```
    """

    async def dependency(request: Request) -> Callable[[list[T]], Awaitable[list[T]]]:
        async def check_single(resource: T) -> tuple[T, bool]:
            """Check authorization for a single resource with semaphore limiting."""
            obj_id = id_extractor(resource)

            resource_ctx: ResourceContext = {
                "object_type": object_type,
                "object_id": obj_id,
                "relation": relation,
                "subject_type": subject_type,
            }

            policy_path = f"{config.policy_path_root}.check"

            # Use semaphore to limit concurrent checks
            async with config._semaphore:
                allowed = await config.check_decision(
                    request, policy_path, "allowed", resource_ctx
                )

            return resource, allowed

        async def filter_fn(resources: list[T]) -> list[T]:
            if not resources:
                return []

            # Run all checks concurrently (limited by semaphore)
            results = await asyncio.gather(*[check_single(r) for r in resources])

            # Filter to only authorized resources
            return [resource for resource, allowed in results if allowed]

        return filter_fn

    return dependency

require_rebac_hierarchy

Async dependency for hierarchical ReBAC authorization.

Checks multiple object/relation pairs in a single dependency, reducing boilerplate for nested resources like /orgs/{org}/projects/{proj}/docs/{doc}.

PARAMETER DESCRIPTION
config

Topaz configuration

TYPE: TopazConfig

checks

List of (object_type, id_source, relation) tuples. id_source can be: - "param_name" -> request.path_params["param_name"] - "header:X-Name" -> request.headers["X-Name"] - "query:name" -> request.query_params["name"] - "static:value" -> literal "value" - callable -> callable(request)

TYPE: list[tuple[str, str, str]]

mode

Check mode: - "all" (default): All checks must pass (AND). Fails fast. - "any": At least one check must pass (OR). - "first_match": Return on first success.

TYPE: Literal['all', 'any', 'first_match'] DEFAULT: 'all'

subject_type

Subject type (default: "user")

TYPE: str DEFAULT: 'user'

optimize

Run checks concurrently when possible (default: True)

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Callable[[Request], Awaitable[None]]

Async dependency function for FastAPI

RAISES DESCRIPTION
HTTPException(403)

If authorization fails based on mode semantics

Example
@app.get("/orgs/{org_id}/projects/{proj_id}/docs/{doc_id}")
async def get_doc(
    _=Depends(require_rebac_hierarchy(config, [
        ("organization", "org_id", "member"),
        ("project", "proj_id", "viewer"),
        ("document", "doc_id", "can_read"),
    ])),
):
    ...
Source code in src/fastapi_topaz/dependencies.py
def require_rebac_hierarchy(
    config: TopazConfig,
    checks: list[tuple[str, str, str]],
    mode: Literal["all", "any", "first_match"] = "all",
    subject_type: str = "user",
    optimize: bool = True,
) -> Callable[[Request], Awaitable[None]]:
    """
    Async dependency for hierarchical ReBAC authorization.

    Checks multiple object/relation pairs in a single dependency, reducing
    boilerplate for nested resources like /orgs/{org}/projects/{proj}/docs/{doc}.

    Args:
        config: Topaz configuration
        checks: List of (object_type, id_source, relation) tuples.
            id_source can be:
            - "param_name" -> request.path_params["param_name"]
            - "header:X-Name" -> request.headers["X-Name"]
            - "query:name" -> request.query_params["name"]
            - "static:value" -> literal "value"
            - callable -> callable(request)
        mode: Check mode:
            - "all" (default): All checks must pass (AND). Fails fast.
            - "any": At least one check must pass (OR).
            - "first_match": Return on first success.
        subject_type: Subject type (default: "user")
        optimize: Run checks concurrently when possible (default: True)

    Returns:
        Async dependency function for FastAPI

    Raises:
        HTTPException(403): If authorization fails based on mode semantics

    Example:
        ```python
        @app.get("/orgs/{org_id}/projects/{proj_id}/docs/{doc_id}")
        async def get_doc(
            _=Depends(require_rebac_hierarchy(config, [
                ("organization", "org_id", "member"),
                ("project", "proj_id", "viewer"),
                ("document", "doc_id", "can_read"),
            ])),
        ):
            ...
        ```
    """

    async def dependency(request: Request) -> None:
        result = await config.check_hierarchy(
            request, checks, mode, subject_type, optimize
        )

        if not result.allowed:
            if result.denied_at:
                detail = f"Access denied at {result.denied_at}"
            else:
                detail = "Access denied: no matching permissions"
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=detail,
            )

    return dependency

HierarchyResult

Result of a hierarchy authorization check.

ATTRIBUTE DESCRIPTION
allowed

Whether the hierarchy check passed

TYPE: bool

checks

List of (object_type, object_id, relation, result) tuples

TYPE: list[tuple[str, str, str, bool]]

denied_at

Object type where access was denied (mode="all")

TYPE: str | None

first_match

Relation that matched first (mode="first_match")

TYPE: str | None

as_dict

as_dict() -> dict[str, bool]

Return dict mapping object_type to boolean result.

Source code in src/fastapi_topaz/config.py
def as_dict(self) -> dict[str, bool]:
    """Return dict mapping object_type to boolean result."""
    return {obj_type: result for obj_type, _, _, result in self.checks}

Middleware

TopazMiddleware

FastAPI middleware for global authorization (pure ASGI).

Auto-protects all routes by checking policy paths derived from HTTP method and route pattern. Routes are protected unless explicitly excluded.

Policy Resolution Chain (first match wins): 1. Explicit: Route has a .rego file in policies_dir 2. Group: Route matches a :class:PolicyGroup url_pattern 3. Default: config.default_policy is set 4. Generated: Use auto-generated policy path (legacy behaviour)

PARAMETER DESCRIPTION
app

The FastAPI application

TYPE: ASGIApp

config

TopazConfig with authorizer settings

TYPE: TopazConfig

exclude_paths

Regex patterns for paths to skip (e.g., [r"^/health$", r"^/docs.*"])

TYPE: list[str] | None DEFAULT: None

exclude_methods

HTTP methods to skip (default: ["OPTIONS", "HEAD"])

TYPE: list[str] | None DEFAULT: None

on_missing_identity

How to handle missing identity: - "deny": Return 401 Unauthorized - "anonymous": Pass anonymous identity to Topaz (let policy decide)

TYPE: Literal['deny', 'anonymous'] DEFAULT: 'deny'

on_denied

Optional callback to customize 403 response

TYPE: Callable[[Request, str], Response] | None DEFAULT: None

policies_dir

Optional directory to scan for explicit .rego policy files at startup. When provided, the middleware builds a set of known policy paths and uses the resolution chain to decide which policy to evaluate per request.

TYPE: str | Path | None DEFAULT: None

Source code in src/fastapi_topaz/middleware.py
def __init__(
    self,
    app: ASGIApp,
    config: TopazConfig,
    exclude_paths: list[str] | None = None,
    exclude_methods: list[str] | None = None,
    on_missing_identity: Literal["deny", "anonymous"] = "deny",
    on_denied: Callable[[Request, str], Response] | None = None,
    policies_dir: str | Path | None = None,
) -> None:
    self.app = app
    self.config = config
    self.exclude_paths = [re.compile(p) for p in (exclude_paths or [])]
    self.exclude_methods = set(exclude_methods or ["OPTIONS", "HEAD"])
    self.on_missing_identity = on_missing_identity
    self.on_denied = on_denied

    # --- Resolution chain setup ---
    # Scan explicit policy files
    self._scanned_policies: set[str] | None = None
    if policies_dir is not None:
        self._scanned_policies = scan_policy_files(policies_dir)
        logger.info("Scanned %d policy files from %s", len(self._scanned_policies), policies_dir)

    # Pre-compile policy group patterns (avoid per-request re.compile)
    self._compiled_groups = _compile_policy_groups(config.policy_groups)

    # Warn if multiple groups could match same common prefixes
    for i, (p1, _) in enumerate(self._compiled_groups):
        for j, (p2, _) in enumerate(self._compiled_groups):
            if i < j:
                for test in ["/api/", "/admin/", "/api/v1/"]:
                    if p1.match(test) and p2.match(test):
                        logger.warning(
                            "PolicyGroup patterns %r and %r both match %r — first wins",
                            p1.pattern, p2.pattern, test,
                        )

    # Startup warnings for missing policy files
    if self._scanned_policies is not None:
        if config.default_policy and config.default_policy not in self._scanned_policies:
            logger.warning(
                "default_policy %r not found in %s", config.default_policy, policies_dir,
            )
        for group in config.policy_groups:
            if group.policy_path not in self._scanned_policies:
                logger.warning(
                    "PolicyGroup policy_path %r not found in %s",
                    group.policy_path, policies_dir,
                )

skip_middleware

Decorator to mark a route as excluded from authorization middleware.

The decorated endpoint will not be checked by TopazMiddleware, allowing you to implement custom authorization logic.

from fastapi_topaz import skip_middleware

@app.post("/documents/bulk-import")
@skip_middleware
async def bulk_import(
    _=Depends(require_policy_allowed(config, "myapp.admin.bulk_import")),
):
    # Custom policy path, not auto-generated
    ...
Source code in src/fastapi_topaz/middleware.py
def skip_middleware(func: Callable) -> Callable:
    """
    Decorator to mark a route as excluded from authorization middleware.

    The decorated endpoint will not be checked by TopazMiddleware,
    allowing you to implement custom authorization logic.

    ```python
    from fastapi_topaz import skip_middleware

    @app.post("/documents/bulk-import")
    @skip_middleware
    async def bulk_import(
        _=Depends(require_policy_allowed(config, "myapp.admin.bulk_import")),
    ):
        # Custom policy path, not auto-generated
        ...
    ```
    """
    func.__skip_topaz_middleware__ = True  # type: ignore[attr-defined]
    return func

SkipMiddleware

Marker dependency to skip authorization middleware for a router or route.

Use as a dependency on a router to exclude all routes from middleware authorization:

from fastapi import APIRouter, Depends
from fastapi_topaz import SkipMiddleware

public_router = APIRouter(
    prefix="/api/public",
    dependencies=[Depends(SkipMiddleware)],
)

@public_router.get("/status")  # Automatically excluded from middleware
async def public_status():
    return {"status": "ok"}
Source code in src/fastapi_topaz/middleware.py
def __init__(self) -> None:
    pass

Circuit Breaker

CircuitBreaker

Circuit breaker configuration for Topaz authorization.

Prevents cascading failures when Topaz is unavailable by opening the circuit after a threshold of failures, using fallback strategies, and automatically testing for recovery.

PARAMETER DESCRIPTION
failure_threshold

Number of consecutive failures before opening circuit

TYPE: int DEFAULT: 5

success_threshold

Number of successes in half-open before closing

TYPE: int DEFAULT: 2

recovery_timeout

Seconds to wait before transitioning to half-open

TYPE: float DEFAULT: 30.0

fallback

Strategy when circuit is open ("cache_then_deny", "cache_then_allow", "deny", "allow", or custom callable)

TYPE: FallbackStrategy DEFAULT: 'cache_then_deny'

serve_stale_cache

Whether to serve expired cache entries when open

TYPE: bool DEFAULT: True

stale_cache_ttl

Maximum age (seconds) of stale cache to serve

TYPE: float DEFAULT: 300.0

failure_exceptions

Exception types that count as failures

TYPE: list[type] DEFAULT: (lambda: [ConnectionError, TimeoutError, OSError])()

timeout_ms

Consider timeout after this many milliseconds

TYPE: int DEFAULT: 5000

on_state_change

Callback when circuit state changes

TYPE: Callable[[str, str, str], None] | None DEFAULT: None

on_fallback

Callback when fallback is used

TYPE: Callable[[Request, str, bool | None, bool], None] | None DEFAULT: None

half_open_max_requests

Number of test requests allowed in half-open state

TYPE: int DEFAULT: 1

Example
config = TopazConfig(
    ...,
    circuit_breaker=CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=30,
        fallback="cache_then_deny",
    ),
)

state property

state: CircuitState

Get current circuit state.

status

status() -> CircuitStatus

Get current circuit status for health checks.

Source code in src/fastapi_topaz/circuit_breaker.py
def status(self) -> CircuitStatus:
    """Get current circuit status for health checks."""
    return CircuitStatus(
        state=self._state.value,
        failure_count=self._failure_count,
        success_count=self._success_count,
        last_failure_time=self._last_failure_time,
        last_success_time=self._last_success_time,
        open_since=self._open_since,
    )

should_allow_request async

should_allow_request() -> bool

Check if a request should be allowed through to Topaz.

Returns True if the circuit is closed or if we should test in half-open. Returns False if the circuit is open and fallback should be used.

Source code in src/fastapi_topaz/circuit_breaker.py
async def should_allow_request(self) -> bool:
    """
    Check if a request should be allowed through to Topaz.

    Returns True if the circuit is closed or if we should test in half-open.
    Returns False if the circuit is open and fallback should be used.
    """
    async with self._lock:
        if self._state == CircuitState.CLOSED:
            return True

        if self._state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            if self._open_since is not None:
                elapsed = time.monotonic() - self._open_since
                if elapsed >= self.recovery_timeout:
                    await self._transition_to(
                        CircuitState.HALF_OPEN, "recovery_timeout_expired"
                    )
                    self._half_open_requests = 1
                    return True
            return False

        if self._state == CircuitState.HALF_OPEN:
            # Allow limited requests in half-open state
            if self._half_open_requests < self.half_open_max_requests:
                self._half_open_requests += 1
                return True
            return False

        return False

record_success async

record_success() -> None

Record a successful authorization call.

Source code in src/fastapi_topaz/circuit_breaker.py
async def record_success(self) -> None:
    """Record a successful authorization call."""
    async with self._lock:
        self._last_success_time = time.monotonic()
        self._failure_count = 0

        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                await self._transition_to(CircuitState.CLOSED, "test_succeeded")

record_failure async

record_failure(exception: Exception) -> None

Record a failed authorization call.

Source code in src/fastapi_topaz/circuit_breaker.py
async def record_failure(self, exception: Exception) -> None:
    """Record a failed authorization call."""
    async with self._lock:
        self._last_failure_time = time.monotonic()
        self._failure_count += 1
        self._success_count = 0

        logger.warning(
            f"Circuit breaker recorded failure #{self._failure_count}: {exception}"
        )

        if self._state == CircuitState.CLOSED:
            if self._failure_count >= self.failure_threshold:
                await self._transition_to(
                    CircuitState.OPEN, "failure_threshold_exceeded"
                )
        elif self._state == CircuitState.HALF_OPEN:
            await self._transition_to(CircuitState.OPEN, "test_failed")

get_fallback_decision async

get_fallback_decision(request: Request, policy_path: str, resource_context: dict[str, Any], cached_decision: bool | None, error: Exception) -> bool

Get the fallback decision when circuit is open.

PARAMETER DESCRIPTION
request

The FastAPI request

TYPE: Request

policy_path

The policy path being checked

TYPE: str

resource_context

The resource context

TYPE: dict[str, Any]

cached_decision

Cached decision (may be stale), or None if not cached

TYPE: bool | None

error

The exception that caused the fallback

TYPE: Exception

RETURNS DESCRIPTION
bool

Authorization decision (True/False)

Source code in src/fastapi_topaz/circuit_breaker.py
async def get_fallback_decision(
    self,
    request: Request,
    policy_path: str,
    resource_context: dict[str, Any],
    cached_decision: bool | None,
    error: Exception,
) -> bool:
    """
    Get the fallback decision when circuit is open.

    Args:
        request: The FastAPI request
        policy_path: The policy path being checked
        resource_context: The resource context
        cached_decision: Cached decision (may be stale), or None if not cached
        error: The exception that caused the fallback

    Returns:
        Authorization decision (True/False)
    """
    # Check no_stale_for patterns
    if cached_decision is not None and self.no_stale_for:
        import fnmatch

        for pattern in self.no_stale_for:
            if fnmatch.fnmatch(policy_path, pattern):
                cached_decision = None
                break

    if callable(self.fallback):
        result = self.fallback(
            request, policy_path, resource_context, cached_decision, error
        )
        # Support both sync and async callables
        if asyncio.iscoroutine(result):
            result = await result  # type: ignore[misc]
        return result

    strategy = self.fallback

    if strategy == "cache_then_deny":
        if cached_decision is not None:
            return cached_decision
        return False

    if strategy == "cache_then_allow":
        if cached_decision is not None:
            return cached_decision
        return True

    if strategy == "deny":
        return False

    if strategy == "allow":
        return True

    # Default to deny for unknown strategies
    logger.error(f"Unknown fallback strategy: {strategy}, defaulting to deny")
    return False

reset async

reset() -> None

Reset the circuit breaker to initial state.

Source code in src/fastapi_topaz/circuit_breaker.py
async def reset(self) -> None:
    """Reset the circuit breaker to initial state."""
    async with self._lock:
        await self._transition_to(CircuitState.CLOSED, "manual_reset")
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time = None
        self._last_success_time = None
        self._open_since = None
        self._half_open_requests = 0

is_failure_exception

is_failure_exception(exc: Exception) -> bool

Check if an exception should count as a circuit breaker failure.

Source code in src/fastapi_topaz/circuit_breaker.py
def is_failure_exception(self, exc: Exception) -> bool:
    """Check if an exception should count as a circuit breaker failure."""
    return any(isinstance(exc, exc_type) for exc_type in self.failure_exceptions)

CircuitState

Bases: Enum

Circuit breaker states.


CircuitStatus

Current status of the circuit breaker for health checks.

is_open property

is_open: bool

Check if circuit is currently open.


Connection Pool

ConnectionPool

Async connection pool for AuthorizerClient connections.

Manages a pool of gRPC connections to Topaz, reusing them across requests to reduce connection overhead.

PARAMETER DESCRIPTION
min_connections

Minimum connections to keep warm

TYPE: int DEFAULT: 2

max_connections

Maximum connections allowed

TYPE: int DEFAULT: 10

acquire_timeout

Seconds to wait for a connection

TYPE: float DEFAULT: 5.0

connection_timeout

Seconds to establish a new connection

TYPE: float DEFAULT: 10.0

max_idle_time

Seconds before closing idle connections

TYPE: float DEFAULT: 300.0

idle_check_interval

Seconds between idle cleanup runs

TYPE: float DEFAULT: 60.0

health_check_interval

Seconds between health checks

TYPE: float DEFAULT: 30.0

health_check_timeout

Seconds for health check to complete

TYPE: float DEFAULT: 5.0

eager_init

Create min_connections at initialization

TYPE: bool DEFAULT: False

retry_on_failure

Retry failed connection creation

TYPE: bool DEFAULT: True

max_retries

Maximum connection creation retries

TYPE: int DEFAULT: 3

configure

configure(authorizer_options: AuthorizerOptions) -> None

Configure the pool with authorizer options.

Source code in src/fastapi_topaz/connection_pool.py
def configure(self, authorizer_options: AuthorizerOptions) -> None:
    """Configure the pool with authorizer options."""
    self._authorizer_options = authorizer_options
    self._semaphore = asyncio.Semaphore(self.max_connections)

initialize async

initialize() -> None

Initialize the pool, optionally creating min_connections.

Source code in src/fastapi_topaz/connection_pool.py
async def initialize(self) -> None:
    """Initialize the pool, optionally creating min_connections."""
    if self._initialized:
        return

    async with self._lock:
        if self._initialized:
            return

        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(self.max_connections)

        if self.eager_init and self._authorizer_options:
            for _ in range(self.min_connections):
                try:
                    conn = await self._create_connection()
                    await self._idle.put(conn)
                except Exception as e:
                    logger.warning(f"Failed to create initial connection: {e}")

        # Start background cleanup task
        if self.idle_check_interval > 0:
            self._cleanup_task = asyncio.create_task(self._idle_cleanup_loop())

        self._initialized = True
        logger.info(
            f"Connection pool initialized: min={self.min_connections}, "
            f"max={self.max_connections}"
        )

acquire async

acquire() -> PooledConnection

Acquire a connection from the pool.

Returns an idle connection if available, creates a new one if under max_connections, or waits for one to become available.

RAISES DESCRIPTION
TimeoutError

If acquire_timeout is exceeded

Source code in src/fastapi_topaz/connection_pool.py
async def acquire(self) -> PooledConnection:
    """
    Acquire a connection from the pool.

    Returns an idle connection if available, creates a new one if under
    max_connections, or waits for one to become available.

    Raises:
        asyncio.TimeoutError: If acquire_timeout is exceeded
    """
    if self._closed:
        raise RuntimeError("Connection pool is closed")

    if not self._initialized:
        await self.initialize()

    assert self._semaphore is not None

    # Wait for a slot (respects max_connections)
    try:
        await asyncio.wait_for(
            self._semaphore.acquire(),
            timeout=self.acquire_timeout,
        )
    except asyncio.TimeoutError:
        logger.warning("Timeout waiting to acquire connection from pool")
        raise

    # Try to get an idle connection
    try:
        conn = self._idle.get_nowait()
        conn.mark_used()
        self._busy.add(conn)
        logger.debug(f"Acquired idle connection, busy: {len(self._busy)}")
        return conn
    except asyncio.QueueEmpty:
        pass

    # Create a new connection
    try:
        conn = await self._create_connection()
        conn.mark_used()
        self._busy.add(conn)
        logger.debug(f"Created new connection for acquire, busy: {len(self._busy)}")
        return conn
    except Exception:
        # Release semaphore if we failed to create connection
        self._semaphore.release()
        raise

release async

release(conn: PooledConnection) -> None

Return a connection to the pool.

Source code in src/fastapi_topaz/connection_pool.py
async def release(self, conn: PooledConnection) -> None:
    """Return a connection to the pool."""
    if conn not in self._busy:
        logger.warning("Attempted to release connection not marked as busy")
        return

    self._busy.discard(conn)

    if conn.healthy and not self._closed:
        conn.mark_used()
        await self._idle.put(conn)
        logger.debug("Released connection back to idle pool")
    else:
        # Remove unhealthy connection
        self._connections.discard(conn)
        logger.debug("Discarded unhealthy connection")

    if self._semaphore:
        self._semaphore.release()

connection async

connection() -> AsyncIterator[PooledConnection]

Context manager for acquiring and releasing connections.

Source code in src/fastapi_topaz/connection_pool.py
@asynccontextmanager
async def connection(self) -> AsyncIterator[PooledConnection]:
    """Context manager for acquiring and releasing connections."""
    conn = await self.acquire()
    try:
        yield conn
    finally:
        await self.release(conn)

status

status() -> PoolStatus

Get current pool status for health checks.

Source code in src/fastapi_topaz/connection_pool.py
def status(self) -> PoolStatus:
    """Get current pool status for health checks."""
    idle_count = self._idle.qsize()
    busy_count = len(self._busy)
    healthy_count = sum(1 for c in self._connections if c.healthy)

    return PoolStatus(
        total=len(self._connections),
        idle=idle_count,
        busy=busy_count,
        healthy_connections=healthy_count,
        max_connections=self.max_connections,
        min_connections=self.min_connections,
    )

close async

close() -> None

Close all connections and shut down the pool.

Source code in src/fastapi_topaz/connection_pool.py
async def close(self) -> None:
    """Close all connections and shut down the pool."""
    self._closed = True

    # Cancel cleanup task
    if self._cleanup_task:
        self._cleanup_task.cancel()
        try:
            await self._cleanup_task
        except asyncio.CancelledError:
            pass

    # Clear idle queue
    while True:
        try:
            self._idle.get_nowait()
        except asyncio.QueueEmpty:
            break

    # Clear all connections
    self._connections.clear()
    self._busy.clear()

    logger.info("Connection pool closed")

PoolStatus

Current status of the connection pool.

healthy property

healthy: bool

Pool is healthy if we have at least one healthy connection.


Audit Logging

AuditLogger

Audit logger for authorization decisions.

PARAMETER DESCRIPTION
log_allowed

Log successful authorizations

TYPE: bool DEFAULT: True

log_denied

Log denied authorizations

TYPE: bool DEFAULT: True

log_skipped

Log skipped/excluded routes

TYPE: bool DEFAULT: False

log_unauthenticated

Log 401 events

TYPE: bool DEFAULT: True

log_manual_checks

Log is_allowed(), check_relations()

TYPE: bool DEFAULT: False

level_allowed

Log level for allowed events

TYPE: str DEFAULT: 'INFO'

level_denied

Log level for denied events

TYPE: str DEFAULT: 'WARNING'

level_unauthenticated

Log level for 401 events

TYPE: str DEFAULT: 'WARNING'

level_skipped

Log level for skipped events

TYPE: str DEFAULT: 'DEBUG'

include_resource_context

Include resource context in logs

TYPE: bool DEFAULT: True

include_request_headers

Include HTTP headers (privacy concern)

TYPE: bool DEFAULT: False

handler

Custom async handler for events

TYPE: AuditHandler | None DEFAULT: None

log_decision async

log_decision(request: Request | None, policy_path: str, allowed: bool, *, source: str = 'dependency', check_type: str = 'policy', cached: bool = False, latency_ms: float | None = None, identity_type: str | None = None, identity_value: str | None = None, object_type: str | None = None, object_id: str | None = None, relation: str | None = None, subject_type: str | None = None, resource_context: dict[str, Any] | None = None, policy_resolution_source: str | None = None) -> None

Log an authorization decision.

Source code in src/fastapi_topaz/audit.py
async def log_decision(
    self,
    request: Request | None,
    policy_path: str,
    allowed: bool,
    *,
    source: str = "dependency",
    check_type: str = "policy",
    cached: bool = False,
    latency_ms: float | None = None,
    identity_type: str | None = None,
    identity_value: str | None = None,
    object_type: str | None = None,
    object_id: str | None = None,
    relation: str | None = None,
    subject_type: str | None = None,
    resource_context: dict[str, Any] | None = None,
    policy_resolution_source: str | None = None,
) -> None:
    """Log an authorization decision."""
    if allowed and not self.log_allowed:
        return
    if not allowed and not self.log_denied:
        return

    decision = "allowed" if allowed else "denied"
    event_name = f"authorization.{source}.{decision}"
    level = self.level_allowed if allowed else self.level_denied

    event = AuditEvent(
        event=event_name,
        level=level,
        request_id=self._get_request_id(request),
        source=source,
        identity_type=identity_type,
        identity_value=identity_value,
        policy_path=policy_path,
        decision=decision,
        check_type=check_type,
        cached=cached,
        latency_ms=latency_ms,
        method=request.method if request else None,
        path=str(request.url.path) if request else None,
        client_ip=self._get_client_ip(request),
        object_type=object_type,
        object_id=object_id,
        relation=relation,
        subject_type=subject_type,
        resource_context=resource_context if self.include_resource_context else None,
        policy_resolution_source=policy_resolution_source,
    )

    await self._emit(event)

log_batch_check async

log_batch_check(request: Request | None, object_type: str, object_id: str, results: dict[str, bool], *, latency_ms: float | None = None, identity_value: str | None = None) -> None

Log batch relation check results.

Source code in src/fastapi_topaz/audit.py
async def log_batch_check(
    self,
    request: Request | None,
    object_type: str,
    object_id: str,
    results: dict[str, bool],
    *,
    latency_ms: float | None = None,
    identity_value: str | None = None,
) -> None:
    """Log batch relation check results."""
    if not self.log_manual_checks:
        return

    event = AuditEvent(
        event="authorization.check.relations",
        level=self.level_allowed,
        request_id=self._get_request_id(request),
        source="manual",
        identity_value=identity_value,
        check_type="rebac_batch",
        latency_ms=latency_ms,
        method=request.method if request else None,
        path=str(request.url.path) if request else None,
        client_ip=self._get_client_ip(request),
        object_type=object_type,
        object_id=object_id,
        results=results,
    )

    await self._emit(event)

log_unauthenticated_event async

log_unauthenticated_event(request: Request | None, reason: str = 'missing_identity') -> None

Log unauthenticated access attempt.

Source code in src/fastapi_topaz/audit.py
async def log_unauthenticated_event(
    self,
    request: Request | None,
    reason: str = "missing_identity",
) -> None:
    """Log unauthenticated access attempt."""
    if not self.log_unauthenticated:
        return

    event = AuditEvent(
        event="authorization.middleware.unauthenticated",
        level=self.level_unauthenticated,
        request_id=self._get_request_id(request),
        source="middleware",
        anonymous=True,
        method=request.method if request else None,
        path=str(request.url.path) if request else None,
        client_ip=self._get_client_ip(request),
        reason=reason,
    )

    await self._emit(event)

AuditEvent

Structured audit event for authorization decisions.

to_dict

to_dict() -> dict[str, Any]

Convert to structured dict for logging.

Source code in src/fastapi_topaz/audit.py
def to_dict(self) -> dict[str, Any]:
    """Convert to structured dict for logging."""
    data: dict[str, Any] = {
        "timestamp": self.timestamp,
        "level": self.level,
        "event": self.event,
        "source": self.source,
    }
    if self.request_id:
        data["request_id"] = self.request_id

    # Identity block
    if self.identity_value is not None:
        data["identity"] = {
            "type": self.identity_type,
            "value": self.identity_value,
            "anonymous": self.anonymous,
        }
    elif self.anonymous:
        data["identity"] = None

    # Authorization block
    auth: dict[str, Any] = {}
    if self.policy_path:
        auth["policy_path"] = self.policy_path
    if self.decision:
        auth["decision"] = self.decision
    if self.check_type != "policy":
        auth["check_type"] = self.check_type
    if self.cached:
        auth["cached"] = True
    if self.latency_ms is not None:
        auth["latency_ms"] = round(self.latency_ms, 2)
    if self.policy_resolution_source:
        auth["resolution_source"] = self.policy_resolution_source
    if auth:
        data["authorization"] = auth

    # Request block
    req: dict[str, Any] = {}
    if self.method:
        req["method"] = self.method
    if self.path:
        req["path"] = self.path
    if self.route_pattern:
        req["route_pattern"] = self.route_pattern
    if self.client_ip:
        req["ip"] = self.client_ip
    if req:
        data["request"] = req

    # Resource block (ReBAC)
    if self.object_type or self.object_id or self.relation:
        data["resource"] = {
            k: v for k, v in {
                "object_type": self.object_type,
                "object_id": self.object_id,
                "relation": self.relation,
                "subject_type": self.subject_type,
            }.items() if v is not None
        }

    # Additional fields
    if self.reason:
        data["reason"] = self.reason
    if self.results:
        data["results"] = self.results
    if self.resource_context:
        data["resource_context"] = self.resource_context

    return data

to_json

to_json() -> str

Convert to JSON string.

Source code in src/fastapi_topaz/audit.py
def to_json(self) -> str:
    """Convert to JSON string."""
    return json.dumps(self.to_dict())

Observability

PrometheusMetrics

Prometheus metrics collector for authorization decisions.

PARAMETER DESCRIPTION
prefix

Metric name prefix (default: "topaz")

TYPE: str DEFAULT: 'topaz'

include_policy_path

Add policy_path label (high cardinality)

TYPE: bool DEFAULT: False

include_object_type

Add object_type label (medium cardinality)

TYPE: bool DEFAULT: False

include_relation

Add relation label (medium cardinality)

TYPE: bool DEFAULT: False

latency_buckets

Histogram buckets for latency

TYPE: list[float] DEFAULT: (lambda: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5])()

registry

Custom prometheus registry (default: global)

TYPE: Any DEFAULT: None

record_auth_request

record_auth_request(source: str, decision: str, check_type: str, policy_path: str | None = None) -> None

Record an authorization request.

Source code in src/fastapi_topaz/observability.py
def record_auth_request(
    self,
    source: str,
    decision: str,
    check_type: str,
    policy_path: str | None = None,
) -> None:
    """Record an authorization request."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._auth_requests:
        return

    labels = {"source": source, "decision": decision, "check_type": check_type}
    if self.include_policy_path and policy_path:
        labels["policy_path"] = policy_path

    self._auth_requests.labels(**labels).inc()

record_cache_hit

record_cache_hit(source: str) -> None

Record a cache hit.

Source code in src/fastapi_topaz/observability.py
def record_cache_hit(self, source: str) -> None:
    """Record a cache hit."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._cache_hits:
        return
    self._cache_hits.labels(source=source).inc()

record_cache_miss

record_cache_miss(source: str) -> None

Record a cache miss.

Source code in src/fastapi_topaz/observability.py
def record_cache_miss(self, source: str) -> None:
    """Record a cache miss."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._cache_misses:
        return
    self._cache_misses.labels(source=source).inc()

record_latency

record_latency(latency_seconds: float, source: str, cached: bool, policy_path: str | None = None) -> None

Record authorization latency.

Source code in src/fastapi_topaz/observability.py
def record_latency(
    self,
    latency_seconds: float,
    source: str,
    cached: bool,
    policy_path: str | None = None,
) -> None:
    """Record authorization latency."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._auth_latency:
        return

    labels = {"source": source, "cached": str(cached).lower()}
    if self.include_policy_path and policy_path:
        labels["policy_path"] = policy_path

    self._auth_latency.labels(**labels).observe(latency_seconds)

record_topaz_latency

record_topaz_latency(latency_seconds: float) -> None

Record actual Topaz call latency.

Source code in src/fastapi_topaz/observability.py
def record_topaz_latency(self, latency_seconds: float) -> None:
    """Record actual Topaz call latency."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._topaz_latency:
        return
    self._topaz_latency.observe(latency_seconds)

record_error

record_error(error_type: str) -> None

Record an authorization error.

Source code in src/fastapi_topaz/observability.py
def record_error(self, error_type: str) -> None:
    """Record an authorization error."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._errors:
        return
    self._errors.labels(error_type=error_type).inc()

set_circuit_state

set_circuit_state(state: int) -> None

Set circuit breaker state (0=closed, 1=open, 2=half_open).

Source code in src/fastapi_topaz/observability.py
def set_circuit_state(self, state: int) -> None:
    """Set circuit breaker state (0=closed, 1=open, 2=half_open)."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._circuit_state:
        return
    self._circuit_state.set(state)

record_circuit_transition

record_circuit_transition(from_state: str, to_state: str) -> None

Record circuit state transition.

Source code in src/fastapi_topaz/observability.py
def record_circuit_transition(self, from_state: str, to_state: str) -> None:
    """Record circuit state transition."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._circuit_transitions:
        return
    self._circuit_transitions.labels(from_state=from_state, to_state=to_state).inc()

record_fallback

record_fallback(strategy: str, cache_hit: bool, decision: str) -> None

Record circuit breaker fallback.

Source code in src/fastapi_topaz/observability.py
def record_fallback(self, strategy: str, cache_hit: bool, decision: str) -> None:
    """Record circuit breaker fallback."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._fallback:
        return
    self._fallback.labels(
        strategy=strategy,
        cache_hit=str(cache_hit).lower(),
        decision=decision,
    ).inc()

set_cache_size

set_cache_size(size: int) -> None

Set current cache size.

Source code in src/fastapi_topaz/observability.py
def set_cache_size(self, size: int) -> None:
    """Set current cache size."""
    self._initialize()
    if not PROMETHEUS_AVAILABLE or not self._cache_size:
        return
    self._cache_size.set(size)

OTelTracing

OpenTelemetry tracing for authorization decisions.

PARAMETER DESCRIPTION
trace_all_checks

Trace all authorization checks

TYPE: bool DEFAULT: True

trace_cache_operations

Include cache lookup/store spans

TYPE: bool DEFAULT: True

include_identity

Add identity to span attributes (privacy risk)

TYPE: bool DEFAULT: False

include_policy_path

Add policy_path to spans

TYPE: bool DEFAULT: False

include_resource_context

Add full resource context (privacy risk)

TYPE: bool DEFAULT: False

span_name_prefix

Prefix for span names

TYPE: str DEFAULT: 'topaz'

start_auth_span

start_auth_span(source: str, check_type: str, policy_path: str | None = None, identity_value: str | None = None) -> Any

Start an authorization span.

Source code in src/fastapi_topaz/observability.py
def start_auth_span(
    self,
    source: str,
    check_type: str,
    policy_path: str | None = None,
    identity_value: str | None = None,
) -> Any:
    """Start an authorization span."""
    tracer = self._get_tracer()
    if not tracer or not self.trace_all_checks:
        return None

    attributes = {
        f"{self.span_name_prefix}.source": source,
        f"{self.span_name_prefix}.check_type": check_type,
    }

    if self.include_policy_path and policy_path:
        attributes[f"{self.span_name_prefix}.policy_path"] = policy_path

    if self.include_identity and identity_value:
        attributes[f"{self.span_name_prefix}.identity"] = identity_value

    return tracer.start_span(
        f"{self.span_name_prefix}.authorization",
        attributes=attributes,
    )

end_auth_span

end_auth_span(span: Any, decision: str, cached: bool, latency_ms: float, resource_context: dict | None = None) -> None

End an authorization span with results.

Source code in src/fastapi_topaz/observability.py
def end_auth_span(
    self,
    span: Any,
    decision: str,
    cached: bool,
    latency_ms: float,
    resource_context: dict | None = None,
) -> None:
    """End an authorization span with results."""
    if not span or not OTEL_AVAILABLE:
        return

    span.set_attribute(f"{self.span_name_prefix}.decision", decision)
    span.set_attribute(f"{self.span_name_prefix}.cached", cached)
    span.set_attribute(f"{self.span_name_prefix}.latency_ms", latency_ms)

    if self.include_resource_context and resource_context:
        span.set_attribute(
            f"{self.span_name_prefix}.resource_context",
            str(resource_context),
        )

    span.set_attribute(f"{self.span_name_prefix}.denied", decision == "denied")
    span.set_status(Status(StatusCode.OK))

    span.end()

start_cache_span

start_cache_span(operation: str) -> Any

Start a cache operation span.

Source code in src/fastapi_topaz/observability.py
def start_cache_span(self, operation: str) -> Any:
    """Start a cache operation span."""
    tracer = self._get_tracer()
    if not tracer or not self.trace_cache_operations:
        return None

    return tracer.start_span(f"{self.span_name_prefix}.cache.{operation}")

end_cache_span

end_cache_span(span: Any, hit: bool | None = None) -> None

End a cache operation span.

Source code in src/fastapi_topaz/observability.py
def end_cache_span(self, span: Any, hit: bool | None = None) -> None:
    """End a cache operation span."""
    if not span or not OTEL_AVAILABLE:
        return

    if hit is not None:
        span.set_attribute("hit", hit)

    span.set_status(Status(StatusCode.OK))
    span.end()

start_topaz_span

start_topaz_span() -> Any

Start a Topaz request span.

Source code in src/fastapi_topaz/observability.py
def start_topaz_span(self) -> Any:
    """Start a Topaz request span."""
    tracer = self._get_tracer()
    if not tracer or not self.trace_all_checks:
        return None

    return tracer.start_span(f"{self.span_name_prefix}.topaz.request")

end_topaz_span

end_topaz_span(span: Any, latency_ms: float) -> None

End a Topaz request span.

Source code in src/fastapi_topaz/observability.py
def end_topaz_span(self, span: Any, latency_ms: float) -> None:
    """End a Topaz request span."""
    if not span or not OTEL_AVAILABLE:
        return

    span.set_attribute("latency_ms", latency_ms)
    span.set_status(Status(StatusCode.OK))
    span.end()

record_error

record_error(span: Any, error: Exception) -> None

Record an error on a span.

Source code in src/fastapi_topaz/observability.py
def record_error(self, span: Any, error: Exception) -> None:
    """Record an error on a span."""
    if not span or not OTEL_AVAILABLE:
        return

    span.set_status(Status(StatusCode.ERROR, str(error)))
    span.record_exception(error)
    span.end()

get_current_trace_id

get_current_trace_id() -> str | None

Get current trace ID for correlation.

Source code in src/fastapi_topaz/observability.py
def get_current_trace_id(self) -> str | None:
    """Get current trace ID for correlation."""
    if not OTEL_AVAILABLE:
        return None

    span = trace.get_current_span()  # type: ignore[union-attr]
    if span and span.get_span_context().is_valid:
        return format(span.get_span_context().trace_id, "032x")
    return None

Re-exported Types

These types are re-exported from aserto.client for convenience:

from fastapi_topaz import (
    AuthorizerOptions,
    Identity,
    IdentityType,
    ResourceContext,
)

AuthorizationError

Bases: Exception


Type Aliases

from fastapi_topaz._defaults import (
    IdentityMapper,   # Callable[[], Identity]
    StringMapper,     # Callable[[], str]
    ObjectMapper,     # Callable[[], Obj]
    ResourceMapper,   # Callable[[], ResourceContext]
)