RxJava takeWhile seems to be ignored; paging should run only for the first page

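Problem description

I am trying to upload database entities in batches using WorkManager. That part works, but the service that performs the paging does not behave as expected.

What I am trying to do:

  1. Page through the database entries I need to upload (the last timestamp on the server is newer than the one on the client), taking each page as a List
  2. Map the entries to API objects
  3. Pass the entities to another method that triggers the API call and sends the list of entries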
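The three steps above can be modelled without RxJava or Room, which makes the intended stop condition easy to check in isolation. The following is only a sketch using plain java.util.stream: the class PagingModel, the in-memory table, and the methods page and collectForUpload are hypothetical stand-ins, not part of the original code.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java model of the intended paging; no RxJava or Room involved.
class PagingModel {

    // Hypothetical stand-in for the Dao query: one page of an in-memory table.
    static List<String> page(List<String> table, int offset, int limit) {
        if (offset >= table.size()) {
            return List.of();
        }
        return table.subList(offset, Math.min(offset + limit, table.size()));
    }

    // Step 1: request pages until the first empty one.
    // Step 2: map every entry with the given mapper.
    // Step 3: collect everything into one list that could be handed to the upload.
    static <A> List<A> collectForUpload(List<String> table, int limit, int maxIterations,
                                        Function<String, A> mapper) {
        return IntStream.range(0, maxIterations)
                .mapToObj(pageIndex -> page(table, pageIndex * limit, limit))
                .takeWhile(entities -> !entities.isEmpty())
                .flatMap(List::stream)
                .map(mapper)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> table = List.of("a", "b", "c");
        List<String> result = collectForUpload(table, 2, 50, String::toUpperCase);
        System.out.println(result); // prints [A, B, C]
    }
}
```

Stream.takeWhile requires Java 9 or newer. The model only describes the expected result; it says nothing about how RxJava schedules the individual page requests.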

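Problems:

  1. The paging appears to ignore the takeWhile operator and keeps running until it hits the maximum iteration limit
  2. The uploadEntitiesFor call is never executed

private void pageEntityAndUpload(long lastTimestamp) {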
    Disposable ax = Observable.range(0, MAX_ITERATION)
            .doOnNext(integer -> {
                loginformation.loginformation(TAG, null, String.format("Paging %s for user: %s,Offset (page:%s): %s,Limit: %s", resourceName, userId, integer, integer * limit, limit)
                );
            })
            // the offset is the page index times the page size, as in the log message above
            .concatMap(integer -> pagesupplierFunction.page(userId, lastTimestamp, integer * limit, limit))
            .doOnNext(dbEntities -> {
                loginformation.loginformation(TAG, null, String.format("Found %s: %s", resourceName, dbEntities.stream()
                                .map(EntityBase::getId)
                                .collect(Collectors.joining(",")))
                );
            })
            .takeWhile(dbEntities -> !dbEntities.isEmpty())
            .flatMapIterable(a -> a)
            .map(e -> entityToApiMapper.apply(e))
            .toList()
            //.subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.single())
            .subscribe(
                    apiEntities -> uploadEntitiesFor(apiEntities),
                    throwable -> logError.logError(TAG, throwable, String.format("Failed to read and convert %s to api objects.", resourceName))
            );
}

Interfaces:

private PagesupplierFunction<DBT> pagesupplierFunction;
...
@FunctionalInterface
public interface PagesupplierFunction<T> {
    public Observable<List<T>> page(String userId,long lastTimestamp,int offset,int limit);
}
private Function<DBT,AT> entityToApiMapper;

The page supplier comes from a Room Dao, for example (it is an internal app, and I need to store things per user...):

@Query("SELECT * FROM partner " +
        "WHERE user_id = ....)" +
        "LIMIT :limit OFFSET :offset")
Observable<List<Partner>> pagePartnerSuggestionsFor(String userId, int offset, int limit);

During execution, the log output is:

2021-05-18 15:41:33.684 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:0): 0,Limit: 1000
2021-05-18 15:41:34.189 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:1): 1000,Limit: 1000
2021-05-18 15:41:34.191 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:2): 2000,Limit: 1000
2021-05-18 15:41:34.206 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:3): 3000,Limit: 1000
2021-05-18 15:41:34.207 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:4): 4000,Limit: 1000
2021-05-18 15:41:34.209 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:5): 5000,Limit: 1000
2021-05-18 15:41:34.217 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:6): 6000,Limit: 1000
2021-05-18 15:41:34.219 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:7): 7000,Limit: 1000
2021-05-18 15:41:34.226 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:8): 8000,Limit: 1000
2021-05-18 15:41:34.230 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:9): 9000,Limit: 1000
2021-05-18 15:41:34.240 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:10): 10000,Limit: 1000
2021-05-18 15:41:34.247 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:11): 11000,Limit: 1000
2021-05-18 15:41:34.253 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:12): 12000,Limit: 1000
2021-05-18 15:41:34.254 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:13): 13000,Limit: 1000
2021-05-18 15:41:34.255 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:14): 14000,Limit: 1000
2021-05-18 15:41:34.258 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:15): 15000,Limit: 1000
2021-05-18 15:41:34.259 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:16): 16000,Limit: 1000
2021-05-18 15:41:34.262 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:17): 17000,Limit: 1000
2021-05-18 15:41:34.265 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:18): 18000,Limit: 1000
2021-05-18 15:41:34.267 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:19): 19000,Limit: 1000
2021-05-18 15:41:34.268 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:20): 20000,Limit: 1000
2021-05-18 15:41:34.270 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:21): 21000,Limit: 1000
2021-05-18 15:41:34.271 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:22): 22000,Limit: 1000
2021-05-18 15:41:34.276 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:23): 23000,Limit: 1000
2021-05-18 15:41:34.279 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:24): 24000,Limit: 1000
2021-05-18 15:41:34.283 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:25): 25000,Limit: 1000
2021-05-18 15:41:34.285 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:26): 26000,Limit: 1000
2021-05-18 15:41:34.287 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:27): 27000,Limit: 1000
2021-05-18 15:41:34.291 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:28): 28000,Limit: 1000
2021-05-18 15:41:34.294 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:29): 29000,Limit: 1000
2021-05-18 15:41:34.298 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:30): 30000,Limit: 1000
2021-05-18 15:41:34.307 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:31): 31000,Limit: 1000
2021-05-18 15:41:34.311 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:32): 32000,Limit: 1000
2021-05-18 15:41:34.314 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:33): 33000,Limit: 1000
2021-05-18 15:41:34.316 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:34): 34000,Limit: 1000
2021-05-18 15:41:34.316 25902-26058/... I/PartnerUploadService: Found Partner: 957690320dee4f7983070a1fb630f487
2021-05-18 15:41:34.317 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:35): 35000,Limit: 1000
2021-05-18 15:41:34.318 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:36): 36000,Limit: 1000
2021-05-18 15:41:34.319 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:37): 37000,Limit: 1000
2021-05-18 15:41:34.321 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:38): 38000,Limit: 1000
2021-05-18 15:41:34.322 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:39): 39000,Limit: 1000
2021-05-18 15:41:34.335 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:40): 40000,Limit: 1000
2021-05-18 15:41:34.337 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:41): 41000,Limit: 1000
2021-05-18 15:41:34.342 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:42): 42000,Limit: 1000
2021-05-18 15:41:34.343 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:43): 43000,Limit: 1000
2021-05-18 15:41:34.346 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:44): 44000,Limit: 1000
2021-05-18 15:41:34.359 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:45): 45000,Limit: 1000
2021-05-18 15:41:34.361 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:46): 46000,Limit: 1000
2021-05-18 15:41:34.362 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:47): 47000,Limit: 1000
2021-05-18 15:41:34.363 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:48): 48000,Limit: 1000
2021-05-18 15:41:34.369 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:49): 49000,Limit: 1000

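Right now I have only one partner to upload. The upload is never triggered, and the paging keeps running until it reaches the limit instead of stopping after the first page.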

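Solution

You are passing concatMap observables that are processed sequentially. The next observable is processed only after the previous one has completed.

The problem is that your Dao returns an Observable that never actually completes: Room keeps observing the table until the subscriber is disposed.

In your case you should just fetch the current values from the database by changing the Dao method to return Single instead of Observable (and also change concatMap to concatMapSingle).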


Example

I intentionally do not call onComplete on the emitter to simulate your case.

Observable.range(0, 5)
    .concatMap { id ->
        Observable.create<Int> { emitter ->
            emitter.onNext(id)
            // emitter.onComplete()
        }
    }
    .subscribe(
        { println("Next-$it") },
        { println("Error") },
        { println("Complete") }
    )

Result:

Next-0

It can be fixed by changing Observable to Single.

Observable.range(0, 5)
    .concatMapSingle { id ->
        Single.create<Int> { emitter ->
            emitter.onSuccess(id)
        }
    }
    .subscribe(
        { println("Next-$it") },
        { println("Error") },
        { println("Complete") }
    )

Result:

Next-0
Next-1
Next-2
Next-3
Next-4
Complete
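
Applied to the paging pipeline from the question, the suggested change (Single plus concatMapSingle) could look roughly like the sketch below. It makes several assumptions: RxJava 3 package names (on RxJava 2 the imports would be io.reactivex.*), an in-memory list in place of the Room table, a hypothetical pageOnce method in place of a Dao method declared to return Single<List<Partner>>, and String::toUpperCase as a placeholder for entityToApiMapper.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;

import java.util.List;

// Sketch only: an in-memory list stands in for the Room table, and
// pageOnce() is a hypothetical replacement for the Dao method.
class SinglePagingSketch {

    static final int MAX_ITERATION = 50;

    // Emits exactly one page and then terminates, like a Dao method that
    // returns Single<List<T>> instead of Observable<List<T>>.
    static Single<List<String>> pageOnce(List<String> table, int offset, int limit) {
        return Single.fromCallable(() -> offset >= table.size()
                ? List.<String>of()
                : table.subList(offset, Math.min(offset + limit, table.size())));
    }

    // Same operator chain as in the question, but with concatMapSingle, so each
    // inner source terminates and takeWhile eventually sees an empty page.
    static Single<List<String>> collectForUpload(List<String> table, int limit) {
        return Observable.range(0, MAX_ITERATION)
                .concatMapSingle(pageIndex -> pageOnce(table, pageIndex * limit, limit))
                .takeWhile(entities -> !entities.isEmpty())
                .flatMapIterable(entities -> entities)
                .map(String::toUpperCase) // placeholder for entityToApiMapper
                .toList();
    }

    public static void main(String[] args) {
        List<String> result = collectForUpload(List.of("a", "b", "c"), 2).blockingGet();
        System.out.println(result); // prints [A, B, C]
    }
}
```

Because every inner Single terminates after one page, takeWhile reaches the first empty page, the stream completes, and toList can emit the collected entries to the subscriber that would call uploadEntitiesFor.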