Apache Samza低级API不会加载StreamTask

问题描述

我现在正在公司的市长问题上工作,到目前为止,我还无法解决解决这个问题的总体思路是通过Apache Kafka Topic的Apache Samza低级api接收和读取多条消息。

恶魔当然在程序中。代码本身不包含任何编译错误等,但是在此小程序的运行期间,StreamTaskFactory应尝试调用类“ PageViewFilterTask”。事实并非如此,我根本不知道为什么。

感谢您很快的帮助! :)

代码

public class Launch {
    public static void main(final String[] args) {
        final CommandLine cmdLine = new CommandLine();
        final OptionSet options = cmdLine.parser().parse(args);
        final Config config = cmdLine.loadConfig(options);

        final LocalApplicationRunner runner = new LocalApplicationRunner(new SamzaListener(args),config);
        runner.run();
        runner.waitForFinish();
    }
}
public class SamzaListener implements TaskApplication {

    private static final List<String>           KAFKA_CONSUMER_ZK_CONNECT           = ImmutableList.of("x.x.x.x:2181");
    private static final List<String>           KAFKA_PRODUCER_BOOTSTRAP_SERVERS    = ImmutableList.of("x.x.x.x:9092");
    private static final Map<String,String>    KAFKA_DEFAULT_STREAM_CONfigS        = ImmutableMap.of("replication.factor","1");

    private static final String INPUT_STREAM_ID     = "sample-input";
    private static final String OUTPUT_STREAM_ID    = "sample-output";

    public SamzaListener(String[] args) {
        
    }
    
    @Override
    public void describe(final TaskApplicationDescriptor appDesc) {
        appDesc.getConfig().get(INPUT_STREAM_ID);

        final KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka").withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT).withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
                .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONfigS);

        final KafkaInputDescriptor<KV<String,String>> pageViewInput = kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID,KVSerde.of(new StringSerde(),new StringSerde()));
        final KafkaOutputDescriptor<KV<String,String>> pageViewOutput = kafkaSystemDescriptor.getoutputDescriptor(OUTPUT_STREAM_ID,new StringSerde()));

        System.out.println("process started");

        appDesc.withInputStream(pageViewInput);
        appDesc.withOutputStream(pageViewOutput);
        appDesc.withTaskFactory((StreamTaskFactory) () -> new PageViewFilterTask());

        System.out.println("process ended");
    }

}
public class PageViewFilterTask implements StreamTask {

    @Override
    public void process(final IncomingMessageEnvelope envelope,final MessageCollector collector,final TaskCoordinator coordinator) {
        System.out.println("StreamTask started");
    }

}

PowerShell输出

PS D:\eclipse-workspace\SamzaTask> ./gradlew run --args="--config job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory --config job.config.loader.properties.path=./src/main/resources/samzaListener.properties"

> Task :run
log4j:WARN No appenders Could be found for logger (org.apache.samza.config.loaders.PropertiesConfigLoader).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
process started
process ended
<=========----> 75% EXECUTING [6s]
> :run

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...