On this page

Spec Patterns for Real Projects

Cookbook patterns for common architectures. Each pattern shows the topology, fault assumptions, invariants, and a composed matrix.

Pattern 1: API + Database + Cache

The most common pattern. An HTTP API backed by Postgres with Redis caching.

Client → API (HTTP :8080) → Postgres (5432)
                          → Redis (6379)

Topology

# Three services: Postgres, Redis, and the API under test.  The API's env is
# wired from the other services' interfaces via `internal_addr` (presumably
# the in-network host:port — confirm against the spec DSL docs).
db = service("db",
    interface("pg", "postgres", 5432),  # protocol-aware interface; enables the query-level fault below
    image="postgres:16",
    env={"POSTGRES_PASSWORD": "test", "POSTGRES_DB": "mydb"},
    healthcheck=tcp("localhost:5432"),
)

cache = service("cache",
    interface("redis", "redis", 6379),
    image="redis:7",
    healthcheck=tcp("localhost:6379"),
)

api = service("api",
    interface("http", "http", 8080),
    image="myapp:latest",
    env={
        "DATABASE_URL": "postgres://test@" + db.pg.internal_addr + "/mydb",
        "REDIS_URL": "redis://" + cache.redis.internal_addr,
    },
    depends_on=[db, cache],  # presumably gates api startup on db/cache health — confirm
    healthcheck=http("localhost:8080/health"),
)

Scenarios

def create_user():
    """Probe the write path: POST a new user through the API."""
    response = api.http.post(path="/users", body='{"name":"alice"}')
    return response

scenario(create_user)

def get_user():
    """Probe the read path: fetch user 1 through the API."""
    response = api.http.get(path="/users/1")
    return response

scenario(get_user)

def health():
    """Probe the API's own health endpoint."""
    response = api.http.get(path="/health")
    return response

scenario(health)

Fault assumptions

# NOTE(review): db_down and cache_down below carry identical rules (deny
# connects made by `api`) — confirm the DSL scopes each denial to the named
# dependency; otherwise both faults block all of api's outbound connects.
db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
)

cache_down = fault_assumption("cache_down",
    target = api,
    connect = deny("ECONNREFUSED"),
    description = "Redis unreachable — API should fall back to DB",
)

# Syscall-level fault on the DB service: writes fail with ENOSPC.
disk_full = fault_assumption("disk_full",
    target = db,
    write = deny("ENOSPC"),
)

# Latency fault: every write on the DB is delayed by 3 seconds.
db_slow = fault_assumption("db_slow",
    target = db,
    write = delay("3s"),
)

# Protocol-level: specific query failure
insert_fail = fault_assumption("insert_fail",
    target = db.pg,  # targets the postgres interface, not the whole service
    rules = [error(query="INSERT INTO users*", message="disk full")],
)

Invariants

# Data written to DB must not be lost
def write_persisted(event):
    """Monitor: fail if api logs a user_created event not marked persisted."""
    # Also fires when "persisted" is absent entirely (get() returns None != "true").
    if (event["type"] == "stdout" and event["service"] == "api"
            and event.get("action") == "user_created"
            and event.get("persisted") != "true"):
        fail("user created but not persisted")

# service="api" presumably restricts which events reach the monitor — confirm.
persistence_check = monitor(write_persisted, service="api")

# Attach to fault assumptions that affect writes
# NOTE(review): this redefines db_down from the fault-assumption section
# above, now with the monitor attached — keep only one definition in a real
# spec file.
db_down = fault_assumption("db_down",
    target = api,
    connect = deny("ECONNREFUSED"),
    monitors = [persistence_check],
)

Matrix

fault_matrix(
    scenarios = [create_user, get_user, health],
    faults = [db_down, cache_down, disk_full, db_slow, insert_fail],
    # Fallback expectation for every cell without an explicit override.
    default_expect = lambda r: assert_true(r != None),
    overrides = {
        (create_user, db_down): lambda r: (
            assert_eq(r.status, 503),
            # Verify no partial row leaked into the DB
            assert_eq(db.pg.query(sql="SELECT count(*) as n FROM users WHERE name='alice'").data[0]["n"], 0,
                "no partial user row after DB down"),
        ),
        (create_user, disk_full): lambda r: (
            assert_true(r.status >= 500),
            assert_eq(db.pg.query(sql="SELECT count(*) as n FROM users WHERE name='alice'").data[0]["n"], 0,
                "no partial row on disk full"),
        ),
        (create_user, insert_fail): lambda r: (
            assert_true(r.status >= 500),
            # Verify proxy actually intercepted the INSERT
            assert_eventually(type="proxy", where=lambda e:
                "INSERT" in e.data.get("query", "") and e.data.get("action") == "error"),
        ),
        (get_user, cache_down): lambda r: (
            assert_eq(r.status, 200),  # fallback to DB
            # Verify no stale cache entries
            assert_eq(len(cache.redis.keys(pattern="user:*").data), 0, "cache should be empty"),
        ),
        # db writes delayed 3s; the read must still answer within 5s.
        (get_user, db_slow): lambda r: assert_true(r.duration_ms < 5000),
    },
    # Cells removed from the cross product entirely.
    exclude = [
        (health, disk_full),    # health check doesn't write
        (health, insert_fail),  # health check doesn't INSERT
    ],
)
# 3 scenarios × 5 faults - 2 excluded = 13 tests

