diff --git a/CHANGES.md b/CHANGES.md index 72d2bcd..a10aa6e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,9 @@ # DurianRx releases ## [Unreleased] +### Added +- `RxExecutor.launch` lets a user run `suspend` functions within that executor. +- `GuardedExecutor` now has a lazily populated scope which cancels when its guard disposes, as well as a `launch` method. ## [5.0.2] - 2025-01-31 ### Fixed diff --git a/src/main/java/com/diffplug/common/rx/GuardedExecutor.kt b/src/main/java/com/diffplug/common/rx/GuardedExecutor.kt index d0592b1..c4a1756 100644 --- a/src/main/java/com/diffplug/common/rx/GuardedExecutor.kt +++ b/src/main/java/com/diffplug/common/rx/GuardedExecutor.kt @@ -20,9 +20,13 @@ import java.util.* import java.util.concurrent.CompletionStage import java.util.concurrent.Executor import java.util.function.Supplier +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch /** * GuardedExecutor is an [Executor] and [RxSubscriber] which promises to cancel its subscriptions @@ -31,6 +35,14 @@ import kotlinx.coroutines.flow.Flow * Useful for tying asynchronous tasks to gui elements. */ open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor, RxSubscriber { + val scope: CoroutineScope by lazy { + CoroutineScope(SupervisorJob() + delegate.dispatcher).apply { + guard.runWhenDisposed { cancel() } + } + } + + fun launch(block: suspend CoroutineScope.() -> Unit): Job = scope.launch(block = block) + override fun execute(command: Runnable) { delegate.executor.execute(guard.guard(command)) } diff --git a/src/main/java/com/diffplug/common/rx/RxExecutor.kt b/src/main/java/com/diffplug/common/rx/RxExecutor.kt index e77880a..70b1d88 100644 --- a/src/main/java/com/diffplug/common/rx/RxExecutor.kt +++ b/src/main/java/com/diffplug/common/rx/RxExecutor.kt @@ -36,6 +36,9 @@ import kotlinx.coroutines.launch class RxExecutor internal constructor(val executor: Executor, val dispatcher: CoroutineDispatcher) : RxSubscriber { + fun launch(block: suspend CoroutineScope.() -> Unit): Job = + CoroutineScope(Job() + dispatcher).launch(block = block) + interface Has : Executor { val rxExecutor: RxExecutor }