问题描述
工作方案如下:我将json发送到控制器(然后将json转换为对象)->然后将对象发送到队列(我使用原则作为传输)->然后处理程序读取消息,然后将对象保存到数据库。问题:异步处理消息时,该对象的多个重复项被保存在数据库中,并且处理程序会写出有关连接错误的信息(请参见屏幕截图)。
在symfony强制转换上找到了此问题的描述,他们建议在将对象保存到数据库时使用flush函数,而无需先使用persist函数。使用这种方法,不会保存到数据库。
使用同步保存,不会发生此错误。
有人知道这个问题的解决方案吗?
ApiController:
/**
* @Route("/api/task")
* @param Request $request
* @param MessageBusInterface $messageBus
* @return Response
*/
public function createTask(Request $request,MessageBusInterface $messageBus): Response
{
try {
$serializer = SerializerBuilder::create()->build();
$this->inputCalculatorTaskDto = $serializer->deserialize($request->getContent(),InputCalculatorTaskDto::class,'json');
$this->inputCalculatorTaskDto
->setTaskGuid(Uuid::uuid4())
->setServiceName(ServicesListEnum::SERVICE_CALULATOR)
->setTaskName(TaskListEnum::CALculaTE_ORDER_PRICE)
->setCreatedAt(DateTime::createFromFormat('U.u',microtime(TRUE)));
$calculatorTaskMessage = new CalculatorTaskMessage($this->inputCalculatorTaskDto);
$messageBus->dispatch($calculatorTaskMessage);
$this->logger->info('done ' . $this->inputCalculatorTaskDto->getTaskGuid());
return new Response('done');
} catch (\Throwable $t) {
$this->logger->error($t->getMessage());
return new Response('error ' . $t->getMessage());
}
}
消息:
class CalculatorTaskMessage
{
/**
* @var InputCalculatorTaskDto
*/
private InputCalculatorTaskDto $inputCalculatorTaskDto;
public function __construct(InputCalculatorTaskDto $inputCalculatorTaskDto)
{
$this->inputCalculatorTaskDto = $inputCalculatorTaskDto;
}
/**
* @return InputCalculatorTaskDto
*/
public function getInputCalculatorTaskDto(): InputCalculatorTaskDto
{
return $this->inputCalculatorTaskDto;
}
}
MessageHandler
class CalculatorNewTaskCreatorHandler implements MessageHandlerInterface
{
public InputCalculatorTaskDtoToTaskEntityImpl $inputCalculatorTaskDtoToTaskEntity;
public EntityManagerInterface $entityManager;
public TasksRepository $tasksRepository;
private CalculatorTaskProducer $calculatorTaskProducer;
/**
* CalculatorTaskCreatorHandler constructor.
* @param EntityManagerInterface $entityManager
* @param TasksRepository $tasksRepository
* @param CalculatorTaskProducer $calculatorTaskProducer
*/
public function __construct(EntityManagerInterface $entityManager,TasksRepository $tasksRepository,CalculatorTaskProducer $calculatorTaskProducer)
{
$this->inputCalculatorTaskDtoToTaskEntity = new InputCalculatorTaskDtoToTaskEntityImpl();
$this->entityManager = $entityManager;
$this->tasksRepository = $tasksRepository;
$this->calculatorTaskProducer = $calculatorTaskProducer;
}
public function __invoke(CalculatorTaskMessage $calculatorTaskMessage)
{
$inputCalculatorTaskDto = $calculatorTaskMessage->getInputCalculatorTaskDto();
$tasks = $this->inputCalculatorTaskDtoToTaskEntity->map($inputCalculatorTaskDto);
$this->entityManager->persist($tasks);
$this->entityManager->flush();
$this->calculatorTaskProducer->sendMessage($inputCalculatorTaskDto);
}
}
解决方法
这确实是一个令人沮丧的问题。幸运的是,解决复制问题的方法应该很简单。
出现的问题是,当您将消息发布到队列并再次从队列中使用时,您不再使用同一实体管理器。在您的使用者部分,Doctrine无法再识别该对象,并将其视为非托管实体。因此,Doctrine会插入同一事物的多个实例。
如何解决此问题:
在使用者中,在尝试保留对象之前(从消息中)进行查找。如果找不到该实体,则说明它是一个新实体,可以安全地插入它。
如果找到了现有实体,Doctrine就会知道该实体已经存在,因此不会再次插入。此时,您可以更新实体的值(如果需要),然后保留并刷新它。此时,应该更新您的实体,并且您不再有任何重复。