Kafka¶
Real Kafka in a container or an existing cluster. Publish from tests, read directly with test consumers, and assert app-produced or app-consumed messages when the Stove Kafka bridge is wired into the AUT.
Stove configuration
Stove().with {
  kafka {
    KafkaSystemOptions(
      serde = StoveSerde.jackson.anyByteArraySerde(),
      configureExposedConfiguration = { cfg ->
        listOf(
          "spring.kafka.bootstrap-servers=${cfg.bootstrapServers}",
          "spring.kafka.producer.properties.interceptor.classes=${cfg.interceptorClass}",
          "spring.kafka.consumer.properties.interceptor.classes=${cfg.interceptorClass}"
        )
      }
    )
  }
}
Two modes
- Standalone (stove-kafka). Plain Kafka client, works with any framework.
- Spring integration (stove-spring-kafka). Extra assertions for Spring's Kafka listeners.
Assertions such as shouldBePublished and shouldBeConsumed rely on the Stove Kafka bridge being wired into your app's producer/consumer path so Stove can observe what the AUT publishes and consumes.
Bridge interceptor (required)¶
Stove can only assert app-side Kafka activity it can observe. For JVM Kafka clients, put the bridge interceptor on your app's producer and consumer interceptor lists. For non-JVM apps, use the language-specific bridge or report equivalent producer/consumer events yourself.
// expose Stove's interceptor class via property
"kafka.interceptorClasses=${cfg.interceptorClass}"
// or hardcode (less flexible):
"kafka.interceptorClasses=com.trendyol.stove.standalone.kafka.intercepting.StoveKafkaBridge"
kafka.interceptorClasses is only an example key; use whatever property name your app actually reads. Mirror your app's property names; Stove does not rewrite application configuration keys for you.
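On the app side, the value of that property has to reach the Kafka client's interceptor.classes setting. A minimal sketch, assuming the app keeps its settings in a flat key/value map under kafka.bootstrapServers and kafka.interceptorClasses: the helper name kafkaClientProps and the map-based config are hypothetical, while bootstrap.servers and interceptor.classes are the standard Kafka client keys.

```kotlin
import java.util.Properties

// Hypothetical helper: translates the app's own config keys into Kafka client properties.
// "kafka.bootstrapServers" and "kafka.interceptorClasses" mirror the example keys on this page;
// substitute whatever names your app actually reads.
fun kafkaClientProps(appConfig: Map<String, String>): Properties {
  val props = Properties()
  props["bootstrap.servers"] = appConfig.getValue("kafka.bootstrapServers")

  // Only set interceptor.classes when a value was supplied (for example by Stove in tests),
  // so runs without the bridge are left untouched.
  appConfig["kafka.interceptorClasses"]
    ?.takeIf { it.isNotBlank() }
    ?.let { props["interceptor.classes"] = it }

  return props
}
```

Serializers, group IDs, and other client settings are omitted here. Making the interceptor entry optional keeps the same wiring usable in production, where no bridge class is supplied.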
Standalone setup¶
Stove().with {
  kafka {
    KafkaSystemOptions(
      serde = StoveSerde.jackson.anyByteArraySerde(),
      configureExposedConfiguration = { cfg ->
        listOf(
          "kafka.bootstrapServers=${cfg.bootstrapServers}",
          "kafka.interceptorClasses=${cfg.interceptorClass}"
        )
      }
    )
  }
}.run()
Custom serde:
val mapper = ObjectMapper().apply { /* your app's config */ }
kafka {
  KafkaSystemOptions(
    serde = StoveSerde.jackson.anyByteArraySerde(mapper),
    /* ... */
  )
}
Spring integration¶
When testing a Spring Boot service with Spring Kafka listeners, use the dedicated starter (stove-spring-kafka). It adds listener-aware assertions on top of the standalone ones.
Register the interceptor bean for the AUT:
Test-friendly settings¶
Default Kafka client settings are tuned for production throughput, not test feedback. Without test-specific batching, offset, and commit settings, shouldBePublished / shouldBeConsumed can flake or time out.
# producer
linger.ms=0
batch.size=1
# consumer
auto.commit.interval.ms=100
auto.offset.reset=earliest
Broker-level topic auto-creation (auto.create.topics.enable) is also handy for parameterized topic names. Apply the producer and consumer settings through the AUT's Kafka config, not through Stove options.
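One way to keep these values out of production config is to hold them as test-only overrides and layer them over the app's defaults. A minimal sketch: the keys are the standard Kafka client property names (the offset key being auto.offset.reset), and the helper withTestOverrides is a hypothetical name, not part of Stove.

```kotlin
// Test-only overrides from the list above, keyed by standard Kafka client property names.
val testProducerOverrides: Map<String, String> = mapOf(
  "linger.ms" to "0",
  "batch.size" to "1"
)

val testConsumerOverrides: Map<String, String> = mapOf(
  "auto.commit.interval.ms" to "100",
  "auto.offset.reset" to "earliest"
)

// Hypothetical helper: entries on the right-hand side win,
// so test overrides replace the production defaults they collide with.
fun withTestOverrides(
  base: Map<String, String>,
  overrides: Map<String, String>
): Map<String, String> = base + overrides
```

Settings that the overrides do not mention, such as acks, pass through unchanged.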
Test DSL¶
Publishing from the test¶
stove {
  kafka {
    publish(
      topic = "orders.created",
      message = OrderCreatedEvent(id = "1"),
      key = "1",
      headers = mapOf("X-Correlation-ID" to "abc")
    )
  }
}
Asserting published¶
stove {
  kafka {
    shouldBePublished<OrderCreatedEvent> {
      actual.id == "1"
    }
    // Negative assertion: nothing matches in N seconds
    shouldNotBePublished<OrderFailedEvent> {
      actual.id == "1"
    }
  }
}
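How it works under the hood: the bridge interceptor on the app's producer path reports each record the AUT sends, and shouldBePublished passes once a reported message of the expected type satisfies the condition.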
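The observe-then-match idea behind these assertions can be modeled in a few lines. This is a conceptual sketch only, not Stove's implementation: the types Observed and ObservedMessages and the method awaitMatch are hypothetical, and the real bridge reports records from inside the app rather than through a direct call.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList

// Conceptual model only; these types are hypothetical and are not Stove's classes.
data class Observed(val topic: String, val payload: Any)

class ObservedMessages {
  private val messages = CopyOnWriteArrayList<Observed>()

  // Stands in for the bridge interceptor reporting a record it saw in the app.
  fun record(message: Observed) {
    messages.add(message)
  }

  // Polls until an observed payload of the given type satisfies the condition,
  // or returns false once the timeout has elapsed.
  fun <T : Any> awaitMatch(
    type: Class<T>,
    timeoutMs: Long,
    pollMs: Long = 10,
    condition: (T) -> Boolean
  ): Boolean {
    val deadline = System.nanoTime() + timeoutMs * 1_000_000
    while (true) {
      val matched = messages.any { type.isInstance(it.payload) && condition(type.cast(it.payload)) }
      if (matched) return true
      if (System.nanoTime() >= deadline) return false
      Thread.sleep(pollMs)
    }
  }
}
```

In this model a positive assertion fails when awaitMatch returns false, and a negative assertion passes only if no match appears during the whole window, which is why negative assertions always take the full timeout.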
Asserting consumed (Spring integration)¶
stove {
  kafka {
    publish("orders.input", incomingOrder)
    shouldBeConsumed<OrderInputEvent> {
      actual.id == incomingOrder.id
    }
  }
}
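How shouldBeConsumed flows across test, broker, app, and bridge: the test publishes to the broker, the app's listener consumes the record, and the bridge on the app's consumer path reports it to Stove so the assertion can match it.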
Testing retry / failure paths¶
stove {
  kafka {
    publish("orders.input", invalidOrder)
    // App's listener should requeue the record or route it to the DLT
    shouldBePublished<DLTRecord<OrderInputEvent>> {
      actual.original.id == invalidOrder.id
    }
  }
}
Working with metadata¶
stove {
  kafka {
    shouldBePublished<OrderCreatedEvent> {
      metadata.topic == "orders.created" &&
        metadata.headers["X-Correlation-ID"] == "abc"
    }
  }
}
metadata exposes topic, partition, offset, key, headers, and timestamp.
Peek the in-flight stream¶
stove {
  kafka {
    val all = peek<OrderCreatedEvent>(topic = "orders.created", limit = 50)
    all.map { it.actual.id } shouldContain "1"
  }
}
Admin operations¶
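stove {
  kafka {
    // createTopics takes a collection and is asynchronous; wait for it before listing
    admin().createTopics(listOf(NewTopic("audit", 3, 1.toShort()))).all().get()
    admin().listTopics().names().get() shouldContain "audit"
  }
}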
Complete example¶
test("order placement publishes events end-to-end") {
  stove {
    val orderId = UUID.randomUUID().toString()
    http {
      postAndExpectBody<OrderResponse>(
        uri = "/orders",
        body = CreateOrderRequest(id = orderId).some()
      ) { it.status shouldBe 201 }
    }
    kafka {
      shouldBePublished<OrderCreatedEvent> {
        actual.id == orderId &&
          actual.status == "CREATED"
      }
    }
  }
}
Provided Kafka cluster¶
For shared CI clusters, use KafkaSystemOptions.provided(bootstrapServers = ...) and clean up test topics after the run. See Provided Instances · Kafka isolation.
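Cleanup on a shared cluster is easier when every test topic is recognizable as belonging to one run. A minimal sketch, assuming a per-run prefix convention: the stove-test.<runId>. prefix and the helpers testTopic and topicsToClean are hypothetical and are not part of Stove.

```kotlin
// Hypothetical convention: every test topic carries a per-run prefix.
fun testTopic(runId: String, base: String): String = "stove-test.$runId.$base"

// Selects the topics created by this run from a full topic listing,
// leaving other runs' topics and real topics alone.
fun topicsToClean(allTopics: Collection<String>, runId: String): List<String> =
  allTopics.filter { it.startsWith("stove-test.$runId.") }
```

The resulting list can be handed to a topic-deletion call, such as the Kafka Admin client's deleteTopics, once the run finishes.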
Pairs well with¶
- Tracing. Kafka spans appear with topic + partition attributes
- Bridge. Register custom interceptor beans (or replace them per test)
- Recipes · order flow. Multi-system Kafka assertion
- Quarkus. Quarkus needs a classloader tweak (see that page)