
Build a Custom Connector

Goal

By the end of this guide you will have a production-ready connector that periodically pulls permission snapshots and audit events from an external platform, maps them to Verity's canonical model, and publishes them to Kafka for downstream scoring.

What you'll build

A GitHub connector that fetches organisation members, repository collaborators, and audit log events — then emits PermissionSnapshot and RawEvent objects into Verity's ingestion pipeline.


Prerequisites

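| Requirement | Details |
|---|---|
| Verity dev environment | Docker Compose stack running (make up) |
| Python 3.12+ | With uv or pip available |
| verity-sdk | pip install -e libs/verity-sdk |
| GitHub PAT | read:org, read:audit_log, and repo scopes (the repository collaborators endpoint requires read:org and repo for classic tokens) |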

Connector SDK Overview

The Connector SDK (libs/verity-sdk/) provides the BaseConnector abstract class and the two canonical models that connectors yield, PermissionSnapshot and RawEvent:

classDiagram
    class BaseConnector {
        <<abstract>>
        +platform: str
        +pull_permissions() AsyncIterator~PermissionSnapshot~
        +stream_audit_events() AsyncIterator~RawEvent~
        -_load_checkpoint() datetime
        -_save_checkpoint(ts: datetime)
    }
    class PermissionSnapshot {
        +platform: str
        +principal_ref: str
        +asset_ref: str
        +privilege: str
        +grant_mechanism: str
        +snapshot_at: datetime
    }
    class RawEvent {
        +platform: str
        +event_id: str
        +occurred_at: datetime
        +principal_ref: str
        +asset_ref: str
        +operation: str
        +raw_payload: dict
    }

    BaseConnector ..> PermissionSnapshot : yields
    BaseConnector ..> RawEvent : yields

Your connector extends BaseConnector and implements two async generators:

  • pull_permissions() — yields PermissionSnapshot objects representing the current state of every access grant.
  • stream_audit_events() — yields RawEvent objects for each audit log entry since the last checkpoint.
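To see how the two generators fit together, here is a minimal, self-contained sketch of the contract. It uses hypothetical stand-in dataclasses with only a subset of the real models' fields, and a hypothetical run_cycle() driver; the SDK's actual runner and Kafka publishing are not shown.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime, timezone


# Hypothetical stand-ins for the SDK models (subset of fields only).
@dataclass
class PermissionSnapshot:
    platform: str
    principal_ref: str
    asset_ref: str
    privilege: str
    grant_mechanism: str


@dataclass
class RawEvent:
    platform: str
    event_id: str
    occurred_at: datetime
    principal_ref: str
    asset_ref: str
    operation: str
    raw_payload: dict = field(default_factory=dict)


class FakeConnector:
    """Toy connector implementing the two async generators."""

    platform = "fake"

    async def pull_permissions(self) -> AsyncIterator[PermissionSnapshot]:
        yield PermissionSnapshot(self.platform, "alice", "repo-a", "PULL", "direct")

    async def stream_audit_events(self) -> AsyncIterator[RawEvent]:
        yield RawEvent(
            self.platform, "evt-1", datetime(2024, 7, 1, tzinfo=timezone.utc),
            "alice", "repo-a", "repo.access",
        )


async def run_cycle(connector: FakeConnector) -> tuple[list, list]:
    """Hypothetical driver: drain both generators once (a real runner would publish each item)."""
    snapshots = [s async for s in connector.pull_permissions()]
    events = [e async for e in connector.stream_audit_events()]
    return snapshots, events


snapshots, events = asyncio.run(run_cycle(FakeConnector()))
print(len(snapshots), len(events))  # 1 1
```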

Step 1 — Scaffold the Connector

Create the directory structure under services/connectors/:

mkdir -p services/connectors/github_connector
touch services/connectors/github_connector/{__init__,connector,config}.py

Expected layout:

services/connectors/github_connector/
├── __init__.py
├── config.py          # Pydantic settings
└── connector.py       # BaseConnector subclass

Configuration

Define connector-specific settings in config.py:

services/connectors/github_connector/config.py
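from pydantic_settings import BaseSettings, SettingsConfigDict


class GitHubConnectorSettings(BaseSettings):
    """Configuration for the GitHub connector.

    pydantic-settings reads each field from <env_prefix><FIELD_NAME>, so with
    env_prefix="VERITY_": github_org -> VERITY_GITHUB_ORG and
    github_token -> VERITY_GITHUB_TOKEN (the names used in docker-compose.yml).
    """

    github_org: str
    github_token: str
    github_api_url: str = "https://api.github.com"
    page_size: int = 100  # GitHub caps per_page at 100

    model_config = SettingsConfigDict(env_prefix="VERITY_")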

Step 2 — Implement pull_permissions()

Open connector.py and create the connector class:

services/connectors/github_connector/connector.py
from __future__ import annotations

from collections.abc import AsyncIterator
from datetime import datetime, timezone

import httpx

from verity_sdk.connectors.base import BaseConnector
from verity_sdk.models import PermissionSnapshot, RawEvent

from .config import GitHubConnectorSettings


class GitHubConnector(BaseConnector):
    """Connector that ingests GitHub organisation access data."""

    platform = "github"

    def __init__(self, settings: GitHubConnectorSettings | None = None) -> None:
        super().__init__()
        self.settings = settings or GitHubConnectorSettings()  # type: ignore[call-arg]
        self._client = httpx.AsyncClient(
            base_url=self.settings.github_api_url,
            headers={
                "Authorization": f"Bearer {self.settings.github_token}",
                "Accept": "application/vnd.github+json",
            },
        )

    # ── Permissions ──────────────────────────────────────────────
    async def pull_permissions(self) -> AsyncIterator[PermissionSnapshot]:
        """Yield one PermissionSnapshot per granted privilege per repo collaborator."""
        org = self.settings.github_org
        # One timestamp for the whole pull, so every row belongs to the same snapshot.
        snapshot_at = datetime.now(timezone.utc)
        repos = await self._paginate(f"/orgs/{org}/repos")

        for repo in repos:
            collaborators = await self._paginate(
                f"/repos/{org}/{repo['name']}/collaborators"
            )
            for collab in collaborators:
                # `permissions` maps admin/maintain/push/triage/pull to booleans.
                for perm, granted in collab.get("permissions", {}).items():
                    if granted:
                        yield PermissionSnapshot(
                            platform=self.platform,
                            principal_ref=collab["login"],
                            asset_ref=f"{org}/{repo['name']}",
                            privilege=perm.upper(),
                            grant_mechanism=_mechanism(collab),
                            snapshot_at=snapshot_at,
                        )
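The inner loop emits one snapshot for each privilege flag that is true. This hypothetical helper, granted_privileges(), isolates that expansion step so you can check it without calling GitHub. The sample input follows the shape of GitHub's collaborator permissions object (admin, maintain, push, triage, pull flags).

```python
def granted_privileges(collaborator: dict) -> list[str]:
    """Return upper-cased privilege names whose flag is True, in API order.

    Hypothetical helper mirroring the inner loop of pull_permissions().
    """
    return [
        perm.upper()
        for perm, granted in collaborator.get("permissions", {}).items()
        if granted
    ]


alice = {
    "login": "alice",
    "role_name": "admin",
    "permissions": {"admin": True, "maintain": False, "push": True, "triage": False, "pull": True},
}
print(granted_privileges(alice))  # ['ADMIN', 'PUSH', 'PULL']
```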

Rate limiting

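GitHub's REST API enforces rate limits (5,000 requests per hour for a personal access token). Add an asyncio.sleep() between pages, or read the X-RateLimit-Remaining and X-RateLimit-Reset response headers and back off until the limit resets.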


Step 3 — Implement stream_audit_events()

Add the audit event stream to the same class:

services/connectors/github_connector/connector.py (continued)
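    async def stream_audit_events(self) -> AsyncIterator[RawEvent]:
        """Stream org audit-log events since the last checkpoint, oldest first."""
        since = self._load_checkpoint()
        org = self.settings.github_org
        url = f"/orgs/{org}/audit-log"
        params = {
            # Truncate to whole seconds; a boundary event fetched twice is deduplicated downstream.
            "phrase": f"created:>{since.replace(microsecond=0).isoformat()}",
            # The endpoint returns newest events first by default. Ascending order
            # makes each saved checkpoint later than the previous one.
            "order": "asc",
            "per_page": self.settings.page_size,
        }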

        page = 1
        while True:
            resp = await self._client.get(url, params={**params, "page": page})
            resp.raise_for_status()
            entries = resp.json()
            if not entries:
                break

            for entry in entries:
                event = RawEvent(
                    platform=self.platform,
                    event_id=entry["_document_id"],
                    occurred_at=datetime.fromtimestamp(
                        entry["@timestamp"] / 1000, tz=timezone.utc
                    ),
                    principal_ref=entry.get("actor", "unknown"),
                    asset_ref=entry.get("repo", entry.get("org", "")),
                    operation=entry.get("action", "unknown"),
                    raw_payload=entry,
                )
                yield event
                self._save_checkpoint(event.occurred_at)

            page += 1
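The audit log reports @timestamp in milliseconds since the Unix epoch, which is why the code divides by 1000 before building a UTC datetime. Here is a standalone sketch of that conversion and the fallback field mapping. It returns a plain dict instead of the SDK's RawEvent, and audit_entry_fields() is a hypothetical name.

```python
from datetime import datetime, timezone


def audit_entry_fields(entry: dict) -> dict:
    """Map one GitHub audit-log entry to RawEvent-style fields (hypothetical helper)."""
    return {
        "event_id": entry["_document_id"],
        # @timestamp is epoch milliseconds; convert to an aware UTC datetime.
        "occurred_at": datetime.fromtimestamp(entry["@timestamp"] / 1000, tz=timezone.utc),
        "principal_ref": entry.get("actor", "unknown"),
        # Entries without a "repo" key fall back to the org name.
        "asset_ref": entry.get("repo", entry.get("org", "")),
        "operation": entry.get("action", "unknown"),
    }


fields = audit_entry_fields(
    {"_document_id": "evt-001", "@timestamp": 1719820800000, "actor": "alice",
     "repo": "acme-corp/backend", "action": "repo.access"}
)
print(fields["occurred_at"].isoformat())  # 2024-07-01T08:00:00+00:00
```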

Step 4 — Handle Checkpointing

BaseConnector provides _load_checkpoint() and _save_checkpoint() backed by PostgreSQL. Each connector stores its checkpoint under a key derived from self.platform.

Checkpoint flow
# On startup — resume from last known position
since = self._load_checkpoint()   # → last saved datetime, or the Unix epoch on first run

# After processing each event — persist progress
self._save_checkpoint(event.occurred_at)
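To make the resume semantics concrete, here is an in-memory stand-in for the PostgreSQL-backed store. It is a hypothetical class, not the SDK's implementation. load() returns the Unix epoch when no checkpoint exists, matching the first-run behaviour above. As a design choice of this sketch, save() never moves a checkpoint backwards, so an out-of-order event cannot rewind progress.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class InMemoryCheckpointStore:
    """Hypothetical stand-in for the connector checkpoint table, keyed by platform."""

    def __init__(self) -> None:
        self._rows: dict[str, datetime] = {}

    def load(self, platform: str) -> datetime:
        # First run: no row yet, so start from the epoch and fetch everything.
        return self._rows.get(platform, EPOCH)

    def save(self, platform: str, ts: datetime) -> None:
        # Only advance; an older timestamp never overwrites a newer one.
        if ts > self.load(platform):
            self._rows[platform] = ts


store = InMemoryCheckpointStore()
print(store.load("github"))  # 1970-01-01 00:00:00+00:00
store.save("github", datetime(2024, 7, 1, 8, tzinfo=timezone.utc))
store.save("github", datetime(2024, 6, 1, tzinfo=timezone.utc))  # ignored: older
print(store.load("github"))  # 2024-07-01 08:00:00+00:00
```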

Idempotency

Always de-duplicate events on the consumer side. If the connector restarts mid-page, it will re-emit events from the last checkpoint. Downstream services use (platform, event_id) as a natural dedup key.
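A consumer-side filter keyed on (platform, event_id) can be as small as the sketch below. It is a hypothetical, in-process version with a bounded memory window (max_keys is an assumed parameter). A deduplicator that must survive restarts would instead need durable state, such as a unique constraint on the two columns.

```python
from collections import OrderedDict


class EventDeduplicator:
    """Drops events whose (platform, event_id) key was seen recently (hypothetical)."""

    def __init__(self, max_keys: int = 100_000) -> None:
        self._seen: OrderedDict[tuple[str, str], None] = OrderedDict()
        self._max_keys = max_keys

    def is_new(self, platform: str, event_id: str) -> bool:
        key = (platform, event_id)
        if key in self._seen:
            self._seen.move_to_end(key)  # refresh recency
            return False
        self._seen[key] = None
        if len(self._seen) > self._max_keys:
            self._seen.popitem(last=False)  # evict the least recently seen key
        return True


dedup = EventDeduplicator(max_keys=2)
print([dedup.is_new("github", e) for e in ["evt-1", "evt-2", "evt-1"]])  # [True, True, False]
```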

For connectors that paginate with opaque cursor tokens rather than timestamps, store the cursor as a JSON string in the checkpoint column:

import json

def _save_cursor(self, cursor: str) -> None:
    self._save_checkpoint_raw(json.dumps({"cursor": cursor}))

def _load_cursor(self) -> str | None:
    raw = self._load_checkpoint_raw()
    return json.loads(raw).get("cursor") if raw else None

Step 5 — Add Helper Utilities

services/connectors/github_connector/connector.py (helpers)
    async def _paginate(self, url: str) -> list[dict]:
        """Fetch all pages from a GitHub REST endpoint."""
        results: list[dict] = []
        page = 1
        while True:
            resp = await self._client.get(
                url, params={"per_page": self.settings.page_size, "page": page}
            )
            resp.raise_for_status()
            data = resp.json()
            if not data:
                break
            results.extend(data)
            page += 1
        return results

    async def close(self) -> None:
        await self._client.aclose()


def _mechanism(collaborator: dict) -> str:
    """Infer grant mechanism from the GitHub collaborator object."""
    if collaborator.get("role_name") == "admin":
        return "role"
    if collaborator.get("permissions", {}).get("admin"):
        return "direct"
    return "group"

Step 6 — Register in the Factory

Verity discovers connectors through a factory registry. Add your connector:

services/connectors/registry.py
from services.connectors.github_connector.connector import GitHubConnector

CONNECTOR_REGISTRY: dict[str, type] = {
    # ... existing connectors ...
    "github": GitHubConnector,
}

Step 7 — Write Tests

Create a test file with mocked API responses:

tests/connectors/test_github_connector.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime, timezone

from services.connectors.github_connector.connector import GitHubConnector
from services.connectors.github_connector.config import GitHubConnectorSettings


@pytest.fixture
def settings() -> GitHubConnectorSettings:
    return GitHubConnectorSettings(
        github_org="acme-corp",
        github_token="ghp_test_token_000",
    )


@pytest.fixture
def connector(settings: GitHubConnectorSettings) -> GitHubConnector:
    return GitHubConnector(settings=settings)


# ── Permission snapshots ─────────────────────────────────────────

REPOS_RESPONSE = [{"name": "backend", "full_name": "acme-corp/backend"}]
COLLABS_RESPONSE = [
    {
        "login": "alice",
        "role_name": "admin",
        "permissions": {"admin": True, "push": True, "pull": True},
    },
]


@pytest.mark.asyncio
async def test_pull_permissions(connector: GitHubConnector) -> None:
    """Connector yields one PermissionSnapshot per granted privilege."""
    with patch.object(connector, "_paginate", new_callable=AsyncMock) as mock_pg:
        mock_pg.side_effect = [REPOS_RESPONSE, COLLABS_RESPONSE]

        snapshots = [s async for s in connector.pull_permissions()]

    assert len(snapshots) == 3  # admin, push, pull
    assert snapshots[0].principal_ref == "alice"
    assert snapshots[0].asset_ref == "acme-corp/backend"
    assert snapshots[0].privilege == "ADMIN"


# ── Audit events ─────────────────────────────────────────────────

AUDIT_RESPONSE = [
    {
        "_document_id": "evt-001",
        "@timestamp": 1719820800000,
        "actor": "alice",
        "repo": "acme-corp/backend",
        "action": "repo.access",
    },
]


@pytest.mark.asyncio
async def test_stream_audit_events(connector: GitHubConnector) -> None:
    """Connector yields RawEvent objects from the audit log."""
    # httpx.Response.json() is synchronous, so the response is a plain MagicMock;
    # only the awaited client.get() call needs to be an AsyncMock.
    mock_resp = MagicMock()
    mock_resp.json.side_effect = [AUDIT_RESPONSE, []]  # one page of events, then an empty page
    mock_resp.raise_for_status.return_value = None

    with (
        patch.object(connector._client, "get", new_callable=AsyncMock, return_value=mock_resp),
        patch.object(connector, "_load_checkpoint", return_value=datetime(2024, 1, 1, tzinfo=timezone.utc)),
        patch.object(connector, "_save_checkpoint"),
    ):
        events = [e async for e in connector.stream_audit_events()]

    assert len(events) == 1
    assert events[0].event_id == "evt-001"
    assert events[0].operation == "repo.access"

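Run the tests (the @pytest.mark.asyncio marker is provided by the pytest-asyncio plugin, which must be installed):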

pytest tests/connectors/test_github_connector.py -v

Step 8 — Deploy

Add the connector to docker-compose.yml:

docker-compose.yml (excerpt)
github-connector:
  build:
    context: .
    dockerfile: services/connectors/Dockerfile
  environment:
    VERITY_CONNECTOR: github
    VERITY_GITHUB_ORG: acme-corp
    VERITY_GITHUB_TOKEN: ${GITHUB_TOKEN}
    VERITY_KAFKA_BROKERS: kafka:9092
    VERITY_POSTGRES_DSN: postgresql://verity:verity@postgres:5432/verity
  depends_on:
    - kafka
    - postgres

For Kubernetes deployments, enable the connector in the Helm chart's values file instead:

values.yaml (excerpt)
connectors:
  github:
    enabled: true
    env:
      VERITY_GITHUB_ORG: acme-corp
    secretRef: verity-github-credentials
    schedule: "*/15 * * * *"   # every 15 minutes

Verify

# Check Kafka topic for events
docker compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic verity.events.raw.github \
  --from-beginning --max-messages 5

# Check permission snapshots via the API
curl -s "http://localhost:8000/api/v1/grants?platform=github" | python -m json.tool

Full Working Example

The complete GitHub connector is available in the repository:

services/connectors/github_connector/
├── __init__.py
├── config.py
└── connector.py
tests/connectors/
└── test_github_connector.py

Troubleshooting

| Symptom | Cause | Fix |
|---|---|---|
| 401 Unauthorized from GitHub API | Expired or missing PAT | Regenerate the token with the required scopes |
| No events on Kafka topic | Checkpoint is ahead of available data | Reset the checkpoint: DELETE FROM connector_checkpoints WHERE platform = 'github' |
| Duplicate events in scoring | Connector restarted mid-page | Expected; downstream services deduplicate on (platform, event_id) |
| Rate-limit 403 (or 429) errors | Too many API calls in one cycle | Keep page_size at GitHub's maximum of 100, reduce polling frequency, or back off using the X-RateLimit-Remaining and X-RateLimit-Reset headers |

Next Steps