Golang分布式注册中心实现流程讲解
来源:脚本之家
时间:2023-05-12 09:46:33 361浏览 收藏
编程并不是一个机械性的工作,而是需要有思考,有创新的工作,语法是固定的,但解决问题的思路则是依靠人的思维,这就需要我们坚持学习和更新自己的知识。今天golang学习网就整理分享《Golang分布式注册中心实现流程讲解》,文章讲解的知识点主要包括分布式、注册中心,如果你对Golang方面的知识点感兴趣,就不要错过golang学习网,在这可以对大家的知识积累有所帮助,助力开发能力的提升。
动手实现一个分布式注册中心
以一个日志微服务为例,将日志服务注册到注册中心展开!

日志服务
log/Server.go
其实这一个日志类的功能就是有基本的写文件功能,然后就是注册一个http的接口去写日志进去
package log
import (
"io/ioutil"
stlog "log"
"net/http"
"os"
)
var log *stlog.Logger
type fileLog string
// 编写日志的方法
func (fl fileLog) Write(data []byte) (int, error) {
f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return 0, err
}
defer f.Close()
return f.Write(data)
}
// 启动一个日志对象 参数为日志文件名
func Run(destination string) {
log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags)
}
// 自身注册的一个服务方法
func RegisterHandlers() {
http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodPost:
msg, err := ioutil.ReadAll(r.Body)
if err != nil || len(msg) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
write(string(msg))
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
})
}
func write(message string) {
log.Printf("%v\n", message)
}log/Client.go
提供给外部服务的接口,定义好日志的命名格式,来显示调用接口去使用已经注册好的日志接口并且返回状态
package log
import (
"bytes"
"distributed/registry"
"fmt"
"net/http"
stlog "log"
)
func SetClientLogger(serviceURL string, clientService registry.ServiceName) {
stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService))
stlog.SetFlags(0)
stlog.SetOutput(&clientLogger{url: serviceURL})
}
type clientLogger struct {
url string
}
func (cl clientLogger) Write(data []byte) (int, error) {
b := bytes.NewBuffer([]byte(data))
res, err := http.Post(cl.url+"/log", "text/plain", b)
if err != nil {
return 0, err
}
if res.StatusCode != http.StatusOK {
return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status)
}
return len(data), nil
}主启动程序LogService
启动服务Logservice,主要执行start方法,里面有细节实现服务注册与服务发现
package main
import (
"context"
"distributed/log"
"distributed/registry"
"distributed/service"
"fmt"
stlog "log"
)
func main() {
// 初始化启动一个日志文件对象
log.Run("./distributed.log")
// 日志服务注册的端口和地址
host, port := "localhost", "4000"
serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
// 初始化注册对象
r := registry.Registration{
ServiceName: registry.LogService, // 自身服务名
ServiceURL: serviceAddress, // 自身服务地址
RequiredServices: make([]registry.ServiceName, 0),// 依赖服务
ServiceUpdateURL: serviceAddress + "/services", // 服务列表
HeartbeatURL: serviceAddress + "/heartbeat", // 心跳
}
// 启动日志服务包含服务注册,发现等细节
ctx, err := service.Start(
context.Background(),
host,
port,
r,
log.RegisterHandlers,
)
// 异常写入到日志中
if err != nil {
stlog.Fatalln(err)
}
// 超时停止退出服务
服务启动与注册
service/service.go
Start 启动服务的主方法
/*
host: 地址
port: 端口号
reg: 注册的服务对象
registerHandlersFunc: 注册方法
*/
func Start(ctx context.Context, host, port string,
reg registry.Registration,
registerHandlersFunc func()) (context.Context, error) {
registerHandlersFunc() // 启动注册方法
// 启动服务
ctx = startService(ctx, reg.ServiceName, host, port)
// 注册服务
err := registry.RegisterService(reg)
if err != nil {
return ctx, err
}
return ctx, nil
}startService
func startService(ctx context.Context, serviceName registry.ServiceName,
host, port string) context.Context {
ctx, cancel := context.WithCancel(ctx)
var srv http.Server
srv.Addr = ":" + port
// 该协程为监听http服务,并且停止服务的时候cancel
go func() {
log.Println(srv.ListenAndServe())
// 删除对应的服务
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
cancel()
}()
// 该协程为监听手动停止服务的信号
go func() {
fmt.Printf("%v started. Press any key to stop. \n", serviceName)
var s string
fmt.Scanln(&s)
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
srv.Shutdown(ctx)
cancel()
}()
return ctx
}服务注册与发现
registry/client.go
注册服务的时候会连着心跳以及服务更新的方法一起注册!
而服务更新里面的细节就是自己自定义了一个Handler然后ServeHttp方法里面去update全局的服务提供对象,
update主要是更新服务和删除服务的最新消息
然后就是提供一个注销服务的方法
package registry
import (
"bytes"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"sync"
)
// 注册服务
func RegisterService(r Registration) error {
// 获得心跳地址并注册
heartbeatURL, err := url.Parse(r.HeartbeatURL)
if err != nil {
return err
}
http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
// 获得服务更新地址,并且自定义http服务的handler,因为每次更新服务的时候,可以在ServeHttp方法里面去维护
serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
if err != nil {
return err
}
http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{})
// 写入buf值将服务对象发送给注册中心的services地址
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
err = enc.Encode(r)
if err != nil {
return err
}
res, err := http.Post(ServicesURL, "application/json", buf)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to register service. Registry service "+
"responded with code %v", res.StatusCode)
}
return nil
}
type serviceUpdateHanlder struct{}
func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
dec := json.NewDecoder(r.Body)
var p patch
err := dec.Decode(&p)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
fmt.Printf("Updated received %v\n", p)
prov.Update(p) // 更新服务提供对象
}
// 删除对应注册中心的服务地址
func ShutdownService(url string) error {
req, err := http.NewRequest(http.MethodDelete, ServicesURL,
bytes.NewBuffer([]byte(url)))
if err != nil {
return err
}
req.Header.Add("Content-Type", "text/plain")
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to deregister service. Registry "+
"service responded with code %v", res.StatusCode)
}
return nil
}
// 更新服务列表
func (p *providers) Update(pat patch) {
p.mutex.Lock()
defer p.mutex.Unlock()
// 将patch中有新增的进行添加
for _, patchEntry := range pat.Added {
if _, ok := p.services[patchEntry.Name]; !ok {
p.services[patchEntry.Name] = make([]string, 0)
}
p.services[patchEntry.Name] = append(p.services[patchEntry.Name],
patchEntry.URL)
}
// 将patch中被标记删除的
for _, patchEntry := range pat.Removed {
if providerURLs, ok := p.services[patchEntry.Name]; ok {
for i := range providerURLs {
if providerURLs[i] == patchEntry.URL {
p.services[patchEntry.Name] = append(providerURLs[:i],
providerURLs[i+1:]...)
}
}
}
}
}
// 根据服务名负载均衡随机获取服务地址
func (p providers) get(name ServiceName) (string, error) {
providers, ok := p.services[name]
if !ok {
return "", fmt.Errorf("No providers available for service %v", name)
}
idx := int(rand.Float32() * float32(len(providers)))
return providers[idx], nil
}
// 对外暴露生产者的方法
func GetProvider(name ServiceName) (string, error) {
return prov.get(name)
}
type providers struct {
services map[ServiceName][]string
mutex *sync.RWMutex
}
// 服务提供对象
var prov = providers{
services: make(map[ServiceName][]string), // 服务列表 服务名->集群地址集合
mutex: new(sync.RWMutex), // 锁 防止服务注册更新时的并发情况
}registry/registration.go
主要是一些关于服务使用到的参数以及对象!
package registry
type Registration struct {
ServiceName ServiceName // 服务名
ServiceURL string // 服务地址
RequiredServices []ServiceName // 依赖的服务
ServiceUpdateURL string // 服务更新的地址
HeartbeatURL string // 心跳地址
}
type ServiceName string
// 服务名集合
const (
LogService = ServiceName("LogService")
GradingService = ServiceName("GradingService")
PortalService = ServiceName("Portald")
)
// 服务对象参数
type patchEntry struct {
Name ServiceName
URL string
}
// 更新的服务对象参数
type patch struct {
Added []patchEntry
Removed []patchEntry
}registry/server.go
服务端的注册中心服务的增删改查管理以及心跳检测,及时将最新的更新的服务消息通知回给客户端
package registry
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services" // 注册中心地址
// 服务对象集合
type registry struct {
registrations []Registration
mutex *sync.RWMutex
}
// 添加服务
func (r *registry) add(reg Registration) error {
r.mutex.Lock()
r.registrations = append(r.registrations, reg)
r.mutex.Unlock()
err := r.sendRequiredServices(reg)
r.notify(patch{
Added: []patchEntry{
patchEntry{
Name: reg.ServiceName,
URL: reg.ServiceURL,
},
},
})
return err
}
// 通知服务接口请求去刷新改变后到服务
func (r registry) notify(fullPatch patch) {
r.mutex.RLock()
defer r.mutex.RUnlock()
for _, reg := range r.registrations {
go func(reg Registration) {
for _, reqService := range reg.RequiredServices {
p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
sendUpdate := false
for _, added := range fullPatch.Added {
if added.Name == reqService {
p.Added = append(p.Added, added)
sendUpdate = true
}
}
for _, removed := range fullPatch.Removed {
if removed.Name == reqService {
p.Removed = append(p.Removed, removed)
sendUpdate = true
}
}
if sendUpdate {
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
log.Println(err)
return
}
}
}
}(reg)
}
}
// 更新每个服务的依赖服务
func (r registry) sendRequiredServices(reg Registration) error {
r.mutex.RLock()
defer r.mutex.RUnlock()
var p patch
for _, serviceReg := range r.registrations {
for _, reqService := range reg.RequiredServices {
if serviceReg.ServiceName == reqService {
p.Added = append(p.Added, patchEntry{
Name: serviceReg.ServiceName,
URL: serviceReg.ServiceURL,
})
}
}
}
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
return err
}
return nil
}
// 告诉客户端更新,最新的服务列表是这个
func (r registry) sendPatch(p patch, url string) error {
d, err := json.Marshal(p)
if err != nil {
return err
}
_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
if err != nil {
return err
}
return nil
}
// 注册中心删除服务对象
func (r *registry) remove(url string) error {
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {
// 通知客户端更新对象信息
r.notify(patch{
Removed: []patchEntry{
{
Name: r.registrations[i].ServiceName,
URL: r.registrations[i].ServiceURL,
},
},
})
r.mutex.Lock()
reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
r.mutex.Unlock()
return nil
}
}
return fmt.Errorf("Service at URL %s not found", url)
}
// 心跳检测
func (r *registry) heartbeat(freq time.Duration) {
for {
var wg sync.WaitGroup
for _, reg := range r.registrations {
wg.Add(1)
go func(reg Registration) {
defer wg.Done()
success := true
for attemps := 0; attemps 本篇关于《Golang分布式注册中心实现流程讲解》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于Golang的相关知识,请关注golang学习网公众号!
声明:本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
-
220 收藏
-
117 收藏
-
384 收藏
-
113 收藏
-
269 收藏
最新阅读
更多>
-
241 收藏
-
467 收藏
-
500 收藏
-
249 收藏
-
150 收藏
-
132 收藏
-
450 收藏
-
209 收藏
-
237 收藏
-
188 收藏
-
317 收藏
-
209 收藏
课程推荐
更多>
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 485次学习