Symfony Messenger组件失去连接并重复保存记录

问题描述

工作方案如下:我将json发送到控制器(然后将json转换为对象)->然后将对象发送到队列(我使用原则作为传输)->然后处理程序读取消息,然后将对象保存到数据库。问题:异步处理消息时,该对象的多个重复项被保存在数据库中,并且处理程序会写出有关连接错误的信息(请参见屏幕截图)。

在symfony强制转换上找到了此问题的描述,他们建议在将对象保存到数据库时使用flush函数,而无需先使用persist函数。使用这种方法,不会保存到数据库

使用同步保存,不会发生此错误

有人知道这个问题的解决方案吗?

ApiController:

 /**
 * @Route("/api/task")
 * @param Request $request
 * @param MessageBusInterface $messageBus
 * @return Response
 */
public function createTask(Request $request,MessageBusInterface $messageBus): Response
{
    try {
        $serializer = SerializerBuilder::create()->build();

        $this->inputCalculatorTaskDto = $serializer->deserialize($request->getContent(),InputCalculatorTaskDto::class,'json');

        $this->inputCalculatorTaskDto
            ->setTaskGuid(Uuid::uuid4())
            ->setServiceName(ServicesListEnum::SERVICE_CALULATOR)
            ->setTaskName(TaskListEnum::CALculaTE_ORDER_PRICE)
            ->setCreatedAt(DateTime::createFromFormat('U.u',microtime(TRUE)));

        $calculatorTaskMessage = new CalculatorTaskMessage($this->inputCalculatorTaskDto);
        $messageBus->dispatch($calculatorTaskMessage);

        $this->logger->info('done ' . $this->inputCalculatorTaskDto->getTaskGuid());

        return new Response('done');

    } catch (\Throwable $t) {
        $this->logger->error($t->getMessage());

        return new Response('error ' . $t->getMessage());
    }
}

消息:

class CalculatorTaskMessage
{
    /**
     * @var InputCalculatorTaskDto
     */
    private InputCalculatorTaskDto $inputCalculatorTaskDto;

    public function __construct(InputCalculatorTaskDto $inputCalculatorTaskDto)
    {
        $this->inputCalculatorTaskDto = $inputCalculatorTaskDto;
    }

    /**
     * @return InputCalculatorTaskDto
     */
    public function getInputCalculatorTaskDto(): InputCalculatorTaskDto
    {
        return $this->inputCalculatorTaskDto;
    }
}

MessageHandler

class CalculatorNewTaskCreatorHandler implements MessageHandlerInterface
{
    public InputCalculatorTaskDtoToTaskEntityImpl $inputCalculatorTaskDtoToTaskEntity;
    public EntityManagerInterface $entityManager;
    public TasksRepository $tasksRepository;
    private CalculatorTaskProducer $calculatorTaskProducer;
    
    /**
        * CalculatorTaskCreatorHandler constructor.
        * @param EntityManagerInterface $entityManager
        * @param TasksRepository $tasksRepository
        * @param CalculatorTaskProducer $calculatorTaskProducer
    */
    public function __construct(EntityManagerInterface $entityManager,TasksRepository $tasksRepository,CalculatorTaskProducer $calculatorTaskProducer)
    {
        $this->inputCalculatorTaskDtoToTaskEntity = new InputCalculatorTaskDtoToTaskEntityImpl();
        $this->entityManager = $entityManager;
        $this->tasksRepository = $tasksRepository;
        $this->calculatorTaskProducer = $calculatorTaskProducer;
    }
    
    public function __invoke(CalculatorTaskMessage $calculatorTaskMessage)
    {
        $inputCalculatorTaskDto = $calculatorTaskMessage->getInputCalculatorTaskDto();
        $tasks = $this->inputCalculatorTaskDtoToTaskEntity->map($inputCalculatorTaskDto);
        
        $this->entityManager->persist($tasks);
        $this->entityManager->flush();
        
        $this->calculatorTaskProducer->sendMessage($inputCalculatorTaskDto);
    }
}   

enter image description here

解决方法

这确实是一个令人沮丧的问题。幸运的是,解决复制问题的方法应该很简单。

出现的问题是,当您将消息发布到队列并再次从队列中使用时,您不再使用同一实体管理器。在您的使用者部分,Doctrine无法再识别该对象,并将其视为非托管实体。因此,Doctrine会插入同一事物的多个实例。

如何解决此问题:

在使用者中,在尝试保留对象之前(从消息中)进行查找。如果找不到该实体,则说明它是一个新实体,可以安全地插入它。

如果找到了现有实体,Doctrine就会知道该实体已经存在,因此不会再次插入。此时,您可以更新实体的值(如果需要),然后保留并刷新它。此时,应该更新您的实体,并且您不再有任何重复。