在两个独立速率限制节点之间同步调用
来源:stackoverflow
时间:2024-02-27 10:54:24 369浏览 收藏
亲爱的编程学习爱好者,如果你点开了这篇文章,说明你对《在两个独立速率限制节点之间同步调用》很感兴趣。本篇文章就来给大家详细解析一下,主要介绍一下,希望所有认真读完的童鞋们,都有实质性的提高。
我正在使用一些第三方 api,每个 api 都有自己的速率限制。端点1的速率限制为10/s,端点2的速率限制为20/s。
我需要通过端点 1 处理数据,该端点将返回一个对象数组(2-3000 个对象之间)。然后,我需要获取每个对象并将一些数据发送到第二个端点,同时遵守第二个端点的速率限制。
我计划在 go 例程中一次批量发送 10 个请求,确保如果所有 10 个请求都在 <1s 中完成,我不会在 1 秒窗口内发送更多请求。
最终,我希望能够限制每个端点一次发出的并发响应数量。特别是如果我必须针对由于服务器 500 多个响应等原因导致的失败请求进行重试。
出于问题的目的,我使用 httpbin 请求来模拟以下场景:
package main import ( "bytes" "encoding/json" "fmt" "io" "net/http" "sync" "time" ) type HttpBinGetRequest struct { url string } type HttpBinGetResponse struct { Uuid string `json:"uuid"` StatusCode int } type HttpBinPostRequest struct { url string uuid string // Item to post to API } type HttpBinPostResponse struct { Data string `json:"data"` StatusCode int } func main() { // Prepare GET requests for 500 requests var requests []*HttpBinGetRequest for i := 0; i < 500; i++ { uri := "https://httpbin.org/uuid" request := &HttpBinGetRequest{ url: uri, } requests = append(requests, request) } // Create semaphore and rate limit for the GET endpoint getSemaphore := make(chan struct{}, 10) getRate := make(chan struct{}, 10) for i := 0; i < cap(getRate); i++ { getRate <- struct{}{} } go func() { // ticker corresponding to 1/10th of a second ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for range ticker.C { _, ok := <-getRate if !ok { return } } }() // Send our GET requests to obtain a random UUID var wg sync.WaitGroup for _, request := range requests { wg.Add(1) // Go func to make request and receive the response go func(r *HttpBinGetRequest) { defer wg.Done() // Check the rate limiter and block if it is empty getRate <- struct{}{} // Add a token to the semaphore getSemaphore <- struct{}{} // Remove token when function is complete defer func() { <-getSemaphore }() resp, _ := get(r) fmt.Printf("%+v\n", resp) }(request) } wg.Wait() // I need to add code that obtains the response data from the above for loop // then sends the UUID it to its own go routines for a POST request, following a similar pattern above // To not violate the rate limit of the second endpoint which is 20 calls per second // postSemaphore := make(chan struct{}, 20) // postRate := make(chan struct{}, 20) // for i := 0; i < cap(postRate); i++ { // postRate <- struct{}{} // } } func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) { httpResp := &HttpBinGetResponse{} client := &http.Client{} req, err := http.NewRequest("GET", hbgr.url, nil) if err != nil { fmt.Println("error making request") return httpResp, err } req.Header = http.Header{ "accept": {"application/json"}, } resp, err := client.Do(req) if err != nil { fmt.Println(err) fmt.Println("error getting response") return httpResp, err } // Read Response body, err := io.ReadAll(resp.Body) if err != nil { fmt.Println("error reading response body") return httpResp, err } json.Unmarshal(body, &httpResp) httpResp.StatusCode = resp.StatusCode return httpResp, nil } // Method to post data to httpbin func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) { httpResp := &HttpBinPostResponse{} client := &http.Client{} req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid))) if err != nil { fmt.Println("error making request") return httpResp, err } req.Header = http.Header{ "accept": {"application/json"}, } resp, err := client.Do(req) if err != nil { fmt.Println("error getting response") return httpResp, err } if resp.StatusCode == 429 { fmt.Println(resp.Header.Get("Retry-After")) } // Read Response body, err := io.ReadAll(resp.Body) if err != nil { fmt.Println("error reading response body") return httpResp, err } json.Unmarshal(body, &httpResp) httpResp.StatusCode = resp.StatusCode fmt.Printf("%+v", httpResp) return httpResp, nil }
正确答案
这是生产者/消费者模式。您可以使用 chan 来连接它们。
关于速率限制器,我会使用包 golang.org/x/time/rate
。
既然我们决定使用chan来连接生产者和消费者,那么很自然地将失败的任务发送到同一个chan,以便消费者可以再次尝试。
我已将逻辑封装到 scheduler[t]
类型中。请参阅下面的演示。请注意,该演示是匆忙编写的,仅用于说明想法。尚未经过彻底测试。
package main import ( "context" "fmt" "io" "log" "math/rand" "net/http" "net/http/httptest" "sort" "sync" "time" "golang.org/x/time/rate" ) type task[t any] struct { param t failedcount int } type scheduler[t any] struct { name string limit int maxtries int wg sync.waitgroup tasks chan task[t] action func(param t) error } // newscheduler creates a scheduler that runs the action with the specified rate limit. // it will retry the action if the action returns a non-nil error. func newscheduler[t any](name string, limit, maxtries, chansize int, action func(param t) error) *scheduler[t] { return &scheduler[t]{ name: name, limit: limit, maxtries: maxtries, tasks: make(chan task[t], chansize), action: action, } } func (s *scheduler[t]) addtask(param t) { s.wg.add(1) s.tasks <- task[t]{param: param} } func (s *scheduler[t]) retrylater(t task[t]) { s.wg.add(1) s.tasks <- t } func (s *scheduler[t]) run() { lim := rate.newlimiter(rate.limit(s.limit), 1) for t := range s.tasks { t := t if err := lim.wait(context.background()); err != nil { log.fatalf("wait: %s", err) return } go func() { defer s.wg.done() err := s.action(t.param) if err != nil { log.printf("task %s, param %v failed: %v", s.name, t.param, err) t.failedcount++ if t.failedcount == s.maxtries { log.printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxtries) return } s.retrylater(t) } }() } } func (s *scheduler[t]) wait() { s.wg.wait() close(s.tasks) } func main() { s := &server{} ts := httptest.newserver(s) defer ts.close() schedulerpost := newscheduler("post", 20, 3, 1, func(param string) error { return post(fmt.sprintf("%s/%s", ts.url, param)) }) go schedulerpost.run() schedulerget := newscheduler("get", 10, 3, 1, func(param int) error { id, err := get(fmt.sprintf("%s/%d", ts.url, param)) if err != nil { return err } schedulerpost.addtask(id) return nil }) go schedulerget.run() for i := 0; i < 100; i++ { schedulerget.addtask(i) } schedulerget.wait() schedulerpost.wait() s.printstats() } func get(url string) (string, error) { resp, err := http.get(url) if err != nil { return "", err } defer resp.body.close() if resp.statuscode != 200 { return "", fmt.errorf("unexpected status code: %d", resp.statuscode) } body, err := io.readall(resp.body) if err != nil { return "", err } return string(body), nil } func post(url string) error { resp, err := http.post(url, "", nil) if err != nil { return err } defer resp.body.close() if resp.statuscode != 200 { return fmt.errorf("unexpected status code: %d", resp.statuscode) } return nil } type server struct { gmu sync.mutex gets []int64 pmu sync.mutex posts []int64 } func (s *server) servehttp(w http.responsewriter, r *http.request) { log.printf("%s: %s", r.method, r.url.path) // collect request stats. if r.method == http.methodget { s.gmu.lock() s.gets = append(s.gets, time.now().unixmilli()) s.gmu.unlock() } else { s.pmu.lock() s.posts = append(s.posts, time.now().unixmilli()) s.pmu.unlock() } n := rand.intn(1000) // simulate latency. time.sleep(time.duration(n) * time.millisecond) // simulate errors. if n%10 == 0 { w.writeheader(http.statusinternalservererror) return } if r.method == http.methodget { fmt.fprintf(w, "%s", r.url.path[1:]) return } } func (s *server) printstats() { log.printf("gets (total: %d):\n", len(s.gets)) printstats(s.gets) log.printf("posts (total: %d):\n", len(s.posts)) printstats(s.posts) } func printstats(ts []int64) { sort.slice(ts, func(i, j int) bool { return ts[i] < ts[j] }) count := 0 to := ts[0] + 1000 for i := 0; i < len(ts); i++ { if ts[i] < to { count++ } else { fmt.printf(" %d: %d\n", to, count) i-- // push back the current item count = 0 to += 1000 } } if count > 0 { fmt.printf(" %d: %d\n", to, count) } }
输出如下所示:
... 2023/03/25 21:03:30 GETS (total: 112): 1679749398998: 10 1679749399998: 10 1679749400998: 10 1679749401998: 10 1679749402998: 10 1679749403998: 10 1679749404998: 10 1679749405998: 10 1679749406998: 10 1679749407998: 10 1679749408998: 10 1679749409998: 2 2023/03/25 21:03:30 POSTS (total: 111): 1679749399079: 8 1679749400079: 8 1679749401079: 12 1679749402079: 8 1679749403079: 10 1679749404079: 9 1679749405079: 9 1679749406079: 8 1679749407079: 14 1679749408079: 12 1679749409079: 9 1679749410079: 4
今天关于《在两个独立速率限制节点之间同步调用》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!
-
502 收藏
-
502 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
139 收藏
-
204 收藏
-
325 收藏
-
477 收藏
-
486 收藏
-
439 收藏
-
357 收藏
-
352 收藏
-
101 收藏
-
440 收藏
-
212 收藏
-
143 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 508次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习