Cloud Dataflow 上的 apache 光束中的会话窗口无法正常工作

问题描述

要求如下: 我们希望根据以下逻辑跟踪用户事件并创建会话

  • 30 分钟不活动
  • UTC 时间结束

为此,我们将所有用户事件发布到 pubsub。 在apache beam管道中,我们读取pubsub消息,按visitor_id和event_date分组,并将它们分组到具有条件的用户会话窗口中:

beam.WindowInto(window.Sessions(30 * 60),timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)

            (pipeline
     | "Read PubSub Messages" >> beam.io.ReadFrompubSub(subscription=ap_options.subscription)
     | 'Keyed on Visitor Id and Date' >> beam.Map(lambda x: ((x['visitor_id'],x['event_date']),x))
     | 'User Session Window' >> beam.WindowInto(window.Sessions(30 * 60),timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
     | 'Group on VisitorId And Date' >> beam.GroupByKey()
     | 'Get Sessionsed Rows' >> beam.ParDo(AddSessionId())
     | 'Write to BigQuery' >> beam.io.WritetoBigQuery(table=ap_options.bq_table,schema=TABLE_SCHEMA,write_disposition=beam.io.BigQuerydisposition.WRITE_APPEND,create_disposition=beam.io.BigQuerydisposition.CREATE_NEVER)
     )

问题是——我们在 bigquery 中看到两个连续事件相隔超过 30 分钟的记录。 例如:

enter image description here

发生的事情是——beam 比较最后两个事件,如果它们相隔超过 30 分钟,它会将所有事件转储到 bigquery,包括最后一个事件。 而应该将相隔 30 分钟以上的最后一个事件添加到单独的会话中。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)