Experiment Overview
The data comes from the Criteo Display Ads dataset: about 11 GB in total, with 13 integer features and 26 categorical features.
Spark
Since the data is fairly large and ships as a single txt file, split it into chunks before processing with split -l 400000 train.txt.
Continuous features are transformed with a log. The reasoning comes from online training: common standardization schemes such as Z-score or min-max scaling depend on statistics computed over a particular batch of data, so the scaling changes whenever the batch changes, whereas a log transform needs no batch-level statistics.
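To make the contrast concrete, here is a minimal sketch (not from the original pipeline) applying both options to one numeric column; it assumes the DataFrame df loaded in the Spark code further below and uses _c1 as an example column:

import org.apache.spark.sql.functions._

// log transform: stateless, so a row is scaled the same way no matter which batch it arrives in
val logged = df.withColumn("_c1", log(col("_c1") + 10))

// Z-score: needs the mean/stddev of the current data, so the scaling shifts when the data changes
val stats = df.agg(mean("_c1"), stddev("_c1")).head()
val zscored = df.withColumn("_c1", (col("_c1") - stats.getDouble(0)) / stats.getDouble(1))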
For categorical features, most companies will already have a category table, so there is usually no need to derive one from the data yourself. That saves compute time, and under stream processing you can only assign integer codes to the categories that appear in a particular batch or time window, so new categories outside that window could not be encoded anyway. Still, if you are working offline and genuinely need to extract and encode the categories from the data, as in this experiment, the most direct tool is StringIndexer from the ML module.
StringIndexer is convenient, but it easily runs out of memory (OOM) when a column has too many distinct categories or when many columns need to be encoded. From its source code, it first calls the RDD's countByValue to build a frequency map for the column, then assigns code 0 to the most frequent category, 1 to the next, and so on. It also copies this map, and StringIndexer itself provides no way to release it, so with very high-cardinality columns or many indexed columns the maps pile up. This dataset has 26 categorical columns, some with more than three million distinct values, so another route is needed. The approach below imitates part of the StringIndexer implementation and runs considerably faster. Since some categories appear very rarely, a cutoff can also be applied: for example, use countByValue and keep only the top n categories, or keep only categories whose frequency exceeds some threshold.
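For reference, standard StringIndexer usage looks roughly like this (a minimal sketch, not part of the pipeline below; it indexes the single column _c14 of the same df, and the handleInvalid setting assumes Spark 2.2+):

import org.apache.spark.ml.feature.StringIndexer

val indexed = new StringIndexer()
  .setInputCol("_c14")
  .setOutputCol("_c14_idx")
  .setHandleInvalid("keep")   // unseen or null values get an extra index instead of throwing
  .fit(df)
  .transform(df)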
The implementation below applies such a cutoff: categories that appear in fewer than 0.01% of the rows are all mapped to a single UNK code.
import java.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

val spark = SparkSession
  .builder()
  .master("local[*]")
  // driver.memory and memory.fraction are shown for illustration only; they only take effect if set
  // before the driver starts. In an IDE such as IDEA, enlarge the driver heap via the VM options.
  // In local mode it is enough to enlarge the driver, because the executor runs in the same JVM process.
  .config("spark.driver.memory", "5g")
  .config("spark.sql.shuffle.partitions", 12)
  .config("spark.default.parallelism", 12)
  .config("spark.memory.fraction", 0.75)
  .getOrCreate()

import spark.implicits._

val path = ""
// The source is a txt file, but reading it as csv lets Spark infer the schema
val df = spark.read
  .option("header", false)
  .option("delimiter", "\t")
  .option("inferSchema", true)
  .format("csv")
  .load(path + "..")

// If memory allows, cache it all first to reduce IO
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
val dataSize = df.count()
val cutoff = dataSize * 0.0001

// Numeric columns: log transform, nulls become 0
val numCols = (1 until 14).map(i => s"_c$i").toArray
var df1 = df
numCols.foreach(column => {
  df1 = df1.withColumn(column, when(col(column).isNull, 0).otherwise(log(col(column) + 10)))
})

// Categorical columns: encode all of them in one shared index space.
// Use Java maps: Java collections are usually more efficient than Scala's here,
// and Java's HashMap can be sized up front.
val catCols = (14 until 40).map(i => s"_c$i")
var df2 = df1
val inderMap: util.HashMap[String, util.HashMap[String, Int]] = new util.HashMap(catCols.length)
var i = 0
for (column <- catCols) {
  val uniqueElem = df2.select(column)
    .groupBy(column)
    .agg(count(column))
    .filter(col(s"count($column)") >= cutoff)
    .select(column)
    .map(_.getAs[String](0))
    .collect()
  val len = uniqueElem.length
  var index = 0
  val freqMap = new util.HashMap[String, Int](len)
  while (index < len) {
    freqMap.put(uniqueElem(index), i)
    index += 1
    i += 1
  }
  freqMap.put("UNK", i)
  i += 1
  inderMap.put(column, freqMap)
}

// Broadcast the index maps and encode each column with a UDF
val bcMap = spark.sparkContext.broadcast(inderMap)
for (column <- catCols) {
  val indexer = udf { elem: String =>
    val curMap = bcMap.value.get(column)
    if (elem == null || !curMap.containsKey(elem)) curMap.get("UNK")
    else curMap.get(elem)
  }
  df2 = df2.withColumn(column + "_e", indexer(col(column))).drop(column)
}

// Split into training and test sets if needed
val Array(train, test) = df2.randomSplit(Array(0.9, 0.1))

// Parquet output
df2.write
  .mode("overwrite")
  .save(path + "res/")

// Text output
df2.map(x => x.mkString(","))
  .write
  .mode("overwrite")
  .format("text")
  .save(path + "res/txt")

// Needed later by the TensorFlow side
print("The total dimension of all categorical features is " + i) // 14670
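As an optional follow-up (not in the original code), the cached input can be released and the written Parquet output sanity-checked:

df.unpersist()
val res = spark.read.parquet(path + "res/")
res.printSchema()
println(res.count())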
TensorFlow
The code below sketches the overall deep-learning workflow; production code would need further changes. Two GitHub implementations are useful references: https://github.com/yangxudong/deeplearning/tree/master/DCN and https://github.com/lambdaji/tf_repos/tree/master/deep_ctr.
Overall flow: define the data input function input_fn, then build the model together with its training and evaluation operations, and finally write the execution-phase code.
Input function
import tensorflow as tf

def input_fn(filenames, batch_size=32):
    def _parse_line(line):
        fields = tf.decode_csv(line, FIELD_DEFAULTS)
        label = fields[0]
        # decode_csv returns one tensor per column; stack them into
        # a [batch, 13] numeric tensor and a [batch, 26] categorical tensor
        num_features = tf.stack(fields[1:14], axis=1)
        cat_features = tf.stack(fields[14:], axis=1)
        return num_features, cat_features, label

    num_features, cat_features, label = (
        tf.data.TextLineDataset(filenames)
        .repeat(2)
        .prefetch(1024)
        .batch(batch_size)
        .map(_parse_line, num_parallel_calls=2)
        .make_one_shot_iterator()
        .get_next())
    return num_features, cat_features, label
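A quick way to verify the parsed shapes (a debugging sketch, not in the original; it assumes FIELD_DEFAULTS and filenames from the next block have already been defined):

x1_dbg, x2_dbg, y_dbg = input_fn(filenames, batch_size=4)
with tf.Session() as sess:
    a, b, c = sess.run([x1_dbg, x2_dbg, y_dbg])
    print(a.shape, b.shape, c.shape)  # expected: (4, 13) (4, 26) (4,)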
Data and model variables
# Parameters needed by the input pipeline
NUM_COLUMNS = ["c%d" % i for i in range(14)]
CAT_COLUMNS = ["c%d" % i for i in range(14, 40)]
FIELD_DEFAULTS = []
for i in range(14):
    FIELD_DEFAULTS.append([0.0])
for i in range(14, 40):
    FIELD_DEFAULTS.append([0])

filenames = []
for i in range(24):
    ...

# Local debugging settings
num_col = 13
cat_col = 26
cat_size = 14670
embedding_size = 12
cross_layers = 3
deep_layers = [200, 100, 33]
label_size = 1
learning_rate = 0.0005
DCN model
# Build the DCN model. This uses the low-level API; following the custom Estimator
# workflow would actually be better in practice.
with tf.name_scope("DCN_model"):
    he_init = tf.variance_scaling_initializer()

    with tf.name_scope("Embedding_layer"):
        x1, x2, label = input_fn(filenames, 32)
        Embed_W = tf.get_variable(name='embed_w',
                                  shape=[cat_size, embedding_size],
                                  initializer=he_init)                              # cat_size * E
        embeddings = tf.nn.embedding_lookup(Embed_W, x2)                            # ? * C * E
        oned_embed = tf.reshape(embeddings, shape=[-1, cat_col * embedding_size])   # ? * (C * E)
        embed_layer_res = tf.concat([x1, oned_embed], 1)                            # ? * (N + C * E)

    with tf.name_scope("Cross_Network"):
        x0 = embed_layer_res
        cross_x = embed_layer_res
        for level in range(cross_layers):
            Cross_W = tf.get_variable(name='cross_w%s' % level,
                                      shape=[num_col + cat_col * embedding_size, 1],
                                      initializer=he_init)                          # (N + C * E) * 1
            Cross_B = tf.get_variable(name='cross_b%s' % level,
                                      shape=[1, num_col + cat_col * embedding_size],
                                      initializer=he_init)                          # 1 * (N + C * E)
            xtw = tf.matmul(cross_x, Cross_W)                                       # ? * 1
            cross_x = x0 * xtw + cross_x + Cross_B                                  # ? * (N + C * E)

    with tf.name_scope("Deep_Network"):
        deep_x = embed_layer_res
        for neurons in deep_layers:
            deep_x = tf.layers.dense(inputs=deep_x, units=neurons,
                                     name='deep_%s' % neurons,
                                     activation=tf.nn.selu,
                                     kernel_initializer=he_init)

    with tf.variable_scope("Output-layer"):
        x_stack = tf.concat([cross_x, deep_x], 1)  # ? * ((N + C * E) + deep_layers[-1])
        logits = tf.layers.dense(inputs=x_stack, units=label_size, name="outputs")
        z = tf.reshape(logits, shape=[-1])
        pred = tf.sigmoid(z)
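Each iteration of the cross-network loop above implements one cross layer from the DCN paper: x_{l+1} = x_0 * (x_l^T * w_l) + b_l + x_l, where x_0 is the output of the embedding layer. Since w_l and b_l each have dimension N + C*E = 13 + 26*12 = 325, every additional cross layer adds only 2 * 325 = 650 parameters, which is why the explicit feature-interaction part of DCN is so cheap compared with the deep network.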
Training and evaluation metrics
with tf.name_scope("loss"):
    xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=label, logits=z)
    loss = tf.reduce_mean(xentropy, name="loss")
    loss_summary = tf.summary.scalar('log_loss', loss)

with tf.name_scope("train"):
    optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate,
                                       beta1=0.9, beta2=0.999, epsilon=1e-8)
    training_op = optimizer.minimize(loss)

with tf.name_scope("eval"):
    # tf.metrics.* return (value, update_op) pairs backed by local variables,
    # so the session must also run tf.local_variables_initializer()
    acc, upacc = tf.metrics.accuracy(label, tf.math.round(pred))
    auc, upauc = tf.metrics.auc(label, pred)
    acc_summary = tf.summary.scalar('accuracy', upacc)
    auc_summary = tf.summary.scalar('auc', upauc)
TensorBoard settings (optional)
from datetime import datetime

def log_dir(prefix=""):
    now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    root_logdir = "tf_logs"
    if prefix:
        prefix += "-"
    name = prefix + "run-" + now
    return "{}/{}/".format(root_logdir, name)

logdir = log_dir("my_dcn")
file_writer = tf.summary.FileWriter(logdir, tf.get_default_graph())
Execution phase
import os
import numpy as np

# Includes checkpointing and early stopping. Again, this uses the low-level API;
# the custom Estimator workflow would be better in practice.
n_epochs = 2
data_size = 12000000
batch_size = 64
n_batches = int(np.ceil(data_size / batch_size))

checkpoint_path = ".../model/my_dcn_model.ckpt"
checkpoint_epoch_path = checkpoint_path + ".epoch"
final_model_path = "./my_deep_mnist_model"

best_auc = 0.0          # AUC is maximized, so start from 0 rather than infinity
epochs_without_progress = 0
max_epochs_without_progress = 20

saver = tf.train.Saver()
gb_init = tf.global_variables_initializer()
lc_init = tf.local_variables_initializer()

with tf.Session() as sess:
    if os.path.isfile(checkpoint_epoch_path):
        with open(checkpoint_epoch_path, "rb") as f:
            start_epoch = int(f.read())
        print("Training was interrupted. Continuing at epoch", start_epoch)
        saver.restore(sess, checkpoint_path)
    else:
        start_epoch = 0
        sess.run(gb_init)
    # the streaming metrics live in local variables, which are not stored in the checkpoint
    sess.run(lc_init)

    for epoch in range(start_epoch, n_epochs):
        for batch_index in range(n_batches):
            # evaluate once every 2000 batches, train otherwise
            if batch_index % 2000 != 0:
                sess.run(training_op)
            else:
                loss_tr, loss_summary_str, up1, up2, acc_summary_str, auc_summary_str = sess.run(
                    [loss, loss_summary, upacc, upauc, acc_summary, auc_summary])
                print("Epoch:", epoch, ",Batch_index:", batch_index,
                      "\tLoss: {:.5f}".format(loss_tr),
                      "\tACC: ", up1, "\tAUC", up2)
                file_writer.add_summary(acc_summary_str, batch_index)
                file_writer.add_summary(auc_summary_str, batch_index)
                file_writer.add_summary(loss_summary_str, batch_index)

                if batch_index % 5000 == 0:
                    saver.save(sess, checkpoint_path)
                    with open(checkpoint_epoch_path, "wb") as f:
                        f.write(b"%d" % (epoch + 1))

                if up2 > best_auc:      # keep the model with the highest AUC so far
                    saver.save(sess, final_model_path)
                    best_auc = up2
                else:
                    epochs_without_progress += 1
                    if epochs_without_progress > max_epochs_without_progress:
                        print("Early stopping")
                        break
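To reload the best saved model later in a fresh session, a minimal sketch (not in the original) looks like this; it reuses the saver, lc_init, upauc and final_model_path defined above:

with tf.Session() as sess:
    sess.run(lc_init)                      # re-initialize the streaming-metric variables
    saver.restore(sess, final_model_path)  # restore the trained weights
    # one evaluation batch from a fresh one-shot iterator updates the streaming AUC
    print("AUC after one evaluation batch:", sess.run(upauc))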
References: