当前位置: 代码网 > it编程>数据库>MsSqlserver > spark stream:从Kafka中读取数据

Spark Streaming:从Kafka中读取数据

2024年07月31日 MsSqlserver 我要评论
【代码】Spark Streaming:从Kafka中读取数据。

一、添加依赖

// build.sbt — Spark 3.0.0 + Kafka 0.10 direct-stream integration, built for Scala 2.12.
// NOTE: the Spark artifact suffix "_2.12" must match scalaVersion; using `%%` instead of
// hard-coding "_2.12" would keep them in sync automatically.
ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.12"

// All dependencies grouped in one Seq instead of five separate `++=` appends.
libraryDependencies ++= Seq(
  "org.apache.spark"           % "spark-core_2.12"                 % "3.0.0",
  "org.apache.spark"           % "spark-sql_2.12"                  % "3.0.0",
  "org.apache.spark"           % "spark-streaming_2.12"            % "3.0.0",
  // Bridges Spark Streaming to the Kafka 0.10+ consumer API (KafkaUtils, ConsumerStrategies).
  "org.apache.spark"           % "spark-streaming-kafka-0-10_2.12" % "3.0.0",
  "com.fasterxml.jackson.core" % "jackson-core"                    % "2.10.1",
  "mysql"                      % "mysql-connector-java"            % "5.1.30"
)

lazy val root = (project in file("."))
  .settings(
    name := "scala-proj"
  )

二、demo程序

package example3

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Minimal Spark Streaming demo: consumes string messages from a Kafka topic
 * via the receiver-less direct stream API and prints each record's value
 * every 5-second micro-batch.
 */
object HelloStreaming04 {
  def main(args: Array[String]): Unit = {
    // local[*]: run with all local cores. A streaming app needs more than one
    // thread locally, so never use local[1] here.
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaConsumer")
    // Batch interval of 5 seconds — each micro-batch covers 5s of Kafka data.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Kafka consumer configuration. The deserializer values are fully-qualified
    // class names resolved at runtime, so their casing must be exact.
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "xx.xx.xx.xx:9092", // TODO: replace with real broker address
      ConsumerConfig.GROUP_ID_CONFIG -> "mygroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer"
    )

    // Direct stream: no receiver; executors pull from Kafka partitions directly.
    // PreferConsistent spreads partitions evenly across available executors.
    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("mytopic"), kafkaPara)
      )

    // Print only the message payload of each record.
    kafkaDataDS.map(_.value()).print()

    // Start the computation and block until it is terminated (Ctrl-C or error).
    ssc.start()
    ssc.awaitTermination()
  }
}

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com