scala – 使用Async HTTP调用的Spark作业

我从URL列表构建一个RDD,然后尝试使用一些异步http调用获取数据.
在进行其他计算之前我需要所有结果.
理想情况下,我需要在不同节点上进行http调用以进行缩放考虑.

我做了这样的事情:

//init spark
val sparkContext = new SparkContext(conf)
val datas = Seq[String]("url1","url2")

//create rdd
val rdd = sparkContext.parallelize[String](datas)

//httpCall return Future[String]
val requests = rdd.map((url: String) => httpCall(url))

//await all results (Future.sequence may be better)
val responses = requests.map(r => Await.result(r,10.seconds))

//print responses
response.collect().foreach((s: String) => println(s))

//stop spark
sparkContext.stop()

这项工作,但Spark工作永远不会完成!

所以我想知道使用Spark(或Future [RDD])处理Future的最佳实践是什么.

我认为这个用例看起来很常见,但还没有找到任何答案.

最好的祝福

解决方法

this use case looks pretty common

不是真的,因为它根本无法正常工作(可能).由于每个任务都在标准的Scala迭代器上运行,因此这些操作将被压缩在一起.这意味着所有操作都将在实践中阻塞.假设您有三个URL [“x”,“y”,“z”],您的代码将按以下顺序执行:

Await.result(httpCall("x",10.seconds))
Await.result(httpCall("y",10.seconds))
Await.result(httpCall("z",10.seconds))

您可以轻松地在本地重现相同的行为.如果要异步执行代码,则应使用mapPartitions显式处理:

rdd.mapPartitions(iter => {
  ??? // Submit requests
  ??? // Wait until all requests completed and return Iterator of results
})

但这比较棘手.无法保证给定分区的所有数据都适合内存,因此您可能也需要一些批处理机制.

所有这一切都说我无法重现你所描述的问题可能是一些配置问题或httpCall本身的问题.

在旁注上允许单个超时终止整个任务看起来不是一个好主意.

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...