Skip to content

Kafka

Real Kafka in a container or an existing cluster. Publish from tests, read directly with test consumers, and assert app-produced or app-consumed messages when the Stove Kafka bridge is wired into the AUT.

Open in setup wizard

Kafka — wizard-synced snippet

Gradle

testImplementation("com.trendyol:stove-kafka")

Stove configuration

Stove().with {
    kafka {
      KafkaSystemOptions(
        serde = StoveSerde.jackson.anyByteArraySerde(),
        configureExposedConfiguration = { cfg ->
          listOf(
            "spring.kafka.bootstrap-servers=${cfg.bootstrapServers}",
            "spring.kafka.producer.properties.interceptor.classes=${cfg.interceptorClass}",
            "spring.kafka.consumer.properties.interceptor.classes=${cfg.interceptorClass}"
          )
        }
      )
    }
}

Test DSL

stove {
    kafka {
      shouldBePublished<Map<String, Any>> {
        actual["type"] == "created"
      }
    }
}

Two modes Standalone (stove-kafka). Plain Kafka client, works with any framework. Spring integration (stove-spring-kafka). Extra assertions for Spring's Kafka listeners. Assertions such as shouldBePublished and shouldBeConsumed rely on the Stove Kafka bridge being wired into your app's producer/consumer path so Stove can observe what the AUT publishes and consumes.

Bridge interceptor (required)

Stove can only assert app-side Kafka activity it can observe. For JVM Kafka clients, put the bridge interceptor on your app's producer and consumer interceptor lists. For non-JVM apps, use the language-specific bridge or report equivalent producer/consumer events yourself.

// expose Stove's interceptor class via property
"kafka.interceptorClasses=${cfg.interceptorClass}"
// or hardcode (less flexible):
"kafka.interceptorClasses=com.trendyol.stove.standalone.kafka.intercepting.StoveKafkaBridge"

The kafka.interceptorClasses prefix is whatever your app reads. Mirror your app's property names; Stove does not rewrite application configuration keys for you.

Standalone setup

Stove().with {
  kafka {
    KafkaSystemOptions(
      serde = StoveSerde.jackson.anyByteArraySerde(),
      configureExposedConfiguration = { cfg ->
        listOf(
          "kafka.bootstrapServers=${cfg.bootstrapServers}",
          "kafka.interceptorClasses=${cfg.interceptorClass}"
        )
      }
    )
  }
}.run()

Custom serde:

val mapper = ObjectMapper().apply { /* your app's config */ }

kafka {
  KafkaSystemOptions(
    serde = StoveSerde.jackson.anyByteArraySerde(mapper),
    /* ... */
  )
}

Spring integration

When testing a Spring Boot service with Spring Kafka listeners, use the dedicated starter. Adds listener-aware assertions on top.

dependencies {
  testImplementation("com.trendyol:stove-spring-kafka")
}

Register the interceptor bean for the AUT:

springBoot(runner = { params ->
  runApplication<MyApp>(*params) {
    addTestDependencies {
      bean<TestSystemKafkaInterceptor<*, *>>(isPrimary = true)
      bean { StoveSerde.jackson.anyByteArraySerde() }
    }
  }
})
springBoot(runner = { params ->
  runApplication<MyApp>(*params) {
    addTestDependencies4x {
      registerBean<TestSystemKafkaInterceptor<*, *>>(primary = true)
      registerBean { StoveSerde.jackson.anyByteArraySerde() }
    }
  }
})

Test-friendly settings

Default Kafka client settings are tuned for production throughput, not test feedback. Without test-specific batching, offset, and commit settings, shouldBePublished / shouldBeConsumed can flake or time out.

# producer
linger.ms=0
batch.size=1

# consumer
auto.commit.interval.ms=100
auto-offset-reset=earliest

Plus broker-level auto-topic-create (handy for parameterized topic names). Wire these via the AUT's Kafka config, not via Stove options.

Test DSL

Publishing from the test

stove {
  kafka {
    publish(
      topic = "orders.created",
      message = OrderCreatedEvent(id = "1"),
      key = "1",
      headers = mapOf("X-Correlation-ID" to "abc")
    )
  }
}

Asserting published

stove {
  kafka {
    shouldBePublished<OrderCreatedEvent> {
      actual.id == "1"
    }

    // Negative assertion: nothing matches in N seconds
    shouldNotBePublished<OrderFailedEvent> {
      actual.id == "1"
    }
  }
}

How it works under the hood:

Asserting consumed (Spring integration)

stove {
  kafka {
    publish("orders.input", incomingOrder)

    shouldBeConsumed<OrderInputEvent> {
      actual.id == incomingOrder.id
    }
  }
}

How shouldBeConsumed flows across test, broker, app, and bridge:

Testing retry / failure paths

stove {
  kafka {
    publish("orders.input", invalidOrder)

    // App's listener should requeue / DLT
    shouldBePublished<DLTRecord<OrderInputEvent>> {
      actual.original.id == invalidOrder.id
    }
  }
}

Working with metadata

stove {
  kafka {
    shouldBePublished<OrderCreatedEvent> {
      metadata.topic == "orders.created" &&
        metadata.headers["X-Correlation-ID"] == "abc"
    }
  }
}

metadata exposes topic, partition, offset, key, headers, timestamp.

Peek the in-flight stream

stove {
  kafka {
    val all = peek<OrderCreatedEvent>(topic = "orders.created", limit = 50)
    all.map { it.actual.id } shouldContain "1"
  }
}

Admin operations

stove {
  kafka {
    admin().createTopics(NewTopic("audit", 3, 1))
    admin().listTopics().names().get() shouldContain "audit"
  }
}

Complete example

test("order placement publishes events end-to-end") {
  stove {
    val orderId = UUID.randomUUID().toString()

    http {
      postAndExpectBody<OrderResponse>(
        uri = "/orders",
        body = CreateOrderRequest(id = orderId).some()
      ) { it.status shouldBe 201 }
    }

    kafka {
      shouldBePublished<OrderCreatedEvent> {
        actual.id == orderId &&
          actual.status == "CREATED"
      }
    }
  }
}

Provided Kafka cluster

For shared CI clusters: KafkaSystemOptions.provided(bootstrapServers = ...). Add cleanup of test topics. See Provided Instances · Kafka isolation.

Pairs well with

  • Tracing. Kafka spans appear with topic + partition attributes
  • Bridge. Register custom interceptor beans (or replace them per test)
  • Recipes · order flow. Multi-system Kafka assertion
  • Quarkus. Quarkus needs a classloader tweak (see that page)