当前位置: 代码网 > it编程>数据库>Redis > Flink 输出至 Redis

Flink 输出至 Redis

2024年07月31日 Redis 我要评论
【代码】Flink 输出至 Redis。

【1】引入第三方bahir提供的flink-redis相关依赖包

<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupid>org.apache.bahir</groupid>
    <artifactid>flink-connector-redis_2.11</artifactid>
    <version>1.0</version>
</dependency>

【2】flink连接redis并输出sink处理结果

package com.zzx.flink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.redissink
import org.apache.flink.streaming.connectors.redis.common.config.flinkjedispoolconfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{rediscommand, rediscommanddescription, redismapper}

object redissinktest {
  def main(args: array[string]): unit = {
    // 创建一个流处理执行环境
    val env = streamexecutionenvironment.getexecutionenvironment
    //从文件中读取数据并转换为 类
    val inputstreamfromfile: datastream[string] = env.readtextfile("e:\\project\\flink\\src\\main\\resources\\wordcount.txt")
    //转换  sensorreading为用户自定义的类,是从文件转换而来的
    val datastream: datastream[sensorreading] = inputstreamfromfile
      .map( data => {
        var dataarray = data.split(",")
        sensorreading(dataarray(0),dataarray(1).tolong,dataarray(2).todouble)
      })
    //定义一个 redis 的配置类 继承了flinkjedisconfigbase 正是 sensorreading需要传入的参数,底层将有些数据保存成了状态数据。
    val conf = new flinkjedispoolconfig.builder().sethost("192.168.52.131").setport(6379).setpassword("zzx").build()
    //定义 redismapper 数据保存的类型
    val mymapper = new redismapper[sensorreading] {
      //定义保存数据到 redis的命令,hset table key value
      override def getcommanddescription: rediscommanddescription = {
        // hset tablesname
        new rediscommanddescription(rediscommand.hset , "sensor_temp")
      }
      //设置key
      override def getkeyfromdata(data: sensorreading): string = data.id
      //设置value
      override def getvaluefromdata(data: sensorreading): string = data.temperature.tostring
    }
    datastream.addsink(new redissink[sensorreading](conf,mymapper))

    env.execute("redis sink test")
  }
}

查看源码可知redissink是继承自richsinkfunction<in>

public class redissink<in> extends richsinkfunction<in> {

【3】查看redis输出信息
[点击并拖拽以移动]

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com