UnknownError:无法启动 gRPC 服务器

问题描述

我找不到解决此问题的方法。有人能告诉我如何解决这个问题吗？

[来源]

# Require a Colab TPU runtime before anything else; surface a clear error
# message if the notebook is not attached to one.
assert 'COLAB_TPU_ADDR' in os.environ,'ERROR: Not connected to a TPU runtime; please see the first cell in this notebook for instructions!'
_colab_tpu_addr = os.environ['COLAB_TPU_ADDR']
TPU_ADDRESS = 'grpc://' + _colab_tpu_addr

#cluster = tf.distribute.cluster_resolver.TPUClusterResolver(TPU_ADDRESS,zone=None,project=None)
def create_in_process_cluster(num_workers,num_ps):
  """Creates and starts local servers and returns the cluster_resolver.

  Args:
    num_workers: number of in-process "worker" gRPC servers to start.
    num_ps: number of in-process "ps" (parameter server) gRPC servers.

  Returns:
    A tf.distribute.cluster_resolver.SimpleClusterResolver describing the
    in-process cluster (rpc_layer="grpc").
  """
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,job_name="worker",task_index=i,config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    # BUG FIX: each ps server must be given its own task_index. Without it
    # every ps task binds as task 0, so any second ps server (and a later
    # std server on the same address) fails with "Could not start gRPC
    # server" — the UnknownError reported in this post.
    tf.distribute.Server(
        cluster_spec,job_name="ps",task_index=i,protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec,rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

# Build a minimal single-worker, single-ps in-process cluster; it is later
# returned in place of a real TPU cluster resolver (see the monkey-patch
# of TPUClusterResolver below).
NUM_WORKERS = 1
NUM_PS = 1
cluster = create_in_process_cluster(NUM_WORKERS,NUM_PS)

def ColabTPUClusterResolver(tpu_name,zone,project):
  """Stand-in for TPUClusterResolver: ignores its arguments and returns the
  in-process cluster resolver built at module load time."""
  print("ColabTPUClusterResolver returns %s" % (cluster))
  return cluster

# Monkey-patch so any later TPUClusterResolver(...) call (e.g. inside
# run_experiment) resolves to the local in-process cluster instead of a TPU.
tf.distribute.cluster_resolver.TPUClusterResolver = ColabTPUClusterResolver

from electra.model import optimization as electra_optimization
from language.common.utils import tensor_utils
from language.common.utils import tpu_utils
from electra.model import modeling
import collections
import re
import json

def build_transformer(inputs,is_training,use_tpu,bert_config,name="electra",reuse=False,**kwargs):
  """Build a BERT transformer encoder over the given input tensors.

  Args:
    inputs: dict with "input_ids", "input_mask" and "segment_ids" tensors.
    is_training: whether to build the graph in training mode.
    use_tpu: if True, use one-hot embedding lookups (TPU-friendly).
    bert_config: a modeling.BertConfig instance.
    name: variable scope for the encoder.
    reuse: whether to reuse variables in the current variable scope.
    **kwargs: extra keyword arguments forwarded to modeling.BertModel.

  Returns:
    A modeling.BertModel instance.
  """
  with tf.variable_scope(tf.get_variable_scope(),reuse=reuse):
    encoder = modeling.BertModel(
        bert_config=bert_config,
        is_training=is_training,
        input_ids=inputs["input_ids"],
        input_mask=inputs["input_mask"],
        token_type_ids=inputs["segment_ids"],
        use_one_hot_embeddings=use_tpu,
        scope=name,
        **kwargs)
  return encoder

def get_projected_emb(bert,scope,params,is_training):
  """Project the BERT pooled output into the retrieval embedding space.

  Applies a dense projection to params["projection_size"] units, layer
  normalization, and (during training only) dropout with rate 0.1.
  """
  with tf.variable_scope(scope):
    reprs = bert.get_pooled_output()
    projected_emb = tf.layers.dense(reprs,params["projection_size"],name="weight")
    # BUG FIX: the Keras class is LayerNormalization (capital N);
    # tf.keras.layers.Layernormalization raises AttributeError.
    projected_emb = tf.keras.layers.LayerNormalization(axis=-1)(projected_emb)
    if is_training:
      projected_emb = tf.nn.dropout(projected_emb,rate=0.1)
    return projected_emb

def get_assignment_map_from_checkpoint(tvars,init_checkpoint,target,prefix="",pattern="^electra"):
  """Compute the union of the current variables and checkpoint variables.

  Maps every checkpoint variable whose name matches `pattern` onto the
  graph variable whose name has the pattern replaced by `prefix + target`
  (e.g. "electra/weight" -> "bert_q/weight").

  Returns:
    A pair (assignment_map, initialized_variable_names).
  """
  name_to_variable = collections.OrderedDict()
  for var in tvars:
    name = var.name
    # Strip the ":0" output suffix from variable names.
    m = re.match("^(.*):\\d+$",name)
    if m is not None:
      name = m.group(1)
    name_to_variable[name] = var

  initialized_variable_names = {}
  assignment_map = collections.OrderedDict()

  for x in tf.train.list_variables(init_checkpoint):
    (ckpt_name,ckpt_var) = (x[0],x[1])

    m = re.match(pattern,ckpt_name)
    if m is not None:
      # "electra/weight": "bert_q/weight"
      target_variable_name = re.sub(pattern,prefix + target,ckpt_name)
      if target_variable_name in name_to_variable:
        # Workaround for init_from_checkpoint in 2.0
        assignment_map[ckpt_name] = name_to_variable[target_variable_name]
        initialized_variable_names[ckpt_name] = 1
        initialized_variable_names[ckpt_name + ":0"] = 1
      # Workaround for extra scope generation("embeddings_1")
      #   caused by tf.keras.layers.LayerNormalization (?)
      ckpt_name_emb1 = ckpt_name.replace("embeddings","embeddings_1")
      # BUG FIX: re.sub was called as re.sub(pattern,ckpt_name_emb1) —
      # missing the replacement argument — which raises TypeError.
      target_variable_name = re.sub(pattern,prefix + target,ckpt_name_emb1)
      if target_variable_name in name_to_variable and ckpt_name not in initialized_variable_names:
        # Workaround for init_from_checkpoint in 2.0
        assignment_map[ckpt_name] = name_to_variable[target_variable_name]
        initialized_variable_names[ckpt_name] = 1
        initialized_variable_names[ckpt_name + ":0"] = 1

  return assignment_map,initialized_variable_names

def init_from_checkpoint(params):
  """Load pre-trained ELECTRA weights into both encoder towers.

  Maps the "electra" checkpoint scope onto the bert_q and bert_b towers.
  Returns a scaffold_fn for TPUEstimator when params["use_tpu"], otherwise
  calls tf.train.init_from_checkpoint eagerly and returns None. Also
  returns None when no checkpoint is configured.
  """
  # Load pre-trained weights from checkpoint
  init_checkpoint = params["init_check_point"]
  tf.logging.info("Using checkpoint: %s",init_checkpoint)
  tvars = tf.trainable_variables()

  scaffold_fn = None
  if init_checkpoint:
    # BUG FIX: both calls were malformed — the first was missing the
    # init_checkpoint argument and the second line was truncated to
    # `assignment_map_b,target="bert_b")`, a syntax error.
    assignment_map_q,_ = get_assignment_map_from_checkpoint(
        tvars,init_checkpoint,target="bert_q")
    assignment_map_b,_ = get_assignment_map_from_checkpoint(
        tvars,init_checkpoint,target="bert_b")

    if params["use_tpu"]:
      def tpu_scaffold():
        tf.train.init_from_checkpoint(init_checkpoint,assignment_map_q)
        tf.train.init_from_checkpoint(init_checkpoint,assignment_map_b)
        return tf.train.Scaffold()
      scaffold_fn = tpu_scaffold
    else:
      tf.train.init_from_checkpoint(init_checkpoint,assignment_map_q)
      tf.train.init_from_checkpoint(init_checkpoint,assignment_map_b)
  return scaffold_fn

def model_fn(features,labels,mode,params):
  """Model function for the ICT (inverse cloze task) retriever.

  Builds a query encoder (BERT_Q) and a block encoder (BERT_B), projects
  both into a shared embedding space, and trains with an in-batch softmax
  over query/block dot products.
  """
  del labels

  # [local_batch_size,block_seq_len]
  block_ids = features["block_ids"]
  block_mask = features["block_mask"]
  block_segment_ids = features["block_segment_ids"]

  # [local_batch_size,query_seq_len]
  query_ids = features["query_ids"]
  query_mask = features["query_mask"]

  local_batch_size = tensor_utils.shape(block_ids,0)
  tf.logging.info("Model batch size: %d",local_batch_size)

  is_training = mode == tf.estimator.ModeKeys.TRAIN

  # Load config.json
  with tf.io.gfile.GFile(params["bert_config"],"r") as f:
    config_json = json.load(f)
    bert_config = modeling.BertConfig.from_dict(config_json)

  # Build BERT_Q
  # BUG FIX: build_transformer(inputs,is_training,use_tpu,bert_config,...)
  # was being called without the is_training and bert_config arguments.
  bert_q = build_transformer(
      dict(
          input_ids=query_ids,
          input_mask=query_mask,
          segment_ids=tf.zeros_like(query_ids)),
      is_training,
      params["use_tpu"],
      bert_config,
      embedding_size=bert_config.hidden_size,
      untied_embeddings=True,
      name="bert_q")

  # BUG FIX: get_projected_emb(bert,scope,params,is_training) requires the
  # params argument (projection size is read from it).
  query_emb = get_projected_emb(bert_q,"emb_q",params,is_training)

  # Build BERT_B (same fixes as BERT_Q above).
  bert_b = build_transformer(
      dict(
          input_ids=block_ids,
          input_mask=block_mask,
          segment_ids=block_segment_ids),
      is_training,
      params["use_tpu"],
      bert_config,
      embedding_size=bert_config.hidden_size,
      untied_embeddings=True,
      name="bert_b")

  block_emb = get_projected_emb(bert_b,"emb_b",params,is_training)

  if params["use_tpu"]:
    # [global_batch_size,hidden_size]
    block_emb = tpu_utils.cross_shard_concat(block_emb)

    # [global_batch_size,local_batch_size]
    labels = tpu_utils.cross_shard_pad(tf.eye(local_batch_size))

    # [local_batch_size]
    labels = tf.argmax(labels,0)
  else:
    # [local_batch_size]: query i matches block i within the local batch.
    labels = tf.range(local_batch_size)

  tf.logging.info("Global batch size: %s",tensor_utils.shape(block_emb,0))

  # [batch_size,global_batch_size]
  logits = tf.matmul(query_emb,block_emb,transpose_b=True)

  # [] scalar in-batch softmax retrieval loss.
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels,logits=logits)

  # Load pre-trained ELECTRA checkpoint
  scaffold_fn = init_from_checkpoint(params)

  train_op = electra_optimization.create_optimizer(
      loss=loss,
      learning_rate=params["learning_rate"],
      num_train_steps=params["num_train_steps"],
      warmup_steps=min(10000,max(100,int(params["num_train_steps"]/10))),
      use_tpu=params["use_tpu"] if "use_tpu" in params else False)

  predictions = tf.argmax(logits,-1)

  metric_args = [query_mask,block_mask,predictions,features["mask_query"]]

  # BUG FIX: metric_fn must accept exactly the tensors in metric_args — it
  # previously took only (query_mask,mask_query), so metric_fn(*metric_args)
  # would fail — and unmasked_accuracy was missing its predictions argument.
  # NOTE(review): `labels` is captured from the closure rather than passed
  # through metric_args; confirm this is acceptable for TPU eval.
  def metric_fn(query_mask,block_mask,predictions,mask_query):
    masked_accuracy = tf.metrics.accuracy(
        labels=labels,predictions=predictions,weights=mask_query)
    unmasked_accuracy = tf.metrics.accuracy(
        labels=labels,predictions=predictions,
        weights=tf.logical_not(mask_query))
    return dict(
        query_non_padding=tf.metrics.mean(query_mask),
        block_non_padding=tf.metrics.mean(block_mask),
        actual_mask_ratio=tf.metrics.mean(mask_query),
        masked_accuracy=masked_accuracy,
        unmasked_accuracy=unmasked_accuracy)

  if params["use_tpu"]:
    return tf.estimator.tpu.TPUEstimatorSpec(
        mode=mode,loss=loss,train_op=train_op,scaffold_fn=scaffold_fn,
        eval_metrics=(metric_fn,metric_args))
  else:
    # BUG FIX: EstimatorSpec requires loss and train_op in TRAIN mode; the
    # original passed neither.
    return tf.estimator.EstimatorSpec(
        mode=mode,loss=loss,train_op=train_op,
        eval_metric_ops=metric_fn(*metric_args),predictions=predictions)

