A small testing library for kotlinx.coroutines Flow

Turbine

Turbine is a small testing library for kotlinx.coroutines Flow.

flowOf("one", "two").test {
  assertEquals("one", awaitItem())
  assertEquals("two", awaitItem())
  awaitComplete()
}

A turbine is a rotary mechanical device that extracts energy from a fluid flow and converts it into useful work.

Wikipedia

Download

repositories {
  mavenCentral()
}
dependencies {
  testImplementation 'app.cash.turbine:turbine:0.6.0'
}
Snapshots of the development version are available in Sonatype’s snapshots repository.

repositories {
  maven {
    url 'https://oss.sonatype.org/content/repositories/snapshots/'
  }
}
dependencies {
  testImplementation 'app.cash.turbine:turbine:0.7.0-SNAPSHOT'
}

Usage

The entrypoint for the library is the test extension for Flow which accepts a validation block. Like collect, test is a suspending function that will not return until the flow is complete or canceled.

someFlow.test {
  // Validation code here!
}

Consuming Events

Inside the test block you must consume all received events from the flow. Failing to consume all events will fail your test.

flowOf("one", "two").test {
  assertEquals("one", awaitItem())
}
Exception in thread "main" AssertionError:
  Unconsumed events found:
   - Item(two)
   - Complete

As the exception indicates, consuming the "two" item is not enough. The complete event must also be consumed.

flowOf("one", "two").test {
  assertEquals("one", awaitItem())
  assertEquals("two", awaitItem())
  awaitComplete()
}

Received events can be explicitly ignored, however.

flowOf("one", "two").test {
  assertEquals("one", awaitItem())
  cancelAndIgnoreRemainingEvents()
}

Additionally, we can receive the most recent emitted item and ignore the previous ones.

no emission yet
// 100ms – 200ms -> “one” is emitted
// 200ms – 300ms -> “two” is emitted
// 300ms – 400ms -> “three” is emitted
delay(250)
assertEquals(“two”, expectMostRecentItem())
cancelAndIgnoreRemainingEvents()
}
“>

flowOf("one", "two", "three")
  .map {
    delay(100)
    it
  }
  .test {
    // 0 - 100ms -> no emission yet
    // 100ms - 200ms -> "one" is emitted
    // 200ms - 300ms -> "two" is emitted
    // 300ms - 400ms -> "three" is emitted
    delay(250)
    assertEquals("two", expectMostRecentItem())
    cancelAndIgnoreRemainingEvents()
  }

Consuming Errors

Unlike collect, a flow which causes an exception will still be exposed as an event that you must consume.

flow { throw RuntimeException("broken!") }.test {
  assertEquals("broken!", awaitError().message)
}

Failure to consume an error will result in the same unconsumed event exception as above, but with the exception added as the cause so that the full stacktrace is available.

flow { throw RuntimeException("broken!") }.test { }

java.lang.AssertionError: Unconsumed events found:
 - Error(RuntimeException)
    at app.cash.turbine.ChannelBasedFlowTurbine.ensureAllEventsConsumed(FlowTurbine.kt:240)
    ... 53 more
Caused by: java.lang.RuntimeException: broken!
    at example.MainKt$main$1.invokeSuspend(Main.kt:7)
    ... 32 more

Asynchronous Flows

Calls to awaitItem(), awaitComplete(), and awaitError() are suspending and will wait for events from asynchronous flows.

channelFlow {
  withContext(IO) {
    Thread.sleep(100)
    send("item")
  }
}.test {
  assertEquals("item", awaitItem())
  awaitComplete()
}

By default, when one of the “await” methods suspends waiting for an event it will timeout after one second.

channelFlow {
  withContext(IO) {
    Thread.sleep(2_000)
    send("item")
  }
}.test {
  assertEquals("item", awaitItem())
  awaitComplete()
}
Exception in thread "main" TimeoutCancellationException: Timed out waiting for 1000 ms

A longer timeout can be specified as an argument to test.

