登录
首页 >  数据库 >  MySQL

TiKV 源码解析系列——multi-raft 设计与实现

来源:SegmentFault

时间:2023-01-24 12:04:06 108浏览 收藏

本篇文章给大家分享《TiKV 源码解析系列——multi-raft 设计与实现》,覆盖了数据库的常见基础知识,其实一个语言的全部知识点一篇文章是不可能说完的,但希望通过这些问题,让读者对自己的掌握程度有一定的认识(B 数),从而弥补自己的不足,更好的掌握它。

本系列文章主要面向 TiKV 社区开发者,重点介绍 TiKV 的系统架构,源码结构,流程解析。目的是使得开发者阅读之后,能对 TiKV 项目有一个初步了解,更好的参与进入 TiKV 的开发中。
需要注意,TiKV 使用 Rust 语言编写,用户需要对 Rust 语言有一个大概的了解。另外,本系列文章并不会涉及到 TiKV 中心控制服务 Placement Driver(PD) 的详细介绍,但是会说明一些重要流程 TiKV 是如何与 PD 交互的。
TiKV 是一个分布式的 KV 系统,它采用 Raft 协议保证数据的强一致性,同时使用 MVCC + 2PC 的方式实现了分布式事务的支持。
本文为本系列文章第二节。

Placement Driver

在继续之前,我们先简单介绍一下 Placement Driver(PD)。PD 是 TiKV 的全局中央控制器,存储整个 TiKV 集群的元数据信息,负责整个 TiKV 集群的调度,全局 ID 的生成,以及全局 TSO 授时等。

PD 是一个非常重要的中心节点,它通过集成 etcd,自动的支持了分布式扩展以及 failover,解决了单点故障问题。关于 PD 的详细介绍,后续我们会新开一篇文章说明。

在 TiKV 里面,跟 PD 的交互是放在源码的 pd 目录下,现在跟 PD 的交互都是通过自己定义的 RPC 实现,协议非常简单,在 pd/mod.rs 里面我们直接提供了用于跟 PD 进行交互的 Client trait,以及实现了 RPC Client。

PD 的 Client trait 非常简单,多数都是对集群元信息的 set/get 操作,需要额外注意的几个:

bootstrap_cluster:当我们启动一个 TiKV 服务的时候,首先需要通过 is_cluster_bootstrapped 来判断整个 TiKV 集群是否已经初始化,如果还没有初始化,我们就会在该 TiKV 服务上面创建第一个 region。

region_heartbeat:定期 Region 向 PD 汇报自己的相关信息,供 PD 做后续的调度。譬如,如果一个 Region 给 PD 上报的 peers 的数量小于预设的副本数,那么 PD 就会给这个 Region 添加一个新的副本 Peer。

store_heartbeat:定期 store 向 PD 汇报自己的相关信息,供 PD 做后续调度。譬如,Store 会告诉 PD 当前的磁盘大小,以及剩余空间,如果 PD 发现空间不够了,就不会考虑将其他的 Peer 迁移到这个 Store 上面。

ask_split/report_split:当 Region 发现自己需要 split 的时候,就 ask_split 告诉 PD,PD 会生成新分裂 Region 的 ID ,当 Region 分裂成功之后,会 report_split 通知 PD。

注意,后面我们会让 PD 支持 gRPC 协议,所以 Client API 到时候可能会有变更。

Raftstore

因为 TiKV 目标是支持 100 TB+ 以上的数据,一个 Raft 集群是铁定没法支持这么多数据的,所以我们需要使用多个 Raft 集群,也就是 Multi Raft。在 TiKV 里面,Multi Raft 的实现是在 Raftstore 完成的,代码在 raftstore/store 目录。

Region

因为我们要支持 Multi Raft,所以我们需要将数据进行分片处理,让每个 Raft 单独负责一部分数据。

通常的数据分片算法就是 Hash 和 Range,TiKV 使用的 Range 来对数据进行数据分片。为什么使用 Range,主要原因是能更好的将相同前缀的 key 聚合在一起,便于 scan 等操作,这个 Hash 是没法支持的,当然,在 split/merge 上面 Range 也比 Hash 好处理很多,很多时候只会涉及到元信息的修改,都不用大范围的挪动数据。

当然,Range 有一个问题在于很有可能某一个 Region 会因为频繁的操作成为性能热点,当然也有一些优化的方式,譬如通过 PD 将这些 Region 调度到更好的机器上面,提供 Follower 分担读压力等。

总之,在 TiKV 里面,我们使用 Range 来对数据进行切分,将其分成一个一个的 Raft Group,每一个 Raft Group,我们使用 Region 来表示。

Region 的 protobuf 协议定义如下:

message Region {
    optional uint64 id                  = 1 [(gogoproto.nullable) = false];
    optional bytes  start_key           = 2;
    optional bytes  end_key             = 3;
    optional RegionEpoch region_epoch   = 4;
    repeated Peer   peers               = 5;
}

message RegionEpoch {
    optional uint64 conf_ver    = 1 [(gogoproto.nullable) = false];
    optional uint64 version     = 2 [(gogoproto.nullable) = false];
}

