Ingest Worker

Path: services/ingest/ingest-worker/ · Type: Consumer

The Ingest Worker is the first processing stage after raw data arrives from connectors. It consumes raw events from all verity.events.raw.* topics, applies validation, deduplication, and enrichment, then publishes clean, normalised events downstream.

Architecture

graph LR
    K1[verity.events.raw.azure_ad] --> IW[Ingest Worker]
    K2[verity.events.raw.snowflake] --> IW
    K3[verity.events.raw.databricks] --> IW
    K4[verity.events.raw.fabric] --> IW

    IW --> K5[verity.events.normalised]
    IW --> DLQ[verity.events.dlq]

Processing Pipeline

Each raw event passes through three stages:

1. Validation

  • Schema validation against the RawEvent Pydantic model.
  • Required fields: event_id, platform, event_type, timestamp, payload.
  • Events failing validation are routed to the dead-letter queue (verity.events.dlq) with the error reason attached.
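As a concrete sketch of stage 1, the check below mirrors the required-field rule with a stdlib-only stand-in; the real service validates against the RawEvent Pydantic model, so the function name and error format here are illustrative.

```python
# Sketch of the validation stage. The real worker uses a Pydantic
# RawEvent model; this stand-in checks the same required fields and
# returns the error reasons that would accompany a DLQ-routed event.
REQUIRED_FIELDS = ("event_id", "platform", "event_type", "timestamp", "payload")

def validate_raw_event(event: dict) -> list:
    """Return a list of error reasons; an empty list means valid."""
    return [f"missing required field: {name}"
            for name in REQUIRED_FIELDS if name not in event]

ok = {"event_id": "e1", "platform": "azure_ad", "event_type": "login",
      "timestamp": "2024-01-01T00:00:00Z", "payload": {}}
bad = {"event_id": "e2", "platform": "snowflake"}

assert validate_raw_event(ok) == []
assert len(validate_raw_event(bad)) == 3   # event_type, timestamp, payload
```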

2. Deduplication

  • Events are deduplicated using a composite key of (platform, event_type, entity_id, timestamp).
  • A sliding-window bloom filter (backed by Redis) rejects duplicates within a configurable time window (default: 24 hours).
  • Duplicate events are silently dropped and counted in the ingest_duplicates_total metric.
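Stage 2 could be sketched as follows. The class below is an in-memory stand-in (a plain dict keyed by the composite key) for the Redis-backed sliding-window bloom filter; all names are illustrative.

```python
import time

class DedupWindow:
    """In-memory stand-in for the Redis-backed sliding-window filter:
    remembers each composite key's first-seen time and treats a key as
    a duplicate if it reappears within the window."""

    def __init__(self, window_hours: float = 24.0):
        self.window_s = window_hours * 3600
        self.seen = {}   # composite key -> first-seen timestamp

    def is_duplicate(self, event: dict, now: float = None) -> bool:
        now = time.time() if now is None else now
        key = (event["platform"], event["event_type"],
               event["entity_id"], event["timestamp"])
        first_seen = self.seen.get(key)
        if first_seen is not None and now - first_seen < self.window_s:
            return True           # inside the window: drop and count
        self.seen[key] = now      # new (or expired) key: record and pass
        return False

dedup = DedupWindow(window_hours=24)
evt = {"platform": "azure_ad", "event_type": "login",
       "entity_id": "u1", "timestamp": "2024-01-01T00:00:00Z"}
assert dedup.is_duplicate(evt, now=0) is False
assert dedup.is_duplicate(evt, now=3600) is True          # within 24 h
assert dedup.is_duplicate(evt, now=25 * 3600) is False    # window expired
```

Unlike a real bloom filter, which may report false positives but never false negatives, the dict here is exact; the production design trades that exactness for bounded memory.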

3. Enrichment

  • Timestamps are normalised to UTC ISO 8601.
  • Platform-specific field names are mapped to the canonical Verity schema.
  • Metadata is attached: ingested_at, worker_id, schema_version.
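Stage 3's timestamp normalisation and metadata attachment might look like the sketch below; the schema-version constant and worker_id value are made up, and the per-platform field-name mapping is omitted.

```python
from datetime import datetime, timezone

SCHEMA_VERSION = "1.0"   # illustrative value, not from the real service

def enrich(event: dict, worker_id: str) -> dict:
    """Normalise the timestamp to UTC ISO 8601 and attach ingest metadata."""
    ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    out = dict(event)
    out["timestamp"] = ts.astimezone(timezone.utc).isoformat()
    out["ingested_at"] = datetime.now(timezone.utc).isoformat()
    out["worker_id"] = worker_id
    out["schema_version"] = SCHEMA_VERSION
    return out

# A local-offset timestamp is shifted to UTC:
e = enrich({"timestamp": "2024-01-01T12:00:00+02:00"}, worker_id="iw-1")
assert e["timestamp"] == "2024-01-01T10:00:00+00:00"
```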

Consumer Configuration

| Variable | Required | Default | Description |
|---|---|---|---|
| INGEST_KAFKA_BOOTSTRAP | Yes | (none) | Kafka bootstrap servers |
| INGEST_CONSUMER_GROUP | No | verity-ingest-worker | Kafka consumer group ID |
| INGEST_TOPICS | No | verity.events.raw.* | Topic subscription pattern |
| INGEST_REDIS_URL | Yes | (none) | Redis URL for deduplication bloom filter |
| INGEST_DEDUP_WINDOW_HOURS | No | 24 | Deduplication window in hours |
| INGEST_BATCH_SIZE | No | 100 | Max events per processing batch |
| INGEST_LOG_LEVEL | No | INFO | Python log level |
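Reading this configuration at start-up could look like the following sketch; the function and dictionary keys are illustrative, but the variable names and defaults come from the table above.

```python
import os

def load_config(env=os.environ) -> dict:
    """Read worker settings, applying the documented defaults.
    Raises KeyError if either required variable is missing."""
    return {
        "bootstrap": env["INGEST_KAFKA_BOOTSTRAP"],        # required
        "redis_url": env["INGEST_REDIS_URL"],              # required
        "group": env.get("INGEST_CONSUMER_GROUP", "verity-ingest-worker"),
        "topics": env.get("INGEST_TOPICS", "verity.events.raw.*"),
        "dedup_window_hours": int(env.get("INGEST_DEDUP_WINDOW_HOURS", "24")),
        "batch_size": int(env.get("INGEST_BATCH_SIZE", "100")),
        "log_level": env.get("INGEST_LOG_LEVEL", "INFO"),
    }

cfg = load_config({"INGEST_KAFKA_BOOTSTRAP": "kafka:9092",
                   "INGEST_REDIS_URL": "redis://localhost:6379"})
assert cfg["group"] == "verity-ingest-worker"
assert cfg["batch_size"] == 100
```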

Error Handling

flowchart TD
    A[Consume raw event] --> B{Valid schema?}
    B -->|Yes| C{Duplicate?}
    B -->|No| D[Send to DLQ]
    C -->|No| E[Enrich event]
    C -->|Yes| F[Drop & count]
    E --> G[Publish to verity.events.normalised]
    G --> H{Publish succeeded?}
    H -->|Yes| I[Commit offset]
    H -->|No| J[Retry with back-off]
    J -->|Max retries exceeded| D

  • Dead-letter queue: Events that cannot be processed after all retries are sent to verity.events.dlq with full error context.
  • At-least-once delivery: Kafka offsets are committed only after successful downstream publish.
  • Back-pressure: The worker pauses consumption when the enrichment pipeline falls behind.
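The retry/DLQ path in the flowchart can be sketched as below; the helper and its parameters (max_retries, base_delay) are illustrative, not the worker's actual API. Returning True only after a successful publish is what lets the caller commit the Kafka offset, preserving at-least-once delivery.

```python
import time

def publish_with_retry(publish, event, max_retries=3, base_delay=0.5,
                       send_to_dlq=None, sleep=time.sleep):
    """Try to publish; on failure, retry with exponential back-off,
    finally routing to the DLQ. Returns True iff the publish succeeded
    (i.e. the caller may commit the Kafka offset)."""
    for attempt in range(max_retries + 1):
        try:
            publish(event)
            return True                        # safe to commit the offset
        except Exception as exc:
            if attempt == max_retries:
                if send_to_dlq:
                    send_to_dlq(event, reason=str(exc))
                return False                   # offset NOT committed
            sleep(base_delay * 2 ** attempt)   # 0.5 s, 1 s, 2 s, ...

# A publisher that fails twice, then succeeds:
attempts = []
def flaky(event):
    attempts.append(event)
    if len(attempts) < 3:
        raise RuntimeError("broker unavailable")

assert publish_with_retry(flaky, {"event_id": "e1"}, sleep=lambda s: None) is True
assert len(attempts) == 3
```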

Observability

| Metric | Type | Description |
|---|---|---|
| ingest_events_received_total | Counter | Total raw events consumed (by platform) |
| ingest_events_published_total | Counter | Events successfully published downstream |
| ingest_duplicates_total | Counter | Duplicate events dropped |
| ingest_validation_errors_total | Counter | Events that failed schema validation |
| ingest_processing_duration_seconds | Histogram | End-to-end processing latency per event |
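Instrumenting the pipeline with these counters might look like the sketch below, which uses collections.Counter as a stand-in for a labelled Prometheus-style counter; the worker's actual metrics client is not described in this document.

```python
from collections import Counter

# One Counter per metric; keys act as the label values (e.g. platform).
metrics = {
    "ingest_events_received_total": Counter(),
    "ingest_duplicates_total": Counter(),
    "ingest_validation_errors_total": Counter(),
}

def record_received(platform: str) -> None:
    """Increment the per-platform received counter."""
    metrics["ingest_events_received_total"][platform] += 1

record_received("azure_ad")
record_received("azure_ad")
record_received("snowflake")
assert metrics["ingest_events_received_total"]["azure_ad"] == 2
```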