当前位置: 首页 > news >正文

Flink学习笔记:多流 Join

前面我们已经了解了 Flink 几个核心概念,分别是时间、Watermark 已经窗口。今天我们来一起了解下 Flink 是怎么进行多个流的 Join 的。

我们今天从两个流的 Join 来入手,扩展到多个流也是一样的道理。Flink 中的 Join 可以分为两种:Window Join 和 Interval Join。

Window Join

Window Join 是将两个流中在相同窗口中且有相同 key 的元素进行关联。关联后,可以使用 JoinFunction 和 FlatJoinFunction 进行处理。Window Join 可以根据窗口类型分为三种:Tumbling Window Join、Sliding Window Join 和 Session Window Join。

Tumbling Window Join

首先来看Tumbling Window Join,其实就是对应的使用滚动窗口进行 Join。

TumblingWindowJoin

具体使用方法如下:

DataStream<Tuple2<String, Double>> result = source1.join(source2)

.where(record -> record.f0)

.equalTo(record -> record.f0)

.window(TumblingEventTimeWindows.of(Time.seconds(2L)))

.apply(new JoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {

@Override

public Tuple2<String, Double> join(Tuple2<String, Double> record1, Tuple2<String, Double> record2) throws Exception {

return Tuple2.of(record1.f0, record1.f1);

}

});

其中 source1 和 source2 分别代表两个流,where 为 source1 的 join key 提取方法,equalTo 为 source2 的 join key 提取方法,最后,join 好之后的数据通过 JoinFunction 来处理。

Sliding Window Join

Sliding Window Join 和 Tumbling Window Join 的用法基本一致,只是将窗口指定为滑动窗口。

SlidingWindowJoin

Session Window Join

Session Window Join 也类似,只是指定的窗口不同,具体的处理流程都是一样的,这里也不过多解释。

Interval Join

Interval Join 是将两个流中 key 相同,且一个流的 timestamp 处于另一个流的 timestamp 上下波动范围内。

假设我们有两个流 a 和 b,Interval Join可以表达为b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] 或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound。

需要注意的是,目前 Interval Join 仅支持 event time。

IntervalJoin

它的使用方法也很简单,只需要定义上下偏移量以及处理函数即可。

DataStream<Tuple2<String, Double>> intervalJoinResult = source1.keyBy(record -> record.f0)

.intervalJoin(source2.keyBy(record -> record.f0))

.between(Time.seconds(-2), Time.seconds(2))

.process(new ProcessJoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {

@Override

public void processElement(Tuple2<String, Double> record1, Tuple2<String, Double> record2, ProcessJoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>.Context context, Collector<Tuple2<String, Double>> out) throws Exception {

out.collect(Tuple2.of(record1.f0, record1.f1 + record2.f1));

}

});

CoGroup

前面介绍的两种 Join 都是 inner join,那么 Flink 有没有办法支持 left join 呢?答案是肯定的,我们可以使用 coGroup 来实现。

coGroup 的通用用法如下:

stream.coGroup(otherStream)

.where(<KeySelector>)

.equalTo(<KeySelector>)

.window(<WindowAssigner>)

.apply(<CoGroupFunction>);

我们通过自定义 CoGroupFunction 来实现 left join。

private static class LeftJoinFunction implements CoGroupFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {

@Override

public void coGroup(Iterable<Tuple2<String, Double>> iterable1, Iterable<Tuple2<String, Double>> iterable2, Collector<Tuple2<String, Double>> collector) throws Exception {

for (Tuple2<String, Double> record1 : iterable1) {

boolean match = false;

for (Tuple2<String, Double> record2 : iterable2) {

match = true;

collector.collect(Tuple2.of(record1.f0, record1.f1 + record2.f1));

}

if (!match) {

System.out.println("没有join的元素 key:" + record1.f0);

collector.collect(Tuple2.of(record1.f0, record1.f1));

}

}

}

}

在 coGroupFunction 中,需要实现 coGroup 方法,方法的参数包括两个输入流的 Iterable 和输出的 collector。如果第二个流中没有匹配的元素,那么就直接输出第一个流的元素。

总结

最后来总结一下,Flink 中有两种 Join 方法,分别为 Window Join 和 Interval Join,Window Join 是依赖窗口来执行,对窗口内的元素进行 join,Interval Join 不依赖窗口,是根据 event time 的范围来进行 join。最后还介绍了 CoGroup,我们可以使用 CoGroup 来实现 left join 和 right join。

http://www.cnnetsun.cn/news/64294.html

相关文章:

  • AI产品经理必读:构建智能交互系统的终极指南!
  • 谷歌浏览器性能面板使用指南
  • 警惕绿色积分陷阱!一分钟揭秘消费骗局
  • 13、CentOS网络管理全攻略
  • 技术实践:用大模型平台重构医疗数据分析Pipeline
  • 智元AGIBOT荣登具身智能机器人技术研发排行榜TOP1
  • Gitee vs GitHub 2025深度评测:国产代码托管平台的崛起与超越
  • JVM 安全与沙箱深度解析
  • t-SNE快速降维算法详解与实现
  • Python编程入门从零开始掌握基础语法一
  • 20、BusyBox:嵌入式系统的强大工具
  • python 生成psd文件
  • 25、Linux内核调试全攻略:挑战与解决方案
  • 30、Linux移植与实时性:从定制平台到实时系统的深入解析
  • 【界面案例】火语言RPA读取Excel文件,循环写入界面表格
  • 【JAVA进阶】鸿蒙开发与SpringBoot深度融合:从接口设计到服务部署全解析
  • [C#][winform]基于yolov11的水下目标检测系统C#源码+onnx模型+评估指标曲线+精美GUI界面
  • 【睿擎派】云端一体,多种通信协议构建机械臂运动控制系统
  • 4.1用户空间RTOSAPI
  • 11、嵌入式Linux开发:内核日志存储、追踪系统与设备树管理
  • 17、Yocto项目软件层与应用开发全解析
  • 宁波紧固件产业集群的外向型制造与装备升级路径
  • AI赋能工业4.0:数据堂一站式数据服务加速制造智能化落地
  • 如何打造吸睛动态头像?GIF动态头像制作指南
  • 评估AI的终极答案:LLM-As-a-Judge!AI时代,谁来评判AI?答案是AI自己!
  • Meta封闭技术大门:开源先锋为何倒向闭源阵营?
  • HRNet:深度高分辨率表示学习用于人体姿态估计-k学长深度学习专栏
  • Miniconda环境隔离机制揭秘:保障模型复现精准性
  • 颠覆认知:实测6款AI工具,论文写作“专用”比“通用”强在哪?
  • 【自动控制】自动控制原理中,最小相位系统是什么?