当前位置: 代码网 > it编程>数据库>MsSqlserver > 记录使用FlinkSql进行实时工作流开发

记录使用FlinkSql进行实时工作流开发

2024年08月02日 MsSqlserver 我要评论
Apache Flink是一个开源框架,用于处理无边界(无尽)和有边界(有限)数据流。它提供了低延迟、高吞吐量和状态一致性,使开发者能够构建复杂的实时应用和微服务。Flink的核心是流处理引擎,它支持事件时间处理、窗口操作以及精确一次的状态一致性。

引言

在这里插入图片描述

在大数据时代,实时数据分析和处理变得越来越重要。apache flink,作为流处理领域的佼佼者,提供了一套强大的工具集来处理无界和有界数据流。其中,flink sql是其生态系统中一个重要的组成部分,允许用户以sql语句的形式执行复杂的数据流操作,极大地简化了实时数据处理的开发流程。

什么是apache flink?

apache flink是一个开源框架,用于处理无边界(无尽)和有边界(有限)数据流。它提供了低延迟、高吞吐量和状态一致性,使开发者能够构建复杂的实时应用和微服务。flink的核心是流处理引擎,它支持事件时间处理、窗口操作以及精确一次的状态一致性。

为什么选择flink sql?

flink sql实战

常用的connector

在配置flinksql实时开发时,使用mysql-cdc、kafka、jdbc和rabbitmq作为连接器是一个很常见的场景。以下是详细的配置说明,你可以基于这些信息来撰写你的博客:

1. mysql-cdc 连接器配置

mysql-cdc(change data capture)连接器用于捕获mysql数据库中的变更数据。配置示例如下:

create table mysql_table (
    -- 定义表结构
    id int,
    name string,
    -- 其他列
) with (
    'connector' = 'mysql-cdc',  			-- 使用mysql-cdc连接器
    'hostname' = 'mysql-host',  			-- mysql服务器主机名
    'port' = '3306',            			-- mysql端口号
    'username' = 'user',        			-- mysql用户名
    'password' = 'password',    			-- mysql密码
    'database-name' = 'db',     			-- 数据库名
    'table-name' = 'table'      			-- 表名
  	'server-time-zone' = 'gmt+8',           -- 服务器时区
    'debezium.snapshot.mode' = 'initial',  	-- 初始快照模式,initial表示从头开始读取所有数据;latest-offset表示从最近的偏移量开始读取;timestamp则可以指定一个时间戳,从该时间戳之后的数据开始读取。
    'scan.incremental.snapshot.enabled' = 'true'	-- 可选,设置为true时,flink会尝试维护一个数据库表的增量快照。这意味着flink不会每次都重新读取整个表,而是只读取自上次读取以来发生变化的数据。这样可以显著提高读取效率,尤其是在处理大量数据且频繁更新的场景下。
    'scan.incremental.snapshot.chunk.size' = '1024'  -- 可选, 增量快照块大小
    'debezium.snapshot.locking.mode' = 'none', 		 -- 可选,控制在快照阶段锁定表的方式,以防止数据冲突。none表示不锁定,lock-tables表示锁定整个表,transaction表示使用事务来锁定。
    'debezium.properties.include-schema-changes' = 'true',  -- 可选,如果设置为true,则在cdc事件中会包含模式变更信息。
    'debezium.properties.table.whitelist' = 'mydatabase.mytable',  -- 可选,指定要监控的表的白名单。如果table-name未设置,可以通过这个属性来指定。
   	'debezium.properties.database.history' = 'io.debezium.relational.history.filedatabasehistory'  -- 可选,设置数据库历史记录的实现类,通常使用filedatabasehistory来保存历史记录,以便在重启后能恢复状态。
);

2. kafka 连接器配置

kafka连接器用于读写kafka主题中的数据。配置示例如下:

