
Normalise Engine

Path: services/ingest/normalise-engine/ · Type: Consumer

The Normalise Engine transforms validated events into the Verity canonical data model. It performs identity resolution, asset classification, and persists the resulting principals, assets, and grants into PostgreSQL.

Architecture

graph LR
    K1[verity.events.normalised] --> NE[Normalise Engine]
    NE --> K2[verity.identity.resolve]
    NE --> K3[verity.asset.classify]
    NE --> PG[(PostgreSQL)]

Processing Pipeline

1. Identity Resolution

The engine performs cross-platform identity matching, resolving multiple platform-specific accounts to a single Principal:

  • Email matching: Primary and alias email addresses.
  • UPN correlation: Azure AD UPN ↔ Snowflake login name ↔ Databricks email.
  • Service account grouping: Service principals linked by naming convention or metadata tags.

Resolved identities are published to the verity.identity.resolve topic for downstream consumers.

flowchart TD
    E[Normalised Event] --> IR[Identity Resolution]
    IR --> M1{Email match?}
    M1 -->|Yes| P[Existing Principal]
    M1 -->|No| M2{UPN match?}
    M2 -->|Yes| P
    M2 -->|No| M3{Service account pattern?}
    M3 -->|Yes| P
    M3 -->|No| NP[Create New Principal]
    P --> U[Upsert Principal]
    NP --> U
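The decision cascade above can be sketched in Python as follows. This is a minimal in-memory illustration under stated assumptions, not the engine's implementation. The `Account` and `Principal` shapes, the case-folding of emails and UPNs, and the `svc-`/`svc_` naming convention used for service-account grouping are all hypothetical.

```python
from __future__ import annotations

import re
import uuid
from dataclasses import dataclass, field


@dataclass
class Account:
    """A platform-specific account taken from a normalised event (hypothetical shape)."""
    platform: str
    external_id: str
    email: str | None = None
    aliases: tuple[str, ...] = ()
    upn: str | None = None  # e.g. Azure AD UPN, Snowflake login name or Databricks email


@dataclass
class Principal:
    """A resolved, cross-platform identity (hypothetical shape)."""
    principal_id: str
    emails: set[str] = field(default_factory=set)
    upns: set[str] = field(default_factory=set)
    service_key: str | None = None
    accounts: list[Account] = field(default_factory=list)


# Hypothetical naming convention: service accounts look like "svc-<name>..." or "svc_<name>...".
SERVICE_ACCOUNT = re.compile(r"^svc[-_](?P<name>[a-z0-9]+)", re.IGNORECASE)


def _norm(value: str | None) -> str | None:
    """Case-fold identifiers so 'Ana@Corp.com' and 'ana@corp.com' compare equal."""
    return value.strip().lower() if value else None


def _merge(p: Principal, account: Account, emails: set[str],
           upn: str | None, service_key: str | None) -> Principal:
    """The 'Upsert Principal' step: fold the account's identifiers into the principal."""
    p.emails |= emails
    if upn:
        p.upns.add(upn)
    p.service_key = p.service_key or service_key
    p.accounts.append(account)
    return p


def resolve(account: Account, principals: list[Principal]) -> tuple[Principal, bool]:
    """Return (principal, created), trying email, then UPN, then service-account pattern."""
    emails = {e for e in map(_norm, (account.email, *account.aliases)) if e}
    upn = _norm(account.upn)
    m = SERVICE_ACCOUNT.match(account.external_id)
    service_key = m.group("name").lower() if m else None

    checks = (
        lambda p: bool(emails & p.emails),                                    # 1. email / alias
        lambda p: upn is not None and (upn in p.upns or upn in p.emails),     # 2. UPN correlation
        lambda p: service_key is not None and p.service_key == service_key,   # 3. service account
    )
    for check in checks:
        for p in principals:
            if check(p):
                return _merge(p, account, emails, upn, service_key), False

    # 4. No rule matched: create a new Principal.
    new = Principal(principal_id=str(uuid.uuid4()))
    principals.append(new)
    return _merge(new, account, emails, upn, service_key), True
```

Each rule is tried across all known principals before falling through to the next, which mirrors the order of the decision nodes in the flowchart. A real implementation would presumably look candidates up in the principals table rather than scanning a list.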

2. Asset Classification

Each data asset is classified along two dimensions:

| Dimension | Values |
|---|---|
| Asset type | TABLE, SCHEMA, DATABASE, WAREHOUSE, NOTEBOOK, CLUSTER, WORKSPACE, APPLICATION |
| Sensitivity | PII, FINANCIAL, CONFIDENTIAL, INTERNAL, PUBLIC |
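The two dimensions map naturally onto closed enumerations. The sketch below is one way to model them. The `ClassifiedAsset` record is hypothetical and only reuses the (platform, external_id) natural key that the assets table uses.

```python
from dataclasses import dataclass
from enum import Enum


class AssetType(str, Enum):
    """Asset-type dimension; values taken from the table above."""
    TABLE = "TABLE"
    SCHEMA = "SCHEMA"
    DATABASE = "DATABASE"
    WAREHOUSE = "WAREHOUSE"
    NOTEBOOK = "NOTEBOOK"
    CLUSTER = "CLUSTER"
    WORKSPACE = "WORKSPACE"
    APPLICATION = "APPLICATION"


class Sensitivity(str, Enum):
    """Sensitivity dimension; values taken from the table above."""
    PII = "PII"
    FINANCIAL = "FINANCIAL"
    CONFIDENTIAL = "CONFIDENTIAL"
    INTERNAL = "INTERNAL"
    PUBLIC = "PUBLIC"


@dataclass(frozen=True)
class ClassifiedAsset:
    """Hypothetical record pairing an asset's natural key with both dimensions."""
    platform: str
    external_id: str
    asset_type: AssetType
    sensitivity: Sensitivity


# Parsing raw strings through the enums rejects values outside the two dimensions.
asset = ClassifiedAsset("snowflake", "ANALYTICS.SALES.CUSTOMERS",
                        AssetType("TABLE"), Sensitivity("PII"))
```

Because both enums subclass `str`, members compare equal to their raw values, which keeps them easy to serialise into Kafka messages or database columns.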

Classification rules:

  • Tag-based: Inherited from source system tags (e.g., Snowflake object tags, Unity Catalog tags).
  • Pattern-based: Column and table name pattern matching (e.g., *_ssn, *_credit_card → PII).
  • Manual override: Operators can set sensitivity via the API.
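The three rule sources can be combined as in the sketch below. The precedence it uses is an assumption, since this page does not state one. A manual override always wins. Otherwise the most sensitive tag or pattern hit is chosen, and INTERNAL is the fallback. The tag vocabulary and the `*_salary` rule are hypothetical; the first two patterns mirror the examples above.

```python
from __future__ import annotations

from enum import Enum
from fnmatch import fnmatchcase


class Sensitivity(str, Enum):
    PII = "PII"
    FINANCIAL = "FINANCIAL"
    CONFIDENTIAL = "CONFIDENTIAL"
    INTERNAL = "INTERNAL"
    PUBLIC = "PUBLIC"


# Assumed ordering, most to least sensitive (the enum's declaration order).
RANK = list(Sensitivity)

# Hypothetical tag vocabulary: source-system tag value (case-insensitive) -> sensitivity.
TAG_RULES = {s.value.lower(): s for s in Sensitivity}

# Hypothetical pattern rules (glob syntax, matched against lower-cased names).
PATTERN_RULES = [
    ("*_ssn", Sensitivity.PII),
    ("*_credit_card", Sensitivity.PII),
    ("*_salary", Sensitivity.FINANCIAL),
]


def classify(table: str, columns: list[str], tags: list[str],
             override: Sensitivity | None = None,
             default: Sensitivity = Sensitivity.INTERNAL) -> Sensitivity:
    """Manual override wins; otherwise return the most sensitive tag or pattern hit."""
    if override is not None:
        return override
    hits = [TAG_RULES[t.lower()] for t in tags if t.lower() in TAG_RULES]
    for name in (table, *columns):
        # fnmatchcase avoids OS-dependent case handling; names are lower-cased first.
        hits += [s for pattern, s in PATTERN_RULES if fnmatchcase(name.lower(), pattern)]
    return min(hits, key=RANK.index) if hits else default
```

For example, `classify("customers", ["customer_ssn"], [])` yields PII from the pattern rule, while passing `override=Sensitivity.PUBLIC` short-circuits every other rule.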

Classified assets are published to the verity.asset.classify topic.

3. Database Upserts

The engine writes resolved entities to PostgreSQL with upsert semantics (INSERT ... ON CONFLICT (...) DO UPDATE), using each table's conflict key below:

| Table | Conflict key | Description |
|---|---|---|
| principals | (platform, external_id) | Resolved identities |
| assets | (platform, external_id) | Classified data assets |
| grants | (principal_id, asset_id, permission) | Access-grant relationships |
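The upsert pattern for the principals table is sketched below. It runs against an in-memory SQLite database only so that it is self-contained. SQLite 3.24 and later accept the same `INSERT ... ON CONFLICT (...) DO UPDATE SET col = excluded.col` form as PostgreSQL. Apart from the (platform, external_id) conflict key, the columns are hypothetical.

```python
import sqlite3

# Hypothetical columns; only the (platform, external_id) conflict key comes from the table above.
SCHEMA = """
CREATE TABLE principals (
    id           INTEGER PRIMARY KEY,
    platform     TEXT NOT NULL,
    external_id  TEXT NOT NULL,
    display_name TEXT,
    UNIQUE (platform, external_id)
)
"""

# The same statement shape works in PostgreSQL (with %s placeholders under psycopg).
UPSERT_PRINCIPAL = """
INSERT INTO principals (platform, external_id, display_name)
VALUES (?, ?, ?)
ON CONFLICT (platform, external_id) DO UPDATE SET
    display_name = excluded.display_name
"""

conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.execute(UPSERT_PRINCIPAL, ("snowflake", "ANA", "Ana"))
# A later event for the same key updates the existing row instead of inserting a duplicate.
conn.execute(UPSERT_PRINCIPAL, ("snowflake", "ANA", "Ana Silva"))
rows = conn.execute("SELECT platform, external_id, display_name FROM principals").fetchall()
```

The conflict target must match a unique constraint or index on exactly those columns. Otherwise PostgreSQL rejects the statement.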

All upserts for an event batch are performed within a single database transaction, so each batch is either applied in full or not at all.
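The batch-per-transaction behaviour is sketched below, again with SQLite standing in for PostgreSQL. Python's `sqlite3` connection context manager commits when the block succeeds and rolls back when it raises. A PostgreSQL driver would need its own equivalent transaction block. The `assets` columns and the event dictionaries are hypothetical, and `BATCH_SIZE` simply mirrors the documented `NORMALISE_BATCH_SIZE` default.

```python
from __future__ import annotations

import sqlite3
from itertools import islice
from typing import Iterable, Iterator

BATCH_SIZE = 50  # mirrors the NORMALISE_BATCH_SIZE default

# Hypothetical schema; only the conflict key comes from the table above.
SCHEMA = """
CREATE TABLE assets (
    platform    TEXT NOT NULL,
    external_id TEXT NOT NULL,
    sensitivity TEXT,
    UNIQUE (platform, external_id)
)
"""

UPSERT_ASSET = """
INSERT INTO assets (platform, external_id, sensitivity) VALUES (?, ?, ?)
ON CONFLICT (platform, external_id) DO UPDATE SET sensitivity = excluded.sensitivity
"""


def batched(events: Iterable[dict], size: int = BATCH_SIZE) -> Iterator[list[dict]]:
    """Yield successive lists of at most `size` events."""
    it = iter(events)
    while chunk := list(islice(it, size)):
        yield chunk


def process_batch(conn: sqlite3.Connection, batch: list[dict]) -> None:
    """Apply every upsert in the batch atomically."""
    # The context manager commits if the block succeeds and rolls back
    # every statement in the batch if any of them raises.
    with conn:
        for event in batch:
            conn.execute(UPSERT_ASSET, (event["platform"], event["external_id"], event["sensitivity"]))


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
events = [{"platform": "snowflake", "external_id": f"T{i}", "sensitivity": "PUBLIC"} for i in range(120)]
for batch in batched(events):
    process_batch(conn, batch)

# A malformed second event aborts the whole batch, including the first, already-executed upsert.
try:
    process_batch(conn, [{"platform": "snowflake", "external_id": "NEW", "sensitivity": "PII"},
                         {"platform": "snowflake"}])
except KeyError:
    pass

count = conn.execute("SELECT COUNT(*) FROM assets").fetchone()[0]
```

After this runs, all 120 well-formed events are stored and the row for `NEW` is absent, because its batch was rolled back.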

Configuration

| Variable | Required | Default | Description |
|---|---|---|---|
| NORMALISE_KAFKA_BOOTSTRAP | Yes | | Kafka bootstrap servers |
| NORMALISE_CONSUMER_GROUP | No | verity-normalise-engine | Kafka consumer group ID |
| NORMALISE_DATABASE_URL | Yes | | PostgreSQL connection string |
| NORMALISE_BATCH_SIZE | No | 50 | Events per transaction batch |
| NORMALISE_CLASSIFICATION_RULES | No | default | Classification rule set name |
| NORMALISE_LOG_LEVEL | No | INFO | Python logging level name (e.g., DEBUG, INFO) |
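One way to load and validate these variables is shown below. It is a hypothetical loader, not the service's actual configuration code. The required/default split follows the table above. The fail-fast checks (missing required values, unknown log level, non-positive batch size) are assumptions added for illustration.

```python
from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class NormaliseConfig:
    """Hypothetical typed view of the NORMALISE_* environment variables."""
    kafka_bootstrap: str
    database_url: str
    consumer_group: str = "verity-normalise-engine"
    batch_size: int = 50
    classification_rules: str = "default"
    log_level: str = "INFO"

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> NormaliseConfig:
        required = ("NORMALISE_KAFKA_BOOTSTRAP", "NORMALISE_DATABASE_URL")
        missing = [name for name in required if not env.get(name)]
        if missing:
            raise RuntimeError(f"missing required settings: {', '.join(missing)}")

        # getLevelName returns an int for registered level names, a string otherwise.
        level = env.get("NORMALISE_LOG_LEVEL", "INFO").upper()
        if not isinstance(logging.getLevelName(level), int):
            raise ValueError(f"unknown log level: {level}")

        batch_size = int(env.get("NORMALISE_BATCH_SIZE", "50"))
        if batch_size < 1:
            raise ValueError("NORMALISE_BATCH_SIZE must be positive")

        return cls(
            kafka_bootstrap=env["NORMALISE_KAFKA_BOOTSTRAP"],
            database_url=env["NORMALISE_DATABASE_URL"],
            consumer_group=env.get("NORMALISE_CONSUMER_GROUP", "verity-normalise-engine"),
            batch_size=batch_size,
            classification_rules=env.get("NORMALISE_CLASSIFICATION_RULES", "default"),
            log_level=level,
        )
```

Accepting a `Mapping` instead of reading `os.environ` directly keeps the loader easy to test with plain dictionaries.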

Observability

| Metric | Type | Description |
|---|---|---|
| normalise_events_processed_total | Counter | Events consumed (labelled by event type) |
| normalise_principals_upserted_total | Counter | Principal upserts performed |
| normalise_assets_upserted_total | Counter | Asset upserts performed |
| normalise_grants_upserted_total | Counter | Grant upserts performed |
| normalise_identity_resolutions_total | Counter | Successful cross-platform identity matches |
| normalise_processing_duration_seconds | Histogram | Batch processing latency |