Spec Patterns for Real Projects
Cookbook patterns for common architectures. Each pattern shows the topology, fault assumptions, invariants, and a composed matrix.
Pattern 1: API + Database + Cache
The most common pattern. An HTTP API backed by Postgres with Redis caching.
Client → API (HTTP :8080) → Postgres (5432)
                          → Redis (6379)
Topology
# Pattern 1 topology: Postgres and Redis as backing stores for an HTTP API.
# Each service declares an image, a typed interface (name, protocol, port),
# and a healthcheck used to gate readiness.
db = service("db",
interface("pg", "postgres", 5432),
image="postgres:16",
env={"POSTGRES_PASSWORD": "test", "POSTGRES_DB": "mydb"},
healthcheck=tcp("localhost:5432"),
)
cache = service("cache",
interface("redis", "redis", 6379),
image="redis:7",
healthcheck=tcp("localhost:6379"),
)
# The API learns both store addresses via env vars; internal_addr is
# presumably the in-network host:port of the named interface — TODO confirm.
api = service("api",
interface("http", "http", 8080),
image="myapp:latest",
env={
"DATABASE_URL": "postgres://test@" + db.pg.internal_addr + "/mydb",
"REDIS_URL": "redis://" + cache.redis.internal_addr,
},
depends_on=[db, cache],
healthcheck=http("localhost:8080/health"),
)
Scenarios
# Scenarios: zero-arg probe functions that return the HTTP response;
# scenario() registers each one for use in the fault matrix below.
def create_user():
return api.http.post(path="/users", body='{"name":"alice"}')
scenario(create_user)
def get_user():
return api.http.get(path="/users/1")
scenario(get_user)
def health():
return api.http.get(path="/health")
scenario(health)
Fault assumptions
# Fault assumptions: named failure modes, each with a target and the injected
# behavior. The matrix below runs every scenario under each assumption.
db_down = fault_assumption("db_down",
target = api,
connect = deny("ECONNREFUSED"),
)
# NOTE(review): cache_down has the same target/connect config as db_down;
# presumably the framework scopes the deny to a specific dependency, but as
# written both would deny the same connects — confirm how scoping works.
cache_down = fault_assumption("cache_down",
target = api,
connect = deny("ECONNREFUSED"),
description = "Redis unreachable — API should fall back to DB",
)
disk_full = fault_assumption("disk_full",
target = db,
write = deny("ENOSPC"),
)
# Latency injection rather than a hard failure.
db_slow = fault_assumption("db_slow",
target = db,
write = delay("3s"),
)
# Protocol-level: specific query failure
# Targets the pg interface (not the whole service), matching queries by pattern.
insert_fail = fault_assumption("insert_fail",
target = db.pg,
rules = [error(query="INSERT INTO users*", message="disk full")],
)
Invariants
# Data written to DB must not be lost
def write_persisted(event):
# Fires on api stdout events reporting a created-but-unpersisted user; the
# app is expected to emit "action" and "persisted" fields in its log lines.
if (event["type"] == "stdout" and event["service"] == "api"
and event.get("action") == "user_created"
and event.get("persisted") != "true"):
fail("user created but not persisted")
persistence_check = monitor(write_persisted, service="api")
# Attach to fault assumptions that affect writes
# NOTE: this rebinds db_down (previously defined without monitors) so the
# persistence invariant is checked whenever that fault is active.
db_down = fault_assumption("db_down",
target = api,
connect = deny("ECONNREFUSED"),
monitors = [persistence_check],
)
Matrix
# Matrix: every scenario × every fault, minus excluded cells. Overrides
# replace default_expect for specific (scenario, fault) pairs.
fault_matrix(
scenarios = [create_user, get_user, health],
faults = [db_down, cache_down, disk_full, db_slow, insert_fail],
default_expect = lambda r: assert_true(r != None),
overrides = {
(create_user, db_down): lambda r: (
assert_eq(r.status, 503),
# Verify no partial row leaked into the DB
assert_eq(db.pg.query(sql="SELECT count(*) as n FROM users WHERE name='alice'").data[0]["n"], 0,
"no partial user row after DB down"),
),
(create_user, disk_full): lambda r: (
assert_true(r.status >= 500),
assert_eq(db.pg.query(sql="SELECT count(*) as n FROM users WHERE name='alice'").data[0]["n"], 0,
"no partial row on disk full"),
),
(create_user, insert_fail): lambda r: (
assert_true(r.status >= 500),
# Verify proxy actually intercepted the INSERT
assert_eventually(type="proxy", where=lambda e:
"INSERT" in e.data.get("query", "") and e.data.get("action") == "error"),
),
(get_user, cache_down): lambda r: (
assert_eq(r.status, 200), # fallback to DB
# Verify no stale cache entries
assert_eq(len(cache.redis.keys(pattern="user:*").data), 0, "cache should be empty"),
),
# Slow DB must still answer within the SLA bound.
(get_user, db_slow): lambda r: assert_true(r.duration_ms < 5000),
},
exclude = [
(health, disk_full), # health check doesn't write
(health, insert_fail), # health check doesn't INSERT
],
)
# 3 scenarios × 5 faults - 2 excluded = 13 tests
Pattern 2: Event-Driven (Producer + Broker + Consumer)
Services communicate through a message broker.
Producer (HTTP :8080) → Kafka (9092) → Consumer (HTTP :8081)
                                                → Database (5432)
