Build Your First Connector¶
Connectors are how Verity ingests data from external platforms. In this tutorial you'll build a complete connector that pulls audit events and permission snapshots from a hypothetical JIRA instance, using the Verity SDK's BaseConnector framework.
How Connectors Work¶
Every connector follows the same pattern:
```mermaid
sequenceDiagram
    participant Connector
    participant Platform as Source Platform
    participant Kafka
    participant DB as PostgreSQL
    Connector->>DB: Load checkpoint (last processed timestamp)
    Connector->>Platform: Fetch audit events since checkpoint
    loop For each event
        Connector->>Kafka: Publish to verity.events.raw.{platform}
        Connector->>DB: Update checkpoint
    end
    Connector->>Platform: Snapshot current permissions
    loop For each grant
        Connector->>Kafka: Publish to verity.permissions.raw.{platform}
    end
```
The SDK provides:
- `BaseConnector` — abstract base class with the main loop, Kafka publishing, and checkpointing built in
- `RawEvent` — dataclass for audit events
- `PermissionSnapshot` — dataclass for point-in-time permission grants
- Automatic checkpointing — so connectors resume from where they left off after restarts
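Concretely, the loop that `BaseConnector.run()` drives can be pictured as follows. This is a simplified, runnable sketch rather than the SDK's actual code: the real class in `libs/verity-sdk` talks to Kafka and PostgreSQL, while `BaseConnectorSketch` and its attribute names are invented for illustration.

```python
"""Simplified sketch of one BaseConnector.run() cycle (illustrative only)."""
import abc
from datetime import datetime, timezone


class BaseConnectorSketch(abc.ABC):
    def __init__(self, platform: str) -> None:
        self.platform = platform
        # Default checkpoint when none has been saved yet
        self.checkpoint = datetime(2000, 1, 1, tzinfo=timezone.utc)
        self.published: list[tuple[str, dict]] = []  # stands in for Kafka

    @abc.abstractmethod
    def stream_audit_events(self, since: datetime): ...

    @abc.abstractmethod
    def snapshot_permissions(self): ...

    async def run(self) -> None:
        since = self.checkpoint                                   # load checkpoint
        async for event in self.stream_audit_events(since):
            self.published.append(
                (f"verity.events.raw.{self.platform}", event)     # publish event
            )
            self.checkpoint = event["occurred_at"]                # save checkpoint
        async for grant in self.snapshot_permissions():
            self.published.append(
                (f"verity.permissions.raw.{self.platform}", grant)
            )
```

Your connector only fills in the two abstract hooks; everything else in the cycle is inherited.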
Prerequisites¶
- Verity running locally (Quick Start)
- Python 3.12+
- The `verity-sdk` library (included in the monorepo at `libs/verity-sdk`)
Step 1 — Scaffold the Connector¶
Create the directory structure:
```shell
mkdir -p services/connectors/jira
touch services/connectors/jira/__init__.py
touch services/connectors/jira/connector.py
touch services/connectors/jira/config.py
touch services/connectors/jira/main.py
touch services/connectors/jira/Dockerfile
```
Your connector has four substantive files (plus an empty `__init__.py`):
| File | Purpose |
|---|---|
| `config.py` | pydantic-settings configuration |
| `connector.py` | Core logic — subclass of `BaseConnector` |
| `main.py` | Entrypoint — wires config, Kafka, DB, and starts the connector |
| `Dockerfile` | Container image definition |
Step 2 — Define Configuration¶
Create a pydantic-settings model with the JIRA_ environment variable prefix:
"""JIRA connector configuration."""
from pydantic_settings import BaseSettings
class JiraConnectorConfig(BaseSettings):
model_config = {"env_prefix": "JIRA_"}
# JIRA connection
base_url: str = "https://your-org.atlassian.net"
api_token: str = ""
email: str = ""
# Polling
poll_interval: int = 300 # seconds between poll cycles
# Connector identity (used for checkpointing)
connector_id: str = "jira-connector"
Step 3 — Implement the Connector¶
Subclass `BaseConnector` and implement the two required methods:
"""JIRA platform connector for Verity."""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any, AsyncIterator
import httpx
from verity_sdk.connector_base import BaseConnector, PermissionSnapshot, RawEvent
logger = logging.getLogger(__name__)
class JiraConnector(BaseConnector):
"""Pulls audit events and permission snapshots from JIRA.
Implements the two abstract methods required by BaseConnector:
- stream_audit_events() → yields RawEvent objects
- snapshot_permissions() → yields PermissionSnapshot objects
"""
def __init__(
self,
config: dict[str, Any],
kafka_producer: Any,
db_pool: Any,
http_client: httpx.AsyncClient,
) -> None:
super().__init__(config=config, kafka_producer=kafka_producer, db_pool=db_pool)
self.http = http_client
self.base_url = config["base_url"]
# ── Required: stream_audit_events ──────────────────────────────────
async def stream_audit_events(
self, since: datetime
) -> AsyncIterator[RawEvent]:
"""Fetch JIRA audit log entries created after `since`.
Yields RawEvent objects that the BaseConnector.run() loop
automatically publishes to Kafka topic:
verity.events.raw.jira
"""
url = f"{self.base_url}/rest/api/3/auditing/record"
params = {
"from": since.strftime("%Y-%m-%dT%H:%M:%S.000+0000"),
"limit": 100,
}
while True:
resp = await self.http.get(url, params=params)
resp.raise_for_status()
data = resp.json()
for record in data.get("records", []):
yield RawEvent(
platform="jira",
event_id=str(record["id"]),
occurred_at=datetime.fromisoformat(
record["created"]
).replace(tzinfo=timezone.utc),
principal_ref=record.get("authorAccountId", "unknown"),
asset_ref=record.get("objectItem", {}).get("id", "unknown"),
operation=record.get("summary", "unknown"),
raw_payload=record,
)
# Handle pagination
if data.get("offset", 0) + data.get("limit", 0) >= data.get("total", 0):
break
params["offset"] = data["offset"] + data["limit"]
# ── Required: snapshot_permissions ──────────────────────────────────
async def snapshot_permissions(self) -> AsyncIterator[PermissionSnapshot]:
"""Snapshot current project-level permissions in JIRA.
Yields PermissionSnapshot objects representing who has what
access to which JIRA projects right now.
"""
# Fetch all projects
projects_resp = await self.http.get(
f"{self.base_url}/rest/api/3/project/search"
)
projects_resp.raise_for_status()
projects = projects_resp.json().get("values", [])
for project in projects:
project_key = project["key"]
# Fetch role-based permissions for this project
roles_resp = await self.http.get(
f"{self.base_url}/rest/api/3/project/{project_key}/role"
)
roles_resp.raise_for_status()
roles = roles_resp.json()
for role_name, role_url in roles.items():
role_resp = await self.http.get(role_url)
role_resp.raise_for_status()
role_data = role_resp.json()
for actor in role_data.get("actors", []):
yield PermissionSnapshot(
platform="jira",
principal_ref=actor.get("actorUser", {}).get(
"accountId", actor.get("name", "unknown")
),
asset_ref=f"project:{project_key}",
privilege=role_name,
grant_mechanism="role",
granted_via=f"role:{role_data.get('name', role_name)}",
snapshot_at=datetime.now(timezone.utc),
raw_payload={
"project": project_key,
"role": role_name,
"actor": actor,
},
)
Step 4 — Understand the Data Models¶
The SDK provides two dataclasses that form the contract between connectors and the rest of the pipeline:
RawEvent¶
Represents a single audit event from the source platform.
```python
@dataclass
class RawEvent:
    platform: str       # e.g., "jira", "aad", "fabric"
    event_id: str       # unique ID from the source platform
    occurred_at: datetime  # when the event happened (UTC)
    principal_ref: str  # platform-specific user identifier
    asset_ref: str      # platform-specific resource identifier
    operation: str      # what happened (e.g., "project.read", "issue.update")
    raw_payload: dict   # full original event for audit trail
```
PermissionSnapshot¶
Represents a point-in-time record of a permission grant.
```python
@dataclass
class PermissionSnapshot:
    platform: str       # e.g., "jira", "aad", "fabric"
    principal_ref: str  # platform-specific user identifier
    asset_ref: str      # platform-specific resource identifier
    privilege: str      # permission level (e.g., "admin", "read", "write")
    grant_mechanism: str  # how access was granted (e.g., "role", "direct", "group")
    granted_via: str | None   # intermediary (e.g., "role:Developers")
    snapshot_at: datetime | None  # when this snapshot was taken (UTC)
    raw_payload: dict   # full original record for audit trail
```
!!! note "Identity resolution happens downstream"

    Connectors emit platform-specific identifiers (e.g., JIRA account IDs). The Identity Resolver service in the Normalise plane maps these to canonical Verity principals — your connector doesn't need to worry about cross-platform identity matching.
Step 5 — Create the Entrypoint¶
Wire everything together in main.py:
"""JIRA connector entrypoint."""
import asyncio
import logging
import asyncpg
import httpx
from aiokafka import AIOKafkaProducer
from .config import JiraConnectorConfig
from .connector import JiraConnector
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
async def main() -> None:
config = JiraConnectorConfig() # (1)!
# Initialise infrastructure clients
db_pool = await asyncpg.create_pool(
host=config.model_config.get("db_host", "localhost"),
port=5432,
user="verity",
password="verity_dev",
database="verity",
)
kafka_producer = AIOKafkaProducer(
bootstrap_servers="kafka:9092",
)
await kafka_producer.start()
async with httpx.AsyncClient(
headers={
"Authorization": f"Basic {config.email}:{config.api_token}",
"Content-Type": "application/json",
},
timeout=30.0,
) as http_client:
connector = JiraConnector(
config={
"connector_id": config.connector_id,
"base_url": config.base_url,
},
kafka_producer=kafka_producer,
db_pool=db_pool,
http_client=http_client,
)
# Run the connector loop
while True:
try:
await connector.run() # (2)!
except Exception:
logger.exception("Connector run failed, retrying...")
await asyncio.sleep(config.poll_interval)
await kafka_producer.stop()
await db_pool.close()
if __name__ == "__main__":
asyncio.run(main())
1. pydantic-settings automatically reads `JIRA_*` environment variables.
2. `BaseConnector.run()` handles the event streaming, Kafka publishing, and checkpointing loop.
Step 6 — Create the Dockerfile¶
```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY libs/verity-sdk /app/libs/verity-sdk
COPY services/connectors/jira /app/services/connectors/jira
RUN pip install --no-cache-dir \
    asyncpg \
    aiokafka \
    httpx \
    pydantic-settings \
    /app/libs/verity-sdk
CMD ["python", "-m", "services.connectors.jira.main"]
```
Step 7 — Register in Docker Compose¶
Add your connector to docker-compose.yml:
```yaml
connector-jira:
  build:
    context: .
    dockerfile: services/connectors/jira/Dockerfile
  container_name: verity-connector-jira
  profiles:
    - connectors
  environment:
    <<: *app-env
    JIRA_BASE_URL: https://your-org.atlassian.net
    JIRA_EMAIL: your-email@example.com
    JIRA_API_TOKEN: your-api-token
  depends_on:
    postgres:
      condition: service_healthy
    kafka:
      condition: service_healthy
  networks:
    - verity-net
```
Start it with the connectors profile:
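For example:

```shell
docker compose --profile connectors up -d connector-jira
```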
Checkpoint Mechanism¶
The `BaseConnector` provides automatic checkpointing so your connector resumes from its last position after restarts:
| Method | Behaviour |
|---|---|
| `_load_checkpoint()` | Reads the last processed timestamp from the `connector_checkpoints` table. Returns `2000-01-01T00:00:00Z` if no checkpoint exists. |
| `_save_checkpoint(ts)` | Upserts the timestamp into `connector_checkpoints` using the connector's `connector_id`. Called automatically after each event is published to Kafka. |
The checkpoint table schema:
```sql
CREATE TABLE connector_checkpoints (
    connector_id  TEXT PRIMARY KEY,
    platform      TEXT NOT NULL,
    checkpoint_at TIMESTAMPTZ NOT NULL,
    metadata      JSONB NOT NULL DEFAULT '{}',
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);
```
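The upsert that `_save_checkpoint(ts)` performs can be sketched as below. To keep the example runnable anywhere it uses an in-memory SQLite database and a trimmed version of the table (no `metadata` or `updated_at`); the SDK's exact SQL may differ, but `ON CONFLICT ... DO UPDATE` is also how the upsert is expressed in PostgreSQL.

```python
# Sketch of the checkpoint upsert: one row per connector_id, latest timestamp wins.
import sqlite3
from datetime import datetime, timezone

db = sqlite3.connect(":memory:")
db.execute("""
    CREATE TABLE connector_checkpoints (
        connector_id  TEXT PRIMARY KEY,
        platform      TEXT NOT NULL,
        checkpoint_at TEXT NOT NULL
    )
""")

def save_checkpoint(connector_id: str, platform: str, ts: datetime) -> None:
    db.execute(
        """
        INSERT INTO connector_checkpoints (connector_id, platform, checkpoint_at)
        VALUES (?, ?, ?)
        ON CONFLICT (connector_id) DO UPDATE SET checkpoint_at = excluded.checkpoint_at
        """,
        (connector_id, platform, ts.isoformat()),
    )

save_checkpoint("jira-connector", "jira", datetime(2025, 1, 1, tzinfo=timezone.utc))
save_checkpoint("jira-connector", "jira", datetime(2025, 1, 2, tzinfo=timezone.utc))
row = db.execute("SELECT checkpoint_at FROM connector_checkpoints").fetchone()
print(row[0])  # 2025-01-02T00:00:00+00:00 — the second write updated, not duplicated
```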
!!! tip "Idempotent event IDs"

    Always use the source platform's native event ID as `RawEvent.event_id`. Downstream services use this for deduplication — if your connector re-emits an event after a restart, the Ingest Worker will detect and discard the duplicate.
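The deduplication contract can be sketched in a few lines. The Ingest Worker's real store is persistent rather than an in-process set, and the `accept` helper is an invented name for illustration:

```python
# Sketch of downstream dedup keyed on (platform, event_id).
seen: set[tuple[str, str]] = set()

def accept(event: dict) -> bool:
    """Return True the first time a (platform, event_id) pair is seen."""
    key = (event["platform"], event["event_id"])
    if key in seen:
        return False  # duplicate re-emitted after a restart — discard
    seen.add(key)
    return True

assert accept({"platform": "jira", "event_id": "1"}) is True
assert accept({"platform": "jira", "event_id": "1"}) is False  # duplicate dropped
```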
Kafka Topic Convention¶
Connectors publish to topics following this naming pattern:
| Topic Pattern | Purpose | Example |
|---|---|---|
| `verity.events.raw.{platform}` | Audit events | `verity.events.raw.jira` |
| `verity.permissions.raw.{platform}` | Permission snapshots | `verity.permissions.raw.jira` |
The `{platform}` segment is derived from `RawEvent.platform` and `PermissionSnapshot.platform` — ensure it's a consistent lowercase string.
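If you build topic names in your own code, a tiny helper keeps the convention in one place (illustrative, not part of the SDK):

```python
# Illustrative helper encoding the topic naming convention above.
def topic_for(kind: str, platform: str) -> str:
    """kind is 'events' or 'permissions'; platform must be lowercase."""
    if kind not in ("events", "permissions"):
        raise ValueError(f"unknown topic kind: {kind}")
    if platform != platform.lower():
        raise ValueError("platform must be a consistent lowercase string")
    return f"verity.{kind}.raw.{platform}"

print(topic_for("events", "jira"))       # verity.events.raw.jira
print(topic_for("permissions", "jira"))  # verity.permissions.raw.jira
```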
Testing Your Connector¶
Unit Tests¶
Test your connector methods in isolation by mocking the HTTP client:
```python
import pytest
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock

from services.connectors.jira.connector import JiraConnector


@pytest.fixture
def connector():
    return JiraConnector(
        config={"connector_id": "test-jira", "base_url": "https://test.atlassian.net"},
        kafka_producer=AsyncMock(),
        db_pool=AsyncMock(),
        http_client=AsyncMock(),
    )


@pytest.mark.asyncio
async def test_stream_audit_events_yields_raw_events(connector):
    connector.http.get = AsyncMock(return_value=MagicMock(
        json=lambda: {
            "records": [
                {
                    "id": 1,
                    "created": "2025-01-15T10:30:00.000+0000",
                    "authorAccountId": "user-123",
                    "objectItem": {"id": "PROJ-1"},
                    "summary": "issue.view",
                }
            ],
            "offset": 0,
            "limit": 100,
            "total": 1,
        },
        raise_for_status=lambda: None,
    ))

    events = []
    async for event in connector.stream_audit_events(
        since=datetime(2025, 1, 1, tzinfo=timezone.utc)
    ):
        events.append(event)

    assert len(events) == 1
    assert events[0].platform == "jira"
    assert events[0].event_id == "1"
    assert events[0].operation == "issue.view"
```
Integration Test¶
Verify end-to-end Kafka publishing by running the connector against the local stack:
```shell
# Start infrastructure + your connector
docker compose --profile connectors up -d connector-jira

# Watch the Kafka topic for events
docker compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic verity.events.raw.jira \
  --from-beginning
```
Connector Checklist¶
Before shipping a new connector, verify:
- Subclasses `BaseConnector` from `verity_sdk`
- Implements `stream_audit_events()` → yields `RawEvent`
- Implements `snapshot_permissions()` → yields `PermissionSnapshot`
- Uses a consistent, lowercase `platform` name
- Sets a unique `connector_id` for checkpointing
- Handles pagination from the source API
- Handles API rate limits with backoff
- Has unit tests with mocked HTTP responses
- Has a Dockerfile and docker-compose entry
- Configuration uses pydantic-settings with a service-specific prefix
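For the rate-limit item, a minimal backoff wrapper around an async GET might look like this. The helper name and retry policy are illustrative, not SDK API; it honours `Retry-After` when the server sends one and otherwise backs off exponentially with a little jitter:

```python
# Illustrative retry-with-backoff wrapper for a rate-limited async GET.
import asyncio
import random


async def get_with_backoff(http_get, url: str, max_retries: int = 5):
    """Call `http_get(url)` and retry on HTTP 429, sleeping between attempts."""
    for attempt in range(max_retries):
        resp = await http_get(url)
        if resp.status_code != 429:
            return resp
        # Honour Retry-After when present, else exponential backoff + jitter
        delay = float(resp.headers.get("Retry-After", 2 ** attempt))
        await asyncio.sleep(delay + random.uniform(0, 0.1))
    raise RuntimeError(f"rate limited after {max_retries} retries: {url}")
```

Inside `stream_audit_events()` you would call `await get_with_backoff(self.http.get, url)` in place of a bare `self.http.get(url)`.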
Existing Connectors for Reference¶
Study the built-in connectors in the services/connectors/ directory:
| Connector | Directory | Platform |
|---|---|---|
| Azure AD | `services/connectors/aad/` | Azure Active Directory / Entra ID |
| Fabric | `services/connectors/fabric/` | Microsoft Fabric |
| Synapse | `services/connectors/synapse/` | Azure Synapse Analytics |
| Databricks | `services/connectors/databricks/` | Databricks Unity Catalog |
| PostgreSQL | `services/connectors/postgres/` | PostgreSQL `pg_stat_statements` |
| HR | `services/connectors/hr/` | HR / Identity Provider feeds |
Next Steps¶
- Connector SDK Reference — Full API documentation for `BaseConnector`, `RawEvent`, and `PermissionSnapshot`
- Architecture → Data Flow — See how connector events flow through all five processing planes
- Services → Connectors — Detailed documentation for each built-in connector