【1】引入pom.xml
依赖
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-connector-elasticsearch6_2.12</artifactid>
<version>1.10.0</version>
</dependency>
【2】es6 scala
代码,自动导入的scala
包需要修改为scala._
否则会出现错误。
package com.zzx.flink
import java.util
import org.apache.flink.api.common.functions.runtimecontext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{elasticsearchsinkfunction, requestindexer}
import org.apache.flink.streaming.connectors.elasticsearch6.elasticsearchsink
import org.apache.http.httphost
import org.elasticsearch.client.requests
object essinktest {
def main(args: array[string]): unit = {
// 创建一个流处理执行环境
val env = streamexecutionenvironment.getexecutionenvironment
//从文件中读取数据并转换为 类
val inputstreamfromfile: datastream[string] = env.readtextfile("e:\\project\\flink\\src\\main\\resources\\wordcount.txt")
//转换
val datastream: datastream[sensorreading] = inputstreamfromfile
.map( data => {
var dataarray = data.split(",")
sensorreading(dataarray(0),dataarray(1).tolong,dataarray(2).todouble)
})
//定义一个 httphosts
val httphost = new util.arraylist[httphost]()
//默认 9200 我的修改为了 9201
httphost.add(new httphost("192.168.1.12",9200,"http"))
httphost.add(new httphost("127.0.0.1",9200,"http"))
//定义一个 elasticsearchfuntion 操作 es的function
val essinkfunc = new elasticsearchsinkfunction[sensorreading] {
//element 每一条数据 通过 index 发送
override def process(element: sensorreading, runtimecontext: runtimecontext, index: requestindexer): unit = {
//包装写入 es 的数据
val datasource = new util.hashmap[string,string]()
datasource.put("sensor_id",element.id)
datasource.put("temp",element.temperature.tostring)
datasource.put("ts",element.timestamp.tostring)
//index
val indexrequest = requests.indexrequest()
.index("sensor_temp")
.`type`("readingdata")
.source(datasource)
index.add(indexrequest)
println("saved successfully " + element.tostring)
}
}
//输出值 es
datastream.addsink(new elasticsearchsink.builder[sensorreading](httphost,essinkfunc).build())
env.execute("es")
}
}
【3】es6
输出展示
发表评论