数仓工具的使用

数仓工具的使用

数仓工具DataX的使用

1. 概述与架构

  1. 阿里推出的==异构数据源离线同步 工具==

  2. 官方文档:https://github.com/alibaba/DataX/blob/master/README.md

  3. Framework+plugin

    image-20230922090250498

    • Framework:数据传输的管道,处理缓冲、流量控制、并发、数据转换
    • Reader:数据采集模块,采集数据源数据
    • Writer:数据写入模块,将数据写入下沉地(sink)HDFS
  4. 数据源

    1686584710127

2. 写入hdfs

  1. MySQL创建表

    1
    2
    3
    4
    5
    6
    create database if not exists test character set utf8;
    use test;
    create table student(id int,name varchar(20),age int,createtime timestamp );
    insert into `student` (`id`, `name`, `age`, `createtime`) values('1','zhangsan','18','2021-05-10 18:10:00');
    insert into `student` (`id`, `name`, `age`, `createtime`) values('2','lisi','28','2021-05-10 19:10:00');
    insert into `student` (`id`, `name`, `age`, `createtime`) values('3','wangwu','38','2021-05-10 20:10:00');
  2. 查看配置模板文件

    1
    2
    cd /export/server/datax
    python ./bin/datax.py -r mysqlreader -w hdfswrite
  3. 编写配置文件

    1
    2
    3
    4
    5
    6
    cd /export/server/datax/
    vim /job/mysql2hdfs.json

    # 根据官方文档配置
    https://github.com/alibaba/DataX/blob/master/README.md

  4. mysql数据写入hdfs

    1. 启动hdfs

    2. 启动datax执行json脚本

      1
      2
      cd /export/server/datax
      python bin/datax/py job/mysql2hdfs.json
    3. 进入HDFS页面,查看是否写入成功

3. 写入hive

  1. 创建配置文件

    1
    2
    cd /export/server/datax/
    vim /job/mysql2hive.json
  2. 创建Hive表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    create database IF NOT EXISTS test;
    use test;
    create table student(
    id int,
    name varchar(20),
    age int,
    createtime timestamp
    )
    row format delimited fields terminated by "\t" STORED AS TEXTFILE;
  3. 使用datax使用JSON脚本

    1
    2
    cd /export/server/datax
    python bin/datax.py job/mysql2hive.json
  4. 进入HDFS页面,查看是否写入成功

4. DataX-Web的使用

1
2
3
4
5
6
7
8
# 进入datax目录
cd /export/server/datax-web-2.1.2/
# 执行启动脚本
./bin/start-all.sh

# 网页打开
http://192.168.88.80:9527/index.html
admin 123456
  1. 创建项目

  2. 配置数据源

  3. 配置目的地

  4. 创建任务模板(任务管理菜单)

  5. 在hive创建目标表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    create database IF NOT EXISTS test;
    use test;
    create table student2(
    id int,
    name varchar(20),
    age int,
    createtime timestamp
    )
    row format delimited fields terminated by ","
    STORED AS TEXTFILE;
  6. 构建json任务\json脚本

    1. 配置reader
    2. 配置writer
    3. 构建json脚本
    4. 选择执行任务模板
  7. 执行任务

  8. 查看日志

  9. 查看结果

DataX实现数据迁移

