Connector SDK

The verity-sdk library (libs/verity-sdk/) provides the abstract base class and dataclasses required to build platform connectors for the Verity platform. Every connector — whether for Azure AD, Fabric, Synapse, Databricks, or a custom source — extends BaseConnector and implements two methods.

Architecture

sequenceDiagram
    participant Source as Source Platform
    participant Connector as BaseConnector
    participant PG as PostgreSQL<br/>(Checkpoints)
    participant Kafka as Kafka

    Connector->>PG: _load_checkpoint()
    PG-->>Connector: last_ts (or epoch)
    Connector->>Source: stream_audit_events(since)
    loop For each audit event
        Source-->>Connector: RawEvent
        Connector->>Kafka: publish to verity.events.raw.{platform}
        Connector->>PG: _save_checkpoint(occurred_at)
    end

Dataclasses

RawEvent

Represents a raw audit event from a source platform, published to Kafka for downstream enrichment and normalisation.

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class RawEvent:
    """A raw audit event from a source platform."""
    platform: str               # e.g. "azure_ad", "fabric", "databricks"
    event_id: str               # Unique event ID from the source
    occurred_at: datetime       # When the event occurred (UTC)
    principal_ref: str          # Source-specific user/principal reference
    asset_ref: str              # Source-specific asset/resource reference
    operation: str              # e.g. "SELECT", "INSERT", "LOGIN"
    raw_payload: dict = field(default_factory=dict)  # Full original payload

PermissionSnapshot

Represents a point-in-time snapshot of a permission grant, captured during periodic permission scans.

@dataclass
class PermissionSnapshot:
    """A point-in-time snapshot of a permission grant."""
    platform: str               # Source platform identifier
    principal_ref: str          # Source-specific user reference
    asset_ref: str              # Source-specific asset reference
    privilege: str              # e.g. "READ", "WRITE", "ADMIN"
    grant_mechanism: str        # e.g. "direct", "group", "role", "policy"
    granted_via: str | None = None      # Group/role name, if applicable
    snapshot_at: datetime | None = None # When the snapshot was taken
    raw_payload: dict = field(default_factory=dict)

BaseConnector ABC

The BaseConnector abstract base class provides the foundation for all connectors. Subclasses must implement two abstract methods and inherit automatic checkpointing and Kafka publishing.

Constructor

class BaseConnector(ABC):
    def __init__(self, config: Any, kafka_producer: Any, db_pool: Any) -> None:
        self.config = config
        self.kafka_producer = kafka_producer
        self.db_pool = db_pool

Parameter       Type  Description
config          Any   Connector-specific configuration dict (must include connector_id)
kafka_producer  Any   An AIOKafkaProducer instance for publishing events
db_pool         Any   An asyncpg connection pool for checkpoint persistence

Abstract Methods

stream_audit_events(since: datetime) -> AsyncIterator[RawEvent]

Yield raw audit events that occurred after the given timestamp. This is the primary data ingestion path — every event yielded is automatically published to Kafka and checkpointed.

@abstractmethod
async def stream_audit_events(self, since: datetime) -> AsyncIterator[RawEvent]:
    ...

snapshot_permissions() -> AsyncIterator[PermissionSnapshot]

Yield the current set of permission grants from the platform. Called periodically to build a complete picture of who has access to what.

@abstractmethod
async def snapshot_permissions(self) -> AsyncIterator[PermissionSnapshot]:
    ...
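Both abstract methods are async generators. As a minimal stand-alone sketch of that shape (a toy example with hypothetical names, no BaseConnector involved):

```python
import asyncio
from typing import AsyncIterator

async def stream_numbers(since: int) -> AsyncIterator[int]:
    """Toy async generator with the same shape as stream_audit_events:
    yield items newer than `since`, one at a time."""
    for n in range(since + 1, since + 4):
        yield n

async def collect(since: int) -> list[int]:
    # Consumers iterate with `async for`, exactly as run() does.
    return [n async for n in stream_numbers(since)]

print(asyncio.run(collect(10)))  # [11, 12, 13]
```

A real connector would await API calls between yields; the generator protocol is what matters here.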

The run() Loop

The run() method provides the main execution loop with automatic checkpointing:

  1. Loads the last checkpoint from the connector_checkpoints table
  2. Calls stream_audit_events(since=checkpoint)
  3. For each RawEvent, publishes to verity.events.raw.{platform} on Kafka
  4. Saves the event's occurred_at as the new checkpoint
async def run(self) -> None:
    checkpoint = await self._load_checkpoint()
    async for event in self.stream_audit_events(since=checkpoint):
        await self.kafka_producer.send(
            topic=f"verity.events.raw.{event.platform}",
            value={ ... },  # Serialised RawEvent fields
        )
        await self._save_checkpoint(event.occurred_at)
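The `value={ ... }` placeholder elides the wire format. One plausible encoding (a sketch, not the SDK's actual serialiser; `serialise` is a hypothetical helper, and the RawEvent definition is repeated here only so the snippet runs standalone) is asdict() with ISO-8601 timestamps:

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone

# Stand-in copy of the SDK's RawEvent so this snippet runs on its own;
# a real connector imports it from verity_sdk.connector_base.
@dataclass
class RawEvent:
    platform: str
    event_id: str
    occurred_at: datetime
    principal_ref: str
    asset_ref: str
    operation: str
    raw_payload: dict = field(default_factory=dict)

def serialise(event: RawEvent) -> bytes:
    """Hypothetical encoder: flatten the dataclass to a dict, render the
    datetime as ISO-8601, and emit UTF-8 JSON bytes for the producer."""
    payload = asdict(event)
    payload["occurred_at"] = event.occurred_at.isoformat()
    return json.dumps(payload).encode("utf-8")

event = RawEvent(
    platform="acme",
    event_id="evt-1",
    occurred_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
    principal_ref="alice@example.com",
    asset_ref="urn:acme:table:sales",
    operation="SELECT",
)
```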

Checkpoint Mechanism

Checkpoints provide at-least-once resumption across connector restarts: after a crash, events since the last saved checkpoint are streamed again, so downstream processing must be idempotent (see Best Practices).

_load_checkpoint() -> datetime

Reads from the connector_checkpoints table using the connector's ID. Falls back to datetime(2000, 1, 1, tzinfo=UTC) if no checkpoint exists or the query fails.

SELECT last_ts FROM connector_checkpoints WHERE connector_id = $1
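In asyncpg terms, the lookup might look like the following sketch (function name and pool handling are illustrative, not the SDK's exact code):

```python
import asyncio
from datetime import datetime, timezone

EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)  # fallback sentinel

async def load_checkpoint(pool, connector_id: str) -> datetime:
    """Read the last checkpoint; fall back to the epoch sentinel when the
    row is missing or the query fails."""
    try:
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT last_ts FROM connector_checkpoints"
                " WHERE connector_id = $1",
                connector_id,
            )
        return row["last_ts"] if row else EPOCH
    except Exception:
        return EPOCH
```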

_save_checkpoint(ts: datetime) -> None

Upserts the checkpoint timestamp:

INSERT INTO connector_checkpoints (connector_id, last_ts)
VALUES ($1, $2)
ON CONFLICT (connector_id)
DO UPDATE SET last_ts = EXCLUDED.last_ts
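A corresponding upsert, again as an asyncpg-style sketch with an illustrative function name:

```python
import asyncio
from datetime import datetime, timezone

UPSERT_SQL = """
INSERT INTO connector_checkpoints (connector_id, last_ts)
VALUES ($1, $2)
ON CONFLICT (connector_id)
DO UPDATE SET last_ts = EXCLUDED.last_ts
"""

async def save_checkpoint(pool, connector_id: str, ts: datetime) -> None:
    """Persist `ts` as the connector's new checkpoint (insert or update)."""
    async with pool.acquire() as conn:
        await conn.execute(UPSERT_SQL, connector_id, ts)
```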

Kafka Topic Convention

All raw events are published to topics following the naming convention:

verity.events.raw.{platform}

For example:

  • verity.events.raw.azure_ad
  • verity.events.raw.fabric
  • verity.events.raw.databricks
  • verity.events.raw.synapse
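A helper enforcing the convention can be as simple as (hypothetical function, not part of the SDK):

```python
def raw_topic(platform: str) -> str:
    """Build the raw-events Kafka topic name for a platform identifier."""
    return f"verity.events.raw.{platform}"

print(raw_topic("azure_ad"))  # verity.events.raw.azure_ad
```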

Full Example: Custom Connector

Below is a complete example of building a custom connector for a hypothetical "Acme Data Platform":

"""Connector for Acme Data Platform."""

from __future__ import annotations

import logging
from datetime import datetime
from typing import Any, AsyncIterator

import httpx

from verity_sdk.connector_base import BaseConnector, PermissionSnapshot, RawEvent

logger = logging.getLogger(__name__)


class AcmeConnector(BaseConnector):
    """Ingests audit events and permissions from the Acme Data Platform."""

    PLATFORM = "acme"

    def __init__(self, config: Any, kafka_producer: Any, db_pool: Any) -> None:
        super().__init__(config, kafka_producer, db_pool)
        self.base_url = config["acme_api_url"]
        self.api_key = config["acme_api_key"]
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=30.0,
        )

    async def stream_audit_events(self, since: datetime) -> AsyncIterator[RawEvent]:
        """Poll the Acme audit log API for new events."""
        page = 1
        while True:
            resp = await self.client.get(
                "/api/v1/audit-logs",
                params={
                    "since": since.isoformat(),
                    "page": page,
                    "page_size": 100,
                },
            )
            resp.raise_for_status()
            data = resp.json()

            for entry in data["events"]:
                yield RawEvent(
                    platform=self.PLATFORM,
                    event_id=entry["id"],
                    occurred_at=datetime.fromisoformat(entry["timestamp"]),
                    principal_ref=entry["user_email"],
                    asset_ref=entry["resource_urn"],
                    operation=entry["action"],
                    raw_payload=entry,
                )

            if not data.get("has_next"):
                break
            page += 1

    async def snapshot_permissions(self) -> AsyncIterator[PermissionSnapshot]:
        """Fetch all current permission grants from Acme."""
        resp = await self.client.get("/api/v1/permissions")
        resp.raise_for_status()

        for grant in resp.json()["grants"]:
            yield PermissionSnapshot(
                platform=self.PLATFORM,
                principal_ref=grant["user_email"],
                asset_ref=grant["resource_urn"],
                privilege=grant["level"].upper(),
                grant_mechanism=grant.get("mechanism", "direct"),
                granted_via=grant.get("role_name"),
                snapshot_at=datetime.utcnow(),
                raw_payload=grant,
            )


# ── Entry point ──────────────────────────────────────────────────────────
async def main():
    from verity_common.config import VerityConfig
    from verity_common.kafka import KafkaSettings, create_producer

    config = VerityConfig()
    connector_config = {
        "connector_id": "acme-prod",
        "acme_api_url": "https://api.acme.example.com",
        "acme_api_key": "sk-...",
    }

    async with create_producer(config.kafka) as producer:
        import asyncpg
        pool = await asyncpg.create_pool(dsn="postgresql://verity:verity@localhost/verity")
        connector = AcmeConnector(connector_config, producer, pool)
        await connector.run()


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Existing Connectors

Connector            Path                             Platform
Azure AD / Entra ID  services/connectors/aad/         azure_ad
Microsoft Fabric     services/connectors/fabric/      fabric
Azure Synapse        services/connectors/synapse/     synapse
Databricks           services/connectors/databricks/  databricks
PostgreSQL           services/connectors/postgres/    postgres
HR System            services/connectors/hr/          hr

Best Practices

  1. Always set connector_id in your config — this is used as the checkpoint key
  2. Make stream_audit_events idempotent — events may be re-yielded after a crash before checkpoint is saved
  3. Use UTC datetimes everywhere — the checkpoint system assumes UTC
  4. Handle pagination in your API calls — source APIs often paginate audit logs
  5. Log generously — use the standard logging module; structlog is configured at the service level
  6. Keep raw_payload complete — downstream enrichment relies on the full original event
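In support of practice 3, a small helper (a sketch, with a hypothetical name) can normalise source timestamps into timezone-aware UTC before they reach the checkpoint:

```python
from datetime import datetime, timezone

def parse_event_ts(value: str) -> datetime:
    """Parse an ISO-8601 source timestamp into an aware UTC datetime.

    Handles the common 'Z' suffix, assumes UTC for naive values, and
    converts any other offset to UTC."""
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
```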