Memphis is a next-generation message broker: simple, robust, durable, and cloud-native, wrapped in an ecosystem that enables fast and reliable development of event-driven applications. Memphis helps you build applications that require large volumes of streamed and enriched data, modern protocols, zero ops, rapid development, extreme cost reduction, and significantly less dev time for data-oriented developers and data engineers.
Creator: @adrianNEMO Maintainer: @adrianNEMO, @Memphis team
Installation
After installing and running the Memphis broker, add the SDK to the dependencies in your Gradle file:
implementation("TODO")
Importing
import dev.memphis.sdk.Memphis
Connecting to Memphis
val memphis = Memphis.connect("<memphis-host>", "<application type username>", "<broker-token>")
Connection configuration parameters can optionally be passed in a builder block:
val memphis = Memphis.connect("<memphis-host>", "<application type username>", "<broker-token>") {
port = 6666
autoReconnect = true
maxReconnects = 3
reconnectWait = 5.seconds
connectionTimeout = 15.seconds
}
Once connected, all features offered by Memphis are available.
Disconnecting from Memphis
To disconnect from Memphis, call close() on the Memphis connection object.
memphis.close()
Creating a Station
Stations can be created from the connection object, optionally passing configuration parameters. If the station already exists, nothing happens and the new configuration is not applied.
val station = memphis.createStation("<station-name>")
val station = memphis.createStation("<station-name>") {
retentionType = RetentionType.MAX_AGE_SECONDS
retentionValue = 604800
storageType = StorageType.DISK
replicas = 1
idempotencyWindow = 2.minutes
schemaName = "<Schema Name>"
sendPoisonMsgToDls = true
sendSchemaFailedMsgToDls = true
}
Retention Types
Memphis currently supports the following types of retention:
RetentionType.MAX_AGE_SECONDS
The above means that every message persists for the number of seconds set in the retention value field.
RetentionType.MESSAGES
The above means that after the maximum number of saved messages (set in the retention value) has been reached, the oldest messages will be deleted.
RetentionType.BYTES
The above means that after the maximum number of saved bytes (set in the retention value) has been reached, the oldest messages will be deleted.
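As an illustration, a station that retains each message for one week can be sketched with the createStation builder shown earlier (the station name below is hypothetical; assumes an existing `memphis` connection as created in "Connecting to Memphis"):

```kotlin
// Sketch: retain each message for one week, expressed in seconds.
val weekStation = memphis.createStation("events") {
    retentionType = RetentionType.MAX_AGE_SECONDS
    retentionValue = 7 * 24 * 60 * 60  // 604800 seconds = one week
}
```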
Storage Types
Memphis currently supports the following types of message storage:
StorageType.DISK
The above means that messages persist on disk.
StorageType.MEMORY
The above means that messages persist in main memory.
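For example, a station holding short-lived data can trade durability for latency by keeping messages in memory. This is a sketch using the builder shown above; the station name and retention settings are illustrative, and an existing `memphis` connection is assumed:

```kotlin
// Sketch: an in-memory station for transient, low-latency data.
// Messages are lost if the broker restarts.
val cacheStation = memphis.createStation("metrics-cache") {
    storageType = StorageType.MEMORY
    retentionType = RetentionType.MAX_AGE_SECONDS
    retentionValue = 60  // keep each message for at most one minute
}
```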
Destroying a Station
Destroying a station will remove all its resources (including producers and consumers).
station.destroy()
Attaching a Schema to an Existing Station
memphis.attachSchema("<schema-name>", "<station-name>")
// Or from a station
station.attachSchema("<schema-name>")
Detaching a Schema from Station
memphis.detachSchema("<station-name>")
// Or from a station
station.detachSchema()
Produce and Consume Messages
The most common client operations are producing messages and consuming messages.
Messages are published to a station and consumed from it by creating a consumer and collecting the resulting flow. Consumers are pull-based and consume all the messages in a station, unless you are using a consumer group, in which case messages are spread across all members of the group.
Memphis messages are payload agnostic. Payloads are ByteArray.
To stop receiving messages, call consumer.stopConsuming(). The consumer will terminate regardless of whether there are messages in flight for the client.
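Putting the pieces together, a minimal produce-then-consume round trip might look like the following sketch. The station, producer, and consumer names are illustrative, and it assumes a running broker reachable at localhost with the credentials shown in "Connecting to Memphis":

```kotlin
import dev.memphis.sdk.Memphis

// Sketch of a full round trip: connect, produce one message,
// then consume from the same station and acknowledge.
suspend fun roundTrip() {
    val memphis = Memphis.connect("localhost", "demo-user", "demo-token")

    val producer = memphis.producer("demo-station", "demo-producer") {
        genUniqueSuffix = false
    }
    producer.produce("hello, memphis".toByteArray())

    val consumer = memphis.consumer("demo-station", "demo-consumer") {
        batchSize = 10
    }
    consumer.consume().collect {
        // Print the payload and acknowledge so it is not redelivered.
        println(it.data.toString(Charsets.UTF_8))
        it.ack()
    }

    memphis.close()
}
```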
Creating a Producer
val producer = memphis.producer("<station-name>", "<producer-name>") {
genUniqueSuffix = false
}
Producing a message
producer.produce("<message: ByteArray (or a protobuf / JSON / GraphQL payload for schema-validated stations)>") {
ackWait = 15.seconds
messageId = "<message Id>"
}
Add headers
producer.produce("<message: ByteArray (or a protobuf / JSON / GraphQL payload for schema-validated stations)>") {
headers.put("key", "value")
}
Async produce
This means your application won't wait for broker acknowledgement. Use it only if you can tolerate data loss.
producer.produceAsync("<message: ByteArray (or a protobuf / JSON / GraphQL payload for schema-validated stations)>")
Message ID
Stations are idempotent by default for 2 minutes (configurable). Idempotency is achieved by adding a message ID:
producer.produce("<message: ByteArray (or a protobuf / JSON / GraphQL payload for schema-validated stations)>") {
messageId = "123"
}
Destroying a Producer
producer.destroy()
Creating a Consumer
val consumer = memphis.consumer("<station-name>", "<consumer-name>") {
consumerGroup = "<consumer-group>"
pullInterval = 1.seconds
batchSize = 10
batchMaxTimeToWait = 5.seconds
maxAckTime = 30.seconds
maxMsgDeliveries = 10
genUniqueSuffix = false
}
Processing Messages
To consume messages, collect them from the flow:
consumer.consume().collect {
println("Received message:")
println(it.data.toString(Charset.defaultCharset()))
println(it.headers)
println()
it.ack()
}
If you need tighter control over when a new message is fetched, use subscribeMessages, which only fetches a message when an item in the flow is collected. It does not listen for DLS messages; you need to listen for those separately with subscribeDls:
consumer.subscribeMessages().collect {
println("Received message:")
println(it.data.toString(Charset.defaultCharset()))
println(it.headers)
println()
it.ack()
}
consumer.subscribeDls().collect {
println("Received DLS message:")
println(it.data.toString(Charset.defaultCharset()))
println(it.headers)
println()
it.ack()
}
Acknowledging a Message
Acknowledging a message indicates to the Memphis server that it should not re-send the same message to the same consumer or consumer group.
message.ack()
Get headers
Get headers per message
val headers = msg.headers
Destroying a Consumer
consumer.destroy()