登录
首页 >  Golang >  Go问答

使用Go elasticsearch将批量数据插入到传入的pulsar中

来源:stackoverflow

时间:2024-02-14 15:15:26 492浏览 收藏

从现在开始,我们要努力学习啦!今天我给大家带来《使用Go elasticsearch将批量数据插入到传入的pulsar中》,感兴趣的朋友请继续看下去吧!下文中的内容我们主要会涉及到等等知识点,如果在阅读本文过程中有遇到不清楚的地方,欢迎留言呀!我们一起讨论,一起学习!

问题内容

我必须使用 goelastic 库从即将到来的脉冲星中大量插入数据。但我有一个问题。

首先,pulsar 每个部分批量发送 1000 个数据。然后当我插入松紧带时,有时会出现问题。这个问题附后。此问题会导致数据丢失。感谢您的回答... error: circuit_writing_exception: [parent] 数据太大,[indices:data/write/bulk[s]] 的数据将为 [524374312/500mb],大于 [510027366/486.3mb] 的限制,实际用法: [524323448/500mb],保留的新字节:[50864/49.6kb],用法[request = 0/0b,fielddata = 160771183/153.3mb,in_flight_requests = 50864/49.6kb,model_inference = 0/0b,eql_sequence = 0/0b , 会计=6898128/6.5mb]

这部分是批量代码。

func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) {
    fmt.Println("------------------")
    bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
        Index:      enum.IndexName,
        Client:     ElasticStruct.Client,
        FlushBytes: 10e+6,
    })
    if err != nil {
        panic(err)
    }
    start := time.Now().UTC()

    for _, x := range y {
        data, err := json.Marshal(x)
        if err != nil {
            panic(err)
        }
        err = bi.Add(
            context.Background(),
            esutil.BulkIndexerItem{
                Action: "index",

                Body: bytes.NewReader(data),

                OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
                    i++
                },

                OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
                    if err != nil {
                        log.Printf("ERROR: %s", err)
                    } else {
                        log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
                    }
                },
            },
        )
        if err != nil {
            log.Fatalf("Unexpected error: %s", err)
        }
        x++

    }
    if err := bi.Close(context.Background()); err != nil {
        log.Fatalf("Unexpected error: %s", err)
    }
    dur := time.Since(start)
    fmt.Println(dur)
    fmt.Println("Success writing data to elastic : ", i)
    fmt.Println("Success incoming data from pulsar : ", x)
    fmt.Println("Difference : ", x-i)
    fmt.Println("Now : ", time.Now().UTC().String())
    if i < x {
        fmt.Println("FATAL")
    }
    fmt.Println("------------------")

}

正确答案


Tldr;

您的节点上似乎没有足够的 JVM 堆。

您正在输入 circuit breaker 以避免 Elasticsearch 内存不足 (OOM)。

解决方案

  • 增加 JVM 内存,您将找到 here 一些用于调整节点大小的文档。
  • 较小的批量请求

以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于Golang的相关知识,也可关注golang学习网公众号。

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>