当前位置: 代码网 > it编程>数据库>大数据 > 详解Flink同步Kafka数据到ClickHouse分布式表

详解Flink同步Kafka数据到ClickHouse分布式表

2024年05月19日 大数据 我要评论
引言业务需要一种olap引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定clickhouse什么是clickhouse?clickhouse是一个用于联机分析(olap

引言

业务需要一种olap引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定clickhouse

什么是clickhouse?

clickhouse是一个用于联机分析(olap)的列式数据库管理系统(dbms)。

列式数据库更适合于olap场景(对于大多数查询而言,处理速度至少提高了100倍),下面详细解释了原因(通过图片更有利于直观理解),图片来源于clickhouse中文官方文档。

行式

列式

我们使用flink编写程序,消费kafka里面的主题数据,清洗、归一,写入到clickhouse里面去。

这里的关键点,由于第一次使用,无法分清应该建立什么格式的clickhouse表,出现了一些问题,最大的问题就是程序将数据写入了,查询发现数据不完整,只有一部分。我也在网上查了一些原因,总结下来。

为什么有时看不到已经创建好的表并且查询结果一直抖动时多时少?

常见原因1:

建表流程存在问题。clickhouse的分布式集群搭建并没有原生的分布式ddl语义。如果您在自建clickhouse集群时使用create table创建表,查询虽然返回了成功,但实际这个表只在当前连接的server上创建了。下次连接重置换一个server,您就看不到这个表了。

解决方案:

建表时,请使用create table <table_name> on cluster default语句,on cluster default声明会把这条语句广播给default集群的所有节点进行执行。示例代码如下。 create table test on cluster default (a uint64) engine = mergetree() order by tuple(); 在test表上再创建一个分布式表引擎,建表语句如下。 create table test_dis on cluster default as test engine = distributed(default, default, test, cityhash64(a));

常见原因2:

replicatedmergetree存储表配置有问题。replicatedmergetree表引擎是对应mergetree表引擎的主备同步增强版,在单副本实例上限定只能创建mergetree表引擎,在双副本实例上只能创建replicatedmergetree表引擎。

解决方案:

在双副本实例上建表时,请使用replicatedmergetree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或replicatedmergetree()配置replicatedmergetree表引擎。其中,replicatedmergetree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)为固定配置,无需修改。

这里引出了复制表的概念,这里介绍一下,只有 mergetree 系列里的表可支持副本:

replicatedmergetree

replicatedsummingmergetree

replicatedreplacingmergetree

replicatedaggregatingmergetree replicatedcollapsingmergetree

replicatedversionedcollapsingmergetree

replicatedgraphitemergetree

副本是表级别的,不是整个服务器级的。所以,服务器里可以同时有复制表和非复制表。副本不依赖分片。每个分片有它自己的独立副本。

创建复制表

先做好准备工作,该建表的建表,然后编写程序。在表引擎名称上加上 replicated 前缀。例如:replicatedmergetree。

  • 首先创建一个分布式数据库
create database test on cluster default_cluster;
  • 创建本地表

由于clickhouse是分布式的,创建本地表本来应该在每个节点上创建的,但是指定on cluster关键字可以直接完成,建表语句如下:

create table test.test_data_shade on cluster default_cluster
(
    `data` map(string, string),
    `uid` string,
    `remote_addr` string,
    `time` datetime64,
    `status` int32,
    ...其它字段省略
    `dt` string
)
engine = replicatedmergetree()
partition by dt
order by (dt, siphash64(uid));

这里表引擎为replicatedmergetree,即有副本的表,根据dt按天分区,提升查询效率,siphash64是一个hash函数,根据uid散列使得相同uid数据在同一个分片上面,如果有去重需求,速度更快,因为可以计算每个分片去重,再汇总一下即可。

  • 创建分布式表
create table test.test_data_all on cluster default_cluster as test.test_data_shade engine = distributed('default_cluster', 'test', 'test_data_shade', siphash64(uid));

在多副本分布式 clickhouse 集群中,通常需要使用 distributed 表写入或读取数据,distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

通过jdbc写入

这个我是看的官方文档,里面有2种选择,感兴趣的同学可以都去尝试一下。

这里贴一下我的pom依赖

<dependency>
    <groupid>ru.yandex.clickhouse</groupid>
    <artifactid>clickhouse-jdbc</artifactid>
    <version>0.3.1-patch</version>
    <classifier>shaded</classifier>
    <exclusions>
        <exclusion>
            <groupid>*</groupid>
            <artifactid>*</artifactid>
        </exclusion>
    </exclusions>
</dependency>

flink主程序,消费kafka,做清洗,然后写入clickhouse,这都是常规操作,这里贴一下关键代码吧。

连接clickhouse有2种方式,8123端口的http方式,和基于9000端口的tcp方式。

这里官方推荐的是连接驱动是0.3.2:

<dependency>
    <!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
    <groupid>com.clickhouse</groupid>
    <artifactid>clickhouse-jdbc</artifactid>
    <version>0.3.2-patch11</version>
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupid>*</groupid>
            <artifactid>*</artifactid>
        </exclusion>
    </exclusions>
</dependency>

note: ru.yandex.clickhouse.clickhousedriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.

官方推荐升级到0.3.2,上面表格给出了升级方法,文档地址:

github.com/clickhouse/…

以上就是详解flink同步kafka数据到clickhouse分布式表的详细内容,更多关于flink数据同步kafka clickhouse的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com