问题描述
我正在尝试使用Java Spark程序中的sql API从Azure Cosmos DB读取数据。
public class CosmosDBTest {
public static void main(String[] args) {
// Logger.getLogger("org.apache").setLevel(Level.WARN);
SparkSession sc = SparkSession.builder().appName("TestCosmosDB").master("local[*]")
.config("spark.debug.maxToStringFields",10000)
.getorCreate();
Dataset<Row> ds = sc.read().format("com.microsoft.azure.cosmosdb.spark").options(getCosmosConfigMap("<CosmosEndpoint>","<MasterKey>","<Database>","<Collection>","<sql Query>")).load();
ds.foreach(row -> {
System.out.println(row);
});
ds.printSchema();
System.out.println("Stopping spark session");
sc.stop();
System.out.println("Spark session stopped");
}
public static scala.collection.Map getCosmosConfigMap(String endpoint,String masterKey,String database,String collection,String query) {
Map<String,String> configMap = new HashMap<>();
configMap.put("Endpoint",endpoint);
configMap.put("Masterkey",masterKey);
configMap.put("Database",database);
configMap.put("Collection",collection);
configMap.put("query_custom",query);
scala.collection.Map<String,String> scalaConfigMap = JavaConverters.mapAsScalaMapConverter(configMap).asScala().toMap(
Predef.<Tuple2<String,String>>conforms()
);
return scalaConfigMap;
}
}
它绝对正常。但是程序不会终止。我在日志中看到SparkContext已终止,但Java程序将永远运行。好像某个插座或某些东西仍在打开。我很确定这与Cosmos阅读器有关,但我无法找出确切的问题。
任何见识。
谢谢。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)