登录
首页 >  Golang >  Go问答

使用 Apache Beam ParDo 进行 Go 中的数据过滤

来源:stackoverflow

时间:2024-03-01 11:06:26 151浏览 收藏

从现在开始,我们要努力学习啦!今天我给大家带来《使用 Apache Beam ParDo 进行 Go 中的数据过滤》,感兴趣的朋友请继续看下去吧!下文中的内容我们主要会涉及到等等知识点,如果在阅读本文过程中有遇到不清楚的地方,欢迎留言呀!我们一起讨论,一起学习!

问题内容

我是一名 python 开发人员,但应该使用 go 制作数据流管道。 与 python 或 java 相比,我找不到那么多使用 go 的 apache beam 示例。

我有以下代码,其中具有用户名和年龄的结构。任务是增加年龄,然后根据年龄进行过滤。我找到了增加年龄的方法,但卡在过滤部分。

package main

import (
    "context"
    "flag"
    "fmt"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
    beam.registerfunction(incrementage)
}

type user struct {
    name string
    age  int
}

func printrow(ctx context.context, list user) {
    fmt.println(list)
}

func incrementage(list user) user {
    list.age++
    return list
}

func main() {

    flag.parse()
    beam.init()

    ctx := context.background()

    p := beam.newpipeline()
    s := p.root()

    var userlist = []user{
        {"bob", 40},
        {"adam", 50},
        {"john", 35},
        {"ben", 8},
    }
    initial := beam.createlist(s, userlist)

    pc := beam.pardo(s, incrementage, initial)

    pc1 := beam.pardo(s, func(row user, emit func(user)) {
        emit(row)
    }, pc)

    beam.pardo0(s, printrow, pc1)

    if err := beamx.run(ctx, p); err != nil {
        log.exitf(ctx, "failed to execute job: %v", err)
    }

}

我尝试创建一个如下所示的函数,但这返回一个布尔值而不是用户对象。我知道我错过了一些简单但无法弄清楚的事情。

func filterage(list user) user {
    return list.age > 40    
}

在 python 中,我可以编写如下所示的函数。

beam.Filter(lambda line: line["Age"] >= 40))

正确答案


您需要在函数中添加一个发射器来发射用户:

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}

正如您当前代码中所写, 返回 list.age > 40 list.age > 40 首先评估为 true(布尔值),并且返回该布尔值。

今天关于《使用 Apache Beam ParDo 进行 Go 中的数据过滤》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>