发布网友 发布时间:2024-10-23 22:50
共1个回答
热心网友 时间:2024-10-27 23:02
摘要:深入理解Flink实时数据处理程序的开发,通过简单示例,利用Flink的DataStream API构建有状态流应用程序。
在深入了解Flink实时数据处理程序的开发之前,先通过一个示例来了解如何使用Flink的DataStream API构建有状态流应用程序的过程。
01、流数据类型
Flink以独特方式处理数据类型和序列化,包括Java和Scala语言,实现一套自定义类型系统,支持多种类型,如图1所示。
■ 图1 Flink类型系统
Flink要求流数据内容可序列化,并内置了多种序列化器,如对于其他类型可使用Kryo,支持如Avro等。
1.java DataStream API使用的流数据类型
对于Java API,Flink定义了从Tuple1到Tuple25的类型表示元组,而POJO(plain old Java Object)为普通Java类。
例如,定义Person类如下:
2.Scala DataStream API使用的流数据类型
对于元组使用Scala的Tuple类,对象类型使用case class表示,如定义Person类如下:
3.Flink类型系统
创建任意POJO类型,Flink描述为PojoTypeInfo,PojoTypeInfo是TypeInformation的子类。
TypeInformation是Flink核心类,描述所有支持的数据类型,如POJO类型对应PojoTypeInfo,Map类型对应MapTypeInfo。
除了描述类型,TypeInformation还提供序列化支持,通过createSerializer方法创建序列化器,用于内存中对象的序列化。
4.类型与Lambda表达式支持
Java编译器类型擦除,导致Flink无法推断算子输出类型,使用Lambda表达式时需指定TypeInformation或TypeHint。
创建TypeInformation和TypeHint的代码如下:
02、流应用程序实现
Flink程序由stream和transformation组成,实现完整工作示例如下。
示例1:筛选未成年人信息
Scala代码如下:
执行代码,输出结果如下:
Java代码如下:
执行程序,输出结果如下:
注意:Flink将批处理程序视为流程序的特殊情况,内部视为数据流,适用于流程序和批处理程序。
03、流应用程序剖析
每个Flink应用程序遵循特定步骤,如图2所示。
■ 图2 Flink应用程序工作步骤
每个Flink程序由基础部分组成:
1.获取执行环境
从main方法生成Flink作业,使用StreamExecutionEnvironment,提供作业执行控制。
创建执行环境的代码如下:
2.加载/创建初始数据
执行环境从多种数据源读取数据,使用fromElements方法加载集合数据。
代码如下:
3.数据转换
每个Flink程序执行数据转换,如map、filter等,示例代码如下。
代码如下:
4.指定计算结果位置
通过接收器(sink)将结果写入外部系统,如打印输出到屏幕。
代码如下:
5.触发执行
调用execute方法启动程序执行,数据加载和转换在执行时实际发生。
代码如下:
这个分布式运行时要求Flink应用程序可序列化,集群中每个节点使用所有依赖项。