Connector SDK¶
The verity-sdk library (libs/verity-sdk/) provides the abstract base class and dataclasses required to build platform connectors for the Verity platform. Every connector — whether for Azure AD, Fabric, Synapse, Databricks, or a custom source — extends BaseConnector and implements two methods.
Architecture¶
```mermaid
sequenceDiagram
    participant Source as Source Platform
    participant Connector as BaseConnector
    participant PG as PostgreSQL<br/>(Checkpoints)
    participant Kafka as Kafka

    Connector->>PG: _load_checkpoint()
    PG-->>Connector: last_ts (or epoch)
    loop For each audit event
        Connector->>Source: stream_audit_events(since)
        Source-->>Connector: RawEvent
        Connector->>Kafka: publish to verity.events.raw.{platform}
        Connector->>PG: _save_checkpoint(occurred_at)
    end
```
Dataclasses¶
RawEvent¶
Represents a raw audit event from a source platform, published to Kafka for downstream enrichment and normalisation.
```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class RawEvent:
    """A raw audit event from a source platform."""

    platform: str           # e.g. "azure_ad", "fabric", "databricks"
    event_id: str           # Unique event ID from the source
    occurred_at: datetime   # When the event occurred (UTC)
    principal_ref: str      # Source-specific user/principal reference
    asset_ref: str          # Source-specific asset/resource reference
    operation: str          # e.g. "SELECT", "INSERT", "LOGIN"
    raw_payload: dict = field(default_factory=dict)  # Full original payload
```
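Constructing one is straightforward. The sketch below repeats the dataclass so it runs standalone; all field values are purely illustrative:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


# Repeated from above so this snippet runs standalone.
@dataclass
class RawEvent:
    platform: str
    event_id: str
    occurred_at: datetime
    principal_ref: str
    asset_ref: str
    operation: str
    raw_payload: dict = field(default_factory=dict)


# Illustrative values; a real connector fills these from the source payload.
event = RawEvent(
    platform="azure_ad",
    event_id="evt-0001",
    occurred_at=datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc),
    principal_ref="alice@contoso.example",
    asset_ref="app:payroll",
    operation="LOGIN",
    raw_payload={"ip": "203.0.113.7"},
)
```

Note the timezone-aware `occurred_at`: the checkpoint mechanism below assumes UTC.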
PermissionSnapshot¶
Represents a point-in-time snapshot of a permission grant, captured during periodic permission scans.
```python
@dataclass
class PermissionSnapshot:
    """A point-in-time snapshot of a permission grant."""

    platform: str                    # Source platform identifier
    principal_ref: str               # Source-specific user reference
    asset_ref: str                   # Source-specific asset reference
    privilege: str                   # e.g. "READ", "WRITE", "ADMIN"
    grant_mechanism: str             # e.g. "direct", "group", "role", "policy"
    granted_via: str | None = None   # Group/role name, if applicable
    snapshot_at: datetime | None = None  # When the snapshot was taken
    raw_payload: dict = field(default_factory=dict)
```
BaseConnector ABC¶
The BaseConnector abstract base class provides the foundation for all connectors. Subclasses must implement two abstract methods and inherit automatic checkpointing and Kafka publishing.
Constructor¶
```python
class BaseConnector(ABC):
    def __init__(self, config: Any, kafka_producer: Any, db_pool: Any) -> None:
        self.config = config
        self.kafka_producer = kafka_producer
        self.db_pool = db_pool
```
| Parameter | Type | Description |
|---|---|---|
| `config` | `Any` | Connector-specific configuration dict (must include `connector_id`) |
| `kafka_producer` | `Any` | An `AIOKafkaProducer` instance for publishing events |
| `db_pool` | `Any` | An `asyncpg` connection pool for checkpoint persistence |
Abstract Methods¶
stream_audit_events(since: datetime) -> AsyncIterator[RawEvent]¶
Yield raw audit events that occurred after the given timestamp. This is the primary data ingestion path — every event yielded is automatically published to Kafka and checkpointed.
```python
@abstractmethod
async def stream_audit_events(self, since: datetime) -> AsyncIterator[RawEvent]:
    ...
```
snapshot_permissions() -> AsyncIterator[PermissionSnapshot]¶
Yield the current set of permission grants from the platform. Called periodically to build a complete picture of who has access to what.
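Mirroring the previous method, the declaration can be sketched from the signature in the heading (the class below is a minimal stand-in for `BaseConnector`, not the SDK's actual source):

```python
from abc import ABC, abstractmethod
from typing import AsyncIterator


class ConnectorSketch(ABC):
    """Minimal stand-in for BaseConnector, showing only this method."""

    @abstractmethod
    async def snapshot_permissions(self) -> AsyncIterator["PermissionSnapshot"]:
        """Yield every current permission grant from the source platform."""
        ...
```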
The run() Loop¶
The run() method provides the main execution loop with automatic checkpointing:
- Loads the last checkpoint from the `connector_checkpoints` table
- Calls `stream_audit_events(since=checkpoint)`
- For each `RawEvent`, publishes to `verity.events.raw.{platform}` on Kafka
- Saves the event's `occurred_at` as the new checkpoint
```python
async def run(self) -> None:
    checkpoint = await self._load_checkpoint()
    async for event in self.stream_audit_events(since=checkpoint):
        await self.kafka_producer.send(
            topic=f"verity.events.raw.{event.platform}",
            value={ ... },  # Serialised RawEvent fields
        )
        await self._save_checkpoint(event.occurred_at)
```
Checkpoint Mechanism¶
Checkpoints let a connector resume where it left off across restarts. Delivery is effectively at-least-once: if the connector crashes after publishing an event but before saving its checkpoint, that event is re-yielded on the next run.
_load_checkpoint() -> datetime¶
Reads from the `connector_checkpoints` table using the connector's ID. Falls back to `datetime(2000, 1, 1, tzinfo=UTC)` if no checkpoint exists or the query fails.
_save_checkpoint(ts: datetime) -> None¶
Upserts the checkpoint timestamp:
```sql
INSERT INTO connector_checkpoints (connector_id, last_ts)
VALUES ($1, $2)
ON CONFLICT (connector_id)
DO UPDATE SET last_ts = EXCLUDED.last_ts
```
Kafka Topic Convention¶
All raw events are published to topics following the naming convention `verity.events.raw.{platform}`. For example:

- `verity.events.raw.azure_ad`
- `verity.events.raw.fabric`
- `verity.events.raw.databricks`
- `verity.events.raw.synapse`
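`run()` formats this string inline; a tiny helper like the one below (hypothetical, not part of the SDK) makes the convention explicit and testable:

```python
def raw_topic(platform: str) -> str:
    """Derive the raw-events Kafka topic for a platform identifier."""
    return f"verity.events.raw.{platform}"
```

Centralising the format in one place avoids drift if the topic scheme ever changes.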
Full Example: Custom Connector¶
Below is a complete example of building a custom connector for a hypothetical "Acme Data Platform":
"""Connector for Acme Data Platform."""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any, AsyncIterator
import httpx
from verity_sdk.connector_base import BaseConnector, PermissionSnapshot, RawEvent
logger = logging.getLogger(__name__)
class AcmeConnector(BaseConnector):
"""Ingests audit events and permissions from the Acme Data Platform."""
PLATFORM = "acme"
def __init__(self, config: Any, kafka_producer: Any, db_pool: Any) -> None:
super().__init__(config, kafka_producer, db_pool)
self.base_url = config["acme_api_url"]
self.api_key = config["acme_api_key"]
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=30.0,
)
async def stream_audit_events(self, since: datetime) -> AsyncIterator[RawEvent]:
"""Poll the Acme audit log API for new events."""
page = 1
while True:
resp = await self.client.get(
"/api/v1/audit-logs",
params={
"since": since.isoformat(),
"page": page,
"page_size": 100,
},
)
resp.raise_for_status()
data = resp.json()
for entry in data["events"]:
yield RawEvent(
platform=self.PLATFORM,
event_id=entry["id"],
occurred_at=datetime.fromisoformat(entry["timestamp"]),
principal_ref=entry["user_email"],
asset_ref=entry["resource_urn"],
operation=entry["action"],
raw_payload=entry,
)
if not data.get("has_next"):
break
page += 1
async def snapshot_permissions(self) -> AsyncIterator[PermissionSnapshot]:
"""Fetch all current permission grants from Acme."""
resp = await self.client.get("/api/v1/permissions")
resp.raise_for_status()
for grant in resp.json()["grants"]:
yield PermissionSnapshot(
platform=self.PLATFORM,
principal_ref=grant["user_email"],
asset_ref=grant["resource_urn"],
privilege=grant["level"].upper(),
grant_mechanism=grant.get("mechanism", "direct"),
granted_via=grant.get("role_name"),
snapshot_at=datetime.utcnow(),
raw_payload=grant,
)
# ── Entry point ──────────────────────────────────────────────────────────
async def main():
from verity_common.config import VerityConfig
from verity_common.kafka import KafkaSettings, create_producer
config = VerityConfig()
connector_config = {
"connector_id": "acme-prod",
"acme_api_url": "https://api.acme.example.com",
"acme_api_key": "sk-...",
}
async with create_producer(config.kafka) as producer:
import asyncpg
pool = await asyncpg.create_pool(dsn="postgresql://verity:verity@localhost/verity")
connector = AcmeConnector(connector_config, producer, pool)
await connector.run()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Existing Connectors¶
| Connector | Path | Platform |
|---|---|---|
| Azure AD / Entra ID | `services/connectors/aad/` | `azure_ad` |
| Microsoft Fabric | `services/connectors/fabric/` | `fabric` |
| Azure Synapse | `services/connectors/synapse/` | `synapse` |
| Databricks | `services/connectors/databricks/` | `databricks` |
| PostgreSQL | `services/connectors/postgres/` | `postgres` |
| HR System | `services/connectors/hr/` | `hr` |
Best Practices¶
- Always set `connector_id` in your config — this is used as the checkpoint key
- Make `stream_audit_events` idempotent — events may be re-yielded after a crash that occurs before the checkpoint is saved
- Use UTC datetimes everywhere — the checkpoint system assumes UTC
- Handle pagination in your API calls — source APIs often paginate audit logs
- Log generously — use the standard `logging` module; structlog is configured at the service level
- Keep `raw_payload` complete — downstream enrichment relies on the full original event
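For the UTC rule in particular, a small normalising helper (hypothetical, not part of the SDK; it assumes naive timestamps are already in UTC) is cheap insurance before yielding events or checkpointing:

```python
from datetime import datetime, timezone


def ensure_utc(ts: datetime) -> datetime:
    """Return an aware UTC datetime; naive input is assumed to already be UTC."""
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)
```

Applying this to every `occurred_at` before it reaches `_save_checkpoint` prevents subtle resume bugs caused by mixing naive and aware timestamps.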