Build a Custom Connector¶
Goal¶
By the end of this guide you will have a production-ready connector that periodically pulls permission snapshots and audit events from an external platform, maps them to Verity's canonical model, and publishes them to Kafka for downstream scoring.
What you'll build
A GitHub connector that fetches organisation members, repository
collaborators, and audit log events — then emits PermissionSnapshot and
RawEvent objects into Verity's ingestion pipeline.
Prerequisites¶
| Requirement | Details |
|---|---|
| Verity dev environment | Docker Compose stack running (`make up`) |
| Python 3.12+ | With `uv` or `pip` available |
| `verity-sdk` | `pip install -e libs/verity-sdk` |
| GitHub PAT | `read:org`, `read:audit_log` scopes |
Connector SDK Overview¶
The Connector SDK (libs/verity-sdk/) provides two key abstractions:
classDiagram
class BaseConnector {
<<abstract>>
+platform: str
+pull_permissions() AsyncIterator~PermissionSnapshot~
+stream_audit_events() AsyncIterator~RawEvent~
-_load_checkpoint() datetime
-_save_checkpoint(ts: datetime)
}
class PermissionSnapshot {
+platform: str
+principal_ref: str
+asset_ref: str
+privilege: str
+grant_mechanism: str
}
class RawEvent {
+platform: str
+event_id: str
+occurred_at: datetime
+principal_ref: str
+asset_ref: str
+operation: str
+raw_payload: dict
}
BaseConnector ..> PermissionSnapshot : yields
BaseConnector ..> RawEvent : yields
Your connector extends `BaseConnector` and implements two async generators:

- `pull_permissions()` — yields `PermissionSnapshot` objects representing the current state of every access grant.
- `stream_audit_events()` — yields `RawEvent` objects for each audit log entry since the last checkpoint.
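Before diving into the full implementation, the overall shape can be sketched with stand-ins. `StubSnapshot` and `MinimalConnector` below are illustrative names, not SDK classes — a real connector imports the models from `verity_sdk.models` and extends `BaseConnector`:

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class StubSnapshot:
    """Stand-in for verity_sdk.models.PermissionSnapshot."""
    platform: str
    principal_ref: str
    asset_ref: str
    privilege: str


class MinimalConnector:
    platform = "example"

    async def pull_permissions(self) -> AsyncIterator[StubSnapshot]:
        # A real connector pages through the platform's API here.
        yield StubSnapshot(self.platform, "alice", "example/repo", "READ")

    async def stream_audit_events(self) -> AsyncIterator[dict]:
        # A real connector resumes from _load_checkpoint() here.
        yield {"event_id": "evt-001", "operation": "repo.access"}


async def demo() -> list[StubSnapshot]:
    return [s async for s in MinimalConnector().pull_permissions()]


snapshots = asyncio.run(demo())
```

The runtime drives both generators on a schedule, so the connector itself stays a thin, stateless adapter around the platform API.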
Step 1 — Scaffold the Connector¶
Create the directory structure under services/connectors/:
mkdir -p services/connectors/github_connector
touch services/connectors/github_connector/{__init__,connector,config}.py
Expected layout:
services/connectors/github_connector/
├── __init__.py
├── config.py # Pydantic settings
└── connector.py # BaseConnector subclass
Configuration¶
Define connector-specific settings in config.py:
from pydantic_settings import BaseSettings
class GitHubConnectorSettings(BaseSettings):
"""Configuration for the GitHub connector."""
github_org: str
github_token: str
github_api_url: str = "https://api.github.com"
page_size: int = 100
model_config = {"env_prefix": "VERITY_GITHUB_"}
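Because of the `VERITY_GITHUB_` env prefix, each field is populated from a correspondingly named environment variable. A sketch with hypothetical values:

```shell
# Field names map to VERITY_GITHUB_<FIELD> (values here are examples only)
export VERITY_GITHUB_ORG=acme-corp
export VERITY_GITHUB_TOKEN=ghp_example_token
export VERITY_GITHUB_PAGE_SIZE=50   # overrides the default of 100
```

Fields without defaults (`github_org`, `github_token`) must be set, or `GitHubConnectorSettings()` raises a validation error at startup.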
Step 2 — Implement pull_permissions()¶
Open connector.py and create the connector class:
from __future__ import annotations
import httpx
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from verity_sdk.connectors.base import BaseConnector
from verity_sdk.models import PermissionSnapshot, RawEvent
from .config import GitHubConnectorSettings
class GitHubConnector(BaseConnector):
"""Connector that ingests GitHub organisation access data."""
platform = "github"
def __init__(self, settings: GitHubConnectorSettings | None = None) -> None:
super().__init__()
self.settings = settings or GitHubConnectorSettings() # type: ignore[call-arg]
self._client = httpx.AsyncClient(
base_url=self.settings.github_api_url,
headers={
"Authorization": f"Bearer {self.settings.github_token}",
"Accept": "application/vnd.github+json",
},
)
# ── Permissions ──────────────────────────────────────────────
async def pull_permissions(self) -> AsyncIterator[PermissionSnapshot]:
"""Yield a PermissionSnapshot for every org-member × repo grant."""
org = self.settings.github_org
repos = await self._paginate(f"/orgs/{org}/repos")
for repo in repos:
collaborators = await self._paginate(
f"/repos/{org}/{repo['name']}/collaborators"
)
for collab in collaborators:
for perm, granted in collab.get("permissions", {}).items():
if granted:
yield PermissionSnapshot(
platform=self.platform,
principal_ref=collab["login"],
asset_ref=f"{org}/{repo['name']}",
privilege=perm.upper(),
grant_mechanism=_mechanism(collab),
snapshot_at=datetime.now(timezone.utc),
)
Rate limiting
GitHub's API enforces rate limits. Add an asyncio.sleep() between pages
or use the X-RateLimit-Remaining header to back off gracefully.
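One way to act on those headers is a small, pure back-off helper. `X-RateLimit-Remaining` and `X-RateLimit-Reset` are real GitHub response headers; the function name and the threshold of 5 are illustrative choices:

```python
def backoff_seconds(headers: dict[str, str], now: float, threshold: int = 5) -> float:
    """Return how long to sleep before the next request (0.0 if no back-off needed).

    `now` is the current time as Unix epoch seconds.
    """
    remaining = int(headers.get("X-RateLimit-Remaining", "1"))
    if remaining > threshold:
        return 0.0  # plenty of quota left, no need to wait
    # X-RateLimit-Reset is the epoch second at which the quota refills
    reset_at = float(headers.get("X-RateLimit-Reset", now))
    return max(reset_at - now, 0.0)
```

Inside the pagination loop you would then call something like `await asyncio.sleep(backoff_seconds(resp.headers, time.time()))` after each page.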
Step 3 — Implement stream_audit_events()¶
Add the audit event stream to the same class:
async def stream_audit_events(self) -> AsyncIterator[RawEvent]:
"""Stream org audit-log events since the last checkpoint."""
since = self._load_checkpoint()
org = self.settings.github_org
url = f"/orgs/{org}/audit-log"
params = {
"phrase": f"created:>{since.isoformat()}",
"per_page": self.settings.page_size,
}
page = 1
while True:
resp = await self._client.get(url, params={**params, "page": page})
resp.raise_for_status()
entries = resp.json()
if not entries:
break
for entry in entries:
event = RawEvent(
platform=self.platform,
event_id=entry["_document_id"],
occurred_at=datetime.fromtimestamp(
entry["@timestamp"] / 1000, tz=timezone.utc
),
principal_ref=entry.get("actor", "unknown"),
asset_ref=entry.get("repo", entry.get("org", "")),
operation=entry.get("action", "unknown"),
raw_payload=entry,
)
yield event
self._save_checkpoint(event.occurred_at)
page += 1
Step 4 — Handle Checkpointing¶
BaseConnector provides _load_checkpoint() and _save_checkpoint() backed
by PostgreSQL. Each connector stores its checkpoint under a key derived from
self.platform.
# On startup — resume from last known position
since = self._load_checkpoint()  # → datetime (falls back to the Unix epoch if no checkpoint exists)
# After processing each event — persist progress
self._save_checkpoint(event.occurred_at)
Idempotency
Always de-duplicate events on the consumer side. If the connector restarts
mid-page, it will re-emit events from the last checkpoint. Downstream
services use (platform, event_id) as a natural dedup key.
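The consumer-side dedup can be sketched as follows. The in-memory `seen` set stands in for whatever store the downstream service actually uses (e.g. a unique index on `(platform, event_id)` in PostgreSQL):

```python
def dedupe(events: list[dict], seen: set[tuple[str, str]]) -> list[dict]:
    """Filter out events whose (platform, event_id) key was already processed."""
    fresh: list[dict] = []
    for event in events:
        key = (event["platform"], event["event_id"])
        if key not in seen:
            seen.add(key)
            fresh.append(event)
    return fresh
```

Because the key is stable across connector restarts, replayed pages collapse to no-ops downstream.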
For connectors that use cursor-based pagination (opaque cursor tokens rather than page numbers), store the cursor as a JSON string in the checkpoint column:
import json
def _save_cursor(self, cursor: str) -> None:
self._save_checkpoint_raw(json.dumps({"cursor": cursor}))
def _load_cursor(self) -> str | None:
raw = self._load_checkpoint_raw()
return json.loads(raw).get("cursor") if raw else None
Step 5 — Add Helper Utilities¶
async def _paginate(self, url: str) -> list[dict]:
"""Fetch all pages from a GitHub REST endpoint."""
results: list[dict] = []
page = 1
while True:
resp = await self._client.get(
url, params={"per_page": self.settings.page_size, "page": page}
)
resp.raise_for_status()
data = resp.json()
if not data:
break
results.extend(data)
page += 1
return results
async def close(self) -> None:
await self._client.aclose()
def _mechanism(collaborator: dict) -> str:
"""Infer grant mechanism from the GitHub collaborator object."""
if collaborator.get("role_name") == "admin":
return "role"
if collaborator.get("permissions", {}).get("admin"):
return "direct"
return "group"
Step 6 — Register in the Factory¶
Verity discovers connectors through a factory registry. Add your connector:
from services.connectors.github_connector.connector import GitHubConnector
CONNECTOR_REGISTRY: dict[str, type] = {
# ... existing connectors ...
"github": GitHubConnector,
}
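The runtime resolves the configured connector by name from this registry. A minimal sketch of that lookup — `create_connector` and `_DemoConnector` are illustrative; the real factory lives in Verity's connector service:

```python
class _DemoConnector:
    """Stand-in connector class for the sketch."""
    platform = "demo"


CONNECTOR_REGISTRY: dict[str, type] = {"demo": _DemoConnector}


def create_connector(name: str):
    """Instantiate the connector registered under `name`."""
    try:
        cls = CONNECTOR_REGISTRY[name]
    except KeyError:
        raise ValueError(f"unknown connector: {name!r}") from None
    return cls()
```

In deployment, `name` typically comes from an environment variable (the compose file in Step 8 sets `VERITY_CONNECTOR: github`).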
Step 7 — Write Tests¶
Create a test file with mocked API responses:
import pytest
from unittest.mock import AsyncMock, patch
from datetime import datetime, timezone
from services.connectors.github_connector.connector import GitHubConnector
from services.connectors.github_connector.config import GitHubConnectorSettings
@pytest.fixture
def settings() -> GitHubConnectorSettings:
return GitHubConnectorSettings(
github_org="acme-corp",
github_token="ghp_test_token_000",
)
@pytest.fixture
def connector(settings: GitHubConnectorSettings) -> GitHubConnector:
return GitHubConnector(settings=settings)
# ── Permission snapshots ─────────────────────────────────────────
REPOS_RESPONSE = [{"name": "backend", "full_name": "acme-corp/backend"}]
COLLABS_RESPONSE = [
{
"login": "alice",
"role_name": "admin",
"permissions": {"admin": True, "push": True, "pull": True},
},
]
@pytest.mark.asyncio
async def test_pull_permissions(connector: GitHubConnector) -> None:
"""Connector yields one PermissionSnapshot per granted privilege."""
with patch.object(connector, "_paginate", new_callable=AsyncMock) as mock_pg:
mock_pg.side_effect = [REPOS_RESPONSE, COLLABS_RESPONSE]
snapshots = [s async for s in connector.pull_permissions()]
assert len(snapshots) == 3 # admin, push, pull
assert snapshots[0].principal_ref == "alice"
assert snapshots[0].asset_ref == "acme-corp/backend"
assert snapshots[0].privilege == "ADMIN"
# ── Audit events ─────────────────────────────────────────────────
AUDIT_RESPONSE = [
{
"_document_id": "evt-001",
"@timestamp": 1719820800000,
"actor": "alice",
"repo": "acme-corp/backend",
"action": "repo.access",
},
]
@pytest.mark.asyncio
async def test_stream_audit_events(connector: GitHubConnector) -> None:
"""Connector yields RawEvent objects from the audit log."""
mock_resp = AsyncMock()
# httpx's Response.json() is synchronous, so stub it with a plain callable —
# leaving it as an AsyncMock method would return a coroutine, not the page data.
pages = iter([AUDIT_RESPONSE, []])
mock_resp.json = lambda: next(pages)
mock_resp.raise_for_status = lambda: None
with (
patch.object(connector._client, "get", return_value=mock_resp),
patch.object(connector, "_load_checkpoint", return_value=datetime(2024, 1, 1, tzinfo=timezone.utc)),
patch.object(connector, "_save_checkpoint"),
):
events = [e async for e in connector.stream_audit_events()]
assert len(events) == 1
assert events[0].event_id == "evt-001"
assert events[0].operation == "repo.access"
Run the tests:
pytest tests/connectors/test_github_connector.py -v
Step 8 — Deploy¶
Add the connector to docker-compose.yml:
github-connector:
build:
context: .
dockerfile: services/connectors/Dockerfile
environment:
VERITY_CONNECTOR: github
VERITY_GITHUB_ORG: acme-corp
VERITY_GITHUB_TOKEN: ${GITHUB_TOKEN}
VERITY_KAFKA_BROKERS: kafka:9092
VERITY_POSTGRES_DSN: postgresql://verity:verity@postgres:5432/verity
depends_on:
- kafka
- postgres
Verify¶
# Check Kafka topic for events
docker compose exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic verity.events.raw.github \
--from-beginning --max-messages 5
# Check permission snapshots in the dashboard
curl -s http://localhost:8000/api/v1/grants?platform=github | python -m json.tool
Full Working Example¶
The complete GitHub connector is available in the repository:
services/connectors/github_connector/
├── __init__.py
├── config.py
└── connector.py
tests/connectors/
└── test_github_connector.py
Troubleshooting¶
| Symptom | Cause | Fix |
|---|---|---|
| `401 Unauthorized` from GitHub API | Expired or missing PAT | Regenerate the token with the required scopes |
| No events on Kafka topic | Checkpoint is ahead of available data | Reset the checkpoint: `DELETE FROM connector_checkpoints WHERE platform = 'github'` |
| Duplicate events in scoring | Connector restarted mid-page | Expected — downstream services deduplicate on `(platform, event_id)` |
| Rate-limit `403` errors | Too many API calls in one cycle | Increase `page_size`, reduce polling frequency, or add backoff logic |
Next Steps¶
- Read the full Connector SDK reference
- Explore the Domain Models for field-level details
- See how the Normalise Engine processes raw events
- Set up Alerting to monitor connector health