登录
首页 >  Golang >  Go问答

选择 Go 中 PCollection 中的前 N 行的 Apache Beam

来源:stackoverflow

时间:2024-02-14 16:54:22 482浏览 收藏

一分耕耘,一分收获!既然都打开这篇《选择 Go 中 PCollection 中的前 N 行的 Apache Beam》,就坚持看下去,学下去吧!本文主要会给大家讲到等等知识点,如果大家对本文有好的建议或者看到有不足之处,非常欢迎大家积极提出!在后续文章我会继续更新Golang相关的内容,希望对大家都有所帮助!

问题内容

我有一个 pcollection,我需要从中选择 n 个最大的行。我正在尝试使用 go 创建一个数据流管道并陷入困境。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 5},
        {"adam", 8},
        {"john", 3},
        {"ben", 1},
        {"jose", 1},
        {"bryan", 1},
        {"kim", 1},
        {"tim", 1},
    }
    initial := beam.createlist(s, userlist)

    pc2 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, initial)

    beam.pardo0(s, printrow, pc2)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

从上面的代码中,我需要根据 user.age 选择前 5 行 我发现链接顶部包具有相同的功能,但它说它返回单个元素 pcollection。有什么不同?

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.RegisterFunction(less)
}

type User struct {
    Name string
    Age  int
}

func printRow(ctx context.Context, list User) {
    fmt.Println(list)
}

func less(a, b User) bool {
    return a.Age < b.Age
}

func main() {

    flag.Parse()
    beam.Init()

    ctx := context.Background()

    p := beam.NewPipeline()
    s := p.Root()

    var userList = []User{
        {"Bob", 5},
        {"Adam", 8},
        {"John", 3},
        {"Ben", 1},
        {"Jose", 1},
        {"Bryan", 1},
        {"Kim", 1},
        {"Tim", 1},
    }
    initial := beam.CreateList(s, userList)

    best := top.Largest(s, initial, 5, less)

    pc2 := beam.ParDo(s, func(row User, emit func(User)) {
        emit(row)
    }, best)

    beam.ParDo0(s, printRow, pc2)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }

}

我像上面一样添加了选择前 5 行的函数,但出现错误 []main.user is not allocate to main.user

我需要与以前相同格式的 pcollection,因为我需要进一步处理。我怀疑这是因为 top.largest 函数返回单个元素 pcollection。关于如何转换格式有什么想法吗?


正确答案


最好的 pcollection 是 []user

所以尝试一下...

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
    for _, row := range rows {
        emit(row)
    }
}, best)

好了,本文到此结束,带大家了解了《选择 Go 中 PCollection 中的前 N 行的 Apache Beam》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多Golang知识!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>