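Problem description
I have a strange bug that I have been stuck on for several days and cannot solve.
My goal is to evaluate several Keras neural networks with hyperopt. To speed up the evaluation, I use SparkTrials (see also http://hyperopt.github.io/hyperopt/scaleout/spark/). This works fine for all scikit-learn regressors. But every time I use a Keras network, the model is evaluated and yet no result is returned. This is the error message I receive: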
20/11/11 11:30:56 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job
trial task 9 failed,exception is An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServeWithJobGroup.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times,most recent failure: Lost task 0.0 in stage 9.0 (TID 9,path executor driver): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:628)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
at org.apache.spark.api.python.PythonRDD$.collectAndServeWithJobGroup(PythonRDD.scala:183)
at org.apache.spark.api.python.PythonRDD.collectAndServeWithJobGroup(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:628)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
.
None
0%| | 0/10 [00:41<?,?trial/s,best loss=?]
Total Trials: 10: 0 succeeded,10 failed,0 cancelled.
Traceback (most recent call last):
File "E:/Git/SystemidentificationTool/play/mini_example.py",line 68,in <module>
best = fmin(f_nn,space,algo=tpe.suggest,max_evals=10,trials=trials)
File "E:\venv\lib\site-packages\hyperopt\fmin.py",line 522,in fmin
trials_save_file=trials_save_file,
File "E:\venv\lib\site-packages\hyperopt\spark.py",line 274,in fmin
raise e
File "E:\venv\lib\site-packages\hyperopt\spark.py",line 270,in fmin
trials_save_file="",# not supported
File "E:\venv\lib\site-packages\hyperopt\fmin.py",line 558,in fmin
"There are no evaluation tasks,cannot return argmin of task losses."
Exception: There are no evaluation tasks,cannot return argmin of task losses.
I built a minimal code example that reproduces the error:
from hyperopt import fmin,tpe,hp,STATUS_OK,Trials,SparkTrials
from sklearn.metrics import mean_squared_error
import sys
import numpy as np
from pandas import DataFrame
time = np.arange(0,100,0.1)
X = DataFrame(np.sin(time))
y = DataFrame(np.cos(time))
X_val = DataFrame(np.sin(time))
y_val = DataFrame(np.cos(time))
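# NOTE: the bounds of 'units', 'units1', 'units2' and 'dropout2', and the lower
# bound of 'dropout1', were lost in the original post. They are ASSUMED here to
# mirror 'units3' (64 to 1024) and 'dropout3' (.25 to .75).
space = {
    'choice': hp.choice('num_layers', [
        {'layers': 'two'},
        {'layers': 'three',
         'units3': hp.uniform('units3', 64, 1024),
         'dropout3': hp.uniform('dropout3', .25, .75)},
    ]),
    'units': hp.uniform('units', 64, 1024),
    'units1': hp.uniform('units1', 64, 1024),
    'units2': hp.uniform('units2', 64, 1024),
    'dropout1': hp.uniform('dropout1', .25, .75),
    'dropout2': hp.uniform('dropout2', .25, .75),
    'batch_size': 28,
    'nb_epochs': 2,
    'optimizer': hp.choice('optimizer', ['adadelta', 'adam', 'rmsprop']),
    'activation': 'relu',
}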
def f_nn(params):
    from keras.models import Sequential
    from keras.layers.core import Dense, Dropout, Activation
    from keras.optimizers import Adadelta, Adam
    print('Params testing: ', params)
    model = Sequential()
    model.add(Dense(params['units1'], input_dim=X.shape[1]))
    model.add(Activation(params['activation']))
    model.add(Dropout(params['dropout1']))
    model.add(Dense(params['units2'], kernel_initializer="glorot_uniform"))
    model.add(Activation(params['activation']))
    model.add(Dropout(params['dropout2']))
    if params['choice']['layers'] == 'three':
        model.add(Dense(params['choice']['units3'], kernel_initializer="glorot_uniform"))
        model.add(Activation(params['activation']))
        model.add(Dropout(params['choice']['dropout3']))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer=params['optimizer'])
    model.fit(X, y, epochs=params['nb_epochs'], batch_size=params['batch_size'])
    pred_auc = model.predict(X_val)
    acc = mean_squared_error(y_val, pred_auc)
    print('AUC:', acc)
    sys.stdout.flush()
    return {'loss': -acc, 'status': STATUS_OK}
trials = SparkTrials()
best = fmin(f_nn, space, algo=tpe.suggest, max_evals=10, trials=trials)
print('best: ',best)
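Two details of the objective above may be worth checking independently of the Spark failure. First, hp.uniform samples floats, while layer widths, batch sizes and epoch counts are integer quantities, so casting them explicitly avoids relying on implicit conversion. Second, fmin minimizes the 'loss' value, so returning the negated mean squared error rewards worse fits. The following is a minimal standard-library sketch of both points, not a fix for the error itself. The helper names to_int_params, mse and loss_result are hypothetical, and the string "ok" is assumed to equal hyperopt's STATUS_OK.

```python
def to_int_params(params, int_keys=("units1", "units2", "batch_size", "nb_epochs")):
    """Return a shallow copy of params with the listed keys rounded to int.

    Nested entries (such as 'units3' under 'choice') are not touched.
    """
    fixed = dict(params)
    for key in int_keys:
        if key in fixed:
            fixed[key] = int(round(fixed[key]))
    return fixed


def mse(y_true, y_pred):
    """Plain-Python mean squared error over two equal-length sequences."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def loss_result(y_true, y_pred):
    """Build a hyperopt-style result dict whose loss is the positive MSE."""
    # "ok" is assumed to equal hyperopt.STATUS_OK.
    return {"loss": mse(y_true, y_pred), "status": "ok"}
```

Inside f_nn, this would correspond to calling to_int_params(params) before building the model and returning the MSE itself rather than its negation.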
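My system setup is as follows: Python 3.7, hyperopt 0.2.5, keras 2.4.3, tensorflow 2.3.1, pyspark 3.0.1
The Java version I use: openjdk version "1.8.0_272" OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_272-b10) OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.272-b10, mixed mode)
and Spark version 3.0.1
Can anyone help me?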
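In the stack trace, the "Connection reset" is raised while PythonRunner reads from the socket to the Python worker. One possible (unconfirmed) explanation is that the worker process exits abruptly during the Keras evaluation. A way to test that hypothesis without Spark is to run the objective in a fresh interpreter and inspect its exit code and stderr. The sketch below uses only the standard library; run_in_fresh_interpreter is a hypothetical helper name.

```python
import subprocess
import sys


def run_in_fresh_interpreter(code, timeout=600):
    """Run a Python snippet in a new interpreter process.

    Returns (returncode, stdout, stderr). A non-zero return code without a
    Python traceback in stderr hints at a hard crash rather than an exception.
    """
    proc = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,  # requires Python 3.7+
        text=True,
        timeout=timeout,
    )
    return proc.returncode, proc.stdout, proc.stderr
```

As a usage idea, the snippet passed in could import f_nn from the script (guarding the fmin call so it does not run on import) and call it once with a hand-written parameter dict.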
Solution
No effective solution to this problem has been found yet.
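To gather more information in the meantime, one option is to wrap the objective so that ordinary Python exceptions are returned as failed trials with their traceback text, instead of being lost on the worker. This is only a diagnostic sketch: it cannot catch a hard crash of the worker process, and it is not known to resolve the error above. The decorator name report_failures and the 'traceback' key are hypothetical, and the string "fail" is assumed to equal hyperopt's STATUS_FAIL.

```python
import functools
import traceback


def report_failures(objective):
    """Wrap a hyperopt objective so exceptions become failed-trial results."""

    @functools.wraps(objective)
    def wrapped(params):
        try:
            return objective(params)
        except Exception:
            # "fail" is assumed to equal hyperopt.STATUS_FAIL; the extra
            # 'traceback' key is an illustrative addition to the result dict.
            return {"status": "fail", "traceback": traceback.format_exc()}

    return wrapped
```

With this in place, fmin would be given report_failures(f_nn) instead of f_nn, and the stored trial results could then be inspected for traceback text.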