How to pass a DataFrame column to a UDF that contains Spark NLP functions

Problem description

I am trying to run sentiment analysis with a Spark NLP pretrained pipeline. I have a DataFrame of Twitter data. I wrap the Spark NLP call in a UDF and pass the tweet text column to it, so that I get a sentiment label for every record. Any help or suggestions would be appreciated.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from sparknlp.pretrained import PretrainedPipeline

# Load the pretrained Twitter sentiment pipeline (English)
pipeline = PretrainedPipeline('analyze_sentimentdl_use_twitter', lang='en')


def sentiment_function(input_twit):
    # annotate() on a plain string returns a dict of output columns -> lists of strings
    senti_output = pipeline.annotate(input_twit)
    return "".join(senti_output['sentiment'])

sentiment_function_udf = udf(sentiment_function, StringType())

df = df_flat_user_profile_twitter.withColumn("sentiment", sentiment_function_udf(df_flat_user_profile_twitter['status_text']))
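
For context, PretrainedPipeline.annotate() called on a plain Python string returns a dict that maps each output column to a list of strings, which is why the UDF joins senti_output['sentiment']. A quick driver-side sanity check (assuming the pipeline has loaded; the sample tweet below is only an illustration) looks like:

sample = pipeline.annotate("I love this new phone, the camera is amazing!")
print(sample['sentiment'])   # something like ['positive'] -- one label per detected sentence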

This is the error I am getting:

Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/usr/lib/python3.6/pickle.py", obj) # Call unbound method with explicit self
  File "/usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 852, line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/usr/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o113.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
        at py4j.reflection.ReflectionEngine.getmethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getmethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)


Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o113.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
        at py4j.reflection.ReflectionEngine.getmethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getmethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/udf.py", line 189, in wrapper
    return self(*args)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/udf.py", line 167, in __call__
    judf = self._judf
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/udf.py", line 151, in _judf
    self._judf_placeholder = self._create_judf()
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/udf.py", line 160, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/udf.py", line 35, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o113.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
        at py4j.reflection.ReflectionEngine.getmethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getmethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Solution

No effective solution to this problem has been found yet; we are still looking into it.

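For reference, the PicklingError above is the usual symptom of capturing a JVM-backed object inside a Python UDF: the PretrainedPipeline holds a Py4J reference, Spark has to pickle the UDF's closure, and Py4J objects have no __getstate__, so serialization fails. A minimal sketch of one commonly suggested alternative is to run the pipeline on the DataFrame itself with transform() instead of inside a UDF. The sketch below is unverified against this exact setup; it assumes the pretrained pipeline reads its input from a column named "text" and writes its result to a column named "sentiment", and it reuses the DataFrame and column names from the question.

from pyspark.sql import functions as F
from sparknlp.pretrained import PretrainedPipeline

pipeline = PretrainedPipeline('analyze_sentimentdl_use_twitter', lang='en')

# Expose the tweet text under the column name the pipeline is assumed to expect ("text").
df_in = df_flat_user_profile_twitter.withColumn("text", F.col("status_text"))

# transform() runs the annotators on the executors without pickling the
# Python-side pipeline object, so the __getstate__ / PicklingError is avoided.
annotated = pipeline.transform(df_in)

# The sentiment annotator emits an array of annotations; keep the first label as a plain string.
df = annotated.withColumn("sentiment_label", F.col("sentiment.result").getItem(0))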
