登录
首页 >  Golang >  Go问答

使用 Rabbitmq 在消息队列中排列一条消息

来源:stackoverflow

时间:2024-02-16 16:06:27 429浏览 收藏

从现在开始,努力学习吧!本文《使用 Rabbitmq 在消息队列中排列一条消息》主要讲解了等等相关知识点,我会在golang学习网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!

问题内容

我正在尝试遵循官方教程,但也添加了在 rabbitmq 中延迟/安排消息的可能性。我的设置在 docker 中使用 rabbitmq:3-management-alpine 运行,并且我一直在尝试设置 x-delay 标头,但消息仍然会立即发送。

send.go

package main

import (
    "context"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failonerror(err error, msg string) {
    if err != nil {
        log.panicf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.dial("amqp://guest:guest@localhost:5672/")
    failonerror(err, "failed to connect to rabbitmq")
    defer conn.close()

    ch, err := conn.channel()
    failonerror(err, "failed to open a channel")
    defer ch.close()

    q, err := ch.queuedeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failonerror(err, "failed to declare a queue")
    ctx, cancel := context.withtimeout(context.background(), 5*time.second)
    defer cancel()

    body := "hello world!"
    err = ch.publishwithcontext(ctx,
        "",     // exchange
        q.name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.publishing{
            headers: map[string]interface{}{
                "x-delay": 5000,
            },
            contenttype: "text/plain",
            body:        []byte(body),
        })
    failonerror(err, "failed to publish a message")
    log.printf(" [x] sent %s\n", body)
}

receive.go

package main

import (
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    var forever chan struct{}

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

正确答案


为了安排消息,您需要将消息发布到定义了特定属性的交换器,但您在提供的代码中没有这样做。 以下是如何声明支持调度的交换的示例(摘自 official 文档):

map args = new hashmap();
args.put("x-delayed-type", "direct");
channel.exchangedeclare("my-exchange", "x-delayed-message", true, false, args);

那么这就是您发布的方式(再次取自官方文档):

byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

ps:抱歉没有提供 go 中的示例,我相信您可以弄清楚如何使用 go 库进行设置

终于介绍完啦!小伙伴们,这篇关于《使用 Rabbitmq 在消息队列中排列一条消息》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布Golang相关知识,快来关注吧!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>