Normalise Engine
Path: services/ingest/normalise-engine/ · Type: Consumer
The Normalise Engine transforms validated events into the Verity canonical data model. It performs identity resolution and asset classification, then persists the resulting principals, assets, and grants to PostgreSQL.
Architecture
graph LR
K1[verity.events.normalised] --> NE[Normalise Engine]
NE --> K2[verity.identity.resolve]
NE --> K3[verity.asset.classify]
NE --> PG[(PostgreSQL)]
Processing Pipeline
1. Identity Resolution
Cross-platform identity matching resolves multiple platform-specific accounts to a single Principal. Three rules are applied:
- Email matching: Primary and alias email addresses.
- UPN correlation: Azure AD UPN ↔ Snowflake login name ↔ Databricks email.
- Service account grouping: Service principals linked by naming convention or metadata tags.
Resolved identities are published to the verity.identity.resolve topic for downstream consumers.
flowchart TD
E[Normalised Event] --> IR[Identity Resolution]
IR --> M1{Email match?}
M1 -->|Yes| P[Existing Principal]
M1 -->|No| M2{UPN match?}
M2 -->|Yes| P
M2 -->|No| M3{Service account pattern?}
M3 -->|Yes| P
M3 -->|No| NP[Create New Principal]
P --> U[Upsert Principal]
NP --> U
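The cascade in the flowchart can be sketched in a few lines of Python. This is a minimal illustration, not the engine's actual code: the `Principal` fields, the `resolve` function, the account dictionary keys, and the `svc-<team>-` prefix convention are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Principal:
    # Hypothetical shape; the real principals table may differ.
    principal_id: int
    emails: set[str] = field(default_factory=set)  # primary and alias emails
    upns: set[str] = field(default_factory=set)
    service_prefix: Optional[str] = None  # assumed naming convention, e.g. "svc-etl-"


def resolve(account: dict, principals: list[Principal]) -> Principal:
    """Return the first principal matched by email, UPN, or service pattern.

    If no rule matches, a new principal is created and appended.
    """
    email = (account.get("email") or "").lower()
    upn = (account.get("upn") or "").lower()
    name = account.get("name") or ""

    # 1. Email match (primary or alias).
    for p in principals:
        if email and email in p.emails:
            return p
    # 2. UPN match.
    for p in principals:
        if upn and upn in p.upns:
            return p
    # 3. Service account pattern (assumed here to be a shared name prefix).
    for p in principals:
        if p.service_prefix and name.startswith(p.service_prefix):
            return p
    # 4. No match: create a new principal.
    created = Principal(
        principal_id=len(principals) + 1,
        emails={email} if email else set(),
        upns={upn} if upn else set(),
    )
    principals.append(created)
    return created
```

Checking the rules in a fixed order keeps the result deterministic when an account could match more than one rule.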
2. Asset Classification
Each data asset is classified along two dimensions:
| Dimension | Values |
|---|---|
| Asset type | TABLE, SCHEMA, DATABASE, WAREHOUSE, NOTEBOOK, CLUSTER, WORKSPACE, APPLICATION |
| Sensitivity | PII, FINANCIAL, CONFIDENTIAL, INTERNAL, PUBLIC |
Classification rules:
- Tag-based: Inherited from source system tags (e.g., Snowflake object tags, Unity Catalog tags).
- Pattern-based: Column/table name pattern matching (e.g., *_ssn, *_credit_card → PII).
- Manual override: Operators can set sensitivity via the API.
Classified assets are published to the verity.asset.classify topic.
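As a rough sketch of how the three rule types might combine, the function below applies a manual override first, then a source-system tag, then name patterns. The precedence order, the `sensitivity` tag key, the `INTERNAL` fallback, and the function name are assumptions for illustration; only the two example patterns come from the rules above.

```python
from fnmatch import fnmatchcase
from typing import Mapping, Optional

# Example patterns from the rule list; a real rule set would be longer.
PATTERN_RULES = [
    ("*_ssn", "PII"),
    ("*_credit_card", "PII"),
]

DEFAULT_SENSITIVITY = "INTERNAL"  # assumed fallback, not stated in the source


def classify_sensitivity(
    name: str,
    tags: Optional[Mapping[str, str]] = None,
    override: Optional[str] = None,
) -> str:
    """Pick a sensitivity level for a column or table name."""
    # Manual override wins (assumed precedence).
    if override:
        return override
    # Tag-based: inherit from a hypothetical "sensitivity" source tag.
    if tags and "sensitivity" in tags:
        return tags["sensitivity"].upper()
    # Pattern-based: case-insensitive glob match on the name.
    lowered = name.lower()
    for pattern, sensitivity in PATTERN_RULES:
        if fnmatchcase(lowered, pattern):
            return sensitivity
    return DEFAULT_SENSITIVITY
```

For example, `classify_sensitivity("customer_ssn")` yields `PII`, while an unmatched name such as `orders` falls back to the assumed default.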
3. Database Upserts
The engine writes resolved entities to PostgreSQL using upsert (INSERT ... ON CONFLICT ... DO UPDATE) semantics:
| Table | Key | Description |
|---|---|---|
| principals | (platform, external_id) | Resolved identities |
| assets | (platform, external_id) | Classified data assets |
| grants | (principal_id, asset_id, permission) | Access-grant relationships |
All upserts are performed within a single database transaction per event batch to ensure consistency.
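The batch-in-one-transaction pattern can be illustrated with a short, self-contained sketch. To keep it runnable without a database server, it uses Python's built-in `sqlite3` module as a stand-in for PostgreSQL; SQLite accepts the same `ON CONFLICT ... DO UPDATE` form. The `display_name` column and the `upsert_batch` helper are hypothetical. With a PostgreSQL driver such as psycopg, the placeholders would be `%s` instead of `?`.

```python
import sqlite3

# Upsert keyed on (platform, external_id), as in the table above.
# display_name is a hypothetical payload column.
UPSERT_PRINCIPAL = """
INSERT INTO principals (platform, external_id, display_name)
VALUES (?, ?, ?)
ON CONFLICT (platform, external_id)
DO UPDATE SET display_name = excluded.display_name
"""


def upsert_batch(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    """Apply every upsert in the batch inside one transaction."""
    # `with conn` commits if the block succeeds and rolls back if it raises,
    # so a failing row leaves no partial batch behind.
    with conn:
        conn.executemany(UPSERT_PRINCIPAL, rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE principals (
        platform TEXT NOT NULL,
        external_id TEXT NOT NULL,
        display_name TEXT,
        PRIMARY KEY (platform, external_id)
    )
    """
)

# The second row hits the same key, so it updates rather than inserts.
upsert_batch(conn, [("snowflake", "u1", "Ana"), ("snowflake", "u1", "Ana B.")])
```

Because the whole batch shares one transaction, a constraint violation on any row rolls back the rows that preceded it in that batch.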
Configuration
| Variable | Required | Default | Description |
|---|---|---|---|
| NORMALISE_KAFKA_BOOTSTRAP | Yes | — | Kafka bootstrap servers |
| NORMALISE_CONSUMER_GROUP | No | verity-normalise-engine | Kafka consumer group ID |
| NORMALISE_DATABASE_URL | Yes | — | PostgreSQL connection string |
| NORMALISE_BATCH_SIZE | No | 50 | Events per transaction batch |
| NORMALISE_CLASSIFICATION_RULES | No | default | Classification rule set name |
| NORMALISE_LOG_LEVEL | No | INFO | Python log level |
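One way to read these variables is sketched below. The `NormaliseConfig` class and `load_config` function are hypothetical names, not part of the service; the variable names and defaults are taken from the table.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = ("NORMALISE_KAFKA_BOOTSTRAP", "NORMALISE_DATABASE_URL")


@dataclass(frozen=True)
class NormaliseConfig:
    kafka_bootstrap: str
    database_url: str
    consumer_group: str = "verity-normalise-engine"
    batch_size: int = 50
    classification_rules: str = "default"
    log_level: str = "INFO"


def load_config(env: Mapping[str, str] = os.environ) -> NormaliseConfig:
    """Build the config from environment variables, failing fast on gaps."""
    missing = [key for key in REQUIRED if not env.get(key)]
    if missing:
        raise ValueError(f"missing required variables: {', '.join(missing)}")
    return NormaliseConfig(
        kafka_bootstrap=env["NORMALISE_KAFKA_BOOTSTRAP"],
        database_url=env["NORMALISE_DATABASE_URL"],
        consumer_group=env.get("NORMALISE_CONSUMER_GROUP", "verity-normalise-engine"),
        batch_size=int(env.get("NORMALISE_BATCH_SIZE", "50")),
        classification_rules=env.get("NORMALISE_CLASSIFICATION_RULES", "default"),
        log_level=env.get("NORMALISE_LOG_LEVEL", "INFO"),
    )
```

Passing the environment as a mapping makes the loader easy to exercise in tests without touching the real process environment.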
Observability
| Metric | Type | Description |
|---|---|---|
| normalise_events_processed_total | Counter | Events consumed (by event type) |
| normalise_principals_upserted_total | Counter | Principal upserts performed |
| normalise_assets_upserted_total | Counter | Asset upserts performed |
| normalise_grants_upserted_total | Counter | Grant upserts performed |
| normalise_identity_resolutions_total | Counter | Successful cross-platform identity matches |
| normalise_processing_duration_seconds | Histogram | Batch processing latency |