Building an Integration Layer

Aevum governs access to context. It does not move data between your systems. Your integration layer is the code that bridges your data sources and Aevum's governed membrane.

What the integration layer is

Your data sources                  Aevum
  ┌──────────────┐                ┌─────────────────────┐
  │ Billing DB   │─── ingest ────>│  governed membrane  │
  │ CRM          │─── ingest ────>│  knowledge graph    │
  │ ERP          │─── ingest ────>│  episodic ledger    │
  └──────────────┘                └─────────────────────┘
                                        query
                                  ┌────────▼────────┐
                                  │   Your agent    │
                                  └─────────────────┘

The integration layer is the code that:

1. Reads from your data sources
2. Calls engine.ingest() with proper provenance and classification
3. Handles the OutputEnvelope response

You build this. Aevum governs it.
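Step 3 is the easiest one to skip. A minimal sketch of one way to centralize it, assuming only the `status`, `data`, and `audit_id` fields that the examples on this page read from the result. `IngestError` and `unwrap` are hypothetical names, the `"error"` status value is an assumption, and `SimpleNamespace` stands in for a real OutputEnvelope so the snippet runs without Aevum installed:

```python
from types import SimpleNamespace


class IngestError(RuntimeError):
    """Hypothetical error raised when an envelope reports a non-ok status."""


def unwrap(result):
    """Return the audit ID of a successful call, or raise with the payload.

    Assumes the envelope exposes .status, .data and .audit_id, as used
    elsewhere on this page; any other fields are ignored.
    """
    if result.status != "ok":
        raise IngestError(f"Aevum call failed ({result.status}): {result.data}")
    return result.audit_id


# Stand-ins for real envelopes, for illustration only.
ok = SimpleNamespace(status="ok", data={}, audit_id="audit-123")
failed = SimpleNamespace(status="error", data={"reason": "no consent"}, audit_id=None)

print(unwrap(ok))
try:
    unwrap(failed)
except IngestError as exc:
    print(exc)
```

Routing every call through one helper like this keeps failed ingests from passing silently in cron jobs and consumers.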

Option A: Direct Python (simplest)

Write a Python function that reads from your source and ingests to Aevum:

import psycopg2
import psycopg2.extras
from aevum.core import Engine

engine = Engine()

def sync_invoice(invoice_id: str, customer_id: str) -> None:
    """Read an invoice from billing DB and ingest to Aevum."""
    conn = psycopg2.connect(dsn="postgresql://billing-db/invoices")
    try:
        # RealDictCursor returns rows as dicts, so columns can be read by name.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM invoices WHERE id = %s", (invoice_id,))
            row = cur.fetchone()
    finally:
        conn.close()

    if row is None:
        raise LookupError(f"Invoice {invoice_id} not found")

    result = engine.ingest(
        data={
            "invoice_id": row["id"],
            "amount": float(row["amount"]),
            "status": row["status"],
            "payment_date": row["payment_date"].isoformat() if row["payment_date"] else None,
        },
        provenance={
            "source_id": "billing-db",
            "chain_of_custody": ["billing-db"],
            "classification": 1,
        },
        purpose="billing-inquiry",
        subject_id=customer_id,
        actor="sync-job",
        idempotency_key=f"invoice-{invoice_id}",
    )

    if result.status != "ok":
        raise RuntimeError(f"Ingest failed: {result.data}")

Call this from a cron job, a webhook handler, or wherever your data changes.

Async Python note. The Engine's five functions are synchronous. If you are calling Aevum from an async Python application (FastAPI, asyncio, etc.), use a thread executor to avoid blocking the event loop:

import asyncio

# Inside a coroutine: get_running_loop() is the supported way to reach the active loop.
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
    None,
    lambda: engine.ingest(
        data=invoice,
        provenance=provenance,
        purpose="billing-inquiry",
        subject_id=customer_id,
        actor=agent_id,
    )
)
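On Python 3.9 and later, `asyncio.to_thread` achieves the same effect with less ceremony. The sketch below uses a hypothetical `blocking_ingest` function as a stand-in for `engine.ingest`, so it runs without Aevum installed. Whether a single Engine may be called from several threads at once is not stated here, so check that before running ingests concurrently as this example does:

```python
import asyncio
import time


def blocking_ingest(data: dict) -> dict:
    """Stand-in for a synchronous engine.ingest() call (hypothetical)."""
    time.sleep(0.05)  # simulate blocking I/O
    return {"status": "ok", "data": data}


async def ingest_async(data: dict) -> dict:
    # to_thread runs the blocking call in the default thread pool,
    # so the event loop stays free to serve other tasks meanwhile.
    return await asyncio.to_thread(blocking_ingest, data)


async def main() -> list:
    # gather preserves argument order in its result list.
    return await asyncio.gather(
        ingest_async({"invoice_id": "a"}),
        ingest_async({"invoice_id": "b"}),
    )


results = asyncio.run(main())
```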

Option B: HTTP API (language-agnostic)

For non-Python applications or services that cannot import aevum-core directly, use the HTTP API:

pip install aevum-server
aevum server start --host 0.0.0.0 --port 8000

Then call the five functions from any language or framework:

import httpx

response = httpx.post("http://localhost:8000/v1/ingest", json={
    "data": {"note": "User requested account review"},
    "provenance": {
        "source_id": "billing-system",
        "chain_of_custody": ["billing-system"],
        "classification": 1,
    },
    "purpose": "billing-inquiry",
    "subject_id": customer_id,
}, headers={"X-Actor": agent_id})

Use Option B when your integration code is not Python, or when you need to call Aevum from multiple services without sharing a library dependency.
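When several services call the endpoint, building the request in one place helps keep provenance fields consistent. The following is a sketch only: `build_ingest_request` is a hypothetical helper, the field names and the `X-Actor` header simply mirror the example above, and nothing is sent over the network:

```python
def build_ingest_request(data, source_id, subject_id, actor, purpose,
                         classification=1, base_url="http://localhost:8000"):
    """Return (url, json_payload, headers) for a POST to the ingest endpoint.

    Hypothetical helper; the payload shape is copied from the example above,
    not from an API reference.
    """
    payload = {
        "data": data,
        "provenance": {
            "source_id": source_id,
            "chain_of_custody": [source_id],
            "classification": classification,
        },
        "purpose": purpose,
        "subject_id": subject_id,
    }
    headers = {"X-Actor": actor}
    return f"{base_url}/v1/ingest", payload, headers


url, payload, headers = build_ingest_request(
    data={"note": "User requested account review"},
    source_id="billing-system",
    subject_id="customer-42",
    actor="billing-agent",
    purpose="billing-inquiry",
)
```

The three values can then be passed to any HTTP client, for example `httpx.post(url, json=payload, headers=headers)`.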

Option C: Message Queue (enterprise scale)

For high-volume or multi-system integrations, use a message queue:

Billing system ──> Kafka topic "invoices" ──> Aevum ingest consumer
CRM            ──> Kafka topic "customers" ──> Aevum ingest consumer
ERP            ──> Kafka topic "contracts" ──> Aevum ingest consumer

import json

from kafka import KafkaConsumer
from aevum.core import Engine

engine = Engine()

consumer = KafkaConsumer(
    "invoices",
    bootstrap_servers=["kafka:9092"],
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    invoice = message.value
    engine.ingest(
        data=invoice,
        provenance={
            "source_id": "billing-system",
            "chain_of_custody": ["billing-system", "kafka"],
            "classification": 1,
        },
        purpose="billing-inquiry",
        subject_id=invoice["customer_id"],
        actor="ingest-consumer",
        # Offsets are unique only within a partition, so include both.
        idempotency_key=f"invoice-{invoice['id']}-{message.partition}-{message.offset}",
    )

The chain_of_custody list records every system the data passed through before reaching Aevum, satisfying Barrier 5 (Provenance).
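If data crosses several hops, each hop can append itself to the chain before forwarding. A small sketch of that idea; `with_hop` is a hypothetical helper and not part of Aevum:

```python
def with_hop(provenance: dict, hop: str) -> dict:
    """Return a copy of provenance with hop appended to chain_of_custody.

    The input dict is left unmodified, and a hop is not repeated if it is
    already the last entry (for example, on a retry).
    """
    chain = list(provenance.get("chain_of_custody", []))
    if not chain or chain[-1] != hop:
        chain.append(hop)
    return {**provenance, "chain_of_custody": chain}


origin = {
    "source_id": "billing-system",
    "chain_of_custody": ["billing-system"],
    "classification": 1,
}
at_consumer = with_hop(origin, "kafka")
```

Here `at_consumer["chain_of_custody"]` is `["billing-system", "kafka"]`, matching the provenance passed to `engine.ingest()` in the consumer above.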

A complete example: billing use case fully wired

from aevum.core import Engine
from aevum.core.consent.models import ConsentGrant

# Setup
engine = Engine()

def setup_customer_consent(customer_id: str, consent_ref: str) -> None:
    engine.add_consent_grant(ConsentGrant(
        grant_id=f"customer-{customer_id}-billing",
        subject_id=customer_id,
        grantee_id="billing-agent",
        operations=["ingest", "query"],
        purpose="billing-inquiry",
        classification_max=1,
        granted_at="2026-01-01T00:00:00Z",
        expires_at="2027-01-01T00:00:00Z",
        authorization_ref=consent_ref,
    ))

def handle_invoice_event(invoice: dict) -> str:
    """Called by your webhook handler or message consumer. Returns the audit ID (assumed to be a string)."""
    customer_id = invoice["customer_id"]

    result = engine.ingest(
        data=invoice,
        provenance={
            "source_id": "billing-system",
            "chain_of_custody": ["billing-system"],
            "classification": 1,
        },
        purpose="billing-inquiry",
        subject_id=customer_id,
        actor="webhook-handler",
        idempotency_key=f"invoice-{invoice['id']}",
    )
    return result.audit_id

def handle_support_inquiry(customer_id: str, agent_id: str) -> dict:
    """Called by your support agent."""
    result = engine.query(
        purpose="billing-inquiry",
        subject_ids=[customer_id],
        actor=agent_id,
        classification_max=1,
    )
    if result.status == "ok":
        return result.data["results"].get(customer_id, {})
    return {}
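The example hard-codes `granted_at` and `expires_at`. In practice you would likely derive them from the moment consent was given. A sketch that produces timestamps in the same `YYYY-MM-DDTHH:MM:SSZ` form used above; `grant_window` is a hypothetical helper, and the 365-day default is an arbitrary choice for illustration:

```python
from datetime import datetime, timedelta, timezone


def grant_window(now: datetime, days: int = 365) -> tuple[str, str]:
    """Return (granted_at, expires_at) as UTC timestamps ending in 'Z'."""
    # Normalize to UTC and drop sub-second precision to match the format above.
    start = now.astimezone(timezone.utc).replace(microsecond=0)
    end = start + timedelta(days=days)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), end.strftime(fmt)


granted_at, expires_at = grant_window(datetime(2026, 1, 1, tzinfo=timezone.utc))
```

With the input shown, this yields the same two values as the hard-coded grant in `setup_customer_consent`.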

The webhook: how Aevum talks back

Aevum can notify your systems when reviews are approved or vetoed:

engine.register_webhook(
    webhook_id="billing-webhooks",
    url="https://your-system.internal/aevum-events",
    secret="your-hmac-secret",
    events=["review.approved", "review.vetoed"],
)

Your webhook handler receives HMAC-signed POST requests. Verify the signature before processing.
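A sketch of what that verification might look like. The signing scheme is an assumption: this page does not specify the hash algorithm, the encoding, or the header that carries the signature, so the example assumes HMAC-SHA256 over the raw request body with a hex-encoded digest. Confirm the actual scheme before relying on it. `verify_signature` is a hypothetical helper:

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_hex: str) -> bool:
    """Check a hex HMAC-SHA256 signature over the raw request body.

    ASSUMPTION: algorithm and encoding are illustrative, not confirmed.
    """
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # compare_digest takes constant time, which avoids leaking how many
    # leading characters of the signature matched.
    return hmac.compare_digest(expected.encode("ascii"), signature_hex.encode("utf-8"))


# Simulate a signed delivery using the secret registered above.
body = b'{"event": "review.approved"}'
good = hmac.new(b"your-hmac-secret", body, hashlib.sha256).hexdigest()

accepted = verify_signature("your-hmac-secret", body, good)
tampered = verify_signature("your-hmac-secret", body + b" ", good)
```

Compute the digest over the exact bytes received, before any JSON parsing, since re-serializing the payload can change whitespace or key order and invalidate the signature.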

What Aevum does vs what you build

| Aevum handles | You build |
|---|---|
| Consent enforcement | Data source connectors (DB queries, API calls) |
| Sigchain and audit | Webhook handler or message consumer |
| Human review gates | Notification to reviewers |
| Classification ceilings | Posting outcomes to external systems |
| Replay and verification | Agent logic and UI |

Technology stack summary

| Layer | Technology |
|---|---|
| Kernel | aevum-core |
| Persistence | aevum-store-oxigraph (dev) or aevum-store-postgres (prod) |
| HTTP API | aevum-server (FastAPI) |
| MCP tools | aevum-mcp |
| Policy | Cedar in-process + OPA HTTP sidecar (optional) |
| Your integration | Python, your message queue, your DB client |