ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Flink] Apache Flink 시작하기
    OpenSource 2017. 7. 18. 11:42


    install

    hadoop2, yarn 클러스터를 사용할 것이면, 설치 되어 있는 하둡 버전에 맞게 설치한다. 로컬모드로 실행할 것이라면, 하둡 설치는 필요하지 않다. java는 1.8.x 버전을 설치하고, scala는 2.10.x 버전이나 2.11.x 버전을 설치하고, flink는 하둡이나 scala 버전에 맞는 바이너리로 설치한다. 



    scala 


    F="scala-2.11.11.tgz"

    wget https://downloads.lightbend.com/scala/2.11.11/$F

    sudo tar zvxf $F              -C /usr/local

    sudo rm -rf $F

    sudo rm -rf /usr/local/scala

    sudo ln -s /usr/local/scala-2.11.11 /usr/local/scala

    sudo ln -s /usr/local/scala/bin/scala /usr/bin/scala

    sudo ln -s /usr/local/scala/bin/scalac /usr/bin/scalac

    sudo ln -s /usr/local/scala/bin/scaladoc /usr/bin/scaladoc

    sudo ln -s /usr/local/scala/bin/scalap /usr/bin/scalap



    flink


    wget http://mirror.navercorp.com/apache/flink/flink-1.3.1/flink-1.3.1-bin-hadoop27-scala_2.11.tgz

    tar xvf flink-1.3.1-bin-hadoop27-scala_2.11.tgz






    KafkaConsumer Wordcount

    example로 kafka의 topic을 읽어서 source로 사용하고, wordcount를 하는 scala 프로그램을 작성하고, 실행해본다.


    maven 프로젝트 

    maven 설치는 생략하고, maven 프로젝트를 생성한다. 


    mvn archetype:generate \

        -DarchetypeGroupId=org.apache.flink \

        -DarchetypeArtifactId=flink-quickstart-scala \

        -DarchetypeVersion=1.3.0 \

        -DgroupId=kafkaconsumer-wordcount \

        -DartifactId=kafkaconsumer-wordcount \

        -Dversion=0.1 \

        -Dpackage=kafkawordcount \

        -DinteractiveMode=false


    quickstart 프로젝트가 생성되어서 example 파일들도 같이 생성되었다. 필요없는 예제 파일들을 제거한다.

    그리고 pom.xml을 열어서 flink 버전과 scala 버전(:%s/2.10/2.11/g), java 버전(maven compiler plugin)을 변경하고, flink-kafka connector dependency를 추가한다.


    cd kafkaconsumer-wordcount/

    rm src/main/scala/kafkawordcount/*.scala

    vi pom.xml



    '0.10'은 kafka 버전이 0.10.x 라는 것이고, '2.11'은 scala 버전이다. 자신의 kafka 버전에 맞게 변경하고, 아래 내용을 추가한다.


    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>


    예제 프로그램 작성



    vi src/main/scala/kafkawordcount/KafkaConsumerWordCount.scala


    package kafkawordcount


    import org.apache.flink.api.common.restartstrategy.RestartStrategies

    import org.apache.flink.api.java.utils.ParameterTool

    import org.apache.flink.streaming.api.scala._

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    import org.apache.flink.streaming.api.windowing.time.Time



    object KafkaConsumerWordCount{


      def main(args: Array[String]): Unit = {


        val params = ParameterTool.fromArgs(args)


        if (params.getNumberOfParameters < 4) {

          println("Missing parameters!\nUsage: Kafka --topic <topic> " +

            "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>")

          return

        }

        val env = StreamExecutionEnvironment.getExecutionEnvironment


        env.getConfig.setGlobalJobParameters(params)


        val kafkaConsumer = new FlinkKafkaConsumer010(

          params.getRequired("topic"),

          new SimpleStringSchema,

          params.getProperties)


        val messageStream = env.addSource(kafkaConsumer)


        val windowCounts = messageStream

            .flatMap { w => w.split("\\s") }

            .map { w => WordWithCount(w, 1) }

            .keyBy("word")

            .timeWindow(Time.seconds(1))

            .sum("count")


        windowCounts.print()


        env.execute("Read from Kafka and Word Count")

      }


        case class WordWithCount(word: String, count: Long)

    }



    mvn clean package -Pbuild-jar




    예제 프로그램 실행


    ./bin/start-local.sh


    flink-1.3.1/bin/flink run -c kafkawordcount.KafkaConsumerWordCount kafkaconsumer-wordcount/target/kafkaconsumer-wordcount-0.1.jar --topic test --bootstrap.servers 10.0.0.19:9092 --zookeeper.connect 10.0.0.19:2181 --group.id beji




    Kafka Producer 실행해서 Write


    bin/kafka-console-producer.sh --broker-list 10.0.0.19:9092 --topic test



    예제 프로그램 출력 결과 확인


    tail -f log/flink-*-jobmanager-*.out


    WordWithCount(1,1)

    WordWithCount(1,5)

    WordWithCount(2,1)





    GitHub

    해당 예제 프로그램 링크


Designed by Tistory.