mysql 同步数据到 elasticsearch

数据量比较大的情况下,elasticsearch单表操作要比mysql快很多,全文检索也比mysql快很多。试用了一下阿里的canal,感觉还不错。

一,安装java

# yum install java-1.8.0-openjdk-devel.x86_64

二,下载canal

下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-2

# mkdir adapter deployer
# tar zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C adapter
# tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C deployer

不要直接解压,指定目录解压

三,创建测试mysql数据库和表

1,创建数据库和表

CREATE DATABASE `result_lianshan_saas` default CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE `test` (
`id` int(4) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
`ids` int(4) NOT NULL DEFAULT 0 COMMENT 'ID',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=1 COMMENT='tank test';

2,创建用户和分配权限

mysql> CREATE USER canal IDENTIFIED BY 'Canal-123';
Query OK, 0 rows affected (0.06 sec)
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.04 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.10 sec)

3,开启binlog

# vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin   # 开启 binlog
binlog-format=ROW   # 选择 ROW 模式
server_id=1      # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
# systemctl restart mysqld

四,创建索引

# curl -XPUT "http://10.0.40.200:9200/result_lianshan_saas/?pretty" -H "Content-Type: application/json" -d'
{
"mappings" : {
"test": {
"_all":{
"enabled":false
},
"properties": {
"id": {
"type": "integer"
},
"ids": {
"type": "integer"
}
}
}
},
"settings" : {
"number_of_shards" : "3",
"number_of_replicas" : "1"
}
}'

索引名最好和数据库名一样,方便自己

五,配置Canal-server

# egrep -v "(^#|^$)" deployer/conf/example/instance.properties
canal.instance.gtidon=false
canal.instance.master.address=10.0.40.200:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal-123
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=example
canal.mq.partition=0
# ./deployer/bin/startup.sh
# tail -f logs/canal/canal.log

日志很重要,如果有报错肯定是同步不了的

六,配置canal.adapter

1,修改application.yml

# egrep -v "(^#|^$)" adapter/conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
srcDataSources:
defaultDS:
url: jdbc:mysql://10.0.40.200:3306/result_lianshan_saas?useUnicode=true
username: canal
password: Canal-123
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es6   //根目录要对上,不然报错的,es6还是7,要根es的版本对上
hosts: 10.0.10.245:19200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
# security.auth: test:123456 #  only used for rest mode
cluster.name: estestcluster

2,设置同步的表

# cat adapter/conf/es6/test.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: result_lianshan_saas
_type: test
_id: _id
sql: "select test.id as _id ,test.id,test.ids from test"
commitBatch: 3000

删除默认的demo,索引名和type类型要和es的对上

3,启动canal.adapter

# ./bin/startup.sh
# tail -f logs/adapter/adapter.log

日志很重要,如果有报错肯定是同步不了的

4,测试在mysql中插入

mysql> insert into result_lianshan_saas.test(ids)value(1);

在logs/adapter/adapter.log 日志会有以下内容

2021-02-20 10:15:47.032 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample – DML: {“data”:[{“id”:1,”ids”:1}],”database”:”result_lianshan_saas”,”destination”:”example”,”es”:1613787346000,”groupId”:”g1″,”isDdl”:false,”old”:null,”pkNames”:[“id”],”sql”:””,”table”:”test”,”ts”:1613787347032,”type”:”INSERT”}

在es中就查看

# curl -XPOST "http://10.0.40.193:9200/result_lianshan_saas/test/_search?pretty"  -H "Content-Type: application/json"
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 1,
"max_score" : 1.0,
"hits" : [
{
"_index" : "result_lianshan_saas",
"_type" : "test",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : 1,
"ids" : 1
}
}
]
}
}

七,mysql添加字段同步

1,修改adapter/conf/es6/test.yml

# vim adapter/conf/es6/test.yml
sql: "select test.id as _id ,test.id,test.ids,test.hao from test" //加字段

2,停止adapter/bin/stop.sh

3,删除deployer/conf/example/h2.mv.db

4,重启deployer/bin/restart.sh

5,启动adapter/bin/startup.sh

海底苍鹰(tank)博客
我还没有学会写个人说明!
上一篇

HQYJ嵌入式学习笔记——C语言复习day1

下一篇

网友投诉网络剧控评 市场监管总局:涉违反《反不正当竞争法》

你也可能喜欢

评论已经被关闭。

插入图片