etl——extract, transform, load,即“提取、转换、加载”
一、全量同步(t+1)
逻辑:用源表的数据直接覆盖目标表。
实现的逻辑:在往目标表中插入数据之前,【先清空目标表】,然后查询源表的数据,直接插入目标表。适用于数据量小的情况。
(一)全量同步步骤
1.创建源表
drop table emp_source; create table emp_source as select e.*, sysdate create_date, sysdate last_update_date from emp e where 1 = 2; insert into emp_source select e.*, sysdate create_date, sysdate last_update_date from emp e; commit ; select * from emp_source;

2.创建数据同步的目标表
drop table emp_tar; create table emp_tar as select e.*, sysdate etl_dt from emp e where 1 = 2; select * from emp_tar;
3.创建全量同步的存储过程并调用
create or replace procedure p_full
is
begin
----清空目标表数据
execute immediate 'truncate table emp_tar';
---插入数据到目标表
insert into emp_tar
select e.empno,
e.ename,
e.job,
e.mgr,
e.hiredate,
e.sal,
e.comm,
e.deptno,
sysdate etl_dt
from emp_source e;
commit;
----写入异常
exception
when others
then dbms_output.put_line(sqlerrm);
end;
begin
p_full;
end ;
select * from emp_tar;目标表同步成功:

(二)全量同步练习
通过入参 p_job 工种,将非这个工种的数据,全量同步到 emp_0318 这个表(全量用truncate 实现,使用动态sql实现);
create table emp_0318 as select * from emp where 1,2;
create or replace procedure p_fullsync(p_job varchar2) is
begin
execute immediate 'truncate table emp_0318'; -- 清空目标表数据
-- 插入数据到目标表
insert into emp_0318
select * from emp where job != p_job;
commit;
-- 写入异常
exception
when others then
rollback;
end;
begin
p_fullsync(p_job => 'clerk');
end;
select * from emp_0318;
(三)存储过程 & 动态sql & 全量同步之间的区别
1.存储过程:是封装一段 数据的 同步 &转换逻辑;
2.动态sql:当sql中存在不稳定因素,比如,表名不确定,筛选条件不确定,或者是 ddl语句(truncate / drop)不能直接运行,这个时候需要 拼接一个变量的sql语句字符串,然后用 execute immediate sql语句字符串 去动态执行。如果这个sql是 select 语句 一般后面还有 into 变量赋值。
3.全量同步:在插入目标表的时候,需要先清空目标表,这样才能保证目标表的数据不会重复。(否则 我们调用一次存储过程,目标表的数据就会重复一次)。
二、增量同步——merge into
(一)相关概念介绍
1.什么是增量
增量指的是那一天新增的或者发生修改的数据。
2.什么是增量同步
逻辑:用源表的数据 更新 目标表 ,如果这条数据在目标表中存在则更新,数据不存在,则插入。
实现的逻辑:首先判断 目标表中是否有源表中的数据:如果有,则用 源表的数据 更新目标表中对应的数据;如果没有,则查询源表的记录,直接插入目标表。
通常用 merge into 的方式来做增量同步数据。
3.全量同步与增量同步的区别
全量同步是同步整张表的数据,增量同步只同步增量数据(比如今天只同步昨天新增的或者修改的数据)
全量同步之前要清空目标表的数据,增量同步不用清空表,有则更新,无则插入;
4.增量同步merge into语法结构
merge into 目标表 using (增量数据的查询结果集) --子查询 查询源表的增量结果集 on (匹配字段) --用来判断增量结果集里的数据到底是更新,目标表里的数据还是插入到目标表中 when matched then update set 目标表的字段 = 增量结果集字段 --update和set之间不需要加表名 when not matched then insert(目标表字段) values(增量结果集字段) ; --insert和values之间不需要加 into 表名
(二)增量同步练习
1.练习1
示例:假如在昨天公司里新增一个员工和 7788 这个员工的薪资发生了变化,用存储过程实现,将 emp_source 表的数据增量同步到 emp_tar
源表数据:
drop table emp_tar; drop table emp_source; create table emp_source as select e.*,sysdate create_date,sysdate last_update_date from emp e; insert into emp_source(empno,ename,hiredate,create_date,last_update_date) values(1111,'lisa',trunc(sysdate)-1,trunc(sysdate)-1,trunc(sysdate)-1); update emp_source set sal=10000,last_update_date=trunc(sysdate)-1 where empno=7788; commit; select * from emp_source;

