package com.mz.iot.test;
import com.mz.iot.utils.DateUtil;
import com.mz.iot.utils.FlinkUtils;
import com.mz.iot.utils.LogUtils;
import lombok.*;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;
/**
 * Test job that emits, per key and per tumbling event-time window, the difference between
 * the window's maximum value and a baseline value.
 *
 * <p>Note: the baseline ("minimum") is the maximum of the previous window for the same key;
 * only the very first window of a key falls back to its own minimum element.
 *
 * <p>Intended for very short windows without custom triggers, as a basic building-block job.
 */
public class TestwindowDiff2 {

    /**
     * Builds and runs the streaming job: socket text source -> {@link Event} parsing ->
     * event-time watermarks (5 s bounded out-of-orderness) -> 10 s tumbling windows keyed
     * by {@code kind} -> one {@link WindowResult} per fired window.
     *
     * @param args unused
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkUtils.createEnv("");

        /*
         * Input line format: kind,timestamp,value
         *
         * a,2020-10-01 00:12:01,1
         * a,2020-10-01 00:12:02,3
         * a,2020-10-01 00:12:06,5
         *
         * a,2020-10-01 00:12:10,10
         * a,2020-10-01 00:12:18,12
         *
         * a,2020-10-01 00:12:28,15
         *
         * a,2020-10-01 00:12:33,18
         * a,2020-10-01 00:12:38,20
         *
         * a,2020-10-01 00:13:05,28
         * a,2020-10-01 00:13:08,38
         *
         * a,2020-10-01 00:13:18,40
         */
        DataStream<String> socket = env.socketTextStream("mz-hadoop-03", 7777);
        socket.print("socket stream");

        SingleOutputStreamOperator<Event> mainStream = socket
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] arr = value.split(",");
                        // Trim the timestamp as well as the key so stray whitespace around
                        // the separators does not reach the date parser.
                        return new Event(arr[0].trim(), arr[1].trim(), Float.parseFloat(arr[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
                            @Override
                            public long extractTimestamp(Event element) {
                                return DateUtil.getMillsFromString(element.getTime());
                            }
                        });

        SingleOutputStreamOperator<WindowResult> windowResultStream = mainStream.keyBy("kind")
                .window(TumblingEventTimeWindows.of(Time.seconds(10), Time.minutes(0)))
                .process(new ProcessWindowFunction<Event, WindowResult, Tuple, TimeWindow>() {

                    /** Keyed state: maximum value seen in the previously fired window. */
                    private transient ValueState<Float> lastMaxState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        lastMaxState = getRuntimeContext()
                                .getState(new ValueStateDescriptor<>("last-max", Types.FLOAT));
                    }

                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Event> elements,
                                        Collector<WindowResult> out) throws Exception {
                        TimeWindow window = context.window();
                        String windowStartText = DateUtil.getDateStrFromMill(window.getStart());
                        String windowEndText = DateUtil.getDateStrFromMill(window.getEnd());
                        String watermarkText = DateUtil.getDateStrFromMill(context.currentWatermark());

                        // Single pass for both extremes. Ties keep the earliest maximum and
                        // the latest minimum, which is what a stable descending sort followed
                        // by first/last element selection yields.
                        Event eventMax = null;
                        Event eventMin = null;
                        for (Event event : elements) {
                            if (eventMax == null
                                    || Float.compare(event.getValue(), eventMax.getValue()) > 0) {
                                eventMax = event;
                            }
                            if (eventMin == null
                                    || Float.compare(event.getValue(), eventMin.getValue()) <= 0) {
                                eventMin = event;
                            }
                        }
                        if (eventMax == null) {
                            // Defensive guard: nothing to report for an empty window.
                            return;
                        }

                        LogUtils.info("窗口边界:[" + windowStartText + "," + windowEndText + "),水位线:" + watermarkText + ",最大值:" + eventMax + ",最小值:" + eventMin);

                        // First window for this key: no previous maximum is stored yet, so the
                        // window's own minimum serves as the baseline.
                        Float storedMax = lastMaxState.value();
                        float lastValue = storedMax != null ? storedMax : eventMin.getValue();
                        System.out.println("上一个窗口的最大值为:" + lastValue);

                        // Remember this window's maximum as the baseline for the next window.
                        lastMaxState.update(eventMax.getValue());

                        // Emit the record enriched with the window start and the diff.
                        WindowResult result = new WindowResult();
                        result.setKind(eventMax.getKind());
                        result.setValue(eventMax.getValue());
                        result.setWindowStart(window.getStart());
                        // Subtract via the decimal string form so the diff does not pick up
                        // binary float subtraction artifacts.
                        result.setDiff(new BigDecimal(String.valueOf(eventMax.getValue()))
                                .subtract(new BigDecimal(String.valueOf(lastValue)))
                                .floatValue());
                        out.collect(result);
                    }
                });

        windowResultStream.print("resultStream====>");
        env.execute("test window diff with socket stream");
    }

    /** One parsed input record: key, raw timestamp text and measured value. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Event {
        /** Key the stream is partitioned by. */
        private String kind;
        /** Event time as text, converted to epoch millis by the timestamp extractor. */
        private String time;
        /** Measured value. */
        private float value;
    }

    /** Per-window output record. */
    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WindowResult {
        /** Key of the window. */
        private String kind;
        /** Value reported for the current window (the job sets it to the window maximum). */
        private float value;
        /** Start of the window the record belongs to, passed to the date formatter as millis. */
        private long windowStart;
        /** Current value minus the baseline (previous window's maximum). */
        private float diff;

        @Override
        public String toString() {
            return "WindowResult{" +
                    "kind='" + kind + '\'' +
                    ", value=" + value +
                    ", diff=" + diff +
                    ", windowStart=" + DateUtil.getDateStrFromMill(windowStart) +
                    '}';
        }
    }
}