登录
首页 >  Golang >  Go问答

限定通道处理消息数

来源:stackoverflow

时间:2024-03-22 23:45:33 125浏览 收藏

为了每秒限制发送给客户端的消息数量为 20 条,本文介绍了两种解决方案。第一个解决方案使用速率限制器,创建两个通道:一个包含所有原始通道项目,另一个以固定速率中继项目。第二个解决方案使用 time.ticker,通过 select 语句将消息写入主通道或以固定速率写入主通道和节流通道。这些方法使应用程序能够跳过在阻塞的 50 毫秒内出现的所有消息,仅保存最新的消息,并在循环内阻塞时间结束且没有新消息到来时处理该消息。

问题内容

我通过通道每秒收到大约 200 000 条消息,我需要将发送给客户端的消息数量限制为每秒 20 条。 这使得每 50 毫秒 1 条消息

并且在 loop 的帮助下,worker 在整个程序生命周期内仍然处于活动状态(并且不会为每条消息打开通道)。

我的目标: - 由于消息的顺序很重要,我想跳过在阻塞的 50 毫秒内出现的所有消息,只保存最新的消息 - 如果最新消息在阻塞的 50 毫秒内出现,我希望在循环内的阻塞时间结束并且没有新消息到来时处理保存的消息! <--这是我的问题

我的策略 - 不断将尚未处理的最新消息发送到同一通道

但问题是,如果该消息是在(来自应用程序)收到新消息之后发送的,该怎么办?

下面的代码更像是一个作为工作代码的算法,只是想要一个关于如何做到这一点的提示/方法。

func example (new_message_from_channel <-chan *message) {
    default = message
    time = now_milliseconds
    diff_accepted = 50milli
    for this_message := range new_message_from_channel {
        if now_millisecond -  time >= diff_accepted {
            send_it_to_the_client
            time = now_milliseconds
        } else {
            //save the latest message
            default = this_message

            //My problem is how to process this latest message when the blocked 50ms is over and no new message coming ?!

            //My strategy - keep sending it to the same channel
            theChannel <- default
        }

    }
}

如果你有更好的方法,欢迎与我分享:)


解决方案


使用速率限制器,您可以创建一个 throttle 函数,该函数将: 速率和通道作为输入;并返回两个通道 - 一个包含所有原始通道项目,另一个仅以固定速率中继项目:

func throttle(r time.duration, in <-chan event) (c, tc <-chan event) {

    // "writeable" channels
    var (
        wc  = make(chan event)
        wtc = make(chan event)
    )

    // read-only channels - returned to caller
    c = wc
    tc = wtc

    go func() {
        defer close(wc)
        defer close(wtc)

        rl := rate.newlimiter(
            rate.every(r),
            1,
        )

        // relays input channel's items to two channels:
        // (1) gets all writes from original channel
        // (2) only writes at a fixed frequency
        for ev := range in {
            wc <- ev
            if rl.allow() {
                wtc <- ev
            }
        }
    }()
    return
}

工作示例:https://play.golang.org/p/upei0TiyzNr

编辑:

为了避免使用速率限制器,而是使用简单的 time.ticker

tick := time.NewTicker(r)

for ev := range in {
    select {
    case wC <- ev: // write to main
    case <-tick.C:
        wC <- ev  // write to main ...
        wtC <- ev // ... plus throttle channel
    }
}

工作示例:https://play.golang.org/p/UTRXh72BvRl

到这里,我们也就讲完了《限定通道处理消息数》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>