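Problem description
I created a materialized view, and after it had been running for some time, the consumer lag started to grow. I tried recreating the view and restarting the ksqlDB server to fix this, but neither helped. Below are the reproduction steps and the jstack output.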
Step 1: Create the stream ksql_stream_ori_xhtcmes_produce_test
create stream ksql_stream_ori_xhtcmes_produce_test with (kafka_topic = 'xhtcmes.prod.produce_test',value_format = 'avro');
Step 2: Create a materialized view based on the stream
create table ksql_table_xhtcmes_produce_test_stats as
select
  MO_ORDER_ID as mo_id,
  MO_LOT_NO,
  TENANT_ID,
  LINE_ID,
  (case
    when IS_PASS = true then 1
    when IS_PASS = false then 2
    else 0
  end) cause_type,
  STATION_ID,
  substring((TIMESTAMPTOSTRING(creation_time,'yyyy-MM-dd HH:mm:ss','GMT')),1,10) as PRODUCE_DATE,
  substring(substring((TIMESTAMPTOSTRING(creation_time,'yyyy-MM-dd HH:mm:ss','GMT')),12,12),1,2) as produce_hour,
  SUM((case
    when ((IS_PASS = true) and (IS_ACTIVE = true)) then 1
    when ((IS_PASS = true) and (IS_ACTIVE = false)) then -1
    when (IS_PASS = false) then 1
    else 0
  end)) STATS
from
  Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST
where
  (STATUS_CODE = 4)
group by
  MO_ORDER_ID,
  MO_LOT_NO,
  TENANT_ID,
  LINE_ID,
  (case
    when IS_PASS = true then 1
    when IS_PASS = false then 2
    else 0
  end),
  STATION_ID,
  substring((TIMESTAMPTOSTRING(creation_time,'yyyy-MM-dd HH:mm:ss','GMT')),1,10),
  substring(substring((TIMESTAMPTOSTRING(creation_time,'yyyy-MM-dd HH:mm:ss','GMT')),12,12),1,2)
emit changes;
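To make the derived columns of this view easier to follow, here is a rough Python sketch of what the CASE expressions and the nested substring calls appear to compute for a single record. It is an illustration only, not ksqlDB code: the function names are hypothetical, the sample timestamp is an arbitrary epoch-millisecond value, and it assumes that ksqlDB's SUBSTRING takes a 1-based start position followed by a length.

```python
from datetime import datetime, timezone


def cause_type(is_pass):
    """Mirror of the cause_type CASE: 1 for pass, 2 for fail, 0 for NULL."""
    if is_pass is True:
        return 1
    if is_pass is False:
        return 2
    return 0


def stats_delta(is_pass, is_active):
    """Mirror of the CASE inside SUM(...): the amount one record adds to STATS."""
    if is_pass is True and is_active is True:
        return 1
    if is_pass is True and is_active is False:
        return -1
    if is_pass is False:
        return 1
    return 0


def produce_date_and_hour(creation_time_ms):
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss' in GMT, then slice."""
    text = datetime.fromtimestamp(
        creation_time_ms / 1000, tz=timezone.utc
    ).strftime("%Y-%m-%d %H:%M:%S")
    produce_date = text[0:10]        # substring(text, 1, 10)
    time_part = text[11:23]          # substring(text, 12, 12)
    produce_hour = time_part[0:2]    # substring(time_part, 1, 2)
    return produce_date, produce_hour


if __name__ == "__main__":
    sample_ms = 1600677825104  # arbitrary example timestamp (assumption)
    print(produce_date_and_hour(sample_ms))
    print(cause_type(True), stats_delta(True, False))
```

Each distinct combination of the grouping columns, including the date and hour strings, becomes its own row in the table, so the number of keys grows with every new hour of data.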
Check the consumer information for the table:
ksql> describe extended Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS;
Name : Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : AVRO
Kafka topic : Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS (partitions: 1,replication: 3)
Statement : CREATE TABLE Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS WITH (KAFKA_TOPIC='Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS',PARTITIONS=1,REPLICAS=3) AS SELECT
Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_ORDER_ID MO_ID,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_LOT_NO MO_LOT_NO,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.TENANT_ID TENANT_ID,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.LINE_ID LINE_ID,(CASE WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) THEN 1 WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = false) THEN 2 ELSE 0 END) CAUSE_TYPE,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.STATION_ID STATION_ID,SUBSTRING(TIMESTAMPTOSTRING(Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.CREATION_TIME,'GMT'),10) PRODUCE_DATE,SUBSTRING(SUBSTRING(TIMESTAMPTOSTRING(Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.CREATION_TIME,2) PRODUCE_HOUR,SUM((CASE WHEN ((Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) AND (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_ACTIVE = true)) THEN 1 WHEN ((Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) AND (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_ACTIVE = false)) THEN -1 WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = false) THEN 1 ELSE 0 END)) STATS
FROM Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST
WHERE (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.STATUS_CODE = 4)
GROUP BY Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_ORDER_ID,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_LOT_NO,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.TENANT_ID,Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.LINE_ID,(CASE WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) THEN 1 WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = false) THEN 2 ELSE 0 END),Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.STATION_ID,2)
EMIT CHANGES;
Field | Type
---------------------------------------------
Ksql_COL_0 | VARCHAR(STRING) (primary key)
STATS | INTEGER
---------------------------------------------
Queries that read from this TABLE
-----------------------------------
CTAS_Ksql_TABLE_GP_XHTcmeS_PRODUCE_TEST_STATS_SUM_215 (RUNNING) : CREATE TABLE Ksql_TABLE_GP_XHTcmeS_PRODUCE_TEST_STATS_SUM WITH (KAFKA_TOPIC='Ksql_TABLE_GP_XHTcmeS_PRODUCE_TEST_STATS_SUM',REPLICAS=3) AS SELECT Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.Ksql_COL_0 ROWKEY,SPLIT(Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.Ksql_COL_0,'|')[1] MO_ID,'|')[3] MO_LOT_NO,CAST(SPLIT(Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.Ksql_COL_0,'|')[5] AS INTEGER) TENANT_ID,'|')[7] LINE_ID,'|')[9] AS INTEGER) CAUSE_TYPE,'|')[11] STATION_ID,'|')[13] PRODUCE_DATE,'|')[15] PRODUCE_HOUR,SUM(Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.STATS) PRODUCE_COUNT FROM Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS GROUP BY Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS.Ksql_COL_0 EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Queries that write from this TABLE
-----------------------------------
CTAS_Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS_205 (RUNNING) : CREATE TABLE Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS WITH (KAFKA_TOPIC='Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS',REPLICAS=3) AS SELECT Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_ORDER_ID MO_ID,SUM((CASE WHEN ((Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) AND (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_ACTIVE = true)) THEN 1 WHEN ((Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = true) AND (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_ACTIVE = false)) THEN -1 WHEN (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.IS_PASS = false) THEN 1 ELSE 0 END)) STATS FROM Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST WHERE (Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.STATUS_CODE = 4) GROUP BY Ksql_STREAM_ORI_XHTcmeS_PRODUCE_TEST.MO_ORDER_ID,2) EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
consumer-messages-per-sec: 0 consumer-total-bytes: 1512203 consumer-total-messages: 9723 messages-per-sec: 0 total-messages: 1 last-message: 2020-09-21T08:43:45.104Z
(Statistics of the local Ksql server interaction with the Kafka topic Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS)
Consumer Groups summary:
Consumer Group : _confluent-ksql-ksql0.11.0-002_query_CTAS_Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS_205
Kafka topic : _confluent-ksql-ksql0.11.0-002_query_CTAS_Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS_205-Aggregate-GroupBy-repartition
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 11139 | 11154 | 11154 | 0
------------------------------------------------------
Kafka topic : xhtcmes.prod.produce_test
Max lag : 5947
Partition | Start Offset | End Offset | Offset | Lag
--------------------------------------------------------
0 | 2202590 | 2663267 | 2657320 | 5947
--------------------------------------------------------
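The Lag column in the summary above is the difference between a partition's end offset and the consumer group's committed offset. The following minimal Python sketch recomputes it from the figures shown; the function name and the dictionary layout are hypothetical and only serve the illustration.

```python
def partition_lag(end_offset, committed_offset):
    """Lag = messages written to the partition but not yet consumed."""
    return max(end_offset - committed_offset, 0)


# (end offset, committed offset) for partition 0 of each topic,
# copied from the consumer group summary above.
offsets = {
    "Aggregate-GroupBy-repartition": (11154, 11154),
    "xhtcmes.prod.produce_test": (2663267, 2657320),
}

lags = {
    topic: partition_lag(end, committed)
    for topic, (end, committed) in offsets.items()
}

if __name__ == "__main__":
    for topic, lag in lags.items():
        print(topic, lag)
```

The repartition topic is fully consumed, so the backlog sits entirely on the source topic xhtcmes.prod.produce_test.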
Check the runtime log (jstack) of the ksql process:
"kafka-admin-client-thread | _confluent-ksql-ksql0.11.0-002_query_CTAS_Ksql_TABLE_XHTcmeS_PRODUCE_TEST_STATS_205-c9926e52-9353-49de-a7a3-7581c2eb7050-admin" #272 daemon prio=5 os_prio=0 cpu=115.18ms elapsed=429.91s tid=0x00007f786f489800 nid=0x1bf2f runnable [0x00007f76d847f000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPoll.wait([email protected]/Native Method)
at sun.nio.ch.EPollSelectorImpl.doSelect([email protected]/EPollSelectorImpl.java:120)
at sun.nio.ch.SelectorImpl.lockAndDoSelect([email protected]/SelectorImpl.java:124)
- locked <0x00000006053da4e0> (a sun.nio.ch.Util$2)
- locked <0x00000006051a7920> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select([email protected]/SelectorImpl.java:136)
at org.apache.kafka.common.network.Selector.select(Selector.java:869)
at org.apache.kafka.common.network.Selector.poll(Selector.java:465)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1302)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1233)
at java.lang.Thread.run([email protected]/Thread.java:834)
Locked ownable synchronizers:
- None