Simple as RabbitMQ, Robust as Apache Kafka, and Perfect for microservices.

CNCF Silver Member

Memphis is a next-generation message broker: simple, robust, durable, and cloud-native, wrapped with an entire ecosystem that enables fast and reliable development of event-driven use cases. It is aimed at applications that require large volumes of streamed and enriched data, modern protocols, zero ops, rapid development, extreme cost reduction, and significantly less dev time for data-oriented developers and data engineers.

Creator: @adrianNEMO Maintainer: @adrianNEMO, @Memphis team

Installation

After installing and running the Memphis broker, add the SDK to the dependencies in your Gradle file:

implementation("TODO")

Importing

import dev.memphis.sdk.Memphis

Connecting to Memphis

val memphis = Memphis.connect("<memphis-host>", "<application type username>", "<broker-token>")

It is possible to pass connection configuration parameters.

val memphis = Memphis.connect("<memphis-host>", "<application type username>", "<broker-token>") {
    port = 6666
    autoReconnect = true
    maxReconnects = 3
    reconnectWait = 5.seconds
    connectionTimeout = 15.seconds
}

Once connected, all features offered by Memphis are available.

Disconnecting from Memphis

To disconnect from Memphis, call close() on the Memphis connection object.

memphis.close()

Creating a Station

Stations can be created from the Memphis connection object, optionally passing configuration parameters. If a station already exists, nothing happens and the new configuration is not applied.

val station = memphis.createStation("<station-name>")

val station = memphis.createStation("<station-name>") {
    retentionType = RetentionType.MAX_AGE_SECONDS
    retentionValue = 604800
    storageType = StorageType.DISK
    replicas = 1
    idempotencyWindow = 2.minutes
    schemaName = "<Schema Name>"
    sendPoisonMsgToDls = true
    sendSchemaFailedMsgToDls = true
}
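The retentionValue of 604800 used above is seven days expressed in seconds. As a small sketch using only the Kotlin standard library, the value can be derived instead of hard-coded. The helper name retentionSeconds is hypothetical and not part of the SDK, and the numeric type the SDK expects for retentionValue is not stated here, so a conversion may be needed.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.days

// Hypothetical helper (not part of the Memphis SDK): converts a Duration into
// the whole number of seconds used as a MAX_AGE_SECONDS retention value.
fun retentionSeconds(age: Duration): Long = age.inWholeSeconds

fun main() {
    println(retentionSeconds(7.days)) // 7 * 24 * 60 * 60 = 604800
}
```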

Retention Types

Memphis currently supports the following types of retention:

RetentionType.MAX_AGE_SECONDS

The above means that every message persists for the value set in the retention value field (in seconds).

RetentionType.MESSAGES

The above means that after the maximum number of saved messages (set in retention value) has been reached, the oldest messages will be deleted.

RetentionType.BYTES

The above means that after the maximum number of saved bytes (set in retention value) has been reached, the oldest messages will be deleted.
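To make the three retention types concrete, here is a minimal in-memory model of which messages would survive each policy. It is only an illustration of the descriptions above: all names (StoredMessage, Retention, applyRetention) are hypothetical, the boundary handling is an assumption, and it does not reflect how the broker is implemented.

```kotlin
// Illustrative model only; hypothetical names, not the Memphis broker's implementation.
class StoredMessage(val payload: ByteArray, val storedAtSeconds: Long)

sealed interface Retention {
    data class MaxAgeSeconds(val seconds: Long) : Retention
    data class Messages(val maxCount: Int) : Retention
    data class Bytes(val maxBytes: Long) : Retention
}

// Returns the messages that survive the policy.
// Assumes `messages` is ordered from oldest to newest.
fun applyRetention(
    messages: List<StoredMessage>,
    policy: Retention,
    nowSeconds: Long,
): List<StoredMessage> = when (policy) {
    // Keep messages no older than the configured age.
    is Retention.MaxAgeSeconds ->
        messages.filter { nowSeconds - it.storedAtSeconds <= policy.seconds }
    // Keep only the newest maxCount messages.
    is Retention.Messages ->
        messages.takeLast(policy.maxCount)
    // Walk from newest to oldest, keeping messages while the byte budget allows.
    is Retention.Bytes -> {
        var used = 0L
        messages.asReversed()
            .takeWhile { used += it.payload.size; used <= policy.maxBytes }
            .asReversed()
    }
}

fun main() {
    val msgs = listOf(
        StoredMessage("a".toByteArray(), 0),
        StoredMessage("bb".toByteArray(), 50),
        StoredMessage("ccc".toByteArray(), 100),
    )
    println(applyRetention(msgs, Retention.MaxAgeSeconds(60), nowSeconds = 100).size)
    println(applyRetention(msgs, Retention.Messages(1), nowSeconds = 100).size)
    println(applyRetention(msgs, Retention.Bytes(5), nowSeconds = 100).size)
}
```

In each case the oldest messages are the ones dropped first, which matches the behaviour described for the MESSAGES and BYTES types.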

Storage Types

Memphis currently supports the following types of message storage:

StorageType.DISK

The above means that messages persist on disk.

StorageType.MEMORY

The above means that messages are kept in main memory.

Destroying a Station

Destroying a station will remove all its resources (including producers and consumers).

station.destroy()

Attaching a Schema to an Existing Station

memphis.attachSchema("<schema-name>", "<station-name>")

// Or from a station

station.attachSchema("<schema-name>")

Detaching a Schema from Station

memphis.detachSchema("<station-name>")

// Or from a station
station.detachSchema()

Produce and Consume Messages

The most common client operations are producing and consuming messages. Messages are published to a station and consumed from it by creating a consumer and collecting the resulting flow. Consumers are pull-based and consume all the messages in a station unless you are using a consumer group, in which case messages are spread across all members of the group. Memphis messages are payload agnostic; payloads are ByteArray. To stop receiving messages, call consumer.stopConsuming(). The consumer will terminate regardless of whether there are messages in flight for the client.
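The difference between standalone consumers and consumer groups can be sketched with a small model: every group sees every message, but within a group each message goes to only one member. The function name distribute is hypothetical, and round-robin assignment is an assumption made for this sketch; the description above only says that messages are spread across the members of a group.

```kotlin
// Illustrative model only. Round-robin is an assumption of this sketch,
// not documented broker behaviour.
// `groups` maps a consumer group name to the names of its member consumers.
fun distribute(
    messages: List<String>,
    groups: Map<String, List<String>>,
): Map<String, List<String>> {
    val received = mutableMapOf<String, MutableList<String>>()
    for ((_, members) in groups) {
        if (members.isEmpty()) continue
        // Make sure every member appears in the result, even with no messages.
        members.forEach { received.getOrPut(it) { mutableListOf() } }
        // Each group sees every message, but only one member receives each one.
        messages.forEachIndexed { index, message ->
            received.getValue(members[index % members.size]).add(message)
        }
    }
    return received
}

fun main() {
    val result = distribute(
        messages = listOf("m1", "m2", "m3", "m4"),
        groups = mapOf(
            "billing" to listOf("c1", "c2"), // two members share the stream
            "audit" to listOf("c3"),         // a single member gets everything
        ),
    )
    result.forEach { (consumer, msgs) -> println("$consumer received $msgs") }
}
```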

Creating a Producer

val producer = memphis.producer("<station-name>", "<producer-name>") {
    genUniqueSuffix = false
}

Producing a message

producer.produce("<message in ByteArray or (schema validated station - protobuf) or ByteArray(schema validated station - json schema) or ByteArray (schema validated station - graphql schema)>") {
    ackWait = 15.seconds
    messageId = "<message Id>"
}
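Because payloads are ByteArray, text has to be encoded before producing and decoded after consuming. The sketch below uses only the Kotlin standard library; the helper names encodePayload and decodePayload are hypothetical. It fixes the charset to UTF-8 on both sides, since Charset.defaultCharset() depends on the platform and can differ between the producing and consuming hosts.

```kotlin
// Hypothetical helpers (not part of the Memphis SDK).
// Encode text to bytes with an explicit charset before producing...
fun encodePayload(text: String): ByteArray = text.toByteArray(Charsets.UTF_8)

// ...and decode with the same charset after consuming.
fun decodePayload(bytes: ByteArray): String = bytes.toString(Charsets.UTF_8)

fun main() {
    val payload = encodePayload("""{"orderId": 42}""")
    println(decodePayload(payload))
}
```

The resulting ByteArray is what would be handed to produce, and decodePayload would be applied to the data of a received message.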

Add headers

producer.produce("<message in ByteArray or (schema validated station - protobuf) or ByteArray(schema validated station - json schema) or ByteArray (schema validated station - graphql schema)>") {
    headers.put("key", "value")
}

Async produce

With async produce, your application does not wait for the broker's acknowledgement. Use it only if you can tolerate data loss.

producer.produceAsync("<message in ByteArray or (schema validated station - protobuf) or ByteArray(schema validated station - json schema) or ByteArray (schema validated station - graphql schema)>")

Message ID

Stations are idempotent by default for 2 minutes (configurable through idempotencyWindow when the station is created). Idempotency is achieved by adding a message ID to each produced message.

producer.produce("<message in ByteArray or (schema validated station - protobuf) or ByteArray(schema validated station - json schema) or ByteArray (schema validated station - graphql schema)>") {
    messageId = "123"
}
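The effect of an idempotency window can be sketched as a simple deduplication check: a message ID that was already seen within the window is dropped, and the same ID is accepted again once the window has passed. The class IdempotencyWindow below is hypothetical, and measuring the window from the first time an ID is seen is an assumption of this sketch, not a statement about the broker's implementation.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

// Illustrative model only (hypothetical class, not the broker's implementation).
class IdempotencyWindow(private val window: Duration) {
    // Message ID -> time at which it was first accepted.
    private val firstSeen = mutableMapOf<String, Duration>()

    // `now` is the time elapsed since an arbitrary starting point.
    // Returns true if the message is accepted, false if it is a duplicate.
    fun accept(messageId: String, now: Duration): Boolean {
        // Forget IDs whose window has expired.
        firstSeen.entries.removeAll { now - it.value >= window }
        if (messageId in firstSeen) return false
        firstSeen[messageId] = now
        return true
    }
}

fun main() {
    val station = IdempotencyWindow(2.minutes)
    println(station.accept("123", now = 0.seconds))  // first time: accepted
    println(station.accept("123", now = 30.seconds)) // inside the window: duplicate
    println(station.accept("123", now = 3.minutes))  // window expired: accepted again
}
```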

Destroying a Producer

producer.destroy()

Creating a Consumer

val consumer = memphis.consumer("<station-name>", "<consumer-name>") {
    consumerGroup = "<consumer-group>"
    pullInterval = 1.seconds
    batchSize = 10
    batchMaxTimeToWait = 5.seconds
    maxAckTime = 30.seconds
    maxMsgDeliveries = 10
    genUniqueSuffix = false
}

Processing Messages

To consume messages you just need to collect the messages from the flow.

consumer.consume().collect {
    println("Received message:")
    println(it.data.toString(Charset.defaultCharset()))
    println(it.headers)
    println()
    it.ack()
}

If you need tighter control over when a new message is fetched, use subscribeMessages, which only fetches a message when an item in the flow is collected. It does not listen for DLS messages; to receive those, listen separately with subscribeDls.

consumer.subscribeMessages().collect {
    println("Received message:")
    println(it.data.toString(Charset.defaultCharset()))
    println(it.headers)
    println()
    it.ack()
}

consumer.subscribeDls().collect {
    println("Received DLS message:")
    println(it.data.toString(Charset.defaultCharset()))
    println(it.headers)
    println()
    it.ack()
}

Acknowledging a Message

Acknowledging a message tells the Memphis server not to re-send the same message to the same consumer or consumer group.

message.ack()

Get headers

Get headers per message

val headers = msg.headers

Destroying a Consumer

consumer.destroy()
