Flink job with CassandraSink fails with "Error writing"

Problem description

I have two simple Flink streaming jobs that read from Kafka, apply some transformations, and write the results to a Cassandra sink. The two jobs read from different Kafka topics and save to different Cassandra tables.
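
For context, here is a minimal sketch of how each job is wired. The topic, table, and column names below are placeholders (the real jobs use different types and queries), and "builder" refers to the ClusterBuilder shown further down:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "job-a"); // placeholder group id

// read from one Kafka topic and apply a simple transformation...
DataStream<Tuple2<String, Long>> dataRows = env
        .addSource(new FlinkKafkaConsumer<>("topic-a", new SimpleStringSchema(), kafkaProps))
        .map(line -> Tuple2.of(line, 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG));

// ...then write the result into a Cassandra table
CassandraSink.addSink(dataRows)
        .setQuery("INSERT INTO my_keyspace.table_a (key, cnt) VALUES (?, ?);")
        .setClusterBuilder(builder)
        .build();

env.execute("job-a");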

When I execute either of the two jobs on its own, everything works fine: checkpoints are triggered and completed, and data is saved to Cassandra.

However, whenever I run the two jobs at the same time (or run one of them twice), the second one fails at startup with the following exception: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))

I couldn't find much information about this error. It may be caused by any one of the following:

  • Flink (v 1.10.0-scala_2.12),
  • the Flink Cassandra connector (flink-connector-cassandra_2.11:jar:1.10.2, also tried with flink-connector-cassandra_2.12:jar:1.10.0),
  • the DataStax underlying driver (v 3.10.2),
  • Cassandra v4.0 (same with v3.0),
  • the Netty transport (v 4.1.51.Final).

I also use packages that might conflict with the ones above:

  • mysql-connector-java (v 8.0.19),
  • cassandra-driver-extras (v 3.10.2)

Finally, here is the code for my cluster builder:

ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = null;
        try {
            cluster = builder
                    .addContactPoint("localhost")
                    .withPort(9042)
                    .withClusterName("Test Cluster")
                    .withoutJMXReporting()
                    .withProtocolVersion(ProtocolVersion.V4)
                    .withoutMetrics()
                    .build();

            // register codecs from datastax extras.
            cluster.getConfiguration().getCodecRegistry()
                    .register(LocalTimeCodec.instance);
        } catch (ConfigurationException e) {
            e.printStackTrace();
        } catch (NoHostAvailableException nhae) {
            nhae.printStackTrace();
        }

        return cluster;
    }
};

I tried different PoolingOptions and SocketOptions settings, but without success; a representative variant is sketched below.
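
For illustration only (the concrete values here are placeholders, not the exact ones I used), one such variant with the driver 3.x API:

import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.SocketOptions;

// tighter connection pool: core/max connections per host in the local DC
PoolingOptions poolingOptions = new PoolingOptions()
        .setConnectionsPerHost(HostDistance.LOCAL, 1, 2)
        .setMaxRequestsPerConnection(HostDistance.LOCAL, 1024);

// more generous connect/read timeouts
SocketOptions socketOptions = new SocketOptions()
        .setConnectTimeoutMillis(10000)
        .setReadTimeoutMillis(20000);

// applied to the Cluster.Builder in buildCluster(), before build():
// builder.withPoolingOptions(poolingOptions).withSocketOptions(socketOptions)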

Cassandra Sink:

CassandraSink.addSink(dataRows)
.setQuery("insert into table_name_(16 columns names) " +
        "values (16 placeholders);")
.enableWriteAheadLog()
.setClusterBuilder(builder)
.setFailureHandler(new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable throwable) {
        LOG.error("A {} occurred.", "Cassandra Failure", throwable);
    }
})
.build()
.setParallelism(1)
.name("Cassandra Sink For Unique Count every N minutes.");

The full trace log from the Flink job manager:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
    at com.datastax.driver.core.Cluster.init(Cluster.java:162)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:143)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
    at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.base/java.lang.Thread.run(Thread.java:834)

Any help is appreciated.

Edit:

  • I just tried using two separate Cassandra instances (different machines and different clusters), pointing one job at one instance and the other job at the other. Nothing changed; I still get the same error.
  • I tried to reduce the dependencies; here is the new pom file:
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.abcde.ai</groupId>
    <artifactId>analytics-etl</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.abcde.analytics.etl.KafkaUniqueCountsStreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

Edit: I managed to narrow the problem down. The error gets resolved when I mark the flink-connector-cassandra dependency as provided and simply copy the jar from my local maven repository (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) into the Flink lib folder. My problem is solved, but the root cause is still a mystery.

Solution

I might be wrong, but most likely the issue is caused by a conflict between Netty client versions. The error states NoHostAvailableException, but the underlying error is a TransportException with an "Error writing" message. Cassandra itself is apparently working just fine.

There is a similar StackOverflow case - Cassandra - error writing - with very similar symptoms: a single project runs fine, but adding one more produces an AllNodesFailedException with TransportException and the "Error writing" message as the root cause. The author fixed it by unifying the Netty client version.
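
To see whether two conflicting Netty versions are actually on the classpath, a small diagnostic like the following may help (my own sketch, relying on Netty 4's io.netty.util.Version API; note that Netty classes relocated by shading live under other package names and will not appear here):

import java.util.Map;

import io.netty.util.Version;

public class NettyVersionCheck {
    public static void main(String[] args) {
        // Version.identify() reads the version properties bundled in each
        // Netty jar visible on the classpath
        for (Map.Entry<String, Version> entry : Version.identify().entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().artifactVersion());
        }
    }
}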

In your case, I'm not sure why there are so many dependencies, so I would try to get rid of all the extras and leave only the Flink (v 1.10.0-scala_2.12) and Flink Cassandra connector (flink-connector-cassandra_2.12:jar:1.10.0) libraries. They must already include the necessary drivers, Netty, and so on. All other drivers should be skipped (at least for the initial iteration, to ensure that this resolves the issue and that it is a library conflict).


To fix the error, I marked the flink-connector-cassandra dependency as provided and simply copied the jar from my local maven repository (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) into the Flink lib folder, then restarted Flink. Here is the changed dependency in my new pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

How did I find this? I was about to try compiling the connector from source with a newer driver version. First, I tried to reproduce the error with the unchanged sources, so I compiled the connector without any changes, put the jar into the Flink lib folder, and hooray, no more error! Then I wondered whether the jar from maven was somehow different; I copied it into the lib folder and, to my surprise, that also worked.

My problem is solved, but the root cause remains a mystery.

My last attempt was to check whether any package conflicts with the Cassandra connector, so I ran mvn dependency:tree -Dverbose and found that org.apache.flink:flink-metrics-dropwizard conflicts with the connector over metrics-core:

[INFO] +- org.apache.flink:flink-connector-cassandra_2.12:jar:1.10.0:provided
[INFO] |  +- (io.dropwizard.metrics:metrics-core:jar:3.1.2:provided - omitted for conflict with 3.1.5)
[INFO] |  \- (org.apache.flink:force-shading:jar:1.10.0:provided - omitted for duplicate)

I removed this dependency from my project, but the error still remains unless the connector is marked as provided and its jar is placed in the lib folder.