Problem description
I created a PollableChannel that listens to an S3 bucket for files and launches a job for each one.
My class looks like this:
@Bean
public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
    return new S3SessionFactory(pAmazonS3);
}

@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory s3SessionFactory) {
    S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(s3SessionFactory);
    synchronizer.setPreserveTimestamp(true);
    synchronizer.setDeleteRemoteFiles(false);
    synchronizer.setRemoteDirectory(awsS3Properties.getCercBucket());
    return synchronizer;
}

@Bean
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
        S3InboundFileSynchronizer s3InboundFileSynchronizer) {
    S3InboundFileSynchronizingMessageSource messageSource =
            new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer);
    messageSource.setAutoCreateLocalDirectory(true);
    messageSource.setLocalDirectory(
            new FileSystemResource(integrationProperties.getTempDirectoryName()).getFile());
    return messageSource;
}

@Bean("${receivable.integration.inChannel}")
public PollableChannel s3FilesChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow integrationFlow(
        S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
    return IntegrationFlows
            .from(s3InboundFileSynchronizingMessageSource,
                    c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
            .transform(fileMessagetoJobRequest())
            .handle(jobLaunchingGateway())
            .get();
}

@Bean
public FileMessagetoJobRequest fileMessagetoJobRequest() {
    FileMessagetoJobRequest fileMessagetoJobRequest = new FileMessagetoJobRequest();
    fileMessagetoJobRequest.setFileParameterName("input.file.name");
    fileMessagetoJobRequest.setJob(receivablePositionJob);
    return fileMessagetoJobRequest;
}

@Bean
@ServiceActivator(inputChannel = "${receivable.integration.inChannel}", poller = @Poller(fixedRate = "1000"))
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    // SyncTaskExecutor runs the job on the calling thread
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
    jobLaunchingGateway.setOutputChannel(s3FilesChannel());
    return jobLaunchingGateway;
}
And my FileMessagetoJobRequest looks like this:
public class FileMessagetoJobRequest {

    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    // Turns the synchronized local file into a launch request for the configured job
    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}
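I want to add a custom MessageHeader to the message. Alternatively, my second option is to intercept the context before the message is published, because I need to set my tenant in a ThreadLocal.
How can I do this?
Thanks in advance.
Update using enrichHeaders: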
@Bean
public IntegrationFlow integrationFlow(
        S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
    return IntegrationFlows
            .from(s3InboundFileSynchronizingMessageSource,
                    c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
            .transform(fileMessagetoJobRequest())
            .enrichHeaders(Map.of("teste", "testandio"))
            .handle(jobLaunchingGateway())
            .get();
}
Solution
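First, you have to remove that @ServiceActivator(inputChannel = "${receivable.integration.inChannel}", ...) annotation, because it points to the same s3FilesChannel that is also the outputChannel of the JobLaunchingGateway. With a configuration like that you are creating a loop. I have no idea how it works for you at all...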
To add a header before the message is sent to the JobLaunchingGateway, you just need to add enrichHeaders() before .handle(jobLaunchingGateway()) in the integrationFlow definition.
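Once the tenant travels as a header, whatever component receives the message can copy it into a ThreadLocal for the duration of the call. The following is only a minimal plain-Java sketch of that idea, not something taken from Spring Integration: the TenantContext class, the callWithTenant method and the "tenant" header name are all hypothetical, and a plain Map stands in for the message headers (Spring's MessageHeaders implements Map<String, Object>, so the same lookup would apply).

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical holder for the current tenant; not part of any Spring API. */
final class TenantContext {

    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private TenantContext() {
    }

    /** Returns the tenant bound to the calling thread, or null if none is set. */
    static String current() {
        return CURRENT.get();
    }

    /**
     * Reads the tenant from the given headers, binds it to the calling thread
     * while the action runs, and restores the previous value afterwards.
     */
    static <T> T callWithTenant(Map<String, ?> headers, String headerName, Supplier<T> action) {
        Object tenant = headers.get(headerName);
        if (tenant == null) {
            throw new IllegalArgumentException("Missing header: " + headerName);
        }
        String previous = CURRENT.get();
        CURRENT.set(tenant.toString());
        try {
            return action.get();
        } finally {
            // Always undo the binding so pooled threads do not leak the tenant
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }
}
```

A ThreadLocal is only visible on the thread that set it, so this helps only when the tenant-aware code runs on the same thread as the caller. With the SyncTaskExecutor shown in the question, the job is launched on the calling thread; with an asynchronous executor the value would have to be passed along some other way, for example as a job parameter.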