Real-Time Ingestion of MySQL Data into HBase with StreamSets




Local HBase environment

$ jps
4082 Jps
3556 NameNode
3813 QuorumPeerMain
3911 HMaster
3642 DataNode
3739 SecondaryNameNode
3999 HRegionServer

Local demo environment

MySQL environment

$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About a minute (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

MySQL version: 8.0.12

HBase environment versions:

Apache Hadoop: hadoop-3.1.1

Apache HBase: hbase-2.1.0

Apache Phoenix: apache-phoenix-5.0.0-HBase-2.0-bin

Local SDC (StreamSets Data Collector) environment

$ docker run --restart on-failure -p 18630:18630 -d --name streamsets-dc streamsets/datacollector
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
cd2d89509457 streamsets/datacollector "/docker-entrypoint.…" 35 minutes ago Up 35 minutes 0.0.0.0:18630->18630/tcp streamsets-dc
09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About an hour (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

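Docker is convenient, but connecting to HBase requires the container to reach the host's network. The local HBase registers itself as localhost, which the container cannot reach, so I dropped the Docker version of SDC and switched to the locally extracted tarball version.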

[zk: localhost:2181(CONNECTED) 4] get /hbase/master
[binary protobuf payload; readable fragments: master:16000 ... PBUF ... localhost]
cZxid = 0x1a725
ctime = Tue Jul 21 11:02:46 CST 2020
mZxid = 0x1a725
mtime = Tue Jul 21 11:02:46 CST 2020
pZxid = 0x1a725
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1000069b14e0000
dataLength = 57
numChildren = 0
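The readable part of the znode shows that the HBase master registered itself under the hostname localhost. A client roughly works by reading such addresses from ZooKeeper and then connecting to them, so a loopback address resolved inside a Docker container points at the container itself, not at the machine running HBase. The sketch below illustrates that check; is_loopback_host is a hypothetical helper, not part of HBase or SDC, and it only handles IPv4 resolution via socket.gethostbyname.

```python
import ipaddress
import socket


def is_loopback_host(host: str) -> bool:
    """Return True if `host` resolves to an IPv4 loopback address (127.0.0.0/8).

    A service registered under such an address is only reachable from the same
    network namespace; inside a Docker container, "localhost" is the container.
    """
    addr = socket.gethostbyname(host)  # IPv4 literals are returned unchanged
    return ipaddress.ip_address(addr).is_loopback


if __name__ == "__main__":
    # "localhost" normally resolves to 127.0.0.1 as well.
    for host in ("127.0.0.1", "192.168.31.29"):
        print(host, is_loopback_host(host))
```

This is why the SDC instance running directly on the host (bound to 192.168.31.29 in the log below) can reach an HBase registered as localhost, while the containerized one cannot.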

SDC, locally extracted tarball version:

$ ./bin/streamsets dc
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Bypass activation because SDC contains only basic stage libraries.
Logging initialized @2901ms to org.eclipse.jetty.util.log.Slf4jLog
Running on URI : 'http://192.168.31.29:18630'

Documentation:

https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Getting_Started/GettingStarted_Title.html#concept_htw_ghg_jq

The HBase versions supported by SDC can be seen in the test-environment demo later in this article.

Integrating Phoenix with HBase:

Copy the two jars from the extracted Phoenix package, phoenix-5.0.0-HBase-2.0-server.jar and phoenix-core-5.0.0-HBase-2.0.jar, into HBase's lib directory, add the following properties to hbase-site.xml, and restart the HBase cluster.

<property>
  <name>phoenix.schema.isNamespaceMappingEnabled</name>
  <value>true</value>
</property>
<property>
  <name>phoenix.schema.mapSystemTablesToNamespace</name>
  <value>true</value>
</property>
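With these two properties set, a Phoenix schema is mapped to an HBase namespace, which is why the Phoenix table ZLXX.USER created later lives in the HBase table ZLXX:USER, and why the system tables live in the SYSTEM namespace. The sketch below spells out that naming rule; phoenix_to_hbase_table is a hypothetical helper, and it assumes unquoted identifiers (Phoenix upper-cases those, while quoted identifiers keep their case).

```python
def phoenix_to_hbase_table(schema: str, table: str, namespace_mapping: bool = True) -> str:
    """Map a Phoenix SCHEMA.TABLE name to the physical HBase table name.

    With phoenix.schema.isNamespaceMappingEnabled=true the schema becomes an
    HBase namespace ("SCHEMA:TABLE"); without it, the dotted name is a plain
    table name in HBase's default namespace ("SCHEMA.TABLE").
    """
    # Unquoted Phoenix identifiers are normalized to upper case.
    schema, table = schema.upper(), table.upper()
    if not schema:
        return table
    separator = ":" if namespace_mapping else "."
    return f"{schema}{separator}{table}"


if __name__ == "__main__":
    print(phoenix_to_hbase_table("zlxx", "user"))        # ZLXX:USER
    print(phoenix_to_hbase_table("SYSTEM", "CATALOG"))   # SYSTEM:CATALOG
```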

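To make Phoenix easy to use through sqlline.py, copy hbase-site.xml into Phoenix's bin directory so that the client picks up the same namespace-mapping settings as the server.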

$ ./bin/sqlline.py 
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix: none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
20/07/20 16:28:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 5.0)
Driver: PhoenixEmbeddedDriver (version 5.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BUCKETS | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 | |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
0: jdbc:phoenix:>

Create the user table in MySQL

CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
`user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT 'User name',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Create the namespace and table in HBase

create_namespace 'ZLXX'
create 'ZLXX:USER', 'INFO'


hbase(main):001:0> create_namespace 'ZLXX'
Took 0.9431 seconds
hbase(main):002:0> create 'ZLXX:USER', 'INFO'
Created table ZLXX:USER
Took 1.4457 seconds
=> Hbase::Table - ZLXX:USER

Create the schema and table mapping in Phoenix

create schema ZLXX;
create table ZLXX.USER (
id varchar primary key,
info.id varchar,
info.user_name varchar,
info.update_time varchar
) column_encoded_bytes=0;
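Because column_encoded_bytes=0 disables Phoenix's column-qualifier encoding, Phoenix reads the plain qualifiers INFO:ID, INFO:USER_NAME and INFO:UPDATE_TIME that the HBase destination writes, and the VARCHAR primary key id is read from the row key. The sketch below shows one MySQL row turned into such cells. It is only an illustration: to_hbase_cells is hypothetical, and its rules (row key = id as a string, upper-cased qualifiers in family INFO, UTF-8 strings, datetimes as yyyyMMddHHmmss) are assumptions inferred from the scan output later in this article, not the SDC HBase destination's actual code.

```python
from datetime import datetime


def to_hbase_cells(row: dict, family: str = "INFO") -> tuple[bytes, dict[bytes, bytes]]:
    """Turn one MySQL `user` row into (row key, {b"FAMILY:QUALIFIER": value}).

    Assumed mapping (hypothetical): row key is the id as a string, every column
    goes into `family` with an upper-cased qualifier, all values are UTF-8
    strings, and datetimes are rendered as yyyyMMddHHmmss.
    """
    def encode(value) -> bytes:
        if isinstance(value, datetime):
            value = value.strftime("%Y%m%d%H%M%S")
        return str(value).encode("utf-8")

    row_key = encode(row["id"])
    cells = {f"{family}:{col.upper()}".encode(): encode(val) for col, val in row.items()}
    return row_key, cells


if __name__ == "__main__":
    key, cells = to_hbase_cells({
        "id": 2,
        "user_name": "ZLXX_INSERT",
        "update_time": datetime(2020, 7, 22, 11, 16, 40),
    })
    print(key, cells)
```

Storing every value as a string keeps the Phoenix side simple (all columns are VARCHAR), at the cost of losing numeric and date types in HBase.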


0: jdbc:phoenix:> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BU |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null |
| | ZLXX | USER | TABLE | | | | | | false | null |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+

Create the pipeline in SDC

First install the JDBC and CDH stage libraries.

Select JDBC Query Consumer as the origin and HBase as the destination.

The complete pipeline is simple, as shown in the figure.

In the figure above, the MySQL JDBC driver is uploaded to SDC directly from the UI; restart SDC when prompted.

Once the upload succeeds, the driver appears in the list.

The MySQL server version is 8.0.12, so make sure the connector jar version matches:

https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.12/mysql-connector-java-8.0.12.jar
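The URL above follows the standard Maven repository layout (group path, artifact, version, then artifact-version.jar), so the matching driver for another MySQL version can be found by changing only the version. The sketch below builds such URLs; maven_central_jar_url is a hypothetical helper, and it covers only plain jars without a classifier. For reference, Connector/J 8.x registers the driver class com.mysql.cj.jdbc.Driver.

```python
def maven_central_jar_url(group_id: str, artifact_id: str, version: str,
                          repo: str = "https://repo1.maven.org/maven2") -> str:
    """Build a Maven repository download URL for a plain jar artifact.

    Layout: <groupId with dots as slashes>/<artifactId>/<version>/<artifactId>-<version>.jar
    """
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


if __name__ == "__main__":
    # Match the connector version to the MySQL server version (8.0.12 here).
    print(maven_central_jar_url("mysql", "mysql-connector-java", "8.0.12"))
```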

Main origin and destination configuration

JDBC-MySQL:

HBase-CDH6.3.0:

Once Validate succeeds, click Start. The running pipeline looks like this:

A quick demo: insert and then update data in the MySQL test.user table.

INSERT INTO `test`.`user`(`id`, `user_name`, `update_time`) VALUES (2, 'ZLXX_INSERT', SYSDATE());

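The Record Count (since last startup) shown above is greater than 1 because the query selects rows whose update time is greater than or equal to the OFFSET and runs every 10 seconds, so the same row is picked up again on every scan. The query also restricts results to the last five minutes, so a row is not re-read too many times. Using strictly greater than OFFSET instead could lose data: a row committed after a scan but carrying the same second-level update_time as the last offset would never match again.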
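The trade-off between ">=" and ">" can be reproduced with a small simulation. The query in the docstring is a hypothetical reconstruction (the real one is only in the configuration screenshot); it assumes SDC's incremental mode, where ${OFFSET} is replaced by the largest offset-column value read so far. poll and simulate are illustrative helpers, not SDC code.

```python
from datetime import datetime, timedelta


def poll(table, offset, now, inclusive=True, window=timedelta(minutes=5)):
    """One scan of a hypothetical incremental query:

        SELECT * FROM user
        WHERE update_time >= ${OFFSET}              -- ">" when inclusive=False
          AND update_time >= NOW() - INTERVAL 5 MINUTE
        ORDER BY update_time

    Returns the matching rows and the next offset (largest update_time read).
    """
    def past_offset(t):
        return t >= offset if inclusive else t > offset

    rows = sorted(
        (r for r in table if past_offset(r["update_time"]) and r["update_time"] >= now - window),
        key=lambda r: r["update_time"],
    )
    next_offset = rows[-1]["update_time"] if rows else offset
    return rows, next_offset


def simulate(inclusive):
    """Return the ids read by two consecutive scans, 10 seconds apart."""
    t = datetime(2020, 7, 22, 11, 16, 40)
    table = [{"id": 1, "update_time": t}]
    seen = []
    rows, offset = poll(table, t - timedelta(minutes=1), now=t, inclusive=inclusive)
    seen += [r["id"] for r in rows]
    # Row 2 is committed after the first scan but has the same second-level timestamp.
    table.append({"id": 2, "update_time": t})
    rows, offset = poll(table, offset, now=t + timedelta(seconds=10), inclusive=inclusive)
    seen += [r["id"] for r in rows]
    return seen


if __name__ == "__main__":
    print(simulate(inclusive=True))   # [1, 1, 2]: row 1 is read twice, nothing is lost
    print(simulate(inclusive=False))  # [1]: row 2 is skipped for good
```

The duplicates produced by ">=" are harmless here because the HBase destination writes each row under the same row key (the MySQL id), so a re-read row simply overwrites itself; once its update_time falls outside the five-minute window, it stops being re-read.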

Data on the HBase side:

hbase(main):004:0> scan 'ZLXX:USER', {LIMIT=>5}
ROW COLUMN+CELL
1 column=INFO:ID, timestamp=1595384382829, value=1
1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
1 column=INFO:_0, timestamp=1595384382829, value=
2 column=INFO:ID, timestamp=1595388092263, value=2
2 column=INFO:UPDATE_TIME, timestamp=1595388092263, value=20200722111640
2 column=INFO:USER_NAME, timestamp=1595388092263, value=ZLXX_INSERT
2 row(s)
Took 0.0598 seconds
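The USER_NAME of row 1 is not corrupted: the HBase shell prints non-printable bytes as \xNN escapes, and the bytes E9 82 B5 E5 BF 97 E9 B9 8F are simply the UTF-8 encoding of 邵志鹏, which is what Phoenix shows below. The sketch decodes such shell output back into text; decode_hbase_shell_bytes is a hypothetical helper and assumes printable characters in the output are plain ASCII, as they are here.

```python
import re


def decode_hbase_shell_bytes(text: str) -> str:
    """Decode a value as printed by the HBase shell (printable ASCII kept as-is,
    other bytes shown as \\xNN escapes) back into a UTF-8 string."""
    raw = bytearray()
    pos = 0
    for m in re.finditer(r"\\x([0-9A-Fa-f]{2})", text):
        raw += text[pos:m.start()].encode("ascii")  # literal characters between escapes
        raw.append(int(m.group(1), 16))             # the escaped byte itself
        pos = m.end()
    raw += text[pos:].encode("ascii")
    return raw.decode("utf-8")


if __name__ == "__main__":
    print(decode_hbase_shell_bytes(r"\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F"))  # 邵志鹏
```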

Querying from Phoenix:

0: jdbc:phoenix:> select * from zlxx.user limit 5;
+-----+-----+--------------+-----------------+
| ID | ID | USER_NAME | UPDATE_TIME |
+-----+-----+--------------+-----------------+
| 1 | 1 | 邵志鹏 | 20180903190809 |
| 2 | 2 | ZLXX_INSERT | 20200722111640 |
+-----+-----+--------------+-----------------+
2 rows selected (0.134 seconds)

Update in MySQL test.user:

UPDATE `test`.`user` SET `user_name` = 'ZLXX_IN_UPDATE', `update_time` = SYSDATE() WHERE `id` = 2;

Querying from Phoenix and HBase:

0: jdbc:phoenix:> select * from zlxx.user limit 5;
+-----+-----+-----------------+-----------------+
| ID | ID | USER_NAME | UPDATE_TIME |
+-----+-----+-----------------+-----------------+
| 1 | 1 | 邵志鹏 | 20180903190809 |
| 2 | 2 | ZLXX_IN_UPDATE | 20200722113033 |
+-----+-----+-----------------+-----------------+
2 rows selected (0.128 seconds)
hbase(main):005:0> scan 'ZLXX:USER', {LIMIT=>5}
ROW COLUMN+CELL
1 column=INFO:ID, timestamp=1595384382829, value=1
1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
1 column=INFO:_0, timestamp=1595384382829, value=
2 column=INFO:ID, timestamp=1595388662927, value=2
2 column=INFO:UPDATE_TIME, timestamp=1595388662927, value=20200722113033
2 column=INFO:USER_NAME, timestamp=1595388662927, value=ZLXX_IN_UPDATE
2 row(s)
Took 0.0761 seconds

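Note the column naming in the Phoenix mapping above: the table has two ID columns, the primary key ID (read from the HBase row key) and INFO.ID (read from the INFO:ID cell), so both appear as ID in the query results.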

[END: local-environment SDC real-time ingestion of MySQL data into HBase, queried through a Phoenix table mapping]

Appendix: HBase versions in CDH

HBase versions shipped with each CDH release (important)
