如何使用带有Spark的JDBC驱动程序读取Druid数据?

问题描述

如何使用Spark和Avatica JDBC Driver从Druid读取数据? This is avatica JDBC document

使用python和Jaydebeapi模块从Druid读取数据,我成功完成,如下面的代码

$ python
import jaydebeapi

conn = jaydebeapi.connect("org.apache.calcite.avatica.remote.Driver","jdbc:avatica:remote:url=http://0.0.0.0:8082/druid/v2/sql/avatica/",{"user": "druid","password":"druid"},"/root/avatica-1.17.0.jar",)
cur = conn.cursor()
cur.execute("SELECT * FROM informatION_SCHEMA.TABLES")
cur.fetchall()

输出为:

[('druid','druid','wikipedia','TABLE'),('druid','informatION_SCHEMA','COLUMNS','SYstem_TABLE'),'SCHEMATA','TABLES','sys','segments','server_segments','servers','supervisors','tasks','SYstem_TABLE')]  -> default tables

但是我想使用spark和JDBC进行阅读。

我尝试过,但是使用如下代码的spark会出现问题。

$ pyspark --jars /root/avatica-1.17.0.jar

df = spark.read.format('jdbc') \
    .option('url','jdbc:avatica:remote:url=http://0.0.0.0:8082/druid/v2/sql/avatica/') \
    .option("dbtable",'informatION_SCHEMA.TABLES') \
    .option('user','druid') \
    .option('password','druid') \
    .option('driver','org.apache.calcite.avatica.remote.Driver') \
    .load()

输出为:

Traceback (most recent call last):
  File "<stdin>",line 8,in <module>
  File "/root/spark-2.4.4-bin-hadoop2.7/python/pyspark/sql/readwriter.py",line 172,in load
    return self._df(self._jreader.load())
  File "/root/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",line 1257,in __call__
  File "/root/spark-2.4.4-bin-hadoop2.7/python/pyspark/sql/utils.py",line 63,in deco
    return f(*a,**kw)
  File "/root/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",line 328,in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2999.load.
: java.sql.sqlException: While closing connection
...
Caused by: java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "rpcMetadata" (class org.apache.calcite.avatica.remote.Service$CloseConnectionResponse),not marked as ignorable (0 kNown properties: ])
 at [Source: {"response":"closeConnection","rpcMetadata":{"response":"rpcMetadata","serverAddress":"172.18.0.7:8082"}}
; line: 1,column: 46]
...
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "rpcMetadata" (class org.apache.calcite.avatica.remote.Service$CloseConnectionResponse),column: 46] 
...

注意:

解决方法

我找到了解决此问题的另一种方法。我用spark-druid-connector将德鲁伊与火花连接。

但是我更改了一些代码,例如this,以在我的环境中使用此代码。

这是我的环境:

  • 火花:2.4.4
  • 斯卡拉:2.11.12
  • python:python 3.6.8
  • 德鲁伊:
    • 动物园管理员:3.5
    • 德鲁伊:0.17.0

但是,它有问题。

  • 如果您至少一次使用spark-druid-connector,则将从以下计划中使用的所有SQL查询(如spark.sql("select * from tmep_view"))输入到此计划器中。
  • 但是,如果您使用数据框的api,例如df.distinct().count(),那么就没有问题。我还没解决。
,

我尝试使用 spark-shell:

./bin/spark-shell --driver-class-path avatica-1.17.0.jar --jars avatica-1.17.0.jar

val jdbcDF = spark.read.format("jdbc")
    .option("url","jdbc:avatica:remote:url=http://0.0.0.0:8082/druid/v2/sql/avatica/")
    .option("dbtable","INFORMATION_SCHEMA.TABLES")
    .option("user","druid")
    .option("password","druid")
    .load()

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...