PySpark fails to evaluate a Keras neural network inside hyperopt SparkTrials

Problem description

I have run into a strange bug that has kept me stuck for a few days and that I cannot solve.

My goal is to evaluate several Keras neural networks with hyperopt. To speed up the evaluation process I use SparkTrials (see also http://hyperopt.github.io/hyperopt/scaleout/spark/). This works fine for all scikit-learn regressors (a rough sketch of that working case is included after the minimal example below). But every time I use a Keras NN, the model is evaluated, yet no result is returned. This is the error message I get:

20/11/11 11:30:56 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job
trial task 9 failed, exception is An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServeWithJobGroup.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 9, path executor driver): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:628)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
    at org.apache.spark.api.python.PythonRDD$.collectAndServeWithJobGroup(PythonRDD.scala:183)
    at org.apache.spark.api.python.PythonRDD.collectAndServeWithJobGroup(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:628)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
.
 None
  0%|          | 0/10 [00:41<?, ?trial/s, best loss=?]
Total Trials: 10: 0 succeeded, 10 failed, 0 cancelled.
Traceback (most recent call last):
  File "E:/Git/SystemidentificationTool/play/mini_example.py", line 68, in <module>
    best = fmin(f_nn, space, algo=tpe.suggest, max_evals=10, trials=trials)
  File "E:\venv\lib\site-packages\hyperopt\fmin.py", line 522, in fmin
    trials_save_file=trials_save_file,
  File "E:\venv\lib\site-packages\hyperopt\spark.py", line 274, in fmin
    raise e
  File "E:\venv\lib\site-packages\hyperopt\spark.py", line 270, in fmin
    trials_save_file="",  # not supported
  File "E:\venv\lib\site-packages\hyperopt\fmin.py", line 558, in fmin
    "There are no evaluation tasks, cannot return argmin of task losses."
Exception: There are no evaluation tasks, cannot return argmin of task losses.

I built a minimal code example that reproduces the error:

from hyperopt import fmin, tpe, hp, STATUS_OK, Trials, SparkTrials
from sklearn.metrics import mean_squared_error
import sys
import numpy as np
from pandas import DataFrame
time = np.arange(0,100,0.1)


X = DataFrame(np.sin(time))
y = DataFrame(np.cos(time))
X_val = DataFrame(np.sin(time))
y_val = DataFrame(np.cos(time))

space = {'choice': hp.choice('num_layers',
                             [{'layers': 'two'},
                              {'layers': 'three',
                               'units3': hp.uniform('units3', 64, 1024),
                               'dropout3': hp.uniform('dropout3', .25, .75)}]),
         'units1': hp.uniform('units1', 64, 1024),
         'units2': hp.uniform('units2', 64, 1024),
         'dropout1': hp.uniform('dropout1', .25, .75),
         'dropout2': hp.uniform('dropout2', .25, .75),
         'batch_size': 28,
         'nb_epochs': 2,
         'optimizer': hp.choice('optimizer', ['adadelta', 'adam', 'rmsprop']),
         'activation': 'relu'
         }

def f_nn(params):
    from keras.models import Sequential
    from keras.layers.core import Dense, Dropout, Activation
    from keras.optimizers import Adadelta, Adam

    print('Params testing: ', params)
    model = Sequential()
    model.add(Dense(params['units1'], input_dim=X.shape[1]))
    model.add(Activation(params['activation']))
    model.add(Dropout(params['dropout1']))

    model.add(Dense(params['units2'], kernel_initializer="glorot_uniform"))
    model.add(Activation(params['activation']))
    model.add(Dropout(params['dropout2']))

    if params['choice']['layers'] == 'three':
        model.add(Dense(params['choice']['units3'], kernel_initializer="glorot_uniform"))
        model.add(Activation(params['activation']))
        model.add(Dropout(params['choice']['dropout3']))

    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer=params['optimizer'])

    model.fit(X, y, epochs=params['nb_epochs'], batch_size=params['batch_size'])

    pred_auc = model.predict(X_val)
    acc = mean_squared_error(y_val, pred_auc)
    print('AUC:', acc)
    sys.stdout.flush()
    return {'loss': -acc, 'status': STATUS_OK}


trials = SparkTrials()
best = fmin(f_nn, space, algo=tpe.suggest, max_evals=10, trials=trials)
print('best: ', best)
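
For comparison, this is roughly what the scikit-learn case that works for me looks like. It is only a minimal sketch, not my actual code; the RandomForestRegressor and its search space are illustrative stand-ins for the regressors I actually tune:

from hyperopt import fmin, tpe, hp, STATUS_OK, SparkTrials
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import numpy as np

time = np.arange(0, 100, 0.1)
X = np.sin(time).reshape(-1, 1)
y = np.cos(time)

def f_sklearn(params):
    # fit a regressor with the sampled hyperparameters and return the training MSE as the loss
    model = RandomForestRegressor(n_estimators=int(params['n_estimators']),
                                  max_depth=int(params['max_depth']))
    model.fit(X, y)
    loss = mean_squared_error(y, model.predict(X))
    return {'loss': loss, 'status': STATUS_OK}

space_sklearn = {
    'n_estimators': hp.quniform('n_estimators', 10, 100, 10),
    'max_depth': hp.quniform('max_depth', 2, 10, 1),
}

trials = SparkTrials()
best = fmin(f_sklearn, space_sklearn, algo=tpe.suggest, max_evals=10, trials=trials)
print('best: ', best)

With an objective of this shape all trials finish; the Keras objective above is the only case that fails.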

My setup is as follows: Python 3.7, hyperopt 0.2.5, Keras 2.4.3, TensorFlow 2.3.1, PySpark 3.0.1.

I am using this Java version: openjdk version "1.8.0_272", OpenJDK Runtime Environment (AdoptOpenJDK) (build 1.8.0_272-b10), OpenJDK 64-Bit Server VM (AdoptOpenJDK) (build 25.272-b10, mixed mode)

and Spark version 3.0.1.

Can anyone help me with this?

