Problem description
I am trying to write data to S3 using Apache Hudi from my local machine. I can see that the data is written successfully and that the .hoodie directory is created. However, when Hudi tries to send a remote compaction request, I get a timeout exception. I am not even sure what host it is trying to reach with that compaction request. Any help is appreciated.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.{SparkConf, SparkContext}

object TestAppRunner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("HudiRunner")
      .setMaster("local[2]")
      .set("spark.driver.bindAddress", "127.0.0.1")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = new SparkContext(conf)
    // S3A credentials and endpoint
    sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "access_key")
    sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "secret_key")
    sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")
    val spark = SparkSession.builder.config(sparkContext.getConf).getOrCreate()

    val basePath = "s3a://my-bucket/hudi/test/"
    val hudiOptions = Map[String, String](
      HoodieWriteConfig.TABLE_NAME -> "test_user_login",
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "user_id",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_time",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_login_time"
    )

    val df = spark.read.format("csv").option("header", "true")
      .load("feature-store-consumer/src/test/resources/test_data.csv")
    df.write
      .format("org.apache.hudi")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .options(hudiOptions)
      .mode(SaveMode.Overwrite)
      .save(basePath)
  }
}
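As background on where that remote request goes: Hudi starts an embedded timeline server on the Spark driver, and write-path components query it over HTTP at the address the driver advertises. A minimal sketch of two things worth checking (these are assumptions, not a confirmed fix for this question): pinning the advertised driver address to loopback via Spark's `spark.driver.host`, or disabling the embedded timeline server entirely via Hudi's `hoodie.embed.timeline.server` option so file-system views are built in memory instead.

```scala
import org.apache.spark.SparkConf

// Sketch (assumption): make the embedded timeline server reachable,
// by advertising the loopback address for a local[*] run...
val conf = new SparkConf()
  .setAppName("HudiRunner")
  .setMaster("local[2]")
  .set("spark.driver.bindAddress", "127.0.0.1")
  .set("spark.driver.host", "127.0.0.1") // address other components use to reach the driver

// ...or bypass the embedded timeline server altogether:
val hudiOptions = Map[String, String](
  "hoodie.embed.timeline.server" -> "false"
)
```

Both keys exist in their respective projects (`spark.driver.host` in Spark, `hoodie.embed.timeline.server` in Hudi); whether either resolves this particular timeout is untested here.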
test_data.csv:
user_id,user_name,creation_time,last_login_time
100,Smith,2015-01-01,2015-01-01T13:51:39.340396Z
101,Sam,2015-01-01T12:14:58.597216Z
102,Tom,2015-01-01T13:51:40.417052Z
103,Maxi,2015-01-01T13:51:40.519832Z
104,Ash,2015-01-02,2015-01-01T12:15:00.512679Z
105,Pam,2015-01-01T13:51:42.248818Z
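Incidentally, several rows above carry only three fields while the header declares four, so either `creation_time` or `last_login_time` is absent; both columns are used as the partition-path and precombine keys in the write options. A small sketch (plain Scala, independent of Hudi) that flags rows whose field count disagrees with the header before attempting the write:

```scala
object CsvFieldCheck {
  /** Returns (1-based line number, line) for every data row whose
    * comma-separated field count differs from the header's. */
  def shortRows(lines: Seq[String]): Seq[(Int, String)] = {
    val width = lines.head.split(",", -1).length
    lines.zipWithIndex.tail.collect {
      case (line, i) if line.split(",", -1).length != width => (i + 1, line)
    }
  }
}
```

Run against the file above, this would report the three-field rows so they can be corrected or filtered before Hudi derives keys from them.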
Logs:
60464 [main] INFO org.apache.hudi.common.table.view.FileSystemViewManager - Creating View Manager with storage type :REMOTE_FIRST
60464 [main] INFO org.apache.hudi.common.table.view.FileSystemViewManager - Creating remote first table view
60470 [main] INFO org.apache.hudi.common.table.view.FileSystemViewManager - Creating remote view for basePath s3a://bucket_name/hudi/test/. Server=120.255.123.180:58547
60471 [main] INFO org.apache.hudi.common.table.view.FileSystemViewManager - Creating InMemory based view for basePath s3a://bucket_name/hudi/test/
60810 [main] INFO org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView - Sending request : (http://120.255.123.180:58547/v1/hoodie/view/compactions/pending/?basepath=s3a%3A%2F%2Fbucket_name%2Fhudi%2Ftest%2F&lastinstantts=20210118223222&timelinehash=445d2b001605b8c0770ee065281c50f2f3b2cb431837e982b72aaf030bec4b86)
136198 [main] ERROR org.apache.hudi.common.table.view.PriorityBasedFileSystemView - Got error running preferred function. Trying secondary
org.apache.hudi.exception.HoodieRemoteException: Connect to 120.255.123.180:58547 [/120.255.123.180] Failed: Operation timed out (Connection timed out)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getPendingCompactionOperations(RemoteHoodieTableFileSystemView.java:375)
at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:65)
at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getPendingCompactionOperations(PriorityBasedFileSystemView.java:198)
at org.apache.hudi.table.action.clean.CleanPlanner.<init>(CleanPlanner.java:86)
at org.apache.hudi.table.action.clean.CleanActionExecutor.requestClean(CleanActionExecutor.java:74)
at org.apache.hudi.table.action.clean.CleanActionExecutor.requestClean(CleanActionExecutor.java:201)
at org.apache.hudi.table.action.clean.CleanActionExecutor.execute(CleanActionExecutor.java:286)
at org.apache.hudi.table.HoodieCopyOnWriteTable.clean(HoodieCopyOnWriteTable.java:188)
at org.apache.hudi.client.HoodieWriteClient.clean(HoodieWriteClient.java:554)
at org.apache.hudi.client.HoodieWriteClient.autoCleanOnCommit(HoodieWriteClient.java:409)
at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:382)
at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:126)
at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:99)
at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:90)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:395)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:205)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:125)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
at TestAppRunner$.main(TestAppRunner.scala:47)
at TestAppRunner.main(TestAppRunner.scala)
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to 120.255.123.180:58547 [/120.255.123.180] Failed: Operation timed out (Connection timed out)
at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:142)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:319)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:363)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:219)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:195)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:86)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:108)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at org.apache.http.client.fluent.Request.execute(Request.java:144)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.executeRequest(RemoteHoodieTableFileSystemView.java:153)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getPendingCompactionOperations(RemoteHoodieTableFileSystemView.java:371)
... 39 more
Caused by: java.net.ConnectException: Operation timed out (Connection timed out)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:72)
at org.apache.http.impl.conn.HttpClientConnectionOperator.connect(HttpClientConnectionOperator.java:125)
... 52 more
136203 [main] INFO org.apache.hudi.table.action.clean.CleanPlanner - No earliest commit to retain. No need to scan partitions !!
136203 [main] INFO org.apache.hudi.table.action.clean.CleanActionExecutor - nothing to clean here. It is already clean
136214 [main] INFO org.apache.hudi.client.AbstractHoodieWriteClient - Committed 20210118223222
136214 [main] INFO org.apache.hudi.HoodieSparkSqlWriter$ - Commit 20210118223222 successful!
136214 [main] INFO org.apache.hudi.HoodieSparkSqlWriter$ - Config.isInlineCompaction ? false
136215 [main] INFO org.apache.hudi.HoodieSparkSqlWriter$ - Compaction Scheduled is Option{val=null}
136216 [main] INFO org.apache.hudi.HoodieSparkSqlWriter$ - Is Async Compaction Enabled ? false
136216 [main] INFO org.apache.hudi.client.AbstractHoodieClient - Stopping Timeline service !!
136216 [main] INFO org.apache.hudi.client.embedded.EmbeddedTimelineService - Closing Timeline server
136216 [main] INFO org.apache.hudi.timeline.service.TimelineService - Closing Timeline Service
136227 [main] INFO org.apache.hudi.timeline.service.TimelineService - Closed Timeline Service
136227 [main] INFO org.apache.hudi.client.embedded.EmbeddedTimelineService - Closed Timeline server
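Note what the log shows: the ERROR comes from the preferred remote file-system view timing out, after which Hudi falls back to the secondary in-memory view, and the commit itself succeeds. The request targets 120.255.123.180:58547, an address the machine apparently cannot connect back to. A quick, Hudi-independent way to confirm that the advertised address is the problem is to probe the socket directly (the host and port here are copied from the log above and are specific to this run):

```scala
import java.net.{InetSocketAddress, Socket}

object ReachabilityCheck {
  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def reachable(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val sock = new Socket()
    try {
      sock.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: java.io.IOException => false
    } finally sock.close()
  }

  def main(args: Array[String]): Unit = {
    // Host/port taken from the timeline-server URL in the log for this run.
    println(reachable("120.255.123.180", 58547))
  }
}
```

If this prints false while the driver is running, the timeline server is advertising an address other processes (or even the same process) cannot reach, which matches the connection-timeout stack trace.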
Solution
No effective solution for this problem has been found yet.