Common Library¶
The verity-common package (libs/verity-common/) provides shared utilities used by every service in the Verity platform. It centralises configuration management, database access, Kafka messaging, Prometheus metrics, structured logging, and event schemas.
Module Overview¶
```mermaid
graph LR
    subgraph verity-common
        CONFIG["config.py<br/><i>VerityConfig</i>"]
        DB["database.py<br/><i>Async SQLAlchemy</i>"]
        KAFKA["kafka.py<br/><i>Producer / Consumer</i>"]
        METRICS["metrics.py<br/><i>Prometheus Counters</i>"]
        LOGGING["logging.py<br/><i>structlog JSON</i>"]
        SCHEMAS["schemas.py<br/><i>Kafka Event Schemas</i>"]
        MODELS["models.py<br/><i>Domain Models</i>"]
    end
    CONFIG --> DB
    CONFIG --> KAFKA
    SERVICES["All Services"] --> CONFIG
    SERVICES --> DB
    SERVICES --> KAFKA
    SERVICES --> METRICS
    SERVICES --> LOGGING
    SERVICES --> SCHEMAS
    SERVICES --> MODELS
    style CONFIG fill:#7c3aed,color:#fff
    style DB fill:#7c3aed,color:#fff
    style KAFKA fill:#7c3aed,color:#fff
    style METRICS fill:#7c3aed,color:#fff
    style LOGGING fill:#7c3aed,color:#fff
    style SCHEMAS fill:#7c3aed,color:#fff
    style MODELS fill:#7c3aed,color:#fff
```
config.py — Configuration¶
Aggregated settings for the entire platform, powered by pydantic-settings.
VerityConfig¶
Top-level configuration that composes sub-configs:
```python
from typing import Optional

from pydantic_settings import BaseSettings


class VerityConfig(BaseSettings):
    model_config = {"env_prefix": "VERITY_"}

    db: DatabaseSettings = DatabaseSettings()
    kafka: KafkaSettings = KafkaSettings()
    redis_url: str = "redis://localhost:6379/0"
    temporal_host: str = "localhost:7233"
    temporal_namespace: str = "verity"
    azure_tenant_id: Optional[str] = None
    azure_client_id: Optional[str] = None
```
DatabaseSettings¶
Defined in database.py, env prefix DB_:
| Environment Variable | Field | Default |
|---|---|---|
| DB_HOST | host | localhost |
| DB_PORT | port | 5432 |
| DB_USER | user | verity |
| DB_PASSWORD | password | verity |
| DB_DATABASE | database | verity |
| DB_MIN_POOL | min_pool | 2 |
| DB_MAX_POOL | max_pool | 10 |
Provides a `url` property that generates the async connection string.
KafkaSettings¶
Defined in kafka.py, env prefix KAFKA_:
| Environment Variable | Field | Default |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | bootstrap_servers | localhost:9092 |
| KAFKA_SECURITY_PROTOCOL | security_protocol | PLAINTEXT |
| KAFKA_SASL_MECHANISM | sasl_mechanism | None |
| KAFKA_SASL_USERNAME | sasl_username | None |
| KAFKA_SASL_PASSWORD | sasl_password | None |
RedisSettings¶
Configured via the top-level `VerityConfig.redis_url` string. In production, this points to Azure Cache for Redis with TLS.
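The two environments differ only in the URL scheme and port. A quick sketch of the URL shapes (the production hostname and access key below are hypothetical placeholders, not real values):

```python
from urllib.parse import urlparse

# Local development default:
dev_url = "redis://localhost:6379/0"

# Production: the rediss:// scheme enables TLS, and 6380 is the standard
# Azure Cache for Redis TLS port. Hostname and key are placeholders.
prod_url = "rediss://:<access-key>@verity-cache.redis.cache.windows.net:6380/0"

parsed = urlparse(prod_url)
print(parsed.scheme, parsed.port)  # rediss 6380
```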
database.py — Async SQLAlchemy¶
Provides async database engine and session management using SQLAlchemy's async API with asyncpg.
Functions¶
create_async_engine(settings: DatabaseSettings) -> AsyncEngine¶
Creates and returns a SQLAlchemy async engine:
```python
from verity_common.database import create_async_engine, DatabaseSettings

settings = DatabaseSettings()
engine = create_async_engine(settings)
```
Engine configuration:
- `pool_size=settings.max_pool` (default 10)
- `max_overflow=0`
- `pool_pre_ping=True` (validates connections before use)
init_engine(settings: DatabaseSettings) -> AsyncEngine¶
Initialises the module-level engine and session factory (singleton pattern):
```python
from verity_common.database import init_engine, get_session, DatabaseSettings

engine = init_engine(DatabaseSettings())
```
get_session() -> AsyncGenerator[AsyncSession, None]¶
Async generator for dependency injection of database sessions. Raises RuntimeError if init_engine() hasn't been called.
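The guard can be sketched like this (an illustrative stand-in mirroring the described singleton behaviour, not the actual module source):

```python
import asyncio
from typing import Any, AsyncGenerator, Callable, Optional

# Module-level session factory; populated by init_engine() in the real module.
_session_factory: Optional[Callable[[], Any]] = None


async def get_session() -> AsyncGenerator[Any, None]:
    if _session_factory is None:
        raise RuntimeError("init_engine() must be called before get_session()")
    session = _session_factory()
    try:
        yield session
    finally:
        await session.close()


async def main() -> str:
    # Iterating the generator before init_engine() triggers the guard.
    try:
        async for _ in get_session():
            pass
    except RuntimeError as exc:
        return str(exc)
    return "no error"


print(asyncio.run(main()))  # init_engine() must be called before get_session()
```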
kafka.py — Kafka Helpers¶
Producer and consumer factories with JSON serialisation, built on aiokafka.
Serialisation¶
```python
import json
from typing import Any


def json_serializer(value: Any) -> bytes:
    """Serialize a value to JSON bytes."""
    return json.dumps(value, default=str).encode("utf-8")


def json_deserializer(data: bytes) -> Any:
    """Deserialize JSON bytes to a Python object."""
    return json.loads(data.decode("utf-8"))
```
The default=str in the serialiser handles datetime, UUID, and other non-JSON-native types automatically.
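For instance, a payload mixing a UUID and a datetime serialises without a custom encoder, because the `str()` fallback kicks in for both:

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any


def json_serializer(value: Any) -> bytes:
    """Serialize a value to JSON bytes."""
    return json.dumps(value, default=str).encode("utf-8")


payload = {
    "event_id": uuid.UUID("12345678-1234-5678-1234-567812345678"),
    "occurred_at": datetime(2024, 6, 15, 10, 30, tzinfo=timezone.utc),
}

# default=str renders both values via str() rather than raising TypeError.
print(json_serializer(payload))
```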
create_producer(settings: KafkaSettings) -> AsyncIterator[AIOKafkaProducer]¶
Async context manager that yields a started producer:
```python
from verity_common.kafka import KafkaSettings, create_producer

async with create_producer(KafkaSettings()) as producer:
    await producer.send("verity.events.raw.azure_ad", value={"event_id": "..."})
```
SASL authentication is configured automatically when sasl_mechanism is set in the settings.
create_consumer(settings, group_id, topics) -> AsyncIterator[AIOKafkaConsumer]¶
Async context manager that yields a started consumer:
```python
from verity_common.kafka import KafkaSettings, create_consumer

async with create_consumer(
    KafkaSettings(),
    group_id="decay-engine",
    topics=["verity.events.normalised"],
) as consumer:
    async for msg in consumer:
        process(msg.value)
```
metrics.py — Prometheus Metrics¶
Pre-defined Prometheus metrics for all Verity services, using prometheus_client.
Ingestion Metrics¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| verity_events_ingested_total | Counter | platform, connector | Total events ingested from source platforms |
| verity_events_lag_seconds | Gauge | platform, connector | Lag between event occurrence and ingestion |
Scoring Metrics¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| verity_scores_computed_total | Counter | trigger | Total access-decay scores computed |
| verity_score_computation_duration_seconds | Histogram | trigger | Duration of score computation |
Review Metrics¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| verity_reviews_open_total | Gauge | risk_level | Number of currently open review packets |
| verity_reviews_sla_breached_total | Counter | risk_level | Total reviews that breached their SLA |
Remediation Metrics¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| verity_remediations_executed_total | Counter | platform, status | Total remediation actions executed |
| verity_remediations_failed_total | Counter | platform | Total remediation actions that failed |
API Metrics¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| verity_api_request_duration_seconds | Histogram | method, endpoint, status | Duration of API requests |
Audit Metrics¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| verity_audit_write_lag_seconds | Gauge | — | Lag between audit event creation and persistence |
Usage Example¶
```python
from verity_common.metrics import verity_events_ingested_total

verity_events_ingested_total.labels(platform="azure_ad", connector="aad-prod").inc()
```
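Histogram metrics are typically observed with `.time()`. A sketch using a locally registered stand-in for verity_score_computation_duration_seconds (the metric name here is hypothetical, to avoid clashing with the real registration):

```python
import time

from prometheus_client import Histogram

# Stand-in for verity_score_computation_duration_seconds from metrics.py.
score_duration = Histogram(
    "demo_score_computation_duration_seconds",
    "Duration of score computation",
    ["trigger"],
)

# .time() records the elapsed wall-clock time as one observation.
with score_duration.labels(trigger="scheduled").time():
    time.sleep(0.01)  # stand-in for the actual score computation
```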
logging.py — Structured Logging¶
JSON-formatted structured logging via structlog, with trace context injection for distributed tracing.
setup_logging(service_name: str, level: str = "INFO") -> BoundLogger¶
Configures structlog once per process and returns a bound logger:
```python
from verity_common.logging import setup_logging

logger = setup_logging("decay-engine", level="INFO")
logger.info("engine started", score_threshold=30)
```
Output (JSON):
```json
{
  "service": "decay-engine",
  "event": "engine started",
  "score_threshold": 30,
  "level": "info",
  "timestamp": "2024-06-15T10:30:00.000000Z"
}
```
Processor Pipeline¶
The structlog pipeline applies these processors in order:
1. `merge_contextvars` — merges context variables (e.g. request ID)
2. `add_log_level` — adds the `level` field
3. `TimeStamper(fmt="iso")` — adds an ISO 8601 timestamp
4. `_add_trace_context` — injects `trace_id` and `span_id` from ContextVars
5. `StackInfoRenderer` — renders stack info if present
6. `format_exc_info` — formats exception tracebacks
7. `JSONRenderer` — outputs JSON
Trace Context¶
Distributed trace IDs are propagated via Python ContextVars:
```python
from verity_common.logging import trace_id_var, span_id_var

trace_id_var.set("abc-123-trace")
span_id_var.set("def-456-span")
# Subsequent log entries will include trace_id and span_id
```
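Because ContextVars are task-local, concurrent asyncio tasks keep independent trace IDs even when they interleave. A self-contained sketch with a locally defined stand-in for trace_id_var:

```python
import asyncio
from contextvars import ContextVar

# Local stand-in for verity_common.logging.trace_id_var.
trace_id_var: ContextVar[str] = ContextVar("trace_id", default="-")


async def handle(trace_id: str) -> str:
    trace_id_var.set(trace_id)  # visible only within this task's context
    await asyncio.sleep(0)      # yield, so the two tasks interleave
    return trace_id_var.get()   # still this task's value


async def main() -> list[str]:
    return await asyncio.gather(handle("trace-a"), handle("trace-b"))


print(asyncio.run(main()))  # ['trace-a', 'trace-b']
```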
get_logger() -> BoundLogger¶
Returns the current structlog logger (must call setup_logging first):
```python
from verity_common.logging import get_logger

logger = get_logger()
logger.warning("high decay score", principal_id="...", score=15)
```
PrintLoggerFactory
The logging module uses structlog.PrintLoggerFactory(sys.stdout) rather than structlog.stdlib.LoggerFactory, and the processor pipeline deliberately omits the add_logger_name processor, which crashes when combined with PrintLoggerFactory; see Troubleshooting.
schemas.py — Kafka Event Schemas¶
Pydantic models defining the structure of all Kafka events flowing through the platform.
| Schema | Kafka Topic | Description |
|---|---|---|
| RawEvent | verity.events.raw.{platform} | Raw audit event from a connector |
| NormalisedEvent | verity.events.normalised | Enriched event with resolved principal/asset IDs |
| ScoreUpdatedEvent | verity.scores.updated | Decay score computation result |
| ReviewCreatedEvent | verity.reviews.created | New review packet generated |
| ReviewDecidedEvent | verity.reviews.decided | Human decision on a review |
| RemediationCompletedEvent | verity.remediation.completed | Result of a remediation action |
| AuditEvent | verity.audit.trail | Immutable audit trail event for ClickHouse |
Example: Publishing a NormalisedEvent¶
```python
from datetime import datetime

from verity_common.schemas import NormalisedEvent

event = NormalisedEvent(
    event_id="evt-123",
    occurred_at=datetime.utcnow(),
    principal_id=principal_uuid,
    asset_id=asset_uuid,
    platform="fabric",
    operation="SELECT",
    rows_touched=1500,
    duration_ms=230,
)

await producer.send("verity.events.normalised", value=event.model_dump(mode="json"))
```