1. 建模设计

  1. 数据同步方式

    • 全量覆盖
    • 仅新增
    • 新增或更新
    • 全量同步
  2. 内部表还是外部表

    • 根据是否对数据有完全的控制权
    • 内部表对表数据有完全的控制权,删除数据时不仅可以删除元数据还可以删除实际数据
    • 外部表仅支持删除元数据,不能删除实际数据
    • 元数据存储在MySQL中,实际数据存储在HDFS中
  3. 分区表还是分桶表

    • 分区表,将数据拆分成一个一个文件夹进行存储
    • 分桶表将数据拆分成一个个数据文件进行存储,主要用来处理多表join、抽样
  4. 存储方式和压缩方式

    1. 存储方式
      • 行式存储:textfile
      • 列式存储:ORC、PARQUIT
    2. 压缩方式
      • SNAPPY
      • GZIP
      • ZLIB
  5. 表字段选择

    • 根据用户需求确定
  6. 缓慢渐变维

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    SCD1:不维护历史变更行为, 直接对过去数据进行覆盖即可 
    此种操作 仅适用于错误数据的处理

    SCD2: 维护历史变化行为, 每天同步全量的数据, 不管是否有变化, 均全量维护到一个新的分区中

    SCD3:维护历史变更行为,处理方式在表中新增两个新的字段,一个是起始时间,一个是结束时间,当数据发生变更后,将之前的数据设置为过期,将新的变更后完整的数据添加到表中,重新记录其起始和结束时间,将这种方案称为拉链表
    好处:可以维护更多的历史版本的数据, 处理起来也是比较简单的 (利于维护)
    弊端:造成数据冗余存储 大量占用磁盘空间


    SCD4:维度历史变化,处理方式,当表中有字段发生变更后,新增一列,将变更后的数据存储到这一列中即可
    好处:减少数据冗余存储
    弊端:只能维护少量的历史版本, 而且维护不方便, 效率比较低

2. 构建原始业务表及导入数据

  1. 表类型:内部表+ORC+ZLIB+分区表

  2. 表字段image-20230922192557207

  3. 基于DataX-Web实现数据导入(全量覆盖)

    1. 新建数仓项目
    2. 创建任务模板:每天凌晨00:20:00执行一次,失败重新执行2次
    3. 添加数据源
      1. MySQL数据源
      2. Hive目标库
    4. 构建任务
      1. reader
      2. writer
      3. 字段映射
      4. 构建JSON文件-选择任务模板
        • 将writeMode属性修改为 $ truncate$
    5. 执行任务
    6. 查看任务执行日志
    7. Hive中验证是否导入成功
  4. 基于DataX-Web实现数据导入(仅新增)

    1. 新建数仓项目

    2. 创建任务模板:每天凌晨00:20:00执行一次,失败重新执行2次

    3. 首次导入为全量覆盖,新增导入前一天的数据

    4. 全量覆盖等同 ==$ 基于DataX-Web实现数据导入(全量覆盖)$==

      • 需要先将全部数据导入临时表

      • 将临时表的数据进行分区处理,导入目标分区表

        1
        2
        3
        4
        5
        insert into[overwrite] table ods.ods_sale_shop_refund_i partition (dt)
        select
        *,date(create_time) dt
        -- date_format(create_time,'yyyy-MM-dd') as dt
        from ods.ods_sale_shop_refund_i_temp;
      • 分区后,删除临时表

        1
        drop table ods.ods_sale_shop_refund_i_temp;
    5. 增量模式(T+1)

      • 处理前一天的数据(在DataX的Reader中的where条件中添加如下语句)

        1
        2
        3
        4
        create_time between concat(date_sub(current_date,INTERVAL 1 DAY),' 00:00:00') and concat(date_sub(current_date,INTERVAL 1 DAY),' 23:59:59')

        或者:
        date_format(create_time,'%Y-%m-%d') = DATE_FORMAT(date_sub(NOW(),INTERVAL 1 DAY),'%Y-%m-%d')
      • ==将append修改为truncate==

      • ==在任务列表里面编辑辅助参数(选择Hive分区)==

      • ==在json配置文件中添加后执行sql:postSql属性中添加修复分区的sql==

        1
        2
        3
        4
        5
        # 方式一: HIVE自动修复分区 (比较适合于有多个分区的情况)(推荐,可以在任务执行完成后自动执行)
        MSCK REPAIR TABLE ods.ods_sale_shop_refund_i;

        # 方式二:手动在hive中添加一个分区(不推荐)
        alter table ods.ods_sale_shop_refund_i add partition (dt='2023-01-05')
    6. 执行任务

    7. 查看执行日志

    8. hive中查看结果

    9. HDFS中查看结果

  5. 新增及更新导入

    1. 首次导入同全量导入
    2. 增量导入,修改where条件==导入昨天一天新增的和更新的==
  6. 全量同步(相当于历史数据快照的功能)

    1. 创建一个新的分区
    2. 将所有的数据导入新的分区
    3. 仅保留近3个月的数据