问题描述
我已经在单台计算机上试用了TFF教程(MNIST),现在我正尝试使用MNIST数据执行多机处理。
很明显,我不能使用create_tf_dataset_for_client
,所以我已经使用GRPC来学习如何将数据从一台计算机传递到另一台计算机。
我的情况是服务器将向所有参与的客户端分发初始模型(带有零),在该模型上本地数据将在该模型上运行。每个客户端都会将新的权重分配给将执行federated_mean的服务器。
我当时正在考虑使用tff.learning.build_federated_averaging_process
,希望可以自定义next
函数(第二个参数),但是失败了……我什至不确定我们是否使用这种方法来发送模型并从远程客户端获取权重。
然后我想我可以在tff.federated_mean
装饰器下使用@tff.federated_computation
。但是,由于权重是数组,而且我有它们的列表(因为我有很多客户),所以我无法理解如何创建指向列表列表的tff.FederatedType
。在分布式数据集上对联盟建模的人的任何帮助将很容易理解。
关于, 开发
解决方法
TFF计算被设计为与平台/运行时无关;一个计算可以由几个尊重不同语义的不同后端执行。
TFF的类型系统在这里可以帮助您推断数据在计算中如何流动。有关custom federated algorithms part 1 tutorial的TFF如何思考类型的介绍,请参见。
build_federated_averaging_process
的结果需要一个放在客户端的数据集的参数;对于元素类型为T
的数据集,以TFF的常用符号表示为{T*}@C
。对于数据集 如何到达客户端,或者实际上客户端如何表示,该签名是不可知的。
对数据进行材料化处理并代表客户是运行时的工作。 TFF在此提供了一些所谓的native选项。
例如,在本地运行时中,客户端由本地计算机上的线程表示。数据集只是渴望的tf.data.Dataset
对象,线程在训练过程中从数据集中提取数据。
在远程运行时中,客户端由远程工作程序(上的线程)表示,因此单个远程工作程序可以运行多个客户端。如您所述,在这种情况下,必须在远程工作者上实现数据才能进行训练。
完成此操作有几种选择。
一个,TFF实际上将为您处理整个RPC连接中急切的数据集的序列化和反序列化,因此您可以使用 identical 模式指定数据,就像在本地运行时一样,它应该“工作”(有一些警告;以某种方式具有状态的数据集,例如经过改组的数据无法序列化。)
也许更好地映射到联邦计算的概念是使用最近引入的一些库函数来简单地实例化工作人员上的数据集。
假设您有一个迭代过程ip
,它接受一个state
和data
参数,其中data
的类型为{T*}@C
。进一步假设我们有一个TFF计算get_dataset_for_client_id
,它接受一个字符串并返回适当类型的数据集(即,其TFF类型签名为tf.str -> T*
)。
然后我们可以将这两个计算组合成另一个:
@tff.federated_computation(STATE_TYPE,tff.FederatedType(tf.string,tff.CLIENTS))
def new_next(state,client_ids):
datasets_on_clients = tff.federated_map(get_dataset_for_client_id,client_ids)
return ip.next(state,datasets_on_clients)
new_next
现在要求控制器仅指定要在其上进行培训的客户端的 ids ,并将指向数据存储的责任委托给代表客户端的任何人。
我认为这种模式可能是您想要的;并且最近在TFF中通过tff.simulation.ClientData
上的dataset_computation
属性和诸如tff.simulation.compose_dataset_computation_with_iterative_process
之类的助手来促进其使用,
这里还有一些警告。鉴于此更改是相对较新的,并不是每个具体的ClientData
类都有一个实现,因此您可能最终会得到一个NotImplementedError
(使用HDF5文件进行这项工作有些麻烦)。但从高层次来看,我认为这是我们的两个选择。
让我们逐步进行此操作。请让我们知道下面的解释是否回答您的问题。
- 让我们从一个TF(非联邦的,仅是本地的)代码示例开始,该代码获取数据集并对其执行某些操作,比如说加数字:
@tff.tf_computation(tff.SequenceType(tf.int32))
def process_data(ds):
return ds.reduce(np.int32(0),lambda x,y: x + y)
此代码在输入时获取一个整数数据集,并在输出时返回一个带有其总和的整数。
您可以通过查看类型签名来确认这一点,就像这样:
str(process_data.type_signature)
您应该看到以下内容:
(int32* -> int32)
因此,process_data
接受一组整数,然后返回一个整数。
- 现在,使用TFF的联合运算符,我们可以创建在多个客户端上执行此操作的联合计算,如下所示:
@tff.federated_computation(tff.FederatedType(tff.SequenceType(tf.int32),tff.CLIENTS))
def process_data_on_clients(federated_ds):
return tff.federated_map(process_data,federated_ds)
如果您查看此新计算的类型签名(就像上面一样),您将看到:
({int32*}@CLIENTS -> {int32}@CLIENTS)
这意味着process_data_on_clients
接受一组联合整数(每个客户端一组),并返回一个联合整数(每个客户端上有一个总和的整数)。
上面发生的是,process_data
中的TF逻辑将在每个客户端上执行一次。 federated_map
运算符就是这样的。
- 现在,
process_data_on_clients
有点像您正在使用的迭代过程。它希望您提供一个联合数据集作为参数。
让我们看看如何按照与上面相同的方式制作一个。
以下是一些TF代码,它们创建了一个包含整数的单个数据集,例如,您提供了一个整数n
,并希望创建一个数字范围为1到n(即{1、2,...)的数据集。 ..,n}:
@tff.tf_computation(tf.int32)
def make_data(n):
return tf.data.Dataset.range(tf.cast(n,tf.int64)).map(lambda x: tf.cast(x + 1,tf.int32))
这显然是一个愚蠢的示例,您可以根据需要执行更多操作(例如,从由名称指定的文件中读取数据等)。
这是它的类型签名:
(int32 -> int32*)
您可以看到与process_data
的相似之处。
而且,就像处理数据一样,现在我们可以使用federated_map
运算符在所有客户端上制作数据了:
@tff.federated_computation(tff.FederatedType(tf.int32,tff.CLIENTS))
def make_data_on_clients(federated_n):
return tff.federated_map(make_data,federated_n)
这是类型签名:
({int32}@CLIENTS -> {int32*}@CLIENTS)
太好了,因此make_data_on_clients
接受一个联合整数(告诉我们每个客户端要生成多少个数据项),并返回一个联合数据集,就像process_data_on_clients
想要的那样。
您可以检查两者是否可以按预期工作:
federated_n = [2,3,4]
federated_ds = make_data_on_clients(federated_n)
result = process_data_on_clients(federated_ds)
result
您应该在此计算涉及的3个客户端上获得总和1 + 2、1 + 2 + 3和1 + 2 + 3 + 4(请注意,上面的联合整数中有3个数字,因此有3个客户在这里):
[<tf.Tensor: shape=(),dtype=int32,numpy=3>,<tf.Tensor: shape=(),numpy=6>,numpy=10>]
请注意,您到目前为止已经看到的所有TF代码(包括数据集创建和数据集归约)都在客户端上执行(使用federated_map
)。
- 现在,您可以将两者放在一起:
@tff.federated_computation(tff.FederatedType(tf.int32,tff.CLIENTS))
def make_and_process_data_on_clients(federated_n):
federated_ds = make_data_on_clients(federated_n)
return process_data_on_clients(federated_ds)
现在,您可以一口气调用制造和处理数据组合:
make_and_process_data_on_clients(federated_n)
同样,这里的所有TF代码都在客户端上执行,就像上面一样。
- 那这把你留在哪里?
回到Keith的解释,您从TFF获得的迭代过程需要输入联合数据集,就像process_data_on_clients
。
Keith的示例中的功能get_dataset_for_client_id
与我们的make_data
相似,因为它假定包含要在每个客户端上运行以在该客户端上物理构造数据集的TensorFlow代码。
在一个愚蠢的示例中,数据集构造逻辑使用了range
,但可以是任何东西。例如,您可以从同一本地文件my_data
或使用自定义TF op或通过其他任何方式在每个客户端上加载数据。就像在我们的示例中一样,您可以将参数传递给该函数以提供更集中的控制(类似于上面对联合整数所做的任何操作)。
Keith的示例中的代码抓取器new_next
就像我们的make_and_process_data_on_clients
一样,它结合了两个联合计算:一个在客户端上生成联合数据(由您提供,如此处所述),还有一个处理该数据的程序(来自tff.learning(迭代过程))。
有帮助吗?
如果仍然不清楚,我建议您尝试上面在分布式设置中包括的示例,因为您已经有了一个。您可以向该代码注入一些TF打印操作,以确认您编写的TF代码正在系统中的客户端计算机上执行。
一旦您掌握了这一部分,只需进行简单的调整即可将make_data
中的愚蠢数据集构造逻辑替换为一个从您正在使用的本地数据源向每个客户端加载数据集的逻辑。
编辑:
关于:如何打印,出现在@ tff.tf_computation主体中的任何TensorFlow代码都将以急切模式执行,并且您可以使用标准的TensorFlow机制(例如tf.print)从TensorFlow中进行打印。
tensorflow.org/api_docs/python/tf/print
有关如何配置具有多个工作程序节点的多计算机系统的信息,请参见Kubernetes教程。请注意,驱动进程的计算机连接到工作程序节点,而不是相反。
https://www.tensorflow.org/federated/tutorials/high_performance_simulation_with_kubernetes