1、datax简介
1.1 datax概述
datax 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(mysql、oracle等)、hdfs、hive、odps、hbase、ftp等各种异构数据源之间稳定高效的数据同步功能。
源码地址:https://github.com/alibaba/datax
1.2 datax支持的数据源
datax目前已经有了比较全面的插件体系,主流的rdbms数据库、nosql、大数据计算系统都已经接入,目前支持数据如下图。
2、datax架构原理
2.1 datax设计理念
为了解决异构数据源同步问题,datax将复杂的网状的同步链路变成了星型数据链路,datax作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到datax,便能跟已有的数据源做到无缝数据同步。
2.2 datax框架设计
datax本身作为离线数据同步框架,采用framework + plugin架构构建。将数据源读取和写入抽象成为reader/writer插件,纳入到整个同步框架中。
reader:数据采集模块,负责采集数据源的数据,将数据发送给framework。
writer:数据写入模块,负责不断向framework取数据,并将数据写入到目的端。
framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓存,流控,并发,数据转换等核心技术问题。
2.3 datax运行流程
下面用一个datax作业生命周期的时序图说明datax的运行流程、核心概念以及每个概念之间的关系。
2.4 datax调度决策思路
举例来说,用户提交了一个datax作业,并且配置了总的并发度为20,目的是对一个有100张分表的mysql数据源进行同步。datax的调度决策思路是:
1)datax job根据分库分表切分策略,将同步工作分成100个task。
2)根据配置的总的并发度20,以及每个task group的并发度5,datax计算共需要分配4个taskgroup。
3)4个taskgroup平分100个task,每一个taskgroup负责运行25个task。
2.5 datax和sqoop对比
3、datax部署
1、下载datax安装包并上传到hadoop102的/opt/software
下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
2、解压datax.tar.gz到/opt/module
tar -zxvf datax.tar.gz -c /opt/module/
3、自检,执行如下命令
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
4、出现如下内容,则表明安装成功
4、datax使用
4.1 datax使用概述
4.1.1 datax任务提交命令
datax的使用十分简单,用户只需要根据自己同步数据的数据源和目的地选择相应的reader和writer,并将reader和writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可。
python bin/datax.py path/to/your/job.json
4.1.2 datax配置文件格式
可以使用如下命名查看datax配置文件模板。
python bin/datax.py -r mysqlreader -w hdfswriter
配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。
4.2 同步mysql数据到hdfs案例
案例要求:同步gmall数据库中base_province表数据到hdfs的/base_province目录
需求分析:要实现该功能,需选用mysqlreader和hdfswriter,mysqlreader具有两种模式分别是tablemode和querysqlmode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条sql查询语句声明需要同步的数据。
下面分别使用两种模式进行演示。
4.2.1 mysqlreader之tablemode
1、编写配置文件
(1)创建配置文件base_province.json
vim /opt/module/datax/job/base_province.json
(2)配置文件内容如下
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"where": "id>=3",
"connection": [
{
"jdbcurl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"table": [
"base_province"
]
}
],
"password": "000000",
"splitpk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultfs": "hdfs://hadoop102:8020",
"fielddelimiter": "\t",
"filename": "base_province",
"filetype": "text",
"path": "/base_province",
"writemode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2、配置文件说明
(1)reader参数说明
(2)writer参数说明
注意事项:
hfds writer并未提供nullformat参数:也就是用户并不能自定义null值写到hfds文件中的存储格式。默认情况下,hfds writer会将null值存储为空字符串(‘’),而hive默认的null值存储格式为\n。所以后期将datax同步的文件导入hive表就会出现问题。
(3)setting参数说明
3、提交任务
(1)在hdfs创建/base_province目录
使用datax向hdfs同步数据时,需确保目标路径已存在
hadoop fs -mkdir /base_province
(2)进入datax根目录
(3)执行如下命令
python bin/datax.py job/base_province.json
4、查看结果
(1)datax打印日志
(2)查看hdfs文件
hadoop fs -cat /base_province/* | zcat
4.2.2 mysqlreader之querysqlmode
1、编写配置文件
(1)修改配置文件base_province.json
(2)配置文件内容如下
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcurl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"querysql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password": "000000",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultfs": "hdfs://hadoop102:8020",
"fielddelimiter": "\t",
"filename": "base_province",
"filetype": "text",
"path": "/base_province",
"writemode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2、配置文件说明
(1)reader参数说明
3、提交任务
(1)清空历史数据
hadoop fs -rm -r -f /base_province/*
(2)进入datax根目录
(3)执行如下命令
python bin/datax.py job/base_province.json
4、查看结果
(1)datax打印日志
(2)查看hdfs文件
hadoop fs -cat /base_province/* | zcat
4.2.3 datax传参
通常情况下,离线数据同步任务需要每日定时重复执行,故hdfs上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此datax配置文件中hdfs writer的path参数的值应该是动态的。为实现这一效果,就需要使用datax传参的功能。
datax传参的用法如下,在json配置文件中使用${param}引用参数,在提交任务时使用-p"-dparam=value"传入参数值,具体示例如下。
1、编写配置文件
(1)修改配置文件base_province.json
(2)配置文件内容如下
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcurl": [
"jdbc:mysql://hadoop102:3306/gmall"
],
"querysql": [
"select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
]
}
],
"password": "000000",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "region_id",
"type": "string"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "iso_code",
"type": "string"
},
{
"name": "iso_3166_2",
"type": "string"
}
],
"compress": "gzip",
"defaultfs": "hdfs://hadoop102:8020",
"fielddelimiter": "\t",
"filename": "base_province",
"filetype": "text",
"path": "/base_province/${dt}",
"writemode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2、提交任务
(1)创建目标路径
hadoop fs -mkdir /base_province/2020-06-14
(2)进入datax根目录
(3)执行如下命令
python bin/datax.py -p"-ddt=2020-06-14" job/base_province.json
3、查看结果
hadoop fs -ls /base_province
4.3 同步hdfs数据到mysql案例
案例要求:同步hdfs上的/base_province目录下的数据到mysql gmall 数据库下的test_province表。
需求分析:要实现该功能,需选用hdfsreader和mysqlwriter。
1、编写配置文件
(1)创建配置文件test_province.json
(2)配置文件内容如下
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"defaultfs": "hdfs://hadoop102:8020",
"path": "/base_province",
"column": [
"*"
],
"filetype": "text",
"compress": "gzip",
"encoding": "utf-8",
"nullformat": "\\n",
"fielddelimiter": "\t",
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "000000",
"connection": [
{
"table": [
"test_province"
],
"jdbcurl": "jdbc:mysql://hadoop102:3306/gmall?useunicode=true&characterencoding=utf-8"
}
],
"column": [
"id",
"name",
"region_id",
"area_code",
"iso_code",
"iso_3166_2"
],
"writemode": "replace"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
2、配置文件说明
(1)reader参数说明
(2)writer参数说明
3、提交任务
(1)在mysql中创建gmall.test_province表
drop table if exists `test_province`;
create table `test_province` (
`id` bigint(20) not null,
`name` varchar(20) character set utf8 collate utf8_general_ci null default null,
`region_id` varchar(20) character set utf8 collate utf8_general_ci null default null,
`area_code` varchar(20) character set utf8 collate utf8_general_ci null default null,
`iso_code` varchar(20) character set utf8 collate utf8_general_ci null default null,
`iso_3166_2` varchar(20) character set utf8 collate utf8_general_ci null default null,
primary key (`id`)
) engine = innodb character set = utf8 collate = utf8_general_ci row_format = dynamic;
(2)进入datax根目录
(3)执行如下命令
python bin/datax.py job/test_province.json
4、查看结果
(1)datax打印日志
(2)查看mysql目标表数据
5、datax优化
5.1 速度控制
datax3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)
5.2 内存调整
当提升datax job内channel并发数时,内存的占用会显著增加,因为datax作为数据交换通道,在内存中会缓存较多的数据。例如channel中会有一个buffer,作为临时的数据交换的缓冲区,而在部分reader和writer的中,也会存在一些buffer,为了防止oom等错误,需调大jvm的堆内存。
建议将内存设置为4g或者8g,这个也可以根据实际情况来调整。
调整jvm xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm=“-xms8g -xmx8g” /path/to/your/job.json
发表评论