网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
-
每个事件发生的时间。这个时间一般是在进入到flink之前就包含在事件中
-
针对eventtime,事件被处理的时间以来与事件本身
-
eventtime必须要指定如何生成eventtime watermark(水印)
-
理想情况,不管事件何时到达或者顺序如何,事件时间处理能够得到完整一致地结果。
-
事件处理在等待乱序事件时,会产生一些延迟。这样会对eventtime的应用性能有一定的影响
- 摄入时间
-
摄入时间是事件进入flink的时间
-
在source operator中,每个记录以时间戳的形式获取源的当前时间
-
它在概念是处于事件时间和处理时间中间
-
摄入时间不能处理乱序问题或者延迟数据,摄入时间可以由流式系统自动生成水印
flink代码中设置时间类型
通常,我们在flink初始化流式运行环境时,就会设置流处理时间特性。这个设置很重要,它决定了数据流的行为方式。(例如:是否需要给事件分配时间戳),以及窗口操作应该使用什么样的时间类型。例如:keyedstream.timewindow(time.seconds(30))。
我们接下来通过实现一个每5秒中进行一次单词计数的案例,来说明flink中如何指定时间类型。
public class wordcountwindow {
public static void main(string[] args) throws exception {
// 1. 初始化流式运行环境
configuration conf = new configuration();
streamexecutionenvironment env = streamexecutionenvironment.createlocalenvironmentwithwebui(conf);
// 2. 设置时间处理类型,这里设置的方式处理时间
env.setstreamtimecharacteristic(timecharacteristic.processingtime);
// 3. 定义数据源,每秒发送一个hadoop单词
datastreamsource wordds = env.addsource(new richsourcefunction() {
private boolean iscanaled = false;
@override
public void run(sourcecontext ctx) throws exception {
while (!iscanaled) {
ctx.collect(“hadooop”);
thread.sleep(1000);
}
}
@override
public void cancel() {
iscanaled = true;
}
});
// 4. 每5秒进行一次,分组统计
// 4.1 转换为元组
wordds.map(word -> tuple2.of(word, 1))
// 指定返回类型
.returns(types.tuple(types.string, types.int))
// 按照单词进行分组
.keyby(t -> t.f0)
// 滚动窗口,3秒计算一次
.timewindow(time.seconds(3))
.reduce(new reducefunction<tuple2<string, integer>>() {
@override
public tuple2<string, integer> reduce(tuple2<string, integer> value1, tuple2<string, integer> value2) throws exception {
return tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}, new richwindowfunction<tuple2<string, integer>, tuple2<string, integer>, string, timewindow>() {
@override
public void apply(string word, timewindow window, iterable<tuple2<string, integer>> input, collector<tuple2<string, integer>> out) throws exception {
// 打印窗口开始、结束时间
simpledateformat sdf = new simpledateformat(“yyyy-mm-dd hh:mm:ss”);
system.out.println(“窗口开始时间:” + sdf.format(window.getstart())
-
" 窗口结束时间:" + sdf.format(window.getend())
-
" 窗口计算时间:" + sdf.format(system.currenttimemillis()));
int sum = 0;
iterator<tuple2<string, integer>> iterator = input.iterator();
while(iterator.hasnext()) {
integer count = iterator.next().f1;
sum += count;
}
out.collect(tuple2.of(word, sum));
}
}).print();
env.execute(“app”);
}
}
我们可以看到,这个滚动窗口,每3秒计算一次,是按照系统时间来计算的。
我们再把时间窗口设置为1分钟,再试试。
flink窗口介绍及应用
============
windows是flink流计算的核心,本文将概括的介绍几种窗口的概念,重点只放在窗口的应用上。
一、窗口(window)的类型
对于窗口的操作主要分为两种,分别对于keyedstream和datastream。他们的主要区别也仅仅在于建立窗口的时候一个为.window(…),一个为.windowall(…)。对于keyedstream的窗口来说,他可以使得多任务并行计算,每一个logical key stream将会被独立的进行处理。
stream
.keyby(…) <- keyed versus non-keyed windows
.window(…)/.windowall(…) <- required: “assigner”
[.trigger(…)] <- optional: “trigger” (else default trigger)
[.evictor(…)] <- optional: “evictor” (else no evictor)
[.allowedlateness(…)] <- optional: “lateness” (else zero)
[.sideoutputlatedata(…)] <- optional: “output tag” (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: “function”
[.getsideoutput(…)] <- optional: “output tag”
按照窗口的assigner来分,窗口可以分为
tumbling window, sliding window,session window,global window,custom window
每种窗口又可分别基于processing time和event time,这样的话,窗口的类型严格来说就有很多。
还有一种window叫做count window,依据元素到达的数量进行分配,之后也会提到。
窗口的生命周期开始在第一个属于这个窗口的元素到达的时候,结束于第一个不属于这个窗口的元素到达的时候。
二、窗口的操作
2.1 tumbling window
固定相同间隔分配窗口,每个窗口之间没有重叠看图一眼明白。
下面的例子定义了每隔3毫秒一个窗口的流:
windowedstream<movierate, integer, timewindow> rates = rates
.keyby(movierate::getuserid)
.window(tumblingeventtimewindows.of(time.milliseconds(3)));
2.2 sliding windows
跟上面一样,固定相同间隔分配窗口,只不过每个窗口之间有重叠。窗口重叠的部分如果比窗口小,窗口将会有多个重叠,即一个元素可能被分配到多个窗口里去。
下面的例子给出窗口大小为10毫秒,重叠为5毫秒的流:
windowedstream<movierate, integer, timewindow> rates = rates
.keyby(movierate::getuserid)
.window(slidingeventtimewindows.of(time.milliseconds(10), time.milliseconds(5)));
2.3 session window
这种窗口主要是根据活动的事件进行窗口化,他们通常不重叠,也没有一个固定的开始和结束时间。一个session window关闭通常是由于一段时间没有收到元素。在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。
// 静态间隔时间
windowedstream<movierate, integer, timewindow> rates = rates
.keyby(movierate::getuserid)
.window(eventtimesessionwindows.withgap(time.milliseconds(10)));
// 动态时间
windowedstream<movierate, integer, timewindow> rates = rates
.keyby(movierate::getuserid)
.window(eventtimesessionwindows.withdynamicgap(()));
2.4 global window
将所有相同keyed的元素分配到一个窗口里。好吧,就这样:
windowedstream<movierate, integer, globalwindow> rates = rates
.keyby(movierate::getuserid)
.window(globalwindows.create());
三、窗口函数
窗口函数就是这四个:reducefunction,aggregatefunction,foldfunction,processwindowfunction。前两个执行得更有效,因为flink可以增量地聚合每个到达窗口的元素。
flink必须在调用函数之前在内部缓冲窗口中的所有元素,所以使用processwindowfunction进行操作效率不高。不过processwindowfunction可以跟其他的窗口函数结合使用,其他函数接受增量信息,processwindowfunction接受窗口的元数据。
举一个aggregatefunction的例子吧,下面代码为movierate按user分组,且分配5毫秒的tumbling窗口,返回每个user在窗口内评分的所有分数的平均值。
datastream<tuple2<integer,double>> rates = rates
.keyby(movierate::getuserid)
.window(tumblingeventtimewindows.of(time.milliseconds(5)))
.aggregate(new aggregatefunction<movierate, averageaccumulator, tuple2<integer,double>>() {
@override
public averageaccumulator createaccumulator() {
return new averageaccumulator();
}
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
public averageaccumulator createaccumulator() {
return new averageaccumulator();
}
[外链图片转存中…(img-radszvdx-1715297050539)]
[外链图片转存中…(img-euimiluw-1715297050539)]
[外链图片转存中…(img-nt5vv3nt-1715297050540)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
发表评论