问题描述
我正在尝试使用 workmanager 批量上传数据库实体。工作正常,但是执行分页的服务没有按预期工作。
我想做什么:
问题:
private void pageEntityAndUpload(long lastTimestamp) {
disposable ax = Observable.range(0,MAX_IteraTION)
.doOnNext(integer -> {
loginformation.loginformation(TAG,null,String.format("Paging %s for user: %s,Offset (page:%s): %s,Limit: %s",resourceName,userId,integer,integer * limit,limit)
);
})
.concatMap(integer -> pagesupplierFunction.page(userId,lastTimestamp,limit))
.doOnNext(dbEntities -> {
loginformation.loginformation(TAG,String.format("Found %s: %s",dbEntities.stream()
.map(EntityBase::getId)
.collect(Collectors.joining(",")))
);
})
.takeWhile(dbEntities -> !dbEntities.isEmpty())
.flatMapIterable(a -> a)
.map(e -> entityToApiMapper.apply(e))
.toList()
//.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.single())
.subscribe(
apiEntities -> uploadEntitiesFor(apiEntities),throwable -> logError.logError(TAG,throwable,String.format("Failed to read and convert %s to api objects.",resourceName))
);
}
接口:
private PagesupplierFunction<DBT> pagesupplierFunction;
...
@FunctionalInterface
public interface PagesupplierFunction<T> {
public Observable<List<T>> page(String userId,long lastTimestamp,int offset,int limit);
}
private Function<DBT,AT> entityToApiMapper;
页面供应商来自一个Room Dao,例如(它是一个内部应用程序,我需要存储每个用户的东西......):
@Query("SELECT * FROM partner " +
"WHERE user_id = ....)" +
"LIMIT :limit OFFSET :offset")
Observable<List<Partner>> pagePartnerSuggestionsFor(String userId,int limit);
执行过程中,日志输出:
2021-05-18 15:41:33.684 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:0): 0,Limit: 1000
2021-05-18 15:41:34.189 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:1): 1000,Limit: 1000
2021-05-18 15:41:34.191 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:2): 2000,Limit: 1000
2021-05-18 15:41:34.206 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:3): 3000,Limit: 1000
2021-05-18 15:41:34.207 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:4): 4000,Limit: 1000
2021-05-18 15:41:34.209 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:5): 5000,Limit: 1000
2021-05-18 15:41:34.217 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:6): 6000,Limit: 1000
2021-05-18 15:41:34.219 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:7): 7000,Limit: 1000
2021-05-18 15:41:34.226 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:8): 8000,Limit: 1000
2021-05-18 15:41:34.230 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:9): 9000,Limit: 1000
2021-05-18 15:41:34.240 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:10): 10000,Limit: 1000
2021-05-18 15:41:34.247 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:11): 11000,Limit: 1000
2021-05-18 15:41:34.253 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:12): 12000,Limit: 1000
2021-05-18 15:41:34.254 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:13): 13000,Limit: 1000
2021-05-18 15:41:34.255 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:14): 14000,Limit: 1000
2021-05-18 15:41:34.258 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:15): 15000,Limit: 1000
2021-05-18 15:41:34.259 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:16): 16000,Limit: 1000
2021-05-18 15:41:34.262 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:17): 17000,Limit: 1000
2021-05-18 15:41:34.265 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:18): 18000,Limit: 1000
2021-05-18 15:41:34.267 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:19): 19000,Limit: 1000
2021-05-18 15:41:34.268 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:20): 20000,Limit: 1000
2021-05-18 15:41:34.270 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:21): 21000,Limit: 1000
2021-05-18 15:41:34.271 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:22): 22000,Limit: 1000
2021-05-18 15:41:34.276 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:23): 23000,Limit: 1000
2021-05-18 15:41:34.279 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:24): 24000,Limit: 1000
2021-05-18 15:41:34.283 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:25): 25000,Limit: 1000
2021-05-18 15:41:34.285 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:26): 26000,Limit: 1000
2021-05-18 15:41:34.287 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:27): 27000,Limit: 1000
2021-05-18 15:41:34.291 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:28): 28000,Limit: 1000
2021-05-18 15:41:34.294 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:29): 29000,Limit: 1000
2021-05-18 15:41:34.298 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:30): 30000,Limit: 1000
2021-05-18 15:41:34.307 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:31): 31000,Limit: 1000
2021-05-18 15:41:34.311 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:32): 32000,Limit: 1000
2021-05-18 15:41:34.314 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:33): 33000,Limit: 1000
2021-05-18 15:41:34.316 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:34): 34000,Limit: 1000
2021-05-18 15:41:34.316 25902-26058/... I/PartnerUploadService: Found Partner: 957690320dee4f7983070a1fb630f487
2021-05-18 15:41:34.317 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:35): 35000,Limit: 1000
2021-05-18 15:41:34.318 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:36): 36000,Limit: 1000
2021-05-18 15:41:34.319 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:37): 37000,Limit: 1000
2021-05-18 15:41:34.321 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:38): 38000,Limit: 1000
2021-05-18 15:41:34.322 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:39): 39000,Limit: 1000
2021-05-18 15:41:34.335 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:40): 40000,Limit: 1000
2021-05-18 15:41:34.337 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:41): 41000,Limit: 1000
2021-05-18 15:41:34.342 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:42): 42000,Limit: 1000
2021-05-18 15:41:34.343 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:43): 43000,Limit: 1000
2021-05-18 15:41:34.346 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:44): 44000,Limit: 1000
2021-05-18 15:41:34.359 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:45): 45000,Limit: 1000
2021-05-18 15:41:34.361 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:46): 46000,Limit: 1000
2021-05-18 15:41:34.362 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:47): 47000,Limit: 1000
2021-05-18 15:41:34.363 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:48): 48000,Limit: 1000
2021-05-18 15:41:34.369 25902-26054/... I/PartnerUploadService: Paging Partner for user: TWnF0Tk....Uk1,Offset (page:49): 49000,Limit: 1000
我现在只有一个合作伙伴要上传。上传不会被触发,分页仍然执行,直到达到限制并且在第一页之后不会停止。
解决方法
您向您的 concatMap
提供了将按顺序处理的 observable。下一个 observable 只有在前一个完成后才会被处理。
问题是,你的 Dao 返回 observable,它实际上并不完整 - 除非订阅者被处理,否则房间表将被观察。
在您的情况下,您应该只通过更改 Dao 方法从数据库中获取当前值,返回 Single
而不是 Obervable
(并且还将 concatMap
更改为 {{1 }}.)
示例
我故意没有在 concatMapSingle
上调用 onComplete
来模拟您的情况。
emitter
结果:
Observable.range(0,5)
.concatMap { id ->
Observable.create<Int> { emitter ->
emitter.onNext(id)
// emitter.onComplete()
}
}
.subscribe(
{ println("Next-$it") },{ println("Error") },{ println("Complete") }
)
可以通过将 Next-0
更改为 Observable
来修复。
Single
结果:
Observable.range(0,5)
.concatMapSingle { id ->
Single.create<Int> { emitter ->
emitter.onSuccess(id)
}
}
.subscribe(
{ println("Next-$it") },{ println("Complete") }
)