如何通过容器化 Livy 提交 Spark 作业

问题描述

我使用以下 repo 来运行 Spark (2.4.7) 和 Livy (0.7)。

repo 上显示的 curl 命令工作正常,似乎一切都已启动并正在运行。

我写了一个简单的字数统计 maven Spark Java 程序,并使用 Livy 客户端通过 Livy 将其作为 Spark 作业提交。

我的 Java 字数统计

package spark;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import scala.Tuple2;

public final class JavaWordCount implements Job<Double> {

    private static final long serialVersionUID = 4870271814150948504L;
    private static final Pattern SPACE = Pattern.compile(" ");

    @Override
    public Double call(JobContext ctx) throws Exception {
        count(ctx);
        return 0.7;
    }

    public void count(JobContext ctx){

        //JavaRDD<String> lines = ctx.textFile(args[0],1);
        JavaRDD<String> lines = ctx.sc().parallelize(Arrays.asList("It is close to midnight and something evil is lurking in the dark".split(" ")));


        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
            @Override
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });

        JavaPairRDD<String,Integer> ones = words.mapToPair(new PairFunction<String,String,Integer>() {
            @Override
            public Tuple2<String,Integer> call(String s) {
                return new Tuple2<>(s,1);
            }
        });

        JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer,Integer,Integer>() {
            @Override
            public Integer call(Integer i1,Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String,Integer>> output = counts.collect();
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
    }
}

我的 Livy 客户:

package spark;

import org.apache.livy.Job;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

import java.io.File;
import java.net.URI;

public final class SubmitJob {

    private static final String livyURI = "http://localhost:8998/";
    private static final String JAR_PATH = "/Users/.../spark-word-count/target/word-count-0.1-SNAPSHOT.jar";

    public static void main(String[] args) throws Exception {
        LivyClient livyClient = new LivyClientBuilder()
        .setURI(new URI(livyURI)).build();
        
        try {
            System.err.printf("Uploading %s to the Spark context...\n",JAR_PATH);
            livyClient.uploadJar(new File(JAR_PATH));
            
            System.err.printf("Running JavaWordCount...\n",JAR_PATH);
            double pi = livyClient.submit(new JavaWordCount()).get();
        
            System.out.println("Pi is roughly: " + pi);
        } finally {
            livyClient.stop(true);
        }

    }
}

运行客户端时出现以下错误

线程“main”中的异常 java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.livy.shaded.kryo.kryo.KryoException:无法找到类: spark.JavaWordCount 在 org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) 在 org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)

线程“main”中的异常java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.livy.shaded.kryo.kryo.KryoException:无法找到类: spark.JavaWordCount 在 org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) 在 org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) 在 org.apache.livy.shaded.kryo.kryo.Kryo.readClass(Kryo.java:656) 在 org.apache.livy.shaded.kryo.kryo.Kryo.readClassAndobject(Kryo.java:767) 在 org.apache.livy.client.common.Serializer.deserialize(Serializer.java:63) 在 org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:39) 在 org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27) 在 org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:64) 在 org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:45) 在 org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 引起:java.lang.classNotFoundException:spark.JavaWordCount

我该如何解决这个错误

我读到它可能是因为 livy.file.local-dir-whitelist。 我的 Livy conf 白名单如下所示:

livy.file.local-dir-whitelist = /

我尝试将 Jar 上传到 Livy 容器并将其放在“/”下,并在我的客户端上更改了 JAR_PATH = "/word-count-0.1-SNAPSHOT.jar"。我收到同样的错误...

如何提交我的 Jar?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)