RxJava2 中的函数反应式操作符,它从先前的状态和额外的输入产生新的状态

问题描述

我正在做一个小展示,旨在展示如何使用函数式反应式编程(特别是 RxJava2)以纯函数方式编写交互式程序。

我的目标是创建一个简单的互动游戏。游戏的主要功能(我们称之为 Next Game State,简称 NGS)获取游戏的当前状态、用户输入以及一个随机数,并根据这三个输入计算游戏的下一个状态。到目前为止相当简单。用户输入和随机数是从 Flowable 或通过生成器创建的 Iterable。我设想游戏状态本身也会是一个 Flowable(但我可能错了)。

我正在努力寻找合适的函数反应式运算符,将函数应用于三个输入并生成游戏的下一个状态。最初,我认为 Flowable.zip(source1,source2,source3,zipper) 是正确的操作:它可以采用三个流并通过 NGS 功能将其组合成一个新的 Flowable。不幸的是,这并没有考虑到生成的 Flowable 本身需要成为 zip 操作的输入之一的事实,这似乎是一个不可能的设置。我的下一个想法是使用 Flowable.generate,但我需要来自其他 Flowable 的两个额外输入来计算下一个状态,并且无法将这些输入提供给 generate 运算符。

简而言之,我试图实现类似于这个(伪)大理石图的东西:

  Game --------------(0)-------------(1)-----------------(2)-----------------(3)---
  State                \   _______   / \       _______   / \       _______   /
                        \_|       |_/   \_____|       |_/   \_____|       |_/
                          | Next  |           | Next  |           | Next  |
                     _____| Game  |      _____| Game  |      _____| Game  |
                    /   __| State |     /   __| State |     /   __| State |
                   /   /  '_______'    /   /  '_______'    /   /  '_______'
                  /   /               /   /               /   /
  User           /   /               /   /               /   /
  Input --------o---/---------------o---/---------------o---/--------------------
                   /                   /                   /
  Random          /                   /                   /
  Number --------o-------------------o-------------------o---------------------------

我承认,最上面的一行有点不标准,但这是我最好的形象化尝试,从初始游戏状态 (0) 开始,每个连续的游戏状态 (1),{{1 }}、(2) 等,是根据之前的游戏状态加上额外的输入创建的。

我意识到使用 (3) 或在下游订阅者内部可能有一种相对简单的方法来执行此操作,但此练习的目标之一是表明可以完全解决此问题而不会产生副作用或 Emitter 方法。

那么,长话短说,是否有支持这种行为的现有运算符?如果不是,创建一个会涉及什么?或者是否有其他解决方案使用略有不同的整体设置?

解决方法

假设

用户输入和随机数是两个独立的流,它们可以随时自行发出。

解决方案

长话短说,是否有支持这种行为的现有运算符?

是的,有。您可以使用 #scan(seed<T>,{ current<T>,upstream<Q> -> newValue<T> }

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.processors.PublishProcessor
import org.junit.jupiter.api.Test

class So65589721 {
    @Test
    fun `65589721`() {
        val userInput = PublishProcessor.create<String>()
        val randomNumber = PublishProcessor.create<Int>()

        val scan = Flowable.combineLatest(userInput,randomNumber,BiFunction<String,Int,Pair<String,Int>> { u,r ->
            u to r
        }).scan<GameState>(GameState.State1,{ currentState,currentInputTuple ->
            // calculate new state from `currentState` and combined pair
            GameState.State3(currentInputTuple.first,currentInputTuple.second)
        })

        val test = scan.test()

        randomNumber.offer(42)

        userInput.offer("input1")
        userInput.offer("input2")

        test
            .assertValues(GameState.State1,GameState.State3("input1",42),GameState.State3("input2",42))
    }

    interface GameState {
        object State1 : GameState
        data class State3(val value: String,val random: Int) : GameState
    }
}

此示例展示了如何使用 scan 从给定的输入对和当前状态计算新状态。使用 scan,您可以安全地“保持”状态,而不会使其暴露于副作用。

注意

  • 传递给 seed-valuescan 将作为订阅的第一个值发出。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...