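DataStax Java driver custom retry policy

Problem description

I have written a custom retry policy class, but I cannot find a way to pass it to the driver so that it executes onWriteTimeout / onUnavailable / onReadTimeout.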

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.api.core.connection.HeartbeatException;
import com.datastax.oss.driver.api.core.retry.RetryDecision;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.servererrors.DefaultWriteType;
import com.datastax.oss.driver.api.core.servererrors.ReadFailureException;
import com.datastax.oss.driver.api.core.servererrors.WriteFailureException;
import com.datastax.oss.driver.api.core.servererrors.WriteType;
import com.datastax.oss.driver.api.core.session.Request;
// Assumption: Guava's annotation; the original post does not show its imports.
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomretryPolicy implements RetryPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(CustomretryPolicy.class);

  @VisibleForTesting
  public static final String RETRYING_ON_READ_TIMEOUT =
      "[{}] retrying on read timeout on same host (consistency: {}, required responses: {}, "
          + "received responses: {}, data retrieved: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_WRITE_TIMEOUT =
      "[{}] retrying on write timeout on same host (consistency: {}, write type: {}, "
          + "required acknowledgments: {}, received acknowledgments: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_UNAVAILABLE =
      "[{}] retrying on unavailable exception on next host (consistency: {}, "
          + "required replica: {}, alive replica: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ABORTED =
      "[{}] retrying on aborted request on next host (retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ERROR =
      "[{}] retrying on node error on next host (retries: {})";

  private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";

  // Maximum number of retries allowed for each kind of failure.
  private final int readAttempts;
  private final int writeAttempts;
  private final int unavailableAttempts;

  public CustomretryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
    this.readAttempts = readAttempts;
    this.writeAttempts = writeAttempts;
    this.unavailableAttempts = unavailableAttempts;
  }

  @Override
  public RetryDecision onReadTimeout(
      Request request, ConsistencyLevel cl, int blockFor, int received,
      boolean dataPresent, int retryCount) {
    // Retry only if enough replicas answered but the data itself was not retrieved.
    RetryDecision decision =
        (retryCount < readAttempts && received >= blockFor && !dataPresent)
            ? RetryDecision.RETRY_SAME
            : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_READ_TIMEOUT, LOG_PREFIX, cl, blockFor, received, false, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onWriteTimeout(
      Request request, ConsistencyLevel cl, WriteType writeType, int blockFor,
      int received, int retryCount) {
    // Only a timed-out batch log write is retried.
    RetryDecision decision =
        (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
            ? RetryDecision.RETRY_SAME
            : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(
          RETRYING_ON_WRITE_TIMEOUT, LOG_PREFIX, cl, writeType, blockFor, received, retryCount);
    }
    return decision;
  }

  @Override
  public RetryDecision onUnavailable(
      Request request, ConsistencyLevel cl, int required, int alive, int retryCount) {
    RetryDecision decision =
        (retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_UNAVAILABLE, LOG_PREFIX, cl, required, alive, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
    RetryDecision decision =
        (error instanceof ClosedConnectionException || error instanceof HeartbeatException)
            ? RetryDecision.RETRY_NEXT
            : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      // The trailing Throwable is logged by SLF4J as the exception.
      LOG.trace(RETRYING_ON_ABORTED, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public RetryDecision onErrorResponse(Request request, CoordinatorException error, int retryCount) {
    RetryDecision decision =
        (error instanceof ReadFailureException || error instanceof WriteFailureException)
            ? RetryDecision.RETHROW
            : RetryDecision.RETRY_NEXT;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ERROR, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public void close() {
    // nothing to do
  }
}

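I am using DataStax Java driver 4.6.0. The problem is that I cannot pass an instance of this class through CqlSessionBuilder, which was possible in the older driver versions like this:

RetryPolicy rc = new CustomretryPolicy(3,3,2);
Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();

I have tried using DriverConfigLoader, but it only offers an option to pass the custom class name.

Please advise.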

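Solution

If you look at the implementation of DefaultRetryPolicy and at the CustomRetryPolicy example, you will see that both receive 2 parameters: context, of type DriverContext, and a string with the profile name. You should then be able to use context to obtain the DriverConfig via getConfig, and then call getProfile on that config to extract the configuration values your custom policy needs. You can put your own configuration values into the configuration file and then use them in the retry policy, like this: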

datastax-java-driver {
  advanced.retry-policy {
    class = DefaultRetryPolicy
  }
  profiles {
    custom-retries {
      advanced.retry-policy {
        class = CustomRetryPolicy
        custom-policy {
           read-attempts = 3
           write-attempts = 2
           ...
        }
      }
    }
  }
}
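
The configuration above can be read from inside the policy roughly as follows. This is a minimal sketch, not the driver's own example: the class name ConfigurableRetryPolicy, the CustomRetryOption enum, and the fallback value of 1 are assumptions made for illustration, and the option paths simply mirror the custom-policy block shown above. Note that for a class of your own, the class option normally has to be the fully qualified class name, because unqualified names are looked up in the driver's internal retry package.

```java
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.retry.RetryDecision;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.servererrors.DefaultWriteType;
import com.datastax.oss.driver.api.core.servererrors.WriteType;
import com.datastax.oss.driver.api.core.session.Request;

// Hypothetical policy that takes its limits from the driver configuration.
public class ConfigurableRetryPolicy implements RetryPolicy {

  // Hypothetical custom options; paths are relative to the datastax-java-driver root.
  enum CustomRetryOption implements DriverOption {
    READ_ATTEMPTS("advanced.retry-policy.custom-policy.read-attempts"),
    WRITE_ATTEMPTS("advanced.retry-policy.custom-policy.write-attempts");

    private final String path;

    CustomRetryOption(String path) {
      this.path = path;
    }

    @Override
    public String getPath() {
      return path;
    }
  }

  private final int readAttempts;
  private final int writeAttempts;

  // The driver creates the policy reflectively through this two-argument constructor.
  public ConfigurableRetryPolicy(DriverContext context, String profileName) {
    this(context.getConfig().getProfile(profileName));
  }

  // Extra constructor (an assumption of this sketch) so the policy can be built from a profile.
  ConfigurableRetryPolicy(DriverExecutionProfile profile) {
    // Fall back to a single retry when the option is absent (assumed default).
    this.readAttempts = profile.getInt(CustomRetryOption.READ_ATTEMPTS, 1);
    this.writeAttempts = profile.getInt(CustomRetryOption.WRITE_ATTEMPTS, 1);
  }

  @Override
  public RetryDecision onReadTimeout(
      Request request, ConsistencyLevel cl, int blockFor, int received,
      boolean dataPresent, int retryCount) {
    return (retryCount < readAttempts && received >= blockFor && !dataPresent)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;
  }

  @Override
  public RetryDecision onWriteTimeout(
      Request request, ConsistencyLevel cl, WriteType writeType, int blockFor,
      int received, int retryCount) {
    return (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;
  }

  // The remaining callbacks just rethrow to keep the sketch short.
  @Override
  public RetryDecision onUnavailable(
      Request request, ConsistencyLevel cl, int required, int alive, int retryCount) {
    return RetryDecision.RETHROW;
  }

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
    return RetryDecision.RETHROW;
  }

  @Override
  public RetryDecision onErrorResponse(
      Request request, CoordinatorException error, int retryCount) {
    return RetryDecision.RETHROW;
  }

  @Override
  public void close() {
    // nothing to do
  }
}
```

With the policy declared inside the custom-retries profile, it should apply only to requests that select that profile, for example through setExecutionProfileName("custom-retries") on a statement. Declaring it in the top-level advanced.retry-policy block instead would make it the policy for all requests.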