On this page

Kafka Protocol Reference

Interface declaration:

kafka = service("kafka",
    interface("broker", "kafka", 9092),
    image = "confluentinc/cp-kafka:7.6",
    healthcheck = tcp("localhost:9092"),
)

Methods

publish(topic="", data="", key="")

Publish a message to a topic.

kafka.broker.publish(topic="order-events", data='{"id":1,"action":"created"}', key="order-1")
kafka.broker.publish(topic="notifications", data="hello world")
ParameterTypeDefaultDescription
topicstringrequiredTopic name
datastringrequiredMessage value (body)
keystring""Message key (for partitioning)

Response:

resp = kafka.broker.publish(topic="events", data="test")
# resp.data = {"published": true, "topic": "events"}

consume(topic="", group="faultbox")

Consume one message from a topic.

resp = kafka.broker.consume(topic="order-events")
# resp.data = {
#   "topic": "order-events",
#   "partition": 0,
#   "offset": 42,
#   "key": "order-1",
#   "value": "{\"id\":1,\"action\":\"created\"}"
# }
ParameterTypeDefaultDescription
topicstringrequiredTopic to consume from
groupstring"faultbox"Consumer group ID

Response fields:

FieldTypeDescription
.data["topic"]stringTopic name
.data["partition"]intPartition number
.data["offset"]intMessage offset
.data["key"]stringMessage key
.data["value"]stringMessage value

Fault Rules

drop(topic=)

Drop messages matching the topic — the producer thinks it published but the message is lost.

message_loss = fault_assumption("message_loss",
    target = kafka.broker,
    rules = [drop(topic="order-events")],
)

delay(topic=, delay=)

Delay message delivery.

slow_broker = fault_assumption("slow_broker",
    target = kafka.broker,
    rules = [delay(topic="*", delay="3s")],
)

duplicate(topic=)

Duplicate messages — the consumer sees each message twice.

duplicates = fault_assumption("duplicates",
    target = kafka.broker,
    rules = [duplicate(topic="order-events")],
)

Seed / Reset Patterns

Kafka topics are append-only — you can’t truncate them. Reset strategies:

# Option 1: Use unique topic names per test run (no reset needed)
import time
TOPIC = "orders-" + str(int(time.time()))

# Option 2: Use consumer group offsets (consume from latest)
def reset_kafka():
    # Publish a marker, then consume until you see it
    kafka.broker.publish(topic="orders", data='{"marker":"reset"}')

# Option 3: Don't reuse Kafka (default — recreate between tests)
kafka = service("kafka",
    interface("broker", "kafka", 9092),
    image = "confluentinc/cp-kafka:7.6",
    # reuse=False (default) — topic state resets with container
)

Tip: For most fault tests, reuse=False (default) is simplest — each test gets a fresh Kafka with empty topics.

Event Sources

Topic observer

Capture all messages on a topic in the event log:

kafka = service("kafka",
    interface("broker", "kafka", 9092),
    image = "confluentinc/cp-kafka:7.6",
    observe = [topic("order-events", decoder=json_decoder())],
)

Topic events have type "topic" with fields:

FieldTypeDescription
topicstringTopic name
partitionintPartition number
keystringMessage key
valuestringRaw message value
datadictAuto-decoded JSON (if decoder set)
# Check a message was published
assert_eventually(where=lambda e:
    e.type == "topic" and e.data.get("topic") == "order-events"
    and e.data.get("action") == "created")

# Check NO message was published (on error)
assert_never(where=lambda e:
    e.type == "topic" and e.data.get("topic") == "order-events")

Data Integrity Patterns

No orphan events (publish without DB commit)

def no_orphan_events(event):
    if event["type"] == "topic" and event.get("order_id"):
        rows = db.main.query(
            sql="SELECT count(*) as n FROM orders WHERE id='" + event["order_id"] + "'"
        ).data[0]["n"]
        if rows == 0:
            fail("orphan Kafka event: order " + event["order_id"] + " not in DB")

orphan_check = monitor(no_orphan_events)

db_write_error = fault_assumption("db_write_error",
    target = db,
    write = deny("EIO"),
    monitors = [orphan_check],
)

No message loss

fault_scenario("no_message_loss",
    scenario = publish_and_consume,
    faults = consumer_slow,
    expect = lambda r: assert_eq(
        len(events(where=lambda e: e.type == "topic" and e.data.get("action") == "produce")),
        len(events(where=lambda e: e.type == "topic" and e.data.get("action") == "consume")),
        "every produced message must be consumed"),
)

Exactly-once delivery

fault_scenario("no_duplicates",
    scenario = publish_order,
    faults = broker_restart,
    expect = lambda r: (
        # Count unique order IDs in consumed messages
        assert_eq(
            len(events(where=lambda e: e.type == "topic" and e.data.get("topic") == "order-events")),
            1,
            "exactly one message for this order"),
    ),
)

Note on multi-process containers

Confluent Kafka images (cp-kafka, cp-zookeeper) use shell entrypoints that fork Java. Faultbox automatically falls back to no-seccomp mode for these — syscall-level faults don’t work, but protocol-level faults (via rules=) and event sources (via observe=) work normally.

# This WORKS (protocol-level, via proxy):
message_loss = fault_assumption("message_loss",
    target = kafka.broker,
    rules = [drop(topic="orders")],
)

# This does NOT work on Confluent images (syscall-level, needs seccomp):
# disk_error = fault_assumption("disk_error", target=kafka, write=deny("EIO"))