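Problem description
I am currently working on a major problem at my company that I have not been able to solve so far. The general idea is to receive and read multiple messages from an Apache Kafka topic using the Apache Samza low-level API.
The devil is, of course, in the details. The code compiles without errors, but at runtime the StreamTaskFactory should create an instance of the class "PageViewFilterTask" and its process() method should be called. That does not happen, and I have no idea why.
Thanks in advance for any help! :)
Source code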
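import joptsimple.OptionSet;
import org.apache.samza.config.Config;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.util.CommandLine;

public class Launch {
    public static void main(final String[] args) {
        // Parse the --config arguments and load the job configuration from them.
        final CommandLine cmdLine = new CommandLine();
        final OptionSet options = cmdLine.parser().parse(args);
        final Config config = cmdLine.loadConfig(options);
        final LocalApplicationRunner runner = new LocalApplicationRunner(new SamzaListener(args), config);
        // Start the application in this JVM and block until it finishes.
        runner.run();
        runner.waitForFinish();
    }
}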
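import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.task.StreamTaskFactory;

public class SamzaListener implements TaskApplication {
    private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("x.x.x.x:2181");
    private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("x.x.x.x:9092");
    private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
    private static final String INPUT_STREAM_ID = "sample-input";
    private static final String OUTPUT_STREAM_ID = "sample-output";

    public SamzaListener(String[] args) {
    }

    @Override
    public void describe(final TaskApplicationDescriptor appDesc) {
        appDesc.getConfig().get(INPUT_STREAM_ID);
        final KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
                .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
                .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
                .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
        final KafkaInputDescriptor<KV<String, String>> pageViewInput =
                kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID, KVSerde.of(new StringSerde(), new StringSerde()));
        final KafkaOutputDescriptor<KV<String, String>> pageViewOutput =
                kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, KVSerde.of(new StringSerde(), new StringSerde()));
        System.out.println("process started");
        appDesc.withInputStream(pageViewInput);
        appDesc.withOutputStream(pageViewOutput);
        // Only registers the factory; the lambda body is not executed here.
        appDesc.withTaskFactory((StreamTaskFactory) () -> new PageViewFilterTask());
        System.out.println("process ended");
    }
}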
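import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class PageViewFilterTask implements StreamTask {
    // Called once for each message that arrives on the input stream.
    @Override
    public void process(final IncomingMessageEnvelope envelope, final MessageCollector collector, final TaskCoordinator coordinator) {
        System.out.println("StreamTask started");
    }
}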
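The symptom described above ("process started" and "process ended" are printed, "StreamTask started" is not) can be reproduced in plain Java, independent of Samza. Passing a lambda as a factory only registers it, and the task's process() method runs only when something hands it a message. The sketch below uses hypothetical stand-in interfaces (`Task`, `TaskFactory`) and a hypothetical `simulate` helper; none of them are Samza types. Whether this is what happens in the job above is an assumption, but an empty input topic is one thing it suggests ruling out.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredFactoryDemo {
    // Hypothetical stand-ins for illustration only; NOT Samza's StreamTask / StreamTaskFactory.
    interface Task { void process(String message); }
    interface TaskFactory { Task createInstance(); }

    // Records the order of events for a "job" that receives the given messages.
    public static List<String> simulate(List<String> messages) {
        List<String> events = new ArrayList<>();

        // Describe phase: the factory lambda is stored, its body does not run yet.
        events.add("describe started");
        TaskFactory factory = () -> {
            events.add("task created");
            return message -> events.add("processed " + message);
        };
        events.add("describe ended");

        // Run phase: the runner creates the task, then calls process() once per message.
        Task task = factory.createInstance();
        for (String message : messages) {
            task.process(message);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(List.of()));        // no "processed ..." entry
        System.out.println(simulate(List.of("hello"))); // ends with "processed hello"
    }
}
```

With an empty message list the event log contains no "processed" entry even though the factory was registered and the task was created.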
PowerShell output:
PS D:\eclipse-workspace\SamzaTask> ./gradlew run --args="--config job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory --config job.config.loader.properties.path=./src/main/resources/samzaListener.properties"
> Task :run
log4j:WARN No appenders could be found for logger (org.apache.samza.config.loaders.PropertiesConfigLoader).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
process started
process ended
<=========----> 75% EXECUTING [6s]
> :run
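The log4j warnings in this output mean that no logging configuration was found, so anything Samza logs at runtime, including errors, is not shown. A minimal sketch of a log4j 1.2 configuration that writes to the console, assuming it is saved as `src/main/resources/log4j.properties` so that it ends up on the classpath (the file location, log level, and pattern are illustrative choices, not taken from the original project):

```properties
# Send INFO and above to the console (assumed settings for debugging).
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{1} - %m%n
```

With logging visible, the output should show more clearly how far startup got before the job appears to hang.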