问题描述
我已经编写了一个自定义重试策略类,在该类中我无法传递任何重试驱动程序,它将执行onWriteTimeout / onUnavilable / onReadTimeout。
public class CustomretryPolicy implements RetryPolicy {
private static final Logger LOG = LoggerFactory.getLogger(CustomretryPolicy.class);
@VisibleForTesting
public static final String retrying_ON_READ_TIMEOUT =
"[{}] retrying on read timeout on same host (consistency: {},required responses: {},"
+ "received responses: {},data retrieved: {},retries: {})";
@VisibleForTesting
public static final String retrying_ON_WRITE_TIMEOUT =
"[{}] retrying on write timeout on same host (consistency: {},write type: {},"
+ "required ackNowledgments: {},received ackNowledgments: {},retries: {})";
@VisibleForTesting
public static final String retrying_ON_UNAVAILABLE =
"[{}] retrying on unavailable exception on next host (consistency: {},"
+ "required replica: {},alive replica: {},retries: {})";
@VisibleForTesting
public static final String retrying_ON_ABORTED =
"[{}] retrying on aborted request on next host (retries: {})";
@VisibleForTesting
public static final String retrying_ON_ERROR =
"[{}] retrying on node error on next host (retries: {})";
private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";
private final int readAttempts;
private final int writeAttempts;
private final int unavailableAttempts;
public CustomretryPolicy(int readAttempts,int writeAttempts,int unavailableAttempts) {
this.readAttempts = readAttempts;
this.writeAttempts = writeAttempts;
this.unavailableAttempts = unavailableAttempts;
}
@Override
public RetryDecision onReadTimeout(Request request,ConsistencyLevel cl,int blockFor,int received,boolean dataPresent,int retryCount) {
RetryDecision decision = (retryCount < readAttempts && received >= blockFor && !dataPresent)
? RetryDecision.RETRY_SAME
: RetryDecision.RETHROW;
if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
LOG.trace(retrying_ON_READ_TIMEOUT,LOG_PREFIX,cl,blockFor,received,false,retryCount);
}
return decision;
}
@Override
public RetryDecision onWriteTimeout(Request request,WriteType writeType,int retryCount) {
RetryDecision decision = (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
? RetryDecision.RETRY_SAME
: RetryDecision.RETHROW;
if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
LOG.trace(retrying_ON_WRITE_TIMEOUT,writeType,retryCount);
}
return decision;
}
@Override
public RetryDecision onUnavailable(Request request,int required,int alive,int retryCount) {
RetryDecision decision =
(retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;
if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
LOG.trace(retrying_ON_UNAVAILABLE,required,alive,retryCount);
}
return decision;
}
@Override
public RetryDecision onRequestAborted(Request request,Throwable error,int retryCount) {
RetryDecision decision =
(error instanceof ClosedConnectionException || error instanceof HeartbeatException)
? RetryDecision.RETRY_NEXT
: RetryDecision.RETHROW;
if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
LOG.trace(retrying_ON_ABORTED,retryCount,error);
}
return decision;
}
@Override
public RetryDecision onErrorResponse(Request request,CoordinatorException error,int retryCount) {
RetryDecision decision =
(error instanceof ReadFailureException || error instanceof WriteFailureException)
? RetryDecision.RETHROW
: RetryDecision.RETRY_NEXT;
if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
LOG.trace(retrying_ON_ERROR,error);
}
return decision;
}
@Override
public void close() {
// nothing to do
}
}
我正在使用datastax Java驱动程序4.6.0。 但是问题是我无法通过CQLSessionBuilder传递此类的对象,这可以通过
RetryPolicy rc = new CustomretryPolicy(3,3,2);
Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();
在旧版驱动程序中为。我已经尝试过使用DriverConfigLoader,但是只有一个选项可以传递自定义类名称。
请您提出建议。
解决方法
如果查看DefaultRetryPolicy的实现和CustomRetryPolicy的示例,您会看到两者都接收2个参数:context
类型的DriverContext
和字符串与个人资料名称。然后,您应该能够使用context
通过DriverConfig
来获取getConfig
,然后在配置上使用getProfile
来提取自定义策略所需的配置值-您可以将自己的配置值放入配置文件中,然后在重试策略中使用它,如下所示:
datastax-java-driver {
advanced.retry-policy {
class = DefaultRetryPolicy
}
profiles {
custom-retries {
advanced.retry-policy {
class = CustomRetryPolicy
custom-policy {
read-attempts = 3
write-attempts = 2
...
}
}
}
}
}