问题描述
我使用以下 repo 来运行 Spark (2.4.7) 和 Livy (0.7)。
repo 上显示的 curl 命令工作正常,似乎一切都已启动并正在运行。
我写了一个简单的字数统计 maven Spark Java 程序,并使用 Livy 客户端通过 Livy 将其作为 Spark 作业提交。
我的 Java 字数统计:
package spark;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import scala.Tuple2;
public final class JavaWordCount implements Job<Double> {
private static final long serialVersionUID = 4870271814150948504L;
private static final Pattern SPACE = Pattern.compile(" ");
@Override
public Double call(JobContext ctx) throws Exception {
count(ctx);
return 0.7;
}
public void count(JobContext ctx){
//JavaRDD<String> lines = ctx.textFile(args[0],1);
JavaRDD<String> lines = ctx.sc().parallelize(Arrays.asList("It is close to midnight and something evil is lurking in the dark".split(" ")));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
@Override
public Iterator<String> call(String s) {
return Arrays.asList(SPACE.split(s)).iterator();
}
});
JavaPairRDD<String,Integer> ones = words.mapToPair(new PairFunction<String,String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) {
return new Tuple2<>(s,1);
}
});
JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer,Integer,Integer>() {
@Override
public Integer call(Integer i1,Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String,Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
}
}
我的 Livy 客户:
package spark;
import org.apache.livy.Job;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;
import java.io.File;
import java.net.URI;
public final class SubmitJob {
private static final String livyURI = "http://localhost:8998/";
private static final String JAR_PATH = "/Users/.../spark-word-count/target/word-count-0.1-SNAPSHOT.jar";
public static void main(String[] args) throws Exception {
LivyClient livyClient = new LivyClientBuilder()
.setURI(new URI(livyURI)).build();
try {
System.err.printf("Uploading %s to the Spark context...\n",JAR_PATH);
livyClient.uploadJar(new File(JAR_PATH));
System.err.printf("Running JavaWordCount...\n",JAR_PATH);
double pi = livyClient.submit(new JavaWordCount()).get();
System.out.println("Pi is roughly: " + pi);
} finally {
livyClient.stop(true);
}
}
}
运行客户端时出现以下错误:
线程“main”中的异常 java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.livy.shaded.kryo.kryo.KryoException:无法找到类: spark.JavaWordCount 在 org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) 在 org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
线程“main”中的异常java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.livy.shaded.kryo.kryo.KryoException:无法找到类: spark.JavaWordCount 在 org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) 在 org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) 在 org.apache.livy.shaded.kryo.kryo.Kryo.readClass(Kryo.java:656) 在 org.apache.livy.shaded.kryo.kryo.Kryo.readClassAndobject(Kryo.java:767) 在 org.apache.livy.client.common.Serializer.deserialize(Serializer.java:63) 在 org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:39) 在 org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27) 在 org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:64) 在 org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:45) 在 org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 引起:java.lang.classNotFoundException:spark.JavaWordCount
我读到它可能是因为 livy.file.local-dir-whitelist。 我的 Livy conf 白名单如下所示:
livy.file.local-dir-whitelist = /
我尝试将 Jar 上传到 Livy 容器并将其放在“/”下,并在我的客户端上更改了 JAR_PATH = "/word-count-0.1-SNAPSHOT.jar"。我收到同样的错误...
如何提交我的 Jar?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)