聊聊flink的NetworkEnvironmentConfiguration

  本文主要研究一下flink的NetworkEnvironmentConfiguration
  
  NetworkEnvironmentConfiguration
  
  flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
  
  /**
   * Value holder for the network settings handed to the network environment: the memory budget
   * for network buffers (fraction, minimum, maximum), the size of a single network buffer, the
   * I/O mode, the partition request backoff bounds, the per-channel and per-gate buffer counts
   * and an optional {@link NettyConfig}.
   *
   * <p>All fields are final and assigned exactly once in the constructor.
   */
  public class NetworkEnvironmentConfiguration {

      private final float networkBufFraction;

      private final long networkBufMin;

      private final long networkBufMax;

      private final int networkBufferSize;

      private final IOMode ioMode;

      private final int partitionRequestInitialBackoff;

      private final int partitionRequestMaxBackoff;

      private final int networkBuffersPerChannel;

      private final int floatingNetworkBuffersPerGate;

      /** {@code null} when the setup uses purely local communication (no netty). */
      @Nullable
      private final NettyConfig nettyConfig;

      /**
       * Constructor for a setup with purely local communication (no netty).
       *
       * <p>Delegates to the full constructor with a {@code null} {@link NettyConfig}.
       */
      public NetworkEnvironmentConfiguration(
              float networkBufFraction,
              long networkBufMin,
              long networkBufMax,
              int networkBufferSize,
              IOMode ioMode,
              int partitionRequestInitialBackoff,
              int partitionRequestMaxBackoff,
              int networkBuffersPerChannel,
              int floatingNetworkBuffersPerGate) {

          this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
                  ioMode,
                  partitionRequestInitialBackoff, partitionRequestMaxBackoff,
                  networkBuffersPerChannel, floatingNetworkBuffersPerGate,
                  null);
      }

      /**
       * Full constructor; stores every argument as given, without validation.
       *
       * @param networkBufFraction fraction value for the network buffer memory
       * @param networkBufMin lower bound for the network buffer memory
       * @param networkBufMax upper bound for the network buffer memory
       * @param networkBufferSize size of a single network buffer
       * @param ioMode I/O mode to use
       * @param partitionRequestInitialBackoff initial backoff for partition requests
       * @param partitionRequestMaxBackoff maximum backoff for partition requests
       * @param networkBuffersPerChannel number of network buffers per channel
       * @param floatingNetworkBuffersPerGate number of floating network buffers per gate
       * @param nettyConfig netty configuration, or {@code null} for purely local communication
       */
      public NetworkEnvironmentConfiguration(
              float networkBufFraction,
              long networkBufMin,
              long networkBufMax,
              int networkBufferSize,
              IOMode ioMode,
              int partitionRequestInitialBackoff,
              int partitionRequestMaxBackoff,
              int networkBuffersPerChannel,
              int floatingNetworkBuffersPerGate,
              @Nullable NettyConfig nettyConfig) {

          this.networkBufFraction = networkBufFraction;
          this.networkBufMin = networkBufMin;
          this.networkBufMax = networkBufMax;
          this.networkBufferSize = networkBufferSize;
          this.ioMode = ioMode;
          this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
          this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
          this.networkBuffersPerChannel = networkBuffersPerChannel;
          this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
          this.nettyConfig = nettyConfig;
      }

      // ------------------------------------------------------------------------

      public float networkBufFraction() {
          return networkBufFraction;
      }

      public long networkBufMin() {
          return networkBufMin;
      }

      public long networkBufMax() {
          return networkBufMax;
      }

      public int networkBufferSize() {
          return networkBufferSize;
      }

      public IOMode ioMode() {
          return ioMode;
      }

      public int partitionRequestInitialBackoff() {
          return partitionRequestInitialBackoff;
      }

      public int partitionRequestMaxBackoff() {
          return partitionRequestMaxBackoff;
      }

      public int networkBuffersPerChannel() {
          return networkBuffersPerChannel;
      }

      public int floatingNetworkBuffersPerGate() {
          return floatingNetworkBuffersPerGate;
      }

      /**
       * Returns the netty configuration.
       *
       * @return the netty configuration, or {@code null} if this instance was created through
       *     the local-communication constructor or with an explicit {@code null}
       */
      @Nullable
      public NettyConfig nettyConfig() {
          return nettyConfig;
      }

      // ------------------------------------------------------------------------

      /**
       * Hashes every field that {@link #equals(Object)} compares, so that configurations which
       * differ only in their buffer memory settings no longer share a hash code.
       */
      @Override
      public int hashCode() {
          int result = 1;
          // equals() compares the fraction with ==, under which -0.0f equals 0.0f; both zeros
          // must therefore contribute the same value to the hash.
          result = 31 * result + (networkBufFraction == 0.0f ? 0 : Float.floatToIntBits(networkBufFraction));
          result = 31 * result + (int) (networkBufMin ^ (networkBufMin >>> 32));
          result = 31 * result + (int) (networkBufMax ^ (networkBufMax >>> 32));
          result = 31 * result + networkBufferSize;
          // The constructor does not reject a null ioMode and equals() tolerates it, so guard here too.
          result = 31 * result + (ioMode != null ? ioMode.hashCode() : 0);
          result = 31 * result + partitionRequestInitialBackoff;
          result = 31 * result + partitionRequestMaxBackoff;
          result = 31 * result + networkBuffersPerChannel;
          result = 31 * result + floatingNetworkBuffersPerGate;
          result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
          return result;
      }

      /**
       * Two configurations are equal when they are of the same class and all ten fields match;
       * the netty configuration is compared with {@code equals} and may be {@code null} on both sides.
       */
      @Override
      public boolean equals(Object obj) {
          if (this == obj) {
              return true;
          }
          else if (obj == null || getClass() != obj.getClass()) {
              return false;
          }
          else {
              final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;

              return this.networkBufFraction == that.networkBufFraction &&
                      this.networkBufMin == that.networkBufMin &&
                      this.networkBufMax == that.networkBufMax &&
                      this.networkBufferSize == that.networkBufferSize &&
                      this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
                      this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
                      this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
                      this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
                      this.ioMode == that.ioMode &&
                      (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
          }
      }

      @Override
      public String toString() {
          return "NetworkEnvironmentConfiguration{" +
                  "networkBufFraction=" + networkBufFraction +
                  ", networkBufMin=" + networkBufMin +
                  ", networkBufMax=" + networkBufMax +
                  ", networkBufferSize=" + networkBufferSize +
                  ", ioMode=" + ioMode +
                  ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
                  ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
                  ", networkBuffersPerChannel=" + networkBuffersPerChannel +
                  ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
                  ", nettyConfig=" + nettyConfig +
                  '}';
      }
  }
  
  NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
  
  TaskManagerServicesConfiguration
  
  flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
  
  public class TaskManagerServicesConfiguration {

      //......

      /**
       * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
       *
       * <p>Validates the data port, the slot count and the memory segment size, reads the network
       * buffer memory settings (falling back to the deprecated buffer-count option when none of
       * the new options is set), and builds a {@link NettyConfig} unless communication is local.
       *
       * @param configuration to create the network environment configuration from
       * @param localTaskManagerCommunication true if task manager communication is local
       * @param taskManagerAddress address of the task manager
       * @param slots to start the task manager with
       * @return Network environment configuration
       * @throws Exception if one of the validation or parsing helpers fails
       */
      @SuppressWarnings("deprecation")
      private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
              Configuration configuration,
              boolean localTaskManagerCommunication,
              InetAddress taskManagerAddress,
              int slots) throws Exception {

          // ----> hosts / ports for communication and data exchange

          int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);

          checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
                  "Leave config parameter empty or use 0 to let the system choose a port automatically.");

          checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
                  "Number of task slots must be at least one.");

          final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

          // check page size for minimum size
          checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
                  TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
                  "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);

          // check page size for power of two
          checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
                  TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
                  "Memory segment size must be a power of 2.");

          // network buffer memory fraction

          float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
          long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
          long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
          checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);

          // fallback: number of network buffers
          final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
          checkNetworkConfigOld(numNetworkBuffers);

          if (!hasNewNetworkBufConf(configuration)) {
              // map old config to new one: a fixed budget of numNetworkBuffers pages,
              // so minimum and maximum collapse to the same value
              networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
          } else {
              if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
                  LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
                          TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
              }
          }

          // netty is only configured when task managers communicate over the network
          final NettyConfig nettyConfig;
          if (!localTaskManagerCommunication) {
              final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);

              nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
                      taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
          } else {
              nettyConfig = null;
          }

          // Default spill I/O mode for intermediate results
          final String syncOrAsync = configuration.getString(
                  ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
                  ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);

          // anything other than the exact string "async" selects synchronous I/O
          final IOManager.IOMode ioMode;
          if (syncOrAsync.equals("async")) {
              ioMode = IOManager.IOMode.ASYNC;
          } else {
              ioMode = IOManager.IOMode.SYNC;
          }

          int initialRequestBackoff = configuration.getInteger(
                  TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
          int maxRequestBackoff = configuration.getInteger(
                  TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);

          int buffersPerChannel = configuration.getInteger(
                  TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
          int extraBuffersPerGate = configuration.getInteger(
                  TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);

          // the memory segment (page) size doubles as the network buffer size
          return new NetworkEnvironmentConfiguration(
                  networkBufFraction,
                  networkBufMin,
                  networkBufMax,
                  pageSize,
                  ioMode,
                  initialRequestBackoff,
                  maxRequestBackoff,
                  buffersPerChannel,
                  extraBuffersPerGate,
                  nettyConfig);
      }

      //......

  }
  
  TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
  
  TaskManagerOptions
  
  flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
  
  /**
   * Configuration options of the task manager; this excerpt lists the memory segment size and
   * the network buffer / partition request options.
   */
  @PublicEvolving
  public class TaskManagerOptions {

      //......

      /**
       * Size of memory buffers used by the network stack and the memory manager.
       */
      public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
              key("taskmanager.memory.segment-size")
              .defaultValue("32kb")
              .withDescription("Size of memory buffers used by the network stack and the memory manager.");

      /**
       * Fraction of JVM memory to use for network buffers.
       */
      public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
              key("taskmanager.network.memory.fraction")
              .defaultValue(0.1f)
              .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
                  " data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
                  " are. If a job is rejected or you get a warning that the system has not enough buffers available," +
                  " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
                  " and \"taskmanager.network.memory.max\" may override this fraction.");

      /**
       * Minimum memory size for network buffers.
       */
      public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
              key("taskmanager.network.memory.min")
              .defaultValue("64mb")
              .withDescription("Minimum memory size for network buffers.");

      /**
       * Maximum memory size for network buffers.
       */
      public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
              key("taskmanager.network.memory.max")
              .defaultValue("1gb")
              .withDescription("Maximum memory size for network buffers.");

      /**
       * Number of buffers used in the network stack. This defines the number of possible tasks and
       * shuffles.
       *
       * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
       * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
       */
      @Deprecated
      public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
              key("taskmanager.network.numberOfBuffers")
              .defaultValue(2048);

      /**
       * Minimum backoff for partition requests of input channels.
       */
      public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
              key("taskmanager.network.request-backoff.initial")
              .defaultValue(100)
              .withDeprecatedKeys("taskmanager.net.request-backoff.initial")
              .withDescription("Minimum backoff in milliseconds for partition requests of input channels.");

      /**
       * Maximum backoff for partition requests of input channels.
       */
      public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
              key("taskmanager.network.request-backoff.max")
              .defaultValue(10000)
              .withDeprecatedKeys("taskmanager.net.request-backoff.max")
              .withDescription("Maximum backoff in milliseconds for partition requests of input channels.");

      /**
       * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
       *
       * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
       */
      public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
              key("taskmanager.network.memory.buffers-per-channel")
              .defaultValue(2)
              .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
                  " In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
                  " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
                  " for parallel serialization.");

      /**
       * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
       */
      public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
              key("taskmanager.network.memory.floating-buffers-per-gate")
              .defaultValue(8)
              .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
                  " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
                  " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
                  " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
                  " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");

      //......

  }
  
  taskmanager.memory.segment-size指定memory segment的大小,默认为32kb;taskmanager.network.memory.fraction指定network buffers使用的memory的比例,默认为0.1;taskmanager.network.memory.min指定network buffers使用的最小内存,默认为64mb;taskmanager.network.memory.max指定network buffers使用的最大内存,默认为1gb;taskmanager.network.numberOfBuffers指定network使用的buffers数量,默认为2048,该配置已经被废弃,使用taskmanager.network.memory.fraction、taskmanager.network.memory.min、taskmanager.network.memory.max这几个配置来替代
  
  taskmanager.network.request-backoff.initial指定input channels的partition requests的最小backoff时间(毫秒),默认为100;taskmanager.network.request-backoff.max指定input channels的partition requests的最大backoff时间(毫秒),默认为10000
  
  taskmanager.network.memory.buffers-per-channel指定每个outgoing/incoming channel(subpartition/input channel)最多使用的buffers数量,默认为2;taskmanager.network.memory.floating-buffers-per-gate指定每个outgoing/incoming gate(result partition/input gate)额外使用的(floating)buffers数量,默认为8
  
  NettyConfig
  
  flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
  
  /**
   * Netty-related settings of the network stack: the server address and port, the memory segment
   * size, the number of slots, and a {@link Configuration} from which the remaining netty options
   * are read on demand.
   */
  public class NettyConfig {

      private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);

      // - Config keys ----------------------------------------------------------

      public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
              .key("taskmanager.network.netty.num-arenas")
              .defaultValue(-1)
              .withDeprecatedKeys("taskmanager.net.num-arenas")
              .withDescription("The number of Netty arenas.");

      public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
              .key("taskmanager.network.netty.server.numThreads")
              .defaultValue(-1)
              .withDeprecatedKeys("taskmanager.net.server.numThreads")
              .withDescription("The number of Netty server threads.");

      public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
              .key("taskmanager.network.netty.client.numThreads")
              .defaultValue(-1)
              .withDeprecatedKeys("taskmanager.net.client.numThreads")
              .withDescription("The number of Netty client threads.");

      public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
              .key("taskmanager.network.netty.server.backlog")
              .defaultValue(0) // default: 0 => Netty's default
              .withDeprecatedKeys("taskmanager.net.server.backlog")
              .withDescription("The netty server connection backlog.");

      public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
              .key("taskmanager.network.netty.client.connectTimeoutSec")
              .defaultValue(120) // default: 120s = 2min
              .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
              .withDescription("The Netty client connection timeout.");

      public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
              .key("taskmanager.network.netty.sendReceiveBufferSize")
              .defaultValue(0) // default: 0 => Netty's default
              .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
              .withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
                  " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");

      public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
              .key("taskmanager.network.netty.transport")
              .defaultValue("nio")
              .withDeprecatedKeys("taskmanager.net.transport")
              .withDescription("The Netty transport type, either \"nio\" or \"epoll\"");

      // ------------------------------------------------------------------------

      enum TransportType {
          NIO, EPOLL, AUTO
      }

      static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";

      static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";

      private final InetAddress serverAddress;

      private final int serverPort;

      private final int memorySegmentSize;

      private final int numberOfSlots;

      private final Configuration config; // optional configuration

      /**
       * Creates the netty configuration and logs its string representation at INFO level.
       *
       * @param serverAddress address of the netty server, must not be null
       * @param serverPort port of the netty server, in the range 0 to 65535 inclusive
       * @param memorySegmentSize memory segment size, must be positive
       * @param numberOfSlots number of slots, must be positive
       * @param config configuration the netty options are read from, must not be null
       */
      public NettyConfig(
              InetAddress serverAddress,
              int serverPort,
              int memorySegmentSize,
              int numberOfSlots,
              Configuration config) {

          this.serverAddress = checkNotNull(serverAddress);

          // 65535 is the largest valid port number; 65536 must be rejected.
          checkArgument(serverPort >= 0 && serverPort <= 65535, "Invalid port number.");
          this.serverPort = serverPort;

          checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
          this.memorySegmentSize = memorySegmentSize;

          checkArgument(numberOfSlots > 0, "Number of slots");
          this.numberOfSlots = numberOfSlots;

          this.config = checkNotNull(config);

          LOG.info(this.toString());
      }

      InetAddress getServerAddress() {
          return serverAddress;
      }

      int getServerPort() {
          return serverPort;
      }

      int getMemorySegmentSize() {
          return memorySegmentSize;
      }

      public int getNumberOfSlots() {
          return numberOfSlots;
      }

      // ------------------------------------------------------------------------
      // Getters
      // ------------------------------------------------------------------------

      public int getServerConnectBacklog() {
          return config.getInteger(CONNECT_BACKLOG);
      }

      /** Returns the configured number of arenas, or the number of slots if it is -1. */
      public int getNumberOfArenas() {
          // default: number of slots
          final int configValue = config.getInteger(NUM_ARENAS);
          return configValue == -1 ? numberOfSlots : configValue;
      }

      /** Returns the configured number of server threads, or the number of slots if it is -1. */
      public int getServerNumThreads() {
          // default: number of task slots
          final int configValue = config.getInteger(NUM_THREADS_SERVER);
          return configValue == -1 ? numberOfSlots : configValue;
      }

      /** Returns the configured number of client threads, or the number of slots if it is -1. */
      public int getClientNumThreads() {
          // default: number of task slots
          final int configValue = config.getInteger(NUM_THREADS_CLIENT);
          return configValue == -1 ? numberOfSlots : configValue;
      }

      public int getClientConnectTimeoutSeconds() {
          return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
      }

      public int getSendAndReceiveBufferSize() {
          return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
      }

      /**
       * Maps the configured transport string to a {@link TransportType}: {@code "nio"} and
       * {@code "epoll"} map to their constants, any other value maps to {@link TransportType#AUTO}.
       */
      public TransportType getTransportType() {
          String transport = config.getString(TRANSPORT_TYPE);

          switch (transport) {
              case "nio":
                  return TransportType.NIO;
              case "epoll":
                  return TransportType.EPOLL;
              default:
                  return TransportType.AUTO;
          }
      }

      /** Returns the client SSL factory, or {@code null} when {@link #getSSLEnabled()} is false. */
      @Nullable
      public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
          return getSSLEnabled() ?
                  SSLUtils.createInternalClientSSLEngineFactory(config) :
                  null;
      }

      /** Returns the server SSL factory, or {@code null} when {@link #getSSLEnabled()} is false. */
      @Nullable
      public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
          return getSSLEnabled() ?
                  SSLUtils.createInternalServerSSLEngineFactory(config) :
                  null;
      }

      /** SSL is on only if both the data SSL option and internal SSL are enabled in the config. */
      public boolean getSSLEnabled() {
          return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
                  && SSLUtils.isInternalSSLEnabled(config);
      }

      public boolean isCreditBasedEnabled() {
          return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
      }

      public Configuration getConfig() {
          return config;
      }

      @Override
      public String toString() {
          String format = "NettyConfig [" +
                  "server address: %s, " +
                  "server port: %d, " +
                  "ssl enabled: %s, " +
                  "memory segment size (bytes): %d, " +
                  "transport type: %s, " +
                  "number of server threads: %d (%s), " +
                  "number of client threads: %d (%s), " +
                  "server connect backlog: %d (%s), " +
                  "client connect timeout (sec): %d, " +
                  "send/receive buffer size (bytes): %d (%s)]";

          // a value of 0 is labelled as Netty's default, anything else as manual
          String def = "use Netty's default";
          String man = "manual";

          return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
                  memorySegmentSize, getTransportType(), getServerNumThreads(),
                  getServerNumThreads() == 0 ? def : man,
                  getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
                  getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
                  getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
                  getSendAndReceiveBufferSize() == 0 ? def : man);
      }
  }
  
  NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置
  
  taskmanager.network.netty.server.backlog用于指定netty server的connection backlog,默认值为0即使用netty默认的配置;taskmanager.network.netty.client.connectTimeoutSec指定netty client的connection timeout,默认为120(单位秒);taskmanager.network.netty.sendReceiveBufferSize指定netty send/receive buffer大小,默认为0即使用netty的默认配置,默认是使用system buffer size,即/proc/sys/net/ipv4/tcp_[rw]mem的配置;taskmanager.network.netty.transport指定的是netty transport的类型,默认是nio
  
  taskmanager.network.netty.num-arenas指定的是netty arenas的数量,默认为-1;taskmanager.network.netty.server.numThreads指定的是netty server的threads数量,默认为-1;taskmanager.network.netty.client.numThreads指定的是netty client的threads数量,默认为-1;这几个配置当配置值为-1的时候,对应get方法返回的是numberOfSlots值
  
  小结
  
  NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
  
  TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
  
  NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置
  
  doc
  
  taskmanager-network-memory-fraction

相关文章

Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的...
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能...
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽...
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很...
This post originally appeared on the Apache Flink blog. It was re...
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下...