Pattern 2: Event-Driven (Producer + Broker + Consumer)

Services communicate through a message broker.

Producer (HTTP :8080) → Kafka (9092) → Consumer (HTTP :8081) → Database (5432)

Topology

# Broker, DB, and the two app services.  `observe` attaches passive taps
# whose events ("topic" / "wal") feed the invariant monitors in this pattern.
kafka = service("kafka",
    interface("broker", "kafka", 9092),
    image="confluentinc/cp-kafka:7.6",
    healthcheck=tcp("localhost:9092"),
    # Decoded events from the order-events topic.
    observe=[topic("order-events", decoder=json_decoder())],
)

db = service("db",
    interface("pg", "postgres", 5432),
    image="postgres:16",
    # POSTGRES_DB added: the consumer's DATABASE_URL below targets /mydb,
    # which the postgres image only creates when POSTGRES_DB is set
    # (consistent with the Pattern 1 topology).
    env={"POSTGRES_PASSWORD": "test", "POSTGRES_DB": "mydb"},
    healthcheck=tcp("localhost:5432"),
    # WAL change events for the orders table.
    observe=[wal_stream(tables=["orders"])],
)

producer = service("producer",
    interface("http", "http", 8080),
    image="myapp-producer:latest",
    env={"KAFKA_BROKERS": kafka.broker.internal_addr},
    depends_on=[kafka],
    healthcheck=http("localhost:8080/health"),
)

consumer = service("consumer",
    interface("http", "http", 8081),
    image="myapp-consumer:latest",
    env={
        "KAFKA_BROKERS": kafka.broker.internal_addr,
        "DATABASE_URL": "postgres://test@" + db.pg.internal_addr + "/mydb",
    },
    depends_on=[kafka, db],
    healthcheck=http("localhost:8081/health"),
)

Scenarios

def publish_order():
    """Probe: publish one order through the producer's HTTP API."""
    resp = producer.http.post(path="/orders", body='{"item":"widget"}')
    return resp

scenario(publish_order)

Fault assumptions + invariants

# Invariants
# NOTE(review): the monitor callbacks read events as dicts (event["type"],
# event.get(...)) while events() yields objects with .type/.data — confirm
# both access styles against the DSL.
def no_orphan_events(event):
    """Kafka event without DB row = data loss for consumers."""
    # NOTE(review): evaluated at the moment the topic event is observed —
    # if the consumer's INSERT lands later this may false-positive; confirm
    # when monitors run relative to the event stream.
    if event["type"] == "topic" and event.get("order_id"):
        db_rows = events(where=lambda e:
            e.type == "wal" and e.data.get("action") == "INSERT"
            and e.data.get("order_id") == event["order_id"])
        if len(db_rows) == 0:
            fail("orphan event: " + event["order_id"])

def no_duplicates(event):
    """More than one topic event for the same order_id = duplicate delivery."""
    if event["type"] == "topic" and event.get("order_id"):
        all_for_id = events(where=lambda e:
            e.type == "topic" and e.data.get("order_id") == event["order_id"])
        if len(all_for_id) > 1:
            fail("duplicate message: " + event["order_id"])

orphan_check = monitor(no_orphan_events)
dedup_check = monitor(no_duplicates)

# Fault assumptions
# Producer cannot reach the broker: publishes must fail loudly (see matrix).
kafka_down = fault_assumption("kafka_down",
    target = producer,
    connect = deny("ECONNREFUSED"),
    monitors = [orphan_check],
)

# Consumer's DB rejects writes: topic events may arrive with no row persisted.
consumer_db_down = fault_assumption("consumer_db_down",
    target = db,
    write = deny("EIO"),
    monitors = [orphan_check, dedup_check],
)

# Protocol-level: drop messages on the order-events topic at the broker.
kafka_drop = fault_assumption("kafka_drop",
    target = kafka.broker,
    rules = [drop(topic="order-events")],
    monitors = [orphan_check],
)

Matrix

# 1 scenario × 3 faults = 3 tests.  No default_expect here — presumably a
# framework default covers the two non-overridden cells; confirm.
fault_matrix(
    scenarios = [publish_order],
    faults = [kafka_down, consumer_db_down, kafka_drop],
    overrides = {
        # Broker unreachable: the producer must surface the error, not ack.
        (publish_order, kafka_down): lambda r: assert_true(r.status >= 500,
            "should fail, not silently drop"),
    },
)

Pattern 3: Microservice Mesh

Multiple services calling each other over HTTP/gRPC.

Gateway (HTTP :8080) → Orders (gRPC :50051) → Inventory (gRPC :50052)
                                            → Payments (gRPC :50053)

Topology

