On this page

Postgres Protocol Reference

Interface declaration:

db = service("postgres",
    interface("main", "postgres", 5432),
    image = "postgres:16",
    env = {"POSTGRES_PASSWORD": "test", "POSTGRES_DB": "testdb"},
    healthcheck = tcp("localhost:5432"),
)

Methods

query(sql="", connstr="")

Execute a SQL query that returns rows (SELECT, RETURNING).

resp = db.main.query(sql="SELECT * FROM users WHERE id=1")
# resp.data = [{"id": 1, "name": "alice", "email": "alice@example.com"}]

resp = db.main.query(sql="SELECT count(*) as n FROM orders")
# resp.data = [{"n": 42}]

# Custom connection string (overrides default)
resp = db.main.query(sql="SELECT 1", connstr="postgres://user:pass@host/db")
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `sql` | string | required | SQL query to execute |
| `connstr` | string | auto | Connection string (auto-generated from service address) |

exec(sql="", connstr="")

Execute a SQL statement that doesn’t return rows (INSERT, UPDATE, DELETE, DDL).

resp = db.main.exec(sql="INSERT INTO users (name) VALUES ('bob')")
# resp.data = {"rows_affected": 1}

resp = db.main.exec(sql="CREATE TABLE IF NOT EXISTS orders (id SERIAL, item TEXT)")
# resp.data = {"rows_affected": 0}

resp = db.main.exec(sql="DELETE FROM orders WHERE status='cancelled'")
# resp.data = {"rows_affected": 5}
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `sql` | string | required | SQL statement to execute |
| `connstr` | string | auto | Connection string |

Response Object

For query():

| Field | Type | Description |
|-------|------|-------------|
| `.data` | list of dicts | Rows as `[{"col": value}, ...]` |
| `.status` | int | 0 (success) |
| `.ok` | bool | True if query succeeded |
| `.duration_ms` | int | Execution time |

For exec():

| Field | Type | Description |
|-------|------|-------------|
| `.data` | dict | `{"rows_affected": N}` |
| `.status` | int | 0 (success) |
| `.ok` | bool | True if statement succeeded |
| `.duration_ms` | int | Execution time |

Fault Rules

error(query=, message=)

Reject matching queries with a Postgres error.

insert_fail = fault_assumption("insert_fail",
    target = db.main,
    rules = [error(query="INSERT*", message="disk full")],
)

select_fail = fault_assumption("select_fail",
    target = db.main,
    rules = [error(query="SELECT * FROM orders*", message="relation does not exist")],
)
| Parameter | Type | Description |
|-----------|------|-------------|
| `query` | string | SQL query glob pattern |
| `message` | string | Postgres error message to return |

delay(query=, delay=)

Delay matching queries.

slow_queries = fault_assumption("slow_queries",
    target = db.main,
    rules = [delay(query="SELECT*", delay="3s")],
)

drop(query=)

Close connection when matching query arrives.

drop_inserts = fault_assumption("drop_inserts",
    target = db.main,
    rules = [drop(query="INSERT*")],
)

Syscall-level faults

For broad infrastructure failures:

disk_full = fault_assumption("disk_full",
    target = db,  # ServiceDef, not InterfaceRef
    write = deny("ENOSPC"),
)

disk_error = fault_assumption("disk_error",
    target = db,
    write = deny("EIO"),
)

Seed / Reset Patterns

Schema + seed data

def seed_postgres():
    """Create the users and orders tables if absent, then insert two users."""
    # Order matters: the DDL statements run first so the INSERT that
    # follows has a users table to write into.
    statements = (
        "CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, name TEXT)",
        "CREATE TABLE IF NOT EXISTS orders (id SERIAL PRIMARY KEY, user_id INT, item TEXT, status TEXT)",
        "INSERT INTO users (name) VALUES ('alice'), ('bob')",
    )
    for statement in statements:
        db.main.exec(sql=statement)

def reset_postgres():
    """Empty the users and orders tables with one TRUNCATE statement."""
    # RESTART IDENTITY and CASCADE are part of the statement text itself.
    truncate_sql = "TRUNCATE users, orders RESTART IDENTITY CASCADE"
    db.main.exec(sql=truncate_sql)

db = service("postgres",
    interface("main", "postgres", 5432),
    image = "postgres:16",
    env = {"POSTGRES_PASSWORD": "test", "POSTGRES_DB": "testdb"},
    healthcheck = tcp("localhost:5432"),
    reuse = True,
    seed = seed_postgres,
    reset = reset_postgres,
)

Heavy migrations + light reset

def seed_full():
    """Apply the migration script, then the large seed script. Slow (~5s)."""
    for script_path in ("./migrations.sql", "./seed.sql"):
        # Each file is read only when its turn comes, so the seed file is
        # not opened until the migrations have already been executed.
        script_sql = open(script_path).read()
        db.main.exec(sql=script_sql)

def reset_fast():
    """Clear the rows of orders, payments and inventory. Fast (~100ms)."""
    # Data-only reset: a single TRUNCATE, no migrations are re-run here.
    truncate_sql = "TRUNCATE orders, payments, inventory RESTART IDENTITY CASCADE"
    db.main.exec(sql=truncate_sql)

db = service("postgres", ...,
    reuse = True,
    seed = seed_full,
    reset = reset_fast,
)

Event Sources

WAL Stream

Captures Postgres Write-Ahead Log changes in real-time:

db = service("postgres",
    interface("main", "postgres", 5432),
    image = "postgres:16",
    env = {"POSTGRES_PASSWORD": "test"},
    healthcheck = tcp("localhost:5432"),
    observe = [wal_stream(tables=["orders", "users"])],
)

WAL events have type "wal" with fields:

| Field | Type | Description |
|-------|------|-------------|
| `op` | string | INSERT, UPDATE, DELETE, BEGIN, COMMIT |
| `table` | string | Table name |
| `data` | dict | Row data (auto-decoded) |
# Verify a row was inserted
assert_eventually(where=lambda e:
    e.type == "wal" and e.data.get("table") == "orders"
    and e.data.get("op") == "INSERT")

# Verify no writes during fault
assert_never(where=lambda e:
    e.type == "wal" and e.data.get("op") == "INSERT")

Data Integrity Patterns

No partial rows after error

fault_scenario("no_orphan_orders",
    scenario = create_order,
    faults = disk_error,
    expect = lambda r: (
        assert_true(r.status >= 500),
        assert_eq(
            db.main.query(sql="SELECT count(*) as n FROM orders WHERE status='pending'").data[0]["n"],
            0,
            "no orphaned rows"),
    ),
)

Verify INSERT was rejected

fault_scenario("insert_rejected",
    scenario = create_order,
    faults = insert_fail,
    expect = lambda r: (
        assert_true(r.status >= 500),
        assert_eventually(type="proxy", where=lambda e:
            "INSERT" in e.data.get("query", "") and e.data.get("action") == "error"),
    ),
)