create table kafka_table (
    -- 定义表结构
    id int,
    name string,
    -- 其他列
) with (
    'connector' = 'kafka',      -- 使用kafka连接器
    'topic' = 'topic_name',     -- kafka主题名
    'properties.bootstrap.servers' = 'kafka-broker:9092',  -- kafka服务器地址
    'format' = 'json'           -- 数据格式,例如json
    'properties.group.id' = 'flink-consumer-group',  -- 消费者组id
    'scan.startup.mode' = 'earliest-offset',  -- 启动模式(earliest-offset, latest-offset, specific-offset, timestamp)
    'format' = 'json',  -- 数据格式
    'json.fail-on-missing-field' = 'false',  -- 是否在字段缺失时失败
    'json.ignore-parse-errors' = 'true',     -- 是否忽略解析错误
    'properties.security.protocol' = 'sasl_ssl', -- 安全协议(可选)
    'properties.sasl.mechanism' = 'plain',       -- sasl机制(可选)
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.plainloginmodule required username="user" password="password";'  -- sasl配置(可选)
);

3. jdbc 连接器配置

jdbc连接器用于与其他关系型数据库进行交互。配置示例如下:

create table jdbc_table (
    -- 定义表结构
    id int,
    name string,
    -- 其他列
) with (
    'connector' = 'jdbc',       -- 使用jdbc连接器
    'url' = 'jdbc:mysql://mysql-host:3306/db',  -- jdbc连接url
    'table-name' = 'table_name', -- 数据库表名
    'username' = 'user',        -- 数据库用户名
    'password' = 'password'     -- 数据库密码
    'driver' = 'com.mysql.cj.jdbc.driver',   -- jdbc驱动类
    'lookup.cache.max-rows' = '5000',        -- 可选,查找缓存的最大行数
    'lookup.cache.ttl' = '10min',            -- 可选,查找缓存的ttl(时间到期)
    'lookup.max-retries' = '3',              -- 可选,查找的最大重试次数
    'sink.buffer-flush.max-rows' = '1000',   -- 可选,缓冲区刷新最大行数
    'sink.buffer-flush.interval' = '2s'      -- 可选,缓冲区刷新间隔
);

4. rabbitmq 连接器配置

rabbitmq连接器用于与rabbitmq消息队列进行交互。配置示例如下:

create table rabbitmq_table (
    -- 定义表结构
    id int,
    name string,
    -- 其他列
) with (
    'connector' = 'rabbitmq',   -- 使用rabbitmq连接器
    'host' = 'rabbitmq-host',   -- rabbitmq主机名
    'port' = '5672',            -- rabbitmq端口号
    'username' = 'user',        -- rabbitmq用户名
    'password' = 'password',    -- rabbitmq密码
    'queue' = 'queue_name',     -- rabbitmq队列名
    'exchange' = 'exchange_name' -- rabbitmq交换机名
    'routing-key' = 'routing_key',   -- 路由键
    'delivery-mode' = '2',           -- 投递模式(2表示持久)
    'format' = 'json',               -- 数据格式
    'json.fail-on-missing-field' = 'false',  -- 是否在字段缺失时失败
    'json.ignore-parse-errors' = 'true'      -- 是否忽略解析错误
);

5. rest lookup 连接器配置

rest lookup 连接器允许在 sql 查询过程中,通过 rest api 进行查找操作。

create table rest_table (
    id int,
    name string,
    price decimal(10, 2),
    primary key (id) not enforced
) with (
    'connector' = 'rest-lookup',
    'url' = 'http://api.example.com/user/{id}',  -- rest api url,使用占位符 {product_id}
    'lookup-method' = 'post'	-- 'get' 或 'post'
    'format' = 'json',  -- 数据格式
    'asyncpolling' = 'false'	-- 可选,指定查找操作是否使用异步轮询模式。默认值为 'false'。当设置为 'true' 时,查找操作会以异步方式执行,有助于提高性能。
    'gid.connector.http.source.lookup.header.content-type' = 'application/json'	-- 可选,设置 content-type 请求头。用于指定请求体的媒体类型。例如,设置为 application/json 表示请求体是 json 格式。
    'gid.connector.http.source.lookup.header.origin' = '*'	-- 可选,设置 origin 请求头。通常用于跨域请求。
    'gid.connector.http.source.lookup.header.x-content-type-options' = 'nosniff'	-- 可选,设置 x-content-type-options 请求头。用于防止 mime 类型混淆攻击。
	'json.fail-on-missing-field' = 'false',  -- 可选,是否在字段缺失时失败
	'json.ignore-parse-errors' = 'true'  -- 可选,是否忽略解析错误
    'lookup.cache.max-rows' = '5000',  -- 可选,查找缓存的最大行数
    'lookup.cache.ttl' = '10min',  -- 可选,查找缓存的ttl(时间到期)
    'lookup.max-retries' = '3'  -- 可选,查找的最大重试次数
);

