From Kafka Streams To Kafka Connectors
Demo – From Kafka Streams To Kafka Connectors
This Repository is created to help those who wants to set up Confluent Kafka Cluster, MySql in Docker Desktop. Then produce the messages using Kafka Producer Template, then these messages will be processed using Kafka Streams Processor and then posted to MySql using Kafka Connect API (with JDBC Connector)
I have used below tech stack
-
Spring Boot
-
Spring Integration
-
Spring Kafka
-
Kafka streams
-
Confluent Kafka [Connect – JDBC Connector, Schema Registry, Avros]
-
Kotlin
REST Ops
# Available Connectors
http://localhost:8083/connectors
#Installed Plugins
http://localhost:8083/connector-plugins
#Install a JDBC Connector Plugin
curl -X POST \
-H "Content-Type: application/json" \
--data '{ "name" : "mysql-sink-connector", "config" : { "connection.url" : "jdbc:mysql://mysql:3306/deals_db", "connection.user" : "<replace_username>", "connection.password" : "<replace_password>", "connection.attempts" : "3", "connection.backoff.ms" : "5000", "table.whitelist" : "deals_db.ALERTSUBSCRIPTION", "table.name.format" : "deals_db.ALERTSUBSCRIPTION", "db.timezone" : "UTC", "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector", "dialect.name" : "MySqlDatabaseDialect", "auto.create" : "true", "auto.evolve" : "true", "tasks.max" : "1", "batch.size" : "10", "topics" : "deals_topic_processed", "value.converter.schema.enable":"true", "key.converter.schemas.enable": "false", "key.converter" :"org.apache.kafka.connect.storage.StringConverter", "value.converter" : "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url" : "http://schema-registry:8081", "insert.mode" : "INSERT", "pk.mode" : "none" }}' \
http://localhost:8083/connectors
#Get Installed Plugin Status
http://localhost:8083/connectors/mysql-sink-connector/status
Maven Commands
- Setup the Environment i.e. Kafka & MySQL with below command
$> docker compose up -d
- Start Kafka Producer with below command
$> cd kafka-producer
$> mvn spring-boot:run -Dspring.profiles.active=avro
- If you just want to consume above messages start consumer app
$> cd kafka-consumer
$> mvn spring-boot:run -Dspring.profiles.active=avro
- To post to messages to Database using JDBC Connector, start the processor app with below command
$> cd stream-processor
$> mvn spring-boot:run -Dspring.profiles.active=avro
Open Database Explorer of your own choice (I am using IntelliJ – Database Tool)
Query the Table that you have provided in schema
You should see data flowing to Mysql
Helpful References
Database Identifiers, Case Sensitivity Guidelines
Troubleshooting JDBC Connectors
Kafka Connect Official Github Repo
Confluent Cluster Docker YML Reference
Confluent Kafka Examples – Demo Scene