Problem description
I'm trying to use RDKit to generate molecular descriptors and then do machine learning on them with Spark. I managed to generate the descriptors, and I found the following code for doing Random Forest. That code loads a dataframe from a file stored in svmlight format, and I could create such a file with dump_svmlight_file, but writing everything out to a file doesn't feel very "Sparky".
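(For reference, the file-based detour would look roughly like this; the feature matrix, labels and filename below are made up for illustration:)

import numpy as np
from sklearn.datasets import dump_svmlight_file
from pyspark.sql import SparkSession

# Made-up stand-ins for the real descriptors and y values.
X = np.random.randint(0, 2, size=(100, 1024))
y = np.random.rand(100)

# Write the data to disk in svmlight/libsvm format (one-based indices for Spark) ...
dump_svmlight_file(X, y, "descriptors.libsvm", zero_based=False)

# ... and read it back with Spark's libsvm data source.
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
data = spark.read.format("libsvm").load("descriptors.libsvm")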
I've gotten this far:
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import DataStructs
import numpy as np
from sklearn.datasets import dump_svmlight_file
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

# Read the tab-separated SMILES file into a DataFrame.
df = spark.read.option("header", "true")\
               .option("delimiter", '\t').csv("acd_logd_100.smiles")

# SMILES -> RDKit mol -> Morgan fingerprint -> numpy array.
mols = df.select("canonical_smiles").rdd.flatMap(lambda x: x)\
         .map(lambda x: Chem.MolFromSmiles(x))\
         .map(lambda x: AllChem.GetMorganFingerprintAsBitVect(x, 2, nBits=1024))\
         .map(lambda x: np.array(x))

spark.createDataFrame(mols)
But obviously I can't create a DataFrame from my RDD of np.arrays like that (I get an odd error about ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()). I think I also need to bring the y values along and somehow tell the Random Forest implementation which columns of the dataframe are x and which is y, but I can't even build a dataframe from this data yet. How do I do that?
Edit:
I tried to create a dataframe via pyspark.ml.linalg.Vectors, loosely based on Creating Spark dataframe from numpy matrix, but I can't seem to create a Vector like this:
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import DataStructs
import numpy as np
from sklearn.datasets import dump_svmlight_file
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("SimpleApp").getorCreate()
df = spark.read.option("header",nBits=1024))\
.map(lambda x: np.array(x))\
.map(lambda x: Vectors.sparse(x))
print(mols.take(5))
mydf = spark.createDataFrame(mols,schema=["features"])
I get:
TypeError: only size-1 arrays can be converted to Python scalars
which I don't understand at all.
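(In hindsight, the TypeError is presumably because Vectors.sparse expects the vector size as its first argument, followed by the non-zero indices and their values, rather than a whole numpy array, so it tries and fails to coerce the array to a single scalar. A minimal sketch of what the sparse route would probably look like, with a made-up stand-in array:)

from pyspark.ml.linalg import Vectors
import numpy as np

arr = np.array([0, 1, 0, 1])   # stand-in for a 1024-bit fingerprint
nz = np.nonzero(arr)[0]        # indices of the set bits
sv = Vectors.sparse(len(arr), nz.tolist(), arr[nz].tolist())
print(sv)                      # SparseVector(4, {1: 1.0, 3: 1.0})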
Solution
So in case you find your way here, I thought I'd share what I ended up with. In the end I went with dense vectors, since that was easier. The only way I could figure out to get there from the RDKit bit vectors was to first create a numpy.array and then build a Spark Vectors.dense from that. I also realized that I needed to drag the y values along through the whole transformation, since apparently you can't just add that column to the dataframe at the end once the x values are sorted out, hence the somewhat convoluted tuples.
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import DataStructs
import numpy as np
from sklearn.datasets import dump_svmlight_file
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = spark.read.option("header","true")\
.option("delimiter",'\t').csv("acd_logd_100.smiles")
print(df.select("canonical_smiles","acd_logd").rdd)
# Keep (SMILES, y) tuples together through every step so the label
# survives all the way into the final DataFrame.
data = df.select("canonical_smiles", "acd_logd").rdd\
         .map(lambda row: (row.canonical_smiles, float(row.acd_logd)))\
         .map(lambda x: (Chem.MolFromSmiles(x[0]), x[1]))\
         .map(lambda x: (AllChem.GetMorganFingerprintAsBitVect(x[0], 2, nBits=1024), x[1]))\
         .map(lambda x: (np.array(x[0]), x[1]))\
         .map(lambda x: (Vectors.dense(x[0].tolist()), x[1]))\
         .toDF(["features", "label"])
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = \
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData,testData) = data.randomSplit([0.7,0.3])
# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer,rf])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction","label","features").show(5)
# Select (prediction,true label) and compute test error
evaluator = RegressionEvaluator(
labelCol="label",predictionCol="prediction",metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
rfModel = model.stages[1]
print(rfModel) # summary only
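# A quick sketch for scoring one new molecule with the fitted pipeline.
# The SMILES string below is just a placeholder; note that VectorIndexer
# may reject a bit value it never saw during fitting.
new_smiles = "CCO"
new_fp = AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles(new_smiles), 2, nBits=1024)
new_df = spark.createDataFrame([(Vectors.dense(np.array(new_fp).tolist()),)], ["features"])
model.transform(new_df).select("prediction").show()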
spark.stop()