Common Library

The verity-common package (libs/verity-common/) provides shared utilities used by every service in the Verity platform. It centralises configuration management, database access, Kafka messaging, Prometheus metrics, structured logging, and event schemas.

Module Overview

graph LR
    subgraph verity-common
        CONFIG["config.py<br/><i>VerityConfig</i>"]
        DB["database.py<br/><i>Async SQLAlchemy</i>"]
        KAFKA["kafka.py<br/><i>Producer / Consumer</i>"]
        METRICS["metrics.py<br/><i>Prometheus Counters</i>"]
        LOGGING["logging.py<br/><i>structlog JSON</i>"]
        SCHEMAS["schemas.py<br/><i>Kafka Event Schemas</i>"]
        MODELS["models.py<br/><i>Domain Models</i>"]
    end

    CONFIG --> DB
    CONFIG --> KAFKA
    SERVICES["All Services"] --> CONFIG
    SERVICES --> DB
    SERVICES --> KAFKA
    SERVICES --> METRICS
    SERVICES --> LOGGING
    SERVICES --> SCHEMAS
    SERVICES --> MODELS

    style CONFIG fill:#7c3aed,color:#fff
    style DB fill:#7c3aed,color:#fff
    style KAFKA fill:#7c3aed,color:#fff
    style METRICS fill:#7c3aed,color:#fff
    style LOGGING fill:#7c3aed,color:#fff
    style SCHEMAS fill:#7c3aed,color:#fff
    style MODELS fill:#7c3aed,color:#fff

config.py — Configuration

Aggregated settings for the entire platform, powered by pydantic-settings.

VerityConfig

Top-level configuration that composes sub-configs:

from typing import Optional

from pydantic_settings import BaseSettings

from verity_common.database import DatabaseSettings
from verity_common.kafka import KafkaSettings

class VerityConfig(BaseSettings):
    model_config = {"env_prefix": "VERITY_"}

    db: DatabaseSettings = DatabaseSettings()
    kafka: KafkaSettings = KafkaSettings()

    redis_url: str = "redis://localhost:6379/0"
    temporal_host: str = "localhost:7233"
    temporal_namespace: str = "verity"

    azure_tenant_id: Optional[str] = None
    azure_client_id: Optional[str] = None

DatabaseSettings

Defined in database.py, env prefix DB_:

| Environment Variable | Field | Default |
| --- | --- | --- |
| DB_HOST | host | localhost |
| DB_PORT | port | 5432 |
| DB_USER | user | verity |
| DB_PASSWORD | password | verity |
| DB_DATABASE | database | verity |
| DB_MIN_POOL | min_pool | 2 |
| DB_MAX_POOL | max_pool | 10 |

Provides a url property that generates the async connection string:

postgresql+asyncpg://{user}:{password}@{host}:{port}/{database}
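As a sketch of how such a property can assemble that string, using a plain dataclass stand-in rather than the real pydantic-settings class (field names and defaults match the table above; the class itself is illustrative):

```python
from dataclasses import dataclass

# Illustrative stand-in for DatabaseSettings; the real class is a
# pydantic-settings model, but the url property works the same way.
@dataclass
class DatabaseSettingsSketch:
    host: str = "localhost"
    port: int = 5432
    user: str = "verity"
    password: str = "verity"
    database: str = "verity"

    @property
    def url(self) -> str:
        # Assemble the asyncpg connection string from the individual fields
        return (
            f"postgresql+asyncpg://{self.user}:{self.password}"
            f"@{self.host}:{self.port}/{self.database}"
        )

print(DatabaseSettingsSketch().url)
# postgresql+asyncpg://verity:verity@localhost:5432/verity
```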

KafkaSettings

Defined in kafka.py, env prefix KAFKA_:

| Environment Variable | Field | Default |
| --- | --- | --- |
| KAFKA_BOOTSTRAP_SERVERS | bootstrap_servers | localhost:9092 |
| KAFKA_SECURITY_PROTOCOL | security_protocol | PLAINTEXT |
| KAFKA_SASL_MECHANISM | sasl_mechanism | None |
| KAFKA_SASL_USERNAME | sasl_username | None |
| KAFKA_SASL_PASSWORD | sasl_password | None |

RedisSettings

Configured via the top-level VerityConfig.redis_url string. In production, this points to Azure Cache for Redis with TLS:

rediss://verity-prod-redis.redis.cache.windows.net:6380
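The rediss:// scheme (note the extra s) is what requests TLS. A quick, purely illustrative check with the stdlib URL parser shows how the string breaks down:

```python
from urllib.parse import urlparse

# Parse the production-style Redis URL from above
parsed = urlparse("rediss://verity-prod-redis.redis.cache.windows.net:6380")
print(parsed.scheme, parsed.hostname, parsed.port)
# rediss verity-prod-redis.redis.cache.windows.net 6380
```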

database.py — Async SQLAlchemy

Provides async database engine and session management using SQLAlchemy's async API with asyncpg.

Functions

create_async_engine(settings: DatabaseSettings) -> AsyncEngine

Creates and returns a SQLAlchemy async engine:

from verity_common.database import create_async_engine, DatabaseSettings

settings = DatabaseSettings()
engine = create_async_engine(settings)

Engine configuration:

  • pool_size = settings.max_pool (default 10)
  • max_overflow = 0
  • pool_pre_ping = True (validates connections before use)

init_engine(settings: DatabaseSettings) -> AsyncEngine

Initialises the module-level engine and session factory (singleton pattern):

from verity_common.database import init_engine, get_session, DatabaseSettings

engine = init_engine(DatabaseSettings())

get_session() -> AsyncGenerator[AsyncSession, None]

Async generator for dependency injection of database sessions. Raises RuntimeError if init_engine() hasn't been called:

async for session in get_session():
    result = await session.execute(...)
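The singleton-plus-guard pattern can be sketched with a stdlib-only stand-in (the real factory is SQLAlchemy's async_sessionmaker; everything below is illustrative):

```python
import asyncio

# Module-level singleton, as described above; stays None until init_engine() runs.
_session_factory = None

def init_engine(factory):
    """Store the session factory once per process (singleton pattern)."""
    global _session_factory
    _session_factory = factory
    return factory

async def get_session():
    """Yield a session, failing loudly if init_engine() was never called."""
    if _session_factory is None:
        raise RuntimeError("call init_engine() before get_session()")
    yield _session_factory()

async def main():
    init_engine(lambda: "fake-session")
    async for session in get_session():
        print(session)  # fake-session

asyncio.run(main())
```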

kafka.py — Kafka Helpers

Producer and consumer factories with JSON serialisation, built on aiokafka.

Serialisation

def json_serializer(value: Any) -> bytes:
    """Serialize a value to JSON bytes."""
    return json.dumps(value, default=str).encode("utf-8")

def json_deserializer(data: bytes) -> Any:
    """Deserialize JSON bytes to a Python object."""
    return json.loads(data.decode("utf-8"))

The default=str in the serialiser handles datetime, UUID, and other non-JSON-native types automatically.
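For example, a payload mixing a UUID, a timezone-aware datetime, and plain JSON types encodes without a custom encoder class (the field names here are invented for the demonstration):

```python
import json
from datetime import datetime, timezone
from uuid import UUID

def json_serializer(value):
    """Serialize a value to JSON bytes, stringifying non-native types."""
    return json.dumps(value, default=str).encode("utf-8")

payload = {
    "event_id": UUID("12345678-1234-5678-1234-567812345678"),
    "occurred_at": datetime(2024, 6, 15, 10, 30, tzinfo=timezone.utc),
    "rows_touched": 1500,
}
encoded = json_serializer(payload)
print(encoded.decode("utf-8"))
# {"event_id": "12345678-1234-5678-1234-567812345678",
#  "occurred_at": "2024-06-15 10:30:00+00:00", "rows_touched": 1500}
```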

create_producer(settings: KafkaSettings) -> AsyncIterator[AIOKafkaProducer]

Async context manager that yields a started producer:

from verity_common.kafka import KafkaSettings, create_producer

async with create_producer(KafkaSettings()) as producer:
    await producer.send("verity.events.raw.azure_ad", value={"event_id": "..."})

SASL authentication is configured automatically when sasl_mechanism is set in the settings.
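One common way to implement that conditional wiring is to add SASL keyword arguments only when a mechanism is configured. The helper below is hypothetical, not part of verity_common, though the sasl_plain_* keyword names are aiokafka's:

```python
from types import SimpleNamespace

def kafka_client_kwargs(settings) -> dict:
    """Build aiokafka client kwargs, adding SASL fields only when configured."""
    kwargs = {
        "bootstrap_servers": settings.bootstrap_servers,
        "security_protocol": settings.security_protocol,
    }
    if settings.sasl_mechanism is not None:
        kwargs["sasl_mechanism"] = settings.sasl_mechanism
        kwargs["sasl_plain_username"] = settings.sasl_username
        kwargs["sasl_plain_password"] = settings.sasl_password
    return kwargs

# Stand-in for KafkaSettings with the defaults from the table above
plain = SimpleNamespace(
    bootstrap_servers="localhost:9092",
    security_protocol="PLAINTEXT",
    sasl_mechanism=None,
    sasl_username=None,
    sasl_password=None,
)
print(kafka_client_kwargs(plain))
# {'bootstrap_servers': 'localhost:9092', 'security_protocol': 'PLAINTEXT'}
```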

create_consumer(settings, group_id, topics) -> AsyncIterator[AIOKafkaConsumer]

Async context manager that yields a started consumer:

from verity_common.kafka import KafkaSettings, create_consumer

async with create_consumer(
    KafkaSettings(),
    group_id="decay-engine",
    topics=["verity.events.normalised"],
) as consumer:
    async for msg in consumer:
        process(msg.value)

metrics.py — Prometheus Metrics

Pre-defined Prometheus metrics for all Verity services, using prometheus_client.

Ingestion Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| verity_events_ingested_total | Counter | platform, connector | Total events ingested from source platforms |
| verity_events_lag_seconds | Gauge | platform, connector | Lag between event occurrence and ingestion |

Scoring Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| verity_scores_computed_total | Counter | trigger | Total access-decay scores computed |
| verity_score_computation_duration_seconds | Histogram | trigger | Duration of score computation |

Review Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| verity_reviews_open_total | Gauge | risk_level | Number of currently open review packets |
| verity_reviews_sla_breached_total | Counter | risk_level | Total reviews that breached their SLA |

Remediation Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| verity_remediations_executed_total | Counter | platform, status | Total remediation actions executed |
| verity_remediations_failed_total | Counter | platform | Total remediation actions that failed |

API Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| verity_api_request_duration_seconds | Histogram | method, endpoint, status | Duration of API requests |

Audit Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| verity_audit_write_lag_seconds | Gauge | — | Lag between audit event creation and persistence |

Usage Example

from verity_common.metrics import verity_events_ingested_total

verity_events_ingested_total.labels(platform="azure_ad", connector="aad-prod").inc()

logging.py — Structured Logging

JSON-formatted structured logging via structlog, with trace context injection for distributed tracing.

setup_logging(service_name: str, level: str = "INFO") -> BoundLogger

Configures structlog once per process and returns a bound logger:

from verity_common.logging import setup_logging

logger = setup_logging("decay-engine", level="INFO")
logger.info("engine started", score_threshold=30)

Output (JSON):

{
  "service": "decay-engine",
  "event": "engine started",
  "score_threshold": 30,
  "level": "info",
  "timestamp": "2024-06-15T10:30:00.000000Z"
}

Processor Pipeline

The structlog pipeline applies these processors in order:

  1. merge_contextvars — Merges context variables (e.g. request ID)
  2. add_log_level — Adds level field
  3. TimeStamper(fmt="iso") — Adds ISO 8601 timestamp
  4. _add_trace_context — Injects trace_id and span_id from ContextVars
  5. StackInfoRenderer — Renders stack info if present
  6. format_exc_info — Formats exception tracebacks
  7. JSONRenderer — Outputs JSON

Trace Context

Distributed trace IDs are propagated via Python ContextVars:

from verity_common.logging import trace_id_var, span_id_var

trace_id_var.set("abc-123-trace")
span_id_var.set("def-456-span")
# Subsequent log entries will include trace_id and span_id
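A stdlib-only sketch of the kind of processor _add_trace_context describes (the ContextVar names match the module; the function body is illustrative):

```python
from contextvars import ContextVar
from typing import Optional

trace_id_var: ContextVar[Optional[str]] = ContextVar("trace_id", default=None)
span_id_var: ContextVar[Optional[str]] = ContextVar("span_id", default=None)

def add_trace_context(logger, method_name, event_dict):
    """structlog-style processor: inject trace IDs into each event dict."""
    trace_id = trace_id_var.get()
    if trace_id is not None:
        event_dict["trace_id"] = trace_id
    span_id = span_id_var.get()
    if span_id is not None:
        event_dict["span_id"] = span_id
    return event_dict

trace_id_var.set("abc-123-trace")
span_id_var.set("def-456-span")
print(add_trace_context(None, "info", {"event": "engine started"}))
# {'event': 'engine started', 'trace_id': 'abc-123-trace', 'span_id': 'def-456-span'}
```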

get_logger() -> BoundLogger

Returns the current structlog logger (must call setup_logging first):

from verity_common.logging import get_logger

logger = get_logger()
logger.warning("high decay score", principal_id="...", score=15)

PrintLoggerFactory

The logging module uses structlog.PrintLoggerFactory(sys.stdout) rather than structlog.stdlib.LoggerFactory. Because PrintLogger instances have no name attribute, the add_logger_name processor would crash under this factory, so the processor pipeline above deliberately omits it; see Troubleshooting.

schemas.py — Kafka Event Schemas

Pydantic models defining the structure of all Kafka events flowing through the platform.

| Schema | Kafka Topic | Description |
| --- | --- | --- |
| RawEvent | verity.events.raw.{platform} | Raw audit event from a connector |
| NormalisedEvent | verity.events.normalised | Enriched event with resolved principal/asset IDs |
| ScoreUpdatedEvent | verity.scores.updated | Decay score computation result |
| ReviewCreatedEvent | verity.reviews.created | New review packet generated |
| ReviewDecidedEvent | verity.reviews.decided | Human decision on a review |
| RemediationCompletedEvent | verity.remediation.completed | Result of a remediation action |
| AuditEvent | verity.audit.trail | Immutable audit trail event for ClickHouse |

Example: Publishing a NormalisedEvent

from datetime import datetime, timezone

from verity_common.schemas import NormalisedEvent

event = NormalisedEvent(
    event_id="evt-123",
    occurred_at=datetime.now(timezone.utc),
    principal_id=principal_uuid,  # UUIDs resolved earlier in the pipeline
    asset_id=asset_uuid,
    platform="fabric",
    operation="SELECT",
    rows_touched=1500,
    duration_ms=230,
)

await producer.send("verity.events.normalised", value=event.model_dump(mode="json"))