Faultbox Spec Language Reference

Faultbox uses a single Starlark file (faultbox.star) to define the system topology and test scenarios. Starlark is a Python-like language — the configuration is code.

faultbox test faultbox.star                        # run all tests
faultbox test faultbox.star --test happy_path      # run one test
faultbox test faultbox.star --output trace.json    # JSON trace with syscall events
faultbox test faultbox.star --shiviz trace.shiviz  # ShiViz visualization format
faultbox test faultbox.star --runs 100 --show fail # counterexample discovery
faultbox test faultbox.star --seed 42              # deterministic replay
faultbox init --name orders --port 8080 ./order-svc  # generate starter .star

Primitive Index

Every builtin grouped by what it’s for. Use Cmd-F to jump.

Topology: service, mock_service, interface, tls_cert (RFC-038).

Healthchecks: tcp, http, kafka_ready.

Faults (syscall + protocol): fault, fault_all, fault_start, fault_stop, fault_assumption, fault_scenario, fault_matrix, scenario, partition, nondet.

Fault primitives: deny, delay, allow, response (proxy), error (proxy), drop (proxy), duplicate (proxy), op.

Mock responses: json_response, text_response, bytes_response, status_only, redirect, grpc_response, grpc_typed_response (RFC-023), grpc_raw_response (RFC-023), grpc_error, dynamic.

Stdlib mocks (under @faultbox/mocks/) — kafka.broker, redis.server, mongo.server, grpc.server + 7 status shorthands (v0.9.8), http.server, jwt.server (v0.9.9).

Spec-load file readers (RFC-026, v0.9.8) — load_file, load_yaml, load_json.

Assertions: assert_true, assert_eq, assert_eventually, assert_never, assert_before.

Matrix expectations (RFC-027, v0.9.8) — expect_success, expect_error_within, expect_hang.

Event sources & decoders: events, stdout, json_decoder, logfmt_decoder, regex_decoder, monitor.

Concurrency primitives: parallel.

Tracing: trace, trace_start, trace_stop.

Misc: struct, load.

Bottom-rung JWT primitives (rarely needed; jwt.server is the supported surface) — jwt_keypair, jwt_sign, jwt_jwks.

The fastest way to look up a kwarg list: search for the function name in this file. Every primitive’s section has a complete kwarg table or signature line. Anything missing is a doc bug — please file an issue.


Quick Start

# faultbox.star

inventory = service("inventory", "/usr/local/bin/inventory-svc",
    interface("main", "tcp", 5432),
    env = {"PORT": "5432", "WAL_PATH": "/tmp/inventory.wal"},
    healthcheck = tcp("localhost:5432"),
)

orders = service("orders", "/usr/local/bin/order-svc",
    interface("public", "http", 8080),
    env = {"PORT": "8080", "INVENTORY_ADDR": inventory.main.addr},
    depends_on = [inventory],
    healthcheck = http("localhost:8080/health"),
)

def test_happy_path():
    resp = orders.post(path="/orders", body='{"sku":"widget","qty":1}')
    assert_eq(resp.status, 200)
    assert_true("confirmed" in resp.body)

    # Temporal: WAL must have been written.
    assert_eventually(service="inventory", syscall="openat", path="/tmp/inventory.wal")

def test_inventory_down():
    def scenario():
        resp = orders.post(path="/orders", body='{"sku":"widget","qty":1}')
        assert_eq(resp.status, 503)

        # No WAL write should occur.
        assert_never(service="inventory", syscall="openat", path="/tmp/inventory.wal")
    fault(orders, connect=deny("ECONNREFUSED"), run=scenario)

Topology

service(name, [binary], *interfaces, ...)

Declares a service in the system under test. Returns a service object that can be referenced by other services and used in tests.

A service must have exactly one source: binary (local executable), image (Docker container image), or build (Dockerfile directory).

# Binary mode — local executable
db = service("db", "/tmp/mock-db",
    interface("main", "tcp", 5432),
    args = ["--data-dir", "/tmp/db-data"],
    env = {"PORT": "5432"},
    depends_on = [],
    healthcheck = tcp("localhost:5432"),
)

# Container mode — pull image from registry
postgres = service("postgres",
    interface("main", "tcp", 5432),
    image = "postgres:16-alpine",
    env = {"POSTGRES_PASSWORD": "test", "POSTGRES_DB": "testdb"},
    healthcheck = tcp("localhost:5432"),
)

# Container mode — build from Dockerfile
api = service("api",
    interface("public", "http", 8080),
    build = "./api",
    env = {"PORT": "8080", "DB_URL": postgres.main.internal_addr},
    depends_on = [postgres],
    healthcheck = http("localhost:8080/health"),
)

| Parameter | Type | Required | Description |
|---|---|---|---|
| name | string | yes | Service name (used in logs and results) |
| binary | string | one of three | Path to the executable (positional or keyword) |
| image | string | one of three | Docker image reference (e.g., "postgres:16-alpine") |
| build | string | one of three | Path to Dockerfile context directory |
| positional | interface | yes | One or more interface() declarations |
| args | list | no | Command-line arguments passed to the binary |
| env | dict | no | Environment variables |
| volumes | dict | no | Volume mounts {host_path: container_path} (container mode) |
| depends_on | list | no | Services that must start first |
| healthcheck | healthcheck | no | Readiness check (tcp(), http(), or kafka_ready()) |
| observe | list | no | Event sources to attach (see Event Sources) |
| ports | dict | no | Explicit host port mapping {container_port: host_port} (0 = Docker picks) |
| reuse | bool | no | Keep container alive between tests (see Container Lifecycle) |
| seed | callable | no | Initialize service state after healthcheck; runs once (see Container Lifecycle) |
| reset | callable | no | Re-initialize state between tests; runs before each test except the first (see Container Lifecycle) |
| ops | dict | no | Named operations for fd-level fault targeting (see Named Operations) |
| seccomp | bool | no | Default True. Set to False to skip shim + seccomp-notify acquisition for this service. Proxy-level faults (HTTP/SQL/Redis/etc.) still apply; syscall-level fault() rules on this service are silently skipped. Workaround for multi-process container entrypoints (MySQL 8’s mysqld_safe wrapper, certain JVM images) where the shim handoff hangs. |

Seed data for databases — use volumes to mount init scripts:

postgres = service("postgres",
    interface("main", "postgres", 5432),
    image = "postgres:16-alpine",
    env = {"POSTGRES_PASSWORD": "test", "POSTGRES_DB": "testdb"},
    volumes = {"./init.sql": "/docker-entrypoint-initdb.d/init.sql"},
    healthcheck = tcp("localhost:5432"),
)

Most database images run scripts from /docker-entrypoint-initdb.d/ on first start. This creates your schema and test data before tests run.

Services must be declared in dependency order — define db before api if api depends on db.

interface(name, protocol, port, spec=, tls=)

Declares a communication interface for a service.

interface("public", "http", 8080)
interface("main", "tcp", 5432)
interface("internal", "grpc", 9090)
interface("events", "kafka", 9092, spec="./events.avsc")

# TLS upstream — proxy terminates and re-establishes TLS at both legs:
interface("api", "https", 443, tls=tls_cert())
interface("geo", "grpc", 443, tls=tls_cert(ca="certs/upstream-ca.pem"))

| Parameter | Type | Required | Description |
|---|---|---|---|
| name | string | yes | Interface name (e.g., "main", "public") |
| protocol | string | yes | Protocol type ("http", "tcp", "grpc", etc.) |
| port | int | yes | Port number |
| spec | string | no | Path to protocol spec file (OpenAPI, protobuf, Avro, etc.) |
| tls | tls_cert(...) | no | TLS material; the proxy terminates TLS at the listener and re-establishes it dialing upstream. See TLS Support. |

Protocols are provided by plugins — Go implementations registered at compile time. Each protocol defines its own step methods, healthcheck, and response format. See Protocols for the full list.

Multi-Interface Services

A service can expose multiple interfaces:

courier = service("courier", "./courier-svc",
    interface("public", "http", 8080),
    interface("internal", "grpc", 9090),
    interface("events", "kafka", 9092),
    depends_on = [db, cache],
    healthcheck = http("localhost:8080/health"),
)

Access interfaces by name: courier.public, courier.internal, courier.events.


Type Reference

Everything in a .star file is a typed value. This section defines each built-in type, its constructor, properties, and what’s extensible.

Type: Service

Constructor: service(name, [binary], *interfaces, ...)

A service declaration. Created by service() and assigned to a variable. The variable name is arbitrary — "main" is not special:

db = service("db", ...)        # variable "db", service name "db"
my_pg = service("postgres", ...)  # variable "my_pg", service name "postgres"

Properties (read-only):

| Property | Type | Description |
|---|---|---|
| .name | string | Service name (first arg to service()) |
| .<interface_name> | InterfaceRef | Reference to the named interface |

Shorthand step methods: When a service has exactly one interface, its step methods are promoted to the service level:

# These are equivalent when api has one interface:
api.public.get(path="/health")
api.get(path="/health")
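
The promotion rule can be modelled in a few lines of Python. This is only a sketch of the lookup order described above, not Faultbox's implementation: the Interface and Service classes here are hypothetical stand-ins, and the stub get() only returns a description of the call.

```python
class Interface:
    """Stand-in for an interface that exposes protocol step methods."""

    def __init__(self, name):
        self.name = name

    def get(self, path):
        # A real step would perform the request; this stub only describes it.
        return "GET " + path + " via " + self.name


class Service:
    """Stand-in for a service that resolves interfaces and promoted methods."""

    def __init__(self, name, *interfaces):
        self.name = name
        self._interfaces = {iface.name: iface for iface in interfaces}

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails, so .name is unaffected.
        interfaces = self.__dict__["_interfaces"]
        if attr in interfaces:
            return interfaces[attr]
        if len(interfaces) == 1:
            # Exactly one interface: promote its step methods to the service.
            only = next(iter(interfaces.values()))
            return getattr(only, attr)
        raise AttributeError(attr)


api = Service("api", Interface("public"))
multi = Service("courier", Interface("public"), Interface("internal"))
```

In this model, api.get(...) and api.public.get(...) reach the same method, while multi.get raises AttributeError because two interfaces make the shorthand ambiguous.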

What’s user-defined: The service name and interface names are yours. Nothing is built-in — "main", "public", "internal" are conventions, not keywords.


Type: Interface

Constructor: interface(name, protocol, port, spec=, tls=)

Declares a communication endpoint on a service. The protocol string selects which plugin handles step methods and healthchecks.

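| Parameter | Type | Required | Description |
|---|---|---|---|
| name | string | yes | Your name for this interface (arbitrary) |
| protocol | string | yes | Plugin name; determines available methods |
| port | int | yes | Port number |
| spec | string | no | Path to protocol spec file (OpenAPI, protobuf, Avro) |
| tls | tls_cert(...) | no | TLS material for the proxy. See TLS Support. |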

What’s user-defined: The name is yours. The protocol must match a registered plugin (see Protocols).


Type: InterfaceRef

Not constructed directly. Returned when you access service.interface_name.

db = service("db", ..., interface("main", "tcp", 5432))
ref = db.main  # ← this is an InterfaceRef

Properties (read-only):

| Property | Type | Description |
|---|---|---|
| .addr | string | "localhost:<port>", for healthchecks, test steps, binary-mode env |
| .host | string | "localhost" |
| .port | int | Port number |
| .internal_addr | string | Container-to-container address ("servicename:<port>" in Docker, same as .addr for binaries) |
| .proxy_addr | string | Host-side proxy listener for the SUT to dial (RFC-033). Late-bound: returns a placeholder at spec-load that the runtime resolves to e.g. "127.0.0.1:36643" once the proxy is up. |
| .proxy_host | string | Host part of .proxy_addr: "127.0.0.1" for binary SUTs, "host.docker.internal" for container SUTs. |
| .proxy_port | string | Port part of .proxy_addr (string, not int; see note below). |

Step methods: determined by the protocol plugin. Accessing a method name returns a callable StepMethod:

db.main.send(data="PING")       # tcp protocol → send()
api.public.post(path="/data")    # http protocol → post()
pg.main.query(sql="SELECT 1")   # postgres protocol → query()

.addr vs .internal_addr vs .proxy_addr:

| | Binary mode | Container mode |
|---|---|---|
| .addr | localhost:5432 | localhost:<mapped_port> |
| .internal_addr | localhost:5432 | db:5432 (Docker DNS) |
| .proxy_addr | 127.0.0.1:<auto> | host.docker.internal:<auto> |

Use .addr for healthchecks and test steps (from the host). Use .internal_addr in container env for service-to-service traffic on the Docker network. Use .proxy_addr / .proxy_host / .proxy_port to wire a SUT’s connection through the fault-injection proxy. This is the right choice for any host-binary SUT connecting to a Docker upstream — the upstream’s auto-mapped host port and the proxy’s auto-assigned listener port are both unknown at spec-load time, so a literal value would never work.

truck = service("truck-api", "/usr/local/bin/truck-api",
    interface("main", "http", 9000),
    env = {
        "MYSQL_HOST": db.mysql.proxy_host,                      # → "127.0.0.1"
        "MYSQL_PORT": db.mysql.proxy_port,                      # → "36643" (string)
        "MYSQL_DSN":  "user:pass@tcp(" + db.mysql.proxy_addr + ")/appdb",
    },
)

Late-binding mechanics: at spec-load time .proxy_addr returns a placeholder string (e.g. __FB_PROXY_ADDR_db__mysql__). The placeholder survives any string concatenation the spec does. buildEnv replaces it with the real proxy address once the proxy starts. Don’t .split() or .rsplit() the value — operations on the placeholder run at spec-load time and produce nonsense. Use .proxy_host / .proxy_port instead, which are resolved separately.

.proxy_port is a string, not an int. Late-bound resolution can only substitute into env strings, so the attribute returns a placeholder string at spec-load. Most clients accept string ports; if your spec needs an int (rare), use the auto-injected FAULTBOX_<SVC>_<IFACE>_PORT env var on the SUT process instead.
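
The late-binding behaviour can be illustrated with a small Python model. It is a sketch under assumptions: resolve_env is a hypothetical helper, not the real buildEnv, and the placeholder and port are the example values quoted above.

```python
# Hypothetical model of late binding; names are illustrative, not Faultbox internals.
PLACEHOLDER = "__FB_PROXY_ADDR_db__mysql__"  # example placeholder from the text above


def resolve_env(env, bindings):
    """Replace every placeholder occurrence in env values with its real address."""
    resolved = {}
    for key, value in env.items():
        for placeholder, real in bindings.items():
            value = value.replace(placeholder, real)
        resolved[key] = value
    return resolved


# Spec-load time: concatenation keeps the placeholder intact inside the string.
env = {"MYSQL_DSN": "user:pass@tcp(" + PLACEHOLDER + ")/appdb"}

# Spec-load time: splitting on ":" finds no host/port boundary in the placeholder.
bad_parts = PLACEHOLDER.split(":")

# Runtime: the proxy is up, so the placeholder can be substituted.
final_env = resolve_env(env, {PLACEHOLDER: "127.0.0.1:36643"})
```

Here bad_parts is a one-element list holding the whole placeholder, which is why splitting at spec-load time cannot yield a usable host or port, while the concatenated DSN resolves cleanly once the real address is known.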


Type: StepMethod

Not constructed directly. Returned when you access a method on an InterfaceRef.

fn = api.public.post   # ← StepMethod
fn(path="/data")       # ← callable

All step methods return a Response. The available methods depend on the protocol — see Protocols.


Type: Response

Returned by step methods. Wraps the result of a protocol call.

| Property | Type | Description |
|---|---|---|
| .status | int | Status code (HTTP status, or 0 for non-HTTP on success) |
| .body | string | Raw response body |
| .data | dict/list | Auto-decoded: JSON body parsed into native Starlark values |
| .ok | bool | True if the step succeeded |
| .error | string | Error message if .ok is False |
| .duration_ms | int | Step execution time in milliseconds |

.body vs .data: .body is always the raw string. .data is the same content auto-decoded from JSON — you never need json.decode():

resp = pg.main.query(sql="SELECT * FROM users")
print(resp.body)           # '[{"id": 1, "name": "alice"}]'
print(resp.data[0]["name"])  # 'alice'
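
For readers who think in plain Python, the relationship between the two properties is roughly that of a JSON string and its parsed form. The snippet below is an analogy only; it uses Python's json module rather than anything from Faultbox.

```python
import json

body = '[{"id": 1, "name": "alice"}]'  # what .body would hold: the raw string
data = json.loads(body)                # what .data would hold: decoded values

first_name = data[0]["name"]
```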

Type: StarlarkEvent

Passed to where= lambda predicates in assertions and events().

| Property | Type | Description |
|---|---|---|
| .seq | int | Monotonic sequence number |
| .service | string | Service that produced the event |
| .type | string | Event type ("syscall", "stdout", "wal", "topic") |
| .event_type | string | PObserve dotted notation ("syscall.write") |
| .data | dict | Auto-decoded payload (from JSON "data" field, or all fields) |
| .fields | dict | Raw string fields |
| .first | StarlarkEvent/None | In assert_before then= lambda: the matched first event |
| .<field_name> | string | Direct access to any field (e.g., .decision, .label) |

assert_eventually(where=lambda e: e.type == "wal" and e.data["op"] == "INSERT")
assert_before(
    first=lambda e: e.data["op"] == "INSERT",
    then=lambda e: e.data["ref_id"] == e.first.data["id"],
)
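
As a rough mental model, a where= predicate is a filter applied to each event in the trace. The Python sketch below assumes the simplest reading (eventually means some event matches, never means none does) and leaves out timing and ordering, which this section does not specify. The eventually and never helpers and the sample trace are hypothetical.

```python
from types import SimpleNamespace


def eventually(trace, where):
    """True if at least one event in the trace satisfies the predicate."""
    return any(where(e) for e in trace)


def never(trace, where):
    """True if no event in the trace satisfies the predicate."""
    return not any(where(e) for e in trace)


# Hypothetical trace: one syscall event followed by one WAL event.
trace = [
    SimpleNamespace(seq=1, service="inventory", type="syscall", data={}),
    SimpleNamespace(seq=2, service="inventory", type="wal", data={"op": "INSERT"}),
]

saw_insert = eventually(trace, lambda e: e.type == "wal" and e.data["op"] == "INSERT")
no_delete = never(trace, lambda e: e.type == "wal" and e.data["op"] == "DELETE")
```

Checking e.type before indexing e.data keeps the predicate safe on events whose payload lacks the "op" key.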

Protocols

Protocols are Go plugins registered at compile time via init(). Each protocol defines which step methods are available on its interfaces.

The protocol string in interface(name, protocol, port) selects the plugin. You cannot define new protocols in Starlark — they are Go code. To add a protocol, implement the protocol.Protocol interface in Go.

Built-in Protocols

| Protocol | Step Methods | Response .data | Healthcheck |
|---|---|---|---|
| http | get, post, put, delete, patch | raw body (auto-decoded if JSON) | HTTP GET 2xx-3xx |
| http2 | get, post, put, delete, patch | raw body | h2c GET 2xx-4xx |
| tcp | send | response line as string | TCP connect |
| udp | send, send_no_reply | {raw: hex, size: N} | UDP dial |
| postgres | query, exec | [{col: val, ...}] / {rows_affected: N} | TCP + Postgres ping |
| redis | get, set, del, ping, keys, lpush, rpush, lrange, incr, command | {value: ...} | TCP + PING/PONG |
| mysql | query, exec | [{col: val, ...}] / {rows_affected: N} | TCP + MySQL ping |
| kafka | publish, consume | {published: true} / {topic, key, value, ...} | TCP connect |
| nats | publish, request, subscribe | {subject, data} | TCP connect |
| grpc | call | {method, raw} | TCP connect |
| mongodb | find, insert, insert_many, update, delete, count, command | BSON docs normalized to JSON | TCP + Mongo ping |
| cassandra | query, exec | [{col: val, ...}] | TCP + CQL session |
| clickhouse | query, exec | [{col: val, ...}] / {ok: true} | HTTP /ping |

Protocol Step Method Reference

http: interface("api", "http", 8080)

resp = svc.api.get(path="/users", headers={"Authorization": "Bearer ..."})
resp = svc.api.post(path="/users", body='{"name": "alice"}')

tcp: interface("main", "tcp", 5432)

resp = svc.main.send(data="PING")  # returns string, not Response

postgres: interface("main", "postgres", 5432)

resp = svc.main.query(sql="SELECT * FROM users WHERE id=1")
# resp.data = [{"id": 1, "name": "alice"}]
resp = svc.main.exec(sql="INSERT INTO users (name) VALUES ('bob')")
# resp.data = {"rows_affected": 1}

redis: interface("main", "redis", 6379)

svc.main.set(key="user:1", value="alice")
resp = svc.main.get(key="user:1")
# resp.data = {"value": "alice"}

kafka: interface("main", "kafka", 9092)

svc.main.publish(topic="events", data='{"type": "order"}', key="order-1")
resp = svc.main.consume(topic="events", group="test")
# resp.data = {"topic": "events", "key": "order-1", "value": "..."}

nats: interface("main", "nats", 4222)

svc.main.publish(subject="orders.new", data='{"id": 1}')
resp = svc.main.request(subject="orders.get", data='{"id": 1}')
resp = svc.main.subscribe(subject="orders.*")

grpc: interface("main", "grpc", 9090)

resp = svc.main.call(method="/package.Service/GetUser", body='{"id": 1}')

http2: interface("public", "http2", 8080)

# Same API as HTTP/1.1; wire protocol is h2c (cleartext HTTP/2).
resp = svc.public.get(path="/users/1")
# resp.fields["proto"] = "HTTP/2.0"

udp: interface("main", "udp", 8125)

svc.main.send_no_reply(data="api.requests:1|c")        # StatsD metric
resp = svc.main.send(hex="...", timeout_ms=2000)       # DNS query
# resp.data = {"raw": "<hex>", "size": 64}

mongodb: interface("main", "mongodb", 27017)

db.main.insert(collection="users", document={"name": "alice", "role": "admin"})
resp = db.main.find(collection="users", filter={"role": "admin"}, limit=10)
# resp.data = [{"_id": "...", "name": "alice", "role": "admin"}]
db.main.command(cmd={"dropDatabase": 1})

cassandra: interface("main", "cassandra", 9042)

cass.main.exec(cql="CREATE KEYSPACE IF NOT EXISTS test WITH replication = {...}")
resp = cass.main.query(cql="SELECT * FROM test.orders", consistency="QUORUM")

clickhouse: interface("main", "clickhouse", 8123) (HTTP interface)

ch.main.exec(sql="INSERT INTO events (date, type) VALUES (today(), 'order')")
resp = ch.main.query(sql="SELECT count() as n FROM events")
# resp.data = [{"n": 1000000}]

TLS Support

When interface(..., tls=tls_cert(...)) is set, the Faultbox proxy terminates TLS at its listener and re-establishes TLS dialing the upstream. Between the two TLS legs the proxy sees plaintext, so all the protocol-aware fault rules (http.error(path=...), postgres.error(query=...), redis.error(key=...), etc.) continue to fire exactly the same as on plaintext upstreams.

This is the opt-in TLS path — without tls=, the proxy stays in plain-TCP mode (the pre-RFC-038 behavior). Existing specs are unchanged.

tls_cert(...) — TLS material for an interface

tls_cert(
    proxy_cert = "certs/proxy-server.crt",   # cert proxy presents to clients
    proxy_key  = "certs/proxy-server.key",
    client_cert = "certs/proxy-client.crt",  # mTLS cert proxy uses upstream
    client_key  = "certs/proxy-client.key",
    ca = "certs/upstream-ca.pem",            # CA proxy trusts for upstream
    insecure = False,                        # InsecureSkipVerify on upstream
)

All kwargs are optional. tls_cert() (no args) is the dev/test default — the proxy auto-generates a self-signed server cert in memory, and trusts the system CA pool when verifying the upstream.

| Kwarg | Type | Default | Purpose |
|---|---|---|---|
| proxy_cert | string | auto-generated self-signed | Server cert the proxy presents to clients connecting to its listener. |
| proxy_key | string | (paired with proxy_cert) | Server key. Must be set if and only if proxy_cert is set. |
| client_cert | string | none | mTLS client cert the proxy presents when dialing the upstream. |
| client_key | string | (paired with client_cert) | mTLS client key. Must be set if and only if client_cert is set. |
| ca | string | system CA pool | PEM bundle the proxy trusts when verifying the upstream’s cert. |
| insecure | bool | False | InsecureSkipVerify on the upstream side; a dev escape hatch for self-signed upstreams. Mutually exclusive with ca. |

Validation runs at spec-load time. Half-set cert/key pairs, missing files, garbage CA PEM, and insecure=True + ca= collisions all fail with clear errors before the proxy starts.

Relative paths resolve against the spec’s directory. Customers usually keep cert material in a certs/ subfolder next to the spec.

tls_cert() is kwargs-only — positional args are refused so a typo can’t silently swap server / client material.
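
The consistency rules above (paired cert/key, insecure versus ca, keyword-only arguments) can be sketched as a small Python check. This is an illustration of the documented rules, not Faultbox's validator: validate_tls_cert is a hypothetical name, the error strings are invented for the example, and file-existence and PEM-parsing checks are left out.

```python
def validate_tls_cert(*, proxy_cert=None, proxy_key=None,
                      client_cert=None, client_key=None,
                      ca=None, insecure=False):
    """Return a list of error strings; an empty list means the kwargs are consistent."""
    errors = []
    # A cert without its key (or a key without its cert) is a half-set pair.
    if (proxy_cert is None) != (proxy_key is None):
        errors.append("proxy_cert and proxy_key must be set together")
    if (client_cert is None) != (client_key is None):
        errors.append("client_cert and client_key must be set together")
    # Skipping verification and pinning a CA contradict each other.
    if insecure and ca is not None:
        errors.append("insecure=True and ca= are mutually exclusive")
    return errors
```

The bare * in the signature makes every argument keyword-only, so a positional call fails immediately instead of silently swapping server and client material.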

Per-plugin TLS support matrix

The proxy has 14 plugins. As of v0.12.28, six terminate TLS; the rest stay plain-TCP and emit a proxy_tls_pending warning when an interface declares tls=. The deferred plugins are tracked in RFC-039.

| Protocol | TLS support | Pattern | Notes |
|---|---|---|---|
| http | ✅ v0.12.24 | wrap-and-dial | HTTPS; Transport.TLSClientConfig upstream. |
| http2 | ✅ v0.12.24 | wrap-and-dial | ALPN h2 forced; http2.ConfigureServer for dispatch. |
| grpc | ✅ v0.12.25 | framework creds | grpc.Creds(credentials.NewTLS(...)) rather than listener-wrap (gRPC owns its handshake). |
| kafka | ✅ v0.12.26 | wrap-and-dial | Brokers expose plain + TLS on separate ports. |
| redis | ✅ v0.12.27 | wrap-and-dial | Redis 6+ tls-port. RESP3 corpus unchanged. |
| tcp | ✅ v0.12.28 | wrap-and-dial | Generic escape hatch for any “TLS from byte 1” service. Prefix-peek rules still fire on plaintext. |
| postgres | 🟡 deferred | upgrade-in-band | SSLRequest preamble. RFC-039 PR 1. |
| mysql | 🟡 deferred | upgrade-in-band | CLIENT_SSL capability. RFC-039 PR 2. |
| mongodb | 🟡 deferred | wrap-and-dial | RFC-039 PR 3. |
| cassandra | 🟡 deferred | wrap-and-dial | RFC-039 PR 4. |
| clickhouse | 🟡 deferred | wrap-and-dial | RFC-039 PR 4. |
| nats | 🟡 deferred | wrap-and-dial | RFC-039 PR 5. |
| amqp | 🟡 deferred | wrap-and-dial | RFC-039 PR 5. |
| memcached | 🟡 deferred | wrap-and-dial | RFC-039 PR 5. |
| udp | ❌ no TLS | | UDP has no TLS in the kernel sense. DTLS would be a separate RFC. |

When an interface declares tls= against a 🟡 deferred plugin, the proxy still starts but the listener stays plaintext. The runtime emits a proxy_tls_pending event (visible in the bundle) and a warning to stderr so the discrepancy is visible — silence here would let “TLS handshake fails against proxy” debugging burn an hour.

Common patterns

Dev / test against a TLS upstream with no cert management:

# Positional interface() arguments must come before keyword arguments.
api = service("api",
    interface("public", "http", 443, tls=tls_cert()),
    remote = "api-prod.example.com",
)

The proxy auto-generates its server cert; the upstream is verified against the system CA pool.

mTLS upstream (the inDrive Freight pattern):

# Positional interface() arguments must come before keyword arguments.
geo = service("geo",
    interface("api", "grpc", 443,
        tls = tls_cert(
            client_cert = "certs/proxy-client.crt",
            client_key  = "certs/proxy-client.key",
            ca = "certs/upstream-ca.pem",
        ),
    ),
    remote = "geo-config.svc.cluster.local",
)

The proxy presents its auto-cert to the SUT and client_cert to the upstream.

TLS-terminated upstream that uses a self-signed cert:

# Positional interface() arguments must come before keyword arguments.
cache = service("cache",
    interface("main", "redis", 6380, tls=tls_cert(insecure=True)),
    remote = "redis-staging.local",
)

insecure=True is logged at spec-load — use only for dev.

Healthchecks

tcp(addr, timeout=)

healthcheck = tcp("localhost:5432")
healthcheck = tcp("localhost:5432", timeout="15s")

Polls a TCP connection until it succeeds.

http(url, timeout=)

healthcheck = http("localhost:8080/health")
healthcheck = http("localhost:8080/ready", timeout="30s")

Polls an HTTP endpoint until it returns 2xx/3xx.

kafka_ready(addr, timeout=)

healthcheck = kafka_ready("localhost:9092")
healthcheck = kafka_ready("localhost:9092", timeout="120s")

Verifies Kafka broker readiness at the protocol level: connects, creates a sentinel topic, and confirms a partition leader is elected. More reliable than tcp() for Kafka because Docker’s port proxy accepts TCP before the broker is ready to handle produce/consume requests. Default timeout: 120s.

Default timeout for tcp() and http(): 10s.
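
Polling a TCP endpoint until it accepts a connection looks roughly like the Python sketch below. It illustrates the general technique rather than Faultbox's healthcheck code: wait_for_tcp and its retry interval are assumptions, and only the 10s default comes from the text above.

```python
import socket
import time


def wait_for_tcp(addr, timeout_s=10.0, interval_s=0.1):
    """Poll host:port until a TCP connect succeeds or the timeout elapses."""
    host, port = addr.rsplit(":", 1)
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect is the readiness signal; close it right away.
            with socket.create_connection((host, int(port)), timeout=interval_s):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)
```

A successful connect only proves that something is listening, which is why the protocol-level kafka_ready() check exists for brokers that accept TCP before they can serve requests.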

Environment Variables

User-Defined

env = {"PORT": "8080", "LOG_LEVEL": "debug"}

Cross-Service References

Reference another service’s interface address directly:

api = service("api", "./api",
    interface("public", "http", 8080),
    env = {"DB_ADDR": db.main.addr},   # → "localhost:5432"
    depends_on = [db],
)

Available attributes on interface references:

| Attribute | Returns | Example |
|---|---|---|
| .addr | "localhost:port" | db.main.addr → "localhost:5432" |
| .host | "localhost" | db.main.host → "localhost" |
| .port | port number | db.main.port → 5432 |
| .internal_addr | "hostname:port" | db.main.internal_addr → "db:5432" (container) or "localhost:5432" (binary) |

Container networking: For container services, .internal_addr returns <service-name>:<port> — the Docker network hostname. Use this for container-to-container references in env vars. .addr returns localhost:<mapped-port> for test driver access.

Auto-Injected Variables

Faultbox injects FAULTBOX_<SERVICE>_<INTERFACE>_* env vars for every service:

FAULTBOX_DB_MAIN_ADDR=localhost:5432
FAULTBOX_DB_MAIN_HOST=localhost
FAULTBOX_DB_MAIN_PORT=5432

Since v0.9.5 (RFC-024), these values point at a pass-through proxy that Faultbox pre-starts for every proxy-capable interface, not at the real upstream. The proxy is transparent when no rules are installed — behaviour is byte-identical to dialing the upstream directly. When fault(interface_ref, response(...)|error(...)|drop(...)) installs a rule, the proxy applies it to the SUT’s app-initiated traffic, not just traffic from step() calls. User env values that contain a literal upstream address (e.g. DATABASE_URL="postgres://u:p@localhost:5432/db" via pg.main.addr concatenation) are substring-rewritten the same way.
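
The naming scheme and the substring rewrite can be sketched in Python as follows. Both helpers are hypothetical. In particular, the rule for service names containing characters such as "-" is an assumption (they are mapped to "_" here), and the proxy address is the example value used elsewhere on this page.

```python
import re


def injected_vars(service, interface, host, port):
    """Build the FAULTBOX_<SERVICE>_<INTERFACE>_* variables for one interface."""
    def norm(name):
        # Assumption: upper-case, with non-alphanumeric characters mapped to "_".
        return re.sub(r"[^A-Za-z0-9]", "_", name).upper()

    prefix = "FAULTBOX_" + norm(service) + "_" + norm(interface) + "_"
    return {
        prefix + "ADDR": host + ":" + str(port),
        prefix + "HOST": host,
        prefix + "PORT": str(port),
    }


def rewrite_upstream(value, upstream_addr, proxy_addr):
    """Substring-rewrite a literal upstream address to the proxy address."""
    return value.replace(upstream_addr, proxy_addr)


db_vars = injected_vars("db", "main", "localhost", 5432)
url = rewrite_upstream("postgres://u:p@localhost:5432/db",
                       "localhost:5432", "127.0.0.1:36643")
```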


Container Lifecycle

Reuse, Seed, and Reset

By default, containers are created and destroyed for each test. For real infrastructure (Postgres, Redis, Kafka), this can take 20+ seconds per test.

With reuse=True, containers are created once, seeded once, and reset between tests — cutting multi-test execution time by 5-10x:

def seed_db():
    """Runs once after the first healthcheck: expensive initialization."""
    postgres.main.exec(sql="CREATE TABLE orders (id SERIAL, status TEXT)")
    postgres.main.exec(sql="INSERT INTO orders (status) VALUES ('pending')")

def reset_db():
    """Runs before each test (except the first): lightweight cleanup."""
    postgres.main.exec(sql="TRUNCATE orders RESTART IDENTITY CASCADE")

# seed_db and reset_db are defined first so the names exist when service() runs;
# their bodies look up postgres only when they are called.
postgres = service("postgres",
    interface("main", "postgres", 5432),
    image = "postgres:16-alpine",
    reuse = True,
    seed = seed_db,
    reset = reset_db,
    healthcheck = tcp("localhost:5432"),
)

Lifecycle

Suite start:  create → healthcheck → seed()
Test 1:       run test
              ↓ faults cleared, reset()
Test 2:       run test
              ↓ faults cleared, reset()
Test N:       run test
Suite end:    destroy container
  • seed() runs once after the first healthcheck. Use it for schema creation, fixture data, or other expensive initialization.
  • reset() runs before each subsequent test, after fault rules are cleared. Use it for TRUNCATE, FLUSHDB, or other fast state cleanup.
  • If reset is not set, seed is called as a fallback between tests.
  • If neither is set, a warning is emitted (state may leak between tests).
  • Reset failure fails the entire test (prevents hidden state leak bugs).
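
The ordering rules in the list above can be written out as a small Python model. It only reproduces the documented sequence, including the fallback from reset to seed. suite_order and its step labels are hypothetical, and warnings and failure handling are not modelled.

```python
def suite_order(tests, has_seed=True, has_reset=True):
    """Return the ordered lifecycle steps for a reuse=True service."""
    steps = ["create", "healthcheck"]
    if has_seed:
        steps.append("seed")
    # reset() is preferred between tests; seed() is the documented fallback.
    between = "reset" if has_reset else ("seed" if has_seed else None)
    for index, test in enumerate(tests):
        if index > 0:
            # Fault rules are cleared before state is re-initialized.
            steps.append("clear faults")
            if between is not None:
                steps.append(between)
        steps.append("run " + test)
    steps.append("destroy")
    return steps
```

Calling suite_order(["t1", "t2"], has_reset=False) shows seed appearing twice: once at suite start and once as the fallback between the two tests.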

Ports

Use ports= to map container ports to specific host ports:

kafka = service("kafka",
    interface("main", "kafka", 9092),
    image = "apache/kafka:3.7.0",
    ports = {9092: 9092},          # container:host
    healthcheck = kafka_ready("localhost:9092"),
)

When ports is not set, Docker picks random host ports automatically.


Mock Services

For dependencies that don’t deserve a full container (auth/JWKS stubs, metrics sinks, feature-flag gateways) Faultbox can stand up in-process protocol stubs entirely from Starlark — no Dockerfile, no sidecar process. See the dedicated Mock Services reference for the full API. Quick summary:

# Generic primitive — request/response protocols (HTTP, HTTP/2, TCP, UDP, gRPC).
auth = mock_service("auth",
    interface("http", "http", 8090),
    routes = {
        "GET /.well-known/openid-configuration/jwks": json_response(200, {"keys": [...]}),
        "POST /token": dynamic(lambda req: json_response(200, {"sub": req["query"]["user"]})),
    },
)

# Stdlib mocks for stateful protocols.
load("@faultbox/mocks/kafka.star",   "kafka")
load("@faultbox/mocks/redis.star",   "redis")
load("@faultbox/mocks/mongodb.star", "mongo")

bus   = kafka.broker("bus",       interface = interface("main", "kafka", 9092),  topics = {"orders": []})
cache = redis.server("cache",     interface = interface("main", "redis", 6379),  state  = {"flag:new": "true"})
users = mongo.server("users-stub", interface = interface("main", "mongodb", 27017),
                                  collections = {"users": [{"_id": "1", "name": "alice"}]})

mock_service(name, *interfaces, routes={}, default=None, tls=False, config={}, descriptors=None, openapi=None, examples="first", validate="off", overrides={}, depends_on=[])

Generic primitive for request/response protocols. Returns a ServiceDef interchangeable with real services — fault(), events(), env-var references all work the same way.

| Param | Notes |
|---|---|
| routes | Pattern → response dict. Pattern format depends on protocol ("METHOD /path" for HTTP, "/pkg.Svc/Method" for gRPC, byte-prefix string for TCP/UDP). OpenAPI-style {id} segments in HTTP patterns normalise to *. |
| default | Fallback when no route matches (default: protocol-appropriate error like HTTP 404). |
| tls | When True, terminate TLS using a per-runtime mock CA. CA bundle path available via the runtime; SUTs trust it via RootCAs. |
| config | Opaque dict consumed by the protocol plugin. Used by stdlib wrappers; you rarely set this directly. |
| descriptors | (gRPC only, RFC-023) Path to a FileDescriptorSet (protoc --descriptor_set_out). Enables typed-proto responses. |
| openapi | (HTTP only, RFC-021) Path to an OpenAPI 3.0 document. Faultbox generates one route per path × method, using the declared example as the response body. Loaded and validated at spec-load time. |
| examples | (HTTP only) Response-selection strategy: "first" (default, deterministic), "<name>" (pick named variant across ops), "random" (seeded random per op), "synthesize" (minimal type-correct values when no example is declared). |
| validate | (HTTP only) Request validation: "off" (default), "warn" (log mismatches), "strict" (reject with HTTP 400). Only JSON bodies are validated. |
| overrides | (HTTP only) Route dict that REPLACES OpenAPI-generated entries by pattern. Accepts OpenAPI-style paths ({id}). |
| depends_on | Same start-ordering semantics as service(). |
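
The {id} to * normalisation for HTTP route patterns can be sketched in Python. The rewrite itself follows the description above; the matching rule, where "*" matches exactly one path segment, is an assumption made for the example, and both function names are hypothetical.

```python
import re


def normalize_route(pattern):
    """Rewrite OpenAPI-style {param} segments in a route pattern to "*"."""
    return re.sub(r"\{[^/{}]+\}", "*", pattern)


def route_matches(pattern, method, path):
    """Match "METHOD /path" patterns, assuming "*" covers exactly one segment."""
    want_method, want_path = normalize_route(pattern).split(" ", 1)
    if want_method != method:
        return False
    want_parts = want_path.strip("/").split("/")
    got_parts = path.strip("/").split("/")
    if len(want_parts) != len(got_parts):
        return False
    return all(w == "*" or w == g for w, g in zip(want_parts, got_parts))
```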

Response constructors

json_response(status=200, body={...}, headers={})    # JSON body, sets Content-Type
text_response(status=200, body="...", headers={})    # text/plain
bytes_response(status=0, data="raw bytes")           # TCP/UDP write-back
status_only(code)                                     # HTTP status, empty body
redirect(location, status=302)                        # HTTP redirect
grpc_response(body={...})                             # google.protobuf.Struct
grpc_error(code="UNAVAILABLE", message="...")         # gRPC canonical status
dynamic(fn)                                           # per-request callable (req → response)

dynamic(fn) runs a Starlark callable per request. The callable receives a dict with method, path, headers, query, body and returns a response value. Use it for JWT signing, request-aware flag lookups, anything where the canned answer depends on the input.
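
The shape of such a callable can be tried out in plain Python before wiring it into a spec. In this sketch json_response is a stand-in that returns a dict rather than a Faultbox response value, and the assumption that req["query"] is a flat dict of strings is inferred from the /token example earlier in this section.

```python
def json_response(status, body):
    """Stand-in for the builtin: returns a plain dict, not a Faultbox response."""
    return {"status": status, "body": body}


def token_handler(req):
    # req carries method, path, headers, query, and body, per the description above.
    user = req["query"].get("user")
    if user is None:
        return json_response(400, {"error": "missing user"})
    return json_response(200, {"sub": user})


reply = token_handler({
    "method": "POST", "path": "/token", "headers": {},
    "query": {"user": "alice"}, "body": "",
})
```

Handling the missing-parameter case explicitly keeps the mock from failing on a key lookup when the SUT sends an unexpected request.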

Stdlib mocks (@faultbox/mocks/*.star)

| Module | Constructor | Backed by |
|---|---|---|
| @faultbox/mocks/kafka.star | kafka.broker(name, interface, topics, partitions, depends_on) | franz-go/pkg/kfake: full broker |
| @faultbox/mocks/redis.star | redis.server(name, interface, state, depends_on) | miniredis: full RESP2 |
| @faultbox/mocks/mongodb.star | mongo.server(name, interface, collections, depends_on) | hand-written BSON OP_MSG/OP_QUERY responder |
| @faultbox/mocks/grpc.star | grpc.server(name, interface, descriptors, services, depends_on, tls) | protoreflect + FileDescriptorSet (RFC-023) |
| @faultbox/mocks/http.star | http.server(name, interface, openapi, examples, validate, overrides, routes, default, depends_on, tls) | kin-openapi + OpenAPI 3.0 (RFC-021) |
| @faultbox/mocks/jwt.star | jwt.server(name, interface, issuer, key_id, depends_on) → struct with .service / .sign(claims=…) / .jwks | Auto-generated EdDSA keypair + standard JWKS endpoint (v0.9.9, customer ask B3) |

gRPC status-code shorthands (v0.9.8): the grpc stdlib now exposes per-code helpers so you don’t have to remember the grpc_error(code="…") incantation. Each wraps grpc_error() with a sensible default message.

load("@faultbox/mocks/grpc.star", "grpc")

grpc.server(
    name        = "users",
    interface   = interface("main", "grpc", 50051),
    descriptors = "./proto/users.pb",
    services    = {
        "/users.v1.Users/Get":       {"response": {"id": 1, "name": "Alice"}},
        "/users.v1.Users/Admin":     grpc.permission_denied("admin only"),
        "/users.v1.Users/Slow":      grpc.deadline_exceeded(),
        "/users.v1.Users/Outage":    grpc.unavailable(),
    },
)

Available: grpc.unavailable(), grpc.deadline_exceeded(), grpc.permission_denied(), grpc.unauthenticated(), grpc.not_found(), grpc.resource_exhausted(), grpc.internal().

Stdlib constructors are thin Starlark wrappers around mock_service() that translate protocol-specific kwargs into the opaque config= map. Same Go runtime, same event log, same fault() integration — just a nicer call site for protocols where routes={} doesn’t fit.

See the Mock Services reference for the per-protocol matrix, scope, and what mocks deliberately don’t do.


Event Sources

Event sources capture non-syscall events (stdout, WAL changes, message queues, log files) and emit them into the trace as first-class events. They are attached to services via the observe= parameter.

api = service("api", "./api",
    interface("public", "http", 8080),
    observe=[
        stdout(decoder=json_decoder()),
    ],
)

db = service("postgres",
    interface("main", "postgres", 5432),
    image="postgres:16",
    observe=[
        stdout(decoder=logfmt_decoder()),
        wal_stream(slot="faultbox"),
    ],
)

Events from sources have a type ("stdout", "wal", "topic", "tail", "poll") and are queryable by assertions and monitors — same as syscall events.

Built-in Event Sources

| Source | Constructor | What it captures |
|---|---|---|
| stdout | stdout(decoder=) | Service stdout lines, decoded per line |
| stderr | stderr(decoder=) | Service stderr lines, decoded per line (zap/logrus default) |
| wal_stream | wal_stream(slot=) | Postgres logical replication (INSERT/UPDATE/DELETE) |
| topic | topic(broker=, topic=, group=) | Kafka/NATS topic messages |
| tail | tail(path=) | New lines appended to a file (inotify) |
| poll | poll(url=, interval=) | Periodic HTTP endpoint fetch |

Decoders

Decoders parse raw bytes (a line of output, a message payload) into structured event fields. The "data" field is auto-decoded on StarlarkEvent.data — no json.decode() needed.

| Decoder | Constructor | Parses |
|---|---|---|
| json | json_decoder() | JSON objects — top-level keys become fields |
| logfmt | logfmt_decoder() | key=value key2="value 2" pairs |
| regex | regex_decoder(pattern=) | Named capture groups from regex |

# JSON: {"level":"INFO","msg":"started"} → e.data["level"] == "INFO"
observe=[stdout(decoder=json_decoder())]

# Logfmt: level=INFO msg="started" → e.data["msg"] == "started"
observe=[stdout(decoder=logfmt_decoder())]

# Regex: WAL: fsync /data/wal/001 → e.data["action"] == "fsync"
observe=[stdout(decoder=regex_decoder(pattern=r"WAL: (?P<action>\w+) (?P<path>.+)"))]
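
As a rough mental model of what the three decoders produce, the Python sketch below turns one line per style into a dict of fields. It is an illustration only, not Faultbox's decoder code; the function names decode_json, decode_logfmt, and decode_regex are made up for this example.

```python
import json
import re
import shlex


def decode_json(line):
    """Top-level JSON keys become event fields."""
    return json.loads(line)


def decode_logfmt(line):
    """Split key=value pairs; shlex keeps quoted values such as msg="a b" together."""
    fields = {}
    for token in shlex.split(line):
        key, sep, value = token.partition("=")
        if sep:  # skip bare words that carry no '='
            fields[key] = value
    return fields


def decode_regex(pattern, line):
    """Named capture groups become event fields; no match yields no fields."""
    match = re.search(pattern, line)
    return match.groupdict() if match else {}


print(decode_json('{"level":"INFO","msg":"started"}')["level"])
print(decode_logfmt('level=INFO msg="started"')["msg"])
print(decode_regex(r"WAL: (?P<action>\w+) (?P<path>.+)", "WAL: fsync /data/wal/001"))
```

Each function returns the dict that a spec would then read through e.data, which is why the three comments above can index e.data directly.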

Querying Event Source Events

Event source events work with all assertion and query functions:

# Assert a WAL INSERT happened:
assert_eventually(where=lambda e: e.type == "wal" and e.data["op"] == "INSERT")

# Monitor stdout for errors:
monitor(lambda e: fail("unexpected error") if e.type == "stdout" and "ERROR" in e.data.get("level", "") else None)

# Query Kafka topic events:
msgs = events(where=lambda e: e.type == "topic" and e.data["topic"] == "orders.events")

Diagnosing SUT failures via stdout / stderr

When a containerized or binary SUT silently hangs at startup or fails behind a proxy, attach observe=[stdout(decoder=...)] (or stderr(...) if your service writes logs to fd 2) so the SUT’s own log lines become first-class trace events in the bundle. The bundle becomes self-diagnosing — you can see the last function the SUT reached without redeploying a debug build.

# zap, logrus, slog (default) all write to stderr — capture via stderr().
api = service("truck-api",
    interface("http", "http", 8080),
    binary="/usr/local/bin/truck-api",
    env={
        "DATABASE_HOST": db.mysql.proxy_host,
        "DATABASE_PORT": db.mysql.proxy_port,
    },
    observe=[stderr(decoder=json_decoder())],
    healthcheck=http("localhost:8080/health", timeout="60s"),
)

# Services that route logs to stdout explicitly (Python defaults,
# many CLIs) use stdout() — same surface, different fd.
worker = service("worker",
    interface("rpc", "tcp", 9000),
    binary="/usr/local/bin/worker",
    observe=[stdout(decoder=logfmt_decoder())],
)

# Capture both — the SUT writes errors to stderr, business events to
# stdout. Each emits with its own event type so you can filter the
# timeline and event log independently.
mixed = service("mixed",
    interface("api", "http", 8080),
    binary="/usr/local/bin/mixed",
    observe=[
        stdout(decoder=json_decoder()),
        stderr(decoder=json_decoder()),
    ],
)

Pre-v0.12.17 only stdout() existed; capturing zap/logrus output required a SUT-side env-gate (e.g. FB_LOG_TO_STDOUT=1) to redirect logs to fd 1. With stderr() you can capture default-configured Go services without touching their code.

Pre-v0.12.19 both sources only worked against binary-mode services; from v0.12.19 they apply to container services too. Faultbox reads Docker’s multiplexed log stream (client.ContainerLogs(...)) and demultiplexes internally, so a containerised SUT becomes self-diagnosing without any image change.

Combined with structured logging in the SUT (zap, slog, logrus, etc.), this turns “the SUT hangs and nobody knows why” into “seq 33: ‘done init config’ → seq 34: ‘FATAL: connect to db: invalid connection’.” The latter is actionable from the bundle alone — no SUT re-instrumentation needed across debug iterations.

This pattern was load-bearing for the v0.12.15.x diagnostic arc — three proxy bugs (handshake, RESP3 framing, goroutine ctx-rooting) each diagnosed from a customer bundle on the first attempt because the SUT’s fatal log was already in the trace. If you author specs that go to customers, default to including observe=[stdout(decoder=...)] on the SUT — it’s cheap to keep on, expensive to add later when something breaks.

Decoder choice:

  • json_decoder() — structured loggers (zap, zerolog, slog default)
  • logfmt_decoder() — key=value style (logrus, klog)
  • regex_decoder(pattern=...) — unstructured logs; capture the fields you need

If your SUT defaults to a non-stdout sink (file, syslog, proprietary format), gate the stdout output behind an env var so production behavior is unaffected — the spec sets the env var, the SUT honors it only under test.


Scenarios & Generation

scenario(fn)

Registers a function as a scenario probe. The function runs as a test (like test_*) and is also available to faultbox generate, fault_scenario(), and fault_matrix().

A scenario is a probe — it exercises the system and returns an observable result. Scenarios SHOULD return values (for use with fault_scenario(expect=)) and SHOULD NOT contain assert_* calls. Assertions belong in the expect callback of fault_scenario() or fault_matrix().

def order_flow():
    """Place an order — returns response for external validation."""
    return orders.post(path="/orders", body='{"sku":"widget","qty":1}')

scenario(order_flow)  # runs as test_order_flow + registered for composition

Multi-step scenarios return a dict of observables:

def order_lifecycle():
    place = orders.post(path="/orders", body='{"sku":"widget","qty":1}')
    if place.status != 200:
        return {"phase": "place", "resp": place}
    check = orders.get(path="/inventory/widget")
    return {"phase": "check_stock", "resp": check, "order": place}

scenario(order_lifecycle)

Backward compatibility: Existing scenarios with inline assert_* calls still work — the return value is simply None. The convention of returning values is optional but recommended for composition with fault_scenario().

faultbox generate

Takes registered scenarios and systematically generates fault_assumption() definitions and a fault_matrix() call — one assumption per dependency × failure mode:

faultbox generate faultbox.star
# → order_flow.faults.star
# → health_check.faults.star

Generated .faults.star files use load() to import topology and scenario functions, then define fault assumptions and a matrix:

# order_flow.faults.star (auto-generated)
load("faultbox.star", "orders", "inventory", "order_flow")

inventory_down = fault_assumption("inventory_down",
    target = orders,
    connect = deny("ECONNREFUSED"),
)

inventory_slow = fault_assumption("inventory_slow",
    target = orders,
    connect = delay("500ms"),
)

fault_matrix(
    scenarios = [order_flow],
    faults = [inventory_down, inventory_slow],
)
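
The "one assumption per dependency × failure mode" rule is a plain cross product. The Python sketch below shows only that enumeration; the input lists and the naming scheme are assumptions inferred from the generated example above, not the generator's actual code.

```python
from itertools import product

# Hypothetical inputs: the scenario's dependencies and two failure modes,
# mirroring the generated example (connect denied, connect delayed).
dependencies = ["inventory"]
failure_modes = {
    "down": 'connect = deny("ECONNREFUSED")',
    "slow": 'connect = delay("500ms")',
}


def assumption_names(dependencies, failure_modes):
    """Return one assumption name per (dependency, failure mode) pair."""
    return [f"{dep}_{mode}" for dep, mode in product(dependencies, failure_modes)]


print(assumption_names(dependencies, failure_modes))
```

With two dependencies and the same two modes, the list would grow to four entries, and the matrix would gain four fault columns.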

Add overrides= to fault_matrix() for per-cell expectations. See CLI Reference for all flags.

expect_success() / expect_error_within(ms) / expect_hang() (v0.9.8)

Built-in outcome predicates for default_expect= and overrides={} in fault_matrix(). They replace the hand-rolled assertion helpers every mature spec grows (“is the result non-None? is the status under 500?”), giving each matrix row an explicit, machine-readable outcome intent that the v0.11.0 HTML report will consume (RFC-027, RFC-029).

fault_matrix(
    scenarios = [get_config, health],
    faults    = [db_down, upstream_slow],
    default_expect = expect_success(),            # row passes → 200-ish, fast
    overrides = {
        (get_config, upstream_slow): expect_error_within(ms = 10000),
        (health, db_down):           expect_success(),  # health stays green
        # Deliberately trigger a client-timeout path.
        (get_config, db_down):       expect_hang(),
    },
)

Behaviour:

  • expect_success() — scenario returned non-None, status_code (if present) is < 500.
  • expect_error_within(ms=N) — scenario returned with an error shape (status_code >= 500 or non-empty error field) AND duration_ms <= N. “Returned 200 fast” violates this because the row was supposed to degrade.
  • expect_hang() — scenario did not return at all (row timeout cancelled it). Used for deliberately exercising caller-timeout paths.
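
The three behaviours listed above can be modelled in Python as predicate factories. This is a sketch under assumptions: a row is represented as a dict with a returned flag and a result dict carrying status_code, error, and duration_ms. That shape is chosen for the example and is not Faultbox's internal representation.

```python
def expect_success():
    """Accept rows that returned a result whose status_code, if any, is below 500."""
    def check(row):
        result = row["result"]
        if not row["returned"] or result is None:
            return False
        status = result.get("status_code")
        return status is None or status < 500
    return check


def expect_error_within(ms):
    """Accept rows that returned an error shape within ms milliseconds."""
    def check(row):
        result = row["result"]
        if not row["returned"] or result is None:
            return False
        errored = result.get("status_code", 0) >= 500 or bool(result.get("error"))
        return errored and result.get("duration_ms", 0) <= ms
    return check


def expect_hang():
    """Accept rows where the scenario never returned (the row timeout cancelled it)."""
    def check(row):
        return not row["returned"]
    return check


fast_200 = {"returned": True, "result": {"status_code": 200, "duration_ms": 12}}
slow_503 = {"returned": True, "result": {"status_code": 503, "duration_ms": 9000}}
hung = {"returned": False, "result": None}

print(expect_success()(fast_200), expect_error_within(ms=10000)(fast_200))
print(expect_error_within(ms=10000)(slow_503), expect_hang()(hung))
```

Note how fast_200 fails expect_error_within even though nothing went wrong: the row was expected to degrade and did not.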

Backwards compatible: default_expect= still accepts plain Starlark lambdas for rows that need custom checks.

Outcome taxonomy (v0.11.1)

Every fault_scenario / fault_matrix row produces one of five outcomes in manifest.json and the HTML report:

| Outcome | Pill | Meaning |
|---|---|---|
| passed | green | Scenario returned; expect predicate (if any) accepted the result; any required faults fired. |
| failed | red | An assert_* inside the scenario body fired before the predicate ran. |
| expectation_violated | amber | Body assertions were clean, but the expect predicate disagreed with the result. |
| fault_bypassed | grey | Scenario returned cleanly, but a fault rule was installed and never matched a syscall (only with require_faults_fire=True). |
| errored | grey | Scenario raised an untyped error (crash, timeout outside a predicate). |

expectation_violated is a refinement of failed — legacy consumers that only know the three-way taxonomy still see the row in the summary.failed count. fault_bypassed is a refinement of passed — the scenario did pass, but the test is uninformative because the fault never fired. The predicate name (e.g. expect_success, expect_error_within, or lambda for user callables) lands in manifest.tests[].expectation and surfaces alongside the pill in the report’s tests table and drill-down header.

require_faults_fire=True on fault_matrix() (v0.11.1)

Opt-in gate that demotes rows where at least one installed fault rule never matched a syscall during the test:

fault_matrix(
    scenarios           = [checkout, orders, health],
    faults              = [db_down, cache_slow],
    default_expect      = expect_success(),
    require_faults_fire = True,
)

Without the flag (default), a cell returning HTTP 200 goes green even if the service cached an init-time response and never touched the faulted upstream. With the flag on, such cells become fault_bypassed (grey) and the drill-down lists every rule the runtime saw unmatched — usually a hint that the scenario is hitting a different code path than intended.

require_faults_fire composes with any default_expect / overrides — the fault-fired check runs after the expect predicate. Rows that already failed or errored keep their outcome.
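
One way to picture how the five outcomes and the require_faults_fire gate fit together is the decision order below. It is a sketch, not the runtime's code: the function and parameter names are hypothetical, and a crash and a body assertion failure are treated as mutually exclusive.

```python
def classify(crashed, body_assert_failed, expect_ok, unmatched_rules,
             require_faults_fire=False):
    """Return the outcome for one fault_scenario / fault_matrix row."""
    if crashed:
        return "errored"
    if body_assert_failed:
        return "failed"
    if not expect_ok:  # expect_ok is True when no predicate was given
        return "expectation_violated"
    # The fault-fired check runs last, so it only demotes rows that would pass.
    if require_faults_fire and unmatched_rules:
        return "fault_bypassed"
    return "passed"


def legacy_bucket(outcome):
    """Map the refined outcomes onto the older three-way taxonomy."""
    return {"expectation_violated": "failed", "fault_bypassed": "passed"}.get(outcome, outcome)


row = dict(crashed=False, body_assert_failed=False, expect_ok=True,
           unmatched_rules=["db: connect=deny(ECONNREFUSED)"])
print(classify(**row))
print(classify(**row, require_faults_fire=True))
```

The same row is reported as passed by default and as fault_bypassed once the gate is on, which matches the cached-response case described above.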

File readers — load_file(), load_yaml(), load_json() (v0.9.8)

Spec-load-time file reads. Use them instead of hand-inlining SQL fixtures, OpenAPI specs, or JSON config as Starlark string constants.

# Read raw bytes — returns a Starlark string.
seed_sql = load_file("./seed.sql")
mysql.exec(sql = seed_sql)

# Decode YAML into Starlark dict/list/scalar.
fixture = load_yaml("./fixtures/users.yaml")
for user in fixture["users"]:
    print(user["email"])

# Same shape for JSON. Integer-valued numbers become int, not float.
rates = load_json("./config/rates.json")

Path resolution is relative to the spec file’s directory (same base as load()), not the process cwd. Absolute paths work but emit an INFO log line.

Security guardrails (see RFC-026 for rationale):

  • Network schemes (http://, https://, file:// with remote authority) are refused with a clear error.
  • Size cap: 50 MB per file by default. Override via $FAULTBOX_LOAD_FILE_MAX_BYTES (decimal bytes).
  • $FAULTBOX_HERMETIC=1 rejects symlinks escaping the spec dir.
  • YAML non-string map keys are refused (Starlark dicts need string keys).
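
The path-resolution rule and the network-scheme guardrail can be sketched in Python as follows. This models only those two rules (not the size cap, the symlink check, or the YAML key check); resolve_load_path is a hypothetical name, and the handling of a local file:// path is an assumption.

```python
import posixpath
from urllib.parse import urlparse


def resolve_load_path(spec_file, path):
    """Resolve path relative to the spec file's directory, refusing network schemes."""
    parsed = urlparse(path)
    if parsed.scheme in ("http", "https"):
        raise ValueError("network scheme refused: " + path)
    if parsed.scheme == "file":
        if parsed.netloc:
            raise ValueError("file:// with remote authority refused: " + path)
        path = parsed.path  # assumption: a local file:// URL maps to its path
    if posixpath.isabs(path):
        return posixpath.normpath(path)
    # Same base as load(): the spec file's directory, not the process cwd.
    return posixpath.normpath(posixpath.join(posixpath.dirname(spec_file), path))


print(resolve_load_path("/proj/specs/faultbox.star", "./fixtures/users.yaml"))
```

Because the base is the spec file's directory, the same spec resolves the same fixture no matter where faultbox is invoked from.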

Every file read via these builtins is also captured into the .fb bundle’s spec/ directory so faultbox inspect and faultbox replay see the exact source tree that produced the run. No separate plumbing — piggybacks on the existing RFC-025 Phase 4 capture.

load(filename, symbol1, symbol2, ...)

Imports symbols from another .star file. The loaded file shares the same runtime (service registry, builtins, event log).

# custom-failures.star
load("faultbox.star", "orders", "inventory", "order_flow")

inventory_down = fault_assumption("inventory_down",
    target = orders,                  # orders makes the outbound connect() to inventory
    connect = deny("ECONNREFUSED"),
)

fault_scenario("order_inventory_down",
    scenario = order_flow,
    faults = inventory_down,
    expect = lambda r: assert_eq(r.status, 503),
)

Paths are resolved relative to the loading file’s directory. Modules are cached — each file is executed at most once.

print(...)

Outputs to stderr during test execution. Use for debugging event structures:

resp = db.main.query(sql="SELECT * FROM users")
print(resp.data)       # shows the auto-decoded dict/list structure
print(resp.data[0])    # shows first row

writes = events(service="db", syscall="write")
print(len(writes), "writes recorded")

Tests

Test functions are named test_* and discovered automatically. Each test runs with fresh service instances (restarted between tests). Scenario-registered functions also run as tests (as test_<name>).

def test_happy_path():
    """Normal operation — all services healthy."""
    resp = api.get(path="/health")
    assert_eq(resp.status, 200)

Execution Order

For each test function:

1. Reset event log (fresh trace per test)
2. Wait for ports to be free (cleanup from previous test)
3. Start all services in dependency order
4. Wait for healthchecks to pass
5. Run the test function
6. Stop all services (SIGTERM → SIGKILL after 2s)
7. Capture syscall trace and report result

Running Tests

faultbox test faultbox.star                        # all tests
faultbox test faultbox.star --test happy_path      # one test
faultbox test faultbox.star --debug                # verbose logging
faultbox test faultbox.star --output trace.json    # JSON trace output
faultbox test faultbox.star --shiviz trace.shiviz  # ShiViz output

Steps

Steps are method calls on service interfaces that exercise the running system.

Step Addressing

api.public.post(path="/data/key")   # explicit interface
api.post(path="/data/key")          # shorthand (single-interface service)
db.main.send(data="PING")           # TCP interface

When a service has one interface, the interface name can be omitted.

HTTP Steps

Available on interfaces with protocol: "http".

Operations: get, post, put, delete, patch

resp = api.get(path="/health")
resp = api.post(path="/orders", body='{"sku":"widget","qty":1}')
resp = api.post(path="/data", body="data", headers={"Authorization": "Bearer token"})

| Parameter | Type | Required | Description |
|---|---|---|---|
| path | string | no | URL path (default: "/") |
| body | string | no | Request body |
| headers | dict | no | HTTP headers |

Response object:

resp = api.post(path="/orders", body='{"sku":"widget"}')
resp.status       # int — HTTP status code (200, 404, 500, ...)
resp.body         # string — response body (trimmed)
resp.ok           # bool — True if step succeeded
resp.error        # string — error message if step failed
resp.duration_ms  # int — request duration in milliseconds

TCP Steps

Available on interfaces with protocol: "tcp".

Operation: send

resp = db.main.send(data="PING")    # returns response as string
assert_eq(resp, "PONG")

resp = db.main.send(data="CHECK widget")
assert_eq(resp, "100")

| Parameter | Type | Required | Description |
|---|---|---|---|
| data | string | yes | Data to send (newline appended automatically) |

TCP send returns a string (the first response line), not a response object. It opens a connection, sends one line, reads one line, and closes.
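
The connect, send one line, read one line, close cycle described above looks roughly like this in Python. It is a sketch of the behaviour, not Faultbox's step implementation; tcp_send and demo are hypothetical names, and the throwaway PING/PONG server exists only so the example runs on its own.

```python
import socket
import threading


def tcp_send(host, port, data, timeout=2.0):
    """Open a connection, send one line, read one line, and close."""
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall((data + "\n").encode())  # newline appended automatically
        with conn.makefile("r") as reader:
            return reader.readline().rstrip("\r\n")


def demo():
    """Run tcp_send against a one-shot local server that answers PING with PONG."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.settimeout(5.0)
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(1)
    port = server.getsockname()[1]

    def serve_once():
        conn, _ = server.accept()
        with conn, conn.makefile("r") as reader:
            line = reader.readline().strip()
            conn.sendall(b"PONG\n" if line == "PING" else b"ERR\n")

    worker = threading.Thread(target=serve_once)
    worker.start()
    try:
        return tcp_send("127.0.0.1", port, "PING")
    finally:
        worker.join()
        server.close()


print(demo())
```

Since each call opens and closes its own connection, consecutive send steps do not share any connection state.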


Faults

Faults inject failures at the syscall level via seccomp-notify.

fault(service, run=callback, **syscall_faults)

Scoped fault injection — faults are active only during the callback:

def test_inventory_slow():
    def scenario():
        resp = orders.post(path="/orders", body='{"sku":"gadget","qty":1}')
        assert_eq(resp.status, 200)
        assert_true(resp.duration_ms > 400)
    fault(inventory, write=delay("500ms"), run=scenario)

The run parameter takes a callable. Faults are automatically removed when the callback returns (even on error).
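
The cleanup guarantee is the familiar try/finally pattern. The Python sketch below models it with a plain dict standing in for the installed fault rules; it is an illustration of the scoping behaviour only, and active_faults is a hypothetical name.

```python
# Stand-in for the runtime's installed fault rules, keyed by service name.
active_faults = {}


def fault(service, run, **syscall_faults):
    """Install faults, run the callback, and always remove the faults afterwards."""
    active_faults[service] = syscall_faults
    try:
        return run()
    finally:
        active_faults.pop(service, None)  # runs even if the callback raised


def failing_scenario():
    # The fault is visible while the callback runs.
    assert active_faults["inventory"] == {"write": "delay(500ms)"}
    raise RuntimeError("scenario failed")


try:
    fault("inventory", run=failing_scenario, write="delay(500ms)")
except RuntimeError:
    pass

print(active_faults)
```

The imperative fault_start / fault_stop pair has no such guard, so a failing assertion between the two calls would skip the stop.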

Multiple faults can be applied at once:

fault(db,
    write=delay("1s"),
    connect=deny("ECONNREFUSED"),
    run=scenario,
)

fault_start(service, ...) / fault_stop(service)

Imperative fault control:

def test_imperative():
    fault_start(db, write=delay("500ms"))
    resp = api.post(path="/data/key1", body="value")
    assert_eq(resp.status, 200)
    fault_stop(db)

Use fault() with run= when possible — it guarantees cleanup.

fault_all([services], run=callback, **syscall_faults)

Apply the same fault to multiple services simultaneously. Useful for testing “all replicas down” or “entire dependency tier fails”:

# All three Kafka brokers down at once.
fault_all([kafka1, kafka2, kafka3],
    connect = deny("ECONNREFUSED"),
    run = scenario,
)

# All databases slow.
fault_all([pg_primary, pg_replica],
    write = delay("500ms"),
    run = scenario,
)

Equivalent to nesting fault() calls but without the lambda pyramid. Faults are applied to all services before the callback runs, and removed from all services after.

trace(service, syscalls=[...], run=callback)

Observe syscalls without injecting faults. Installs seccomp filters that record events but allow all syscalls to proceed normally.

def test_observe_writes():
    def scenario():
        resp = orders.post(path="/orders", body='{"sku":"widget","qty":1}')
        assert_eq(resp.status, 200)
        assert_eventually(service="inventory", syscall="write", path="*.wal")
    trace(inventory, syscalls=["write", "openat", "fsync"], run=scenario)

Use trace() when you want to assert on internal behavior of a healthy system — no faults, just observation.

trace_start(service, syscalls=[...]) / trace_stop(service)

Imperative trace control:

def test_observe_then_fault():
    trace_start(inventory, syscalls=["write", "fsync"])
    resp = orders.post(path="/orders", body='...')
    assert_eventually(service="inventory", syscall="write", path="*.wal")
    trace_stop(inventory)

op(syscalls=[...], path=)

Define a named operation that groups related syscalls. Used in service() declarations with the ops= parameter.

db = service("db", "./db",
    interface("main", "tcp", 5432),
    healthcheck=tcp("localhost:5432"),
    ops={
        "persist": op(syscalls=["write", "fsync"]),
        "wal_write": op(syscalls=["write", "fsync"], path="/tmp/*.wal"),
    },
)

def test_persist_failure():
    def scenario():
        resp = api.post(path="/data/key", body="val")
        assert_true(resp.status >= 500)
    fault(db, persist=deny("EIO"), run=scenario)

Named operations can include a path filter — only syscalls on matching files are faulted. The trace shows the operation name: persist(write) deny(EIO).

delay(duration, probability=, label=)

Delays a syscall by sleeping before allowing it to proceed.

delay("500ms")              # 500ms delay, 100% probability
delay("2s")                 # 2 second delay
delay("100ms", probability="50%")  # 50% chance of delay
delay("500ms", label="slow WAL")   # labeled for diagnostics

| Parameter | Type | Default | Description |
|---|---|---|---|
| duration | string | | Go duration: "500ms", "2s", "100us" |
| probability | string | "100%" | Chance the fault fires |
| label | string | | Human-readable label shown in trace output |

deny(errno, probability=, label=)

Fails a syscall by returning an error code.

deny("ECONNREFUSED")                     # 100% connection refused
deny("EIO", probability="10%")           # 10% I/O error
deny("ENOSPC")                           # disk full
deny("EIO", label="WAL write")           # labeled for diagnostics

| Parameter | Type | Default | Description |
|---|---|---|---|
| errno | string | | Error code (see table below) |
| probability | string | "100%" | Chance the fault fires |
| label | string | | Human-readable label shown in trace output |

Labels in diagnostics: When a labeled fault fires, the trace output shows the label alongside the decision:

  syscall trace (85 events):
    #72  db    write   deny(input/output error)  [WAL write]
    #73  db    write   deny(input/output error)  [WAL write]
  fault rule on db: write=deny(EIO) → filter:[write,writev,pwrite64] label="WAL write"

Fault Targeting

Keyword arguments map syscall names to faults:

fault(inventory, write=delay("500ms"), run=fn)     # delay inventory's write() syscalls
fault(orders, connect=deny("ECONNREFUSED"), run=fn) # deny orders' connect()
fault(inventory, fsync=deny("EIO"), run=fn)         # fail inventory's fsync
fault(inventory, openat=deny("ENOSPC"), run=fn)     # fail inventory's file opens

Faults apply to the service’s own syscalls:

# CORRECT: orders can't connect to inventory (orders makes outbound connect)
fault(orders, connect=deny("ECONNREFUSED"), run=fn)

# CORRECT: inventory WAL write is slow (inventory's write syscall is delayed)
fault(inventory, write=delay("500ms"), run=fn)

Supported Errno Values

File/IO: ENOENT, EACCES, EPERM, EIO, ENOSPC, EROFS, EEXIST, ENOTEMPTY, ENFILE, EMFILE, EFBIG

Network: ECONNREFUSED, ECONNRESET, ECONNABORTED, ETIMEDOUT, ENETUNREACH, EHOSTUNREACH, EADDRINUSE, EADDRNOTAVAIL

Generic: EINTR, EAGAIN, ENOMEM, EBUSY, EINVAL

Supported Syscalls

File/IO: openat, read, write, writev, readv, close, fsync, mkdirat, unlinkat, faccessat, fstatat, getdents64, readlinkat

Network: connect, socket, bind, listen, accept, sendto, recvfrom

Process: clone, execve, wait4, getpid, getrandom


Protocol-Level Faults

Syscall-level fault(service, ...) operates at the kernel level. Protocol-level fault(interface_ref, ...) operates at the application protocol level via a transparent proxy.

fault(interface_ref, *rules, run=, source=)

When the first argument is an interface reference (e.g., db.main), Faultbox starts a transparent proxy that speaks the interface’s protocol and injects faults matching the rules.

# Syscall level — first arg is service:
fault(db, write=deny("EIO"), run=scenario)

# Protocol level — first arg is interface_ref:
fault(db.main, error(query="INSERT*", message="disk full"), run=scenario)
fault(api.public, response(path="/orders", status=429), run=scenario)
fault(kafka.main, drop(topic="orders.*"), run=scenario)

Optional source= targets a specific consumer when multiple services connect to the same interface:

fault(kafka.main,
    drop(topic="orders.*"),   # positional rules must come before keyword arguments
    source=worker,
    run=scenario,
)

Protocol fault builtins

These create ProxyFaultDef values passed as positional args to fault(). All support glob patterns for matching.

response(method=, path=, status=, body=, command=, key=, value=)

Return a custom response without forwarding to the real service.

response(method="POST", path="/orders", status=429, body='{"error":"rate_limited"}')
response(command="GET", key="cache:*")               # Redis nil (empty body)
response(command="GET", key="cache:*", value="stale") # Redis custom value

error(method=, path=, query=, command=, key=, topic=, message=, status=)

Return a protocol-specific error.

error(query="INSERT*", message="disk full")          # Postgres/MySQL
error(method="/pkg.Svc/Method", status=14)            # gRPC UNAVAILABLE
error(command="SET", key="session:*", message="READONLY")  # Redis
error(topic="orders.*", message="LEADER_NOT_AVAILABLE")    # Kafka

delay(method=, path=, query=, command=, key=, topic=, delay=)

Delay matching requests, then forward normally.

delay(path="/data/*", delay="500ms")                 # HTTP
delay(query="SELECT*", delay="3s")                   # Postgres/MySQL
delay(command="GET", delay="2s")                     # Redis
delay(topic="orders.events", delay="5s")             # Kafka

Note: delay() without a positional duration returns a protocol-level fault. delay("500ms") with a positional duration returns a syscall-level fault. Same builtin, context-dependent.

drop(method=, path=, topic=, probability=)

Drop the connection or message.

drop(method="POST", path="/upload")                  # HTTP — TCP reset
drop(topic="orders.events", probability="30%")       # Kafka — message loss

duplicate(topic=)

Deliver a message twice (for idempotency testing).

duplicate(topic="orders.events")                     # Kafka/NATS

Supported protocols

| Protocol | Match by | Fault builtins |
|---|---|---|
| http | method=, path= | response, error, delay, drop |
| http2 | method=, path= | response, error, delay, drop |
| postgres | query= (SQL-aware canonicalized match) | error, delay, drop |
| mysql | query= (SQL-aware canonicalized match) | error, delay, drop |
| redis | command=, key= | error, response, delay, drop |
| grpc | method= | error, delay, drop |
| kafka | topic= | drop, delay, error, duplicate |
| mongodb | op= / method= (cmd), collection= / key= | error, delay, drop |
| cassandra | query= (CQL) | error, delay, drop |
| clickhouse | query= (SQL, matches body or ?query=) | error, delay, drop |
| udp | (datagram-level) | drop, delay |
| amqp | topic= (routing key) | drop, delay, error |
| nats | topic= (subject) | drop, delay |
| memcached | command=, key= | error, response, delay, drop |

SQL query matching (v0.8.2+)

For the postgres and mysql proxies, query= patterns match incoming SQL after both sides are run through a canonicalizer. This frees rule authors from guessing exactly how a driver or ORM will format the query on the wire:

  • Case is folded (keywords lowercased; string-literal contents preserved).
  • Whitespace runs collapse to single spaces; leading/trailing whitespace and trailing ; are stripped.
  • ? and $1/$2/$N placeholders normalize to a shared $? marker, so a rule written with MySQL-style ? matches a Postgres-style $1 query and vice versa.
  • =, <, >, !=, <>, <=, >=, ,, (, ) get space-padded so tight driver output ("id=$1") matches user-written patterns ("id = ?").
  • Trailing * in the pattern remains a glob suffix (INSERT*).

A single rule pattern therefore matches every reasonable shape a driver might emit:

# This rule fires on every variant below:
rules = [mysql.deadlock(query = "UPDATE users SET role = ? WHERE id = ?")]

# ✓ "UPDATE users SET role = ? WHERE id = ?"
# ✓ "update users set role=$1 where id=$2"
# ✓ "UPDATE  users  SET role=$1 WHERE id=$2;"
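
To make the canonicalization rules concrete, here is a small Python approximation. It is not the proxy's canonicalizer: it lowercases everything outside string literals (identifiers included), which is a simplification, and the names canonicalize and rule_matches are invented for this sketch.

```python
import re

_TOKEN = re.compile(
    r"""
    '(?:[^']|'')*'                  # string literal, kept verbatim
  | \$\d+ | \?                      # $N and ? placeholders
  | <= | >= | <> | != | [=<>,()]    # operators and punctuation that get space-padded
  | [^\s'=<>!,()?$;]+               # any other run: keywords, identifiers, numbers
  | .                               # whitespace and stray characters
    """,
    re.VERBOSE | re.DOTALL,
)


def canonicalize(sql):
    """Fold case, collapse whitespace, unify placeholders, pad operators, drop trailing ';'."""
    out = []
    for tok in _TOKEN.findall(sql):
        if tok.isspace():
            continue
        if tok.startswith("'"):
            out.append(tok)  # string-literal contents are preserved
        elif tok == "?" or (tok[0] == "$" and tok[1:].isdigit()):
            out.append("$?")  # shared placeholder marker
        else:
            out.append(tok.lower())
    while out and out[-1] == ";":
        out.pop()
    return " ".join(out)


def rule_matches(pattern, sql):
    """Compare canonical forms; a trailing * in the pattern is a glob suffix."""
    if pattern.endswith("*"):
        return canonicalize(sql).startswith(canonicalize(pattern[:-1]))
    return canonicalize(sql) == canonicalize(pattern)


rule = "UPDATE users SET role = ? WHERE id = ?"
for query in (
    "UPDATE users SET role = ? WHERE id = ?",
    "update users set role=$1 where id=$2",
    "UPDATE  users  SET role=$1 WHERE id=$2;",
):
    print(rule_matches(rule, query), canonicalize(query))
```

All three variants reduce to the same canonical string, which is why a single rule pattern covers them.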

Trace events

Protocol proxy actions emit type="proxy" events into the trace:

assert_eventually(where=lambda e: e.type == "proxy" and e.data.get("action") == "error")

Importing recipes (standard library)

Faultbox ships a curated library of protocol-specific failure helpers embedded in the binary. Load them via the @faultbox/ prefix:

load("@faultbox/recipes/mongodb.star",    "mongodb")
load("@faultbox/recipes/cassandra.star",  "cassandra")
load("@faultbox/recipes/clickhouse.star", "clickhouse")

broken = fault_assumption("broken",
    target = db.main,
    rules  = [
        mongodb.disk_full(collection = "orders"),
        cassandra.unavailable(),
        clickhouse.too_many_parts(),
    ],
)

Each recipe file exports one namespace struct named after the protocol (see RFC-018). Zero name collisions when you load recipes for multiple protocols — mongodb.disk_full and postgres.disk_full coexist naturally.

Discover what’s available:

$ faultbox recipes list
$ faultbox recipes show mongodb     # print a recipe's source

Recipes ship with the binary — no filesystem setup, no network fetch. See RFC-019 for the distribution convention.

User-authored recipes work identically via relative paths:

load("@faultbox/recipes/mongodb.star", "mongodb")   # stdlib
load("./recipes/checkout.star",        "checkout")  # your project

rules = [mongodb.disk_full(), checkout.post_q2_race()]

The @faultbox/ prefix is reserved for the stdlib; everything else hits the filesystem relative to the spec’s directory.


Assertions

Starlark has no built-in assert statement. Faultbox provides assertion builtins — value checks, temporal properties, and ordering verification.

Value Assertions

assert_true(condition, message=)

assert_true(resp.status == 200)
assert_true("ok" in resp.body, "expected ok in body")
assert_true(resp.duration_ms < 1000, "response too slow")

assert_eq(a, b, message=)

assert_eq(resp.status, 200)
assert_eq(resp.body, "hello")
assert_eq(db.main.send(data="PING"), "PONG")

Temporal Assertions

Temporal assertions query the syscall event trace captured during the current test. Every intercepted syscall is recorded with service attribution, decision, and path — temporal assertions search this trace.

assert_eventually(service=, syscall=, path=, decision=, where=)

Asserts that at least one event matches all given filters. Use this to verify that an expected operation occurred.

# Simple filter matching:
assert_eventually(service="inventory", syscall="openat", path="/tmp/inventory.wal")
assert_eventually(service="inventory", syscall="fsync", decision="deny*")
assert_eventually(service="orders", syscall="connect")

# Lambda predicate for complex conditions:
assert_eventually(where=lambda e: e.service == "db" and e.data.get("table") == "users")
assert_eventually(where=lambda e: e.type == "wal" and e.data["op"] == "INSERT")

assert_never(service=, syscall=, path=, decision=, where=)

Asserts that no event matches all given filters. Use this to verify that an operation did NOT occur.

# Simple filter matching:
assert_never(service="inventory", syscall="openat", path="/tmp/inventory.wal")
assert_never(service="db", syscall="write", decision="deny*")

# Lambda predicate:
assert_never(where=lambda e: e.decision.startswith("deny") and e.label == "critical path")

Filter parameters

Two ways to filter events — dict matching (simple) and lambda predicates (powerful). Both can be combined.

Dict matching — keyword arguments as flat string filters:

| Parameter | Type | Description |
|---|---|---|
| service | string | Service name (e.g., "inventory", "orders") |
| syscall | string | Syscall name (e.g., "write", "openat", "connect") |
| path | string | File path (for file syscalls like openat) |
| decision | string | Fault decision (e.g., "allow", "deny*", "delay*") |

Glob matching: Values ending with * match as a prefix. Values starting with * match as a suffix. Example: decision="deny*" matches "deny(ECONNREFUSED)", "deny(EIO)", etc.

Lambda predicate — where=lambda e: ... for complex conditions:

The lambda receives a StarlarkEvent (see Type Reference) with .service, .type, .data, .fields, .seq, and direct field access.

# Access auto-decoded structured data:
where=lambda e: e.data["table"] == "users" and e.data["op"] == "INSERT"

# Combine multiple conditions:
where=lambda e: e.service == "db" and int(e.fields.get("size", "0")) > 4096

Ordering Assertions

assert_before(first=, then=)

Asserts that the earliest event matching first= occurs earlier in the trace than the earliest event matching then=. Each argument can be a dict (same filter keys as assert_eventually) or a lambda predicate.

# Dict matching:
assert_before(
    first={"service": "inventory", "syscall": "openat", "path": "/tmp/inventory.wal"},
    then={"service": "inventory", "syscall": "write", "path": "/tmp/inventory.wal"},
)

# Lambda predicates with correlation — then= receives the matched first event:
assert_before(
    first=lambda e: e.data["op"] == "INSERT",
    then=lambda e: e.data["ref_id"] == e.first.data["id"],
)
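
The ordering check for the dict form can be modeled in plain Python. This sketch covers only exact-match dict filters (no globbing, no lambda correlation); the helper names are hypothetical, and failing when either side has no match is an assumption.

```python
def matches(event, flt):
    """True if every key in the filter equals the event's value."""
    return all(event.get(key) == value for key, value in flt.items())


def first_index(trace, flt):
    """Position of the earliest matching event, or None."""
    for i, event in enumerate(trace):
        if matches(event, flt):
            return i
    return None


def holds_before(trace, first, then):
    i, j = first_index(trace, first), first_index(trace, then)
    return i is not None and j is not None and i < j


trace = [
    {"service": "inventory", "syscall": "openat", "path": "/tmp/inventory.wal"},
    {"service": "inventory", "syscall": "write", "path": "/tmp/inventory.wal"},
]
print(holds_before(trace, {"syscall": "openat"}, {"syscall": "write"}))
print(holds_before(trace, {"syscall": "write"}, {"syscall": "openat"}))
```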

Event Query

events(service=, syscall=, path=, decision=, where=)

Returns a list of matching events from the current test’s trace. Each element is a StarlarkEvent with .service, .type, .data, .fields.

# Dict matching:
retries = events(service="orders", syscall="connect", decision="deny*")
print("retries:", len(retries))

# Lambda predicate:
big_writes = events(where=lambda e: e.data.get("size", 0) > 4096)

# Get all WAL operations.
wal_ops = events(service="inventory", path="/tmp/inventory.wal")

Concurrency

parallel(fn1, fn2, ...)

Runs multiple step callables concurrently. Returns results in argument order. Use with --runs N to explore different interleavings — each seed produces a different scheduling order.

def test_concurrent_orders():
    """Two orders at once — no double-spend."""
    results = parallel(
        lambda: orders.post(path="/orders", body='{"sku":"widget","qty":1}'),
        lambda: orders.post(path="/orders", body='{"sku":"widget","qty":1}'),
    )
    # Starlark has no generator expressions or sum(); count with a list comprehension.
    ok_count = len([r for r in results if r.status == 200])
    assert_eq(ok_count, 1, "exactly one order should succeed")

faultbox test faultbox.star --runs 100 --show fail   # random interleavings
faultbox test faultbox.star --explore=all             # exhaustive: ALL permutations
faultbox test faultbox.star --explore=sample           # 100 random orderings
faultbox test faultbox.star --seed 42                  # replay exact ordering
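
To see why exhaustive exploration is paired with a sampling mode, the following Python sketch counts orderings. It assumes K operations that can be ordered freely, so the count is K!; the operation names are made up, and whether Faultbox prunes any orderings is not modeled here.

```python
import math
from itertools import permutations

# Three hypothetical held operations from two concurrent requests.
ops = ["req1.connect", "req2.connect", "req1.write"]
orderings = list(permutations(ops))
print(len(orderings))  # 3! = 6

# The number of orderings grows factorially with the number of operations.
for k in (2, 4, 6, 8, 10):
    print(k, math.factorial(k))
```

With even ten operations the exhaustive count is already in the millions, which is where a fixed number of random orderings becomes the practical choice.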

nondet(service, ...)

Excludes one or more services from interleaving control during parallel(). Their syscalls proceed immediately without being held. Use this for services that make nondeterministic background requests (healthchecks, metrics, logging).

def test_concurrent_orders():
    nondet(monitoring_svc, cache_svc)  # exclude from ordering exploration
    results = parallel(
        lambda: orders.post(path="/orders", body='...'),
        lambda: orders.post(path="/orders", body='...'),
    )

Virtual Time

When --virtual-time is enabled, fault delays advance a virtual clock instead of sleeping on real wall-clock time. A test with delay("2s") completes in milliseconds. This makes exhaustive exploration practical.

faultbox test faultbox.star --virtual-time                    # fast delays
faultbox test faultbox.star --virtual-time --explore=all      # fast + exhaustive

Scope: Virtual time applies to:

  • Fault delays (ActionDelay) — skip sleep, advance clock
  • nanosleep/clock_nanosleep syscalls — return immediately (for C/Rust targets)
  • clock_gettime — return virtual timestamp (for C/Rust targets)

Go targets limitation: Go uses vDSO for time.Now() (no syscall, not interceptable) and futex for time.Sleep(). Virtual time primarily speeds up fault delays, which is the main bottleneck in multi-run exploration.
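
The core idea, advancing a counter instead of sleeping, can be sketched in Python. VirtualClock is a hypothetical class written for illustration, not part of Faultbox.

```python
import time


class VirtualClock:
    """Toy virtual clock: sleeping advances a counter instead of blocking."""

    def __init__(self):
        self.now_ns = 0

    def sleep(self, seconds):
        self.now_ns += int(seconds * 1_000_000_000)


clock = VirtualClock()
start = time.monotonic()
clock.sleep(2.0)  # models a delay("2s") fault under virtual time
elapsed_real = time.monotonic() - start

print("virtual seconds:", clock.now_ns / 1e9)
print("real time spent is negligible:", elapsed_real < 0.5)
```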


Monitors

monitor(callback, service=, syscall=, path=, decision=) → MonitorDef

Creates a first-class monitor — a reusable value that can be stored in variables and passed to fault_assumption(monitors=), fault_scenario(monitors=), and fault_matrix(monitors=).

The callback receives a dict with event fields and is called on every matching event during test execution. If the callback raises an error (via fail() or assert_*), the test fails with “monitor violation”.

Event dict fields passed to callback:

| Key | Type | Description |
|---|---|---|
| "seq" | int | Monotonic sequence number |
| "type" | string | "syscall", "proxy", "lifecycle", etc. |
| "service" | string | Service that produced the event |
| "syscall" | string | Syscall name (for syscall events) |
| "path" | string | File path (for file syscalls) |
| "decision" | string | "allow", "deny(EIO)", "delay(500ms)", etc. |
| "label" | string | Fault label if set |
| "latency_ms" | string | Latency (for delay faults) |

Filter kwargs support glob patterns (e.g., decision="deny*").

# First-class monitor — stored as variable, reusable.
def check_no_wal_write(event):
    fail("unexpected WAL write: seq=" + str(event["seq"]))

no_wal_write = monitor(check_no_wal_write,
    service = "inventory",
    syscall = "openat",
    path = "/tmp/inventory.wal",
)

# Use with fault_assumption:
inventory_down = fault_assumption("inventory_down",
    target = inventory,
    connect = deny("ECONNREFUSED"),
    monitors = [no_wal_write],  # fires during any test using this assumption
)

Inline usage (backward compatible): When called inside a running test_* function, monitor() auto-registers on the event log immediately:

def test_manual():
    monitor(lambda e: fail("bad") if e["decision"].startswith("deny") else None,
            service="inventory", syscall="write")
    orders.post(path="/orders", body='{"sku":"widget","qty":1}')

Monitors are cleared between tests automatically.


Fault Composition

The fault composition builtins separate what the system does (scenario), what goes wrong (fault assumption), and what correct means (expect oracle).

fault_assumption(name, target=, **syscall_faults, rules=, monitors=, faults=, description=)

Creates a named, reusable fault configuration. Returns a FaultAssumption value that can be stored in variables and passed to fault_scenario(), fault_matrix(), or fault().

# Syscall-level fault: deny connections to inventory.
inventory_down = fault_assumption("inventory_down",
    target = inventory,
    connect = deny("ECONNREFUSED"),
)

# Syscall-level fault: disk full on inventory WAL writes.
disk_full = fault_assumption("disk_full",
    target = inventory,
    write = deny("ENOSPC"),
)

# Latency fault on the order service network.
slow_network = fault_assumption("slow_network",
    target = orders,
    connect = delay("200ms"),
    write = delay("100ms"),
)

Syscall kwargs resolve in the same order as fault():

  1. Named operation on target.ops → expands to the op’s syscalls + path glob
  2. Syscall family name → expands via family (e.g., write → write, writev, pwrite64)
  3. Raw syscall name → used as-is

Named operations:

inventory = service("inventory", "/tmp/inventory-svc",
    interface("main", "tcp", 5432),
    ops = {"persist": op(syscalls=["write", "fsync"], path="/tmp/*.wal")},
)

wal_corrupt = fault_assumption("wal_corrupt",
    target = inventory,
    persist = deny("EIO"),  # expands to write+fsync on /tmp/*.wal
)
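
The three-step resolution order can be sketched as a lookup chain in Python. This is a model under assumptions: resolve and FAMILIES are hypothetical names, the family table holds only the one entry quoted above, and whether a named operation's syscalls are expanded further through families is not modeled.

```python
# Only the family mentioned in the text; the real table is presumably larger.
FAMILIES = {"write": ["write", "writev", "pwrite64"]}


def resolve(kwarg, target_ops):
    """Return (syscalls, path_glob) for one fault kwarg."""
    if kwarg in target_ops:                 # 1. named operation on the target
        op = target_ops[kwarg]
        return op["syscalls"], op.get("path")
    if kwarg in FAMILIES:                   # 2. syscall family name
        return FAMILIES[kwarg], None
    return [kwarg], None                    # 3. raw syscall name, used as-is


ops = {"persist": {"syscalls": ["write", "fsync"], "path": "/tmp/*.wal"}}
print(resolve("persist", ops))
print(resolve("write", ops))
print(resolve("fsync", ops))
```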

Protocol-level faults (when target is an interface reference):

pg_insert_fail = fault_assumption("pg_insert_fail",
    target = postgres.main,
    rules = [error(query="INSERT*", message="disk full")],
)

Composition — combine multiple assumptions into one:

cascade = fault_assumption("cascade",
    faults = [inventory_down, slow_network],
    description = "Inventory unreachable AND slow network",
)
# cascade inherits all rules and monitors from both children.

With monitors:

def check_no_traffic(event):
    fail("traffic reached inventory despite being down")

no_traffic = monitor(check_no_traffic, service="inventory", syscall="read")

inventory_down = fault_assumption("inventory_down",
    target = inventory,
    connect = deny("ECONNREFUSED"),
    monitors = [no_traffic],  # active whenever this assumption is applied
)

Using with fault() directly:

def test_order_down():
    def scenario():
        resp = orders.post(path="/orders", body='{"sku":"widget","qty":1}')
        assert_eq(resp.status, 503)
    fault(inventory_down, run=scenario)

fault_scenario(name, scenario=, faults=, expect=, monitors=, timeout=)

Composes a scenario probe with fault assumptions and an expect oracle. Registers as test_<name>.

# Basic: scenario + fault + oracle.
fault_scenario("order_inventory_down",
    scenario = order_flow,
    faults = inventory_down,
    expect = lambda r: (
        assert_eq(r.status, 503),
        assert_never(service="inventory", syscall="openat", path="/tmp/inventory.wal"),
    ),
)

# Multiple faults applied simultaneously.
fault_scenario("order_cascade",
    scenario = order_flow,
    faults = [inventory_down, slow_network],
    expect = lambda r: assert_true(r.status >= 500),
)

# Smoke test — no expect, just "must not crash".
fault_scenario("order_disk_full_smoke",
    scenario = order_lifecycle,
    faults = disk_full,
)

# With scenario-level monitor and custom timeout.
fault_scenario("order_retries",
    scenario = order_flow,
    faults = inventory_down,
    monitors = [retry_monitor],
    expect = lambda r: assert_eq(r.status, 503),
    timeout = "10s",
)

Execution model:

  1. Register monitors (from fault assumptions + scenario-level)
  2. Apply fault rules from all assumptions
  3. Run the scenario function, capture its return value
  4. If any monitor fired a violation → test fails (expect not called)
  5. Call expect(return_value) — expect validates via assert_* side-effects
  6. Remove faults and monitors
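
The six steps above can be modeled as a small Python driver. This is a sketch, not Faultbox's runner: run_fault_scenario is a hypothetical name, fault rules are elided, the scenario is handed an emit callback only so the model can produce events, and running cleanup on failure is an assumption.

```python
def run_fault_scenario(scenario, faults, monitors, expect=None):
    """Model of the six steps; returns (outcome, steps_taken)."""
    steps, violations = [], []

    # 1. Register monitors from the fault assumptions plus scenario-level ones.
    active = [m for f in faults for m in f.get("monitors", [])] + list(monitors)
    steps.append("register_monitors")

    def emit(event):
        # Every event produced while the scenario runs is shown to each monitor.
        for check in active:
            try:
                check(event)
            except AssertionError as err:
                violations.append(str(err))

    steps.append("apply_faults")      # 2. fault rules are elided in this model
    try:
        result = scenario(emit)       # 3. run the probe, keep its return value
        steps.append("run_scenario")
        if violations:                # 4. a violation fails the test; expect is skipped
            return "fail", steps
        if expect is not None:
            expect(result)            # 5. the oracle validates via assertions
            steps.append("expect")
        return "pass", steps
    except AssertionError:
        return "fail", steps
    finally:
        steps.append("cleanup")       # 6. remove faults and monitors


def scenario(emit):
    emit({"service": "orders", "syscall": "connect", "decision": "deny(ECONNREFUSED)"})
    return {"status": 503}


def expect_503(result):
    assert result["status"] == 503


def no_deny(event):
    assert not event["decision"].startswith("deny"), "unexpected deny"


ok = run_fault_scenario(scenario, faults=[], monitors=[], expect=expect_503)
violated = run_fault_scenario(scenario, faults=[{"monitors": [no_deny]}],
                              monitors=[], expect=expect_503)
print(ok)
print(violated)
```

In the second run the monitor attached to the fault assumption records a violation, so the oracle is never called.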

Parameters:

| Parameter | Type | Description |
|---|---|---|
| name | string | Test name → registered as test_<name> |
| scenario | callable | The probe function (should return observable) |
| faults | FaultAssumption or list | Fault(s) to apply |
| expect | callable or None | Oracle: (result) → void, calls assert_* to validate |
| monitors | list of MonitorDef | Scenario-level invariants |
| timeout | string | Max duration (default "30s") |

fault_matrix(scenarios=, faults=, default_expect=, overrides={}, monitors=[], exclude=[])

Generates the cross-product of scenarios × fault assumptions. Each cell becomes a fault_scenario registered as test_matrix_<scenario>_<fault>.

fault_matrix(
    scenarios = [order_flow, health_check],
    faults = [inventory_down, disk_full, slow_network],
    default_expect = lambda r: assert_true(r != None, "must return a response"),
    overrides = {
        (order_flow, inventory_down): lambda r: (
            assert_eq(r.status, 503),
            assert_true("unreachable" in r.body),
        ),
        (order_flow, slow_network): lambda r: (
            assert_eq(r.status, 200),
            assert_true(r.duration_ms > 100),
        ),
        (health_check, inventory_down): lambda r: assert_eq(r.status, 503),
    },
    exclude = [
        (health_check, disk_full),  # health check doesn't touch disk
    ],
)
# Generates 5 tests: 2×3 - 1 excluded

Override precedence: cell-specific override > default_expect > None (smoke test).
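
The cell generation and the precedence rule can be modeled with a cross-product in Python. This sketch uses strings in place of scenario functions, fault assumptions, and oracles; it illustrates the counting and lookup only, not how Faultbox builds the matrix internally.

```python
from itertools import product

scenarios = ["order_flow", "health_check"]
faults = ["inventory_down", "disk_full", "slow_network"]
exclude = {("health_check", "disk_full")}
overrides = {("order_flow", "inventory_down"): "expect_503"}
default_expect = "expect_response"

# Cross-product minus excluded pairs.
cells = [cell for cell in product(scenarios, faults) if cell not in exclude]

# Cell-specific override wins; otherwise fall back to default_expect.
tests = {
    "test_matrix_%s_%s" % cell: overrides.get(cell, default_expect)
    for cell in cells
}

print(len(cells))
for name, oracle in tests.items():
    print(name, "->", oracle)
```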

Matrix report — when matrix tests run, the terminal shows a summary table:

Fault Matrix: 2 scenarios × 3 faults = 5 cells

                    │ inventory_down │ disk_full     │ slow_network
────────────────────┼────────────────┼───────────────┼──────────────
order_flow          │ PASS (12ms)    │ PASS (8ms)    │ PASS (310ms)
health_check        │ PASS (5ms)     │ — (excluded)  │ PASS (205ms)

Result: 5/5 passed

JSON output (--format json) includes a "matrix" section with scenarios, faults, cells, and pass/fail counts.

Parameters:

| Parameter | Type | Description |
|---|---|---|
| scenarios | list of callables | Scenario probe functions |
| faults | list of FaultAssumption | Fault assumptions |
| default_expect | callable or None | Default oracle for cells without overrides |
| overrides | dict | (scenario, fault) tuple → cell-specific expect |
| monitors | list of MonitorDef | Matrix-wide invariants (all cells) |
| exclude | list of tuples | (scenario, fault) pairs to skip |

Data Integrity Verification

The expect oracle in fault_scenario() and fault_matrix() can use protocol steps to query service state directly — not just check the HTTP response. This is how you verify data integrity after fault injection.

Querying the database in expect

After a fault scenario, ask the database whether the data is correct:

def create_order():
    return api.public.post(path="/orders", body='{"item":"widget","qty":1}')

scenario(create_order)

db_write_error = fault_assumption("db_write_error",
    target = db,
    write = deny("EIO"),
)

fault_scenario("no_partial_rows_on_error",
    scenario = create_order,
    faults = db_write_error,
    expect = lambda r: (
        # 1. API returned an error
        assert_true(r.status >= 500, "should fail on DB write error"),

        # 2. No orphaned rows in the database
        assert_eq(
            db.main.query(sql="SELECT count(*) as n FROM orders WHERE status='pending'").data[0]["n"],
            0,
            "no partial rows should exist after failed INSERT"),

        # 3. The fault actually fired
        assert_eventually(type="syscall", service="db", decision="deny*"),
    ),
)

The key: db.main.query(sql=...) is a protocol step — it talks to the running database over the wire. Inside expect, the service is still running, so you can query its actual state.

Querying Redis in expect

Verify cache state after a fault:

redis_down = fault_assumption("redis_down",
    target = api,
    connect = deny("ECONNREFUSED"),
)

fault_scenario("no_stale_cache_after_failure",
    scenario = create_order,
    faults = redis_down,
    expect = lambda r: (
        assert_true(r.status >= 500),

        # No stale cache entries should remain
        assert_eq(
            len(redis.main.keys(pattern="order:*").data),
            0,
            "no cached order keys after Redis failure"),
    ),
)

Verifying Kafka message integrity

Use event sources (observe=) to track produced and consumed messages, then verify in expect:

kafka = service("kafka",
    interface("broker", "kafka", 9092),
    image = "confluentinc/cp-kafka:7.6",
    observe = [topic("order-events", decoder=json_decoder())],
    healthcheck = tcp("localhost:9092"),
)

fault_scenario("no_message_loss_on_db_error",
    scenario = create_order,
    faults = db_write_error,
    expect = lambda r: (
        assert_true(r.status >= 500),

        # No Kafka events should be published if DB write failed
        assert_never(where=lambda e:
            e.type == "topic" and e.data.get("topic") == "order-events"),
    ),
)

For “all produced messages were consumed” (no message loss):

fault_scenario("consumer_catches_up",
    scenario = publish_and_consume,
    faults = consumer_slow,
    expect = lambda r: (
        assert_eq(
            len(events(where=lambda e: e.type == "topic"
                and e.data.get("action") == "produce")),
            len(events(where=lambda e: e.type == "topic"
                and e.data.get("action") == "consume")),
            "every produced message must be consumed"),
    ),
)

Verifying proxy-injected errors

When using protocol-level faults (via rules=), the proxy logs every injected error as a type="proxy" event. Verify the fault actually fired:

db_insert_fail = fault_assumption("db_insert_fail",
    target = db.main,
    rules = [error(query="INSERT*", message="disk full")],
)

fault_scenario("insert_rejected_by_proxy",
    scenario = create_order,
    faults = db_insert_fail,
    expect = lambda r: (
        assert_true(r.status >= 500),

        # Verify the proxy intercepted and rejected the INSERT
        assert_eventually(type="proxy", where=lambda e:
            "INSERT" in e.data.get("query", "")
            and e.data.get("action") == "error"),
    ),
)

Monitor pattern: continuous data integrity

For invariants that must hold across ALL scenarios and faults — not just one specific test — use monitors on fault assumptions:

def no_orphan_events(event):
    """If Kafka event published, DB row must exist."""
    if event["type"] == "topic" and event.get("order_id"):
        rows = db.main.query(
            sql="SELECT count(*) as n FROM orders WHERE id='" + event["order_id"] + "'"
        ).data[0]["n"]
        if rows == 0:
            fail("orphan Kafka event: order " + event["order_id"] + " not in DB")

orphan_check = monitor(no_orphan_events)

# Attach to every fault that could cause this inconsistency
db_write_error = fault_assumption("db_write_error",
    target = db,
    write = deny("EIO"),
    monitors = [orphan_check],
)

Summary: which tool for which check

| What you want to verify | Tool | Example |
|---|---|---|
| HTTP response status/body | expect lambda on r | assert_eq(r.status, 503) |
| Database row exists/absent | db.main.query(sql=...) in expect | assert_eq(row_count, 0) |
| Redis key exists/absent | redis.main.keys(pattern=...) in expect | assert_eq(len(keys), 0) |
| Kafka message published/absent | assert_eventually/assert_never on events | assert_never(type="topic", ...) |
| Proxy-injected error fired | assert_eventually(type="proxy", ...) | Verify INSERT was rejected |
| Fault actually fired | assert_eventually(decision="deny*") | Avoid silent test pass |
| Continuous invariant across all tests | monitor() on fault_assumption | "no orphan events" |
| Message loss / consumer lag | Compare events() counts in expect | produced count == consumed count |

Network Partitions

partition(svc_a, svc_b, run=callback)

Creates a bidirectional network partition between two services. While the callback runs, svc_a cannot connect to svc_b and vice versa. Connections are denied with ECONNREFUSED filtered by destination address.

def test_network_partition():
    """Orders can't reach inventory — returns 503."""
    def scenario():
        resp = orders.post(path="/orders", body='{"sku":"widget","qty":1}')
        assert_eq(resp.status, 503)
        assert_never(service="inventory", syscall="openat", path="/tmp/inventory.wal")
    partition(orders, inventory, run=scenario)

Unlike fault(orders, connect=deny("ECONNREFUSED")) which blocks all outbound connections, partition() only blocks connections to the specific service’s ports — other connectivity remains unaffected.
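
The difference can be modeled as a per-destination decision in Python. This is a sketch under assumptions: connect_decision is a hypothetical helper, the port numbers are made up, and the real filter works on destination addresses rather than this simplified port table.

```python
def connect_decision(src, dst_port, partitions, ports):
    """Deny a connect only when it targets the partitioned peer's ports."""
    for pair in partitions:
        if src in pair:
            (peer,) = pair - {src}
            if dst_port in ports[peer]:
                return "deny(ECONNREFUSED)"
    return "allow"


# Hypothetical listening ports per service.
ports = {"orders": {8080}, "inventory": {8081}, "db": {5432}}
partitions = {frozenset({"orders", "inventory"})}

print(connect_decision("orders", 8081, partitions, ports))     # orders -> inventory
print(connect_decision("inventory", 8080, partitions, ports))  # inventory -> orders
print(connect_decision("orders", 5432, partitions, ports))     # orders -> db
```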

Using Starlark Expressions

Since the configuration is code, assertions are composable:

# Compare strings
assert_true("error" in resp.body)
assert_true(resp.body.startswith("stored:"))

# Numeric comparisons
assert_true(resp.duration_ms > 400)
assert_true(resp.status >= 200 and resp.status < 300)

# Conditional logic
if resp.status != 200:
    print("unexpected status:", resp.status, "body:", resp.body)
    assert_true(False, "expected 200")

Trace Output

Every intercepted syscall is recorded in an ordered event log with:

  • Sequential event number
  • Timestamp
  • Service name
  • Syscall name, PID, decision, file path
  • PObserve-compatible event_type and partition_key
  • ShiViz-compatible vector clock

JSON Trace (--output trace.json)

{
  "version": 1,
  "star_file": "faultbox.star",
  "duration_ms": 1640,
  "pass": 2,
  "fail": 1,
  "tests": [
    {
      "name": "test_happy_path",
      "result": "pass",
      "seed": 0,
      "duration_ms": 225,
      "events": [
        {
          "seq": 1,
          "timestamp": "2026-03-25T19:15:07.547Z",
          "type": "service_started",
          "event_type": "lifecycle.started",
          "partition_key": "inventory",
          "service": "inventory",
          "vector_clock": {"inventory": 1}
        },
        {
          "seq": 42,
          "timestamp": "2026-03-25T19:15:07.650Z",
          "type": "syscall",
          "event_type": "syscall.openat",
          "partition_key": "inventory",
          "service": "inventory",
          "fields": {
            "syscall": "openat",
            "pid": "1234",
            "decision": "allow",
            "path": "/tmp/inventory.wal"
          },
          "vector_clock": {"inventory": 20, "orders": 5}
        }
      ]
    },
    {
      "name": "test_flaky",
      "result": "fail",
      "reason": "assert_true: expected 200 or 503, got 0",
      "failure_type": "assertion",
      "seed": 7,
      "duration_ms": 215,
      "replay_command": "faultbox test faultbox.star --test flaky --seed 7",
      "events": []
    }
  ]
}

Agent loop fields: Failed tests include replay_command (full CLI for deterministic replay) and failure_type ("assertion", "timeout", "service_start", or "error") for machine consumption.
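
A script or agent consuming the trace might collect the replay commands like this. The sketch parses an inline sample shaped like the JSON above; in practice the text would be read from the file passed to --output.

```python
import json

# Trimmed sample using only fields shown in the trace example.
trace_text = """
{
  "tests": [
    {"name": "test_happy_path", "result": "pass", "seed": 0},
    {"name": "test_flaky", "result": "fail", "failure_type": "assertion", "seed": 7,
     "replay_command": "faultbox test faultbox.star --test flaky --seed 7"}
  ]
}
"""

report = json.loads(trace_text)
replays = [
    (test["failure_type"], test["replay_command"])
    for test in report["tests"]
    if test["result"] == "fail"
]

for failure_type, command in replays:
    print(failure_type + ":", command)
```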

Event Types (PObserve-Compatible)

Events use dotted event_type for PObserve compatibility:

| Event Type | Description |
|---|---|
| lifecycle.started | Service process launched |
| lifecycle.ready | Healthcheck passed |
| syscall.write | write syscall intercepted |
| syscall.connect | connect syscall intercepted |
| syscall.openat | openat syscall intercepted |
| syscall.fsync | fsync syscall intercepted |
| step_send.<service> | Test driver sent request to service |
| step_recv.<service> | Test driver received response from service |
| fault_applied | Fault rules activated on a service |
| fault_removed | Fault rules deactivated |
| proxy_conn_open | Transparent proxy accepted client + dialed upstream (RFC-034) |
| proxy_conn_close | Proxy connection terminated; carries bytes_c2s / bytes_s2c / duration_ms / reason |
| proxy_handshake_complete | Protocol-aware proxy finished its auth/handshake phase (mysql, postgres, redis) |
| proxy_stall | Proxy direction blocked on pending bytes for ≥ stall threshold (default 5s warn, 30s extend) |
| stdout | Service stdout line (when observe=[stdout(...)]) |
| stderr | Service stderr line (when observe=[stderr(...)]) |

The partition_key field (default: service name) enables routing events to per-service PObserve monitor instances.
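
Routing by partition_key amounts to grouping events into one stream per key. The Python sketch below shows that grouping on a few sample events; how PObserve itself consumes the streams is outside its scope.

```python
from collections import defaultdict

sample_events = [
    {"event_type": "lifecycle.started", "partition_key": "inventory"},
    {"event_type": "syscall.openat", "partition_key": "inventory"},
    {"event_type": "syscall.connect", "partition_key": "orders"},
]

# One ordered stream of event types per partition key.
streams = defaultdict(list)
for event in sample_events:
    streams[event["partition_key"]].append(event["event_type"])

for key, stream in streams.items():
    print(key, stream)
```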

ShiViz Visualization (--shiviz trace.shiviz)

Produces a ShiViz-compatible trace file with vector clocks for visualizing causal relationships between services.

(?<host>\S+) (?<clock>\{.*\})

inventory {"inventory": 1}
lifecycle.started
orders {"orders": 1}
lifecycle.started
test {"test": 1}
step_send.orders post→orders
test {"test": 2, "inventory": 20, "orders": 15}
step_recv.orders post→orders

Vector clocks track causality:

  • Each service increments its own clock on every syscall
  • When service A makes a network call, remote clocks are merged
  • When the test driver receives a step response, the target service’s clock merges
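
These rules follow the standard vector clock algorithm, sketched here in Python. The tick and merge helpers are hypothetical names, and ticking the local counter after a merge is the textbook rule, assumed rather than confirmed for Faultbox. The values reproduce the step_recv.orders line from the sample above.

```python
def tick(clock, owner):
    """Return a copy of the clock with the owner's counter incremented."""
    updated = dict(clock)
    updated[owner] = updated.get(owner, 0) + 1
    return updated


def merge(local, remote):
    """Element-wise maximum of two clocks."""
    keys = set(local) | set(remote)
    return {key: max(local.get(key, 0), remote.get(key, 0)) for key in keys}


test_clock = {"test": 1}                      # after step_send.orders
orders_clock = {"orders": 15, "inventory": 20}

# On step_recv the driver merges the target's clock, then ticks its own entry.
test_clock = tick(merge(test_clock, orders_clock), "test")
print(test_clock)
```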

Open the .shiviz file at https://bestchai.bitbucket.io/shiviz/ to see a space-time diagram with communication arrows between services.


CLI Summary

# Run tests
faultbox test faultbox.star                        # run all tests
faultbox test faultbox.star --test happy_path      # run one test
faultbox test faultbox.star --debug                # verbose logging
faultbox test faultbox.star --output trace.json    # JSON trace with events
faultbox test faultbox.star --shiviz trace.shiviz  # ShiViz visualization
faultbox test faultbox.star --normalize trace.norm # deterministic trace fingerprint

# Counterexample discovery (P-lang style)
faultbox test faultbox.star --runs 100 --show fail # run 100x, show failures only
faultbox test faultbox.star --seed 42              # replay with specific seed

# Exhaustive interleaving exploration
faultbox test faultbox.star --explore=all           # try all permutations (K!)
faultbox test faultbox.star --explore=sample         # 100 random orderings (default)
faultbox test faultbox.star --explore=sample --runs 500  # 500 random orderings

# Virtual time (skip fault delays)
faultbox test faultbox.star --virtual-time          # instant delay faults

# Compare traces
faultbox diff trace1.norm trace2.norm              # verify determinism

# Scaffolding
faultbox init --name orders --port 8080 ./order-svc  # generate starter .star
faultbox init --from-compose docker-compose.yml      # generate from compose
faultbox init --claude                                # Claude Code integration
faultbox init --vscode                                # VS Code autocomplete

# Generate failure scenarios
faultbox generate faultbox.star                       # per-scenario fault files
faultbox generate faultbox.star --dry-run             # preview without writing

# Structured output (for LLM agents / CI)
faultbox test faultbox.star --format json             # JSON to stdout

# MCP server (for Claude Code, Cursor, etc.)
faultbox mcp                                          # start MCP server on stdio

# Maintenance
faultbox self-update                                  # update to latest release
faultbox --version                                    # print version

Exit Codes

| Code | Meaning |
|---|---|
| 0 | All tests passed |
| 1 | Faultbox error (bad config, load failure, etc.) |
| 2 | One or more tests failed |

Trace Summary

After each test, Faultbox prints a compact trace summary showing only fault events (non-allow decisions). Every result line shows the seed, and failed tests add the failure reason and a replay command for deterministic replay:

--- PASS: test_happy_path (225ms, seed=0) ---
  syscall trace (99 events):

--- PASS: test_inventory_slow (1724ms, seed=0) ---
  syscall trace (70 events):
    #57  inventory    write      delay(500ms)  (+500ms)
    #69  inventory    write      delay(500ms)  (+500ms)

--- FAIL: test_flaky_network (215ms, seed=7) ---
  reason: assert_true: expected 200 or 503, got 0
  replay: faultbox test faultbox.star --test flaky_network --seed 7
  syscall trace (46 events):
    #50  orders       connect    deny(connection refused)
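
The summary is a filtered view of the trace: only events whose decision is not "allow" are listed. A Python sketch of that filter, using sample events shaped like the test_inventory_slow output above:

```python
sample_trace = [
    {"seq": 56, "service": "inventory", "syscall": "write", "decision": "allow"},
    {"seq": 57, "service": "inventory", "syscall": "write", "decision": "delay(500ms)"},
    {"seq": 68, "service": "inventory", "syscall": "fsync", "decision": "allow"},
    {"seq": 69, "service": "inventory", "syscall": "write", "decision": "delay(500ms)"},
]

# Keep only the events where a fault changed the outcome.
fault_events = [e for e in sample_trace if e["decision"] != "allow"]

for e in fault_events:
    print("#%d  %s  %s  %s" % (e["seq"], e["service"], e["syscall"], e["decision"]))
```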

Protocol Extensibility (Roadmap)

| Layer | Examples | Status |
|---|---|---|
| L4 Core | tcp | Built-in |
| L7 Stdlib | http | Built-in |
| L7 Extensions | grpc, postgres, kafka, redis | Future: Starlark modules |

Protocol modules implement the same step interface. Usage won’t change:

# Future: redis.star loaded as module
cache.set(key="session:123", value="active")
cache.get(key="session:123")

State Machines and Hooks (Roadmap)

Services will support state machines with lifecycle hooks:

# Starlark resolves top-level names in order, so define the hook before service() references it.
def db_on_syscall(ctx, deps):
    if ctx.call.name == "write" and ctx.this.state == "degraded":
        return delay("2s")
    return allow()

db = service("db", "/tmp/mock-db",
    interface("main", "tcp", 5432),
    states = ["starting", "ready", "degraded", "failed"],
    on_init = db_init,  # defined the same way (not shown)
    on_syscall = db_on_syscall,
)

Hooks receive a context with:

  • ctx.this — current service (state, name, interfaces)
  • ctx.call — syscall context (name, args, counter)
  • ctx.log — global event log (emit + query)
  • deps — dependency map

Monitors (basic monitor() builtin) are already implemented — see the Monitors section above. State machine hooks will extend monitors with per-service state tracking and lifecycle-aware fault decisions.