message Peer {      
    optional uint64 id          = 1 [(gogoproto.nullable) = false]; 
    optional uint64 store_id    = 2 [(gogoproto.nullable) = false];
}

id:Region 的唯一表示,通过 PD 全局唯一分配。

start_key, end_key:用来表示这个 Region 的范围 [start_key, end_key),对于最开始的 region,start 和 end key 都是空,TiKV 内部会特殊处理。

region_epoch:当一个 Region 添加或者删除 Peer,或者 split 等,我们就会认为这个 Region 的 epoch 发生的变化,RegionEpoch 的 conf_ver 会在每次做 ConfChange 的时候递增,而 version 则是会在每次做 split/merge 的时候递增。

peers:当前 Region 包含的节点信息。对于一个 Raft Group,我们通常有三个副本,每个副本我们使用 Peer 来表示,Peer 的 id 也是全局由 PD 分配,而 store_id 则表明这个 Peer 在哪一个 Store 上面。

RocksDB / Keys Prefix

对于实际数据存储,无论是 Raft Meta,Log,还是 State Machine 的 data,我们都存到一个 RocksDB 实例里面。关于 RocksDB,可以详细参考 facebook/rocksdb

我们使用不同的前缀来对 Raft 以及 State Machine 等数据进行区分,具体可以参考 raftstore/store/keys.rs,对于 State Machine 实际的 data 数据,我们统一添加 ‘z’ 前缀。而对于其他会存在本地的元数据(包括 Raft),我们统一添加 0x01 前缀。

这里简单说明一下一些重要元数据的 Key 格式,我们忽略最开始的 0x01 前缀。

  • 0x01:用于存放StoreIdent,在初始化这个 Store 的时候,我们会将 Store 的 Cluster ID,Store ID 等信息存储到这个 key 里面。

  • 0x02:用来存储 Raft 一些信息,0x02 之后会紧跟该 Raft Region 的 ID(8字节大端序 ),然后在紧跟一个 Suffix 来标识不同的子类型:

    • 0x01:用于存放 Raft Log,后面紧跟 Log Index(8字节大端序)

    • 0x02:用于存放 RaftLocalState

    • 0x03:用于存放 RaftApplyState

  • 0x03:用来存储 Region 本地的一些元信息,0x03 之后紧跟 Raft Region ID,随后在紧跟一个 Suffix 来表示不同的子类型:

    • 0x01:用于存放 RegionLocalState

对于上面提到的几个类型,都在 protobuf 里面定义:

message RaftLocalState {
    optional eraftpb.HardState hard_state        = 1;
    optional uint64 last_index                  = 2;
}

message RaftApplyState {
    optional uint64 applied_index               = 1;
    optional RaftTruncatedState truncated_state = 2;
}

enum PeerState {
    Normal       = 0;
    Applying     = 1;
    Tombstone    = 2;
}

message RegionLocalState {
    optional PeerState state        = 1;
    optional metapb.Region region   = 2;
}

RaftLocalState: 用于存放当前 Raft 的 HardState 以及最后一个 Log index。

RaftApplyState: 用于存放当前 Raft 最后 apply 的 Log index 以及被 truncated 的 Log 信息。

RegionLocalStaste: 用于存放 Region 信息以及在该 Store 上面对应的 Peer 状态,Normal 表明是一个正常的 Peer,Applying 表明该 Peer 还没做完 apply snapshot 的操作,而 Tombstone 则表明该 Peer 已经被移除出了 Region,不能在参与到 Raft Group 里面。

Peer Storage

前面已经知道,我们通过 RawNode 来使用 Raft。因为一个 Region 对应的一个 Raft Group,Region 里面的 Peer 就对应的是一个 Raft 副本。所以,我们在 Peer 里面封装了对 RawNode 的操作。

要使用 Raft,我们需要定义自己的 Storage,这在 raftstore/store/peer_storage.rs 的 PeerStorage 类里面实现。

当创建 PeerStorage 的时候,首先我们会从 RocksDB 里面得到该 Peer 之前的 RaftLocalState,RaftApplyState,以及 last_term 等,这些会缓存到内存里面,便于后续的快速度访问。

PeerStorage 需要注意几个地方:

首先就是 RAFT_INIT_LOG_TERM 和 RAFT_INIT_LOG_INDEX,它们的值都是 5(只要大于 1 都可以)。在 TiKV 里面,一个 Peer 的创建有如下几种方式:

  1. 主动创建,通常对于第一个 Region 的第一个副本 Peer,我们采用这样的创建方式,初始化的时候,我们会将它的 Log Term 和 Index 设置为 5。

  2. 被动创建,当一个 Region 添加一个副本 Peer 的时候,当这个 ConfChange 命令被 applied 之后, Leader 会给这个新增 Peer 所在的 Store 发送 Message,Store 收到这个 Message 之后,发现并没有相应的 Peer 存在,并且确定这个 Message 是合法的,就会创建一个对应的 Peer,但此时这个 Peer 是一个未初始化的 Peer,不知道所在的 Region 任何的信息,我们使用 0 来初始化它的 Log Term 和 Index。Leader 就能知道这个 Follower 并没有数据(0 到 5 之间存在 Log 缺口),Leader 就会给这个 Follower 直接发送 snapshot。

  3. Split 创建,当一个 Region 分裂成两个 Region,其中一个 Region 会继承分裂之前 Region 的元信息,只是会将自己的 Range 范围修改。而另一个 Region 相关的元信息,则会新建,新建的这个 Region 对应的 Peer,初始的 Log Term 和 Index 也是 5,因为这时候 Leader 和 Follower 都有最新的数据,不需要 snapshot。(注意:实际 Split 的情况非常的复杂,有可能也会出现发送 snapshot 的情况,但这里不做过多说明)。

