The Huawei folks were already using Flink last year. Today I finally had some free time, so I followed along and wrote a small demo to play with (skipping Java here; for Spark and Flink I still prefer Scala).
```scala
package flink.test

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingWindowWordCount {

  def main(args: Array[String]): Unit = {
    // get the port parameter, falling back to 9876
    // (on Linux, run `nc -l 9876` so something is listening on that port)
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("no port specified, using default 9876")
        9876
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // read the socket stream
    val text = env.socketTextStream("rhel071", port, '\n')

    // parse the data, key by word, apply a sliding window, and aggregate with a sum.
    // note: the implicit conversions below are required, otherwise flatMap will not compile
    import org.apache.flink.api.scala._
    val windowCount = text.flatMap(line => line.split("\\s"))
      .map(word => WordWithCount(word, 1L))
      .keyBy("word")
      .timeWindow(Time.seconds(2), Time.seconds(1))
      // records with the same key are combined pairwise (the Scala logic is much like
      // Spark: chain operators over the data; nothing runs until an action triggers it)
      .reduce((a, b) => WordWithCount(a.word, a.count + b.count))
      //.sum("count")

    // print the result with a single thread
    windowCount.print().setParallelism(1)

    env.execute("streaming word count")
  }

  case class WordWithCount(word: String, count: Long)
}
```
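The commented-out `.sum("count")` line is the built-in alternative to the hand-written reduce. A minimal sketch of that variant (same pipeline, same WordWithCount case class as above; the result should be equivalent):

```scala
val windowSum = text.flatMap(line => line.split("\\s"))
  .map(word => WordWithCount(word, 1L))
  .keyBy("word")
  .timeWindow(Time.seconds(2), Time.seconds(1))
  .sum("count") // Flink's built-in aggregation over the "count" field
```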
Use a Maven project for this kind of thing (hunting down jars by hand is not practical; local testing needs far too many of them, especially once the project pulls in Hadoop and HBase). Here are the Maven dependencies this small example uses, recorded for reference:

```xml
<modelVersion>4.0.0</modelVersion>
<groupId>finkDemo_20180918</groupId>
<artifactId>finkDemo</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
</dependencies>
```
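Since the job itself is written in Scala, the pom also needs the Scala library and a plugin that compiles Scala sources; the Flink dependencies above do not pull those in by themselves. A sketch of what I would add, assuming Scala 2.11 (the versions here are illustrative, not taken from the original project):

```xml
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
</dependency>

<build>
    <plugins>
        <!-- compiles the Scala sources during the Maven build -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```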
Flink's batch (offline) processing:
```scala
package flink.test

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchWordCountScala {

  def main(args: Array[String]): Unit = {
    val inputPath: String = "D:\\flink\\batch\\file"
    val outputPath: String = "D:\\flink\\data\\result"

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)

    // the implicit conversions are needed here as well
    import org.apache.flink.api.scala._
    val counts = text.flatMap(line => line.split(" "))
      .map(word => WordWithCount(word, 1L))
      .groupBy(0) // group by the first field (word)
      .sum(1)     // sum the second field (count)

    counts.writeAsCsv(outputPath, "\n", " ")
    env.execute("batch word count")
  }

  case class WordWithCount(word: String, count: Long)
}
```

This parses and counts the words in the text files under the input directory and writes the result to the result directory.
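One practical note: with parallelism greater than 1, writeAsCsv produces a directory of part files under the result path rather than a single file. A small sketch (assuming a single output file is what's wanted, which the original doesn't state) that forces the sink down to one file, using the same counts DataSet as above:

```scala
// parallelism 1 on the sink makes Flink write one result file instead of part files
counts.writeAsCsv(outputPath, "\n", " ").setParallelism(1)
```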