Flux 只返回一个值而不是所有值

问题描述

下面这段代码有一个问题:对每张表它都只返回第一条结果,而不是全部结果。

/**
 * Read-side service that loads the quizzes of a course, the questions of each quiz and the
 * answers of each question through the {@code DatabaseClient} exposed by
 * {@link R2dbcEntityOperations}.
 *
 * <p>This is the version from the question: every {@code Flux.zip} below pairs a
 * single-element publisher with a multi-element one, so only the first element of the
 * multi-element side survives (see the inline notes).
 */
@Service
@Slf4j
@Lazy
@AllArgsConstructor
public class QuizQueryService {

    // Supplied through the Lombok-generated all-args constructor.
    R2dbcEntityOperations client;


 /**
  * Streams the quizzes referenced by the given course, each enriched with questions and
  * answers.
  *
  * @param courseId course id, compared against {@code courses.id::text}
  * @return one element per (quiz, question) pair that makes it through the pipeline;
  *     signals a {@link RuntimeException} when the course yields no quiz rows
  */
 @Transactional(readOnly = true)
  public Flux<QuizWithoutResults> getAllQuizzes(String courseId) {


    return getAllQuizEntitiesByCourseId(courseId)
            // Re-emits the same error unchanged, so this operator has no visible effect.
            .onErrorResume(Flux::error)
            .switchIfEmpty(Flux.error(new RuntimeException("No quiz found with provided courseId")))
        .flatMap(
            y -> {
              final Flux<QuestionResponseDto> question =
                      getQuestionWithoutAnswer(y.id().toString()).onErrorResume(Mono::error);
              // Flux.zip pairs elements one-to-one and completes as soon as any source
              // completes. Mono.just(y) emits exactly one element, so at most ONE
              // (quiz, question) tuple is produced per quiz, whatever the query returned.
              return Flux.zip(Mono.just(y),question);
            })
        .flatMap(
            tuple -> {
              // Copy of the zipped question built field by field.
              var r = QuestionResponseDto.builder()
                      .id(tuple.getT2().id())
                      .title(tuple.getT2().title())
                      .answers(tuple.getT2().answers())
                      .build();
              // Union of the quiz's current questions (skipped when null) and the copy above.
              final Set<QuestionResponseDto> questions =
                  Stream.of(
                          tuple.getT1().questions(),Set.of(r))
                      .flatMap(Stream::ofNullable)
                      .flatMap(Collection::stream)
                      .collect(Collectors.toSet());

              var quiz = tuple.getT1();
              quiz = quiz.withQuestions(questions);
              return Flux.just(quiz);
            })
        // Same truncation as above: Flux.just(quiz) has one element, so only the first
        // question emitted by combineQuestionsWithAnswers is paired with the quiz.
        .flatMap(quiz-> Flux.zip(Flux.just(quiz),combineQuestionsWithAnswers(quiz)))
         .flatMap(tuple-> {
             var q = tuple.getT1();
             // Replaces the quiz's question set with a singleton holding the zipped question.
             q = q.withQuestions(Set.of(tuple.getT2()));
             return Flux.just(q);
         });
 }

    /**
     * For every question already attached to {@code quiz}, looks up its answers and emits the
     * question with an answer merged in.
     *
     * @param quiz quiz whose {@code questions()} are iterated
     * @return the questions with answers attached
     */
    private Flux<QuestionResponseDto> combineQuestionsWithAnswers(QuizWithoutResults quiz) {
        return Flux.fromIterable(quiz.questions())
                // Mono.just(question) is single-valued, so zip keeps only the first answer.
                .flatMap(question-> Flux.zip(Mono.just(question),getAnswers(question.id().toString())))
                .flatMap(x-> {
                    var question = x.getT1();

                    // NOTE(review): answers() is dereferenced before the ofNullable filter runs,
                    // so a null answers set would throw here - confirm the row mapper never
                    // produces null.
                    question = question.withAnswers(Stream.concat(x.getT1().answers().stream(),Set.of(x.getT2()).stream())
                            .flatMap(Stream::ofNullable)
                            .collect(Collectors.toSet()));
                    return Flux.just(question);
                });
    }



