登录
首页 >  Golang >  Go问答

nats 限制队列中仍有消息,在 Go 中发送 ack 和 term

来源:stackoverflow

时间:2024-02-13 17:54:23 146浏览 收藏

来到golang学习网的大家,相信都是编程学习爱好者,希望在这里学习Golang相关编程知识。下面本篇文章就来带大家聊聊《nats 限制队列中仍有消息,在 Go 中发送 ack 和 term》,介绍一下,希望对大家的知识积累有所帮助,助力实战开发!

问题内容

我尝试为 nats 限制队列编写订阅者:

sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
    return err
}

msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
    if errors.Is(err, nats.ErrSlowConsumer) {
        log.Printf("Slow consumer error returned. Waiting for reset...")
        time.Sleep(50 * time.Millisecond)
        continue
    } else {
        return err
    }
}

msg.InProgress()

var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
    msg.Term()
    return err
}

actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
    msg.Nak()
    continue
}

callback, err := handler(&message)
if err == nil {
    msg.Ack()
    msg.Term()
} else {
    msg.Nak()
    return err
}

callback(ctx)

此代码的目标是使用多个主题上的任何消息并调用与主题关联的回调函数。这段代码可以工作,但我遇到的问题是,如果该函数不返回错误,我希望在调用 handler 后删除该消息。我以为这就是 msg.term 正在做的事情,但我仍然看到队列中的所有消息。

我最初是围绕工作队列设计的,但我希望它能够与多个订阅者一起工作,所以我不得不重新设计它。有什么办法可以做到这一点吗?


正确答案


根据提供的代码,我假设您在使用 JetStream 库创建订阅时没有提供流和消费者信息。

SubscribeSync 方法的 documentation 中,它表示当未提供流和消费者信息时,库将创建一个临时消费者,并且消费者的名称由服务器选择。它还尝试找出订阅的流。

我认为您的代码中会发生以下情况:

  • 当您调用 SubscribeSync 方法时,系统会使用您提供的主题创建一个临时使用者。
  • 当调用 msg.Ackmsg.Term 时,您确实会确认该消息,但仅限于当前使用者。
  • 下次调用 SubscribeSync 方法时,将创建一个新的临时使用者,其中包含您已在另一个使用者上删除的消息。这就是 Jetstream 的流、消费者和订阅概念的设计原理。

根据您想要实现的目标,以下是一些建议:

  • 使用普通的 NATS Core 库来处理发布/订阅或队列。不要使用 JetStream。 NATS Core 库直接处理主题,而 Jetstream 库在未提供信息的情况下会在幕后创建其他内容(流和使用者)。
  • 使用 JetStream,但可以通过代码或直接在 NATS 服务器上自行创建流和持久使用者。这样,在已经定义了流和使用者的情况下,您应该能够使其按预期工作。

今天带大家了解了的相关知识,希望对你有所帮助;关于Golang的技术知识我们会一点点深入介绍,欢迎大家关注golang学习网公众号,一起学习编程~

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>