Invariants & Monitors
How to find, encode, and verify the safety properties your system must maintain — regardless of what goes wrong.
What’s an invariant?
A test checks one scenario under one fault: “when X happens, Y should be the result.”
An invariant is stronger: “Y must ALWAYS be true, no matter what happens.” It is not tied to a specific test: it is checked across every scenario, fault, and interleaving that its monitor observes.
Example: the difference
Test: “When the DB is down, create_order returns 503.”
db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
)
fault_scenario("order_db_down",
    scenario = create_order,
    faults = db_down,
    expect = lambda r: assert_eq(r.status, 503),
)
Invariant: “An order confirmed to the user is ALWAYS persisted in the database.”
def order_confirmed_means_persisted(event):
    if (event["type"] == "stdout" and event["service"] == "api"
            and event.get("status") == "confirmed"
            and event.get("persisted") != "true"):
        fail("order confirmed but not persisted!")
persistence_check = monitor(order_confirmed_means_persisted, service="api")
The test verifies one cell. The invariant catches a bug in ANY cell — including combinations you haven’t written overrides for.
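To make the contrast concrete, here is a minimal plain-Python model of the same invariant. It is a sketch, not the tool's runtime: the event dicts and the `check_stream` helper are hypothetical, and the checker returns a message instead of calling `fail()` so it can run standalone.

```python
def order_confirmed_means_persisted(event):
    """Return an error string if the event violates the invariant, else None."""
    if (event.get("type") == "stdout" and event.get("service") == "api"
            and event.get("status") == "confirmed"
            and event.get("persisted") != "true"):
        return "order confirmed but not persisted!"
    return None


def check_stream(events, invariant):
    """Apply the invariant to every event and collect the violations."""
    return [msg for msg in map(invariant, events) if msg is not None]


# Hypothetical event stream from one run; the second event is the bug.
run = [
    {"type": "stdout", "service": "api", "status": "confirmed", "persisted": "true"},
    {"type": "stdout", "service": "api", "status": "confirmed", "persisted": "false"},
    {"type": "stdout", "service": "db", "status": "confirmed"},
]
violations = check_stream(run, order_confirmed_means_persisted)
print(violations)
```

A test on the HTTP status would pass or fail on one final value; the checker above inspects every event, so it does not depend on which scenario produced the stream.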
Where invariants live in the domain model
In the domain-centric model, invariants are monitors attached to fault assumptions. They travel with the fault — every scenario that uses the fault assumption gets the invariant automatically:
# Define the invariant once.
persistence_check = monitor(order_confirmed_means_persisted, service="api")
# Attach to fault assumptions where it matters.
db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
    monitors = [persistence_check],
)
db_slow = fault_assumption("db_slow",
    target = db,
    write = delay("3s"),
    monitors = [persistence_check],
)
# The matrix gets invariants for free: every cell that uses
# db_down or db_slow runs persistence_check automatically.
fault_matrix(
    scenarios = [create_order, bulk_import, health_check],
    faults = [db_down, db_slow, disk_full],
)
You don’t wire invariants per-test. You wire them per-failure-mode. A new scenario added to the matrix inherits all monitors.
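The inheritance rule can be modelled in a few lines of plain Python. This is only an illustration of the idea, assuming a fault assumption is nothing more than a name plus a list of monitor names; `expand` and the `cancel_order` scenario are hypothetical.

```python
from itertools import product

# Hypothetical stand-ins: each fault assumption carries its own monitors.
faults = {
    "db_down": ["persistence_check"],
    "db_slow": ["persistence_check"],
    "disk_full": [],
}
scenarios = ["create_order", "bulk_import", "health_check"]


def expand(scenarios, faults):
    """Map every (scenario, fault) cell to the monitors it inherits from the fault."""
    return {(s, f): list(faults[f]) for s, f in product(scenarios, faults)}


cells = expand(scenarios, faults)
# Adding a scenario needs no monitor wiring: the new cells inherit from the faults.
more = expand(scenarios + ["cancel_order"], faults)
print(len(cells), len(more))
print(more[("cancel_order", "db_down")])
```

Because monitors are looked up through the fault, the scenario list can grow without any change to the monitor wiring.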
How to find invariants
Ask: “What must never happen?”
For each service, ask: “what would be catastrophic if it happened?”
| System | Catastrophic event | Invariant |
|---|---|---|
| E-commerce | User charged, order not created | Payment → order must be atomic |
| Banking | Money created from nothing | Sum of all balances = constant |
| Inventory | Stock goes negative | stock >= 0 always |
| Messaging | Duplicate messages delivered | Each message ID delivered at most once |
| Auth | Valid token rejected | A valid token is never rejected |
Ask: “What must always happen?”
| System | Required behavior | Invariant |
|---|---|---|
| Database | Committed data survives restart | WAL write before commit response |
| API | Failed request leaves no partial state | Rollback on error |
| Queue | Published message eventually delivered | No message loss |
| Cache | Cache consistent with source | Invalidation after write |
Ask: “What ordering must hold?”
| System | Required ordering | Invariant |
|---|---|---|
| Payments | Charge before fulfill | Payment event before shipping event |
| Event sourcing | Event published after DB commit | WAL write before Kafka publish |
| Distributed lock | Lock acquired before critical section | Lock event before write event |
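A row from these tables usually translates into a small predicate. As a worked example, the banking row ("Sum of all balances = constant") might look like the plain-Python sketch below. The `transfer` and `check_conservation` functions and the account names are hypothetical and exist only to show the invariant being checked after every step.

```python
def transfer(balances, src, dst, amount):
    """Move amount from src to dst, refusing overdrafts and negative amounts."""
    if amount < 0 or balances[src] < amount:
        raise ValueError("invalid transfer")
    balances[src] -= amount
    balances[dst] += amount


def check_conservation(balances, expected_total):
    """Invariant: money is neither created nor destroyed."""
    total = sum(balances.values())
    if total != expected_total:
        raise AssertionError(f"total changed: {total} != {expected_total}")


balances = {"alice": 100, "bob": 50}
total = sum(balances.values())
for src, dst, amount in [("alice", "bob", 30), ("bob", "alice", 80)]:
    transfer(balances, src, dst, amount)
    check_conservation(balances, total)  # checked after every step, not once at the end
print(balances)
```

Checking after each step rather than once at the end is what distinguishes an invariant from a final-state assertion: a transient violation that later cancels out would still be caught.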
Encoding invariants as monitors
Pattern 1: “This should never happen”
def no_negative_stock(event):
    if (event["type"] == "stdout" and event["service"] == "inventory"
            and event.get("stock") != None):
        stock = int(event["stock"])
        if stock < 0:
            fail("stock went negative: " + str(stock))
stock_check = monitor(no_negative_stock, service="inventory")
Pattern 2: “If A happens, B must have happened”
def no_orphan_events(event):
    """If we published a Kafka event, the order must be in the DB."""
    if event["type"] == "topic" and event.get("order_id"):
        db_rows = events(where=lambda e:
            e.type == "wal" and e.data.get("action") == "INSERT"
            and e.data.get("order_id") == event["order_id"])
        if len(db_rows) == 0:
            # str() keeps the message safe if order_id is not a string.
            fail("orphan event: order " + str(event["order_id"]) + " not in DB")
orphan_check = monitor(no_orphan_events)
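The "if A, then B must already have happened" shape can also be checked in a single pass over an ordered log. The sketch below is plain Python under assumed event fields (`type`, `action`, `order_id`); `find_orphans` is a hypothetical helper, not part of the spec language.

```python
def find_orphans(log):
    """Return order IDs published to a topic before any matching WAL INSERT."""
    inserted = set()
    orphans = []
    for event in log:
        if event["type"] == "wal" and event.get("action") == "INSERT":
            inserted.add(event["order_id"])
        elif event["type"] == "topic" and event.get("order_id") is not None:
            if event["order_id"] not in inserted:
                orphans.append(event["order_id"])
    return orphans


# Hypothetical log: o1 is written then published; o2 is published too early.
log = [
    {"type": "wal", "action": "INSERT", "order_id": "o1"},
    {"type": "topic", "order_id": "o1"},
    {"type": "topic", "order_id": "o2"},
    {"type": "wal", "action": "INSERT", "order_id": "o2"},
]
print(find_orphans(log))
```

Keeping a running set of inserted IDs avoids re-scanning the whole log for every topic event, which matters once a run produces many events.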
Pattern 3: “A must happen before B”
Use assert_before in the expect oracle of a fault_scenario:
fault_scenario("wal_before_response",
    scenario = create_order,
    expect = lambda r: assert_before(
        first={"service": "db", "syscall": "write", "path": "*.wal"},
        then={"service": "api", "type": "step_recv"},
    ),
)
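For intuition, an ordering check of this kind can be modelled in plain Python. The semantics below are an assumption made for illustration (some event matching `first` must appear before the earliest event matching `then`, with glob matching on field values); the real `assert_before` may define edge cases differently.

```python
from fnmatch import fnmatchcase


def matches(event, pattern):
    """True if every pattern field glob-matches the corresponding event field."""
    return all(fnmatchcase(str(event.get(k, "")), v) for k, v in pattern.items())


def happens_before(log, first, then):
    """True if an event matching `first` precedes the earliest `then` match."""
    for event in log:
        if matches(event, first):
            return True
        if matches(event, then):
            return False
    return False  # `first` never happened


first = {"service": "db", "syscall": "write", "path": "*.wal"}
then = {"service": "api", "type": "step_recv"}

# Hypothetical logs: the WAL write lands before the response in `good` only.
good = [
    {"service": "db", "syscall": "write", "path": "/data/0001.wal"},
    {"service": "api", "type": "step_recv"},
]
bad = list(reversed(good))
print(happens_before(good, first, then), happens_before(bad, first, then))
```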
Pattern 4: “Count constraint”
def max_one_publish(event):
    """At most one order-events publish per run (assumes the scenario issues a single request)."""
    if event["type"] == "topic" and event.get("topic") == "order-events":
        all_publishes = events(where=lambda e:
            e.type == "topic" and e.data.get("topic") == "order-events")
        if len(all_publishes) > 1:
            fail("duplicate event: " + str(len(all_publishes)) + " publishes")
dedup_check = monitor(max_one_publish)
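When a scenario issues several requests, a count over the whole run is too coarse, and the constraint has to be grouped by request. A plain-Python sketch of that grouping follows; the `request_id` field is an assumption (the events shown in this guide do not necessarily carry one), and `duplicate_publishes` is a hypothetical helper.

```python
from collections import Counter


def duplicate_publishes(log, topic="order-events"):
    """Return {request_id: count} for requests that published more than once."""
    counts = Counter(
        e["request_id"]
        for e in log
        if e["type"] == "topic" and e.get("topic") == topic
    )
    return {rid: n for rid, n in counts.items() if n > 1}


# Hypothetical log: request r2 published twice to the same topic.
log = [
    {"type": "topic", "topic": "order-events", "request_id": "r1"},
    {"type": "topic", "topic": "order-events", "request_id": "r2"},
    {"type": "topic", "topic": "order-events", "request_id": "r2"},
    {"type": "topic", "topic": "audit", "request_id": "r2"},
]
print(duplicate_publishes(log))
```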
Monitors vs assertions vs expect
| | assert_* in expect | monitor() on assumption |
|---|---|---|
| When | After scenario completes | On every event, in real-time |
| Scope | One matrix cell | Every cell using that assumption |
| Catches | Wrong return value | Invariant violations mid-execution |
| Example | assert_eq(r.status, 503) | “stock never negative” |
Use expect for: “this specific cell should produce this result.”
Use monitors for: “this property should ALWAYS hold under this failure.”
Use both together — and always verify the fault actually fired:
fault_scenario("order_db_write_fail",
    scenario = create_order,
    faults = db_write_fail, # carries orphan_check monitor
    expect = lambda r: (
        assert_true(r.status >= 400),
        # Verify: no Kafka event was published
        assert_never(where=lambda e:
            e.type == "topic" and e.data.get("topic") == "order-created"),
        # Verify: the fault actually intercepted a syscall
        assert_eventually(type="syscall", service="db", decision="deny*"),
    ),
)
The monitor catches the invariant violation if it happens. The expect
checks the outcome. assert_eventually(decision="deny*") confirms the
fault was actually exercised — not silently bypassed.
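The reason for the last check is that a "nothing bad happened" assertion passes vacuously when the fault never triggers. A plain-Python model of the guard, assuming syscall events carry a `decision` string such as `allow` or `deny(ENOSPC)` (the exact values are hypothetical):

```python
from fnmatch import fnmatchcase


def fault_fired(log, service, decision_glob="deny*"):
    """True if at least one syscall event for `service` has a matching decision."""
    return any(
        e.get("type") == "syscall"
        and e.get("service") == service
        and fnmatchcase(e.get("decision", ""), decision_glob)
        for e in log
    )


# Hypothetical logs: in the second run the service never hit the faulted path.
exercised = [
    {"type": "syscall", "service": "db", "decision": "allow"},
    {"type": "syscall", "service": "db", "decision": "deny(ENOSPC)"},
]
bypassed = [{"type": "syscall", "service": "db", "decision": "allow"}]

print(fault_fired(exercised, "db"), fault_fired(bypassed, "db"))
```

In the `bypassed` run an "no event was published" assertion would still hold, but it would say nothing about behavior under the fault.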
Invariants under fault — the real power
Invariants that hold even when things break catch the most dangerous bugs:
# Invariants
persistence_check = monitor(order_confirmed_means_persisted, service="api")
stock_check = monitor(no_negative_stock, service="inventory")
# Fault assumptions carry invariants
db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
    monitors = [persistence_check, stock_check],
)
disk_full = fault_assumption("disk_full",
    target = db,
    write = deny("ENOSPC"),
    monitors = [persistence_check],
)
# Matrix: 3 scenarios × 2 faults = 6 cells.
# Each cell runs with the appropriate invariant monitors.
fault_matrix(
    scenarios = [create_order, bulk_import, health_check],
    faults = [db_down, disk_full],
    overrides = {
        (create_order, db_down): lambda r: assert_eq(r.status, 503),
    },
)
If a monitor fires during ANY cell, you’ve found a real bug: the system violates a safety property under failure.
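The cell arithmetic and the role of `overrides` can be sketched in plain Python. This is a model only: `plan` is a hypothetical helper, monitors and expectations are represented as strings, and cells without an override are shown with no per-cell expectation, which is an assumption about how such cells are treated.

```python
from itertools import product

scenarios = ["create_order", "bulk_import", "health_check"]
faults = {
    "db_down": ["persistence_check", "stock_check"],
    "disk_full": ["persistence_check"],
}
overrides = {("create_order", "db_down"): "status == 503"}


def plan(scenarios, faults, overrides):
    """Build one entry per cell: the monitors it runs and its override, if any."""
    return {
        (s, f): {"monitors": faults[f], "expect": overrides.get((s, f))}
        for s, f in product(scenarios, faults)
    }


cells = plan(scenarios, faults, overrides)
print(len(cells))
print(cells[("bulk_import", "disk_full")])
```

Only one of the six cells has a hand-written expectation, yet all six are covered by at least one monitor.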
Data integrity patterns
The hardest bugs aren’t “API returned 500” — they’re “API returned 500
but also wrote a partial row to the database.” Use protocol steps inside
expect to verify data integrity, not just HTTP responses.
No partial rows after error
fault_scenario("no_orphaned_orders",
    scenario = create_order,
    faults = db_write_error,
    expect = lambda r: (
        assert_true(r.status >= 500),
        # Query the DB directly: is the data clean?
        assert_eq(
            db.main.query(sql="SELECT count(*) as n FROM orders WHERE status='pending'").data[0]["n"],
            0,
            "no partial rows after failed INSERT"),
    ),
)
db.main.query() in expect talks to the running database. The service
is still up — you can verify actual state, not just the HTTP response.
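The property being verified here, "a failed request leaves no partial rows", can be reproduced in isolation with Python's built-in `sqlite3`. The schema and the `create_order` function are hypothetical stand-ins for the service under test; the point is the pairing of an error response with a direct row count.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL)")
conn.execute("CREATE TABLE order_items (order_id INTEGER NOT NULL, sku TEXT NOT NULL)")
conn.commit()


def create_order(conn, items):
    """Insert an order and its items atomically; roll back everything on error."""
    try:
        with conn:  # commits on success, rolls back if the block raises
            cur = conn.execute("INSERT INTO orders (status) VALUES ('pending')")
            for sku in items:
                conn.execute(
                    "INSERT INTO order_items (order_id, sku) VALUES (?, ?)",
                    (cur.lastrowid, sku),
                )
        return 201
    except sqlite3.IntegrityError:
        return 500


# The None sku violates NOT NULL after the order row was already inserted.
status = create_order(conn, ["widget", None])
pending = conn.execute(
    "SELECT count(*) FROM orders WHERE status='pending'"
).fetchone()[0]
print(status, pending)
```

If the inserts ran outside a single transaction, the status would still be 500 but the count would be 1, which is exactly the class of bug a status-only assertion misses.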
No stale cache after failure
fault_scenario("no_stale_cache",
    scenario = create_order,
    faults = redis_down,
    expect = lambda r: (
        assert_true(r.status >= 500),
        assert_eq(len(redis.main.keys(pattern="order:*").data), 0,
            "no cached entries after Redis failure"),
    ),
)
No orphan Kafka events
fault_scenario("no_orphan_events",
    scenario = create_order,
    faults = db_write_error,
    expect = lambda r: (
        assert_true(r.status >= 500),
        # If DB write failed, no Kafka event should exist
        assert_never(where=lambda e:
            e.type == "topic" and e.data.get("topic") == "order-events"),
        # Verify the fault actually fired
        assert_eventually(type="syscall", service="db", decision="deny*"),
    ),
)
Message loss detection
fault_scenario("no_message_loss",
    scenario = publish_and_consume,
    faults = consumer_slow,
    expect = lambda r: assert_eq(
        len(events(where=lambda e: e.type == "topic" and e.data.get("action") == "produce")),
        len(events(where=lambda e: e.type == "topic" and e.data.get("action") == "consume")),
        "every produced message must be consumed"),
)
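Comparing counts is a quick check, but equal counts can hide a lost message that is offset by a duplicate delivery. If events carry a message identifier, comparing ID sets is stricter. The plain-Python sketch below assumes a hypothetical `msg_id` field and a hypothetical `lost_messages` helper.

```python
def lost_messages(log):
    """Return the sorted IDs that were produced but never consumed."""
    produced = {
        e["msg_id"] for e in log
        if e["type"] == "topic" and e.get("action") == "produce"
    }
    consumed = {
        e["msg_id"] for e in log
        if e["type"] == "topic" and e.get("action") == "consume"
    }
    return sorted(produced - consumed)


# Hypothetical log: m1 is delivered twice, m2 is lost. Counts match (2 and 2).
log = [
    {"type": "topic", "action": "produce", "msg_id": "m1"},
    {"type": "topic", "action": "produce", "msg_id": "m2"},
    {"type": "topic", "action": "consume", "msg_id": "m1"},
    {"type": "topic", "action": "consume", "msg_id": "m1"},
]
print(lost_messages(log))
```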
Which tool for which check
| Verify | Tool | Where |
|---|---|---|
| HTTP response | r.status, r.body | expect lambda arg |
| DB row exists/absent | db.main.query(sql=...) | expect lambda body |
| Redis key exists/absent | redis.main.keys(pattern=...) | expect lambda body |
| Kafka message published | assert_eventually(type="topic") | expect lambda body |
| Kafka message absent | assert_never(type="topic") | expect lambda body |
| Fault actually fired | assert_eventually(decision="deny*") | expect lambda body |
| Continuous invariant | monitor() on fault_assumption | monitors= on the assumption; runs in every cell that uses it |
Progression
- Start with one invariant: the most important one for your system
- Encode it as a monitor() and store it in a variable
- Attach it to your fault assumptions via monitors=
- Run the matrix: the invariant is checked across all combinations
- Add more invariants as you discover them (production incidents are a good source)
- Share across specs: put monitors in a load()-able file
# invariants.star — shared across all specs
def no_negative_stock(event):
    ...
def no_orphan_events(event):
    ...
stock_check = monitor(no_negative_stock, service="inventory")
orphan_check = monitor(no_orphan_events)
# order-spec.star
load("invariants.star", "stock_check", "orphan_check")
db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
    monitors = [stock_check, orphan_check],
)
Every fault assumption that lists these monitors gets the invariants automatically. A new spec that loads the invariants file and attaches them via monitors= reuses the same safety net without redefining it.