Topology
# Pattern 2 topology: producer and consumer decoupled by Kafka; the consumer
# persists into Postgres. The observe=[] hooks record broker messages and DB
# row changes as events that the invariant monitors below can query.
kafka = service("kafka",
interface("broker", "kafka", 9092),
image="confluentinc/cp-kafka:7.6",
healthcheck=tcp("localhost:9092"),
observe=[topic("order-events", decoder=json_decoder())],
)
db = service("db",
interface("pg", "postgres", 5432),
image="postgres:16",
env={"POSTGRES_PASSWORD": "test"},
healthcheck=tcp("localhost:5432"),
observe=[wal_stream(tables=["orders"])],
)
producer = service("producer",
interface("http", "http", 8080),
image="myapp-producer:latest",
env={"KAFKA_BROKERS": kafka.broker.internal_addr},
depends_on=[kafka],
healthcheck=http("localhost:8080/health"),
)
# NOTE(review): DATABASE_URL points at database "mydb" but this pattern's db
# does not set POSTGRES_DB (unlike Pattern 1) — likely a missing env var.
consumer = service("consumer",
interface("http", "http", 8081),
image="myapp-consumer:latest",
env={
"KAFKA_BROKERS": kafka.broker.internal_addr,
"DATABASE_URL": "postgres://test@" + db.pg.internal_addr + "/mydb",
},
depends_on=[kafka, db],
healthcheck=http("localhost:8081/health"),
)
Scenarios
# Single scenario: publish an order through the producer's HTTP API.
def publish_order():
return producer.http.post(path="/orders", body='{"item":"widget"}')
scenario(publish_order)
Fault assumptions + invariants
# Invariants
def no_orphan_events(event):
"""Kafka event without DB row = data loss for consumers."""
# events() queries the recorded event history (here: WAL INSERTs from the
# wal_stream observer on the orders table).
# NOTE(review): the incoming event is read via event.get("order_id") while
# history entries use e.data.get("order_id") — confirm both shapes are right.
if event["type"] == "topic" and event.get("order_id"):
db_rows = events(where=lambda e:
e.type == "wal" and e.data.get("action") == "INSERT"
and e.data.get("order_id") == event["order_id"])
if len(db_rows) == 0:
fail("orphan event: " + event["order_id"])
# More than one topic event with the same order_id means the producer (or a
# retry path) published duplicates.
def no_duplicates(event):
if event["type"] == "topic" and event.get("order_id"):
all_for_id = events(where=lambda e:
e.type == "topic" and e.data.get("order_id") == event["order_id"])
if len(all_for_id) > 1:
fail("duplicate message: " + event["order_id"])
orphan_check = monitor(no_orphan_events)
dedup_check = monitor(no_duplicates)
# Fault assumptions
kafka_down = fault_assumption("kafka_down",
target = producer,
connect = deny("ECONNREFUSED"),
monitors = [orphan_check],
)
# DB writes fail with I/O error; dedup_check presumably guards against
# redelivery duplicates while the consumer retries — TODO confirm intent.
consumer_db_down = fault_assumption("consumer_db_down",
target = db,
write = deny("EIO"),
monitors = [orphan_check, dedup_check],
)
# Protocol-level: silently drop messages on the order-events topic.
kafka_drop = fault_assumption("kafka_drop",
target = kafka.broker,
rules = [drop(topic="order-events")],
monitors = [orphan_check],
)
Matrix
# Matrix: the one scenario under each fault. No default_expect is given here,
# so cells without an override fall back to the framework default.
fault_matrix(
scenarios = [publish_order],
faults = [kafka_down, consumer_db_down, kafka_drop],
overrides = {
(publish_order, kafka_down): lambda r: assert_true(r.status >= 500,
"should fail, not silently drop"),
},
)
Pattern 3: Microservice Mesh
Multiple services calling each other over HTTP/gRPC.
Gateway (HTTP :8080) → Orders (gRPC :50051) → Inventory (gRPC :50052)
                                            → Payments (gRPC :50053)
                                            → Notifications (gRPC :50054)
