Sorting Out Flink's Four Stream Types and Window Transformations

1. Brief Introduction

As far as I currently understand it (this will be updated as I learn more), Flink has four stream types: DataStream, AllWindowedStream, KeyedStream, and WindowedStream.
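
As a quick map of the four types and how they convert into one another, here is a minimal sketch (assuming the Flink 1.x Scala API, flink-streaming-scala, which all the code in this post uses):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamTypesSketch {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // DataStream: the basic stream type
        val dataStream: DataStream[(String, Long)] = env.fromElements(("a", 1L), ("b", 2L))

        // DataStream -> AllWindowedStream (non-keyed; effectively parallelism 1)
        val allWindowed = dataStream.timeWindowAll(Time.seconds(10))

        // DataStream -> KeyedStream
        val keyedStream: KeyedStream[(String, Long), String] = dataStream.keyBy(_._1)

        // KeyedStream -> WindowedStream
        val windowedStream = keyedStream.timeWindow(Time.seconds(10))

        // A window function turns a windowed stream back into a DataStream
        val result: DataStream[(String, Long)] = windowedStream.reduce((a, b) => (a._1, a._2 + b._2))
        result.print()

        env.execute("stream types sketch")
    }
}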

1.1 DataStream can be converted into an AllWindowedStream via the following methods

  // 1. Tumbling time window
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow] = {
    new AllWindowedStream(javaStream.timeWindowAll(size))
  }
  // 2. Sliding time window
  def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow] = {
    new AllWindowedStream(javaStream.timeWindowAll(size, slide))
  }
  // 3. Sliding count window
  def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow] = {
    new AllWindowedStream(stream.countWindowAll(size, slide))
  }
  // 4. Tumbling count window
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {
    new AllWindowedStream(stream.countWindowAll(size))
  }
  // 5. Generic window with an explicit WindowAssigner
  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
    new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner))
  }

AllWindowedStream offers many of the same operations as DataStream.

The operation flow on an AllWindowedStream (a runnable sketch follows the skeleton):

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
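
A minimal runnable sketch of this skeleton, assuming event time and a hypothetical OutputTag named late for records that arrive behind the allowed lateness:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowAllSketch {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val late = new OutputTag[Long]("late") // hypothetical tag for late records

        val summed: DataStream[Long] = env
            .fromElements(1000L, 2000L, 3000L)
            .assignAscendingTimestamps(identity)                     // event-time timestamps
            .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // required: assigner
            .allowedLateness(Time.seconds(1))                        // optional: lateness
            .sideOutputLateData(late)                                // optional: output tag
            .reduce(_ + _)                                           // required: function

        summed.getSideOutput(late).print("late")                     // optional: late records
        summed.print("summed")
        env.execute("windowAll sketch")
    }
}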

1.2 DataStream can be converted into a KeyedStream via the following methods


def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = asScalaStream(stream.keyBy(fields: _*))

def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
    asScalaStream(stream.keyBy(firstField +: otherFields.toArray: _*))
    
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {

    val cleanFun = clean(fun)
    val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]

    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
      def getKey(in: T) = cleanFun(in)
      override def getProducedType: TypeInformation[K] = keyType
    }
    asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}

def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K] = {

    val cleanFun = clean(fun)
    val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]

    asScalaStream(new JavaKeyedStream(stream, cleanFun, keyType))
}
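
A small sketch of how the overloads differ in the key type they produce (Event is an illustrative case class, not taken from the examples below):

import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.streaming.api.scala._

// Illustrative event type
case class Event(id: String, value: Long)

object KeyByTypesSketch {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val events: DataStream[Event] = env.fromElements(Event("a", 1L), Event("b", 2L))

        // Field-expression keyBy: the key is wrapped in a Java Tuple
        val byFieldName: KeyedStream[Event, JavaTuple] = events.keyBy("id")

        // Key-selector keyBy: the key keeps its real type (String here)
        val bySelector: KeyedStream[Event, String] = events.keyBy(_.id)
    }
}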

The operation flow for a keyed WindowedStream (note the leading keyBy; a fragment follows the skeleton):

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
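
The keyed counterpart differs only in the leading keyBy and in window(...) replacing windowAll(...). A fragment, under the same environment and imports as the sketch in 1.1:

val keyedSum: DataStream[(String, Long)] = env
    .fromElements(("a", 1000L), ("b", 2000L), ("a", 3000L))
    .assignAscendingTimestamps(_._2)
    .keyBy(_._1)                                          // keyed versus non-keyed windows
    .window(TumblingEventTimeWindows.of(Time.seconds(5))) // required: assigner
    .allowedLateness(Time.seconds(1))                     // optional: lateness
    .reduce((a, b) => (a._1, a._2 + b._2))                // required: function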

1.3 KeyedStream can be converted into a WindowedStream via the following methods

Note that a WindowedStream can only be obtained from a KeyedStream.

  // Tumbling time window
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
    new WindowedStream(javaStream.timeWindow(size))
  }

  // Sliding count window
  def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
    new WindowedStream(javaStream.countWindow(size, slide))
  }

  // Tumbling count window
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
    new WindowedStream(javaStream.countWindow(size))
  }

  // Sliding time window
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow] = {
    new WindowedStream(javaStream.timeWindow(size, slide))
  }

  // Generic window with an explicit WindowAssigner
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W] = {
    new WindowedStream(new WindowedJavaStream[T, K, W](javaStream, assigner))
  }
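
For example, a minimal count-window sketch: a tumbling count window that fires every 3 records per key (the input values are made up):

import org.apache.flink.streaming.api.scala._

object CountWindowSketch {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val counted: DataStream[(String, Int)] = env
            .fromElements(("a", 1), ("a", 2), ("b", 5), ("a", 3))
            .keyBy(_._1)    // KeyedStream[(String, Int), String]
            .countWindow(3) // WindowedStream over GlobalWindow; fires every 3 records per key
            .sum(1)         // sum the second tuple field within each window

        counted.print()
        env.execute("count window sketch")
    }
}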

2. Usage Examples

2.1 Using WindowedStream

2.1.1 Example One

import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

// Case class for the input records
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

// Case class for the windowed aggregation result
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
    def main(args: Array[String]): Unit = {
        // Create a streaming execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // Read input from a file
        val inputStream = env.readTextFile("F:\\SparkWorkSpace\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
        // Map each line into the case class, extract the timestamp, and define the watermark
        val dataStream = inputStream.map(data => {
            val dataArray = data.split(",")
            UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
        }).assignAscendingTimestamps(_.timestamp * 1000L) // assign ascending event-time timestamps; convert seconds to milliseconds

        // Keep only pv events, then window and aggregate the counts
        val aggStream: DataStream[ItemViewCount] = dataStream
            .filter(_.behavior == "pv") // keep only pv events
            .keyBy("itemId") // key by itemId
            .timeWindow(Time.hours(1), Time.minutes(5)) // sliding window
            .aggregate(new CountAgg(), new ItemCountWindowResult())

        // For illustration only: the WindowedStream type before the aggregate call
        val aggStream111: WindowedStream[UserBehavior, Tuple, TimeWindow] = dataStream
            .filter(_.behavior == "pv") // keep only pv events
            .keyBy("itemId") // key by itemId
            .timeWindow(Time.hours(1), Time.minutes(5)) // sliding window

        //        aggStream.print("aggStream")
        // Group the window results by window end, then sort and emit the TopN
        val resultStream: DataStream[String] = aggStream
            .keyBy("windowEnd")
            .process(new TopNHotItems(5))

        resultStream.print("resultStream")
        env.execute("hot items job")
    }
}

// Custom pre-aggregation function: add 1 for each incoming record
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
    // Initial value
    override def createAccumulator(): Long = 0L

    // Add 1 for each record
    override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1

    // Emit the result
    override def getResult(accumulator: Long): Long = accumulator

    // Merge two accumulators
    override def merge(a: Long, b: Long): Long = a + b
}

// Extended example: computing an average
class AggAvg() extends AggregateFunction[UserBehavior, (Int, Long), Double] {
    override def createAccumulator(): (Int, Long) = (0, 0L)

    override def add(value: UserBehavior, accumulator: (Int, Long)): (Int, Long) = {
        (accumulator._1 + 1, accumulator._2 + value.timestamp)
    }

    override def getResult(accumulator: (Int, Long)): Double = {
        accumulator._2 / accumulator._1.toDouble
    }

    override def merge(a: (Int, Long), b: (Int, Long)): (Int, Long) = {
        (a._1 + b._1, a._2 + b._2)
    }
}

// Custom window function that wraps the count with window info.
// Type parameters: the pre-aggregation output as IN, then OUT, KEY, and the window type
class ItemCountWindowResult() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {

    override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
        // Extract the key; a field-expression keyBy yields a Java tuple type
        val itemId = key.asInstanceOf[Tuple1[Long]].f0
        // Window end time
        val windowEnd = window.getEnd
        // The pre-aggregated input holds exactly one element
        val count = input.iterator.next()
        // Emit the result
        out.collect(ItemViewCount(itemId, windowEnd, count))
    }
}

// Custom KeyedProcessFunction
class TopNHotItems(n: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
    // ListState holding all count results for the current key
    lazy val itemCountListState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemcount-list", classOf[ItemViewCount]))

    override def processElement(value: ItemViewCount,
                                ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context,
                                out: Collector[String]): Unit = {
        // Save every incoming record in state
        itemCountListState.add(value)
        // Register a timer to fire at windowEnd + 100.
        // 1. Timers are scoped per key of the keyBy. 2. Registering the same timestamp for a key repeatedly still creates only one timer.
        // When it fires, the records of the window are sorted and emitted.
        ctx.timerService().registerEventTimeTimer(value.windowEnd + 100)
    }

    // When the timer fires, read the data from state, sort, and emit
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
        // First copy the state into a ListBuffer
        val allItemCountList: ListBuffer[ItemViewCount] = ListBuffer()
        // itemCountListState is backed by a Java collection; import the implicit conversions to iterate with Scala syntax
        import scala.collection.JavaConversions._
        for (itemCount <- itemCountListState.get()) {
            allItemCountList += itemCount
        }

        // Sort by count in descending order and take the top n
        val sortedItemCountList = allItemCountList.sortBy(_.count)(Ordering.Long.reverse).take(n)

        // Clear the state
        itemCountListState.clear()

        // Format the ranking as a String for easy monitoring
        val result: StringBuilder = new StringBuilder
        result.append("time: ").append(new Timestamp(timestamp - 100)).append("\n")
        // Walk the sorted list and emit the TopN entries
        for (i <- sortedItemCountList.indices) {
            // Count info of the current item
            val currentItemCount = sortedItemCountList(i)
            result.append("Top").append(i + 1).append(":")
                .append(" itemId=").append(currentItemCount.itemId)
                .append(" viewCount=").append(currentItemCount.count)
                .append("\n")
        }
        result.append("===============================\n\n")

        // Slow the output down so it is easier to read (demo only)
        Thread.sleep(1000)
        // Emit the result
        out.collect(result.toString())
    }
}

2.1.2 Example Two

import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

// Case class for the input records
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

// Case class for the aggregation result
case class PageViewCount(url: String, windowEnd: Long, count: Long)

object NetworkTopNPage {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // Read input from a file
        val inputStream: DataStream[String] = env.readTextFile("F:\\SparkWorkSpace\\UserBehaviorAnalysis\\NetworKFlowAnalysis\\src\\main\\resources\\apache.log")

        // Map to the case class, assign timestamps and watermarks
        val dataStream = inputStream
            .map(data => {
                val dataArray = data.split(" ")
                // Convert the time field into a timestamp
                val simpleDataFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
                val timestamp = simpleDataFormat.parse(dataArray(3)).getTime

                ApacheLogEvent(dataArray(0), dataArray(1), timestamp, dataArray(5), dataArray(6))
            })
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(60)) {
                override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
            })

        // Window and aggregate
        val aggStream = dataStream
            //            .keyBy("url")
            .keyBy(_.url) // with a key selector the key keeps its real type instead of Tuple and can be used directly downstream
            .timeWindow(Time.minutes(10), Time.seconds(5))
            .aggregate(new PageCountAgg(), new PageCountWindowResult())

        // Sort each window's counts and emit the result
        val resultStream = aggStream
            //            .keyBy("windowEnd")
            .keyBy(_.windowEnd)
            .process(new TopNHotPage(3))

        resultStream.print("resultStream")
        env.execute("network top n page job")
    }
}

// Custom pre-aggregation function
class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
    override def createAccumulator(): Long = 0L

    override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

    override def getResult(accumulator: Long): Long = accumulator

    override def merge(a: Long, b: Long): Long = a + b
}

// Custom WindowFunction: wrap the count into the result case class
class PageCountWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
        out.collect(PageViewCount(key, window.getEnd, input.head))
    }
}

// Custom process function
class TopNHotPage(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {

    // ListState holding all aggregation results
    lazy val pageCountListState = getRuntimeContext.getListState(new ListStateDescriptor[PageViewCount]("pagecount-list", classOf[PageViewCount]))

    override def processElement(value: PageViewCount,
                                ctx: KeyedProcessFunction[Long,
                                    PageViewCount, String]#Context, out: Collector[String]): Unit = {
        pageCountListState.add(value)
        ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
    }

    // Once all data for the window has arrived, read it from state, sort, and emit
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
        val allPageCountList: ListBuffer[PageViewCount] = ListBuffer()

        // Copy the state into a local buffer first
        val iter = pageCountListState.get().iterator()
        while (iter.hasNext) {
            allPageCountList += iter.next()
        }

        // Clear the state only after the copy; clearing before iterating could invalidate the iterator
        pageCountListState.clear()

        // Sort by count in descending order and take the top n (outside the copy loop, so it runs once)
        val sortedPageCountList: ListBuffer[PageViewCount] = allPageCountList.sortWith(_.count > _.count).take(n)

        // Format the ranking as a String for easy monitoring
        val result: StringBuilder = new StringBuilder
        result.append("time: ").append(new Timestamp(timestamp - 1)).append("\n")
        // Walk the sorted list and emit the TopN entries
        for (i <- sortedPageCountList.indices) {
            // Count info of the current page
            val currentPageCount = sortedPageCountList(i)
            result.append("Top").append(i + 1).append(":")
                .append(" url=").append(currentPageCount.url)
                .append(" viewCount=").append(currentPageCount.count)
                .append("\n")
        }
        result.append("===============================\n\n")

        // Slow the output down so it is easier to read (demo only)
        Thread.sleep(1000)
        // Emit the result
        out.collect(result.toString())
    }
}
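
Neither example exercises the optional lateness steps from the section 1 skeletons. As an illustrative variation on example two (lateData is a made-up tag name; dataStream is the stream built in its main method), the windowed aggregation could route late records to a side output:

val lateData = new OutputTag[ApacheLogEvent]("lateData") // illustrative tag name

val aggStreamWithLate = dataStream
    .keyBy(_.url)
    .timeWindow(Time.minutes(10), Time.seconds(5))
    .allowedLateness(Time.minutes(1)) // keep window state for one extra minute of lateness
    .sideOutputLateData(lateData)     // anything later still goes to the side output
    .aggregate(new PageCountAgg(), new PageCountWindowResult())

aggStreamWithLate.getSideOutput(lateData).print("late") // inspect records that were too late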
