登录
首页 >  Golang >  Go问答

使用 Google Pub/Sub 轮询订阅器的频道

来源:stackoverflow

时间:2024-03-03 18:42:25 354浏览 收藏

你在学习Golang相关的知识吗?本文《使用 Google Pub/Sub 轮询订阅器的频道》,主要介绍的内容就涉及到,如果你想提升自己的开发能力,就不要错过这篇文章,大家要知道编程理论基础和实战操作都是不可或缺的哦!

问题内容

我正在尝试在 golang 中创建一个 google pubsub 订阅者,我一次接收 100 条消息,然后将它们写入 influx。我正在尝试使用渠道来执行此操作,如下所示:

package main

import (
    "os"
    "fmt"
    "cloud.google.com/go/pubsub"
    "log"
    "sync"
    "golang.org/x/net/context"
    "encoding/json"
    clnt "github.com/influxdata/influxdb/client/v2"
    "time"
)

type sensordata struct {
    pressure      float64 `json:"pressure"`
    temperature   float64 `json:"temperature"`
    dewpoint      float64 `json:"dewpoint"`
    timecollected int64   `json:"timecollected"`
    latitude      float64 `json:"latitude"`
    longitude     float64 `json:"longitude"`
    humidity      float64 `json:"humidity"`
    sensorid      string  `json:"sensorid"`
    zipcode       int     `json:"zipcode"`
    warehouse     string  `json:"warehouse"`
    area          string  `json:"area"`
}

type sensorpoints struct {
    sensordata      []sensordata
}

func main () {

    messages := make(chan sensordata, 100)

    // create a new influx httpclient
    c, err := clnt.newhttpclient(clnt.httpconfig{
        addr:     "http://localhost:8086",
        username: "user",
        password: "pass",
    })
    if err != nil {
        log.fatal(err)
    }


    // create pubsub subscriber
    ctx := context.background()
    proj := os.getenv("google_cloud_project")
    if proj == "" {
        fmt.fprintf(os.stderr, "google_cloud_project environment variable must be set.\n")
        os.exit(1)
    }
    client, err := pubsub.newclient(ctx, proj)
    if err != nil {
        log.fatalf("could not create pubsub client: %v", err)
    }
    const sub = "influxwriter"


    //create influx a blank batchpoint set
    bp, err := clnt.newbatchpoints(clnt.batchpointsconfig{
        database:  "sensordata",
        precision: "s",
    })
    if err != nil {
        log.fatal(err)
    }



    // pull messages via the subscription.
    go pullmsgs(client, sub, messages)
    if err != nil {
        log.fatal(err)
    }

    writeinflux(messages, bp)

    c.close()

}


func pullmsgs(client *pubsub.client, name string, messages chan<- sensordata) {
    ctx := context.background()

    // [start pubsub_subscriber_async_pull]
    // [start pubsub_quickstart_subscriber]
    // consume 10 messages.

    var mu sync.mutex
    var sensorinfos sensorpoints
    sensorinfo := &sensordata{}

    received := 0
    sub := client.subscription(name)
    cctx, _ := context.withcancel(ctx)
    err := sub.receive(cctx, func(ctx context.context, msg *pubsub.message) {
        msg.ack()

        json.unmarshal(msg.data, sensorinfo)

        //fmt.println(string(msg.data))
        //fmt.println(sensorinfo.sensorid)

        sensorinfos.sensordata = append(sensorinfos.sensordata, *sensorinfo)

        mu.lock()
        defer mu.unlock()
        received++
        fmt.println("rcv: ", received)
        messages <- *sensorinfo

    })
    if err != nil {
        fmt.println(err)
    }
    // [end pubsub_subscriber_async_pull]
    // [end pubsub_quickstart_subscriber]
}

func writeinflux(sensorpoints <- chan sensordata, bp clnt.batchpoints) {

    for p := range sensorpoints {

        // create a point and add to batch
        tags := map[string]string{
            "sensorid": p.sensorid,
            "warehouse": p.warehouse,
            "area": p.area,
            "zipcode": string(p.zipcode),
        }

        fields := map[string]interface{}{
            "pressure":   p.pressure,
            "humidity": p.humidity,
            "temperature":   p.temperature,
            "dewpoint":   p.dewpoint,
            "longitude":   p.longitude,
            "latitude":   p.latitude,
        }

        pt, err := clnt.newpoint("sensordata", tags, fields, time.unix(p.timecollected, 0))
        if err != nil {
            log.fatal(err)
        }
        bp.addpoint(pt)


    }


}

但它并没有看到每个都通过了最初的 pullmsgs 函数,并且只是继续在其中打印输出:

rcv:  1
rcv:  2
rcv:  3
rcv:  4
rcv:  5
rcv:  6
rcv:  7

我认为一旦通道满了,它应该阻塞直到通道被清空

这是我用作参考的 pubsub pull 代码。


解决方案


当您在频道上发送了所需数量的消息后,关闭频道并取消上下文。尝试使用 the documentation 中演示的在收到一定数量的消息后取消的技术。由于您的缓冲区为 100,并且您尝试一次消费 100 条消息,所以这就是数字。如果您希望程序退出,请关闭通道,以便 writeinflux 中的 for e := range ch 循环达到停止点,并且不会阻塞等待更多元素添加到通道中。

请注意 the Go pubsub API doc 中的这一点:

这并不是阻碍你的主 goroutine 的原因,但是如果没有取消,你的 pullmsgs goroutine 将不会自行退出。

此外,检查 unmarshal 上的错误。如果此时您不想在代码中处理解组错误,请考虑更改通道类型并发送 msgmsg.data,并在通道接收时进行解组。

cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
    msg.Ack()
    err := json.Unmarshal(msg.Data, sensorinfo)
    if err != nil {
         fmt.Printf("Failed to unmarshal: %s\n", err)
    }
    mu.Lock()
    defer mu.Unlock()
    received++
    fmt.Println("rcv: ", received)
    messages <- *sensorinfo
    if received == 100 {
        close(messages)  // no more messages will be sent on channel
        cancel()
    }

今天关于《使用 Google Pub/Sub 轮询订阅器的频道》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>