【Canal】Canal集群实践
来源:SegmentFault
时间:2023-02-23 16:47:42 246浏览 收藏
你在学习数据库相关的知识吗?本文《【Canal】Canal集群实践》,主要介绍的内容就涉及到MySQL、Elasticsearch、rocketmq、canal、zookeeper,如果你想提升自己的开发能力,就不要错过这篇文章,大家要知道编程理论基础和实战操作都是不可或缺的哦!
一、前言
Canal是alibaba开源的中间件,纯java开发,是一个用来数据同步的数据管道,它将自己伪装成mysql的slaver,具备解析bin log的能力,为下游增量同步业务提供服务。Canal可以多个节点结合zookeeper组成高可用集群,Canal集群中同时只有一个active状态的节点用来解析(多线程)bin log,如果有节点退出,之前节点已经解析完成的解析位点和消费位点会同步到zookeeper,另外的节点就会顶上去继续解析bin log(从zookeeper读取解析位点和消费位点),为下游客户端提供服务。
以下将搭建一个Canal高可用集群并将解析的bin log直接投递消息到阿里云RocketMQ,供业务消费者消费。完成mysql数据增量同步到Elasticsearch的任务。
二、集群搭建资源准备
1、mysql准备
- 需要准备mysql服务标的
服务地址 | 库名称 | 用户 | 明文密码 |
---|---|---|---|
mysql.test.yiyaowang.com:3306 | b2c | b2c | d41d8cd98f00b204 |
mysql2.test.yiyaowang.com:3306 | yc_order | yc_order | d41d8cd98f00b204 |
说明:存在两个数据库实例,以下实践Canal需要同时监听多个库。
- 数据库明文密码采用druid加密
D:\yyw_mvn_repo\repository\com\alibaba\druid\1.1.21>java -cp druid-1.1.21.jar com.alibaba.druid.filter.config.ConfigTools d41d8cd98f00b204 privateKey:MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht publicKey:MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ== password:KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg==
- druid加解密测试
/** * druid加解密测试 * @throws Exception */ @Test public void druidDecryptTest() throws Exception { //私钥 String privateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmmWRWcOG/HlVwLsN4FpnaOaQVKPAdvJBU5b24EVo0UHwLf8W08nqBr+DbTgKH3idgLtK0WURps4kFlGQKtOcEQIDAQABAkAmDeyiXD/0EI/jPfdwmbetMk7Wnbm9V35kdOwKYPExyhWtYjJlrBrRXJH+cafCEov13UvFpB5PO5PnUJLnqeoVAiEAzsb5W74wj6yc8En+DBwhI9Yd/HD40orl+U8wuhvmprMCIQC/JoVs28aj2YzphvtzeGCuxKIxeFcCqE9iybhHzIH0KwIgJlGnSkIfm7CAUONVagcYeRyn5+1DnzjQT3hGbmbXQpMCIQCKP2sKk110TbirgXPFTM/oNtDzpIyRoHdiBHDihNeMZwIhAIpE+nSOCNIWfbpc/ysOfTF/0iMqdHug3eo3HrYY75ht"; //公钥 String publicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ=="; //密文密码 String password = "KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg=="; //druid解密 log.info("ConfigTools.decrypt:{}", ConfigTools.decrypt(publicKey, password)); //druid加密 log.info("ConfigTools.encrypt:{}", ConfigTools.encrypt(privateKey, "d41d8cd98f00b204")); }
mysql服务必须开启bin log支持
- 修改mysql配置文件my.cnf
[mysqld] #开启bin log log-bin=mysql-bin #选择row模式 binlog-format=ROW #配置mysql replaction需要定义,不能和canal的slaveId重复 server_id=1
说明:Canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row。
- 验证mysql配置文件my.cnf
mysql> show variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+
准备一个具备复制相关权限的mysql用户
#创建用户 CREATE USER b2c IDENTIFIED BY 'd41d8cd98f00b204'; #授予slaver复制所需的相关权限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'b2c'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'b2c'@'%' ; #刷新权限,使得创建的用户权限生效 FLUSH PRIVILEGES; #查看用户被授予的权限 show grants for 'b2c'
2、zookeeper准备
Canal高可用集群依赖于zookeeper作为统一协调者,各个Canal server信息、Canal client信息、解析位点、消费位点会写入zookeeper;Canal客户端直接从zookeeper获取Canal服务端信息来消费,zk统一协调,保证只有一个Canal server处于激活状态。
- zookeeper节点地址
zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181
3、RocketMQ准备
- MQ的TOPIC和GID准备
环境 | 业务服务 | TOPIC | GID |
---|---|---|---|
测试环境 | 商品 | TOPIC_ITEM_ES_DATA_SYNC_TEST | GID_ITEM_ES_DATA_SYNC_TEST |
预发布环境 | 商品 | TOPIC_ITEM_ES_DATA_SYNC_STG | GID_ITEM_ES_DATA_SYNC_STG |
生产环境 | 商品 | TOPIC_ITEM_ES_DATA_SYNC | GID_ITEM_ES_DATA_SYNC |
测试环境 | 订单 | TOPIC_ORDER_ES_DATA_SYNC_TEST | GID_ORDER_ES_DATA_SYNC_TEST |
预发布环境 | 订单 | TOPIC_ORDER_ES_DATA_SYNC_STG | GID_ORDER_ES_DATA_SYNC_STG |
生产环境 | 订单 | TOPIC_ORDER_ES_DATA_SYNC | GID_ORDER_ES_DATA_SYNC |
说明Canal高可用集群并将解析的bin log直接投递消息到阿里云RocketMQ,供业务消费者消费。完成mysql数据增量同步到Elasticsearch的任务。
4、canal机器准备
系统版本 | 机器IP | 部署应用 | 应用版本 |
---|---|---|---|
CentOS 7.5 | 10.6.123.33 | canal.deployer | 1.1.4 |
CentOS 7.5 | 10.6.85.15 | canal.deployer | 1.1.4 |
三、Canal集群搭建及RocketMQ消息投递
1、Canal下载
- 直接下载,官网下载链接
- wget方式获取
#以v1.1.5-alpha-2版本为例 wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
- 编译源码
git clone git@github.com:alibaba/canal.git git co canal-$version #切换到对应的版本上 mvn clean install -Denv=release
说明:执行完成后,会在canal工程根目录下生成一个target目录,里面会包含一个 canal.deployer-$verion.tar.gz
- canal deployer的目录结构
drwxr-xr-x. 2 root root 76 Sep 4 13:11 bin #启停脚本 drwxr-xr-x. 5 root root 130 Sep 4 13:28 conf #配置文件 drwxr-xr-x. 2 root root 4096 Sep 3 15:03 lib #依赖库 drwxrwxrwx. 4 root root 41 Sep 3 16:16 logs #日志目录
- 解压Canal程序包
#解压canal程序包至指定文件夹 mkdir /usr/local/canal tar zxvf /usr/local/canal.deployer-1.1.4.tar.gz -C /usr/local/canal #从当前机器(10.6.123.33)拷贝canal文件夹至另一台机器(10.6.85.15)的/usr/local目录下 scp -r /usr/local/canal root@10.6.85.15:/usr/local
2、Canal配置
[root@localhost conf]# pwd /usr/local/canal/conf [root@localhost conf]# ll total 16 -rwxrwxrwx. 1 root root 291 Sep 2 2019 canal_local.properties -rwxrwxrwx. 1 root root 5394 Sep 4 13:28 canal.properties -rwxrwxrwx. 1 root root 3119 Sep 2 2019 logback.xml drwxrwxrwx. 2 root root 39 Sep 3 15:03 metrics drwxrwxrwx. 2 root root 49 Sep 4 11:38 order_instance drwxrwxrwx. 3 root root 149 Sep 3 15:03 spring
我们需要配置的文件基本就只有canal.properties以及我们自定义的instance.properties,每个canal server都可以加载多个instance.properties,每个instance实例可以向一个mysql实例同步bin log,也就是说canal支持多个rdb库的同步业务需求。默认配置下Canal会自动扫描conf目录下我们自定义的目录(如:order_instance),并加载改目录下的instance.properties来启动一个instance实例。
- 配置系统canal.properties,所有instance实例公用,可被instance配置覆盖
################################################# ######### common argument ############# ################################################# # canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 canal.ip = 10.6.123.33 # canal server注册到外部zookeeper、admin的ip信息 canal.register.ip = 10.6.123.33 # canal server提供socket服务的端口 canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 #canal server链接zookeeper集群的链接信息 canal.zkServers = zk1.test.yiyaowang.com:2181,zk2.test.yiyaowang.com:2181,zk3.test.yiyaowang.com:2181 # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # 服务模式,包括tcp(canal客户端), kafka(直接投递消息到kafka), RocketMQ(直接投递消息到RocketMQ) canal.serverMode = RocketMQ # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() canal.instance.parser.parallelThreadSize = 4 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 # aliyun账号的ak/sk信息,如果使用阿里云的RocketMQ服务必填, support rds/mq canal.aliyun.accessKey = xxxxxxxxxxxxxxxxxxxxxx canal.aliyun.secretKey = xxxxxxxxxxxxxxxxxxxxxx ################################################# ######### destinations ############# ################################################# #当前server上部署的instance列表,不配置自动探测 canal.destinations = # conf root dir canal.conf.dir = ../conf # 开启instance自动扫描,如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发: # a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动 # b. instance目录删除:卸载对应instance配置,如已启动则进行关闭 # c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作 canal.auto.scan = true #instance自动扫描的间隔时间,单位秒 canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml #canal.instance.global.spring.xml = classpath:spring/file-instance.xml canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ ############# ################################################## #阿里云RocketMQ服务实例的TCP协议客户端接入点 canal.mq.servers = onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 canal.mq.retries = 0 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 canal.mq.lingerMs = 100 canal.mq.bufferMemory = 33554432 canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all #canal.mq.properties. = #canal.mq.producerGroup = # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = cloud # aliyun mq namespace #canal.mq.namespace = ################################################## ######### Kafka Kerberos Info ############# ################################################## canal.mq.kafka.kerberos.enable = false canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
- 在conf目录下自定义目录(如:order_instance)并创建instance.properties
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=mysql2.test.yiyaowang.com:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=yc_order # 密文密码 canal.instance.dbPassword=KbOYWBV9y9OFRVcegHMdkyPtV4vBDSahBsRRxXzvLfCUQLdhVJd5Tif571EHvM0EuBwJLmr+6LkR92KxNs4heg== canal.instance.connectionCharset = UTF-8 # 开启druid的加解密方式支持 canal.instance.enableDruid=true # druid公钥 canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJplkVnDhvx5VcC7DeBaZ2jmkFSjwHbyQVOW9uBFaNFB8C3/FtPJ6ga/g204Ch94nYC7StFlEabOJBZRkCrTnBECAwEAAQ== # table regex canal.instance.filter.regex=yc_order\\.t_order,yc_order\\.t_order_detail,yc_order\\.t_order_child,yc_order\\.t_order_delivery,yc_order\\.t_system_pay_type,yc_order\\.t_order_exception # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic= TOPIC_ORDER_ES_DATA_SYNC_TEST canal.mq.producerGroup = GID_ORDER_ES_DATA_SYNC_TEST #canal.mq.namespace = # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
3、Canal相关命令与状态
Canal相关命令
- 启动canal
sh bin/startup.sh
- 停止canal
sh bin/stop.sh
- 查看日志
# 以order_instance实例日志为例 [root@localhost order_instance]# tail -f -n 500 logs/order_instance/order_instance.log 2020-09-03 16:16:19.143 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2020-09-03 16:16:19.183 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [order_instance/instance.properties] 2020-09-03 16:16:20.751 [canal-instance-scan-0] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)] 2020-09-03 16:16:21.160 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2020-09-03 16:16:21.161 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [order_instance/instance.properties] 2020-09-03 16:16:21.649 [canal-instance-scan-0] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-order_instance 2020-09-03 16:16:21.660 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^yc_order\.t_system_pay_type$|^yc_order\.t_order_exception$|^yc_order\.t_order_delivery$|^yc_order\.t_order_detail$|^yc_order\.t_order_child$|^yc_order\.t_order$ 2020-09-03 16:16:21.660 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 2020-09-03 16:16:21.675 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful.... 2020-09-03 16:16:21.757 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position 2020-09-03 16:16:21.761 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status 2020-09-03 16:16:26.312 [destination = order_instance , address = mysql2.test.yiyaowang.com/10.6.168.14:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000571,position=283581902,serverId=168252,gtid=,timestamp=1599120981000] cost : 4543ms , the next step is binlog dump
说明:如果是canal集群,只会看到一台机器上出现了启动成功的日志。因为canal集群中的各个节点仅有一个节点是处于active激活状态的。
Canal集群注册状态
分别启动所有canal节点,观察zookeeper中的状态,zk会记录已经注册的所有canal server节点信息、当前运行的节点具体信息、消费节点信息。
- zookeeper中的注册状态
- 当前运行节点信息
{"active":true,"address":"10.6.123.33:11111"}
- 消费位点信息
# 数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点. # (下次你重启client时,会从这最后一个位点继续进行消费) { "@type":"com.alibaba.otter.canal.protocol.position.LogPosition", "identity":{ "slaveId":-1, "sourceAddress":{ "address":"mysql2.test.yiyaowang.com", "port":3306 } }, "postion":{ "gtid":"", "included":false, "journalName":"mysql-bin.000573", "position":308524409, "serverId":168252, "timestamp":1599205987000 } }
四、Canal admin可视化配置管理
canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。
- 参考官方详细wiki
- 下载软件包
# 以 1.1.4 版本为例 wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
- 解压缩到指定位置
mkdir /usr/local/canal-admin tar zxvf canal.admin-1.1.4.tar.gz -C /usr/local/canal-admin
- 修改conf目录下application.yml
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin
- 初始化canal-admin依赖的mysql scheme
mysql -h127.1 -uroot -p # 导入初始化SQL source conf/canal_manager.sql
- 启动canal-admin
# 成功启动canal-admin,可以通过http://127.0.0.1:8089访问,默认密码:admin/123456 sh bin/startup.sh
- 修改canal server的canal_local.properties配置
# register ip canal.register.ip = # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true #如果通过canal-admin管理canal集群,那么此处指定的集群名称必须在canal-admin中提前创建 canal.admin.register.cluster = canal-es-data-sync-cluster
- 指定读取canal_local.properties配置方式启动canal server
sh bin/startup.sh local
说明:canal配置交给canal-admin管理后,只需要配置canal_local.properties文件中的关于和canal-admin通信的几个配置即可,canal server会从canal-admin读取配置。
说明:成功启动canal-admin后,可配置canal集群,集群下server、server下instance。
五、客户端接入
当前最新稳定版本(1.1.4)的canal server支持tcp模式(canal client)、kafka消息投递、RocketMQ消息投递等方式
消费Canal投递的RocketMQ消息方式
- 消息格式如下
{ "data":[ { "order_id":"1", "flow_id":"XXD20181123155734506823", "cust_name":"测试更新abcd", "cust_id":"8366", "supply_name":"广东壹号药业有限公司", "supply_id":"8353", } ], "database":"yc_order", "es":1599189063000, "id":38, "isDdl":false, "mysqlType":{ "order_id":"int(11)", "flow_id":"varchar(50)", "cust_name":"varchar(150)", "cust_id":"int(11)", "supply_name":"varchar(150)", "supply_id":"int(11)", }, "old":[ { "cust_name":"测试更新abc" } ], "pkNames":[ "order_id" ], "sql":"", "sqlType":{ "order_id":4, "flow_id":12, "cust_name":12, "cust_id":4, "supply_name":12, "supply_id":4, }, "table":"t_order", "ts":1599189064173, "type":"UPDATE" }
- 消息数据模型
package com.gangling.b2border.obs.search.rocketmq.model; import java.io.Serializable; import java.util.List; /** * 消费canal消息的数据载体 */ public class CanalMessageDTO implements Serializable { /** * 消息数据 */ private String data; /** * 数据库名称 */ private String database; private Long es; /** * 递增,从1开始 */ private Integer id; /** * 是否是DDL语句 */ private boolean isDdl; /** * 表结构的字段类型 */ private String mysqlType; /** * UPDATE语句,旧数据 */ private String old; /** * 主键名称 */ private ListpkNames; /** * sql语句 */ private String sql; private String sqlType; /** * 表名 */ private String table; private Long ts; /** * (新增)INSERT、(更新)UPDATE、(删除)DELETE等等 */ private String type; }
以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于数据库的相关知识,也可关注golang学习网公众号。
-
499 收藏
-
244 收藏
-
235 收藏
-
157 收藏
-
101 收藏
-
475 收藏
-
266 收藏
-
273 收藏
-
283 收藏
-
210 收藏
-
371 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习