
Build Your First Connector

Connectors are how Verity ingests data from external platforms. In this tutorial you'll build a complete connector that pulls audit events and permission snapshots from a hypothetical JIRA instance, using the Verity SDK's BaseConnector framework.


How Connectors Work

Every connector follows the same pattern:

sequenceDiagram
    participant Connector
    participant Platform as Source Platform
    participant Kafka
    participant DB as PostgreSQL

    Connector->>DB: Load checkpoint (last processed timestamp)
    Connector->>Platform: Fetch audit events since checkpoint
    loop For each event
        Connector->>Kafka: Publish to verity.events.raw.{platform}
        Connector->>DB: Update checkpoint
    end
    Connector->>Platform: Snapshot current permissions
    loop For each grant
        Connector->>Kafka: Publish to verity.permissions.raw.{platform}
    end

The SDK provides:

  • BaseConnector — abstract base class with the main loop, Kafka publishing, and checkpointing built in
  • RawEvent — dataclass for audit events
  • PermissionSnapshot — dataclass for point-in-time permission grants
  • Automatic checkpointing — so connectors resume from where they left off after restarts
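Conceptually, these pieces combine into one cycle: load the checkpoint, stream events, publish each one, advance the checkpoint. The sketch below illustrates that cycle only — it is not the SDK's actual source, and the class and field names are invented for illustration:

```python
import asyncio
from datetime import datetime, timezone


class SketchConnector:
    """Simplified illustration of the cycle BaseConnector.run() drives."""

    def __init__(self, platform: str) -> None:
        self.platform = platform
        self.checkpoint = datetime(2000, 1, 1, tzinfo=timezone.utc)  # default when none saved
        self.published: list[tuple[str, str]] = []  # (topic, event_id) pairs

    async def stream_audit_events(self, since: datetime):
        # A real connector pages through the source platform's API here.
        yield {"event_id": "1", "occurred_at": datetime(2025, 1, 15, tzinfo=timezone.utc)}

    async def run(self) -> None:
        # Publish each event, then advance the checkpoint past it.
        async for event in self.stream_audit_events(since=self.checkpoint):
            self.published.append((f"verity.events.raw.{self.platform}", event["event_id"]))
            self.checkpoint = event["occurred_at"]


async def demo() -> SketchConnector:
    connector = SketchConnector("jira")
    await connector.run()
    return connector


connector = asyncio.run(demo())
```

After one cycle the checkpoint points at the last published event, so a restart resumes from there instead of re-reading history.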

Prerequisites

  • Verity running locally (Quick Start)
  • Python 3.12+
  • The verity-sdk library (included in the monorepo at libs/verity-sdk)

Step 1 — Scaffold the Connector

Create the directory structure:

mkdir -p services/connectors/jira
touch services/connectors/jira/__init__.py
touch services/connectors/jira/connector.py
touch services/connectors/jira/config.py
touch services/connectors/jira/main.py
touch services/connectors/jira/Dockerfile

Your connector will have four files:

File          Purpose
config.py     pydantic-settings configuration
connector.py  Core logic — subclass of BaseConnector
main.py       Entrypoint — wires config, Kafka, DB, and starts the connector
Dockerfile    Container image definition

Step 2 — Define Configuration

Create a pydantic-settings model with the JIRA_ environment variable prefix:

services/connectors/jira/config.py
"""JIRA connector configuration."""

from pydantic_settings import BaseSettings


class JiraConnectorConfig(BaseSettings):
    model_config = {"env_prefix": "JIRA_"}

    # JIRA connection
    base_url: str = "https://your-org.atlassian.net"
    api_token: str = ""
    email: str = ""

    # Polling
    poll_interval: int = 300  # seconds between poll cycles

    # Connector identity (used for checkpointing)
    connector_id: str = "jira-connector"

Step 3 — Implement the Connector

Subclass BaseConnector and implement the two required methods:

services/connectors/jira/connector.py
"""JIRA platform connector for Verity."""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Any, AsyncIterator

import httpx

from verity_sdk.connector_base import BaseConnector, PermissionSnapshot, RawEvent

logger = logging.getLogger(__name__)


class JiraConnector(BaseConnector):
    """Pulls audit events and permission snapshots from JIRA.

    Implements the two abstract methods required by BaseConnector:
    - stream_audit_events()  → yields RawEvent objects
    - snapshot_permissions() → yields PermissionSnapshot objects
    """

    def __init__(
        self,
        config: dict[str, Any],
        kafka_producer: Any,
        db_pool: Any,
        http_client: httpx.AsyncClient,
    ) -> None:
        super().__init__(config=config, kafka_producer=kafka_producer, db_pool=db_pool)
        self.http = http_client
        self.base_url = config["base_url"]

    # ── Required: stream_audit_events ──────────────────────────────────

    async def stream_audit_events(
        self, since: datetime
    ) -> AsyncIterator[RawEvent]:
        """Fetch JIRA audit log entries created after `since`.

        Yields RawEvent objects that the BaseConnector.run() loop
        automatically publishes to Kafka topic:
            verity.events.raw.jira
        """
        url = f"{self.base_url}/rest/api/3/auditing/record"
        params = {
            "from": since.strftime("%Y-%m-%dT%H:%M:%S.000+0000"),
            "limit": 100,
        }

        while True:
            resp = await self.http.get(url, params=params)
            resp.raise_for_status()
            data = resp.json()

            for record in data.get("records", []):
                yield RawEvent(
                    platform="jira",
                    event_id=str(record["id"]),
                    occurred_at=datetime.fromisoformat(
                        record["created"]
                    ).astimezone(timezone.utc),  # convert the offset-aware timestamp to UTC
                    principal_ref=record.get("authorAccountId", "unknown"),
                    asset_ref=record.get("objectItem", {}).get("id", "unknown"),
                    operation=record.get("summary", "unknown"),
                    raw_payload=record,
                )

            # Handle pagination
            if data.get("offset", 0) + data.get("limit", 0) >= data.get("total", 0):
                break
            params["offset"] = data["offset"] + data["limit"]

    # ── Required: snapshot_permissions ──────────────────────────────────

    async def snapshot_permissions(self) -> AsyncIterator[PermissionSnapshot]:
        """Snapshot current project-level permissions in JIRA.

        Yields PermissionSnapshot objects representing who has what
        access to which JIRA projects right now.
        """
        # Fetch all projects
        projects_resp = await self.http.get(
            f"{self.base_url}/rest/api/3/project/search"
        )
        projects_resp.raise_for_status()
        projects = projects_resp.json().get("values", [])

        for project in projects:
            project_key = project["key"]

            # Fetch role-based permissions for this project
            roles_resp = await self.http.get(
                f"{self.base_url}/rest/api/3/project/{project_key}/role"
            )
            roles_resp.raise_for_status()
            roles = roles_resp.json()

            for role_name, role_url in roles.items():
                role_resp = await self.http.get(role_url)
                role_resp.raise_for_status()
                role_data = role_resp.json()

                for actor in role_data.get("actors", []):
                    yield PermissionSnapshot(
                        platform="jira",
                        principal_ref=actor.get("actorUser", {}).get(
                            "accountId", actor.get("name", "unknown")
                        ),
                        asset_ref=f"project:{project_key}",
                        privilege=role_name,
                        grant_mechanism="role",
                        granted_via=f"role:{role_data.get('name', role_name)}",
                        snapshot_at=datetime.now(timezone.utc),
                        raw_payload={
                            "project": project_key,
                            "role": role_name,
                            "actor": actor,
                        },
                    )
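The pagination loop in stream_audit_events() keeps requesting pages until offset + limit reaches total. The same arithmetic can be exercised in isolation — the sketch below substitutes a fake paged response for the JIRA endpoint:

```python
def fetch_page(offset: int, limit: int, items: list[int]) -> dict:
    """Stand-in for the JIRA audit endpoint's paged response shape."""
    return {
        "records": items[offset : offset + limit],
        "offset": offset,
        "limit": limit,
        "total": len(items),
    }


def fetch_all(items: list[int], limit: int = 100) -> list[int]:
    collected: list[int] = []
    offset = 0
    while True:
        page = fetch_page(offset, limit, items)
        collected.extend(page["records"])
        # Same termination test the connector uses.
        if page["offset"] + page["limit"] >= page["total"]:
            break
        offset = page["offset"] + page["limit"]
    return collected


# 250 items with limit=100 takes three pages: 0-99, 100-199, 200-249.
records = fetch_all(list(range(250)), limit=100)
```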

Step 4 — Understand the Data Models

The SDK provides two dataclasses that form the contract between connectors and the rest of the pipeline:

RawEvent

Represents a single audit event from the source platform.

@dataclass
class RawEvent:
    platform: str          # e.g., "jira", "aad", "fabric"
    event_id: str          # unique ID from the source platform
    occurred_at: datetime  # when the event happened (UTC)
    principal_ref: str     # platform-specific user identifier
    asset_ref: str         # platform-specific resource identifier
    operation: str         # what happened (e.g., "project.read", "issue.update")
    raw_payload: dict      # full original event for audit trail

PermissionSnapshot

Represents a point-in-time record of a permission grant.

@dataclass
class PermissionSnapshot:
    platform: str          # e.g., "jira", "aad", "fabric"
    principal_ref: str     # platform-specific user identifier
    asset_ref: str         # platform-specific resource identifier
    privilege: str         # permission level (e.g., "admin", "read", "write")
    grant_mechanism: str   # how access was granted (e.g., "role", "direct", "group")
    granted_via: str | None       # intermediary (e.g., "role:Developers")
    snapshot_at: datetime | None  # when this snapshot was taken (UTC)
    raw_payload: dict      # full original record for audit trail
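Before a RawEvent reaches Kafka it has to be serialised; datetime fields need explicit handling because json.dumps cannot encode them. The sketch below uses a local copy of the dataclass (in real code, import it from verity_sdk) — how the SDK actually serialises internally may differ:

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone


@dataclass
class RawEvent:  # local copy for illustration; the real class lives in verity_sdk
    platform: str
    event_id: str
    occurred_at: datetime
    principal_ref: str
    asset_ref: str
    operation: str
    raw_payload: dict


event = RawEvent(
    platform="jira",
    event_id="1042",
    occurred_at=datetime(2025, 1, 15, 10, 30, tzinfo=timezone.utc),
    principal_ref="user-123",
    asset_ref="project:PROJ",
    operation="issue.view",
    raw_payload={"summary": "issue.view"},
)

# asdict() leaves the datetime as-is, so a default hook renders it as ISO 8601.
payload = json.dumps(asdict(event), default=lambda o: o.isoformat())
```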

Identity resolution happens downstream

Connectors emit platform-specific identifiers (e.g., JIRA account IDs). The Identity Resolver service in the Normalise plane maps these to canonical Verity principals — your connector doesn't need to worry about cross-platform identity matching.


Step 5 — Create the Entrypoint

Wire everything together in main.py:

services/connectors/jira/main.py
"""JIRA connector entrypoint."""

import asyncio
import logging

import asyncpg
import httpx
from aiokafka import AIOKafkaProducer

from .config import JiraConnectorConfig
from .connector import JiraConnector

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)


async def main() -> None:
    config = JiraConnectorConfig()  # (1)!

    # Initialise infrastructure clients
    db_pool = await asyncpg.create_pool(
        host="localhost",  # replace with env-driven config outside local dev
        port=5432,
        user="verity",
        password="verity_dev",
        database="verity",
    )

    kafka_producer = AIOKafkaProducer(
        bootstrap_servers="kafka:9092",
    )
    await kafka_producer.start()

    async with httpx.AsyncClient(
        auth=httpx.BasicAuth(config.email, config.api_token),  # handles base64 encoding
        headers={"Content-Type": "application/json"},
        timeout=30.0,
    ) as http_client:
        connector = JiraConnector(
            config={
                "connector_id": config.connector_id,
                "base_url": config.base_url,
            },
            kafka_producer=kafka_producer,
            db_pool=db_pool,
            http_client=http_client,
        )

        # Run the connector loop; close the clients on exit
        try:
            while True:
                try:
                    await connector.run()  # (2)!
                except Exception:
                    logger.exception("Connector run failed, retrying...")
                await asyncio.sleep(config.poll_interval)
        finally:
            await kafka_producer.stop()
            await db_pool.close()


if __name__ == "__main__":
    asyncio.run(main())
  1. pydantic-settings automatically reads JIRA_* environment variables.
  2. BaseConnector.run() handles the event streaming, Kafka publishing, and checkpointing loop.

Step 6 — Create the Dockerfile

services/connectors/jira/Dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY libs/verity-sdk /app/libs/verity-sdk
COPY services/connectors/jira /app/services/connectors/jira

RUN pip install --no-cache-dir \
    asyncpg \
    aiokafka \
    httpx \
    pydantic-settings \
    /app/libs/verity-sdk

CMD ["python", "-m", "services.connectors.jira.main"]

Step 7 — Register in Docker Compose

Add your connector to docker-compose.yml:

docker-compose.yml (addition)
  connector-jira:
    build:
      context: .
      dockerfile: services/connectors/jira/Dockerfile
    container_name: verity-connector-jira
    profiles:
      - connectors
    environment:
      <<: *app-env
      JIRA_BASE_URL: https://your-org.atlassian.net
      JIRA_EMAIL: your-email@example.com
      JIRA_API_TOKEN: your-api-token
    depends_on:
      postgres:
        condition: service_healthy
      kafka:
        condition: service_healthy
    networks:
      - verity-net

Start it with the connectors profile:

docker compose --profile connectors up -d connector-jira

Checkpoint Mechanism

The BaseConnector provides automatic checkpointing so your connector resumes from its last position after restarts:

Method                Behaviour
_load_checkpoint()    Reads the last processed timestamp from the connector_checkpoints table. Returns 2000-01-01T00:00:00Z if no checkpoint exists.
_save_checkpoint(ts)  Upserts the timestamp into connector_checkpoints using the connector's connector_id. Called automatically after each event is published to Kafka.

The checkpoint table schema:

CREATE TABLE connector_checkpoints (
    connector_id TEXT PRIMARY KEY,
    platform     TEXT NOT NULL,
    checkpoint_at TIMESTAMPTZ NOT NULL,
    metadata     JSONB NOT NULL DEFAULT '{}',
    updated_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);
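Against that schema, _save_checkpoint(ts) amounts to a single upsert keyed on the primary key. A sketch of the statement (the SDK's actual SQL may differ; the $1..$3 placeholders follow asyncpg conventions):

```python
# Upsert a connector checkpoint: insert on first run, update thereafter.
UPSERT_CHECKPOINT = """
INSERT INTO connector_checkpoints (connector_id, platform, checkpoint_at)
VALUES ($1, $2, $3)
ON CONFLICT (connector_id)
DO UPDATE SET checkpoint_at = EXCLUDED.checkpoint_at,
              updated_at    = now();
"""
```

Because connector_id is the primary key, each connector maintains exactly one row, and ON CONFLICT turns every save after the first into an update.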

Idempotent event IDs

Always use the source platform's native event ID as RawEvent.event_id. Downstream services use this for deduplication — if your connector re-emits an event after a restart, the Ingest Worker will detect and discard the duplicate.
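The dedup logic downstream can be as simple as keying on (platform, event_id). A sketch of the idea — the Ingest Worker's real mechanism is not shown here and may instead use, say, a database constraint:

```python
def dedupe(events: list[dict]) -> list[dict]:
    """Drop events whose (platform, event_id) pair was already seen."""
    seen: set[tuple[str, str]] = set()
    unique: list[dict] = []
    for ev in events:
        key = (ev["platform"], ev["event_id"])
        if key in seen:
            continue  # duplicate re-emitted after a connector restart
        seen.add(key)
        unique.append(ev)
    return unique


events = [
    {"platform": "jira", "event_id": "1"},
    {"platform": "jira", "event_id": "2"},
    {"platform": "jira", "event_id": "1"},  # re-emitted after a restart
]
unique = dedupe(events)
```

This only works if event_id is stable across restarts, which is exactly why the native platform ID must be used rather than a generated UUID.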


Kafka Topic Convention

Connectors publish to topics following this naming pattern:

Topic Pattern                      Purpose               Example
verity.events.raw.{platform}       Audit events          verity.events.raw.jira
verity.permissions.raw.{platform}  Permission snapshots  verity.permissions.raw.jira

The {platform} segment is derived from RawEvent.platform and PermissionSnapshot.platform — ensure it's a consistent lowercase string.
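A small helper makes it hard to drift from the convention. This is a hypothetical utility, not part of the SDK:

```python
def topic_for(kind: str, platform: str) -> str:
    """Build a Kafka topic following the verity.{kind}.raw.{platform} convention."""
    if kind not in {"events", "permissions"}:
        raise ValueError(f"unknown topic kind: {kind}")
    # Normalise to lowercase so "JIRA" and "jira" land on the same topic.
    return f"verity.{kind}.raw.{platform.lower()}"


events_topic = topic_for("events", "jira")
perms_topic = topic_for("permissions", "JIRA")  # normalised to lowercase
```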


Testing Your Connector

Unit Tests

Test your connector methods in isolation by mocking the HTTP client:

tests/connectors/test_jira_connector.py
import pytest
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock

from services.connectors.jira.connector import JiraConnector


@pytest.fixture
def connector():
    return JiraConnector(
        config={"connector_id": "test-jira", "base_url": "https://test.atlassian.net"},
        kafka_producer=AsyncMock(),
        db_pool=AsyncMock(),
        http_client=AsyncMock(),
    )


@pytest.mark.asyncio
async def test_stream_audit_events_yields_raw_events(connector):
    connector.http.get = AsyncMock(return_value=MagicMock(
        json=lambda: {
            "records": [
                {
                    "id": 1,
                    "created": "2025-01-15T10:30:00.000+0000",
                    "authorAccountId": "user-123",
                    "objectItem": {"id": "PROJ-1"},
                    "summary": "issue.view",
                }
            ],
            "offset": 0,
            "limit": 100,
            "total": 1,
        },
        raise_for_status=lambda: None,
    ))

    events = []
    async for event in connector.stream_audit_events(
        since=datetime(2025, 1, 1, tzinfo=timezone.utc)
    ):
        events.append(event)

    assert len(events) == 1
    assert events[0].platform == "jira"
    assert events[0].event_id == "1"
    assert events[0].operation == "issue.view"

Integration Test

Verify end-to-end Kafka publishing by running the connector against the local stack:

# Start infrastructure + your connector
docker compose --profile connectors up -d connector-jira

# Watch the Kafka topic for events
docker compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic verity.events.raw.jira \
  --from-beginning

Connector Checklist

Before shipping a new connector, verify:

  • Subclasses BaseConnector from verity_sdk
  • Implements stream_audit_events() → yields RawEvent
  • Implements snapshot_permissions() → yields PermissionSnapshot
  • Uses consistent, lowercase platform name
  • Sets a unique connector_id for checkpointing
  • Handles pagination from the source API
  • Handles API rate limits with backoff
  • Has unit tests with mocked HTTP responses
  • Has a Dockerfile and docker-compose entry
  • Configuration uses pydantic-settings with a service-specific prefix
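For the rate-limit item above, exponential backoff with jitter is the usual approach. A self-contained sketch — the retry parameters are illustrative, not Verity defaults:

```python
import asyncio
import random


async def with_backoff(fn, *, max_attempts: int = 5, base_delay: float = 1.0):
    """Retry an async callable, doubling the delay (plus jitter) each attempt."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)


# Demo: a call that fails twice (e.g. HTTP 429) and then succeeds.
calls = {"n": 0}


async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("HTTP 429 Too Many Requests")
    return "ok"


result = asyncio.run(with_backoff(flaky, base_delay=0.01))
```

Wrapping the individual httpx.get calls in stream_audit_events() and snapshot_permissions() this way keeps a throttled JIRA API from failing the whole poll cycle.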

Existing Connectors for Reference

Study the built-in connectors in the services/connectors/ directory:

Connector   Directory                        Platform
Azure AD    services/connectors/aad/         Azure Active Directory / Entra ID
Fabric      services/connectors/fabric/      Microsoft Fabric
Synapse     services/connectors/synapse/     Azure Synapse Analytics
Databricks  services/connectors/databricks/  Databricks Unity Catalog
PostgreSQL  services/connectors/postgres/    PostgreSQL pg_stat_statements
HR          services/connectors/hr/          HR / Identity Provider feeds

Next Steps