登录
首页 >  数据库 >  MySQL

使用canal+Kafka进行数据库同步实践

来源:SegmentFault

时间:2023-02-16 15:12:42 255浏览 收藏

在数据库实战开发的过程中,我们经常会遇到一些这样那样的问题,然后要卡好半天,等问题解决了才发现原来一些细节知识点还是没有掌握好。今天golang学习网就整理分享《使用canal+Kafka进行数据库同步实践》,聊聊MySQL、微服务、数据库、kubernetes,希望可以帮助到正在努力赚钱的你。

在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B服务数据库的数据来源于A服务的数据库;A服务的数据有变更操作时,需要同步到B服务中。

第一种解决方案:

在代码逻辑中,有相关A服务数据写操作时,以调用接口的方式,调用B服务接口,B服务再将数据写到新的数据库中。这种方式看似简单,但其实“坑”很多。在A服务代码逻辑中会增加大量这种调用接口同步的代码,增加了项目代码的复杂度,以后会越来越难维护。并且,接口调用的方式并不是一个稳定的方式,没有重试机制,没有同步位置记录,接口调用失败了怎么处理,突然的大量接口调用会产生的问题等,这些都要考虑并且在业务中处理。这里会有不少工作量。想到这里,就将这个方案排除了。

第二种解决方案:

通过数据库的binlog进行同步。这种解决方案,与A服务是独立的,不会和A服务有代码上的耦合。可以直接TCP连接进行传输数据,优于接口调用的方式。 这是一套成熟的生产解决方案,也有不少binlog同步的中间件工具,所以我们关注的就是哪个工具能够更好的构建稳定、性能满足且易于高可用部署的方案。

经过调研,我们选择了

func Handler(entry protocol.Entry)  {
    var keys []string
    rowChange := &protocol.RowChange{}
    proto.Unmarshal(entry.GetStoreValue(), rowChange)
    if rowChange != nil {
        eventType := rowChange.GetEventType()
        for _, rowData := range rowChange.GetRowDatas() { // 遍历每一行数据             if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE {
                 columns := rowData.GetBeforeColumns() // 得到更改前的所有字段属性             } else if eventType == protocol.EventType_INSERT {
                 columns := rowData.GetAfterColumns() // 得到更后前的所有字段属性             }
            ......
        }
    }
} 

遇到的问题

为了高可用和更高的性能,我们会创建多个

# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false

# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2..*,.*..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=mydatabase.mytable

具体见:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

解决顺序消费问题

看到下面这一行配置

canal.mq.partitionHash=mydatabase.mytable

我们配置了kafka的partitionHash,并且我们一个Topic就是一个表。这样的效果就是,一个表的数据只会推到一个固定的partition中,然后再推给consumer进行消费处理,同步到新的数据库。通过这种方式,解决了之前碰到的binlog日志顺序处理的问题。这样即使我们部署了多个kafka consumer端,构成一个集群,这样consumer从一个partition消费消息,就是消费处理同一个表的数据。这样对于一个表来说,牺牲掉了并行处理,不过个人觉得,凭借kafka的性能强大的处理架构,我们的业务在kafka这个节点产生瓶颈并不容易。并且我们的业务目的不是实时一致性,在一定延迟下,两个数据库保证最终一致性。

下图是最终的同步架构,我们在每一个服务节点都实现了集群化。全都跑在UCloud的UK8s服务上,保证了服务节点的高可用性。

canal也是集群换,但是某一时刻只会有一台canal在处理binlog,其他都是冗余服务。当这台canal服务挂了,其中一台冗余服务就会切换到工作状态。同样的,也是因为要保证binlog的顺序读取,所以只能有一台canal在工作。

并且,我们还用这套架构进行缓存失效的同步。我们使用的缓存模式是:

Cache-Aside
。同样的,如果在代码中数据更改的地方进行缓存失效操作,会将代码变得复杂。所以,在上述架构的基础上,将复杂的触发缓存失效的逻辑放到
kafka-client
端统一处理,达到一定解耦的目的。

目前这套同步架构正常运行中,后续有遇到问题再继续更新。

更多内容,欢迎点击下方作者主页进行交流~

本文作者:UCloud应用研发工程师 Cary

今天关于《使用canal+Kafka进行数据库同步实践》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!

声明:本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>