Accounts2

Crabzilla v0.3.5

This is an example application built with Vert.x and Crabzilla.

Status

Work in progress

Requirements

  • Java 11

  • Maven

  • Docker Compose

Running

1 – Start Postgres:

docker-compose up

2 – Run the tests:

mvn clean verify

3 – Run the application:

mvn clean compile exec:java

Test scenarios

Core model unit tests

class AccountsSpecsTest : AnnotationSpec() {

  val id = UUID.randomUUID()
  val config = CommandControllerConfig("Account", accountEventHandler, { AccountCommandHandler() })

  @Test
  fun `opening an account with bonus credit`() {
    TestSpecification(config)
      .whenCommand(OpenAccount(id, 100.00))
      .then { it.state() shouldBe Account(id, 100.00) }
      .then { it.events() shouldBe listOf(AccountOpened(id, 100.00)) }
  }

  @Test
  fun `depositing money`() {
    TestSpecification(config)
      .givenEvents(AccountOpened(id, 100.00))
      .whenCommand(DepositMoney(200.00))
      .then { it.state() shouldBe Account(id, 300.00) }
      .then {
        it.events() shouldBe listOf(
          AccountOpened(id, 100.00),
          MoneyDeposited(200.00, 300.00)
        )
      }
  }

  // etc...
}

Note: your core unit tests do not depend on vertx-core, only on crabzilla-core.

Integration tests

  • TODO

Manual tests

Pessimistic locking concurrent test

  • First, open account 25a4b17c-3512-11ec-8d3d-0242ac130003 with $100.00:

docker run --network host -i loadimpact/k6 run - <./k6/open-account-with-100.js

Then the snapshot state will be:

{"id": "25a4b17c-3512-11ec-8d3d-0242ac130003", "type": "Account", "balance":  100.00}

And the generated event:

{"id": "25a4b17c-3512-11ec-8d3d-0242ac130003", "type": "AccountOpened", "bonusCredit": 100.0}

  • Then try to withdraw $10.00 with 10 virtual users for 10 seconds:

docker run --network host -i loadimpact/k6 run --vus 10 --duration 10s - <./k6/withdraw-10-from-account.js

Then the account 25a4b17c-3512-11ec-8d3d-0242ac130003 state will be:

{"id": "25a4b17c-3512-11ec-8d3d-0242ac130003", "type": "Account"}

The balance is omitted because its value is now zero. The events for the stream are:

{
  "events": [
    {
      "event_payload": "{\"id\": \"25a4b17c-3512-11ec-8d3d-0242ac130003\", \"type\": \"AccountOpened\", \"bonusCredit\": 100.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 90.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 80.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 70.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 60.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 50.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 40.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 30.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 20.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 10.0}"
    },
    {
      "event_payload": "{\"type\": \"MoneyWithdrawn\", \"amount\": 10.0, \"finalBalance\": 0.0}"
    }
  ]
}

So, although the API received many concurrent requests, only 10 withdraw commands succeeded, which is exactly as many as the $100.00 balance allows. ACID is good.
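This outcome follows from serializing commands per account: each withdrawal is checked against the latest balance while the lock is held. Below is a minimal in-memory sketch of that idea. It uses a JVM `ReentrantLock` as a stand-in for the Postgres advisory lock, and `LockedAccount`, `tryWithdraw` and `runConcurrentWithdrawals` are hypothetical names for illustration only, not part of Crabzilla or this application.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Hypothetical stand-in for one account stream guarded by a per-account lock.
class LockedAccount(initialBalance: Double) {
  private val lock = ReentrantLock() // plays the role of the advisory lock
  var balance: Double = initialBalance
    private set

  // Checks and updates the balance while holding the lock, so two
  // withdrawals can never act on the same observed balance.
  fun tryWithdraw(amount: Double): Boolean = lock.withLock {
    if (balance >= amount) {
      balance -= amount
      true
    } else {
      false
    }
  }
}

// Fires `requests` concurrent withdrawals and returns how many succeeded.
fun runConcurrentWithdrawals(account: LockedAccount, requests: Int, amount: Double): Int {
  val succeeded = AtomicInteger(0)
  val pool = Executors.newFixedThreadPool(10) // 10 "virtual users"
  repeat(requests) {
    pool.execute { if (account.tryWithdraw(amount)) succeeded.incrementAndGet() }
  }
  pool.shutdown()
  pool.awaitTermination(10, TimeUnit.SECONDS)
  return succeeded.get()
}

fun main() {
  val account = LockedAccount(100.0)
  val ok = runConcurrentWithdrawals(account, requests = 1000, amount = 10.0)
  println("successful withdrawals: $ok, final balance: ${account.balance}")
}
```

However many requests are fired, only the withdrawals that fit into the balance succeed; the rest are rejected instead of overdrawing the account.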

Stress test

  • Opening many accounts with 1000 virtual users for 60 seconds:

docker run --network host -i loadimpact/k6 run --vus 1000 --duration 60s - <./k6/open-many-accounts.js

And you will see something like:

         /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: -
     output: -

  scenarios: (100.00%) 1 scenario, 1000 max VUs, 1m30s max duration (incl. graceful stop):
           * default: 1000 looping VUs for 1m0s (gracefulStop: 30s)
     data_received..................: 39 MB  648 kB/s
     data_sent......................: 25 MB  414 kB/s
     http_req_blocked...............: avg=890.28µs min=847ns    med=2.26µs   max=281.92ms p(90)=5.48µs   p(95)=7.37µs
     http_req_connecting............: avg=876.3µs  min=0s       med=0s       max=281.89ms p(90)=0s       p(95)=0s
     http_req_duration..............: avg=473.78ms min=81.67ms  med=544.84ms max=1.11s    p(90)=812.89ms p(95)=915.29ms
       { expected_response:true }...: avg=473.78ms min=81.67ms  med=544.84ms max=1.11s    p(90)=812.89ms p(95)=915.29ms
     http_req_failed................: 0.00%  ✓ 0           ✗ 126587
     http_req_receiving.............: avg=41.09µs  min=9.78µs   med=30.58µs  max=15.49ms  p(90)=70.1µs   p(95)=92.86µs
     http_req_sending...............: avg=129.8µs  min=6.07µs   med=15.78µs  max=52.38ms  p(90)=35.53µs  p(95)=49.09µs
     http_req_tls_handshaking.......: avg=0s       min=0s       med=0s       max=0s       p(90)=0s       p(95)=0s
     http_req_waiting...............: avg=473.61ms min=79.73ms  med=544.79ms max=1.11s    p(90)=812.8ms  p(95)=915.17ms
     http_reqs......................: 126587 2091.369327/s
     iteration_duration.............: avg=474.94ms min=129.55ms med=545.22ms max=1.12s    p(90)=813.16ms p(95)=915.65ms
     iterations.....................: 126587 2091.369327/s
     vus............................: 1000   min=1000      max=1000
     vus_max........................: 1000   min=1000      max=1000
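
As a quick sanity check on these numbers, the reported rate can be cross-checked against the other figures in the summary. The sketch below only redoes the arithmetic; the constants are copied from the output above and the function names are made up for illustration.

```kotlin
// Figures copied from the k6 summary above.
const val TOTAL_REQUESTS = 126_587
const val REPORTED_RATE = 2091.369327      // http_reqs per second
const val VIRTUAL_USERS = 1000
const val AVG_ITERATION_SECONDS = 0.47494  // iteration_duration avg

// Wall-clock time implied by the totals: requests / rate.
fun impliedDurationSeconds(): Double = TOTAL_REQUESTS / REPORTED_RATE

// Little's law for looping VUs: throughput is roughly concurrency / latency.
fun expectedRate(): Double = VIRTUAL_USERS / AVG_ITERATION_SECONDS

fun main() {
  println("implied duration: %.1f s".format(impliedDurationSeconds()))
  println("expected rate:    %.0f req/s".format(expectedRate()))
}
```

The implied duration comes out slightly above the configured 60 seconds, and the rate estimated from 1000 VUs and the average iteration time lands close to the reported value of about 2.1K requests per second.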

Conclusion

  • You have a clean and testable CQRS domain model.

  • Idiomatic Kotlin: type safety, pattern matching, and honoring your constructors.

  • Immutability: state transitions occur only by computing the results of pure functions (State, Event) → State.

  • JSON serialization using Kotlin serialization
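
To illustrate the (State, Event) → State idea, here is a minimal sketch of a pure event handler and a replay over an event stream. The types are simplified stand-ins modeled on the events shown in the manual test above; they are assumptions for illustration and not the actual classes of this project, and `applyEvent` and `replay` are hypothetical names.

```kotlin
// Simplified, hypothetical types; the real ones live in this project's core model.
sealed interface AccountEvent
data class AccountOpened(val id: String, val bonusCredit: Double) : AccountEvent
data class MoneyDeposited(val amount: Double, val finalBalance: Double) : AccountEvent
data class MoneyWithdrawn(val amount: Double, val finalBalance: Double) : AccountEvent

data class Account(val id: String, val balance: Double = 0.0)

// Pure transition: (State, Event) -> State. No I/O and no mutation.
fun applyEvent(state: Account?, event: AccountEvent): Account = when (event) {
  is AccountOpened -> Account(event.id, event.bonusCredit)
  is MoneyDeposited -> checkNotNull(state) { "Account not opened" }.copy(balance = event.finalBalance)
  is MoneyWithdrawn -> checkNotNull(state) { "Account not opened" }.copy(balance = event.finalBalance)
}

// Rebuilding the state is a left fold over the event stream.
fun replay(events: List<AccountEvent>): Account? =
  events.fold<AccountEvent, Account?>(null) { state, event -> applyEvent(state, event) }

fun main() {
  val events = listOf(
    AccountOpened("25a4b17c-3512-11ec-8d3d-0242ac130003", 100.0),
    MoneyWithdrawn(10.0, 90.0),
    MoneyWithdrawn(10.0, 80.0)
  )
  println(replay(events)) // Account(id=25a4b17c-3512-11ec-8d3d-0242ac130003, balance=80.0)
}
```

Because each transition returns a new `Account` through `copy`, the previous state is never modified, which is what keeps the model easy to unit test.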

To handle your command in a non-blocking way, Crabzilla will:

  • Validate your command using your implementation of:

fun interface CommandValidator<C : Command> {
  fun validate(command: C): List<String>
}

  • Lock the target state instance (using Postgres Advisory Locks).

  • Retrieve the target snapshot (state and version).

  • Submit your command and snapshot.state using your implementation of:

abstract class CommandHandler<S : State, C : Command, E : Event>(applier: EventHandler<S, E>) :
  CommandHandlerApi<S, C, E>(applier) {
  abstract fun handleCommand(command: C, state: S?): CommandSession<S, E>
}

  • Persist the results (within a database transaction):

    • Persist your command as JSON to commands table.

    • Persist the resulting events + metadata as JSON to events table.

    • Optionally persist the resulting snapshot (state and version) as JSON to snapshots table.

  • Return a CommandSessionData to the caller:

data class CommandSessionData(
  val originalState: State?,
  val events: List<Event>,
  val newState: State
)
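
The in-memory part of the steps above (validate, handle, compute the new state, return the session data) can be sketched roughly as follows. Locking and persistence are left out, and every type and function here (`SessionResult`, `validate`, `handle`, `applyEvent`, `process`) is a simplified, hypothetical stand-in rather than Crabzilla's actual API.

```kotlin
// All types below are simplified, hypothetical stand-ins, not Crabzilla's API.
data class Account(val id: String, val balance: Double = 0.0)

sealed interface AccountCommand
data class WithdrawMoney(val amount: Double) : AccountCommand

sealed interface AccountEvent
data class MoneyWithdrawn(val amount: Double, val finalBalance: Double) : AccountEvent

data class SessionResult(
  val originalState: Account?,
  val events: List<AccountEvent>,
  val newState: Account
)

// Validation returns a list of error messages; an empty list means valid.
fun validate(command: AccountCommand): List<String> = when (command) {
  is WithdrawMoney ->
    if (command.amount <= 0.0) listOf("Invalid amount: ${command.amount}") else emptyList()
}

// The handler decides which events result from a command and the current state.
fun handle(command: AccountCommand, state: Account?): List<AccountEvent> = when (command) {
  is WithdrawMoney -> {
    val account = requireNotNull(state) { "Account not found" }
    require(account.balance >= command.amount) { "Insufficient funds" }
    listOf(MoneyWithdrawn(command.amount, account.balance - command.amount))
  }
}

// Pure transition used to compute the new state from the resulting events.
fun applyEvent(state: Account, event: AccountEvent): Account = when (event) {
  is MoneyWithdrawn -> state.copy(balance = event.finalBalance)
}

// Validate, handle, fold the events into the new state, and return the session data.
// Throws IllegalArgumentException when validation or a business rule fails.
fun process(command: AccountCommand, snapshot: Account?): SessionResult {
  val errors = validate(command)
  require(errors.isEmpty()) { errors.joinToString() }
  val events = handle(command, snapshot)
  val newState = events.fold(requireNotNull(snapshot)) { s, e -> applyEvent(s, e) }
  return SessionResult(snapshot, events, newState)
}

fun main() {
  val snapshot = Account("25a4b17c-3512-11ec-8d3d-0242ac130003", 100.0)
  val result = process(WithdrawMoney(10.0), snapshot)
  println(result.events)
  println(result.newState)
}
```

In this sketch a rejected command never produces events, so there would be nothing to persist for it.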

With the whole stack running on my local machine, this setup reaches about 2K TPS, which is fair enough for many use cases.

From here, you can use any mechanism (manual, CDC, etc.) to publish these events to wherever you need.

Next steps (TODO)

  • Projecting events