然后就是需要注意 snapshot 的处理。无论 generate 还是 apply snapshot,都是一件比较费时的操作,为了不让 snapshot 的处理卡主整个 Raft 线程,PeerStore 都是会先只同步更新 snapshot 相关的元信息,这样就不用阻碍后续的 Raft 流程,然后会在另一个线程异步的进行 snapshot 的操作。PeerStorage 会维护一个 snapshot 的 state,如下:

pub enum SnapState {
    Relax,
    Generating(Receiver),
    Applying(Arcmio 驱动整个流程(后续我们会使用 tokio-core 来简化异步逻辑处理)。

我们在 mio 里面注册一个 base Raft Tick,每隔 100ms,调用一次,Store 会遍历所有的 Peer,一次调用对应的 RawNode tick 函数,驱动 Raft。

Store 通过 mio 的 notify 机制,接受外面 Client 的请求处理,以及其他 Store 发过来的 Raft message。 譬如收到 Msg::RaftCmd 消息之后,Store 就会调用 propose_raft_command 来处理,而收到 Msg::RaftMessage 消息之后,Store 就会调用 on_raft_message 来处理。

在每次 EventLoop 循环的最后,也就是 mio 的 tick 回调里面,Store 会进行 on_raft_ready 的处理:

  1. Store 会遍历所有的 ready Peers,调用 handle_raft_ready_append,我们会使用一个 WriteBatch 来处理所有的 ready append 数据,同时保存相关的结果。

  2. 如果 WriteBatch 成功,会依次调用 post_raft_ready_append,这里主要用来处理Follower 的消息发送(Leader 的消息已经在 handle_raft_ready_append 里面完成)。

  3. 然后,Store 会依次调用 handle_raft_ready_apply,apply 相关 committed entries,然后调用 on_ready_result 处理最后的结果。

Server

Server 层就是 TiKV 的网络层,现阶段,TiKV 使用 mio 来实现整个网络的处理,而网络协议则是使用自定义的,如下:

message = header + body 
header:  | 0xdaf4(2 bytes magic value) | 0x01(version 2 bytes) | msg_len(4 bytes) | msg_id(8 bytes) |

任何一个 message,我们都使用 header + body 的方式,body 就是实际的 message 数据,使用 protobuf 编码,而 header,首先就是两个字节的 magic value,0xdaf4,然后就是版本号,再就是 message 的整个长度,以及 message 的唯一 ID。

对于 mio,在 Linux 下面就是封装的 epoll,所以熟悉 epoll 的用户应该能非常方便的使用 mio 进行网络开发,简单流程如下:

  • bind 一个端口,生成一个 TcpListener 对象,并且 register 到 mio。

  • 处理 TcpListener on_readable 的回调,调用 accept 函数得到生成的 socket TcpStream,register 到 mio,我们后续就用这个 TcpStream 跟客户端进行交互。

  • TcpStream 处理 on_readable 或者 on_writable 的回调。

同时,Server 通过 mio 的 notify 来接受外面发过来的消息,譬如 TiKV 实现的 Transport,就是 Peer 在调用 send 的时候,将这个 message 直接通过 channel 发给 Server,然后在 notify 里面处理,找到对应的 Store connection,再发送给远端的 Store 的。

对于 snapshot 的发送,Server 会单独新开一个连接,直接使用一个线程同步发送,这样代码逻辑就会简单很多,不需要处理过多的异步 IO 逻辑。而对于接收端来说,在收到一个 message 的时候,会首先看这个 message 的类型,如果发现是 snapshot 的,则会进入接受 snapshot 的流程,会将收到的数据直接发给 snapshot 相关的线程,写到对应的 snapshot 文件里面。如果是其他的 message,也会直接 dispatch 到对应的处理逻辑处理,可以参考 Server 的 on_conn_msg 函数。

因为 Server 就是对网络 IO 的处理,逻辑比较简单,这里就不过多说明,但是,鉴于现阶段 TiKV 使用的是自定义的网络协议,并不利于跟外部其他客户端的对接,并且也没有 pipeline,stream 等优秀特性的 支持,所以后续我们会换成 gRPC。

总结

这里,我们解释了 TiKV 核心的 Raft 库,Multi Raft。在后续的章节,我们会介绍 Transaction,Coprocessor 以及 PD 是如何对整个集群进行变更的。
(第二部分完结)

今天关于《TiKV 源码解析系列——multi-raft 设计与实现》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!

声明:本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>