数据传输2.0

数据传输解决各类数据源之间的数据同步问题。

使用流程

  1. 登记数据源
  2. 数据传输中创建任务
  3. 试运行任务,查看运行结果
  4. 数据传输提交上线
  5. 猛犸开发平台创建数据同步任务调用数据传输中创建的任务
  6. 猛犸开发平台配置数据同步任务调度

登记数据源

登记数据源第一步需要申请相关的数据源权限:数据源连接说明

以登记MySQL为例,登记的过程中需要填写JDBC串,JDBC串生成规则:jdbc:mysql://ServerIP:Port/Database jdbc:mysql://10.173.32.220:3306/poc

数据传输中创建任务

1.点击新建任务

2.配置任务名称

3.来源与去向配置,如果表不存在,可以点击快速创建表。快速创建表实现了源端数据表的建表语句转化成目标端的建表语句,对于点击快速创建表的表会在任务运行的时候生成,不是马上生成。

4.配置字段映射

对于字段映射,默认使用的匹配策略是列名匹配,如果列名称没有匹配上,这一列的来源字段显示为不导入,用户可以自己手动修改匹配的列。

当用户选择不导入的时候:目标表中的这个字段将不会导入任何数据

当用户选择自定义表达式:如果输入‘123’,那么这个字段会导入字符串‘123’;如果输入的 123 那么导入的是数字 123 ;也可以输入源端数据库支持的SQL表达式比如 cast(date as char),将源表中的date这个字段转化成char;后续这边也会考虑做脱敏,如果有这方面的需求可以和我们沟通。

试运行任务,查看运行结果

目前的运行结果只能在日志页面查看

数据传输提交上线

注意:这里提交上线的时候需要在数据传输那边也点击一下提交上线(这块联动需要在后面的版本完成)

猛犸开发平台创建数据同步任务调用数据传输中创建的任务

猛犸开发平台配置数据同步任务调度

已经支持用户场景

抽取数据源场景

数据读取增量抽取全量抽取流水型抽取分库分表
MYSQL要求表中时间字段支持要求表中有自增字段支持
ORACLE要求表中时间字段支持要求表中有自增字段支持
DDB要求表中时间字段支持不支持不支持
HIVE要求表中时间字段支持不支持不支持

写入数据源场景

数据读取insert intoinsert overwriteinsert ignoreupsert(update or insert)
MYSQL支持支持支持
ORACLE支持支持支持
DDB支持支持支持
HIVE支持支持

使用样例说明

全量插入

参考使用说明的例子

RDS 以一个时间字段为增量,每天凌晨抽取昨天的数据,写入一个HIVE分区

这个版本的新建hive表不是在点击新建hive表的时候创建的,是在任务运行的时候创建的。目前存在一个问题,如果生成的表是分区表暂时无法显示分区信息(下个版本修改为在点击新建创建表的时候马上创建)

所以这个版本需要把这个分区的sql语句复制处理,在自助分析运行一下

时间参数参考:关于时间参数的使用说明

HIVE最新一天分区数据,每天凌晨update到RDS中

这里我们使用的是mysql写入的upset功能

upsert(update or insert), 即更新或写入。 MySQL中实现upsert操作方式:通过判断插入的记录里是否存在主键索引或唯一索引冲突,来决定是插入还是更新。当出现主键索引或唯一索引冲突时则进行update操作,否则进行insert操作。

MYSQL/ORACLE 以自增字段做流水抽取

要求表必须有一个唯一自增字段

起始值说明:

  1. 在开发模式/试运行 下运行任务 每次都是从起始值运行到最新值
  2. 在调度模式(线上)下 从起始站运行到最新值,然后用最新值作为下次到起始值

API数据传输

数据源登记说明:

自定义参数表示的静态的数据,就是不会变化的

在参数名称中点击添加,弹窗如下,这里输入需要通过API请求获取的数据

在任务配置的时候使用刚刚登记的参数用 $(xxx)表示

批量创建任务

选择数据源

配置生成表的模式

选择去向hive库和写入规则

时间参数

基本用法

需要使用的地方一般是设置抽取条件,hive的分区写入条件。

目前数据传输支持的时间参数有(后续根据用户希望的时间类型进行扩张,如果没有可先参考下面的自定义的方式):

Key nameExampleMeaning
azkaban.flow.start.timestamp2018-08-21T15:23:15.075+08:00计划执行时间的具体时间
azkaban.flow.start.year2018计划执行时间的年
azkaban.flow.start.month08计划执行时间的月
azkaban.flow.start.day21计划执行时间的日
azkaban.flow.start.hour15计划执行时间的小时
azkaban.flow.start.minute23计划执行时间的分钟
azkaban.flow.start.second15计划执行时间的秒
azkaban.flow.start.milliseconds075计划执行时间的毫秒
azkaban.flow.start.timezoneAsia/Shanghai计划执行时间的时区
azkaban.flow.current.date2018-08-21计划执行时间日期
azkaban.flow.current.month2018-08-01计划执行时间所在月开始1号
azkaban.flow.current.hour2018-08-21 15:00:00计划执行时间所在小时的0分0秒
azkaban.flow.1.days.ago2018-08-20计划执行时间一天前日期
azkaban.flow.2.days.ago2018-08-19计划执行时间两天前日期
azkaban.flow.3.days.ago2018-08-18计划执行时间三天前日期
azkaban.flow.7.days.ago2018-08-14计划执行时间七天前日期
azkaban.flow.30.days.ago2018-07-22计划执行时间三十天前日期
schedule.exec.time1534839814392计划执行时间的unix时间戳

自定义时间参数

如果用户希望的时间参数不在上述列表中,可以参考下面的方式进行:

先在猛犸开发平台

然后在数据传输中使用${user-defined}

推荐使用Joda Time来自定义时间以及日期的格式。

获取计划执行时间n小时前的日期 $(new("org.joda.time.DateTime", ${schedule.exec.time}).minusHours(n).toString('YYYY-MM-dd HH:mm:ss'))

获取计划执行时间n天前的日期 $(new("org.joda.time.DateTime", ${schedule.exec.time}).minusDays(n).toString('YYYY-MM-dd HH:mm:ss'))

获取指定时间的时间戳 $(new('azkaban.utils.DateMacro').toDateTime(${azkaban.flow.3.days.ago}).withTimeAtStartOfDay().getMillis())

任务管理

表抽取/写入情况分析

场景一:对于ODS层的数据抽取,应该尽量和源端数据库保持一致(这样就没有必要一个任务创建多个任务),但是也不乏出现同一个表被创建了好几个任务,提供这个接口帮助用户找出同一个表被那些任务引用了。

场景二:对于需要接入某个数据库的某个表,先搜索一下这个任务是否已经被导入了,导入到hive中的表是哪个

任务调度情况分析

确定了这个表归属于哪个任务,查看这个任务被那些调度任务引用了,就可以发现这个表导入数仓的情况

引用情况,可以看到这个任务被那些调度任务给引用了

运行结果,可以看到每一次的对这个任务对调度情况

任务下线流程

  1. 通过引用详情,下线对应的调度任务
  2. 点击下线按钮

数据源管理

登记数据

数据源引用任务情况

引用情况,罗列了哪些任务使用到了这个数据源。如果需要下线一个数据源,需要先下线相关任务。