登录
首页 >  数据库 >  MySQL

canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka

来源:SegmentFault

时间:2023-01-25 21:37:45 312浏览 收藏

在IT行业这个发展更新速度很快的行业,只有不停止的学习,才不会被行业所淘汰。如果你是数据库学习者,那么本文《canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka》就很适合你!本篇内容主要包括canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka,希望对大家的知识积累有所帮助,助力实战开发!

生产者要将发送的数据转化为字节数组才能通过网络发动给Kafka,对于一些简单的数据,Kafka自带了一些序列化工具。

//创建生产者实例
private static Producer createProducer(){
    Properties properties = new Properties();

    properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap);
    properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper);
    properties.put("serializer.class" , StringEncoder.class.getName());
    
    return new Producer(new ProducerConfig(properties));
}

在通常的微服务中,服务之间需要频繁的传递各种负责的数据结构,但是kafka仅仅支持简单的类型如String,Integer。于是我们在服务之间使用JSONObject,因为JSON可以很容易的转化为String,而String的序列化和反序列化已经被支持。

JSONObject jsonObject = new JSONObject();
jsonObject.put("logFileName", logFileName);
jsonObject.put("logFileOffset", logFileOffset);
jsonObject.put("dbName", dbName);
jsonObject.put("tableName", tableName);
jsonObject.put("eventType", eventType);
jsonObject.put("columnValueList", columnValueList);
jsonObject.put("emptyCount", emptyCount);
jsonObject.put("timestamp", timestamp);

//拼接所有binlog解析的字段
String data = JSON.toJSONString(jsonObject);

// 解析后的数据发送到kafka
KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);

ResourceBundle类是用来读取propertise资源文件的,可以在初始化时把配置项全部一次读入,并保存在静态成员变量中。避免每次需要的时候才去读取相关配置文件的class,I/O速度慢,容易造成性能上的瓶颈。

//读取application.properties文件
private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");

public static String canalHost= resourceBundle.getString("canal.host");
public static String canalPort = resourceBundle.getString("canal.port");
public static String canalInstance = resourceBundle.getString("canal.instance");
public static String mysqlUsername = resourceBundle.getString("mysql.username");
public static String mysqlPassword=  resourceBundle.getString("mysql.password");
public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers");
public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect");
public static String kafkaInput = resourceBundle.getString("kafka.input.topic");

完整代码

#pom文件
com.alibaba.otterSimpleMysqlCanalKafkaSample

好了,本文到此结束,带大家了解了《canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多数据库知识!

声明:本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>