Problem description
I am trying to do the following:
@Autowired
private TaskProcessorProperties processorProperties;

@Autowired
Processor processor;

@Autowired
private AppConfiguration appConfiguration;

@Transformer(inputChannel = MyProcessor.intermidiate, outputChannel = Processor.OUTPUT)
public Object setupRequest(String message) {
    Map<String, String> properties = new HashMap<>();
    if (StringUtils.hasText(this.processorProperties.getDataSourceUrl())) {
        properties.put("spring_datasource_url", this.processorProperties.getDataSourceUrl());
    }
    if (StringUtils.hasText(this.processorProperties.getDataSourceDriverClassName())) {
        properties.put("spring_datasource_driverClassName", this.processorProperties
                .getDataSourceDriverClassName());
    }
    if (StringUtils.hasText(this.processorProperties.getDataSourceUserName())) {
        properties.put("spring_datasource_username", this.processorProperties
                .getDataSourceUserName());
    }
    if (StringUtils.hasText(this.processorProperties.getDataSourcePassword())) {
        properties.put("spring_datasource_password", this.processorProperties
                .getDataSourcePassword());
    }
    properties.put("payload", message);
    TaskLaunchRequest request = new TaskLaunchRequest(
            this.processorProperties.getUri(), null, properties, this.processorProperties.getApplicationName());
    System.out.println("inside task launcher **************************");
    System.out.println(request.toString() + "**************************");
    return new GenericMessage<>(request);
}
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = MyProcessor.intermidiate)
@Bean
public MessageHandler aggregator() {
    AggregatingMessageHandler aggregatingMessageHandler =
            new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(), new SimpleMessageStore(10));
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    //aggregatorFactoryBean.setMessageStore();
    //aggregatingMessageHandler.setOutputChannel(processor.output());
    //aggregatorFactoryBean.setDiscardChannel(processor.output());
    aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
    aggregatingMessageHandler.setSendTimeout(1000L);
    aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("'FOO'"));
    aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
    aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
    aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
    aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
    return aggregatingMessageHandler;
}
To pass messages between the aggregator and the task-launcher method (setupRequest(String message)), I am using the channel MyProcessor.intermidiate, defined as follows:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Indexed;
public interface MyProcessor {
    String intermidiate = "intermidiate";

    @Output("intermidiate")
    MessageChannel intermidiate();
}
The application.properties I am using is below:
aggregator.message-store-type=persistentMessageStore
spring.cloud.stream.bindings.input.destination=output
spring.cloud.stream.bindings.output.destination=input
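The approach above does not work.
In this class, if I change the channel name from my own channel, MyProcessor.intermidiate, to Processor.INPUT or Processor.OUTPUT, neither of them works (depending on which channel name was changed to Processor.*).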
I want to aggregate the messages first, and then launch the task on the aggregated message in the processor.
Solution
Look here:
public Object setupRequest(String message) {
So you expect a String as the request payload. Your aggregator is configured with a DefaultAggregatingMessageGroupProcessor, which does exactly this:
List<Object> payloads = new ArrayList<Object>(messages.size());
for (Message<?> message : messages) {
    payloads.add(message.getPayload());
}
return payloads;
So the aggregated payload is definitely not a String.
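To make the mismatch concrete, here is a minimal plain-Java sketch with no Spring dependency. The class AggregatedPayloadDemo and its aggregatePayloads method are hypothetical stand-ins that only mirror the loop quoted above; they are not the library's own code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AggregatedPayloadDemo {

    // Hypothetical stand-in for the quoted loop: copies every payload into a List.
    static Object aggregatePayloads(List<?> payloads) {
        List<Object> result = new ArrayList<>(payloads.size());
        for (Object payload : payloads) {
            result.add(payload);
        }
        return result;
    }

    public static void main(String[] args) {
        Object aggregated = aggregatePayloads(Arrays.asList("a", "b", "c"));
        // The aggregated payload is a List, so a method declared as
        // setupRequest(String message) cannot receive it without a conversion.
        System.out.println(aggregated instanceof String); // false
        System.out.println(aggregated instanceof List);   // true
    }
}
```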
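It is strange that you do not show what exception occurs with your configuration, but I think you need to either change the setupRequest() signature to accept a List of payloads, or provide a custom MessageGroupProcessor that builds a String from the group of messages you aggregated.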
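Both options come down to the same conversion, which might be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class AggregatedRequestSketch, the joinPayloads helper, and the "," delimiter are hypothetical, the Spring annotations are omitted, and a plain Map stands in for the properties that the real method passes to TaskLaunchRequest.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AggregatedRequestSketch {

    // Option 1 (assumed shape): the transformer method accepts the aggregated List.
    // In the real bean this method would keep its @Transformer annotation and build
    // a TaskLaunchRequest from the properties instead of returning the Map.
    static Map<String, String> setupRequest(List<String> messages) {
        Map<String, String> properties = new HashMap<>();
        properties.put("payload", joinPayloads(messages));
        return properties;
    }

    // Option 2 (assumed shape): the joining step that a custom MessageGroupProcessor
    // could apply, so that downstream code still receives a single String.
    static String joinPayloads(List<String> payloads) {
        return String.join(",", payloads); // the delimiter is an arbitrary choice
    }

    public static void main(String[] args) {
        Map<String, String> properties = setupRequest(Arrays.asList("a", "b", "c"));
        System.out.println(properties.get("payload")); // a,b,c
    }
}
```

With the first option the aggregator configuration stays unchanged and only the method parameter type changes; with the second, the setupRequest(String message) signature can stay as it is.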