博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink第一个应用
阅读量:6087 次
发布时间:2019-06-20

本文共 3152 字,大约阅读时间需要 10 分钟。

去年华为大佬就开始在用flink,今天刚有空就稍微跟着写了个demo玩起来(就不用java了 spark和flink还是用scala玩)

 

package flink.test import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time object StreamingWindowWordCount {
def main(args:Array[String]):Unit={
//get port param val port:Int = try {
ParameterTool.fromArgs(args).getInt("port") }catch{
case e:Exception=> {
System.err.println("no port") } 9876(启动linux的NC -l 9876端口进行监听) } //获取运行环境 val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment; //解析参数 val text = env.socketTextStream("rhel071",port,'\n') //解析数据,分组,窗口操作,聚合求sum //注意:在这需要做一个隐式转换,否则使用flatmap会报错 import org.apache.flink.api.scala._ val windowCount = text.flatMap(line=>line.split("\\s")) .map(word=>WordWithCount(word,1L)) .keyBy("word") .timeWindow(Time.seconds(2),Time.seconds(1)) .reduce((a,b)=>WordWithCount(a.word,a.count + b.count))//key 一样获取相同的数据进行汇总(scala逻辑基本和spark没什么两样,都是进行数据的算子操作,需要action算子才能触发动作) //.sum("count") //使用一个单线程打印结果 windowCount.print().setParallelism(1) env.execute("streaming word count") } case class WordWithCount(word:String,count:Long) } maven项目(这种东西不适合自己找jar包,本地测试需要的jar包量实在太多,特别项目牵扯上hadoop hbase的时候) 这里是小例子应用到的maven,记录下
4.0.0
finkDemo_20180918
finkDemo
1.0-SNAPSHOT
org.apache.flink
flink-java
1.4.2
org.apache.flink
flink-streaming-java_2.11
1.4.2
org.apache.flink
flink-scala_2.11
1.4.2
org.apache.flink
flink-streaming-scala_2.11
1.4.2

 

 

flink的离线操作

package flink.test import org.apache.flink.api.scala.ExecutionEnvironment object BatchWordCountScala {
def main(args:Array[String]):Unit = {
val inputPath:String = "D:\\flink\\batch\\file"; val outputPath:String = "D:\\flink\\data\\result"; val env:ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment; val text = env.readTextFile(inputPath) import org.apache.flink.api.scala._ val counts = text.flatMap(line => line.split(" ")).map(word => WordWithCount(word,1L)).groupBy(0).sum(1) counts.writeAsCsv(outputPath,"\n"," ") env.execute("batch word count") } case class WordWithCount(word:String,count:Long) } 把目录下的text文件word解析统计后存入result目录

 

 

转载于:https://www.cnblogs.com/yaohaitao/p/9674770.html

你可能感兴趣的文章
WPF 列表开启虚拟化的方式
查看>>
一入前端深似海,从此红尘是路人系列第十二弹之移动端模拟IOS虚拟按钮效果...
查看>>
查找一 线性表的查找
查看>>
Android传递Bitmap的两种简单方式及其缺陷
查看>>
性能不好怎么办?对着清单撸一遍[转]
查看>>
一文读懂物体分类AI算法:LeNet-5 AlexNet VGG Inception ResNet MobileNet
查看>>
量子十问之一:量子究竟是什么?读过你就不会相信“量子水”了
查看>>
宜信陈欢:为何你要用区块链技术?真的准备好了吗?
查看>>
Android 渗透测试学习手册 第四章 对 Android 设备进行流量分析
查看>>
OpenCV的+安卓+号牌识别(OpenCV + Android + 图像水平矫正)
查看>>
话说区块链,它真的不是比特币
查看>>
展望VR AR 2017,旅游、营销以及家装行业或许会火一把
查看>>
“九”答不可 | 如何设计量子计算机?科学家称其速度存在理论上限
查看>>
蓝凌副总裁夏敬华:智明当下,慧看未来——移动互联下企业知识管理应用趋势...
查看>>
Mac下的SSH插件(默认自带)
查看>>
安卓应用安全指南 4.6.3 处理文件 高级话题
查看>>
独家专访阿里高级技术专家北纬:Dubbo开源重启半年来的快意江湖
查看>>
黑科技时代,不了解这些你就OUT了
查看>>
机器视觉中评价光源质量的指标
查看>>
AI为移动医疗APP加码
查看>>