    /**
     * Selects the {@code quizzes} rows whose id appears in the {@code quiz_ids} column of the
     * course matching {@code courseId}.
     *
     * @param courseId value bound to the {@code $1} placeholder
     * @return every row, converted with {@code QuizAdapters.QUIZ_FROM_ROW_MAP}
     */
    @Timed("getAllQuizEntitiesByCourseId")
     Flux<QuizWithoutResults> getAllQuizEntitiesByCourseId(String courseId) {
        //language=Postgresql
        String query =
                """
                select qi.* from unnest(array(select c.quiz_ids from courses c where c.id::text = $1)) quiz_id
                join quizzes qi on qi.id=quiz_id
                """;
        return client.getDatabaseClient().sql(query)
                .bind("$1",courseId)
                .map(QuizAdapters.QUIZ_FROM_ROW_MAP::apply)
                .all();
    }

    /**
     * Selects the {@code questions} rows whose id appears in the {@code question_ids} column of
     * the quiz matching {@code quizId}.
     *
     * @param quizId value bound to the {@code $1} placeholder
     * @return every row, converted with {@code QuizAdapters.QUESTION_FROM_ROW_MAP}
     */
    @Timed("getAllQuestionEntitiesByQuizId")
    Flux<QuestionResponseDto> getQuestionWithoutAnswer(String quizId) {
        //language=Postgresql
        String query = """
             select qs.* from unnest(array(select qi.question_ids from quizzes qi where qi.id::text = $1)) question_id
             join questions qs on qs.id=question_id
            """;

        return client.getDatabaseClient().sql(query)
                .bind("$1",quizId)
                .map(QuizAdapters.QUESTION_FROM_ROW_MAP::apply)
                .all();

    }

    /**
     * Selects the {@code answers} rows whose id appears in the {@code answer_ids} column of the
     * question matching {@code questionId}.
     *
     * @param questionId value bound to the {@code $1} placeholder
     * @return every row, converted with {@code QuizAdapters.ANSWER_FROM_ROW_MAP}
     */
    @Timed("getAnswers")
    Flux<AnswerDto> getAnswers(String questionId) {
        //language=Postgresql
        String query = """
            select a.* from unnest(array(select qs.answer_ids from questions qs where qs.id::text = $1)) answer_id
            join answers a on a.id=answer_id
            """;

        return client.getDatabaseClient().sql(query)
                .bind("$1",questionId)
                .map(QuizAdapters.ANSWER_FROM_ROW_MAP::apply)
                .all();

    }


}

于是我尝试单独返回下面这个测试方法的结果,它工作正常(能返回所有行):

    /**
     * Runs the question lookup for one hard-coded quiz id and streams back every mapped row.
     *
     * @param quizId not used; the quiz id is inlined in the SQL text
     * @return all rows of the query, converted with {@code QuizAdapters.QUESTION_FROM_ROW_MAP}
     */
    @Timed("getAllQuestionEntitiesByQuizId")
    public Flux<QuestionResponseDto> testFetch(String quizId) {
        //language=Postgresql
        final String sql = """
             select qs.* from unnest(array(select qi.question_ids from quizzes qi where qi.id::text = '409a1c2b-b1e5-42a7-b2ae-58212aaf3b37')) question_id
             join questions qs on qs.id=question_id
            """;

        // No bind step: the statement has no placeholders.
        final var statement = client.getDatabaseClient().sql(sql);
        return statement
                .map(QuizAdapters.QUESTION_FROM_ROW_MAP::apply)
                .all();
    }

老实说,在这种情况下我本来会使用 .block(),因为要继续往下处理,就必须先拿到另一张表中的数据(这里一共涉及 3 张表),但这条路现在已经行不通了。

对于每个测验我都需要问题集,对于每个问题我都需要答案。

你会怎么做?

编辑

我已将上面的代码更改为:

package io.deviad.ripeti.webapp.application.query;