6. hdfs 连接器配置

hdfs connector用于读取或写入hadoop分布式文件系统中的数据。

创建hdfs source

create table hdfssource (
   line string
) with (
   'connector' = 'filesystem',
   'path' = 'hdfs://localhost:9000/data/input',		-- hdfs上的路径。
   'format' = 'csv'		-- 文件格式。
);

创建hdfs sink

create table hdfssink (
   line string
) with (
   'connector' = 'filesystem',
   'path' = 'hdfs://localhost:9000/data/output',
   'format' = 'csv'
);

flinksql数据类型

在flinksql中,数据类型的选择和定义是非常重要的,因为它们直接影响数据的存储和处理方式。flinksql提供了多种数据类型,可以满足各种业务需求。以下是flinksql中的常见数据类型及其详细介绍:

1. 基本数据类型

  • boolean: 布尔类型,表示truefalse

    create table example_table (
        is_active boolean
    );
    
  • tinyint: 8位带符号整数,范围是-128127

    create table example_table (
        tiny_value tinyint
    );
    
  • smallint: 16位带符号整数,范围是-3276832767

    create table example_table (
        small_value smallint
    );
    
  • int: 32位带符号整数,范围是-21474836482147483647

    create table example_table (
        int_value int
    );
    
  • bigint: 64位带符号整数,范围是-92233720368547758089223372036854775807

    create table example_table (
        big_value bigint
    );
    
  • float: 单精度浮点数。

    create table example_table (
        float_value float
    );
    
  • double: 双精度浮点数。

    create table example_table (
        double_value double
    );
    
  • decimal(p, s): 精确数值类型,p表示总精度,s表示小数位数。

    create table example_table (
        decimal_value decimal(10, 2)
    );
    

2. 字符串数据类型

  • char(n): 定长字符串,n表示字符串的长度。

    create table example_table (
        char_value char(10)
    );
    
  • varchar(n): 可变长字符串,n表示最大长度。

    create table example_table (
        varchar_value varchar(255)
    );
    
  • string: 可变长字符串,无长度限制。

    create table example_table (
        string_value string
    );
    

3. 日期和时间数据类型

  • date: 日期类型,格式为yyyy-mm-dd

    create table example_table (
        date_value date
    );
    
  • time§: 时间类型,格式为hh:mm:ssp表示秒的小数位精度。

    create table example_table (
        time_value time(3)
    );
    
  • timestamp§: 时间戳类型,格式为yyyy-mm-dd hh:mm:ss.sssp表示秒的小数位精度。

    create table example_table (
        timestamp_value timestamp(3)
    );
    
  • timestamp§ with local time zone: 带有本地时区的时间戳类型。

    create table example_table (
        local_timestamp_value timestamp(3) with local time zone
    );
    

4. 复杂数据类型

  • array: 数组类型,t表示数组中的元素类型。

    create table example_table (
        array_value array<int>
    );
    
  • map<k, v>: 键值对映射类型,k表示键的类型,v表示值的类型。

    create table example_table (
        map_value map<string, int>
    );
    
  • row<…>: 行类型,可以包含多个字段,每个字段可以有不同的类型。

    create table example_table (
        row_value row<name string, age int>
    );
    

5. 特殊数据类型

  • binary(n): 定长字节数组,n表示长度。

    create table example_table (
        binary_value binary(10)
    );
    
  • varbinary(n): 可变长字节数组,n表示最大长度。

    create table example_table (
        varbinary_value varbinary(255)
    );
    

数据类型的使用示例

以下是一个包含各种数据类型的表的定义示例:

create table example_table (
    id int,
    name string,
    is_active boolean,
    salary decimal(10, 2),
    birth_date date,
    join_time timestamp(3),
    preferences array<string>,
    attributes map<string, string>,
    address row<street string, city string, zip int>
);
(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com