Topology
# Pattern 3 topology: gateway fronts orders, which fans out to inventory and
# payments over gRPC.
# NOTE(review): the diagram above also shows a Notifications service (:50054)
# that is not defined in this topology.
inventory = service("inventory",
interface("grpc", "grpc", 50052),
image="myapp-inventory:latest",
healthcheck=tcp("localhost:50052"),
)
payments = service("payments",
interface("grpc", "grpc", 50053),
image="myapp-payments:latest",
healthcheck=tcp("localhost:50053"),
)
orders = service("orders",
interface("grpc", "grpc", 50051),
image="myapp-orders:latest",
env={
"INVENTORY_ADDR": inventory.grpc.internal_addr,
"PAYMENTS_ADDR": payments.grpc.internal_addr,
},
depends_on=[inventory, payments],
healthcheck=tcp("localhost:50051"),
)
gateway = service("gateway",
interface("http", "http", 8080),
image="myapp-gateway:latest",
env={"ORDERS_ADDR": orders.grpc.internal_addr},
depends_on=[orders],
healthcheck=http("localhost:8080/health"),
)
Scenarios
# Scenarios: both critical flows go through the gateway's public HTTP API.
def place_order():
return gateway.http.post(path="/orders", body='{"item":"widget","qty":1}')
scenario(place_order)
def check_inventory():
return gateway.http.get(path="/inventory/widget")
scenario(check_inventory)
Fault assumptions + invariants
# Invariant: if gateway returns 200, the order is fully processed
def no_partial_orders(event):
# Relies on the gateway logging "status" and "order_complete" fields.
if (event["type"] == "stdout" and event["service"] == "gateway"
and event.get("status") == "200"
and event.get("order_complete") != "true"):
fail("gateway returned 200 but order not complete")
# Invariant: SLA — no request > 10s
def sla_check(event):
# step_recv events presumably record each test-step response, with
# duration_ms carried as a string — TODO confirm event schema.
if event["type"] == "step_recv" and event["service"] == "test":
duration = int(event.get("duration_ms", "0"))
if duration > 10000:
fail("SLA violation: " + str(duration) + "ms")
order_integrity = monitor(no_partial_orders, service="gateway")
sla_monitor = monitor(sla_check)
# Fault assumptions
# Orders cannot reach payments: connects refused.
payments_down = fault_assumption("payments_down",
target = orders,
connect = deny("ECONNREFUSED"),
monitors = [order_integrity, sla_monitor],
)
# Inventory responds, but slowly (8s added to writes).
inventory_slow = fault_assumption("inventory_slow",
target = inventory,
write = delay("8s"),
monitors = [sla_monitor],
)
# Cascade: payments down + inventory slow simultaneously
# Composite assumption: applies both member faults at once.
cascade = fault_assumption("cascade",
faults = [payments_down, inventory_slow],
description = "payments down AND inventory slow",
)
Matrix
# Matrix: both scenarios under each fault. Overrides pin status codes and
# latency bounds for place_order; check_inventory keeps the minimal default.
fault_matrix(
scenarios = [place_order, check_inventory],
faults = [payments_down, inventory_slow, cascade],
default_expect = lambda r: assert_true(r != None),
overrides = {
(place_order, payments_down): lambda r: (
assert_true(r.status in [503, 502]),
assert_true(r.duration_ms < 5000, "should not hang"),
),
(place_order, inventory_slow): lambda r: (
assert_true(r.duration_ms < 10000, "must timeout"),
),
(place_order, cascade): lambda r: (
assert_true(r.status >= 500),
assert_true(r.duration_ms < 5000, "should fail fast"),
),
},
)
# Partitions: standalone tests (not matrix-composable)
def test_partition_orders_inventory():
# partition() presumably severs the network between the two services and
# runs the given scenario under that split — TODO confirm semantics.
def scenario():
resp = gateway.http.post(path="/orders", body='...')
assert_true(resp.status >= 500)
partition(orders, inventory, run=scenario)
Adapting patterns to your project
- Identify which pattern is closest to your architecture
- Copy the topology and adjust service names, images, ports
- Define 3-5 scenarios as probes — critical user flows that return results
- Define fault assumptions — 3 per dependency (down, slow, error)
- Attach 1-2 invariant monitors to the most dangerous fault assumptions
- Compose a `fault_matrix()` — start with `default_expect` only, then add overrides
- Run and iterate — the first run reveals missing error handling