import io.deviad.ripeti.webapp.adapter.QuizAdapters;
import io.deviad.ripeti.webapp.api.command.AnswerDto;
import io.deviad.ripeti.webapp.api.queries.QuestionResponseDto;
import io.deviad.ripeti.webapp.api.queries.QuizWithoutResults;
import io.micrometer.core.annotation.Timed;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
@Slf4j
@Lazy
@AllArgsConstructor
public class QuizQueryService {

    R2dbcEntityOperations client;


 @Transactional(readOnly = true)
  public Mono<Map<UUID,Collection<QuizWithoutResults>>> getAllQuizzes(String courseId) {

    return getAllQuizEntitiesByCourseId(courseId)
            .onErrorResume(Flux::error)
            .switchIfEmpty(Flux.error(new RuntimeException("No quiz found with provided courseId")))
        .flatMap(
             y -> {
                 final Flux<QuestionResponseDto> questions =
                         getQuestionWithoutAnswer(y.id().toString()).onErrorResume(Mono::error);
                 return Flux.zip(Mono.defer(()->Mono.just(y)).repeat(),questions);
             })
        .flatMap(
            tuple -> {
              var r = QuestionResponseDto.builder()
                      .id(tuple.getT2().id())
                      .title(tuple.getT2().title())
                      .answers(tuple.getT2().answers())
                      .build();
              final Set<QuestionResponseDto> questions =
                  Stream.of(
                          tuple.getT1().questions(),Set.of(r))
                      .flatMap(Stream::ofNullable)
                      .flatMap(Collection::stream)
                      .collect(Collectors.toSet());

              var quiz = tuple.getT1();
              quiz = quiz.withQuestions(questions);
              return Flux.just(quiz);
            })
        .flatMap(quiz-> Flux.zip(Mono.defer(()->Mono.just(quiz)).repeat(),combineQuestionsWithAnswers(quiz)))
        .flatMap(tuple-> {
             var q = tuple.getT1();
             q = q.withQuestions(Set.of(tuple.getT2()));
             return Flux.just(q);
         })
        .collectMultimap(QuizWithoutResults::id);
 }

    private Flux<QuestionResponseDto> combineQuestionsWithAnswers(QuizWithoutResults quiz) {
        return Flux.fromIterable(quiz.questions())
                .flatMap(question-> Flux.zip(Mono.defer(()->Mono.just(question)).repeat(),questionId)
                .map(QuizAdapters.ANSWER_FROM_ROW_MAP::apply)
                .all();

    }


}

这是在 Postman 中得到的返回结果:

{
    "409a1c2b-b1e5-42a7-b2ae-58212aaf3b37": [
        {
            "id": "409a1c2b-b1e5-42a7-b2ae-58212aaf3b37","quizName": "test quiz name","quizContent": "test quiz content","questions": [
                {
                    "id": "ca454179-83fe-4c11-aedf-23cfd415bb04","title": "question 2","answers": [
                        {
                            "id": "2d7e47ad-b34d-4d44-b931-f16a88976c3d","title": "Is it a good day?","correct": true
                        }
                    ]
                }
            ]
        },{
            "id": "409a1c2b-b1e5-42a7-b2ae-58212aaf3b37","quizName": "test quiz name","quizContent": "test quiz content","questions": [
                {
                    "id": "c11ee15c-7792-41ce-b537-f9ef6d695b33","title": "question name","answers": [
                        {
                            "id": "7cf65bad-6f24-4a9a-9278-c5a3ce27e750","title": "answer 2","correct": true
                        }
                    ]
                }
            ]
        }
    ],"ec9fa95f-624d-4ba0-9b06-0ac3fd4c0679": [
        {
            "id": "ec9fa95f-624d-4ba0-9b06-0ac3fd4c0679","quizName": "mannaggia","quizContent": "alla miseria",{
            "id": "ec9fa95f-624d-4ba0-9b06-0ac3fd4c0679","correct": true
                        }
                    ]
                }
            ]
        }
    ]
}

