sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析
这篇文章将为大家详细讲解有关sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
网站建设哪家好,找创新互联公司!专注于网页设计、网站建设、微信开发、微信小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了行唐免费建站欢迎大家使用!
先从功能简单的开始介绍,
sqoop导入单个表到hive:
sqoop import \ --connect jdbc:MySQL://192.168.49.214:3306/mysqlcdc --username root \ --password 123456 \ --table data \ --hive-import \ --fields-terminated-by '\t' \ -m 1
这是最简单的将mysql表导入hive中,没有指定hive表名,默认在default库,表名和mysql表同名。sqoop也可以通过sql语句来从多表中选择自己想要的数据,比如:
sqoop import \ --connect 'jdbc:sqlserver://192.168.49.180:1433;database=rcscounty_qn' \ --username sa \ --password 123456! \ --fields-terminated-by '\t' \ --hive-import \ --hive-table rcs.user_orgname \ --m 1 \ --query 'SELECT u.USER_ID as id, u.USER_NAME as name, u.ORG_ID as orgId, o.ORG_NAME as orgName FROM USER u , ORG o where o.ORG_ID = u.ORG_ID and $CONDITIONS'
通过sqoop导入sqlserver数据库的数据,通过query查询出自己想要的数据,将这些数据导入hive中。 $CONDITIONS 是不能缺少的,有查询条件的时候查询条件和and连接,没有查询条件的时候放在where中就可以了。
通过sqoop导入数据到hive中,有以下一些特点:
1)指定的hive表可以存在也可以不存在,不存在则会自动创建,表存在假如没有数据则会将数据导入,数据格式不对会报错,加入--hive-overwrite会将hive表进行重写。
2)通过sqoop创建的hive表只能是内部表,即使通过--target-dir指定了数据在hdfs中存储的路径,实际上在hdfs中只会创建文件夹,数据默认是放在/user/hive/warehouse/里面。
3)同一张hive表通过--fields-terminated-by指定的分隔符要统一,否则后导入的数据会挤到一列,无法分开。
2.sqoop全库导入
sqoop import-all-tables "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" \ --connect 'jdbc:sqlserver://192.168.49.180:1433;database=rcscounty_qn' \ --username sa \ --password 123456 \ --fields-terminated-by '\t' \ --hive-database ods_quannan \ -m 10 \ --create-hive-table \ --hive-import \ --hive-overwrite \ --autoreset-to-one-mapper
将一个数据库内的表都导入一个hive库中,假如这个库中所有的表都有主键,则不需要最后一行的--autoreset-to-one-mapper。
假如需要将每个hive表进行一定规律的改名,比如以前的表名是table,希望导入的表名叫ods_table,是无法通过sqoop来实现的,需要自己写脚本来导入。
我在执行全库导入时,全库为133张表,实际导入为80张表。可能是我参数哪里有问题,多次执行都是这样。所以建议导入之后检查一下数量是否正确。
3.sqoop增量导入
由于hive表没有主键,所以hive表无法实现update,只能将新插入的数据添加进来,也就是增量导入。
增量导入有两种方式,一种是append,一种是incremental lastmodified。
增量导入只能导入到hdfs中,不能导入到hive中,所以语句中不要有--hive import。
append方式:
sqoop import \ --connect 'jdbc:mysql://192.168.49.201:3307/blade?serverTimezone=UTC&zeroDateTimeBehavior=CONVERT_TO_NULL' \ --username root \ --password 123456 \ --table blade_blog \ --target-dir '/user/hive/warehouse/ods.db/blade_blog' \ --incremental append \ --check-column id \ --last-value 3 \ -m 1
由于修改的是hdfs数据,所以需要用target-dir指定hdfs路径。没有加时区可能会报错Establishing SSL connection without server's identity verification is not recommended.但是加了时区,在传递时间类型的数据时,假如设置不正确,可能会将hive中得到的数据比mysql中的数据快/慢。在设置的时候要先查询自己数据库的时区设置,一般mysql默认时区是UTC。
&zeroDateTimeBehavior=CONVERT_TO_NULL参数和时区原因一致,不加的话无法连接到mysql数据库。假如不加就能连到mysql数据库的话,不加也可以。
指定增量方式为append,检查列为id,设定值为3,所以id比3大(不含等于)的数据都会被导入。不会合并重复数据,所以如果你连续执行两遍,会看到两个id为4的数据。
检查列不能是字符,必须是数字或者是时间。append方式官方推荐用数字,时间建议用lastmodified方式导入。
lastmodified方式又分两种增量导入方式,一种是不合并重复数据(append),一种会合并重复数据(merge-key) ,例子如下
append方式
sqoop import --connect 'jdbc:mysql://192.168.49.214:3306/mysqlcdc?serverTimezone=UTC&zeroDateTimeBehavior=CONVERT_TO_NULL' \ --username root \ --password 123456 \ --table data \ --target-dir '/user/hive/warehouse/data' \ --check-column last_mod \ --incremental lastmodified \ --last-value '2019-08-30 16:49:12' \ --m 1 \ --append
last_mod列所有时间大于等于2019-08-30 16:49:12的数据都会被导入。
merge-key方式:
sqoop import --connect 'jdbc:mysql://192.168.49.214:3306/mysqlcdc?serverTimezone=CST&zeroDateTimeBehavior=CONVERT_TO_NULL' \ --username root \ --password 123456 \ --table data \ --target-dir '/user/hive/warehouse/data' \ --check-column last_mod \ --incremental lastmodified \ --fields-terminated-by ',' \ --last-value '2019-08-28 17:31:58' \ --m 1 \ --merge-key id
指定merge-key为id,hive表中所有id重复的数据都会合并,无论是否是本次增量导入添加的。
要注意一点,导入的数据实际时间范围是你指定的last-value到执行这个sqoop语句,比如你指定了last-value为2019-08-28 17:31:58,执行这个sqoop语句的时间是2021-1-8 15:00:00,但是数据库里有个数据时间是2022-2-4 12:31:21,这个数据是不会被导入进来的。在打印的日志里面能够看到:
假如执行没有报错,重复数据也合并了,但是数据没有更新也没有新导入,建议检查一下hdfs文件路径是否正确。
4.将增量导入创建为job,并建立定时任务
sqoop可以将一些sqoop操作保存下来作为job,方便以后执行。之后创建定时任务,来达到定时增量导入的目的。
创建sqoop job:
sqoop job \ --create one.more.time \ -- import \ --connect 'jdbc:mysql://192.168.49.101:3307/maybe?serverTimezone=UTC&zeroDateTimeBehavior=CONVERT_TO_NULL' \ --username root \ --password 123456 \ --table blade_blog \ --target-dir '/user/hive/warehouse/ods.db/b_blog' \ --fields-terminated-by '\t' \ --m 1 \ --check-column update_time \ --incremental lastmodified \ --last-value '2020-01-16 15:34:01' \ --merge-key id
这样就创建了名为one.more.time的job了。
查看job:
sqoop job --list
通过job来执行定时增量导入,第一次执行的last-value值为你指定的值,之后运行会记录你执行这个job的时间,来作为下次last-value的参数,实现动态配置last-value,避免重复导入。
执行job:
sqoop job --exec one.more.time
创建定时任务:
先检查是否安装了crontab
rpm -qa | grep crontab
没有的话安装crontab,centos为yum install crontabs。
编写一个shell脚本,来执行job。
为当前用户创建定时任务:
crontab -e
进入编辑
40 08 * * 1-5 ls /home/software/sqoop-script/maybe.sh
表示周一至周五,每天8:40执行maybe.sh。更多的crontab时间编写规范请看 Linux基础之定时任务。
这样就实现定时增量同步了。
关于sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
文章标题:sqoop导入数据、全库导入和创建job以及实现定时增量导入的示例分析
URL标题:http://azwzsj.com/article/jdgdjh.html