Kafka Router
The Kafka Router allows you to consume data from multiple Kafka instances / topics and route it to other Kafka instances / topics.
Use Cases
- Clone Kafka data, e.g. consume data from a demo environment and replicate it to your local Kafka.
- Proxy external Kafka instances, e.g. have one whitelisted host consume from external Kafka sources and replicate the data to internal Kafka instances; this also works the other way around, routing data from internal Kafka sources to external Kafka instances.
- Merge messages from multiple topics into one aggregate topic.
Environment variables
Two environment variables are used:
- CONFIG_FILE: the path to the config file. Default: /config/config.yaml. The config file can be YAML (*.yaml, *.yml) or JSON (*.json).
- RESSOURCE_PATH: the path to the directory for additional resources (truststores, keystores, etc.). Default: /config.
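For example, the defaults can be overridden before starting the router (a minimal sketch; the paths are placeholders, not part of the project):
export CONFIG_FILE=/etc/kafka-router/router.yaml
export RESSOURCE_PATH=/etc/kafka-router
java -jar target/kafka-router.jar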
Configuration
The config file format can be YAML (*.yaml, *.yml) or JSON (*.json), auto-detected from the file suffix.
General config
General configuration:
Properties:
- consumer-group: name of the consumer group for the Kafka Router (allowing multiple instances to run concurrently and to resume data processing from committed offsets). Default: kafka-router
- backoff-strategy.backoff-time-seconds: when messages were read but could not be delivered (e.g. when the target Kafka is not reachable), the messages are not committed in the source; instead, delivery is retried. The backoff times (a list of doubles, in seconds) state how many seconds to wait before the first, second, … delivery attempt. Default: 0.5, 1, 2, 3, 5, 10 (after the 5th retry, wait 10 seconds before trying again).
consumer-group: kafka-router
backoff-strategy:
  backoff-time-seconds: 1,2,3,4,5,10,30,60
Kafka instances
Configure the Kafka instances you want to work with (as a consumer or provider, or both).
Config prefix: kafka: map of Kafka instance keys (used in the routing config) to Kafka properties.
Properties:
- bootstrap-servers: Kafka bootstrap servers, required.
- truststore-path: for TLS connections: path to the X.509 truststore.
- truststore-password: password of the truststore.
- keystore-path: for mTLS connections: path to the X.509 keystore.
- keystore-password: password of the keystore.
- properties: additional Kafka properties, as key/value pairs (see the sketch after the example below).
kafka:
  city-winterthur:
    bootstrap-servers: localhost:9092
  country-switzerland:
    bootstrap-servers: external.kafka-host.org:443
    truststore-path: example-truststore.p12
    truststore-password: SeCrEt-007!
    keystore-path: example-keystore.p12
    keystore-password: SeCrEt-007!
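The properties map is presumably passed through to the underlying Kafka clients; a hypothetical example (the property names are standard Kafka client settings, the values are illustrative assumptions, not Kafka Router defaults):
kafka:
  city-winterthur:
    bootstrap-servers: localhost:9092
    properties:
      # illustrative values only (assumption); any Kafka client property can go here
      client.id: kafka-router-winterthur
      max.poll.records: 200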
Message routing
Configure the message routing (from which sources you want to consume data, and where it should be published):
Config prefix: routes: list of routes, with the following properties:
- source: Kafka instance key (see above) from which data should be consumed.
- source-topic: source topic name, as a regular expression (allowing several source topics to be matched).
- target: Kafka instance key (see above) to which data should be routed.
- target-topic: optional target topic. If absent, the source topic of the message is used.
routes:
  # copy the sales topic from winterthur to switzerland
  - source: city-winterthur
    source-topic: sales
    target: country-switzerland
    target-topic: sales-winterthur
  # copy the sales topic from chur to switzerland
  - source: city-chur
    source-topic: sales
    target: country-switzerland
    target-topic: sales-chur
  # collect the sales from all locations to a sales-all topic
  - source: country-switzerland
    source-topic: sales-.+
    target: country-switzerland
    target-topic: sales-all
  # collect all sales globally
  - source: country-switzerland
    source-topic: sales-all
    target: global
    target-topic: all-sales
Maven Build / run
Build with Maven (default goals: clean install):
mvn
This builds an Uber JAR (portable Java application with libraries included), which you can run with:
java -jar target/kafka-router.jar
Docker build / run
Build the docker image:
mvn && docker build -t pwalser75/kafka-router .
Run the docker container:
docker run -it --volume ./config:/config pwalser75/kafka-router
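The environment variables described above can also be set on the container, e.g. to point to an alternative config file inside the mounted volume (the file name is a placeholder):
docker run -it -e CONFIG_FILE=/config/my-config.yaml --volume ./config:/config pwalser75/kafka-router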
Docker-compose example
A complete example with Kafka-Router, Kafka and Redpanda Console (Kafka viewer) is provided as a docker-compose setup.
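The repository's docker-compose.yml wires these services together; a trimmed-down sketch of what such a setup could look like (service names, images, ports and environment are assumptions, not the actual file):
services:
  kafka:
    image: bitnami/kafka:latest
    environment:
      # minimal single-node KRaft setup (assumption)
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
  kafka-router:
    image: pwalser75/kafka-router
    container_name: kafka-router
    volumes:
      # config.yaml with bootstrap-servers pointing at kafka:9092 (assumption)
      - ./config:/config
    depends_on:
      - kafka
  console:
    image: docker.redpanda.com/redpandadata/console:latest
    ports:
      # Redpanda Console listens on 8080 inside the container, exposed on localhost:9000
      - "9000:8080"
    environment:
      KAFKA_BROKERS: kafka:9092
    depends_on:
      - kafka
Start the stack with: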
docker-compose up
or in detached mode:
docker-compose up -d
You can then view the logs with:
docker logs -f kafka-router
__ __ _____ ___ __
/ //_/__ _/ _/ /_____ _ / _ \___ __ __/ /____ ____
/ ,< / _ `/ _/ '_/ _ `/ / , _/ _ \/ // / __/ -_) __/
/_/|_|\_,_/_//_/\_\\_,_/ /_/|_|\___/\_,_/\__/\__/_/
https://github.com/pwalser75/kafka-router
13:15:49.439 INFO [main] | c.f.tools.kafkarouter.ResourceLoader - reading configuration from /config/config.yaml
13:15:49.593 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - Configuring routes:
13:15:49.593 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - - kafka (sales-winterthur) -> kafka (sales-switzerland)
13:15:49.712 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - - kafka (sales-chur) -> kafka (sales-switzerland)
13:15:49.719 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - Starting routes:
13:15:49.720 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - Joining consumer group...
13:15:49.720 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - Subscribing to initial topics:
13:15:52.758 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - - sales-winterthur
13:15:52.759 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - Joining consumer group...
13:15:52.759 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - Subscribing to initial topics:
13:15:52.763 INFO [main] | c.f.tools.kafkarouter.KafkaRouter - - sales-chur
13:15:52.764 INFO [main] | c.f.tools.kafkarouter.KafkaRouterMain - Startup complete, ready to route messages...
The Redpanda Kafka viewer is accessible at http://localhost:9000. There you can see the topics, publish test messages (select a topic, then Actions > Publish Message) and check whether they are routed.
The Kafka Router also adds an additional header X-Kafka-Router-Source, stating from which source / topic / partition / offset the message was routed.
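On the consumer side, this header can be read with the standard Kafka client API; a minimal sketch (the helper class and method names are made up, and the header value is simply treated as a UTF-8 string since its exact format is not specified here):
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public final class RoutingSourceReader {

    // Hypothetical helper: extracts the routing source header from a consumed record,
    // or returns null if the record was not routed by the Kafka Router.
    public static String routingSource(ConsumerRecord<?, ?> record) {
        Header header = record.headers().lastHeader("X-Kafka-Router-Source");
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}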