在系统启动时加载文件数据,处理新文件并更新映射状态
来源:stackoverflow
时间:2024-02-20 13:12:25 422浏览 收藏
编程并不是一个机械性的工作,而是需要有思考,有创新的工作,语法是固定的,但解决问题的思路则是依靠人的思维,这就需要我们坚持学习和更新自己的知识。今天golang学习网就整理分享《在系统启动时加载文件数据,处理新文件并更新映射状态》,文章讲解的知识点主要包括,如果你对Golang方面的知识点感兴趣,就不要错过golang学习网,在这可以对大家的知识积累有所帮助,助力开发能力的提升。
我正在开发一个项目,在启动过程中,我需要读取某些文件并将其存储在地图的内存中,然后定期查找新文件(如果有),然后替换之前在地图中内存中的所有文件使用这个新数据启动。基本上每次如果有一个 full state 的新文件,那么我想将内存中的映射对象刷新到这个新文件,而不是附加到它。
下面的方法 loadatstartupandprocessnewchanges 在服务器启动期间被调用,该方法读取文件并将数据存储在内存中。它还启动一个 go 例程 detectnewfiles ,定期检查是否有任何新文件并将其存储在 deltachan 通道上,该通道稍后由另一个 go 例程 processnewfiles 访问以再次读取该新文件并将数据存储在同一个映射中。如果有任何错误,我们会将其存储在 err 通道上。 loadfiles 是读取内存中的文件并将其存储在 map 中的函数。
type customerconfig struct {
deltachan chan string
err chan error
wg sync.waitgroup
data *cmap.concurrentmap
}
// this is called during server startup.
func (r *customerconfig) loadatstartupandprocessnewchanges() error {
path, err := r.getpath("...", "....")
if err != nil {
return err
}
r.wg.add(1)
go r.detectnewfiles(path)
err = r.loadfiles(4, path)
if err != nil {
return err
}
r.wg.add(1)
go r.processnewfiles()
return nil
}
此方法基本上确定是否有任何需要消耗的新文件,如果有,则会将其放入 deltachan 通道上,该通道稍后将由 processnewfiles go-routine 消耗并读取内存中的文件。如果有任何错误,则会将错误添加到错误通道。
func (r *customerconfig) detectnewfiles(rootpath string) {
}
这将读取所有 s3 文件并将其存储在内存中并返回错误。在这种方法中,我清除了地图的先前状态,以便它可以从新文件中获得新的状态。该方法在服务器启动期间调用,并且每当我们需要处理 processnewfiles go-routine 中的新文件时也会调用该方法。
func (r *customerconfig) loadfiles(workers int, path string) error {
var err error
...
var files []string
files = .....
// reset the map so that it can have fresh state from new files.
r.data.clear()
g, ctx := errgroup.withcontext(context.background())
sem := make(chan struct{}, workers)
for _, file := range files {
select {
case <-ctx.done():
break
case sem <- struct{}{}:
}
file := file
g.go(func() error {
defer func() { <-sem }()
return r.read(spn, file, bucket)
})
}
if err := g.wait(); err != nil {
return err
}
return nil
}
此方法读取文件并添加到 data 并发映射中。
func (r *customerconfig) read(file string, bucket string) error {
// read file and store it in "data" concurrent map
// and if there is any error then return the error
var err error
fr, err := pars3.news3filereader(context.background(), bucket, file, r.s3client.getsession().config)
if err != nil {
return errs.wrap(err)
}
defer xio.closeignoringerrors(fr)
pr, err := reader.newparquetreader(fr, nil, 8)
if err != nil {
return errs.wrap(err)
}
if pr.getnumrows() == 0 {
spn.infof("skipping %s due to 0 rows", file)
return nil
}
for {
rows, err := pr.readbynumber(r.cfg.rowstoread)
if err != nil {
return errs.wrap(err)
}
if len(rows) <= 0 {
break
}
byteslice, err := json.marshal(rows)
if err != nil {
return errs.wrap(err)
}
var invmods []compmodel
err = json.unmarshal(byteslice, &invmods)
if err != nil {
return errs.wrap(err)
}
for i := range invmods {
key := strconv.formatint(invmods[i].productid, 10) + ":" + strconv.itoa(int(invmods[i].iaz))
hasinventory := false
if invmods[i].available > 0 {
hasinventory = true
}
r.data.set(key, hasinventory)
}
}
return nil
}
此方法将选择 delta 通道 上的内容,如果有任何新文件,则它将通过调用 loadfiles 方法开始读取该新文件。如果有任何错误,则会将错误添加到错误通道。
// processnewfiles - load new files found by detectnewfiles
func (r *customerconfig) processnewfiles() {
// find new files on delta channel
// and call "loadfiles" method to read it
// if there is any error, then it will add it to the error channel.
}
如果 error 通道 上有任何错误,那么它将通过以下方法记录这些错误 -
func (r *customerConfig) handleError() {
// read error from error channel if there is any
// then log it
}
问题陈述
上述逻辑对我来说没有任何问题,但我的代码中有一个小错误,我无法弄清楚如何解决它。正如您所看到的,我有一个并发映射,我在 read 方法中填充该映射,并在 loadfiles 方法中清除整个映射。因为每当 delta 通道上有新文件时,我都不想在地图中保留以前的状态,所以这就是为什么我要从地图中删除所有内容,然后从新文件中添加新状态。
现在,如果 read 方法中有任何错误,那么错误就会发生,因为我已经清除了 data 地图中的所有数据,该地图将具有空地图,这不是我想要的。基本上,如果有任何错误,那么我想保留 data 地图中的先前状态。我如何在上述当前设计中解决这个问题。
注意:我使用的是golang 并发map
正确答案
为 collecteddata 添加了 rwmutex 通过工作协程进行并发写保护
type customerconfig struct {
...
m sync.rwmutex
}
不要在 read 方法中更新 read 方法,而是让 read 方法返回数据和错误
func (r *customerconfig) read(file string, bucket string) ([]compmodel, error) {
// read file data and return with error if any
var err error
fr, err := pars3.news3filereader(context.background(), bucket, file, r.s3client.getsession().config)
if err != nil {
return (nil, errs.wrap(err))
}
defer xio.closeignoringerrors(fr)
pr, err := reader.newparquetreader(fr, nil, 8)
if err != nil {
return (nil, errs.wrap(err))
}
if pr.getnumrows() == 0 {
spn.infof("skipping %s due to 0 rows", file)
return (nil, errors.new("no data"))
}
var invmods = []compmodel{}
for {
rows, err := pr.readbynumber(r.cfg.rowstoread)
if err != nil {
return (nil, errs.wrap(err))
}
if len(rows) <= 0 {
break
}
byteslice, err := json.marshal(rows)
if err != nil {
return (nil, errs.wrap(err))
}
var jsondata []compmodel
err = json.unmarshal(byteslice, &jsondata)
if err != nil {
return (nil, errs.wrap(err))
}
invmods = append(invmods, jsondata...)
}
return invmods, nil
}
然后loadfiles就可以收集read返回的数据
方法,如果没有错误,则清除并更新地图,否则
保留旧数据原样
func (r *customerconfig) loadfiles(workers int, path string) error {
var err error
...
var files []string
files = .....
// reset the map so that it can have fresh state from new files.
// r.data.clear() <- remove the clear from here
g, ctx := errgroup.withcontext(context.background())
sem := make(chan struct{}, workers)
collecteddata := []compmodel{}
for _, file := range files {
select {
case <-ctx.done():
break
case sem <- struct{}{}:
}
file := file
g.go(func() error {
defer func() { <-sem }()
data, err:= r.read(spn, file, bucket)
if err != nil {
return err
}
r.m.lock()
append(collecteddata, data...)
r.m.unlock()
return nil
})
}
if err := g.wait(); err != nil {
return err
}
r.data.clear()
for i := range collecteddata {
key := strconv.formatint(collecteddata[i].productid, 10) + ":" + strconv.itoa(int(collecteddata[i].iaz))
hasinventory := false
if collecteddata[i].available > 0 {
hasinventory = true
}
r.data.set(key, hasinventory)
}
return nil
}
注意:由于代码不可运行,仅更新了参考方法,并且我没有包含用于更新您可能需要处理这种情况的切片的互斥锁。
仅用 3 个函数即可实现相同的功能 - 检测、读取、加载,检测将按时间间隔检查新文件,如果发现有则推送到增量通道,加载将从增量通道获取文件路径并调用读取方法获取数据和错误,然后检查是否没有错误,然后清除地图并使用新内容更新,否则记录错误,因此您将有 2 个 go 例程和 1 个由加载例程调用的函数
package main
import (
"fmt"
"time"
"os"
"os/signal"
"math/rand"
)
func main() {
fmt.println(">>>", center("started", 30), "<<<")
c := &config{
initialpath: "old path",
detectinterval: 3000,
}
c.start()
fmt.println(">>>", center("ended", 30), "<<<")
}
// https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center
func center(s string, w int) string {
return fmt.sprintf("%[1]*s", -w, fmt.sprintf("%[1]*s", (w + len(s))/2, s))
}
type config struct {
deltach chan string
ticker *time.ticker
stopsignal chan os.signal
initialpath string
detectinterval time.duration
}
func (c *config) start() {
c.stopsignal = make(chan os.signal, 1)
signal.notify(c.stopsignal, os.interrupt)
c.ticker = time.newticker(c.detectinterval * time.millisecond)
c.deltach = make(chan string, 1)
go c.detect()
go c.load()
if c.initialpath != "" {
c.deltach <- c.initialpath
}
<- c.stopsignal
c.ticker.stop()
}
// detect new files
func (c *config) detect() {
for {
select {
case <- c.stopsignal:
return
case <- c.ticker.c:
fmt.println(">>>", center("detect", 30), "<<<")
c.deltach <- fmt.sprintf("path %f", rand.float64() * 1.5)
}
}
}
// read files
func read(path string) (map[string]int, error) {
data := make(map[string]int)
data[path] = 0
fmt.println(">>>", center("read", 30), "<<<")
fmt.println(path)
return data, nil
}
// load files
func (c *config) load() {
for {
select {
case <- c.stopsignal:
return
case path := <- c.deltach:
fmt.println(">>>", center("load", 30), "<<<")
data, err := read(path)
if err != nil {
fmt.println("log error")
} else {
fmt.println("success", data)
}
fmt.println()
}
}
}
注意:示例代码中未包含地图,它可以轻松更新以包含地图
我认为您的设计过于复杂。它可以更简单地解决,从而提供您想要的所有好处:
- 并发访问安全
- 重新加载检测到的更改
- 访问配置将为您提供最新的、成功加载的配置
- 即使由于检测到的更改而加载新配置需要很长时间,最新的配置也始终可以立即访问
- 如果加载新配置失败,则保留之前的“快照”并保持当前状态
- 作为奖励,它更简单,甚至不使用第 3 方库
让我们看看如何实现这一目标:
有一个 customerconfig 结构来保存您想要缓存的所有内容(这是“快照”):
type customerconfig struct {
data map[string]bool
// add other props if you need:
loadedat time.time
}
提供一个函数来加载您想要缓存的配置。注意:该函数是无状态的,它不会访问/操作包级变量:
func loadconfig() (*customerconfig, error) {
cfg := &customerconfig{
data: map[string]bool{},
loadedat: time.now(),
}
// logic to load files, and populate cfg.data
// if an error occurs, return it
// if loading succeeds, return the config
return cfg, nil
}
现在让我们创建我们的“缓存管理器”。缓存管理器存储实际/当前配置(快照),并提供对其的访问。为了安全的并发访问(和更新),我们使用sync.RWMutex。还有停止管理器的方法(停止并发刷新):
type configcache struct {
configmu sync.rwmutex
config *customerconfig
closech chan struct{}
}
创建缓存会加载初始配置。还启动一个 goroutine 负责定期检查更改。
func newconfigcache() (*configcache, error) {
cfg, err := loadconfig()
if err != nil {
return nil, fmt.errorf("loading initial config failed: %w", err)
}
cc := &configcache{
config: cfg,
closech: make(chan struct{}),
}
// launch goroutine to periodically check for changes, and load new configs
go cc.refresher()
return cc, nil
}
refresher() 定期检查更改,如果检测到更改,则调用 loadconfig() 加载要缓存的新数据,并将其存储为当前/实际配置(同时锁定 configmu)。它还监视 closech 以在收到请求时停止:
func (cc *configcache) refresher() {
ticker := time.newticker(1 * time.minute) // every minute
defer ticker.stop()
for {
select {
case <-ticker.c:
// check if there are changes
changes := false // logic to detect changes
if !changes {
continue // no changes, continue
}
// changes! load new config:
cfg, err := loadconfig()
if err != nil {
log.printf("failed to load config: %v", err)
continue // keep the previous config
}
// apply / store new config
cc.configmu.lock()
cc.config = cfg
cc.configmu.unlock()
case <-cc.closech:
return
}
}
}
关闭缓存管理器(刷新 goroutine)非常简单:
func (cc *configcache) stop() {
close(cc.closech)
}
最后一个缺失的部分是如何访问当前配置。这是一个简单的 getconfig() 方法(也使用 configmu,但处于只读模式):
func (cc *configcache) getconfig() *customerconfig {
cc.configmu.rlock()
defer cc.configmu.runlock()
return cc.config
}
这是您如何使用它:
cc, err := NewConfigCache()
if err != nil {
// Decide what to do: retry, terminate etc.
}
// Where ever, whenever you need the actual (most recent) config in your app:
cfg := cc.GetConfig()
// Use cfg
在关闭应用程序(或者想要停止刷新)之前,您可以调用 cc.stop()。
本篇关于《在系统启动时加载文件数据,处理新文件并更新映射状态》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于Golang的相关知识,请关注golang学习网公众号!
-
502 收藏
-
502 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
139 收藏
-
204 收藏
-
325 收藏
-
478 收藏
-
486 收藏
-
439 收藏
-
357 收藏
-
352 收藏
-
101 收藏
-
440 收藏
-
212 收藏
-
143 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 485次学习