
A Look at the Replicator in rocketmq-mysql

Source: SegmentFault

Date: 2023-01-17 07:59:53


This article takes a look at the Replicator class in rocketmq-mysql.

Replicator

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java

public class Replicator {

    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);

    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");

    private Config config;

    private EventProcessor eventProcessor;

    private RocketMQProducer rocketMQProducer;

    private Object lock = new Object();
    private BinlogPosition nextBinlogPosition;
    private long nextQueueOffset;
    private long xid;

    public static void main(String[] args) {

        Replicator replicator = new Replicator();
        replicator.start();
    }

    public void start() {

        try {
            config = new Config();
            config.load();

            rocketMQProducer = new RocketMQProducer(config);
            rocketMQProducer.start();

            BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);
            binlogPositionLogThread.start();

            eventProcessor = new EventProcessor(this);
            eventProcessor.start();

        } catch (Exception e) {
            LOGGER.error("Start error.", e);
            System.exit(1);
        }
    }

    public void commit(Transaction transaction, boolean isComplete) {

        String json = transaction.toJson();

        for (int i = 0; i ... [listing truncated in the source]
  • Replicator provides the start, commit, and logPosition methods. start loads the Config, then creates a RocketMQProducer, a BinlogPositionLogThread, and an EventProcessor and calls start on each; commit sends transaction.toJson() through rocketMQProducer and, when isComplete is true, updates xid, nextBinlogPosition, and nextQueueOffset; logPosition logs binlogFilename, nextPosition, and nextOffset.
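
The commit and logPosition behavior described above can be sketched without the RocketMQ or binlog classes. What follows is a minimal, self-contained sketch under stated assumptions, not the project's code: JsonSink and PositionTracker are hypothetical names, the retry bound of 3 is an assumption (the loop bound is cut off in the listing), and the format of the position line is invented for illustration.

```java
// Hypothetical stand-in for RocketMQProducer.push(json); not the project's class.
interface JsonSink {
    long push(String json) throws Exception;
}

// Sketch of the commit/logPosition behavior: push the JSON, and only for a
// complete transaction record the position to resume from.
class PositionTracker {
    private static final int MAX_ATTEMPTS = 3; // assumed bound, not taken from the source

    private final JsonSink sink;
    private final Object lock = new Object();
    private String binlogFilename; // stays null until the first complete transaction
    private long nextPosition;
    private long nextQueueOffset;
    private long xid;

    PositionTracker(JsonSink sink) {
        this.sink = sink;
    }

    // Returns true if the JSON was pushed within MAX_ATTEMPTS tries.
    boolean commit(String json, long txXid, String file, long position, boolean isComplete) {
        for (int i = 0; i < MAX_ATTEMPTS; i++) {
            try {
                long offset = sink.push(json);
                if (isComplete) {
                    // Update all fields together so a reader never sees a mixed state.
                    synchronized (lock) {
                        xid = txXid;
                        binlogFilename = file;
                        nextPosition = position;
                        nextQueueOffset = offset;
                    }
                }
                return true;
            } catch (Exception e) {
                // Push failed; fall through and retry until the bound is reached.
            }
        }
        return false;
    }

    // Takes a snapshot under the lock and formats it outside the lock.
    // Returns null while no complete transaction has been committed.
    String describePosition() {
        String file;
        long x, pos, off;
        synchronized (lock) {
            file = binlogFilename;
            x = xid;
            pos = nextPosition;
            off = nextQueueOffset;
        }
        if (file == null) {
            return null;
        }
        return "XID: " + x + ", BINLOG_FILE: " + file
            + ", NEXT_POSITION: " + pos + ", NEXT_OFFSET: " + off;
    }
}
```

Copying the fields inside the lock and building the string outside keeps the critical section short, so a periodic logging thread does not hold up commits.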

RocketMQProducer

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java

public class RocketMQProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);

    private DefaultMQProducer producer;
    private Config config;

    public RocketMQProducer(Config config) {
        this.config = config;
    }

    public void start() throws MQClientException {
        producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");
        producer.setNamesrvAddr(config.mqNamesrvAddr);
        producer.start();
    }

    public long push(String json) throws Exception {
        LOGGER.debug(json);

        Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));
        SendResult sendResult = producer.send(message);

        return sendResult.getQueueOffset();
    }
}
  • RocketMQProducer's start method creates a DefaultMQProducer and calls its start method; its push method sends the message with producer.send(message) and returns sendResult.getQueueOffset().
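
Because push encodes the JSON with json.getBytes("UTF-8"), a consumer of the topic would presumably decode the message body with the same charset. The sketch below illustrates that round trip only; BodyCodec is a hypothetical helper that does not exist in rocketmq-mysql, and it uses StandardCharsets.UTF_8, which yields the same bytes without the checked UnsupportedEncodingException.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the body encoding used by push().
class BodyCodec {
    // Producer side: same bytes as json.getBytes("UTF-8").
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode the message body back into the JSON string.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Naming the charset explicitly on both sides matters as soon as row data contains non-ASCII text, since the platform default charset may differ between producer and consumer hosts.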

BinlogPositionLogThread

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java

public class BinlogPositionLogThread extends Thread {
    private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);

    private Replicator replicator;

    public BinlogPositionLogThread(Replicator replicator) {
        this.replicator = replicator;
        setDaemon(true);
    }

    @Override
    public void run() {

        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("Offset thread interrupted.", e);
            }

            replicator.logPosition();
        }
    }
}
  • BinlogPositionLogThread is a daemon thread that calls replicator.logPosition() once per second to log the position information.
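
The same periodic-logging pattern can be sketched independently of Replicator. The following is a minimal sketch, not the project's class: PeriodicReporter is a hypothetical name, the task and interval are passed in rather than hard-coded, and, unlike the listing above, it treats an interrupt as a request to stop instead of logging it and continuing.

```java
// Hypothetical generalization of BinlogPositionLogThread: runs a task at a
// fixed delay on a daemon thread until the thread is interrupted.
class PeriodicReporter extends Thread {
    private final Runnable task;
    private final long intervalMillis;

    PeriodicReporter(Runnable task, long intervalMillis) {
        this.task = task;
        this.intervalMillis = intervalMillis;
        setDaemon(true); // as in the listing: this thread must not keep the JVM alive
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Deviation from the listing: restore the flag and exit the loop.
                Thread.currentThread().interrupt();
                break;
            }
            task.run();
        }
    }
}
```

With the original class in mind, the equivalent usage would be something like new PeriodicReporter(replicator::logPosition, 1000).start(), assuming a Replicator instance named replicator.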

Summary

Replicator provides the start, commit, and logPosition methods. start loads the Config, then creates a RocketMQProducer, a BinlogPositionLogThread, and an EventProcessor and calls start on each; commit sends transaction.toJson() through rocketMQProducer and, when isComplete is true, updates xid, nextBinlogPosition, and nextQueueOffset; logPosition logs binlogFilename, nextPosition, and nextOffset.
