Module kotlin-kafka

This project is still under development. It started as a playground where I experimented with Kafka in Kotlin
and the Kafka SDK while reading Kafka: The Definitive Guide from Confluent.
https://www.confluent.io/resources/kafka-the-definitive-guide-v2/

Rationale

At the time of starting this repository I didn't find any bindings between the Kafka SDK and Kotlin suspension. These
operators should be implemented at a low level, so they can guarantee correct cancellation support and a highly
optimised runtime.
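
As an illustration of what such a binding looks like, here is a minimal sketch, not this library's actual implementation, that bridges the callback-based send of the Apache Kafka client with a cancellable suspending function (the sendAwait name is hypothetical):

import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical sketch: suspend until the broker acknowledges the record, resuming
// with the RecordMetadata or the failure reported by the producer callback.
suspend fun <K, V> KafkaProducer<K, V>.sendAwait(record: ProducerRecord<K, V>): RecordMetadata =
  suspendCancellableCoroutine { cont ->
    send(record) { metadata, exception ->
      if (exception == null) cont.resume(metadata)
      else cont.resumeWithException(exception)
    }
  }

A real implementation additionally has to decide what cancellation means for an in-flight send, which is why these operators are best kept inside the library.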

Goals

  • Lean core library built on top of the Kotlin standard library & KotlinX Coroutines (possible extensions with
    Arrow in an additional module).
  • Extensions to easily operate over the Kafka SDK with KotlinX Coroutines and suspend functions.
  • Flow-based operators, so you can easily compose KotlinX Flow based Kafka programs.
  • An example of testing Kafka with Test Containers in Kotlin (a minimal sketch follows this list).
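
For the last goal, here is a minimal sketch of starting a Kafka broker with Test Containers. The KafkaContainer API comes from the org.testcontainers:kafka module, and the image tag is an assumption for illustration, not this project's Kafka.container helper:

import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

// Start a single-broker Kafka instance in Docker and expose its bootstrap servers
// so producers and consumers under test can connect to it.
val container = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.1"))
  .also { it.start() }
val bootstrapServers: String = container.bootstrapServers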

Example

@JvmInline
value class Key(val index: Int)

@JvmInline
value class Message(val content: String)

fun main(): Unit =
  runBlocking(Default) {
    val topicName = "test-topic"
    val msgCount = 10
    val kafka = Kafka.container

    Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
      client.createTopic(NewTopic(topicName, 1, 1))
    }

    coroutineScope { // Run the producer and consumer in a single scope
      launch { // Send 10 messages, and then close the producer
        val settings: ProducerSettings<Key, Message> =
          ProducerSettings(
            kafka.bootstrapServers,
            IntegerSerializer().imap { key: Key -> key.index },
            StringSerializer().imap { msg: Message -> msg.content },
            Acks.All
          )
        (1..msgCount)
          .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
          .asFlow()
          .produce(settings)
          .collect(::println)
      }

      launch { // Consume 10 messages as a stream, and then close the consumer
        val settings: ConsumerSettings<Key, Message> =
          ConsumerSettings(
            kafka.bootstrapServers,
            IntegerDeserializer().map(::Key),
            StringDeserializer().map(::Message),
            groupId = UUID.randomUUID().toString(),
            autoOffsetReset = AutoOffsetReset.Earliest
          )
        kafkaConsumer(settings)
          .subscribeTo(topicName)
          .take(msgCount)
          .map { "${it.key()} -> ${it.value()}" }
          .collect(::println)
      }
    }
  }

You can get the full code here. Running the program first prints the acknowledged producer records as
topic-partition@offset, followed by the consumed records:

test-topic-0@0
test-topic-0@1
test-topic-0@2
test-topic-0@3
test-topic-0@4
test-topic-0@5
test-topic-0@6
test-topic-0@7
test-topic-0@8
test-topic-0@9
Key(index=1) -> Message(content=msg: 1)
Key(index=2) -> Message(content=msg: 2)
Key(index=3) -> Message(content=msg: 3)
Key(index=4) -> Message(content=msg: 4)
Key(index=5) -> Message(content=msg: 5)
Key(index=6) -> Message(content=msg: 6)
Key(index=7) -> Message(content=msg: 7)
Key(index=8) -> Message(content=msg: 8)
Key(index=9) -> Message(content=msg: 9)
Key(index=10) -> Message(content=msg: 10)
