在Reactor Mono中获取地图总和

问题描述

如何将Mono / Flux的所有结果相加

我有一些反应式代码,其中我返回一个Flux并链接其他函数,我将用一些代码来解释它。

return commentService.getAllCommentByIssueId(issueId)
                .map {
                    taggingService.tagging(it.content).map {
                        //Get the count of the returned set and add it up to 
                        //all the other returned map results
                        //So the result should be like
                        //tags in first comment 2,tags in second comment 3
                        //so it should return 5
                    }

所以我返回一个通量,它是评论通量。我想将此查询的所有注释映射到一个函数,在该函数中我扫描每个注释的内容获取标记,该函数taggingservice.tagging实现,该函数返回一个Mono<MutableSet<UUID>>,因此它返回了所有被标记的UUID。 一个问题的注释数为x,我想总结所有注释中的所有标签

要做的是计算问题的每个注释的所有标签,并将其返回到由StatsModel(issueId,numberOfTagsInComments)组成的StatsModel中。

我现在将向您展示标记功能

(我测试了该功能,它正在工作)

fun tagging(text:String) : Mono<MutableSet<UUID>> {
        val words = text.split( " ")
                .filter { it.startsWith("@")}
        val matches : MutableSet<UUID> = mutableSetof()

        return userRepository.findAll().collectList()
                .map { userList ->
                    for(word in words){
                        userList.map {user ->
                            if(user.username == word.substring(1)) {
                                matches.add(user.id!!)
                            }
                        }
                    }
                    matches
                }
    }

还有我的CommentRepository,我正在使用此存储库的第一个函数

@Repository
interface CommentRepository: ReactiveCrudRepository<CommentModel,UUID> {
    fun findAllByIssueId(issueId: UUID): Flux<CommentModel>
    fun findAllByUserId(userId: UUID): Flux<CommentModel>
}

解决方法

您的问题尚不清楚,但是就像我说的那样,您似乎想减少。

您的问题不清楚,因为您正在从标记函数返回Set,但是Set仅允许唯一值,因此如果同一标记两次出现在注释中怎么办?

无论如何,假设您希望每个注释中的唯一标签总数,您可以考虑一下:

我制作了标签和UUID的数据库(地图)

Map<String,UUID> tags;
tags = Arrays.asList(new String[] {"tag1","tag2"}).stream().collect(Collectors.toMap(Function.identity(),s->UUID.randomUUID() ));

我的标记功能版本:

Mono<Set<UUID>> tagging(String string) {
    return Flux.fromArray(text.split(" ")).filter(str->str.startsWith("@")).map(str->str.substring(1)).map(tags::get).collect(Collectors.toSet());
}

获取评论Flux<String>的函数:

Flux<String> getAllComments() {
    return Flux.fromArray(new String[] {"this is @tag1","and @tag2","finally @tag1 and @tag2","unclear @tag1 and @tag1"});
}

一些简单的逻辑求和每个Set的大小:

getAllComments()
    .flatMap(this::tagging)
    .map(Set::size)
    .reduce(Integer::sum)
    .map(StatsModel::new)
    .subscribe(System.out::println);

假设lombok是你的朋友

@ToString
@RequiredArgsConstructor
class StatsModel{
    UUID issueId = UUID.randomUUID();
    @NonNull
    Integer count;
}

给我:

StatsModel(issueId=a3cf1243-8a89-406c-b0e8-c13a6bdc832c,count=5)

生成的UUID是免费的。

,

让我们尝试明确说明最终目标:您想获得StatsModel(issueId,numberOfUniqueTagsInComments)。然后,我们可以将其分解为子问题(为清楚起见,我将命名作一些更改):

  • 获取问题的所有评论:fun getAllComments(issueId): Flux<Comment>
  • 从内容中获取唯一的标签:fun uniqueTags(text: String): Mono<Set<UUID>>
  • 合并标签:给一个List<Set<UUID>>,您可以使用reduce获得最终的Set<UUID>。请注意,为此,您不需要使用Reactor,只需使用常规的Kotlin。

放在一起:

getAllComments(issueId)            // Flux<Comment>
  .flatMap { uniqueTags(it) }      // Flux<Set<UUID>>
  .collectList()                   // Mono<List<Set<UUID>>
  .map { uuidSets ->
    uuidSets
      .reduce { acc,set -> acc + set } // Set<UUID>
      .size()                           // Int
  }
  .map { StatsModel(issueId,it) }