问题描述
我正在尝试为我的项目应用 Redisson 功能作为消息代理,我有一个问题。是否可以将 Redisson 异步推送到先前收到的消息?我创建了一个小例子,从不同的 URL 发送了 4 条消息。我预计,Redisson 会异步处理它们,但它一一做到了。 这里的实现:
singularity build --fakeroot <OUTPUT_CONTAINER.sif> Singularity
和配置:
public class RedisListenerServiceImpl implements MessageListener<String> {
private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void onMessage(CharSequence channel,String stringMsg) {
log.info("Message received: {}",stringMsg);
MessageDto msg;
try {
msg = objectMapper.readValue(stringMsg,MessageDto.class);
} catch (final IOException e) {
log.error("Unable to deserialize message: {}",e.getMessage(),e);
return;
}
try {
//Do my stuff
} catch (Exception e) {
log.error("Unable to get service from factory: {}",e);
}
}
}
解决方法
这是预期的行为。如果您希望在 Listener onMessage() 方法被触发时同时处理您的消息,只需使用线程池来处理它。
由于 Redisson 不知道您要使用多少个线程触发的事件,因此将实现细节留给您。
public class RedisListenerServiceImpl implements MessageListener<String> {
private static final Logger log = LoggerFactory.getLogger(RedisListenerServiceImpl.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override
public void onMessage(CharSequence channel,String stringMsg) {
log.info("Message received: {}",stringMsg);
MessageDto msg;
try {
msg = objectMapper.readValue(stringMsg,MessageDto.class);
executorService.submit(()->{
System.out.println("do something with message: "+msg);
});
} catch (final IOException e) {
log.error("Unable to deserialize message: {}",e.getMessage(),e);
return;
}
try {
//Do my stuff
} catch (Exception e) {
log.error("Unable to get service from factory: {}",e);
}
}