# Leaf services first (no dependencies), then orders, then the gateway.
inventory = service("inventory",
    interface("grpc", "grpc", 50052),
    image="myapp-inventory:latest",
    healthcheck=tcp("localhost:50052"),
)

payments = service("payments",
    interface("grpc", "grpc", 50053),
    image="myapp-payments:latest",
    healthcheck=tcp("localhost:50053"),
)

# orders fans out to inventory and payments.
orders = service("orders",
    interface("grpc", "grpc", 50051),
    image="myapp-orders:latest",
    env={
        "INVENTORY_ADDR": inventory.grpc.internal_addr,
        "PAYMENTS_ADDR": payments.grpc.internal_addr,
    },
    depends_on=[inventory, payments],
    healthcheck=tcp("localhost:50051"),
)

# The gateway talks only to orders.
gateway = service("gateway",
    interface("http", "http", 8080),
    image="myapp-gateway:latest",
    env={"ORDERS_ADDR": orders.grpc.internal_addr},
    depends_on=[orders],
    healthcheck=http("localhost:8080/health"),
)

Scenarios

def place_order():
    """Probe the full order flow through the gateway."""
    resp = gateway.http.post(path="/orders", body='{"item":"widget","qty":1}')
    return resp

scenario(place_order)

def check_inventory():
    """Probe a read-only inventory lookup through the gateway."""
    resp = gateway.http.get(path="/inventory/widget")
    return resp

scenario(check_inventory)

Fault assumptions + invariants

# Invariant: if gateway returns 200, the order is fully processed
def no_partial_orders(event):
    """Monitor: fail on gateway 200 responses not marked order_complete."""
    if (event["type"] == "stdout" and event["service"] == "gateway"
            and event.get("status") == "200"
            and event.get("order_complete") != "true"):
        fail("gateway returned 200 but order not complete")

# Invariant: SLA — no request > 10s
def sla_check(event):
    """Monitor: fail any probe step that took more than 10 seconds."""
    # step_recv events from the "test" harness carry the probe's timing.
    if event["type"] == "step_recv" and event["service"] == "test":
        duration = int(event.get("duration_ms", "0"))
        if duration > 10000:
            fail("SLA violation: " + str(duration) + "ms")

order_integrity = monitor(no_partial_orders, service="gateway")
sla_monitor = monitor(sla_check)

# Fault assumptions
# NOTE(review): target=orders with a bare connect-deny — confirm the DSL
# scopes this to the payments peer; as written it may also deny orders'
# connects to inventory.
payments_down = fault_assumption("payments_down",
    target = orders,
    connect = deny("ECONNREFUSED"),
    monitors = [order_integrity, sla_monitor],
)

# inventory's writes delayed 8s — just inside the 10s SLA monitor bound.
inventory_slow = fault_assumption("inventory_slow",
    target = inventory,
    write = delay("8s"),
    monitors = [sla_monitor],
)

# Cascade: payments down + inventory slow simultaneously
cascade = fault_assumption("cascade",
    faults = [payments_down, inventory_slow],  # composes the two faults above
    description = "payments down AND inventory slow",
)

Matrix

# 2 scenarios × 3 faults = 6 tests; non-overridden cells use default_expect.
fault_matrix(
    scenarios = [place_order, check_inventory],
    faults = [payments_down, inventory_slow, cascade],
    # Fallback expectation: the probe returned some result.
    default_expect = lambda r: assert_true(r != None),
    overrides = {
        # Payments unreachable: gateway must answer with a gateway-class
        # error quickly, not hang on the dead dependency.
        (place_order, payments_down): lambda r: (
            assert_true(r.status in [503, 502]),
            assert_true(r.duration_ms < 5000, "should not hang"),
        ),
        # Inventory writes are delayed by 8s (inventory_slow).  The bound
        # must be BELOW 8000ms: a 10s bound would pass even if the request
        # simply waited out the injected delay, proving nothing about
        # timeout behavior.
        (place_order, inventory_slow): lambda r: (
            assert_true(r.duration_ms < 8000, "must timeout"),
        ),
        # Both faults at once: still a clean, fast 5xx — no hangs.
        (place_order, cascade): lambda r: (
            assert_true(r.status >= 500),
            assert_true(r.duration_ms < 5000, "should fail fast"),
        ),
    },
)

# Partitions: standalone tests (not matrix-composable)
def test_partition_orders_inventory():
    """Partition orders from inventory: the order flow must fail loudly."""
    # Renamed from `scenario` — the old name shadowed the scenario()
    # registrar used throughout this file.
    def probe():
        resp = gateway.http.post(path="/orders", body='...')
        assert_true(resp.status >= 500)
    partition(orders, inventory, run=probe)

Adapting patterns to your project

  1. Identify which pattern is closest to your architecture
  2. Copy the topology and adjust service names, images, ports
  3. Define 3-5 scenarios as probes — critical user flows that return results
  4. Define fault assumptions — 3 per dependency (down, slow, error)
  5. Attach 1-2 invariant monitors to the most dangerous fault assumptions
  6. Compose a fault_matrix() — start with default_expect only, then add overrides
  7. Run and iterate — the first run reveals missing error handling