问题描述
给定具有相同架构的两个 keras 模型 model1
和 model2
,我需要使用模型权重训练第一个模型,使用模型权重的移动平均值训练第二个模型。下面是一个例子来说明:
# Illustrative example from the question. Not runnable as-is: Model(...) and
# calculate_loss() are placeholders standing in for the real architecture/loss.
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow_addons.optimizers import MovingAverage
import tensorflow as tf

# Two structurally identical models; model2 is a fresh copy of model1's architecture.
model1 = Model(...)
model2 = tf.keras.models.clone_model(model1)

# model1 trains with plain Adam; model2 with a moving average of its weights.
opt1 = Adam()
opt2 = MovingAverage(Adam())
model1.compile(optimizer=opt1)
model2.compile(optimizer=opt2)

# One tape per model so each gradient is taken independently.
with tf.GradientTape() as tape, tf.GradientTape() as tape2:
    loss = calculate_loss()  # the loss is the same
grads1 = tape.gradient(loss, model1.trainable_variables)
grads2 = tape2.gradient(loss, model2.trainable_variables)
model1.optimizer.apply_gradients(zip(grads1, model1.trainable_variables))
model2.optimizer.apply_gradients(zip(grads2, model2.trainable_variables))
每次梯度更新后,两个模型都将在同一输入上调用以输出不同的值。
v1 = model1(inp)
v2 = model2(inp)
是否有可能以某种方式把两个模型合并成一个,同时维护常规权重和移动平均权重?
解决方法
基本上,您可以在一个模型下创建同一网络的两个副本,但名称范围不同,然后在优化时,使用一个优化器更新您的 regular
权重,并让另一个优化器只更新您的moving average
权重。
数据
import numpy as np
import tensorflow as tf
from tensorflow_addons.optimizers import MovingAverage
from tensorflow.keras.optimizers import Adam
# Synthetic training set: 1000 random 128-dim feature vectors with
# one-hot labels drawn uniformly from 3 classes.
X = tf.random.normal([1000, 128])
class_ids = tf.random.uniform([1000], minval=0, maxval=3, dtype=tf.int64)
y = tf.one_hot(class_ids, 3)
自定义模型
# custom model with weights under specific name scopes
class DualWeightModel(tf.keras.Model):
    """One Keras model holding two copies of the same two-layer network.

    The "regular" copy and the "ma" (moving-average) copy are built under
    distinct name scopes so their variables can later be told apart by name.
    """

    def __init__(self, num_units=256):
        super().__init__()
        self.num_units = num_units
        # Regular branch: hidden layer + softmax head.
        self.x_r = tf.keras.layers.Dense(self.num_units)
        self.l_r = tf.keras.layers.Dense(3, activation="softmax")
        # Moving-average branch: identical architecture.
        self.x_ma = tf.keras.layers.Dense(self.num_units)
        self.l_ma = tf.keras.layers.Dense(3, activation="softmax")

    def call(self, x):
        # Each branch runs (and is first built) inside its own name scope,
        # which tags the variable names used later to split the two
        # variable sets. Returns (regular_output, moving_average_output).
        outputs = []
        branches = (
            ("regular", self.x_r, self.l_r),
            ("ma", self.x_ma, self.l_ma),
        )
        for scope, hidden, head in branches:
            with tf.name_scope(scope):
                outputs.append(head(hidden(x)))
        return tuple(outputs)
# loss function
def calc_loss(y_true,y_pred):
return tf.keras.losses.CategoricalCrossentropy()(y_true,y_pred)
优化
# Optimizers: a plain Adam for the regular weights and a MovingAverage-wrapped
# Adam for the moving-average weights (same base learning rate).
opt_r = Adam(1e-4)
opt_ma = MovingAverage(Adam(1e-4))
# Single model holding both weight copies.
model = DualWeightModel()
# define one train step
def train_step(X,y):
# forward pass
with tf.GradientTape(persistent=True) as tape:
y_hat_r,y_hat_ma = model(X)
r_loss = calc_loss(y,y_hat_r)
ma_loss = calc_loss(y,y_hat_ma)
# get trainable variables under each name scope
r_vars = []
ma_vars = []
for v in model.trainable_variables:
if 'regular' in v.name:
r_vars.append(v)
if 'ma' in v.name:
ma_vars.append(v)
# optimize
r_grads = tape.gradient(r_loss,r_vars)
ma_grads = tape.gradient(ma_loss,ma_vars)
opt_r.apply_gradients(zip(r_grads,r_vars))
opt_ma.apply_gradients(zip(ma_grads,ma_vars))
return r_loss,ma_loss
训练模型
# Train. `.repeat()` makes the dataset stream indefinitely — without it the
# original code raises StopIteration during the first epoch: 1000 samples
# batched by 32 yield only ~32 batches, far fewer than the 10 * 100 steps
# requested below.
train_iter = iter(
    tf.data.Dataset.from_tensor_slices((X, y)).batch(32).repeat())
for epoch in range(10):
    r_losses, ma_losses = [], []
    for batch in range(100):
        X_train, y_train = next(train_iter)
        r_loss, ma_loss = train_step(X_train, y_train)
        r_losses.append(r_loss)
        ma_losses.append(ma_loss)
        # Print the mean loss accumulated since the last report, then reset.
        if batch % 5 == 0:
            msg = (f"r_loss: {np.mean(r_losses):.4f} "
                   f"\tma_loss: {np.mean(ma_losses):.4f}")
            print(msg)
            r_losses = []
            ma_losses = []
# Example output:
# r_loss: 1.6749 	ma_loss: 1.7274
# r_loss: 1.4319 	ma_loss: 1.6590
# ...