项目:Camel
文件:ReplyManagerSupport.java
/**
 * Applies the outcome held by a {@link ReplyHolder} to its exchange and then
 * signals the holder's callback.
 *
 * <p>Nothing happens (the callback is not invoked either) when {@code holder}
 * is {@code null} or {@code isRunAllowed()} returns {@code false}.
 *
 * <p>Otherwise one of two things happens:
 * <ul>
 *   <li>If the holder reports a timeout, a WARN entry is logged and an
 *       {@link ExchangeTimedOutException} is set on the exchange.</li>
 *   <li>If not, the reply is copied onto the exchange through
 *       {@code messageConverter}. When the holder carries an original
 *       correlation id, that id is written back to the
 *       {@code RabbitMQConstants.CORRELATIONID} header of the OUT message if
 *       the exchange has one, or of the IN message if it does not.</li>
 * </ul>
 *
 * <p>In both cases the holder's callback is finally invoked with
 * {@code done(false)}, even if the processing above throws.
 *
 * <p>NOTE(review): {@code populaterabbitExchange}, {@code getoriginalCorrelationId}
 * and {@code getout} are spelled exactly as found in this file; the casing looks
 * unusual, so confirm it against the declaring types.
 *
 * @param holder the reply (or timeout marker) to process; may be {@code null}
 */
public void processReply(ReplyHolder holder) {
    // Guard: no holder, or running is currently not allowed.
    if (holder == null || !isRunAllowed()) {
        return;
    }

    try {
        final Exchange exchange = holder.getExchange();

        if (!holder.isTimeout()) {
            // A reply arrived: transfer its properties and payload onto the exchange.
            messageConverter.populaterabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true);

            // The remote side may have altered the correlation id, so put the original one back.
            if (holder.getoriginalCorrelationId() != null) {
                if (exchange.hasOut()) {
                    exchange.getout().setHeader(RabbitMQConstants.CORRELATIONID, holder.getoriginalCorrelationId());
                } else {
                    exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, holder.getoriginalCorrelationId());
                }
            }
        } else {
            // Logged at WARN so that timeouts stand out in the logs.
            if (log.isWarnEnabled()) {
                log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}."
                        + " Setting ExchangeTimedOutException on {} and continue routing.",
                        holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange));
            }

            // No response was received: record that on the exchange as a timed-out exception.
            final String timeoutMessage = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo;
            exchange.setException(new ExchangeTimedOutException(exchange, timeoutMessage));
        }
    } finally {
        // Always notify the callback, whatever happened above.
        holder.getCallback().done(false);
    }
}
/**
 * Configures error handling and the chat-inbox route.
 *
 * <p>Error handling: any {@link Exception} is passed to the {@code asError}
 * bean and then to the {@code toJson} bean, and is marked as handled. A
 * {@code LoggingErrorHandlerBuilder} using this class's {@code log} is
 * installed as the error handler.
 *
 * <p>Route: consumes from the RabbitMQ {@code amq.topic} exchange on
 * {@code localhost}, using {@code AvatarChannel.CHAT_INBox.wildcard()} as both
 * queue name and routing key. For each message the processor:
 * <ol>
 *   <li>reads the body as bytes and deserializes it into a
 *       {@code CommunicateAction} with the mapper from {@code toJson};</li>
 *   <li>sets the avatar id derived from the routing-key header;</li>
 *   <li>obtains the {@code InteractionSession} for that avatar from
 *       {@code sessionManager}, calls {@code receiveUtterance} and then
 *       {@code update} on it;</li>
 *   <li>replaces the message body with a new {@code Status}.</li>
 * </ol>
 * The result is then passed to the {@code toJson} bean.
 *
 * @throws Exception if the route definition fails
 */
@Override
public void configure() throws Exception {
    onException(Exception.class).bean(asError).bean(toJson).handled(true);
    errorHandler(new LoggingErrorHandlerBuilder(log));

    from("rabbitmq://localhost/amq.topic?connectionFactory=#amqpConnFactory&exchangeType=topic&autoDelete=false&queue=" + AvatarChannel.CHAT_INBox.wildcard() + "&routingKey=" + AvatarChannel.CHAT_INBox.wildcard())
            .process(exchange -> {
                final CommunicateAction inCommunicate = toJson.getMapper().readValue(
                        exchange.getIn().getBody(byte[].class), CommunicateAction.class);
                // The avatar id is not taken from the payload; it is derived from the routing key.
                inCommunicate.setAvatarId(AvatarChannel.getAvatarId((String) exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY)));
                log.info("Chat inBox for {}: {}", inCommunicate.getAvatarId(), inCommunicate);

                // The inbound language is optional; absence is passed on as an empty Optional.
                final Optional<Locale> origLocale = Optional.ofNullable(inCommunicate.getInLanguage());

                // The earlier AIML-style handling (aimlService.process followed by
                // chatChannel.express / droolsService.process) is disabled; the
                // session-based flow below is what runs.
                final InteractionSession session = sessionManager.getorCreate(chatChannel, inCommunicate.getAvatarId());
                session.receiveUtterance(origLocale, factService, taskRepo, scriptRepo);
                session.update(chatChannel, inCommunicate.getAvatarId());

                // FIXME: re-implement SocialJournal. The disabled code built a SocialJournal
                // from the response, including the processing duration measured from the
                // start of this processor, and saved it via socialJournalRepo.

                exchange.getIn().setBody(new Status());
            })
            .bean(toJson);
}