Spark 流 - 从 S3/COS 读取文件失败

问题描述

我正在尝试提交在 Kubernetes 集群上运行的 wordcount spark 作业,如下面的命令所示。

./bin/spark-submit \
    --master k8s://https://c111.us-south.containers.cloud.ibm.com:32206 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi --packages com.ibm.stocator:stocator:1.1.3 \
    --conf spark.executor.instances=5 --conf spark.hadoop.fs.cos.myobjectstorage.access.key= --conf spark.hadoop.fs.cos.myobjectstorage.secret.key= --conf spark.hadoop.fs.stocator.scheme.list=cos --conf spark.hadoop.fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem --conf spark.hadoop.fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSapiclient --conf spark.hadoop.fs.stocator.cos.scheme=cos --conf spark.jars.ivy=/tmp/.ivy\
    --conf spark.kubernetes.container.image=us.icr.io/mods15/spark-py:v1 --conf spark.hadoop.fs.cos.myobjectstorage.endpoint=http://s3.us.cloud-object-storage.appdomain.cloud --conf spark.hadoop.fs.cos.myobjectstorage.v2.signer.type=false --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark local:///opt/spark/examples/src/main/python/wordcount.py cos://vmac-code-engine-bucket.myobjectstorage/book.txt

一切正常,直到我将 Spark Streaming 添加到 Python 代码示例中,如下所示。在 wordcount.py 文件中,我尝试从存储在 COS 上的文件中读取数据并通过 StreamingContext.textFileStream 创建 DStream 作为 fileStream在 Python 中不可用。我在日志中没有看到任何错误,但写入 COS 文件夹的输出是空的(没有任何字数统计)。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time 

def time_in_seconds():
    seconds=time.time()
    return seconds

timeInSeconds=time_in_seconds()
sc = SparkContext("local[2]","WordCount")
ssc = StreamingContext(sc,60)
lines = ssc.textFileStream("cos://COS_BUCKET_NAME.COS_SERVICE_NAME/ES_TOPIC_NAME/")
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word,1))
wordCounts = pairs.reduceByKey(lambda x,y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

wordCounts.saveAsTextFiles(f"cos://COS_BUCKET_NAME.COS_SERVICE_NAME/results/wordcount-result-{timeInSeconds}")

ssc.start()
ssc.awaitTermination()

我找不到有关如何在 Kubernetes 上运行 Spark Streaming 的任何文档。我假设从 COS 存储桶读取失败。我是否遗漏了命令或 Python wordcount 示例中的任何内容

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)