博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink 案例整合
阅读量:7051 次
发布时间:2019-06-28

本文共 4526 字,大约阅读时间需要 15 分钟。

1.概述

  Flink 1.1.0 版本已经在官方发布了,于 2016-08-08 更新了 Flink 1.1.0 的变动。在这 Flink 版本的发布,添加了 SQL 语法这一特性。这对于业务场景复杂,依赖于 SQL 来分析统计数据,算得上是一个不错的福利。加上之前有同学和朋友邮件中提到,Flink 官方给的示例运行有困难,能否整合一下 Flink 的案例。笔者通过本篇博客来解答一下相关疑问。

2.内容

2.1 集群部署

  首先,集群的部署需要 JDK 环境。下载 JDK 以及配置 JAVA_HOME 环境,这里就不详述了,比较简单。然后,我们去下载 Flink 1.1.0 的安装包,进入到下载页面,如下图所示:

  这里需要注意的是,Flink 集群的部署,本身不依赖 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存储数据,就需要选择对应的 Hadoop 版本。大家可以根据 Hadoop 集群的版本,选择相应的 Flink 版本下载。

  下载好 Flink 1.1.0 后,按以下步骤进行:

  • 解压 Flink 安装包到 Master 节点
tar xzf flink-*.tgzcd flink-*
  • 配置 Master 和 Slaves
vi $FLINK_HOME/conf/mastervi $FLINK_HOME/conf/slaves
  • 分发
scp -r flink-1.1.0 hadoop@dn2:/opt/soft/flinkscp -r flink-1.1.0 hadoop@dn3:/opt/soft/flink

  这里只用了2个 slave 节点。另外,在 flink-conf.yaml 文件中,可以按需配置,较为简单。就不多赘述了。

  • 启动集群
bin/start-cluster.sh

  注意,这里没有使用 YARN 来启动集群,若是需要使用 YARN 启动集群,可以参考官方文档进行启动。

  Flink 集群启动后,系统有一个 WebUI 监控界面,如下图所示:

2.2 案例

  这里,我们使用 Flink SQL 的 API 来运行一个场景,对一个销售表做一个聚合计算。这里,笔者将实现代码进行了分解,首先是获取操作 Flink 系统的对象,如下所示:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

  接着是读取数据源,并注册为表,如下所示:

CsvTableSource csvTableSource = new CsvTableSource(inPath, new String[] { "trans_id", "part_dt", "lstg_format_name", "leaf_categ_id", "lstg_site_id", "slr_segment_cd", "price", "item_count", "seller_id" },                    new TypeInformation
[] { Types.LONG(), Types.STRING(), Types.STRING(), Types.LONG(), Types.INT(), Types.INT(), Types.FLOAT(), Types.LONG(), Types.LONG() });tableEnv.registerTableSource("user", csvTableSource);Table tab = tableEnv.scan("user");

  这里 inPath 使用了 HDFS 上的数据路径。类型可以在 Hive 中使用 desc 命令查看该表的类型。然后,将“表”转化为数据集,如下所示:

DataSet
ds = tableEnv.toDataSet(tab, KylinSalesDomain.class);tableEnv.registerDataSet("user2", ds, "trans_id,part_dt,lstg_format_name,leaf_categ_id,lstg_site_id,slr_segment_cd,price,item_count,seller_id");Table result = tableEnv.sql("SELECT lstg_format_name as username,SUM(FLOOR(price)) as total FROM user2 group by lstg_format_name");

  最后,对结果进行存储,这里笔者将结果存在了 HDFS 上。如下所示:

TableSink
sink = new CsvTableSink(outPath, "|"); result.writeToSink(sink);env.setParallelism(1);env.execute("Flink Sales SUM");

  注意,这里并发数是可以设置的,通过 setParallelism 方法来设置并发数。

  完整示例,如下所示:

try {            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();            BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);            CsvTableSource csvTableSource = new CsvTableSource(args[0], new String[] { "trans_id", "part_dt", "lstg_format_name", "leaf_categ_id", "lstg_site_id", "slr_segment_cd", "price", "item_count", "seller_id" },                    new TypeInformation
[] { Types.LONG(), Types.STRING(), Types.STRING(), Types.LONG(), Types.INT(), Types.INT(), Types.FLOAT(), Types.LONG(), Types.LONG() }); tableEnv.registerTableSource("user", csvTableSource); Table tab = tableEnv.scan("user"); DataSet
ds = tableEnv.toDataSet(tab, KylinSalesDomain.class); tableEnv.registerDataSet("user2", ds, "trans_id,part_dt,lstg_format_name,leaf_categ_id,lstg_site_id,slr_segment_cd,price,item_count,seller_id"); Table result = tableEnv.sql("SELECT lstg_format_name as username,SUM(FLOOR(price)) as total FROM user2 group by lstg_format_name"); TableSink
sink = new CsvTableSink(args[1], "|"); // write the result Table to the TableSink result.writeToSink(sink); // execute the program env.setParallelism(1); env.execute("Flink Sales SUM"); } catch (Exception e) {
e.printStackTrace(); }

  最后,我们将应用提交到 Flink 集群。如下所示:

flink run flink_sales_sum.jar hdfs://master:8020/user/hive/warehouse/kylin_sales/DEFAULT.KYLIN_SALES.csv hdfs://master:8020/tmp/result3

3.Hive 对比

  同样的语句,在 Hive 下运行之后,与在 Flink 集群下运行之后,结果如下所示:

  • Hive 运行结果:

  • Flink 运行结果:

 

  通过 WebUI 监控界面观察,任务在 Flink 集群中运行所花费的时间在 2s 以内。其运行速度是比较具有诱惑力的。

4.总结

  总体来说,Flink 集群的部署较为简单,其 SQL 的 API 编写需要对官方的文档比较熟悉,需要注意的是,在本地运行 Flink 代码,若是要读取远程 HDFS 文件,那么获取 Flink 对象操作环境,需要采用远程接口(HOST & PORT),或者在本地部署一个开发集群环境,将远程数据源提交到本地 Flink 集群环境运行。若是,读取本地文件,则不需要。其中的原因是当你以集群的方式运行,Flink 会检查本地是否有 Flink 集群环境存在,如若不存在,则会出现远程数据源(如:HDFS 路径地址无法解析等错误)。

5.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter:  
QQ群(Hadoop - 交流社区1):  
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!

本文转自哥不是小萝莉博客园博客,原文链接:,如需转载请自行联系原作者

你可能感兴趣的文章
EF架构~一个规范,两个实现(续)~性能可以接受的批量增删改操作
查看>>
tensorflow笔记:多层LSTM代码分析
查看>>
Selenium2(WebDriver)总结(一)---启动浏览器、设置profile&加载插件
查看>>
iOS - C 基本语法
查看>>
我的软件测试之旅:(8)困难——没有现成的测试工具
查看>>
“智慧城市”建设风生水起
查看>>
楼宇对讲防盗报警系统两大特点
查看>>
使用“伪造”数据是消除大数据隐私问题的关键
查看>>
浅谈信息安全与数据中心安全的关系
查看>>
《PostgreSQL服务器编程》一一2.7 小结
查看>>
Oracle Database 12.2新特性详解
查看>>
IBM:量子计算现在跟1940年代电脑差不多 更看重长远目标
查看>>
研究机构称独角兽更易获得融资 明后年或有大量企业IPO
查看>>
三星将斥资80亿美元收购美国哈曼国际
查看>>
纷享销客变“逍”客 缘何战略一再调整?
查看>>
立标准引导市场化 大数据产业将迎“洗牌期”
查看>>
软件测试建模:Google ACC
查看>>
《 FreeSWITCH权威指南》——1.4 信令
查看>>
Netflix正在消灭传统电视网络
查看>>
eMarketer:物联网正在重塑快速消费品行业
查看>>