当前位置: 代码网 > 科技>电脑产品>内存 > Flink 输出至 Elasticsearch

Flink 输出至 Elasticsearch

2024年08月01日 内存 我要评论
Flink 输出至 Elasticsearch。

【1】引入pom.xml依赖

<dependency>
    <groupid>org.apache.flink</groupid>
    <artifactid>flink-connector-elasticsearch6_2.12</artifactid>
    <version>1.10.0</version>
</dependency>

【2】es6 scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flink

import java.util

import org.apache.flink.api.common.functions.runtimecontext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{elasticsearchsinkfunction, requestindexer}
import org.apache.flink.streaming.connectors.elasticsearch6.elasticsearchsink
import org.apache.http.httphost
import org.elasticsearch.client.requests


object essinktest {
  def main(args: array[string]): unit = {
    // 创建一个流处理执行环境
    val env = streamexecutionenvironment.getexecutionenvironment
    //从文件中读取数据并转换为 类
    val inputstreamfromfile: datastream[string] = env.readtextfile("e:\\project\\flink\\src\\main\\resources\\wordcount.txt")
    //转换
    val datastream: datastream[sensorreading] = inputstreamfromfile
      .map( data => {
        var dataarray = data.split(",")
        sensorreading(dataarray(0),dataarray(1).tolong,dataarray(2).todouble)
      })

    //定义一个 httphosts
    val httphost = new util.arraylist[httphost]()
    //默认 9200 我的修改为了 9201
    httphost.add(new httphost("192.168.1.12",9200,"http"))
    httphost.add(new httphost("127.0.0.1",9200,"http"))
    //定义一个 elasticsearchfuntion 操作 es的function
    val essinkfunc = new elasticsearchsinkfunction[sensorreading] {
      //element 每一条数据 通过 index 发送
      override def process(element: sensorreading, runtimecontext: runtimecontext, index: requestindexer): unit = {
        //包装写入 es 的数据
        val datasource = new util.hashmap[string,string]()
        datasource.put("sensor_id",element.id)
        datasource.put("temp",element.temperature.tostring)
        datasource.put("ts",element.timestamp.tostring)

        //index
        val indexrequest = requests.indexrequest()
            .index("sensor_temp")
            .`type`("readingdata")
            .source(datasource)
        index.add(indexrequest)
        println("saved successfully " + element.tostring)
      }
    }
    //输出值 es
    datastream.addsink(new elasticsearchsink.builder[sensorreading](httphost,essinkfunc).build())
    env.execute("es")
  }
}

【3】es6输出展示

​ [点击并拖拽以移动] ​​

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com