间隔任务导致R2DBC中的IDLE连接耗尽

问题描述

我正在使用Reactor Java使用r2dbc对Postgres运行以下定期任务;

Flux.interval(Duration.ofMillis(1000)).doOnNext(i->{
            System.out.print("TIME HAS TICKED\n");
            Flux.range(0,10).flatMap(j->{
                return service.getJob(this.consumerQueueName,this.filter).then();
            }).subscribe();
        }).subscribe();

大约5分钟后,它将停止处理作业,并且当我检查postgres连接都处于空闲状态时:


select        datname as database_name,client_addr as client_address,application_name,backend_start,state,state_change
from pg_stat_activity;

integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.786098  idle    2020-09-18 04:11:40.471893
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785822  idle    2020-09-18 04:12:01.196558
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785598  idle    2020-09-18 04:11:50.971738
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785317  idle    2020-09-18 04:11:30.506207
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.665800  idle    2020-09-18 04:11:20.570714

如何适当地使用r2dbc和databaseClient定期从表中获取数据而不会导致此异常?

//ConnectionFactory Settings:

ConnectionFactories.get(
                ConnectionFactoryOptions.builder()
                        .option(Option.valueOf("driver"),"pool")
                        .option(Option.valueOf("protocol"),"postgresql")
                        //.option(ConnectionFactoryOptions.DRIVER,"postgresql")
                        .option(ConnectionFactoryOptions.HOST,"localhost")
                        .option(ConnectionFactoryOptions.PORT,5432)  // optional,defaults to 5432
                        .option(ConnectionFactoryOptions.USER,"db")
                        .option(ConnectionFactoryOptions.DATABASE,"integrity_service")
                        .option(MAX_SIZE,5)
                        .build());

private final String fetchJobFormat =
            " WITH cte AS ( SELECT id FROM %s WHERE chain_id='%s' and is_complete=%b  ORDER BY id ASC LIMIT 1\n" +
                    "    )\n" +
                    "                UPDATE queue q\n" +
                    "                SET timestamp = extract(epoch from Now()),\n" +
                    "                is_complete = TRUE\n" +
                    "                FROM cte WHERE q.id  = cte.id\n" +
                    "                RETURNING q.id,q.chain_id,q.timestamp,q.is_complete,q.payload";

 public Mono<Queue> getJob(String queue,String filter){

        return databaseClient.execute(String.format(fetchJobFormat,queue,filter,false))
                .fetch().all()
                .flatMap((v) -> {
                    System.out.println("retrieved result" + v.get("id").toString());
                    Queue q = this.objectMapper.convertValue(v,Queue.class);
                    return Mono.just(q);
                }).last();
    }

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)