ksqlDB 0.11.0: consumer lag on an aggregation query

Problem description

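I created a materialized view. After it had been running for a while, its consumer lag started to grow. I tried recreating the view and restarting the ksqlDB server, but neither helped. The reproduction steps and a jstack dump follow.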

Step 1: Create the stream ksql_stream_ori_xhtcmes_produce_test

create stream ksql_stream_ori_xhtcmes_produce_test with (kafka_topic = 'xhtcmes.prod.produce_test',value_format = 'avro'); 

Step 2: Create a materialized view based on the stream

create table ksql_table_xhtcmes_produce_test_stats as
    select
        MO_ORDER_ID as mo_id,
        MO_LOT_NO,
        TENANT_ID,
        LINE_ID,
        -- 1 = passed, 2 = failed, 0 = IS_PASS is null
        (case
            when IS_PASS = true then 1
            when IS_PASS = false then 2
            else 0
        end) cause_type,
        STATION_ID,
        substring(TIMESTAMPTOSTRING(creation_time,'yyyy-MM-dd HH:mm:ss','GMT'),1,10) as PRODUCE_DATE,
        substring(substring(TIMESTAMPTOSTRING(creation_time,'yyyy-MM-dd HH:mm:ss','GMT'),12,12),1,2) as produce_hour,
        SUM(case
            when ((IS_PASS = true) and (IS_ACTIVE = true)) then 1
            when ((IS_PASS = true) and (IS_ACTIVE = false)) then -1
            when (IS_PASS = false) then 1
            else 0
        end) STATS
    from
        ksql_stream_ori_xhtcmes_produce_test
    where
        (STATUS_CODE = 4)
    group by
        MO_ORDER_ID,
        MO_LOT_NO,
        TENANT_ID,
        LINE_ID,
        (case
            when IS_PASS = true then 1
            when IS_PASS = false then 2
            else 0
        end),
        STATION_ID,
        substring(TIMESTAMPTOSTRING(creation_time,'yyyy-MM-dd HH:mm:ss','GMT'),1,10),
        substring(substring(TIMESTAMPTOSTRING(creation_time,'yyyy-MM-dd HH:mm:ss','GMT'),12,12),1,2)
    emit changes;
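To make the per-record logic of this statement easier to check, here is a minimal Python sketch of what each input row contributes. The function names are hypothetical and exist only for illustration; the hour extraction assumes the hour is the two characters at positions 12 and 13 of the formatted 'yyyy-MM-dd HH:mm:ss' string, and SQL NULL is modelled as Python None.

```python
from datetime import datetime, timezone


def cause_type(is_pass):
    # Mirrors the first CASE: true -> 1, false -> 2, NULL -> 0.
    if is_pass is True:
        return 1
    if is_pass is False:
        return 2
    return 0


def stats_delta(is_pass, is_active):
    # Mirrors the CASE inside SUM(...): the value this row adds to STATS.
    if is_pass is True and is_active is True:
        return 1
    if is_pass is True and is_active is False:
        return -1
    if is_pass is False:
        return 1
    return 0  # any NULL that prevents the branches above from matching


def produce_date_and_hour(creation_time_ms):
    # Rough equivalent of TIMESTAMPTOSTRING(creation_time, 'yyyy-MM-dd HH:mm:ss', 'GMT')
    # followed by the SUBSTRING calls (SQL positions are 1-based, Python slices 0-based).
    text = datetime.fromtimestamp(creation_time_ms // 1000, tz=timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    return text[0:10], text[11:13]


if __name__ == "__main__":
    print(cause_type(True), stats_delta(True, False))
    print(produce_date_and_hour(1600677825104))
```

A passed record that is later deactivated contributes -1, so STATS can decrease over time for a given group.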

Check the table's consumer information:

ksql> describe extended  Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS;

Name                 : Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : AVRO
Kafka topic          : Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS (partitions: 1,replication: 3)
Statement            : CREATE TABLE Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS WITH (KAFKA_TOPIC='Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS',PARTITIONS=1,REPLICAS=3) AS SELECT
  Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_ORDER_ID MO_ID,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_LOT_NO MO_LOT_NO,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.TENANT_ID TENANT_ID,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.LINE_ID LINE_ID,(CASE WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) THEN 1 WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = false) THEN 2 ELSE 0 END) CAUSE_TYPE,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.STATION_ID STATION_ID,SUBSTRING(TIMESTAMPTOSTRING(Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.CREATION_TIME,'GMT'),10) PRODUCE_DATE,SUBSTRING(SUBSTRING(TIMESTAMPTOSTRING(Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.CREATION_TIME,2) PRODUCE_HOUR,SUM((CASE WHEN ((Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) AND (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_ACTIVE = true)) THEN 1 WHEN ((Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) AND (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_ACTIVE = false)) THEN -1 WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = false) THEN 1 ELSE 0 END)) STATS
FROM Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST
WHERE (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.STATUS_CODE = 4)
GROUP BY Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_ORDER_ID,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_LOT_NO,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.TENANT_ID,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.LINE_ID,(CASE WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) THEN 1 WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = false) THEN 2 ELSE 0 END),Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.STATION_ID,2)
EMIT CHANGES;

 Field      | Type                           
---------------------------------------------
 Ksql_COL_0 | VARCHAR(STRING)  (primary key) 
 STATS      | INTEGER                        
---------------------------------------------

Queries that read from this TABLE
-----------------------------------
CTAS_Ksql_TABLE_GP_XHTcmeS_PRODUCE_TEST_STATS_SUM_215 (RUNNING) : CREATE TABLE Ksql_TABLE_GP_XHTcmeS_PRODUCE_TEST_STATS_SUM WITH (KAFKA_TOPIC='Ksql_TABLE_GP_XHTcmeS_PRODUCE_TEST_STATS_SUM',REPLICAS=3) AS SELECT   Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.Ksql_COL_0 ROWKEY,SPLIT(Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.Ksql_COL_0,'|')[1] MO_ID,'|')[3] MO_LOT_NO,CAST(SPLIT(Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.Ksql_COL_0,'|')[5] AS INTEGER) TENANT_ID,'|')[7] LINE_ID,'|')[9] AS INTEGER) CAUSE_TYPE,'|')[11] STATION_ID,'|')[13] PRODUCE_DATE,'|')[15] PRODUCE_HOUR,SUM(Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.STATS) PRODUCE_COUNT FROM Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS GROUP BY Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.Ksql_COL_0 EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Queries that write from this TABLE
-----------------------------------
CTAS_Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS_205 (RUNNING) : CREATE TABLE Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS WITH (KAFKA_TOPIC='Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS',REPLICAS=3) AS SELECT   Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_ORDER_ID MO_ID,SUM((CASE WHEN ((Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) AND (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_ACTIVE = true)) THEN 1 WHEN ((Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) AND (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_ACTIVE = false)) THEN -1 WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = false) THEN 1 ELSE 0 END)) STATS FROM Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST WHERE (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.STATUS_CODE = 4) GROUP BY Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_ORDER_ID,2) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
consumer-messages-per-sec:         0 consumer-total-bytes:   1512203 consumer-total-messages:      9723 messages-per-sec:         0   total-messages:         1     last-message: 2020-09-21T08:43:45.104Z

(Statistics of the local Ksql server interaction with the Kafka topic Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-ksql0.11.0-002_query_CTAS_Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS_205

Kafka topic          : _confluent-ksql-ksql0.11.0-002_query_CTAS_Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS_205-Aggregate-GroupBy-repartition
Max lag              : 0

 Partition | Start Offset | End Offset | Offset | Lag 
------------------------------------------------------
 0         | 11139        | 11154      | 11154  | 0   
------------------------------------------------------

Kafka topic          : xhtcmes.prod.produce_test
Max lag              : 5947

 Partition | Start Offset | End Offset | Offset  | Lag  
--------------------------------------------------------
 0         | 2202590      | 2663267    | 2657320 | 5947 
--------------------------------------------------------
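The Lag column in both tables above is the end offset minus the consumer group's committed offset for that partition. A small Python sketch of that arithmetic, using the numbers reported above (the function name and the short topic labels are only illustrative):

```python
def partition_lag(end_offset, committed_offset):
    # Records appended to the partition that the group has not yet committed past.
    return max(end_offset - committed_offset, 0)


# (end offset, committed offset) for partition 0 of each topic, from the output above
offsets = {
    "Aggregate-GroupBy-repartition": (11154, 11154),
    "xhtcmes.prod.produce_test": (2663267, 2657320),
}

for topic, (end, committed) in offsets.items():
    print(topic, partition_lag(end, committed))
```

The internal repartition topic is fully caught up, so the backlog is only on the source topic xhtcmes.prod.produce_test.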

Check the runtime log of the ksqlDB process (jstack output):

"kafka-admin-client-thread | _confluent-ksql-ksql0.11.0-002_query_CTAS_Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS_205-c9926e52-9353-49de-a7a3-7581c2eb7050-admin" #272 daemon prio=5 os_prio=0 cpu=115.18ms elapsed=429.91s tid=0x00007f786f489800 nid=0x1bf2f runnable  [0x00007f76d847f000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPoll.wait(java.base@…/Native Method)
    at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@…/EPollSelectorImpl.java:120)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@…/SelectorImpl.java:124)
    - locked <0x00000006053da4e0> (a sun.nio.ch.Util$2)
    - locked <0x00000006051a7920> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(java.base@…/SelectorImpl.java:136)
    at org.apache.kafka.common.network.Selector.select(Selector.java:869)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:465)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1302)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1233)
    at java.lang.Thread.run(java.base@…/Thread.java:834)

   Locked ownable synchronizers:
    - None
