带有平面图的RxJava2间隔

问题描述

我需要定期发送项目列表,并在发送每个项目时将其从本地存储库中删除。因此,问题在于我无法每个周期发送整个列表,每个周期只能发送一个项目。这是我的开始发送方法

 public void startSending() {
        disposables.add(Observable.interval(INTERVAL_TIME,TimeUnit.SECONDS)
                .filter(new Predicate<Long>() {
                    @Override
                    public boolean test(Long aLong) throws Exception {
                        Log.v("Service11","Service to send way bill is started");
                        if (wayBillList.size() > 0 && !isWorking) {
                            return true;
                        }
                        return false;
                    }
                })
                .flatMap(new Function<Long,ObservableSource<WayBill>>() {
                    @Override
                    public ObservableSource<WayBill> apply(Long aLong) throws Exception {
                        return wayBillController
                                .makeFixation(wayBillList)
                                .subscribeOn(Schedulers.io())
                                .observeOn(AndroidSchedulers.mainThread())
                                .doOnSubscribe(new Consumer<disposable>() {
                                    @Override
                                    public void accept(disposable disposable) throws Exception {
                                        Log.v("Service11","Sending was started");
                                        isWorking = true;
                                    }
                                }).doOnTerminate(new Action() {
                                    @Override
                                    public void run() throws Exception {
                                        Log.v("Service11","Sending was finished");
                                        isWorking = false;
                                    }
                                });
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(AndroidSchedulers.mainThread())
                .retry()
                .subscribeWith(new disposableObserver<WayBill>() {
                    @Override
                    public void onNext(WayBill wayBill) {
                        Log.v("Service11","WayBill id: " + wayBill.getId() + " and time is " + wayBill.getTimeData());
                        removeFromStack(wayBill);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.v("Service11",e.getMessage());

                        Toast.makeText(WayBillSendService.this,getResources().getString(R.string.error_occurred_while_sending),Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onComplete() {
                        Log.v("Service11","On completed");
                    }
                }));

    }

这是我的wayBillController.makefixation()方法

public Observable<WayBill> makeFixation(final List<WayBill> wayBillList){
        return Observable.create(new ObservableOnSubscribe<WayBill>() {
            @Override
            public void subscribe(ObservableEmitter<WayBill> emitter) throws Exception {
                String siteUrl = SharedPreferencesHelper
                        .getDefaultPreferences()
                        .getString(SharedPreferencesHelper.SERVER_URL_KEY,"");
                for (WayBill wayBill: wayBillList){
                    Response serverResponse = RequestHelper.doPostRequest(getUpdatedClient(),siteUrl + WAY_BILL_ENDPOINT,createRequestBody(wayBill));
                    if (serverResponse.code() == HttpURLConnection.HTTP_OK){
                        Log.v("Service11",messageDecode(serverResponse.body().string()));
                        emitter.onNext(wayBill);
                        removeImage(wayBill.getUri().getPath());
                    } else {
                        Log.v("Service11","Server is error,code: " + serverResponse.code());
                        emitter.onError(new Exception(serverResponse.body().string()));
                        break;
                    }
                }
                emitter.onComplete();
            }
        });
    }

实际上,我不需要使用generator.onComplete();。因为makeFixation()可观察到在onNext()或onError()之后完成。如何预见该行为并完成每个周期发送所有项目,而不是每个周期发送一次?

一个选择是放弃项目列表,但是如果发送了第一项而第二项却给我一个错误怎么办?我会收到错误而不是已发送的邮件

此外,我可以使用数据库。同样,这是一个附加代码((

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)