canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka
来源:SegmentFault
时间:2023-01-25 21:37:45 312浏览 收藏
在IT行业这个发展更新速度很快的行业,只有不停止的学习,才不会被行业所淘汰。如果你是数据库学习者,那么本文《canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka》就很适合你!本篇内容主要包括canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka,希望对大家的知识积累有所帮助,助力实战开发!
生产者要将发送的数据转化为字节数组才能通过网络发动给Kafka,对于一些简单的数据,Kafka自带了一些序列化工具。
//创建生产者实例 private static ProducercreateProducer(){ Properties properties = new Properties(); properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap); properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper); properties.put("serializer.class" , StringEncoder.class.getName()); return new Producer (new ProducerConfig(properties)); }
在通常的微服务中,服务之间需要频繁的传递各种负责的数据结构,但是kafka仅仅支持简单的类型如String,Integer。于是我们在服务之间使用JSONObject,因为JSON可以很容易的转化为String,而String的序列化和反序列化已经被支持。
JSONObject jsonObject = new JSONObject(); jsonObject.put("logFileName", logFileName); jsonObject.put("logFileOffset", logFileOffset); jsonObject.put("dbName", dbName); jsonObject.put("tableName", tableName); jsonObject.put("eventType", eventType); jsonObject.put("columnValueList", columnValueList); jsonObject.put("emptyCount", emptyCount); jsonObject.put("timestamp", timestamp); //拼接所有binlog解析的字段 String data = JSON.toJSONString(jsonObject); // 解析后的数据发送到kafka KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);
ResourceBundle类是用来读取propertise资源文件的,可以在初始化时把配置项全部一次读入,并保存在静态成员变量中。避免每次需要的时候才去读取相关配置文件的class,I/O速度慢,容易造成性能上的瓶颈。
//读取application.properties文件 private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application"); public static String canalHost= resourceBundle.getString("canal.host"); public static String canalPort = resourceBundle.getString("canal.port"); public static String canalInstance = resourceBundle.getString("canal.instance"); public static String mysqlUsername = resourceBundle.getString("mysql.username"); public static String mysqlPassword= resourceBundle.getString("mysql.password"); public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers"); public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect"); public static String kafkaInput = resourceBundle.getString("kafka.input.topic");
完整代码
#pom文件com.alibaba.otter SimpleMysqlCanalKafkaSample好了,本文到此结束,带大家了解了《canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多数据库知识!
声明:本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
-
499 收藏
-
244 收藏
-
235 收藏
-
157 收藏
-
101 收藏
最新阅读
更多>
-
475 收藏
-
266 收藏
-
273 收藏
-
283 收藏
-
210 收藏
-
371 收藏
课程推荐
更多>
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习