当前位置: 代码网 > it编程>数据库>MsSqlserver > Flink SQL 实时读取 kafka 数据写入 Clickhouse —— 日志处理(三)

Flink SQL 实时读取 kafka 数据写入 Clickhouse —— 日志处理(三)

2024年08月01日 MsSqlserver 我要评论
本文总结了如何使用使用 Clickhouse 保存日志数据,以及如何通过 Flink SQL 将我们的日志实时从 kafka 同步至 clickhouse,然后在结合强大的第三方查询 BI 工具 superset,玩转业务日志,挖掘业务日志的潜在价值。本文设计到的技能知识点比较多,需要熟悉 Clickhouse, Kafka, FlinkSQL, Superset 等,我之前的文章中总结了一些关于 Clickhouse 和 Kafka 相关的内容,感兴趣的读者可以看看.

前言

在之前的文章中,我们总结了如何在 django 项目中进行日志配置,以及如何在 k8s 上部署 filebeat 采集 pvc 中的日志发送至 kafka:

本文将总结如何使用 flink sql 实时将 kafka 中的日志消息发送至 clickhouse 表中。

clickhouse 表设计


上图中的json 内容是kafka 中的日志消息,我们需要读取该消息中的 message 字段(我们的日志信息),然后将该字段中的 time, level, func, trace_id, message 保存至 clickhouse 中。
这里我使用两张表保存日志:

  • adlp_log_local本地表
  • adlp_log分布式表,flinksql 实时写入分布式表

adlp_log_local 本地表

create table if not exists cloud_data.adlp_log_local on cluster perftest_5shards_2replicas
(
    `dt`             datetime64(3),
    `level`          lowcardinality(string),
    `trace_id`       string,
    `func`           string,
    `message`        string,

    -- 建立索引加速低命中率内容的查询
    index idx_trace_id `trace_id` type tokenbf_v1(4096, 2, 0) granularity 2,
    index idx_message `message` type tokenbf_v1(30720, 2, 0) granularity 1
)
engine = replicatedmergetree('/clickhouse/tables/{layer}-{shard}/cloud_data/adlp_log_local', '{replica}')
    partition by toyyyymmdd(dt)
    primary key (dt, trace_id)
    order by (dt, trace_id)
    ttl todatetime(dt) + tointervalday(30);

字段说明

  • dt (datetime64(3)): 存储日志时间戳,精确到毫秒。
  • level (lowcardinality(string)): 存储日志级别,如 infoerror 等,使用 lowcardinality 优化存储和查询。
  • trace_id (string): 存储追踪 id,通常用于关联一系列相关的日志记录。
  • func (string): 存储函数或方法名称,表示日志产生的位置。
  • message(string): 存储日志消息的具体内容。

索引

  • idx_trace_id: 使用 tokenbf_v1 类型的布隆过滤器索引(tokenbf_v1(4096, 2, 0)),在 trace_id 字段上创建,粒度为 2。布隆过滤器索引适合低命中率的查询,能够快速过滤出大多数不匹配的记录。
  • idx_message: 使用 tokenbf_v1 类型的布隆过滤器索引(tokenbf_v1(30720, 2, 0)),在 message 字段上创建,粒度为 1。同样用于加速低命中率的查询。

存储引擎

  • replicatedmergetree: 使用分布式和复制的存储引擎,路径模板为 /clickhouse/tables/{layer}-{shard}/cloud_data/adlp_log_local,副本名称为 {replica},保证数据的高可用性和一致性。

分区和排序

  • 分区 (partition by): 按 dt 字段的年月日(toyyyymmdd(dt))进行分区,有助于管理和查询按天划分的数据。
  • 主键 (primary key): 主键由 dttrace_id 组成,有助于高效查询。
  • 排序 (order by): 按 dttrace_id 字段排序,优化基于时间和 trace id 的查询。

数据生命周期 (ttl)

  • ttl (time to live): 配置数据的生存时间,数据在 dt 字段的时间加上 30 天后自动过期删除,保持数据表的清洁和高效。

adlp_log 分布式表

create table if not exists cloud_data.adlp_log on cluster perftest_5shards_2replicas
(
    `dt`             datetime64(3),
    `level`          lowcardinality(string),
    `trace_id`       string,
    `func`           string,
    `message`        string
)
engine = distributed('perftest_5shards_2replicas', 'cloud_data', 'adlp_log_local', rand());

字段说明
与本地表 adlp_log_local 相同,包含以下字段:

  • dt (datetime64(3))
  • level (lowcardinality(string))
  • trace_id (string)
  • func (string)
  • message (string)

存储引擎
distributed: 分布式引擎,允许将数据分布到多个分片和副本中。参数解释如下:

  • 集群名称 (perftest_5shards_2replicas): 指定集群的名称。
  • 数据库 (cloud_data): 数据库名称。
  • 表 (adlp_log_local): 本地表的名称。
  • 分片键 (rand()): 使用随机函数进行数据分片,保证数据均匀分布。

flink sql 说明

创建 source table (kafka) 连接器表

create temporary table source_table (
    message string
) with (
    'connector' = 'kafka',
    'topic' = 'filebeat_logs',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'prod-logs-k2c',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'false',
    'json.fail-on-missing-field' = 'false',
    'properties.security.protocol' = 'sasl_plaintext',
    'properties.sasl.mechanism' = 'plain',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.plainloginmodule required username="admin" password="admin";'
);

创建 sink table (clickhouse) 连接器

create temporary table sink_table (
    `dt` timestamp(3),
    `level` string ,
    `trace_id` string ,
    `func` string ,
    `message` string
) with (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://127.0.0.1:8123',
  'username' = 'admin',
  'password' = 'admin',
  'database-name' = 'cloud_data',
  'table-name' = 'adlp_log',
  'use-local' = 'true',
  'sink.batch-size' = '1000',
  'sink.flush-interval' = '1000',
  'sink.max-retries' = '10',
  'sink.update-strategy' = 'insert',
  'sink.sharding.use-table-definition' = 'true',
  'sink.parallelism' = '1'
);

解析 message 写入 sink

insert into sink_table
select 
    to_timestamp(json_value(message, '$.time'), 'yyyy-mm-dd hh:mm:ss') as dt,
    json_value(message, '$.level') as level,
    json_value(message, '$.trace_id') as trace_id,
    json_value(message, '$.func') as func,
    json_value(message, '$.message') as message
from source_table;

日志查询演示

我们的日志导入成功后,可以通过第三方查询工具查询 clickhouse 数据源,我这里使用的是 superset 去查询 clickhouse 数据源。
通过 trace_id 查询整个执行链路的日志
image.png
查询错误日志信息
image.png

全文检索 message 日志信息
image.png

更多扩展

  • superset 是一个强大的 bi 工具,可以将我们的日志中的一些指标做成看板,比如说关键错误日志数量,然后设置告警,发送通知。
  • 通过 flink sql 实时将我们的日志从 kafka 中写入 clickhouse ,结合 clickhouse 强大的查询功能,以及 superset 强大的 bi 功能,可以充分挖掘业务日志中的潜在价值。

总结

本文总结了如何使用使用 clickhouse 保存日志数据,以及如何通过 flink sql 将我们的日志实时从 kafka 同步至 clickhouse,然后在结合强大的第三方查询 bi 工具 superset,玩转业务日志,挖掘业务日志的潜在价值。
本文设计到的技能知识点比较多,需要熟悉 clickhouse, kafka, flinksql, superset 等,我之前的文章中总结了一些关于 clickhouse 和 kafka 相关的内容,感兴趣的读者可以看看:

kafka

superset

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com