from language.orqa.utils import bert_utils
# BUG FIX: the transformers class is BertJapaneseTokenizer (capital T);
# importing "BertJapanesetokenizer" raises ImportError.
from transformers import BertJapaneseTokenizer

# Load the Japanese whole-word-masking tokenizer once at module load time.
bert_japanese_tokenizer = BertJapaneseTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')

def get_bert_japanese_tokenizer(bert_hub_module_path):
  """Return the Japanese tokenizer, ignoring the hub module path."""
  return bert_japanese_tokenizer

# Monkey-patch ORQA's tokenizer factory to use the Japanese tokenizer.
bert_utils.get_tokenizer = get_bert_japanese_tokenizer

def run_experiment(model_fn,train_input_fn,eval_input_fn,exporters=None,params=None,params_fname=None):
  """Run training and evaluation with a (TPU)Estimator.

  Optionally writes params as JSON into FLAGS.model_dir, builds a
  TPUEstimator or a plain Estimator depending on params["use_tpu"], then
  calls tf.estimator.train_and_evaluate.
  """
  params = params if params is not None else {}
  params.setdefault("use_tpu",FLAGS.use_tpu)

  if FLAGS.model_dir and params_fname:
    tf.io.gfile.makedirs(FLAGS.model_dir)
    params_path = os.path.join(FLAGS.model_dir,params_fname)
    with tf.io.gfile.GFile(params_path,"w") as params_file:
      json.dump(params,params_file,indent=2,sort_keys=True)

  if params["use_tpu"]:
    if FLAGS.tpu_name:
      tpu_cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
          FLAGS.tpu_name,zone=FLAGS.tpu_zone,project=FLAGS.gcp_project)
    else:
      tpu_cluster_resolver = None
    # BUG FIX: the class is RunConfig; tf.estimator.tpu.runconfig does not
    # exist and raises AttributeError.
    run_config = tf.estimator.tpu.RunConfig(
        cluster=tpu_cluster_resolver,
        master=FLAGS.master,
        model_dir=FLAGS.model_dir,
        tf_random_seed=FLAGS.tf_random_seed,
        save_checkpoints_steps=FLAGS.save_checkpoints_steps,
        tpu_config=tf.estimator.tpu.TPUConfig(
            iterations_per_loop=FLAGS.iterations_per_loop))  # NOTE: modified from FLAGS.save_checkpoints_steps
    if "batch_size" in params:
      # Let the TPUEstimator fill in the batch size.
      params.pop("batch_size")
    estimator = tf.estimator.tpu.TPUEstimator(
        use_tpu=True,
        model_fn=model_fn,
        params=params,
        config=run_config,
        train_batch_size=FLAGS.batch_size,
        eval_batch_size=FLAGS.eval_batch_size,
        predict_batch_size=FLAGS.eval_batch_size)
  else:
    # BUG FIX: RunConfig (not runconfig), and the Estimator was being built
    # without its model_fn and params, which makes training impossible.
    run_config = tf.estimator.RunConfig(
        model_dir=FLAGS.model_dir,keep_checkpoint_max=FLAGS.keep_checkpoint_max)
    params["batch_size"] = FLAGS.batch_size
    estimator = tf.estimator.Estimator(
        model_fn=model_fn,params=params,config=run_config,
        model_dir=FLAGS.model_dir)

  train_spec = tf.estimator.TrainSpec(
      input_fn=train_input_fn,max_steps=FLAGS.num_train_steps)
  eval_spec = tf.estimator.EvalSpec(
      name="default",
      input_fn=eval_input_fn,
      exporters=exporters,
      start_delay_secs=FLAGS.eval_start_delay_secs,
      throttle_secs=FLAGS.eval_throttle_secs,
      steps=FLAGS.num_eval_steps)

  tf.logging.set_verbosity(tf.logging.INFO)
  tf.estimator.train_and_evaluate(
      estimator=estimator,train_spec=train_spec,eval_spec=eval_spec)

