flink入门

2018/10/31

摘要

Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要还是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。再换句话说,Flink 会把所有任务当成流来处理,这也是其最大的特点。

特点

流处理特性

  • 支持高吞吐、低延迟、高性能的流处理
  • 支持带有事件时间的窗口(Window)操作
  • 支持有状态计算的Exactly-once语义
  • 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
  • 支持具有Backpressure功能的持续流模型
  • 支持基于轻量级分布式快照(Snapshot)实现的容错
  • 一个运行时同时支持Batch on Streaming处理和Streaming处理
  • Flink在JVM内部实现了自己的内存管理
  • 支持迭代计算
  • 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

API支持

  • 对Streaming数据类应用,提供DataStream API
  • 对批处理类应用,提供DataSet API(支持Java/Scala)

Libraries支持

  • 支持机器学习(FlinkML)
  • 支持图分析(Gelly)
  • 支持关系数据处理(Table)
  • 支持复杂事件处理(CEP)

整合支持

  • 支持Flink on YARN
  • 支持HDFS
  • 支持来自Kafka的输入数据
  • 支持Apache HBase
  • 支持Hadoop程序
  • 支持Tachyon
  • 支持ElasticSearch
  • 支持RabbitMQ
  • 支持Apache Storm
  • 支持S3
  • 支持XtreemFS

安装

Flink 有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster。

Local模式

对于 Local 模式来说,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是将安装包解压启动(./bin/start-local.sh)即可,在这里不在演示。

Standalone 模式

下载

http://flink.apache.org/downloads.html

版本说明

Apache Flink 1.6.2 only

Flink 1.6.2 with Hadoop

解压

tar xf flink-1.3.2-bin-hadoop26-scala_2.10.tgz

软连接

ln -s /opt/soft/flink-1.6.2 /opt/flink

修改环境变量

vim /etc/profile
 export FLINK_HOME=/opt/flink
 export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile

修改配置文件

vim $FLINK_HOME/conf/masters
 master1:8081
vim $FLINK_HOME/conf/slaves

未完待续

代码调用

maven

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.6.1</version>
  <scope>provided</scope>
</dependency>

java代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
  public static void main(String[] args) throws Exception {
    // the port to connect to
    final int port;

    try {
      final ParameterTool params = ParameterTool.fromArgs(args);
      port = params.getInt("port");
    } catch (Exception e) {
      System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
      return;
    }

    // get the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data by connecting to the socket
    DataStream<String> text = env.socketTextStream("localhost", port, "\n");

    // parse the data, group it, window it, and aggregate the counts
    DataStream<WordWithCount> windowCounts = text
      .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
              out.collect(new WordWithCount(word, 1L));
            }
        }
      })
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
      });

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1);
    env.execute("Socket Window WordCount");
  }

  // Data type for words with count
  public static class WordWithCount {

    public String word;
    public long count;
    public WordWithCount() {}
    public WordWithCount(String word, long count) {
      this.word = word;
      this.count = count;
    }

    @Override
    public String toString() {
      return word + " : " + count;
    }
  }
}

scala代码

object SocketWindowWordCount {

  def main(args: Array[String]) : Unit = {

    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}

目录