在 Spring Cloud 函数中使用 MessageConverter 用于以 SQS 事件作为触发器的 lambda

问题描述

我正在努力理解 MessageConverters provided out of the box from spring 的正确用法。我的用例是使用 SQS 事件触发 lambda 函数的执行。

以下函数不起作用

<RadioButton GroupName="Lead" IsChecked="{Binding Widget.prop1,Mode=TwoWay,ElementName=root}" />
<RadioButton GroupName="Lead" IsChecked="{Binding Widget.prop2,ElementName=root}" />

我为该字段提供了空值。这是相关的日志片段

@SpringBootApplication
@EnableReactiveMongoRepositories
@Slf4j
public class ServerlessApplication {

    public static void main(String[] args) {
        FunctionalSpringApplication.run(ServerlessApplication.class,args);
    }

    @Bean
    public Function<SuperHero,Boolean> imperativeMessageFunction() {
        return superHeroMessage -> {
            log.debug("Imperative function received a message from the queue: {}",superHeroMessage);
            // AckNowledgement that the message is processed!
            return true;
        };
    }
}

但是,如果我将函数的输入类型提供为 Message,它确实如此。

13:14:41.992 [main] DEBUG c.e.sqslambda.ServerlessApplication - Imperative function received a message from the queue: SuperHero(name=null,powers=null)

我当然可以提取消息的有效负载并将其转换为 POJO,但是我没有利用预先构建的 MessageConverter。拼图的缺失部分是什么?


通过网络控制台发送到 SQS 队列的消息

Message sent to SQS


使用 lambda 的测试功能

Test feature of AWS lambda


我的实体类如下图。

@Bean
    public Function<Message<String>,Boolean> imperativeMessageFunction() {
        return message -> {
            log.debug("Imperative function received a message from the queue: {}",message.getPayload());
            // AckNowledgement that the message is processed!
            return true;
        };
    }

Also see the following GitHub issue for details.,其中更新的文档指出后备内容类型应为“application/json”

现在,我有一个自定义MessageConverter,如下所示作为一种解决方法,但我希望摆脱这个样板代码;这是我对春季项目的期望。

import lombok.Data;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.util.Set;

@Data
@Document
@ToString
public class SuperHero {
    @Id
    @ToString.Exclude
    private String id;
    private String name;
    private Set<String> powers;
}

请注意,引入自定义 MessageConverter 还需要一个随附的 @Bean 定义,如下所示。

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.util.MimeType;

import java.util.Optional;
import java.util.Set;

@Slf4j
public class SuperHeroMessageConverter extends AbstractMessageConverter {

    public SuperHeroMessageConverter() {
        super(new MimeType("*","*"));
    }

    @Override
    protected boolean supports(final Class<?> clazz) {
        return SuperHero.class.equals(clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message,Class<?> targetClass,Object conversionHint) {
        log.debug("Converting an incoming message to an object of type SuperHero ...");
        message.getHeaders().forEach( (header,value) -> log.debug("Header name: {},Header value {}",header,value) );
        return Optional.of(message.getPayload())
                .map( payload -> payload instanceof String ? (String) payload : new String((byte[]) payload))
                .map( payloadText -> JsonPath.parse(payloadText).read("$['Records'][0]['body']",String.class))
                .flatMap(json -> extractEntity(json,SuperHero.class))
                .orElseThrow( () -> new RuntimeException("Failed to extract an object of type SuperHero from message!") );
    }

    private <T> Optional<T> extractEntity(String messagePayload,final Class<T> clazz) {
        log.debug("Converting JSON '{}' to an entity of type {}",messagePayload,clazz.getSimpleName());
        try {
            return Optional.ofNullable(new ObjectMapper().readValue(messagePayload,clazz));
        } catch (final JsonProcessingException jsonEx) {
            log.error("Error encountered while parsing JSON content: "+messagePayload,jsonEx);
            return Optional.empty();
        }
    }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)