What is the coroutine equivalent of RxJava's onNext and onError?

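Problem description

Hi, I have some use cases written in Java with RxJava. I have converted them to Kotlin files and, instead of RxJava, turned them into coroutine suspend functions.

In my RxJava code, the use case makes an API call and returns the result, while also doing some work in onNext and some work in onError.

How can I do the same with coroutines?

Here is my RxJava code: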

@PerApp
public class StartFuellingUseCase {

    @Inject
    App app;
    @Inject
    CurrentOrderStorage orderStorage;
    @Inject
    FuelOrderRepository repository;

    @Inject
    StartFuellingUseCase() {
        // empty constructor for injection usage
    }

    public Observable<GenericResponse> execute(Equipment equipment) {
        if (orderStorage.getFuelOrder() == null) return null;

        DateTime startTime = new DateTime();
        TimestampedAction action = new TimestampedAction(
                app.getSession().getUser().getId(),null,startTime
        );

        return repository.startFuelling(orderStorage.getFuelOrder().getId(),action)
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(response -> onSuccess(startTime,equipment))
                .doOnError(this::onError);
    }

    private void onSuccess(DateTime startTime,Equipment equipment) {
        if (orderStorage.getFuelOrder() == null) return;

        orderStorage.getFuelOrder().setStatus(FuelOrderData.STATUS_FUELLING);
        equipment.getTimes().setStart(startTime);

        app.saveState();
    }

    private void onError(Throwable e) {
        Timber.e(e,"Error calling started fuelling! %s",e.getMessage());
    }
}

I have rewritten the code in Kotlin as a coroutine use case:

@PerApp
class StartFuellingUseCaseCoroutine @Inject constructor(
    private val currentOrderStorage: CurrentOrderStorage,
    private val fuelOrderRepository: FuelOrderRepository,
    private val app: App
) : UseCaseCoroutine<GenericResponse, StartFuellingUseCaseCoroutine.Params>() {

    override suspend fun run(params: Params): GenericResponse {
        val startTime = DateTime()
        val action = TimestampedAction(
            app.session.user.id,startTime
        )
        return fuelOrderRepository.startFuelling(
            currentOrderStorage.fuelOrder!!.id,action
        )
        //SHOULD RETURN THE VALUE FROM THE fuelOrderRepository.startFuelling
        //AND ALSO
        //ON NEXT
        //CALL onSuccess PASSING startTime and equipment
        //ON ERROR
        //CALL onError
    }

    private fun onSuccess(startTime: DateTime,equipment: Equipment) {
        if (currentOrderStorage.getFuelOrder() == null) return
        currentOrderStorage.getFuelOrder()!!.setStatus(FuelOrderData.STATUS_FUELLING)
        equipment.times.start = startTime
        app.saveState()
    }

    private fun onError(errorMessage: String) {
        Timber.e(errorMessage,errorMessage)
    }

    data class Params(val equipment: Equipment)
}

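Could you suggest how to call onSuccess and onError here, similar to how they are called from doOnNext and doOnError in the RxJava version?

Any suggestions on how to solve this would be appreciated.

Thanks, R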

Solution

You can use Kotlin Flow, as in the conversion example below:

RxJava:

private fun observable(
    value: Int = 1
): Observable<Int> {
    return Observable.create { emitter ->
        emitter.onNext(value)
        emitter.onError(RuntimeException())
    }
}

Flow:

private fun myFlow(
    value: Int = 1
): Flow<Int> {
    return flow {
        emit(value)
        throw RuntimeException()
    }
}

More details: https://developer.android.com/kotlin/flow
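To connect the conversion above back to doOnNext and doOnError, here is a minimal sketch of the consuming side, assuming kotlinx-coroutines-core is on the classpath. The names numbers, seen, and errors are hypothetical and exist only for illustration; onEach plays the role of doOnNext and catch the role of doOnError.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical flow: emits one value and then fails, like the Observable above.
fun numbers(value: Int = 1): Flow<Int> = flow {
    emit(value)
    throw RuntimeException("boom")
}

// Hypothetical sinks that record what the side-effect operators observed.
val seen = mutableListOf<Int>()
val errors = mutableListOf<String>()

fun main() = runBlocking {
    numbers()
        .onEach { seen += it }                    // side effect per value, like doOnNext
        .catch { e -> errors += e.message ?: "" } // handles an upstream failure
        .collect()                                // terminal operator: starts the flow
}
```

One difference to keep in mind: doOnError lets the error continue downstream, whereas catch consumes it. If the collector should still fail, rethrow the exception inside the catch block.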


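Wrap the startFuelling call in a flow { } builder to turn it into a Flow. Passing the call to flowOf(...) would evaluate it eagerly, before the flow exists, so a failure would never reach catch and flowOn would not move the call to the IO dispatcher:

return flow { emit(repository.startFuelling(orderStorage.getFuelOrder().getId(), action)) }
    .onEach { response -> onSuccess(startTime, equipment) }
    .catch { e -> onError(e) }
    .flowOn(Dispatchers.IO) // everything above this line runs on the IO dispatcher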
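Because run in the question is a suspend function that returns a single GenericResponse, the same side effects can probably also be expressed without a Flow, using an ordinary try/catch around the suspending call. The sketch below is only an illustration under that assumption: FakeRepository, StartFuellingSketch, and the simplified GenericResponse are hypothetical stand-ins for the types in the question, and runBlocking comes from kotlinx-coroutines-core.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the response type in the question.
data class GenericResponse(val ok: Boolean)

// Hypothetical stand-in for FuelOrderRepository; a real one would do network I/O.
class FakeRepository(private val fail: Boolean) {
    suspend fun startFuelling(orderId: Int): GenericResponse {
        if (fail) throw IllegalStateException("network down")
        return GenericResponse(ok = true)
    }
}

class StartFuellingSketch(private val repository: FakeRepository) {
    val log = mutableListOf<String>()

    suspend fun run(orderId: Int): GenericResponse =
        try {
            val response = repository.startFuelling(orderId)
            onSuccess()   // reached only if the call returned, like doOnNext
            response
        } catch (e: Exception) {
            onError(e)    // side effect on failure, like doOnError
            throw e       // rethrow so the caller still sees the failure
        }

    private fun onSuccess() { log += "success" }
    private fun onError(e: Throwable) { log += "error: ${e.message}" }
}

fun main() = runBlocking {
    val good = StartFuellingSketch(FakeRepository(fail = false))
    good.run(orderId = 1)
    println(good.log)

    val bad = StartFuellingSketch(FakeRepository(fail = true))
    runCatching { bad.run(orderId = 1) }
    println(bad.log)
}
```

Rethrowing in the catch block mirrors doOnError, which observes the error without swallowing it. The dispatcher switch that subscribeOn provided would typically be handled separately, for example with withContext(Dispatchers.IO) around the repository call.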

If you want to collect it on the main thread, you can use launchIn:

.onEach { }
.launchIn(mainScope) // could be lifecycleScope or viewModelScope

// or

CoroutineScope(Dispatchers.Main).launch {
    flow.collect { }
}
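As a rough illustration of what launchIn does, the sketch below starts collection in a scope and gets a Job back. It assumes kotlinx-coroutines-core; the runBlocking scope stands in for mainScope so the example runs outside Android, and collected is a hypothetical list used only to observe the result.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical sink that records the values seen by onEach.
val collected = mutableListOf<Int>()

fun main() = runBlocking {
    // launchIn starts collecting in the given scope and returns a Job immediately.
    val job: Job = flowOf(1, 2, 3)
        .onEach { collected += it }
        .launchIn(this) // `this` is the runBlocking scope, standing in for mainScope

    job.join()          // wait until the flow has been fully collected
    println(collected)
}
```

The returned Job can be cancelled to stop collection early, which takes over the role that disposing the subscription had in RxJava.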