问题描述
背景:
Redisson 版本: 3.15.0 框架: Spring Boot Redis 实例类型:集群模式(Azure Cache Premium with 2 nodes)
我正在使用 redisson 的 scheduleWithFixedDelay API 来调度两种延迟作业:
每 2 分钟重复一次,初始延迟为 5 分钟,最大执行次数为 5 次 每 2 分钟重复一次,初始延迟为 15 分钟,最大执行次数为 10 次 我已经使用 RAtomicLong 数据类型实现了最大执行计数的逻辑。
问题:
当我使用 executor.scheduleWithFixedDelay API 进行调度时,作业不会在配置的预期初始延迟执行。为了添加更多上下文,我生成了延迟作业类型的多个实例,并且没有一个在预期的延迟(5 分钟或 15 分钟)下运行。事实上,所有这些都在第二天大约在同一时间运行,比最初的创建时间延迟了大约 8 小时。
另一方面,我使用 executor.execute API 安排的作业运行良好,没有任何问题。该问题仅发生在 scheduleWithFixedDelay API 中。
下面是一些示例源代码,大致展示了延迟作业的样子以及 redissonClient / redisson 节点创建代码。
非常感谢您对此的任何帮助:)
代码
DelayedJobCreator
@Service
public class DelayedJobCreator {
@Autowired
private final RedissonClient redissonClient;
public void createTypeOne(DelayedJobData jobData) {
Long initialDelay = 300 L;
Long repeatedDelay = 120 L;
Rscheduledexecutorservice executor = redissonClient
.getExecutorService("delayed-job-service");
executor.scheduleWithFixedDelay(
new TypeOneDelayedJob(jobData),initialDelay,repeatedDelay,TimeUnit.SECONDS
);
}
public void createTypeTwo(DelayedJobData jobData) {
Long initialDelay = 900 L;
Long repeatedDelay = 120 L;
Rscheduledexecutorservice executor = redissonClient
.getExecutorService("delayed-job-service");
executor.scheduleWithFixedDelay(
new TypeTwoDelayedJob(jobData),TimeUnit.SECONDS
);
}
}
TypeOneDelayedJob
@Slf4j
@Component
public class TypeOneDelayedJob implements Runnable,Serializable {
private Long maxCount = 5;
private Long maxDelay = 1800000; // 30 mins
DelayedJobData jobData;
@RInject
RedissonClient redissonClient;
@RInject
String taskId;
@Override
public void run() {
RAtomicLong count = redissonClient.getAtomicLong("countTypeOne:" + taskId);
Long newValue = count.incrementAndGet();
Long createdEpoch = jobData.getEpoch();
Long currentEpoch = System.currentTimeMillis();
Rscheduledexecutorservice executor = redissonClient.getExecutorService("delayed-job-service");
if (newValue > maxCount) {
log.info("Maximum retries hit for TypeOneDelayedJob with taskId: {}",taskId);
executor.cancelTask(taskId);
} else if (currentEpoch - createdEpoch > maxDelay) {
log.info("Maximum delay TypeOneDelayedJob with taskId: {}",taskId);
executor.cancelTask(taskId);
} else {
// Job logic
}
}
}
TypeTwoDelayedJob
@Slf4j
@Component
public class TypeTwoDelayedJob implements Runnable,Serializable {
private Long maxCount = 10;
private Long maxDelay = 1800000; // 30 mins
DelayedJobData jobData;
@RInject
RedissonClient redissonClient;
@RInject
String taskId;
@Override
public void run() {
RAtomicLong count = redissonClient.getAtomicLong("countTypeOne:" + taskId);
Long newValue = count.incrementAndGet();
Long createdEpoch = jobData.getEpoch();
Long currentEpoch = System.currentTimeMillis();
Rscheduledexecutorservice executor = redissonClient.getExecutorService("delayed-job-service");
if (newValue > maxCount) {
log.info("Maximum retries hit for TypeTwoDelayedJob with taskId: {}",taskId);
executor.cancelTask(taskId);
} else if (currentEpoch - createdEpoch > maxDelay) {
log.info("Maximum delay TypeTwoDelayedJob with taskId: {}",taskId);
executor.cancelTask(taskId);
} else {
// Job logic
}
}
}
Spring bean创建-配置类
@Bean
public Config redissonConfig() {
Config config = new Config();
config.useClusterServers()
.addNodeAddress(host)
.setPassword(password);
return config;
}
@SneakyThrows
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient(Config config) {
return Redisson.create(config);
}
@Bean(destroyMethod = "shutdown")
public RedissonNode redissonNode(Config config) {
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setbeanfactory(beanfactory);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("delayed-job-service",2));
RedissonNode node = RedissonNode.create(nodeConfig);
node.start();
return node;
}
目前采取的步骤
我们注意到与 Redisson 连接超时相关的日志中很少有 redisson 错误 i,e
错误 6 --- [isson-timer-4-1] orchandler.PingConnectionHandler:无法通过通道发送 PING 命令:[id: 0x6f24e6ea,L:/10.114.48.173:57092 - R:euw-prod- 052-fps-ctwr-redis-cluster.redis.cache.windows.net/10.114.49.200:6380]
org.redisson.client.RedisTimeoutException:命令执行超时:(PING),参数:[],Redis 客户端:[addr=rediss://euw-prod-052-fps-ctwr-redis-cluster。 redis.cache.windows.net:6380] 在 org.redisson.client.RedisConnection.lambda$async$1(RedisConnection.java:207) ~[redisson-3.15.0.jar!/:3.15.0] 在 io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672) [netty-common-4.1.58.Final.jar!/:4.1.58.Final] 在 io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747) [netty-common-4.1.58.Final.jar!/:4.1.58.Final] 在 io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472) [netty-common-4.1.58.Final.jar!/:4.1.58.Final] 在 io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.58.Final.jar!/:4.1.58.Final] 在 java.lang.Thread.run(Thread.java:748) [na:1.8.0_212] 我们认为这些错误导致 scheduleWithFixedDelay 无法正常工作,因此为了修复这些连接超时,我们为 redisson 客户端设置了以下参数:
pingConnectionInterval: 10000
keepAlive: true
在执行这些配置更改后,我们有 2 天没有注意到这些超时,并且 scheduleWithFixedDelay API 确实按预期工作。但是,我们在 2 天后开始收到这些超时。
我想知道处理这种情况的正确方法是什么。此外,如前所述,这仅适用于 scheduleWithFixedDelay 用例,而不适用于执行 API。为什么这种情况只发生在调度 API 而不是执行 API 上?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)