canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka
来源:SegmentFault
时间:2023-01-25 21:37:45 312浏览 收藏
在IT行业这个发展更新速度很快的行业,只有不停止的学习,才不会被行业所淘汰。如果你是数据库学习者,那么本文《canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka》就很适合你!本篇内容主要包括canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka,希望对大家的知识积累有所帮助,助力实战开发!
生产者要将发送的数据转化为字节数组才能通过网络发动给Kafka,对于一些简单的数据,Kafka自带了一些序列化工具。
//创建生产者实例 private static ProducercreateProducer(){ Properties properties = new Properties(); properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap); properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper); properties.put("serializer.class" , StringEncoder.class.getName()); return new Producer (new ProducerConfig(properties)); }
在通常的微服务中,服务之间需要频繁的传递各种负责的数据结构,但是kafka仅仅支持简单的类型如String,Integer。于是我们在服务之间使用JSONObject,因为JSON可以很容易的转化为String,而String的序列化和反序列化已经被支持。
JSONObject jsonObject = new JSONObject();
jsonObject.put("logFileName", logFileName);
jsonObject.put("logFileOffset", logFileOffset);
jsonObject.put("dbName", dbName);
jsonObject.put("tableName", tableName);
jsonObject.put("eventType", eventType);
jsonObject.put("columnValueList", columnValueList);
jsonObject.put("emptyCount", emptyCount);
jsonObject.put("timestamp", timestamp);
//拼接所有binlog解析的字段
String data = JSON.toJSONString(jsonObject);
// 解析后的数据发送到kafka
KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);ResourceBundle类是用来读取propertise资源文件的,可以在初始化时把配置项全部一次读入,并保存在静态成员变量中。避免每次需要的时候才去读取相关配置文件的class,I/O速度慢,容易造成性能上的瓶颈。
//读取application.properties文件
private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");
public static String canalHost= resourceBundle.getString("canal.host");
public static String canalPort = resourceBundle.getString("canal.port");
public static String canalInstance = resourceBundle.getString("canal.instance");
public static String mysqlUsername = resourceBundle.getString("mysql.username");
public static String mysqlPassword= resourceBundle.getString("mysql.password");
public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers");
public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect");
public static String kafkaInput = resourceBundle.getString("kafka.input.topic");完整代码
#pom文件com.alibaba.otter canal.client 1.0.24 org.apache.kafka kafka_2.11 0.9.0.1 org.slf4j slf4j-log4j12 com.alibaba fastjson 1.2.44
import java.util.Locale;
import java.util.ResourceBundle;
/**
* 配置文件的公共类
*/
public class GlobalConfigUtil {
//读取application.properties文件
private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");
public static String canalHost= resourceBundle.getString("canal.host");
public static String canalPort = resourceBundle.getString("canal.port");
public static String canalInstance = resourceBundle.getString("canal.instance");
public static String mysqlUsername = resourceBundle.getString("mysql.username");
public static String mysqlPassword= resourceBundle.getString("mysql.password");
public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers");
public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect");
public static String kafkaInput = resourceBundle.getString("kafka.input.topic");
public static void main(String[] args) {
System.out.println(canalHost);
}
}import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import java.util.Properties;
/**
* Kafka生产消息工具类
*/
public class KafkaSender {
private String topic;
public KafkaSender(String topic){
super();
this.topic = topic;
}
/**
* 发送消息到Kafka指定topic
* * @param topic topic名字
* @param key 键值
* @param data 数据
*/
public static void sendMessage(String topic , String key , String data){
Producer producer = createProducer();
producer.send(new KeyedMessage(topic , key , data));
}
/**
* 创建生产者实例
* @return
*/
private static Producer createProducer(){
Properties properties = new Properties();
properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap);
properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper);
properties.put("serializer.class" , StringEncoder.class.getName());
return new Producer(new ProducerConfig(properties));
}
} import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* Canal解析binlog日志工具类
*/
public class CanalClient {
static class ColumnValuePair {
private String columnName;
private String columnValue;
private Boolean isValid;
public ColumnValuePair(String columnName, String columnValue, Boolean isValid) {
this.columnName = columnName;
this.columnValue = columnValue;
this.isValid = isValid;
}
public String getColumnName() { return columnName; }
public void setColumnName(String columnName) { this.columnName = columnName; }
public String getColumnValue() { return columnValue; }
public void setColumnValue(String columnValue) { this.columnValue = columnValue; }
public Boolean getIsValid() { return isValid; }
public void setIsValid(Boolean isValid) { this.isValid = isValid; }
}
/**
* 获取Canal连接
*
* @param host 主机名
* @param port 端口号
* @param instance Canal实例名
* @param username 用户名
* @param password 密码
* @return Canal连接器
*/
public static CanalConnector getConn(String host, int port, String instance, String username, String password) {
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
return canalConnector;
}
/**
* 解析Binlog日志
*
* @param entries Binlog消息实体
* @param emptyCount 操作的序号
*/
public static void analysis(List entries, int emptyCount) {
for (CanalEntry.Entry entry : entries) {
// 只解析mysql事务的操作,其他的不解析
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// 那么解析binlog
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
e.printStackTrace();
}
// 获取操作类型字段(增加 删除 修改)
CanalEntry.EventType eventType = rowChange.getEventType();
// 获取binlog文件名称
String logfileName = entry.getHeader().getLogfileName();
// 读取当前操作在binlog文件的位置
long logfileOffset = entry.getHeader().getLogfileOffset();
// 获取当前操作所属的数据库
String dbName = entry.getHeader().getSchemaName();
// 获取当前操作所属的表
String tableName = entry.getHeader().getTableName();//当前操作的是哪一张表
long timestamp = entry.getHeader().getExecuteTime();//执行时间
// 解析操作的行数据
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 删除操作
if (eventType == CanalEntry.EventType.DELETE) {
// 获取删除之前的所有列数据
dataDetails(rowData.getBeforeColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);
}
// 新增操作
else if (eventType == CanalEntry.EventType.INSERT) {
// 获取新增之后的所有列数据
dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);
}
// 更新操作
else {
// 获取更新之后的所有列数据
dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);
}
}
}
}
/**
* 解析具体一条Binlog消息的数据
*
* @param columns 当前行所有的列数据
* @param logFileName binlog文件名
* @param logFileOffset 当前操作在binlog中的位置
* @param dbName 当前操作所属数据库名称
* @param tableName 当前操作所属表名称
* @param eventType 当前操作类型(新增、修改、删除)
* @param emptyCount 操作的序号
*/
private static void dataDetails(List columns,
String logFileName,
Long logFileOffset,
String dbName,
String tableName,
CanalEntry.EventType eventType,
int emptyCount,
long timestamp) {
// 找到当前那些列发生了改变 以及改变的值
List columnValueList = new ArrayList();
for (CanalEntry.Column column : columns) {
ColumnValuePair columnValuePair = new ColumnValuePair(column.getName(), column.getValue(), column.getUpdated());
columnValueList.add(columnValuePair);
}
String key = UUID.randomUUID().toString();
JSONObject jsonObject = new JSONObject();
// jsonObject.put("logFileName", logFileName);
// jsonObject.put("logFileOffset", logFileOffset);
jsonObject.put("dbName", dbName);
jsonObject.put("tableName", tableName);
jsonObject.put("eventType", eventType);
jsonObject.put("columnValueList", columnValueList);
// jsonObject.put("emptyCount", emptyCount);
// jsonObject.put("timestamp", timestamp);
// 拼接所有binlog解析的字段
String data = JSON.toJSONString(jsonObject);
System.out.println("【JSON】" + data);
// 解析后的数据发送到kafka
KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);
}
/**
* 客户端入口方法
* @param args
*/
public static void main(String[] args) {
// 加载配置文件
String host = GlobalConfigUtil.canalHost;
int port = Integer.parseInt(GlobalConfigUtil.canalPort);
String instance = GlobalConfigUtil.canalInstance;
String username = GlobalConfigUtil.mysqlUsername;
String password = GlobalConfigUtil.mysqlPassword;
// 获取Canal连接
CanalConnector conn = getConn(host, port, instance, username, password);
// 从binlog中读取数据
int batchSize = 100;
int emptyCount = 1;
try {
conn.connect();
conn.subscribe(".*..*");
conn.rollback();
int totalCount = 120; //循环次数
while (emptyCount #application.properties, 以下请更改为自已的数据库信息 canal.host=xxx.xx.xxx.xxx canal.port=11111 canal.instance=example mysql.username=root mysql.password=xxxxxx kafka.bootstrap.servers = xxx.xx.xxx.xxx:9092 kafka.zookeeper.connect = xxx.xx.xxx.xxx:2182 kafka.input.topic=test
具体代码请移步:SimpleMysqlCanalKafkaSample
好了,本文到此结束,带大家了解了《canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多数据库知识!
声明:本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
-
499 收藏
-
244 收藏
-
235 收藏
-
157 收藏
-
101 收藏
最新阅读
更多>
课程推荐
更多>
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 485次学习