channelFlow {
  withContext(IO) {
    Thread.sleep(2_000)
    send("item")
  }
}.test(timeout = 3.seconds) {
  assertEquals("item", awaitItem())
  awaitComplete()
}

Asynchronous flows can be canceled at any time and will not require consuming a complete or error event.

channelFlow {
  withContext(IO) {
    repeat(10) {
      Thread.sleep(200)
      send("item $it")
    }
  }
}.test {
  assertEquals("item 0", awaitItem())
  assertEquals("item 1", awaitItem())
  assertEquals("item 2", awaitItem())
  cancel()
}

Hot Flows

Emissions to hot flows that don’t have active consumers are dropped. It’s important to call test (and therefore have an active collector) on a flow before emissions to a flow are made. For example:

<div class="highlight highlight-source-kotlin position-relative" data-snippet-clipboard-copy-content="val mutableSharedFlow = MutableSharedFlow(replay = 0)
mutableSharedFlow.emit(1)
mutableSharedFlow.test {
assertEquals(awaitItem(), 1)
cancelAndConsumeRemainingEvents()
}
“>

val mutableSharedFlow = MutableSharedFlow<Int>(replay = 0)
mutableSharedFlow.emit(1)
mutableSharedFlow.test {
  assertEquals(awaitItem(), 1)
  cancelAndConsumeRemainingEvents()
}

will fail with a timeout exception.

kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
	(Coroutine boundary)
	at app.cash.turbine.ChannelBasedFlowTurbine$awaitEvent$2.invokeSuspend(FlowTurbine.kt:238)
	at app.cash.turbine.ChannelBasedFlowTurbine$withTimeout$2.invokeSuspend(FlowTurbine.kt:206)
	at app.cash.turbine.ChannelBasedFlowTurbine.awaitItem(FlowTurbine.kt:243)

Proper usage of Turbine with hot flows looks like the following.

<div class="highlight highlight-source-kotlin position-relative" data-snippet-clipboard-copy-content="val mutableSharedFlow = MutableSharedFlow(replay = 0)
mutableSharedFlow.test {
mutableSharedFlow.emit(1)
assertEquals(awaitItem(), 1)
cancelAndConsumeRemainingEvents()
}
“>

val mutableSharedFlow = MutableSharedFlow<Int>(replay = 0)
mutableSharedFlow.test {
  mutableSharedFlow.emit(1)
  assertEquals(awaitItem(), 1)
  cancelAndConsumeRemainingEvents()
}

The hot flow types Kotlin currently provide are:

  • MutableStateFlow
  • StateFlow
  • MutableSharedFlow
  • SharedFlow
  • Channels converted to flow with Channel.consumeAsFlow

Experimental API Usage

Turbine uses Kotlin experimental APIs:

  • Duration is used to declare the event timeout.

Since the library targets test code, the impact and risk of any breaking changes to these APIs are minimal and would likely only require a version bump.

Instead of sprinkling the experimental annotations or @OptIn all over your tests, opt-in at the compiler level.

Groovy DSL

compileTestKotlin {
  kotlinOptions {
    freeCompilerArgs += [
        '-Xopt-in=kotlin.time.ExperimentalTime',
    ]
  }
}

Kotlin DSL

tasks.compileTestKotlin {
  kotlinOptions {
    freeCompilerArgs += listOf(
        "-Xopt-in=kotlin.time.ExperimentalTime",
    )
  }
}

For multiplatform projects:

Groovy DSL

kotlin {
  sourceSets.matching { it.name.endsWith("Test") }.all {
    it.languageSettings {
      useExperimentalAnnotation('kotlin.time.ExperimentalTime')
    }
  }
}

Kotlin DSL

kotlin.sourceSets.matching {
  it.name.endsWith("Test")
}.configureEach {
  languageSettings.useExperimentalAnnotation("kotlin.time.ExperimentalTime")
}

License

Copyright 2018 Square, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

GitHub

https://github.com/cashapp/turbine