【1】引入第三方 Bahir 提供的 flink-redis 相关依赖包
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
【2】Flink 连接 Redis 并通过 Sink 输出处理结果
package com.zzx.flink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
  * Flink streaming job that reads comma-separated records from a text file, converts each
  * line into a `SensorReading`, and writes every reading into a Redis hash through the
  * Bahir `RedisSink`.
  *
  * NOTE(review): `SensorReading` is not defined in this snippet; it is assumed to be a
  * user-defined case class in this package built from (String, Long, Double) and exposing
  * `id` and `temperature` — confirm against its definition.
  * NOTE(review): the object name is kept as found; Scala convention would be UpperCamelCase.
  */
object redissinktest {

  /**
    * Builds and runs the pipeline: file source -> parse -> Redis sink.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the raw input, one record per line of the text file.
    val inputStreamFromFile: DataStream[String] =
      env.readTextFile("e:\\project\\flink\\src\\main\\resources\\wordcount.txt")

    // Parse every line "f0,f1,f2" into the user-defined SensorReading type.
    // Fields are trimmed so that whitespace around the commas does not break numeric parsing.
    val dataStream: DataStream[SensorReading] = inputStreamFromFile.map { line =>
      val fields = line.split(",").map(_.trim)
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
    }

    // Redis connection settings (host, port, password) handed to the sink below.
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.52.131")
      .setPort(6379)
      .setPassword("zzx")
      .build()

    // Mapper describing how a SensorReading is stored in Redis.
    val myMapper = new RedisMapper[SensorReading] {

      // Redis command used to store the data: HSET <hash name> <key> <value>,
      // with "sensor_temp" as the hash name.
      override def getCommandDescription: RedisCommandDescription =
        new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")

      // Hash field (key): the reading's id.
      override def getKeyFromData(data: SensorReading): String = data.id

      // Hash value: the reading's temperature rendered as a string.
      override def getValueFromData(data: SensorReading): String = data.temperature.toString
    }

    dataStream.addSink(new RedisSink[SensorReading](conf, myMapper))

    env.execute("redis sink test")
  }
}
查看源码可知,RedisSink 继承自 RichSinkFunction<IN> 类:
public class RedisSink<IN> extends RichSinkFunction<IN> {
【3】查看 Redis 输出信息
发表评论