使用PySpark进行Spark流式处理

问题描述

我有一个Spark Streaming应用程序(pyspark),在其中我从后端数据中心监视应用程序通过Kafka端点接收消息(警报),如下所示。

时间T:{操作:“插入”,alarmId:.....,alarmName:.....,alarmType:.....,///} 时间T + 1:{操作:“更新”,与上面相同...} 时间T + 2:/// .... 时间T + n:{操作:“删除”,与上面相同...}

肯定会插入和删除,而更新则发生0 ..许多次。

如果有新警报,它首先以“插入”消息的形式出现,然后随后的“更新”消息也将在“插入”之后到达相同警报ID的流中,以提供同一警报的更新。清除警报后,删除就会到达。

我的业务问题是,每当有新警报消息“插入”到达时,我都需要检查以下“更新”,以及是否在下一个 Y 分钟,我需要创建一个票证(新消息)并将其自动发送到另一个工作流应用程序(使用REST API)。

我的问题是,如果要通过Spark Streaming解决此问题,应在此处应用哪种类型的窗口?如何确保此资格认证流程运行稳定?例如我不希望在X + 1次之后再次提出票证,因为它在X次发生时已经被提出。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...