现在我需要了解如何将这些问题和这些答案合并到一个集合中。

解决方法

我设法找到了解决方案,我不知道是否有更好的解决方案,但这个有效:

package io.deviad.ripeti.webapp.application.query;

import io.deviad.ripeti.webapp.adapter.QuizAdapters;
import io.deviad.ripeti.webapp.api.command.AnswerDto;
import io.deviad.ripeti.webapp.api.queries.QuestionResponseDto;
import io.deviad.ripeti.webapp.api.queries.QuizWithoutResults;
import io.micrometer.core.annotation.Timed;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
@Slf4j
@Lazy
@AllArgsConstructor
public class QuizQueryService {

    R2dbcEntityOperations client;


 @Transactional(readOnly = true)
  public Mono<Map<UUID,Collection<QuizWithoutResults>>> getAllQuizzes(String courseId) {

    return getAllQuizEntitiesByCourseId(courseId)
            .onErrorResume(Flux::error)
            .switchIfEmpty(Flux.error(new RuntimeException("No quiz found with provided courseId")))
        .flatMap(
             y -> {
                 final Flux<QuestionResponseDto> questions =
                         getQuestionWithoutAnswer(y.id().toString()).onErrorResume(Mono::error);
                 return Flux.zip(Mono.defer(()->Mono.just(y)).repeat(),questions.collect(Collectors.toSet()));
             })
        .flatMap(
            tuple -> {
              var quiz = tuple.getT1();
              quiz = quiz.withQuestions(tuple.getT2());
              return Flux.just(quiz);
            })
        .flatMap(quiz-> Flux.zip(Mono.defer(()->Mono.just(quiz)).repeat(),combineQuestionsWithAnswers(quiz).collect(Collectors.toSet())))
        .flatMap(tuple-> {
             var q = tuple.getT1();
             q = q.withQuestions(tuple.getT2());
             return Flux.just(q);
         })
        .collectMultimap(QuizWithoutResults::id);
 }

    private Flux<QuestionResponseDto> combineQuestionsWithAnswers(QuizWithoutResults quiz) {
        return Flux.fromIterable(quiz.questions())
                .flatMap(question-> Flux.zip(Mono.defer(()->Mono.just(question)).repeat(),getAnswers(question.id().toString()).collect(Collectors.toSet())))
                .flatMap(x-> {
                    var question = x.getT1();

                    question = question.withAnswers(x.getT2());
                    return Flux.just(question);
                });
    }



    @Timed("getAllQuizEntitiesByCourseId")
     Flux<QuizWithoutResults> getAllQuizEntitiesByCourseId(String courseId) {
        //language=PostgreSQL
        String query =
                """
                select qi.* from unnest(array(select c.quiz_ids from courses c where c.id::text = $1)) quiz_id
                join quizzes qi on qi.id=quiz_id
                """;
        return client.getDatabaseClient().sql(query)
                .bind("$1",courseId)
                .map(QuizAdapters.QUIZ_FROM_ROW_MAP::apply)
                .all();
    }

    @Timed("getAllQuestionEntitiesByQuizId")
    Flux<QuestionResponseDto> getQuestionWithoutAnswer(String quizId) {
        //language=PostgreSQL
        String query = """
             select qs.* from unnest(array(select qi.question_ids from quizzes qi where qi.id::text = $1)) question_id
             join questions qs on qs.id=question_id
            """;

        return client.getDatabaseClient().sql(query)
                .bind("$1",quizId)
                .map(QuizAdapters.QUESTION_FROM_ROW_MAP::apply)
                .all();

    }

    @Timed("getAnswers")
    Flux<AnswerDto> getAnswers(String questionId) {
        //language=PostgreSQL
        String query = """
            select a.* from unnest(array(select qs.answer_ids from questions qs where qs.id::text = $1)) answer_id
            join answers a on a.id=answer_id
            """;

        return client.getDatabaseClient().sql(query)
                .bind("$1",questionId)
                .map(QuizAdapters.ANSWER_FROM_ROW_MAP::apply)
                .all();

    }


}