Invariants & Monitors

How to find, encode, and verify the safety properties your system must maintain — regardless of what goes wrong.

What’s an invariant?

A test checks one scenario under one fault: “when X happens, Y should be the result.”

An invariant is stronger: “Y must ALWAYS be true, no matter what happens.” It is not tied to a specific test: it is checked across all scenarios, all faults, and all interleavings.

Example: the difference

Test: “When the DB is down, create_order returns 503.”

db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
)

fault_scenario("order_db_down",
    scenario = create_order,
    faults = db_down,
    expect = lambda r: assert_eq(r.status, 503),
)

Invariant: “An order confirmed to the user is ALWAYS persisted in the database.”

def order_confirmed_means_persisted(event):
    if (event["type"] == "stdout" and event["service"] == "api"
            and event.get("status") == "confirmed"
            and event.get("persisted") != "true"):
        fail("order confirmed but not persisted!")

persistence_check = monitor(order_confirmed_means_persisted, service="api")

The test verifies one cell: one scenario under one fault. The invariant catches a bug in ANY cell of the scenario × fault matrix, including combinations you haven’t written overrides for.
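The distinction can be sketched in plain Python, independent of the spec language. This is a minimal illustration rather than the tool's API: the `check_invariant` helper, the trace names, and the event dictionaries are hypothetical stand-ins for recorded events.

```python
# Plain-Python sketch with hypothetical data (not the spec API).
# A test inspects one result; an invariant is a predicate over every event.

def confirmed_means_persisted(event):
    """Return True if the event respects the invariant."""
    if event.get("status") == "confirmed":
        return event.get("persisted") == "true"
    return True  # events that are not confirmations cannot violate it


def check_invariant(predicate, traces):
    """Return (trace_name, event) pairs for every violation in every trace."""
    violations = []
    for name, events in traces.items():
        for event in events:
            if not predicate(event):
                violations.append((name, event))
    return violations


# Hypothetical event traces, one per (scenario, fault) combination.
traces = {
    "create_order/db_down": [{"status": "error"}],
    "create_order/db_slow": [{"status": "confirmed", "persisted": "true"}],
    "bulk_import/db_slow": [{"status": "confirmed", "persisted": "false"}],
}

violations = check_invariant(confirmed_means_persisted, traces)
for name, event in violations:
    print("violation in", name, event)
```

A test written only for create_order under db_down would pass on this data. The violation sits in a combination that no one wrote a specific test for, and the invariant still finds it.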

Where invariants live in the domain model

In the domain-centric model, invariants are monitors attached to fault assumptions. They travel with the fault — every scenario that uses the fault assumption gets the invariant automatically:

# Define the invariant once.
persistence_check = monitor(order_confirmed_means_persisted, service="api")

# Attach to fault assumptions where it matters.
db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
    monitors = [persistence_check],
)

db_slow = fault_assumption("db_slow",
    target = db,
    write = delay("3s"),
    monitors = [persistence_check],
)

# The matrix gets invariants for free — every cell that uses
# db_down or db_slow runs persistence_check automatically.
fault_matrix(
    scenarios = [create_order, bulk_import, health_check],
    faults = [db_down, db_slow, disk_full],
)

You don’t wire invariants per-test. You wire them per-failure-mode. A new scenario added to the matrix inherits all monitors.
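To make the inheritance concrete, here is a small plain-Python model of the idea. It is a sketch under assumptions, not how the tool is implemented: the dictionaries, the `expand_matrix` helper, and the `cache_down` and `cancel_order` names are hypothetical.

```python
# Plain-Python model of "monitors travel with the fault" (hypothetical names).
from itertools import product

# Each fault assumption carries the names of its monitors.
faults = {
    "db_down": ["persistence_check"],
    "db_slow": ["persistence_check"],
    "cache_down": [],  # hypothetical fault with no monitors attached
}
scenarios = ["create_order", "bulk_import", "health_check"]


def expand_matrix(scenarios, faults):
    """Map every (scenario, fault) cell to the monitors it inherits."""
    return {
        (scenario, fault): list(faults[fault])
        for scenario, fault in product(scenarios, faults)
    }


cells = expand_matrix(scenarios, faults)
print(len(cells))  # 9 cells: 3 scenarios x 3 faults

# Adding a scenario needs no monitor wiring: it inherits from each fault.
scenarios.append("cancel_order")
cells = expand_matrix(scenarios, faults)
print(cells[("cancel_order", "db_down")])  # ['persistence_check']
```

The monitors are looked up from the fault, never from the scenario, which is why the new scenario is covered without any extra wiring.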

How to find invariants

Ask: “What must never happen?”

For each service, ask: “what would be catastrophic if it happened?”

| System | Catastrophic event | Invariant |
|---|---|---|
| E-commerce | User charged, order not created | Payment → order must be atomic |
| Banking | Money created from nothing | Sum of all balances = constant |
| Inventory | Stock goes negative | stock >= 0 always |
| Messaging | Duplicate messages delivered | Each message ID delivered at most once |
| Auth | Valid token rejected | Authenticated user always authorized |
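Each row turns into a predicate you can check mechanically. As one example, the banking row could be sketched in plain Python like this. The snapshot format and the `check_conservation` helper are hypothetical, chosen only to show the shape of the check.

```python
# Plain-Python sketch: "sum of all balances = constant" as a checkable predicate.
# The snapshot format is a hypothetical example, not something the tool defines.

def check_conservation(expected_total, snapshots):
    """Return the index of the first snapshot whose total differs, or None."""
    for i, balances in enumerate(snapshots):
        if sum(balances.values()) != expected_total:
            return i
    return None


# Balances observed over time during a transfer of 30 from alice to bob.
snapshots = [
    {"alice": 100, "bob": 50},
    {"alice": 70, "bob": 50},  # debit applied, credit not yet (or never) applied
    {"alice": 70, "bob": 80},
]

first_bad = check_conservation(150, snapshots)
print(first_bad)  # 1
```

Whether the intermediate state counts as a violation depends on what observers are allowed to see. If the debit and credit must be atomic, snapshot 1 is exactly the state a fault could leave behind permanently.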

Ask: “What must always happen?”

| System | Required behavior | Invariant |
|---|---|---|
| Database | Committed data survives restart | WAL write before commit response |
| API | Failed request leaves no partial state | Rollback on error |
| Queue | Published message eventually delivered | No message loss |
| Cache | Cache consistent with source | Invalidation after write |

Ask: “What ordering must hold?”

| System | Required ordering | Invariant |
|---|---|---|
| Payments | Charge before fulfill | Payment event before shipping event |
| Event sourcing | Event published after DB commit | WAL write before Kafka publish |
| Distributed lock | Lock acquired before critical section | Lock event before write event |

Encoding invariants as monitors

Pattern 1: “This should never happen”

def no_negative_stock(event):
    if (event["type"] == "stdout" and event["service"] == "inventory"
            and event.get("stock") != None):
        stock = int(event["stock"])
        if stock < 0:
            fail("stock went negative: " + str(stock))

stock_check = monitor(no_negative_stock, service="inventory")

Pattern 2: “If A happens, B must have happened”

def no_orphan_events(event):
    """If we published a Kafka event, the order must be in the DB."""
    if event["type"] == "topic" and event.get("order_id"):
        db_rows = events(where=lambda e:
            e.type == "wal" and e.data.get("action") == "INSERT"
            and e.data.get("order_id") == event["order_id"])
        if len(db_rows) == 0:
            fail("orphan event: order " + event["order_id"] + " not in DB")

orphan_check = monitor(no_orphan_events)
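The logic of this pattern, stripped of the spec API, is an implication checked against history. The following plain-Python sketch shows a single-pass version. The flat event dictionaries and the `find_orphans` helper are assumptions made for illustration.

```python
# Plain-Python sketch of "if A happens, B must have happened".
# Event shape and helper name are hypothetical.

def find_orphans(events):
    """Return order IDs that were published without a prior DB insert."""
    inserted = set()
    orphans = []
    for event in events:  # events are assumed to be in recorded order
        if event["type"] == "wal" and event.get("action") == "INSERT":
            inserted.add(event["order_id"])
        elif event["type"] == "topic" and event["order_id"] not in inserted:
            orphans.append(event["order_id"])
    return orphans


log = [
    {"type": "wal", "action": "INSERT", "order_id": "o1"},
    {"type": "topic", "order_id": "o1"},
    {"type": "topic", "order_id": "o2"},  # published, never inserted
]

orphans = find_orphans(log)
print(orphans)  # ['o2']
```

The sketch remembers inserts in a set as it goes, whereas the monitor above queries the recorded history each time a topic event arrives. Both express the same implication.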

Pattern 3: “A must happen before B”

Use assert_before in the expect oracle of a fault_scenario:

fault_scenario("wal_before_response",
    scenario = create_order,
    expect = lambda r: assert_before(
        first={"service": "db", "syscall": "write", "path": "*.wal"},
        then={"service": "api", "type": "step_recv"},
    ),
)
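Conceptually, an ordering check compares the positions of two matching events in the recorded trace. The plain-Python sketch below shows one plausible reading. It is not the implementation of assert_before: the helper names are hypothetical, matching is by exact field equality rather than globs, and a missing event on either side is treated as a failure, which the real assertion may handle differently.

```python
# Plain-Python sketch of an ordering check (hypothetical helpers).

def first_index(events, pattern):
    """Index of the first event whose fields all equal the pattern, else None."""
    for i, event in enumerate(events):
        if all(event.get(key) == value for key, value in pattern.items()):
            return i
    return None


def happened_before(events, first, then):
    """True if a `first` match exists and precedes the first `then` match."""
    i = first_index(events, first)
    j = first_index(events, then)
    return i is not None and j is not None and i < j


log = [
    {"service": "db", "syscall": "write"},
    {"service": "api", "type": "step_recv"},
]
wal_write = {"service": "db", "syscall": "write"}
response = {"service": "api", "type": "step_recv"}

print(happened_before(log, wal_write, response))        # True
print(happened_before(log[::-1], wal_write, response))  # False
```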

Pattern 4: “Count constraint”

def max_one_publish(event):
    """Each request should produce at most one Kafka event."""
    if event["type"] == "topic" and event.get("topic") == "order-events":
        all_publishes = events(where=lambda e:
            e.type == "topic" and e.data.get("topic") == "order-events")
        if len(all_publishes) > 1:
            fail("duplicate event: " + str(len(all_publishes)) + " publishes")

dedup_check = monitor(max_one_publish)
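The monitor above counts every publish in the run, which fits a scenario that issues a single request. When a scenario issues several requests, the count usually needs to be grouped by a key. A plain-Python sketch of that grouping follows, assuming each event carries an `order_id` field as in Pattern 2. The `duplicate_publishes` helper is hypothetical.

```python
# Plain-Python sketch: at most one publish per order (hypothetical helper).
from collections import Counter


def duplicate_publishes(events, topic):
    """Return {order_id: count} for orders published more than once."""
    counts = Counter(
        e["order_id"]
        for e in events
        if e["type"] == "topic" and e.get("topic") == topic
    )
    return {order_id: n for order_id, n in counts.items() if n > 1}


log = [
    {"type": "topic", "topic": "order-events", "order_id": "o1"},
    {"type": "topic", "topic": "order-events", "order_id": "o2"},
    {"type": "topic", "topic": "order-events", "order_id": "o2"},  # retry
]

duplicates = duplicate_publishes(log, "order-events")
print(duplicates)  # {'o2': 2}
```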

Monitors vs assertions vs expect

| | assert_* in expect | monitor() on assumption |
|---|---|---|
| When | After scenario completes | On every event, in real time |
| Scope | One matrix cell | Every cell using that assumption |
| Catches | Wrong return value | Invariant violations mid-execution |
| Example | assert_eq(r.status, 503) | “stock never negative” |

Use expect for: “this specific cell should produce this result.”

Use monitors for: “this property should ALWAYS hold under this failure.”

Use both together — and always verify the fault actually fired:

fault_scenario("order_db_write_fail",
    scenario = create_order,
    faults = db_write_fail,   # carries orphan_check monitor
    expect = lambda r: (
        assert_true(r.status >= 400),
        # Verify: no Kafka event was published
        assert_never(where=lambda e:
            e.type == "topic" and e.data.get("topic") == "order-created"),
        # Verify: the fault actually intercepted a syscall
        assert_eventually(type="syscall", service="db", decision="deny*"),
    ),
)

The monitor catches the invariant violation if it happens. The expect checks the outcome. assert_eventually(decision="deny*") confirms the fault was actually exercised — not silently bypassed.
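The reason for the last check is that a run in which the fault never triggered can pass for the wrong reason. The plain-Python sketch below illustrates this with two hypothetical traces. The `decision` strings and the `fault_fired` helper are assumptions, and the `deny*` glob is matched with the standard library's `fnmatchcase`.

```python
# Plain-Python sketch: detecting a vacuous pass (hypothetical event data).
from fnmatch import fnmatchcase


def fault_fired(events, service, decision_glob):
    """True if any syscall event for the service has a matching decision."""
    return any(
        e.get("type") == "syscall"
        and e.get("service") == service
        and fnmatchcase(e.get("decision", ""), decision_glob)
        for e in events
    )


# The fault intercepted a write on the db service.
real_run = [{"type": "syscall", "service": "db", "decision": "deny(EIO)"}]

# The request failed for an unrelated reason; the fault never triggered.
bypassed_run = [{"type": "syscall", "service": "db", "decision": "allow"}]

print(fault_fired(real_run, "db", "deny*"))      # True
print(fault_fired(bypassed_run, "db", "deny*"))  # False
```

Both runs could return a status of 400 or above and publish no event, so the first two assertions alone cannot tell them apart.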

Invariants under fault — the real power

Invariants that hold even when things break catch the most dangerous bugs:

# Invariants
persistence_check = monitor(order_confirmed_means_persisted, service="api")
stock_check = monitor(no_negative_stock, service="inventory")

# Fault assumptions carry invariants
db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
    monitors = [persistence_check, stock_check],
)

disk_full = fault_assumption("disk_full",
    target = db,
    write = deny("ENOSPC"),
    monitors = [persistence_check],
)

# Matrix: 3 scenarios × 2 faults = 6 cells.
# Each cell runs with the appropriate invariant monitors.
fault_matrix(
    scenarios = [create_order, bulk_import, health_check],
    faults = [db_down, disk_full],
    overrides = {
        (create_order, db_down): lambda r: assert_eq(r.status, 503),
    },
)

If a monitor fires during ANY cell, you’ve found a real bug: the system violates a safety property under failure.

Data integrity patterns

The hardest bugs aren’t “API returned 500” — they’re “API returned 500 but also wrote a partial row to the database.” Use protocol steps inside expect to verify data integrity, not just HTTP responses.

No partial rows after error

fault_scenario("no_orphaned_orders",
    scenario = create_order,
    faults = db_write_error,
    expect = lambda r: (
        assert_true(r.status >= 500),
        # Query the DB directly — is the data clean?
        assert_eq(
            db.main.query(sql="SELECT count(*) as n FROM orders WHERE status='pending'").data[0]["n"],
            0,
            "no partial rows after failed INSERT"),
    ),
)

db.main.query() in expect talks to the running database. The service is still up — you can verify actual state, not just the HTTP response.

No stale cache after failure

fault_scenario("no_stale_cache",
    scenario = create_order,
    faults = redis_down,
    expect = lambda r: (
        assert_true(r.status >= 500),
        assert_eq(len(redis.main.keys(pattern="order:*").data), 0,
            "no cached entries after Redis failure"),
    ),
)

No orphan Kafka events

fault_scenario("no_orphan_events",
    scenario = create_order,
    faults = db_write_error,
    expect = lambda r: (
        assert_true(r.status >= 500),
        # If DB write failed, no Kafka event should exist
        assert_never(where=lambda e:
            e.type == "topic" and e.data.get("topic") == "order-events"),
        # Verify the fault actually fired
        assert_eventually(type="syscall", service="db", decision="deny*"),
    ),
)

Message loss detection

fault_scenario("no_message_loss",
    scenario = publish_and_consume,
    faults = consumer_slow,
    expect = lambda r: assert_eq(
        len(events(where=lambda e: e.type == "topic" and e.data.get("action") == "produce")),
        len(events(where=lambda e: e.type == "topic" and e.data.get("action") == "consume")),
        "every produced message must be consumed"),
)
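Comparing counts is a quick first check, but equal counts can hide a lost message when another one is delivered twice. If events carry a message identifier, comparing identifiers is stricter. The plain-Python sketch below assumes a hypothetical `msg_id` field and a hypothetical `compare_messages` helper.

```python
# Plain-Python sketch: compare message IDs, not just counts.
# The msg_id field is an assumption for illustration.
from collections import Counter


def compare_messages(events):
    """Return (lost, duplicated) message IDs from produce/consume events."""
    produced = [e["msg_id"] for e in events if e["action"] == "produce"]
    consumed = Counter(e["msg_id"] for e in events if e["action"] == "consume")
    lost = sorted(set(produced) - set(consumed))
    duplicated = sorted(m for m, n in consumed.items() if n > 1)
    return lost, duplicated


log = [
    {"action": "produce", "msg_id": "m1"},
    {"action": "produce", "msg_id": "m2"},
    {"action": "consume", "msg_id": "m1"},
    {"action": "consume", "msg_id": "m1"},  # redelivery of m1; m2 never arrives
]

lost, duplicated = compare_messages(log)
print(lost, duplicated)  # ['m2'] ['m1']
```

This trace has two produce events and two consume events, so a count comparison passes even though m2 was lost.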

Which tool for which check

| Verify | Tool | Where |
|---|---|---|
| HTTP response | r.status, r.body | expect lambda arg |
| DB row exists/absent | db.main.query(sql=...) | expect lambda body |
| Redis key exists/absent | redis.main.keys(pattern=...) | expect lambda body |
| Kafka message published | assert_eventually(type="topic") | expect lambda body |
| Kafka message absent | assert_never(type="topic") | expect lambda body |
| Fault actually fired | assert_eventually(decision="deny*") | expect lambda body |
| Continuous invariant | monitor() on fault_assumption | Fires across all tests |

Progression

  1. Start with one invariant — the most important one for your system
  2. Encode it as a monitor() — store in a variable
  3. Attach to your fault assumptions via monitors=
  4. Run the matrix: the invariant is checked across all combinations
  5. Add more invariants as you discover them (production incidents are a good source)
  6. Share across specs: put monitors in a load()-able file

# invariants.star (shared across all specs)
def no_negative_stock(event):
    ...

def no_orphan_events(event):
    ...

stock_check = monitor(no_negative_stock, service="inventory")
orphan_check = monitor(no_orphan_events)

# order-spec.star
load("invariants.star", "stock_check", "orphan_check")

db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
    monitors = [stock_check, orphan_check],
)

Every fault assumption that lists these monitors checks the invariants automatically. A new spec only has to load the invariants file and attach the monitors to its own fault assumptions to inherit the full safety net.