create table emp_tar as select e.*,sysdate etl_dt from emp e; select * from emp_tar;
目标表数据:

编写存储过程进行增量同步,并进行调用:
create or replace procedure p_emp_source(p_dt date) as
begin
merge into emp_tar a
using (select * from emp_source where last_update_date = p_dt) b
--子查询 查询源表的增量结果集
on (a.empno = b.empno)
--用来判断增量结果集里的数据到底是更新,目标表里的数据还是插入到目标表中
when matched then
update
set a.ename = b.ename, --主键不能update
a.job = b.job,
a.mgr = b.mgr,
a.hiredate = b.hiredate,
a.sal = b.sal,
a.comm = b.comm,
a.deptno = b.deptno,
a.etl_dt = sysdate --update和set之间不需要加表名
when not matched then
insert
(a.empno,
a.ename,
a.job,
a.mgr,
a.hiredate,
a.sal,
a.comm,
a.deptno,
a.etl_dt)
values
(b.empno,
b.ename,
b.job,
b.mgr,
b.hiredate,
b.sal,
b.comm,
b.deptno,
sysdate); --insert和values之间不需要加 into 表名
commit;
end;
begin
p_emp_source(p_dt=>trunc(sysdate)-1);
end;目标表数据:
select * from emp_tar;

2.练习2
--书 book表
drop table book;
create table book(bno varchar2(20),--图书编号
bname varchar2(50),--图书名称
aid int,--作者
pid int,--出版社
tid varchar2(20),--种类
buy date,--进货日期
price number(7,2),--价格
buynum int); --数量
insert into book values('j0001','计算机基础',2001,1001,'j001',date '2016-1-5',12.5,5);
insert into book values('j0002','oracle从入门到精通',2002,1004,'j001',date '2016-8-8',30,10);
insert into book values('y0001','常见病例及用药',2005,1003,'y001',date '2016-2-4',20,20);
insert into book values('w0001','平凡的世界',2006,1003,'w001',date '2016-5-15',35,30);
insert into book values('w0002','悲惨世界',2007,1004,'w001',date '2016-4-9',31,22);
insert into book values('j0003','sql入门',2001,1004,'j001',date '2016-2-15',32,20);
insert into book values('j0004','sql基础课程',2002,1001,'j001',date '2016-6-6',28,10);
commit;
select * from book; --书(主表)
drop table book_source;
create table book_source
as
select t.*
---假如昨天写入的这些数据
,sysdate -1 as create_date
,sysdate -1 as last_update_date
from book t;
select * from book_source;
---目标表
drop table book_target;
create table book_target(bno varchar2(20) ,--图书编号
bname varchar2(50),--图书名称
aid int,--作者
pid int,--出版社
tid varchar2(20),--种类
buy date,--进货日期
price number(7,2),--价格
buynum int
,etl_date date);
select * from book_target;
-- 创建存储过程,将源表数据同步到目标表中
create or replace procedure p_book_source_target(p_dt date) is
v_rowcount number;
begin
merge into book_target a
using (select * from book_source where trunc(last_update_date) = p_dt) b
on (a.bno = b.bno)
when matched then
update
set a.bname = b.bname,
a.aid = b.aid,
a.pid = b.pid,
a.tid = b.tid,
a.buy = b.buy,
a.price = b.price,
a.buynum = b.buynum,
a.etl_date = sysdate -- 这里必须是当天的日期
when not matched then
insert
(a.bno,
a.bname,
a.aid,
a.pid,
a.tid,
a.buy,
a.price,
a.buynum,
a.etl_date)
values
(b.bno,
b.bname,
b.aid,
b.pid,
b.tid,
b.buy,
b.price,
b.buynum,
sysdate);
-- 记录merge影响的行数
v_rowcount := sql%rowcount;
dbms_output.put_line('merge影响的行数: ' || v_rowcount);
commit;
exception
when others then
rollback;
end;-- 没有匹配到数据,是新增数据 begin p_book_source_target(p_dt => trunc(sysdate) - 1); -- 传入昨天的日期 end; select * from book_target;
源表数据成功insert到目标表中:

insert into book_source
select t.*
---假如昨天写入的这些数据
,sysdate as create_date
,sysdate as last_update_date
from book t;
commit;
select * from book_source;
begin p_book_source_target(p_dt => trunc(sysdate)); -- 传入今天的日期 end; -- 这次匹配到了,是更新操作 select * from book_target;

总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论