在两个独立速率限制节点之间同步调用
来源:stackoverflow
时间:2024-02-27 10:54:24 369浏览 收藏
亲爱的编程学习爱好者,如果你点开了这篇文章,说明你对《在两个独立速率限制节点之间同步调用》很感兴趣。本篇文章就来给大家详细解析一下,主要介绍一下,希望所有认真读完的童鞋们,都有实质性的提高。
我正在使用一些第三方 api,每个 api 都有自己的速率限制。端点1的速率限制为10/s,端点2的速率限制为20/s。
我需要通过端点 1 处理数据,该端点将返回一个对象数组(2-3000 个对象之间)。然后,我需要获取每个对象并将一些数据发送到第二个端点,同时遵守第二个端点的速率限制。
我计划在 go 例程中一次批量发送 10 个请求,确保如果所有 10 个请求都在 <1s 中完成,我不会在 1 秒窗口内发送更多请求。
最终,我希望能够限制每个端点一次发出的并发响应数量。特别是如果我必须针对由于服务器 500 多个响应等原因导致的失败请求进行重试。
出于问题的目的,我使用 httpbin 请求来模拟以下场景:
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
)
type HttpBinGetRequest struct {
url string
}
type HttpBinGetResponse struct {
Uuid string `json:"uuid"`
StatusCode int
}
type HttpBinPostRequest struct {
url string
uuid string // Item to post to API
}
type HttpBinPostResponse struct {
Data string `json:"data"`
StatusCode int
}
func main() {
// Prepare GET requests for 500 requests
var requests []*HttpBinGetRequest
for i := 0; i < 500; i++ {
uri := "https://httpbin.org/uuid"
request := &HttpBinGetRequest{
url: uri,
}
requests = append(requests, request)
}
// Create semaphore and rate limit for the GET endpoint
getSemaphore := make(chan struct{}, 10)
getRate := make(chan struct{}, 10)
for i := 0; i < cap(getRate); i++ {
getRate <- struct{}{}
}
go func() {
// ticker corresponding to 1/10th of a second
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
_, ok := <-getRate
if !ok {
return
}
}
}()
// Send our GET requests to obtain a random UUID
var wg sync.WaitGroup
for _, request := range requests {
wg.Add(1)
// Go func to make request and receive the response
go func(r *HttpBinGetRequest) {
defer wg.Done()
// Check the rate limiter and block if it is empty
getRate <- struct{}{}
// Add a token to the semaphore
getSemaphore <- struct{}{}
// Remove token when function is complete
defer func() {
<-getSemaphore
}()
resp, _ := get(r)
fmt.Printf("%+v\n", resp)
}(request)
}
wg.Wait()
// I need to add code that obtains the response data from the above for loop
// then sends the UUID it to its own go routines for a POST request, following a similar pattern above
// To not violate the rate limit of the second endpoint which is 20 calls per second
// postSemaphore := make(chan struct{}, 20)
// postRate := make(chan struct{}, 20)
// for i := 0; i < cap(postRate); i++ {
// postRate <- struct{}{}
// }
}
func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) {
httpResp := &HttpBinGetResponse{}
client := &http.Client{}
req, err := http.NewRequest("GET", hbgr.url, nil)
if err != nil {
fmt.Println("error making request")
return httpResp, err
}
req.Header = http.Header{
"accept": {"application/json"},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
fmt.Println("error getting response")
return httpResp, err
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("error reading response body")
return httpResp, err
}
json.Unmarshal(body, &httpResp)
httpResp.StatusCode = resp.StatusCode
return httpResp, nil
}
// Method to post data to httpbin
func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) {
httpResp := &HttpBinPostResponse{}
client := &http.Client{}
req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid)))
if err != nil {
fmt.Println("error making request")
return httpResp, err
}
req.Header = http.Header{
"accept": {"application/json"},
}
resp, err := client.Do(req)
if err != nil {
fmt.Println("error getting response")
return httpResp, err
}
if resp.StatusCode == 429 {
fmt.Println(resp.Header.Get("Retry-After"))
}
// Read Response
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("error reading response body")
return httpResp, err
}
json.Unmarshal(body, &httpResp)
httpResp.StatusCode = resp.StatusCode
fmt.Printf("%+v", httpResp)
return httpResp, nil
}正确答案
这是生产者/消费者模式。您可以使用 chan 来连接它们。
关于速率限制器,我会使用包 golang.org/x/time/rate。
既然我们决定使用chan来连接生产者和消费者,那么很自然地将失败的任务发送到同一个chan,以便消费者可以再次尝试。
我已将逻辑封装到 scheduler[t] 类型中。请参阅下面的演示。请注意,该演示是匆忙编写的,仅用于说明想法。尚未经过彻底测试。
package main
import (
"context"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"net/http/httptest"
"sort"
"sync"
"time"
"golang.org/x/time/rate"
)
type task[t any] struct {
param t
failedcount int
}
type scheduler[t any] struct {
name string
limit int
maxtries int
wg sync.waitgroup
tasks chan task[t]
action func(param t) error
}
// newscheduler creates a scheduler that runs the action with the specified rate limit.
// it will retry the action if the action returns a non-nil error.
func newscheduler[t any](name string, limit, maxtries, chansize int, action func(param t) error) *scheduler[t] {
return &scheduler[t]{
name: name,
limit: limit,
maxtries: maxtries,
tasks: make(chan task[t], chansize),
action: action,
}
}
func (s *scheduler[t]) addtask(param t) {
s.wg.add(1)
s.tasks <- task[t]{param: param}
}
func (s *scheduler[t]) retrylater(t task[t]) {
s.wg.add(1)
s.tasks <- t
}
func (s *scheduler[t]) run() {
lim := rate.newlimiter(rate.limit(s.limit), 1)
for t := range s.tasks {
t := t
if err := lim.wait(context.background()); err != nil {
log.fatalf("wait: %s", err)
return
}
go func() {
defer s.wg.done()
err := s.action(t.param)
if err != nil {
log.printf("task %s, param %v failed: %v", s.name, t.param, err)
t.failedcount++
if t.failedcount == s.maxtries {
log.printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxtries)
return
}
s.retrylater(t)
}
}()
}
}
func (s *scheduler[t]) wait() {
s.wg.wait()
close(s.tasks)
}
func main() {
s := &server{}
ts := httptest.newserver(s)
defer ts.close()
schedulerpost := newscheduler("post", 20, 3, 1, func(param string) error {
return post(fmt.sprintf("%s/%s", ts.url, param))
})
go schedulerpost.run()
schedulerget := newscheduler("get", 10, 3, 1, func(param int) error {
id, err := get(fmt.sprintf("%s/%d", ts.url, param))
if err != nil {
return err
}
schedulerpost.addtask(id)
return nil
})
go schedulerget.run()
for i := 0; i < 100; i++ {
schedulerget.addtask(i)
}
schedulerget.wait()
schedulerpost.wait()
s.printstats()
}
func get(url string) (string, error) {
resp, err := http.get(url)
if err != nil {
return "", err
}
defer resp.body.close()
if resp.statuscode != 200 {
return "", fmt.errorf("unexpected status code: %d", resp.statuscode)
}
body, err := io.readall(resp.body)
if err != nil {
return "", err
}
return string(body), nil
}
func post(url string) error {
resp, err := http.post(url, "", nil)
if err != nil {
return err
}
defer resp.body.close()
if resp.statuscode != 200 {
return fmt.errorf("unexpected status code: %d", resp.statuscode)
}
return nil
}
type server struct {
gmu sync.mutex
gets []int64
pmu sync.mutex
posts []int64
}
func (s *server) servehttp(w http.responsewriter, r *http.request) {
log.printf("%s: %s", r.method, r.url.path)
// collect request stats.
if r.method == http.methodget {
s.gmu.lock()
s.gets = append(s.gets, time.now().unixmilli())
s.gmu.unlock()
} else {
s.pmu.lock()
s.posts = append(s.posts, time.now().unixmilli())
s.pmu.unlock()
}
n := rand.intn(1000)
// simulate latency.
time.sleep(time.duration(n) * time.millisecond)
// simulate errors.
if n%10 == 0 {
w.writeheader(http.statusinternalservererror)
return
}
if r.method == http.methodget {
fmt.fprintf(w, "%s", r.url.path[1:])
return
}
}
func (s *server) printstats() {
log.printf("gets (total: %d):\n", len(s.gets))
printstats(s.gets)
log.printf("posts (total: %d):\n", len(s.posts))
printstats(s.posts)
}
func printstats(ts []int64) {
sort.slice(ts, func(i, j int) bool {
return ts[i] < ts[j]
})
count := 0
to := ts[0] + 1000
for i := 0; i < len(ts); i++ {
if ts[i] < to {
count++
} else {
fmt.printf(" %d: %d\n", to, count)
i-- // push back the current item
count = 0
to += 1000
}
}
if count > 0 {
fmt.printf(" %d: %d\n", to, count)
}
}
输出如下所示:
... 2023/03/25 21:03:30 GETS (total: 112): 1679749398998: 10 1679749399998: 10 1679749400998: 10 1679749401998: 10 1679749402998: 10 1679749403998: 10 1679749404998: 10 1679749405998: 10 1679749406998: 10 1679749407998: 10 1679749408998: 10 1679749409998: 2 2023/03/25 21:03:30 POSTS (total: 111): 1679749399079: 8 1679749400079: 8 1679749401079: 12 1679749402079: 8 1679749403079: 10 1679749404079: 9 1679749405079: 9 1679749406079: 8 1679749407079: 14 1679749408079: 12 1679749409079: 9 1679749410079: 4
今天关于《在两个独立速率限制节点之间同步调用》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!
-
502 收藏
-
502 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
139 收藏
-
204 收藏
-
325 收藏
-
478 收藏
-
486 收藏
-
439 收藏
-
357 收藏
-
352 收藏
-
101 收藏
-
440 收藏
-
212 收藏
-
143 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 485次学习