项目:Camel
文件:RouteBuilderTest.java
/**
 * Checks the routes produced by {@code buildSplitter()}: exactly one route is expected,
 * it must start from {@code direct://a}, and the processor behind its unwrapped channel
 * must be a {@link Splitter}.
 *
 * <p>NOTE(review): the helper name {@code assertisinstanceOf} is kept exactly as it appears
 * in this file; its casing looks unusual - confirm it matches the helper actually declared
 * by the test base class.
 *
 * @throws Exception if building or inspecting the routes fails
 */
public void testSplitter() throws Exception {
    final List<Route> builtRoutes = buildSplitter();
    log.debug("Created routes: " + builtRoutes);
    assertEquals("Number routes created", 1, builtRoutes.size());

    for (final Route current : builtRoutes) {
        // The route has to consume from the "a" direct endpoint.
        final String fromUri = current.getEndpoint().getEndpointUri();
        assertEquals("From endpoint", "direct://a", fromUri);

        // Narrow the route so its processor can be inspected.
        final EventDrivenConsumerRoute eventDriven =
                assertisinstanceOf(EventDrivenConsumerRoute.class, current);

        // The processor following the unwrapped channel must be the splitter.
        final Channel unwrapped = unwrapChannel(eventDriven.getProcessor());
        assertisinstanceOf(Splitter.class, unwrapped.getNextProcessor());
    }
}
/**
 * Creates a consumer endpoint that breaks a List of Maps into one exchange per Map and
 * converts each of those Maps to JSON before passing it on to the given processor.
 *
 * @param processor the processor that receives each JSON-converted exchange
 * @return the consumer created by the wrapped {@code endpoint} around the splitter
 * @throws Exception if the pipeline, splitter or consumer cannot be created
 */
@Override
public Consumer createConsumer(final Processor processor) throws Exception {
    // Stage 1 of the per-item pipeline: Map -> JSON; stage 2 is the caller's processor.
    final ToJSONProcessor jsonConverter = new ToJSONProcessor();
    final Processor perItemPipeline =
            Pipeline.newInstance(getCamelContext(), jsonConverter, processor);

    // Split on the message body, read as a List.
    final Expression listBody = ExpressionBuilder.bodyExpression(List.class);

    // The last constructor argument is passed as null, exactly as before.
    final Splitter listSplitter =
            new Splitter(getCamelContext(), listBody, perItemPipeline, null);

    return endpoint.createConsumer(listSplitter);
}
/**
 * Prepares the fixture before each test: resets the {@code messages} list and builds a
 * fresh {@code splitter} on a new {@link DefaultCamelContext}, wired with the test
 * expression, the test processor and a {@link UseLatestAggregationStrategy}.
 *
 * @throws Exception if the fixture cannot be created
 */
@Before
public void setUp() throws Exception {
    final CamelContext camelContext = new DefaultCamelContext();

    // Start every test with an empty message list.
    messages = new ArrayList<>();

    splitter = new Splitter(
            camelContext,
            new TestExpression(),
            new TestProcessor(),
            new UseLatestAggregationStrategy());
}
/**
 * Builds the {@link Splitter} processor for this definition.
 *
 * <p>The child processor, aggregation strategy, optional flags (parallel processing,
 * streaming, share-unit-of-work, parallel aggregate), thread pool, timeout and the optional
 * {@code onPrepare} processor are resolved first and then handed to the {@code Splitter}
 * constructor. Unset ({@code null}) flags are treated as {@code false}, and an unset
 * timeout as {@code 0}.
 *
 * <p>NOTE(review): the external names {@code ProcessorDeFinitionHelper} and
 * {@code isstopOnException()} are kept exactly as written in this file; their casing looks
 * unusual - confirm they match the real declarations.
 *
 * @param routeContext the route context used to resolve the child processor, expression,
 *                     thread pool and registry lookups
 * @return the configured splitter
 * @throws IllegalArgumentException if a positive timeout is set while parallel processing
 *                                  is not enabled
 * @throws Exception if any of the collaborating lookups or factories fails
 */
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    final Processor child = this.createChildProcessor(routeContext, true);
    aggregationStrategy = createAggregationStrategy(routeContext);

    // Optional Boolean settings: null means "not configured" and counts as false.
    final boolean parallel = getParallelProcessing() != null && getParallelProcessing();
    final boolean streaming = getStreaming() != null && getStreaming();
    final boolean shareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
    final boolean parallelAggregate = getParallelAggregate() != null && getParallelAggregate();

    final boolean shutdownPool =
            ProcessorDeFinitionHelper.willCreateNewThreadPool(routeContext, this, parallel);
    final ExecutorService executor =
            ProcessorDeFinitionHelper.getConfiguredExecutorService(routeContext, "Split", parallel);

    long splitTimeout = 0;
    if (getTimeout() != null) {
        splitTimeout = getTimeout();
    }
    // A timeout is only accepted together with parallel processing.
    if (!parallel && splitTimeout > 0) {
        throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
    }

    // A referenced onPrepare processor must exist; the lookup is mandatory.
    if (onPrepareRef != null) {
        onPrepare = CamelContextHelper.mandatoryLookup(
                routeContext.getCamelContext(), onPrepareRef, Processor.class);
    }

    final Expression splitExpression = getExpression().createExpression(routeContext);

    return new Splitter(
            routeContext.getCamelContext(),
            splitExpression,
            child,
            aggregationStrategy,
            parallel,
            executor,
            shutdownPool,
            streaming,
            isstopOnException(),
            splitTimeout,
            onPrepare,
            shareUnitOfWork,
            parallelAggregate);
}
项目:Camel
文件:ManagedSplitter.java