
Canal in Practice: A MySQL Incremental Log (Binlog) Parsing Tool

Source: SegmentFault

Published: 2023-01-14 21:15:39


Introduction

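canal is an open-source tool from Alibaba. Its main purpose is to parse MySQL incremental logs (the binlog) and provide incremental data subscription and consumption.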

Use cases

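  • Real-time database backup
  • Refreshing business caches
  • Building and maintaining indexes in real time, e.g. pushing product data to Elasticsearch to build an inverted index
  • Incremental data processing with business logic, e.g. pushing incremental data to third-party platforms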

Official site

https://github.com/alibaba/canal

How it works

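  1. The MySQL master writes data changes to its binlog.
  2. canal, acting as a MySQL slave, sends a dump protocol request to the master.
  3. The master receives the dump request and pushes the binlog to canal.
  4. canal parses the binlog and can deliver the data to an MQ system. Kafka and RocketMQ are currently supported.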
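The following toy, in-memory Python simulation makes the four steps concrete. It is not canal's wire protocol. `ToyMaster`, `ToyCanal`, the fixed 100-byte event size and the starting offset 4 are all hypothetical. It illustrates one idea: canal resumes from a saved (binlog file, offset) pair and asks the master to push everything after it.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

EVENT_SIZE = 100  # hypothetical fixed event size in bytes, to keep offsets simple


@dataclass
class BinlogEvent:
    journal: str   # binlog file name, e.g. "mysql-bin.000001"
    position: int  # offset of the event inside that file
    table: str     # "schema.table" the row change belongs to
    row: dict      # row image after the change (ROW format)


@dataclass
class ToyMaster:
    """Stand-in for the MySQL master (steps 1 and 3)."""
    binlog: List[BinlogEvent] = field(default_factory=list)
    next_pos: int = 4  # hypothetical offset of the first event

    def write(self, table: str, row: dict) -> None:
        # Step 1: every committed change is appended to the binlog.
        self.binlog.append(BinlogEvent("mysql-bin.000001", self.next_pos, table, row))
        self.next_pos += EVENT_SIZE

    def dump(self, journal: str, position: int) -> List[BinlogEvent]:
        # Step 3: push every event at or after the requested (file, offset).
        return [e for e in self.binlog if (e.journal, e.position) >= (journal, position)]


@dataclass
class ToyCanal:
    """Stand-in for canal acting as a slave (steps 2 and 4)."""
    journal: str = "mysql-bin.000001"
    position: int = 4  # next offset to request

    def sync(self, master: ToyMaster) -> List[Tuple[str, dict]]:
        changes = []
        # Step 2: ask the master for a dump starting at the saved position.
        for event in master.dump(self.journal, self.position):
            # Step 4: "parse" the event and hand it downstream.
            changes.append((event.table, event.row))
            # Remember where to resume, like the position canal persists.
            self.journal, self.position = event.journal, event.position + EVENT_SIZE
        return changes
```

After more writes, a second call to `sync` returns only the new events, because the saved position has already moved past the consumed ones.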

Installation

Configure MySQL

The recommended MySQL version is 5.7.x.
For MySQL 8.0.x, see the official notes: https://github.com/alibaba/ca...

Edit the MySQL configuration file my.cnf:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # must be unique; do not reuse canal's slaveId
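Before restarting MySQL, you can check the three settings canal relies on with a small script. This is a sketch using Python's `configparser`. The function name is hypothetical. It assumes the option file is simple INI-style text, and it treats `-` and `_` in option names as equivalent, as MySQL does.

```python
import configparser


def check_binlog_settings(cnf_text: str) -> list:
    """Return a list of problems with the [mysqld] settings canal relies on."""
    parser = configparser.ConfigParser(
        inline_comment_prefixes=("#",),  # my.cnf allows "key=value # comment"
        allow_no_value=True,             # bare flags such as "log-bin" with no value
        strict=False,                    # tolerate duplicate keys
        interpolation=None,              # do not treat '%' specially
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent, so normalise them.
    opts = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}
    problems = []
    if "log_bin" not in opts:
        problems.append("binlog is not enabled (log-bin)")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format should be ROW")
    server_id = opts.get("server_id") or ""
    if not server_id.isdigit() or int(server_id) == 0:
        problems.append("server_id must be a non-zero integer")
    return problems
```

An empty list means the `[mysqld]` section has binlog enabled in ROW format with a non-zero server ID.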

Create a user and grant it privileges (for testing, you can simply use the root user):

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

Configure canal

Check whether Java is installed:

➜  canal-admin java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

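If it is not installed, download JDK 1.8 from the Oracle website (do not choose a newer version) and install the build for your system. On macOS, download the dmg and double-click it to install; I won't go into the details here.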
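The helper below checks that the installed Java is 1.8. It is a sketch, and the function name is hypothetical. It assumes you capture the text `java -version` prints; the command writes to stderr, e.g. `subprocess.run(["java", "-version"], capture_output=True, text=True).stderr`. Java 8 and earlier report `1.x`, while Java 9+ put the major number first, so the parser handles both.

```python
import re


def java_major_version(version_output: str) -> int:
    """Extract the major Java version from `java -version` output.

    "1.8.0_251" -> 8 (legacy 1.x scheme); "11.0.2" -> 11; "17" -> 17.
    """
    match = re.search(r'version "([^"]+)"', version_output)
    if match is None:
        raise ValueError("no version string found")
    # Split on the separators used in version strings: 1.8.0_251, 9-ea, 11.0.2+9
    parts = re.split(r"[._\-+]", match.group(1))
    major = int(parts[0])
    if major == 1 and len(parts) > 1:
        major = int(parts[1])  # legacy scheme: the real major version is the 2nd field
    return major
```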

Go to the releases page and download the latest stable release, 1.1.4:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

Extract it to a target directory:

mkdir /tmp/canal
tar zxvf canal.deployer-1.1.4.tar.gz  -C /tmp/canal

Enter the canal directory and list its contents:

  • bin: scripts for running canal
  • conf: configuration files
  • logs: runtime logs
  • lib: jar files

➜  ll                                  
total 0
drwxr-xr-x   7 jiao  staff   224B  6  3 17:17 bin
drwxr-xr-x   9 jiao  staff   288B  6  3 20:12 conf
drwxr-xr-x  83 jiao  staff   2.6K  6  1 17:54 lib
drwxr-xr-x   7 jiao  staff   224B  6  3 17:05 logs
drwxr-xr-x   7 jiao  staff   224B  5 29 17:43 pierced

Edit the instance configuration file:

vi conf/example/instance.properties

Configuration parameter reference: https://github.com/alibaba/ca...

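Here I changed the database address, the username and password, and the sync rule, so that only the sc_user table in the test database is synced: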

## mysql serverId
canal.instance.mysql.slaveId = 1234
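#position info: change these to your own database settings
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.dbUsername = canal  
#must match the password given in CREATE USER
canal.instance.dbPassword = canal
#binlog file on the master to start from
#canal.instance.master.journal.name =
#binlog offset on the master to start from
#canal.instance.master.position =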
canal.instance.filter.regex = test\\.sc_user
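The doubled backslash in `test\\.sc_user` is there because `.properties` files treat `\` as an escape character. After loading, the regex is `test\.sc_user`. A single backslash would be silently dropped by the loader, leaving `test.sc_user`, where `.` matches any character. Below is a sketch of how such a filter value can be interpreted. It assumes three things, following the descriptions in canal's docs as I understand them: the value is a comma-separated list of regexes, each regex must match the whole `schema.table` name, and only the `\\` escape needs handling. Python's `re` is used in place of canal's own regex engine, so details such as case handling may differ. The function name is hypothetical.

```python
import re


def compile_filter(properties_value: str):
    r"""Turn a canal.instance.filter.regex value into a matcher function.

    Sketch assumptions (not canal's implementation):
    - "\\" in the .properties file becomes a single "\" (only escape handled);
    - the value is a comma-separated list of regexes (so a regex containing
      a comma, such as "x{1,3}", is not supported here);
    - each regex must match the whole "schema.table" string.
    """
    value = properties_value.replace("\\\\", "\\")
    patterns = [re.compile(p.strip()) for p in value.split(",") if p.strip()]

    def matches(schema: str, table: str) -> bool:
        full_name = f"{schema}.{table}"
        return any(p.fullmatch(full_name) for p in patterns)

    return matches


# The value exactly as written in instance.properties (raw string keeps both backslashes).
only_sc_user = compile_filter(r"test\\.sc_user")
```

Here `only_sc_user("test", "sc_user")` is true, while `sc_user_bak` or the same table name in another schema is rejected.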

Edit the server configuration file:
vim conf/canal.properties

#tcp bind ip; required when delivering over TCP
canal.ip = 192.168.101.47
#register ip to zookeeper
canal.register.ip = 192.168.101.47
canal.port = 11111
canal.metrics.pull.port = 11112
#canal instance user/passwd
#canal.user = canal
#canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

#required for canal-admin; left unset for now
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

#ZooKeeper servers; can be left unset for a standalone canal
canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
#flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
#delivery target: tcp, kafka or RocketMQ (tcp for the TCP test; switched to RocketMQ in the RocketMQ test)
canal.serverMode = tcp
#flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
##memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
##memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
##memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

##detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

#support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
#mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

#network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

#binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

#binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

#binlog ddl isolation
canal.instance.get.ddl.isolation = false

#parallel parser config
canal.instance.parser.parallel = true
##concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
##disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

#table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
#dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
#purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########         destinations        #############
#################################################
#instances to load; here, the example instance configured above
canal.destinations = example
#conf root dir
canal.conf.dir = ../conf
#auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########              MQ              #############
##################################################
#RocketMQ name server address
canal.mq.servers = 192.168.101.47:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
#Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
#aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
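Two comments in this file state constraints that are easy to break when tuning: the memory store and parser RingBuffer sizes must be powers of 2. In MEMSIZE mode, `buffer.size` multiplied by `memunit` bounds the store's memory, so 16384 × 1024 bytes is 16 MiB. Below is a sketch of a pre-start check. The reader is deliberately minimal: it handles `key = value` lines and `#` comments only, and not full `.properties` syntax. The function names are hypothetical.

```python
def parse_properties(text: str) -> dict:
    """Minimal .properties reader: 'key = value' lines and '#' comments only."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def is_power_of_two(n: int) -> bool:
    return n > 0 and (n & (n - 1)) == 0


def check_buffers(props: dict) -> dict:
    """Validate the RingBuffer sizes and report the memory store's upper bound."""
    size = int(props["canal.instance.memory.buffer.size"])
    unit = int(props.get("canal.instance.memory.buffer.memunit", "1024"))
    parallel = int(props.get("canal.instance.parser.parallelBufferSize", "256"))
    if not is_power_of_two(size):
        raise ValueError("canal.instance.memory.buffer.size must be a power of 2")
    if not is_power_of_two(parallel):
        raise ValueError("canal.instance.parser.parallelBufferSize must be a power of 2")
    # MEMSIZE mode: number of slots x bytes per unit = memory bound of the store
    return {"ring_slots": size, "max_store_bytes": size * unit}
```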

Test TCP delivery

Download the canal PHP client: https://github.com/xingwenge/canal-php

Edit src/sample/client.php:

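<?php
namespace xingwenge\canal_php\sample;

use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\Fmt;

require_once __DIR__ . '/../../vendor/autoload.php';

// canal server address: canal.ip / canal.port in canal.properties
$host = '192.168.101.47';

try {
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
    $client->connect($host, 11111);
    // Auth info: corresponds to canal.user and canal.passwd in canal.properties
    $client->checkValid('canal', 'E3619321C1A937C46A0D8BD1DAC39F93B27D4458');
//    $client->subscribe("1001", "example", ".*\\..*");
    $client->subscribe("1001", "example", "test.sc_user"); # set the filter rule

    // Poll for up to 100 entries per batch and print each one
    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }
        sleep(1);
    }

    $client->disConnect();
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}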
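The second argument to `checkValid` is not a plaintext password. As I understand canal's documentation, `canal.passwd` and `canal.admin.passwd` hold the value MySQL's `PASSWORD()` returns, minus the leading `*`. For mysql_native_password, that value is SHA1 applied twice. The sketch below derives or checks such a value locally. This is handy because MySQL 8.0 removed `PASSWORD()`. Both helper names are hypothetical. Note also that `canal.user` and `canal.passwd` are commented out in the canal.properties above. As far as I know, the check is then not enforced.

```python
import hashlib
import hmac


def mysql_native_password_hash(password: str) -> str:
    """SHA1(SHA1(password)) as upper-case hex: PASSWORD() output without the leading '*'."""
    stage1 = hashlib.sha1(password.encode("utf-8")).digest()  # first pass, raw bytes
    return hashlib.sha1(stage1).hexdigest().upper()           # second pass, hex text


def password_matches(plaintext: str, configured: str) -> bool:
    """Compare a plaintext password with a configured value such as canal.passwd."""
    expected = configured.strip().lstrip("*").upper()  # tolerate a copied leading '*'
    return hmac.compare_digest(mysql_native_password_hash(plaintext), expected)
```

For example, `mysql_native_password_hash("canal")` should reproduce the value passed to `checkValid` above, if the assumption about canal's format holds.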

Start canal:

sh bin/startup.sh

Check the log for errors:
tail -n 50 -f logs/example/example.log

Output like the following shows that canal has started normally, has connected to MySQL, and is about to send the dump command to sync data:

2020-06-03 19:30:47.625 [destination = example , address = /127.0.0.1:3307 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=303179,serverId=1,gtid=,timestamp=1591183839000] cost : 5ms , the next step is binlog dump
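For scripted health checks, it can be handy to extract the start position from this line. The regex sketch below assumes the exact `EntryPosition[...]` layout shown in this log line; other canal versions may format it differently. The function name is hypothetical. The `timestamp` field is epoch milliseconds.

```python
import re

START_POSITION = re.compile(
    r"EntryPosition\[included=(?P<included>\w+),"
    r"journalName=(?P<journal>[^,]+),"
    r"position=(?P<position>\d+),"
    r"serverId=(?P<server_id>\d+),"
    r"gtid=(?P<gtid>[^,]*),"
    r"timestamp=(?P<timestamp>\d+)\]"
)


def parse_start_position(log_line: str):
    """Return the binlog start position from a 'find start position' line, or None."""
    m = START_POSITION.search(log_line)
    if m is None:
        return None
    return {
        "journal": m["journal"],              # binlog file name
        "position": int(m["position"]),       # byte offset in that file
        "server_id": int(m["server_id"]),     # MySQL server_id from my.cnf
        "gtid": m["gtid"],                    # empty when GTID mode is off
        "timestamp_ms": int(m["timestamp"]),  # event time, epoch milliseconds
    }
```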

Run php client.php, then modify a row in the sc_user table. The client prints the changed content:
================> binlog[mysql-bin.000001 : 208118],name[wcc-scm-service,sc_status_history], eventType: 2

-------> before
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂1 update= false
remark : update= false
created : 2020-05-28 18:40:02 update= false
-------> after
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂12 update= true
remark : update= false
created : 2020-05-28 18:40:02 update= false
TSocket: Could not read 4 bytes from 192.168.101.47:11111
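In the output above, each column appears in both the before and after images, and `update= true` flags the column that actually changed (`user_name`). If you post-process such row images yourself, the sketch below computes the same information. It assumes each image has already been turned into a plain `{column: value}` dict. That representation and the function name are hypothetical; they are not the PHP client's object model.

```python
from typing import Dict, Optional, Tuple


def changed_columns(
    before: Dict[str, str], after: Dict[str, str]
) -> Dict[str, Tuple[Optional[str], Optional[str]]]:
    """Return {column: (old, new)} for every column whose value differs.

    A column present on only one side is reported with None on the missing side.
    """
    changes = {}
    for column in sorted(before.keys() | after.keys()):
        old, new = before.get(column), after.get(column)
        if old != new:
            changes[column] = (old, new)
    return changes
```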

Test RocketMQ delivery

Install RocketMQ

Here RocketMQ is installed directly in Docker.

1. Install the name server

docker run -d -p 9876:9876 --name rocketmq-server -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv

2. Create a broker config file
echo "brokerIP1=192.168.101.47" > broker.properties

3. Install the broker (replace /path/broker.properties with the path of the file from step 2)
docker run -d -p 10911:10911 -p 10909:10909 -v /path/broker.properties:/opt/rocketmq-4.4.0/bin/broker.properties --name rocketmq-broker --link rocketmq-server -e "NAMESRV_ADDR=rocketmq-server:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c broker.properties

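4. Install the RocketMQ web console (-d must come before the image name; placed after it, it is passed to the container instead of to docker)
docker run -d -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.101.47:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng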

Open the RocketMQ console:
http://localhost:8080/


Edit the configuration

In canal.properties, switch delivery from tcp to RocketMQ:

canal.serverMode = RocketMQ
canal.mq.servers = 192.168.101.47:9876

Start canal

sh bin/startup.sh

Check the log files for errors:
tail -n 50 -f logs/canal/canal.log

tail -n 50 -f logs/example/example.log

If everything goes well, the new topic and its messages appear in RocketMQ, and the console lets you view message content directly.
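With `canal.mq.flatMessage = true`, each message body is a JSON document. The consumer-side sketch below pulls per-column changes out of an UPDATE message. It assumes canal's FlatMessage field names `data`, `old`, `type`, `isDdl`, `pkNames`, `database` and `table`, with `old` holding only the previous values of changed columns. Check these against the messages you actually see in the console. The sample payload and its `id`/`name` columns are hypothetical, and so is the function name.

```python
import json
from typing import Iterator, Tuple


def update_changes(raw: str) -> Iterator[Tuple[str, str, str, str, str]]:
    """Yield (schema.table, primary key, column, old value, new value) for an UPDATE message."""
    msg = json.loads(raw)
    if msg.get("isDdl") or msg.get("type") != "UPDATE":
        return  # skip DDL, INSERT, DELETE, ...
    table = f'{msg["database"]}.{msg["table"]}'
    pk_names = msg.get("pkNames") or []
    # data[i] is the new row image; old[i] holds only the columns that changed
    for row, old in zip(msg.get("data") or [], msg.get("old") or []):
        pk = ",".join(str(row.get(name)) for name in pk_names)
        for column, old_value in old.items():
            yield table, pk, column, old_value, row.get(column)


# Hypothetical sample payload, not captured from a real broker
SAMPLE = ('{"data":[{"id":"1","name":"Tom2"}],"database":"test","isDdl":false,'
          '"old":[{"name":"Tom"}],"pkNames":["id"],"table":"sc_user","type":"UPDATE"}')
```

`list(update_changes(SAMPLE))` gives one tuple per changed column, keyed by the row's primary key.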


Postscript

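I ran into a few pitfalls while installing and using canal, such as errors on newer Java versions and failures when delivering to the message queue. The fix is to read the logs patiently and work out the cause. canal also builds on quite a few other technologies (ZooKeeper, Kafka, RocketMQ), which I plan to study in more depth later.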