[日志]

ColabTPUClusterResolver 返回 INFO:tensorflow:Using config: {'_model_dir': '/content/drive/MyDrive/ORQA/orqa/ict_model','_tf_random_seed':无, '_save_summary_steps':100,'_save_checkpoints_steps':1000, '_save_checkpoints_secs':无,'_session_config': allow_soft_placement: true cluster_def { 作业 { 名称:“ps” 任务 { 关键:0 值:“本地主机:21782” } } 工作 { 姓名:《工人》 任务 { 关键:0 值:“本地主机:22658” 隔离会话状态:真,'_keep_checkpoint_max':5,'_keep_checkpoint_every_n_hours':10000,'_log_step_count_steps': 无,'_train_distribute':无,'_device_fn':无,'_protocol': 无,'_eval_distribute':无,'_experimental_distribute':无, '_experimental_max_worker_delay_secs':无, '_session_creation_timeout_secs':7200,'_checkpoint_save_graph_def': 是的,'_service':无,'_cluster_spec':ClusterSpec({'ps': ['localhost:21782'],'worker': ['localhost:22658']}),'_task_type': '工人','_task_id':0,'_global_id_in_cluster':0,'_master': 'grpc://','_evaluation_master': 'grpc://','_is_chief': 真, '_num_ps_replicas':0,'_num_worker_replicas':1,'_tpu_config': TPUConfig(iterations_per_loop=200,num_shards=None,num_cores_per_replica=无,per_host_input_for_training=2, tpu_job_name=无,initial_inFeed_sleep_secs=无, input_partition_dims=无,eval_training_input_configuration=2, Experiment_host_call_every_n_steps=1, Experiment_allow_per_host_v2_parallel_get_next=假, Experiment_Feed_hook=None),'_cluster': } 信息:t​​ensorflow:使用配置: {'_model_dir': '/content/drive/MyDrive/ORQA/orqa/ict_model','_tf_random_seed':无,'_save_summary_steps':100, '_save_checkpoints_steps':1000,'_save_checkpoints_secs':无, '_session_config': allow_soft_placement: true cluster_def { job { 名称:“ps” 任务 { 关键:0 值:“本地主机:21782” } } 工作 { 姓名:《工人》 任务 { 关键:0 值:“本地主机:22658” 隔离会话状态:真,'_keep_checkpoint_max':5,'_keep_checkpoint_every_n_hours':10000,'_log_step_count_steps': 无,'_train_distribute':无,'_device_fn':无,'_protocol': 无,'_eval_distribute':无,'_experimental_distribute':无, '_experimental_max_worker_delay_secs':无, '_session_creation_timeout_secs':7200,'_checkpoint_save_graph_def': 是的,'_service':无,'_cluster_spec':ClusterSpec({'ps': 
['localhost:21782'],'_cluster': } 信息:t​​ensorflow:_TPUContext:eval_on_tpu 真信息:tensorflow:_TPUContext: eval_on_tpu 真信息:tensorflow:Not 使用分发协调器。信息:张量流:不使用分发 协调员。警告:tensorflow:TF_CONfig 中不应为空 分布式环境。警告:tensorflow:TF_CONfig 不应该是 在分布式环境中为空。信息:张量流:启动张量流 服务器。信息:tensorflow:启动 Tensorflow 服务器。 -------------------------------------------------- ------------------------- UnkNownError Traceback(最近调用 最后) 在 () 23 eval_input_fn=functools.partial(ict_model.input_fn,is_train=False),24 个出口商=无, ---> 25 个参数=参数)

run_experiment(model_fn,train_input_fn、eval_input_fn、exporters、params、params_fname) 第336话 第337话 --> 338 eval_spec=eval_spec)

/usr/local/lib/python3.7/dist-packages/tensorflow_estimator/python/estimator/training.py 在 train_and_evaluate(estimator,train_spec,eval_spec) 503 '(任务 ID 为 0)。给定任务 id {}'.format(config.task_id)) 504 --> 505 返回 executor.run() 506 507

/usr/local/lib/python3.7/dist-packages/tensorflow_estimator/python/estimator/training.py 在运行(自己) 670 '不支持任务类型{}。支持的任务类型是 {}'.format( 第 671 章 --> 672 getattr(自我,task_to_run)() 673 674 def run_chief(self):

/usr/local/lib/python3.7/dist-packages/tensorflow_estimator/python/estimator/training.py 在 run_worker(self) 中 681 """运行任务(训练)工人。""" 682 # Todo(xiejw):允许执行框架添加训练钩子。 --> 683 返回 self._start_distributed_training() 684 685 def run_master(自我):

/usr/local/lib/python3.7/dist-packages/tensorflow_estimator/python/estimator/training.py 在 _start_distributed_training(self,Saving_listeners) 803 # 开始训练前相互连接。 804 如果不是 _is_google_env(): --> 805 self._start_std_server(config) 806 807 # 延迟工作器启动。对于异步训练,这通常有助于建模

/usr/local/lib/python3.7/dist-packages/tensorflow_estimator/python/estimator/training.py 在 _start_std_server(self,config) 第791话 792开始=假, --> 793 协议=config.protocol) 第794章 795返回服务器

/usr/local/lib/python3.7/dist-packages/tensorflow/python/training/server_lib.py 在 init(self,server_or_cluster_def,job_name,task_index,协议、配置、启动) 第146话 第147话 --> 148 self._server = c_api.TF_NewServer(self._server_def.SerializetoString()) 149 如果开始: 150 self.start()

未知错误:无法启动 gRPC 服务器

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)