如何通过带有 Spring 集成的 RabbitMQ 接收 protobuf 消息?

问题描述

我尝试使用 Spring Integration 从 RabbitMQ 接收 protobuf 消息。

我的集成流程:

public class FacadeIntegrationFlowAdapter extends IntegrationFlowAdapter {

  @SuppressWarnings("rawtypes")
  private final Facade facade;
  private final FacadeProperties facadeProperties;

  @SuppressWarnings("unchecked")
  @Override
  protected IntegrationFlowDeFinition<?> buildFlow() {
    return from(facadeProperties.getQueueName())
        .handle(facade::getNewMessages);
  }
}

getNewMessages 方法

@Override
public ExchangeResponse getNewMessages(Message<ExchangeRequest> message) {
    ExchangeRequest request = message.getPayload();
    log.info("Receiving new message: " + request.toString());

这就是我将消息发送到队列的方式。使测试易于遵循是如此简单。

  ExchangeRequest request = ExchangeRequest.newBuilder()
      .addAllAuthors(List.of("author1","author2"))
      .addAllBooks(List.of("book1","book2"))
      .build();

  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setUsername("user");
  connectionFactory.setPassword("password");
  connectionFactory.setHost("localhost");
  connectionFactory.setPort(24130);

  try {
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    var basicProperties = new AMQP.BasicProperties().builder()
        .contentType("application/x-protobuf")
        .type(request.getDescriptorForType().getFullName())
        .build();

    channel.basicpublish(
        "facade-exchange","facade-routing-key",basicProperties,request.toByteArray());
  } catch (IOException e) {

不幸的是,我不断收到异常:

com.google.protobuf.InvalidProtocolBufferException: Type of the Any message does not match the given class.

但是,当我将 getNewMessages 方法更改为以下内容时,一切似乎都很好。

  @Override
  public ExchangeResponse getNewMessages(Message message) {
    try {
      Any payload = (Any) message.getPayload();
      ByteString value = payload.getValue();
      ExchangeRequest request = ExchangeRequest.parseFrom(value);
      log.info("Receiving new message: " + request.toString());

我哪里出错了?发送!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)