
Goroutines and efficient task scheduling

Source: stackoverflow

Date: 2024-02-14 15:48:24


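Question

I am using goroutines in Go to run two websocket clients concurrently (one for private data, one for public data). On the surface everything seems to work: both clients receive the data streamed from the websocket server. However, I believe I may have set something up incorrectly, because when I check Activity Monitor my program consistently shows 500 to 1500 idle wake-ups and uses more than 200% CPU. That does not seem normal for something as simple as two websocket clients.

I have posted the code in snippets so there is less to read (hopefully that makes it easier to follow), but I can post the full code if you need it. Here is the code in my main func that runs the ws clients: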

comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup

wg.Add(1)
go pubSocket.PubListen(ctx, &wg, &activeSubs, testing)
wg.Add(1)
go privSocket.PrivListen(ctx, &wg, &activeSubs, testing)

<-comms
cancel()
wg.Wait()

Here is the code showing how the clients run inside the goroutines:

func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            log.Println("closing public socket")
            socket.Close()
            return
        default:
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
                pubJsonDecoder(message, testing)
                //tradesParser(message);
            }
        }
    }
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            log.Println("closing private socket")
            socket.Close()
            return
        default:
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
            }
        }
    }
}

Any ideas on why the idle wake-ups are so high? Should I be using multithreading instead of concurrency? Thanks in advance for your help!


Accepted answer


You are wasting CPU here (a redundant loop):

for {
       // ...
        default:
        // high cpu usage here.
        }
    }

Try something like this:

func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    defer socket.Close()

    // Register the handler once; the library invokes it for every message.
    socket.OnTextMessage = func(message string, socket Socket) {
        log.Println(message)
        pubJsonDecoder(message, testing)
        //tradesParser(message);
    }

    // Block until cancellation instead of looping.
    <-ctx.Done()
    log.Println("closing public socket")
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    defer socket.Close()

    socket.OnTextMessage = func(message string, socket Socket) {
        log.Println(message)
    }

    <-ctx.Done()
    log.Println("closing private socket")
}
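
The pattern in the snippet above (register the callback once, then block on ctx.Done()) can be tried without a real websocket. The following is only a sketch: `fakeSocket`, `SetHandler`, `Deliver` and `runDemo` are hypothetical names made up for illustration and are not part of any websocket library.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// fakeSocket is a hypothetical stand-in for the websocket client in the
// question. It stores one text-message handler and a closed flag.
type fakeSocket struct {
	mu      sync.Mutex
	handler func(message string)
	closed  bool
}

func (s *fakeSocket) SetHandler(h func(string)) {
	s.mu.Lock()
	s.handler = h
	s.mu.Unlock()
}

// Deliver simulates the library invoking the callback for an incoming
// message. It returns false if no handler has been registered yet.
func (s *fakeSocket) Deliver(msg string) bool {
	s.mu.Lock()
	h := s.handler
	s.mu.Unlock()
	if h == nil {
		return false
	}
	h(msg)
	return true
}

func (s *fakeSocket) Close() {
	s.mu.Lock()
	s.closed = true
	s.mu.Unlock()
}

func (s *fakeSocket) Closed() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.closed
}

// listen registers the handler exactly once and then parks the goroutine
// until the context is cancelled. There is no loop and no default case.
func listen(ctx context.Context, wg *sync.WaitGroup, s *fakeSocket, got chan<- string) {
	defer wg.Done()
	defer s.Close()

	s.SetHandler(func(message string) { got <- message })

	<-ctx.Done()
}

// runDemo delivers one message, cancels the context, and reports the
// message received and whether the socket was closed on shutdown.
func runDemo() (string, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	s := &fakeSocket{}
	got := make(chan string, 1)

	wg.Add(1)
	go listen(ctx, &wg, s, got)

	// Retry until the listener goroutine has registered its handler.
	for !s.Deliver("hello") {
		time.Sleep(time.Millisecond)
	}
	msg := <-got

	cancel()
	wg.Wait()
	return msg, s.Closed()
}

func main() {
	msg, closed := runDemo()
	fmt.Println(msg, closed)
}
```

The listener goroutine does no work between messages; it only wakes up when the context is cancelled.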

This may also help:
https://github.com/gorilla/websocket/blob/master/examples/chat/client.go

TL;DR: websockets are hard :)

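It looks like you may have a couple of spinners. You are assigning the handler function for OnTextMessage() in the default case of a for-select statement. The default case always executes when no other case is ready. Because nothing in the default case blocks, the for loop spins out of control. Two goroutines spinning like this will likely peg 2 cores. Websockets are network IO, and those goroutines are probably running in parallel. That is why you are seeing 200% utilization.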
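
To make the difference concrete, here is a small sketch that counts how often each style of loop runs during a short time window. The function names `spin`, `block` and `measure` are made up for this illustration; the exact iteration count depends on the machine.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// spin mimics the loop from the question: the default case never blocks,
// so the loop body runs as fast as the CPU allows until ctx is done.
// It returns the number of times the default case executed.
func spin(ctx context.Context) int64 {
	var n int64
	for {
		select {
		case <-ctx.Done():
			return n
		default:
			n++
		}
	}
}

// block waits on the context without a default case. The goroutine is
// parked by the runtime and wakes exactly once, when ctx is done.
func block(ctx context.Context) int64 {
	<-ctx.Done()
	return 1
}

// measure runs both variants for the given number of milliseconds and
// returns the spin iteration count and the blocking wake-up count.
func measure(ms int) (spins, wakes int64) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()

	result := make(chan int64, 1)
	go func() { result <- spin(ctx) }()

	wakes = block(ctx)
	spins = <-result
	return spins, wakes
}

func main() {
	spins, wakes := measure(50)
	fmt.Println("spinning iterations:", spins)
	fmt.Println("blocking wake-ups:", wakes)
}
```

On a typical machine the spinning variant reports a very large number of iterations for a 50 ms window, while the blocking variant wakes once.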

Take a look at the gorilla/websocket library. I won't say it is better or worse than any other websocket library, but I have a lot of experience with it.

https://github.com/gorilla/websocket

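Below is an implementation I have used many times. It is set up so that you register handler functions that fire when a particular message is received. Say one of the values in your message is "type": "start-job": the websocket server will call the handler you assigned to the "start-job" websocket message. It feels like writing endpoints for an HTTP router.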
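
Before the full listing, the routing idea can be sketched on its own with nothing but the standard library. This is a simplified illustration, not the implementation itself: `handlerFunc`, `registry`, `envelope`, `dispatch` and `newRegistry` are hypothetical names, and the handler here returns a string instead of writing to a connection.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// handlerFunc is a hypothetical, simplified handler signature. A real
// handler would also receive the connection and its context.
type handlerFunc func(payload map[string]interface{}) string

// registry maps a message "type" to the handler registered for it.
type registry map[string]handlerFunc

// envelope is the assumed wire format: {"type": "...", "message": {...}}.
type envelope struct {
	Type    string                 `json:"type"`
	Message map[string]interface{} `json:"message"`
}

// dispatch decodes one raw message and routes it to the handler
// registered for its type.
func (r registry) dispatch(raw []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", fmt.Errorf("decode message: %w", err)
	}
	h, ok := r[env.Type]
	if !ok {
		return "", fmt.Errorf("no handler for type %q", env.Type)
	}
	return h(env.Message), nil
}

// newRegistry registers a single example handler for "start-job".
func newRegistry() registry {
	r := registry{}
	r["start-job"] = func(p map[string]interface{}) string {
		return fmt.Sprintf("%v is being dialed.", p["did"])
	}
	return r
}

func main() {
	r := newRegistry()
	out, err := r.dispatch([]byte(`{"type":"start-job","message":{"did":"555-0100"}}`))
	fmt.Println(out, err)
}
```

Adding a new message type then only means adding one more entry to the map, much like adding a route.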

Package serverws

context.go

package serverws

import (
    "errors"
    "fmt"
    "strings"
    "sync"
)

// ConnContext is the connection context to track a connected websocket user
type ConnContext struct {
    SpecialKey  string
    SupportGzip string
    UserID      string
    mu          sync.Mutex // Websockets are not thread safe, we'll use a mutex to lock writes.
}

// HashKeyAsCtx returns a ConnContext based on the hash provided
func HashKeyAsCtx(hashKey string) (*ConnContext, error) {
    values := strings.Split(hashKey, ":")
    if len(values) != 3 {
        return nil, errors.New("Invalid Key Received: " + hashKey)
    }
    return &ConnContext{values[0], values[1], values[2], sync.Mutex{}}, nil
}

// AsHashKey returns the hash key for a given connection context ConnContext
func (ctx *ConnContext) AsHashKey() string {
    return strings.Join([]string{ctx.SpecialKey, ctx.SupportGzip, ctx.UserID}, ":")
}

// String returns a string of the hash of a given connection context ConnContext
func (ctx *ConnContext) String() string {
    return fmt.Sprint("specialkey: ", ctx.SpecialKey, " gzip ", ctx.SupportGzip, " auth ", ctx.UserID)
}

wshandler.go

package serverws

import (
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/gorilla/websocket"
    "github.com/rs/zerolog/log"
)

var (
    receiveFunctionMap = make(map[string]ReceiveObjectFunc)
    ctxHashMap         sync.Map
)

// ReceiveObjectFunc is a function signature for a websocket request handler
type ReceiveObjectFunc func(conn *websocket.Conn, ctx *ConnContext, t map[string]interface{})

// WebSocketHandler does what it says, handles WebSockets (makes them easier for us to deal with)
type WebSocketHandler struct {
    wsupgrader websocket.Upgrader
}

// WebSocketMessage that is sent over a websocket. Messages must have a conversation type so the server and the client JS know
// what is being discussed and what signals to raise on the server and the client.
// The "Notification" message instructs the client to display an alert popup.
type WebSocketMessage struct {
    MessageType string      `json:"type"`
    Message     interface{} `json:"message"`
}

// NewWebSocketHandler sets up a new websocket.
func NewWebSocketHandler() *WebSocketHandler {
    wsh := new(WebSocketHandler)
    wsh.wsupgrader = websocket.Upgrader{
        ReadBufferSize:  4096,
        WriteBufferSize: 4096,
    }
    return wsh
}

// RegisterMessageType sets up an event bus for a message type. When messages arrive from the client that match messageTypeName,
// the function you wrote to handle that message is then called.
func (wsh *WebSocketHandler) RegisterMessageType(messageTypeName string, f ReceiveObjectFunc) {
    receiveFunctionMap[messageTypeName] = f
}

// OnMessage triggers when the underlying websocket has received a message.
func (wsh *WebSocketHandler) OnMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {
    // Handling text messages or binary messages. Binary is usually some gzip text.
    if msgType == websocket.TextMessage {
        wsh.processIncomingTextMsg(conn, ctx, msg)
    }
    if msgType == websocket.BinaryMessage {
        // Binary messages are not handled in this version.
    }
}

// OnOpen triggers when the underlying websocket has established a connection.
func (wsh *WebSocketHandler) OnOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) {
    //user, err := gothic.GetFromSession("ID", r)
    user := "TestUser"
    if err := r.ParseForm(); err != nil {
        return nil, errors.New("parameter check error")
    }

    specialKey := r.FormValue("specialKey")
    supportGzip := r.FormValue("support_gzip")

    if user != "" && err == nil {
        ctx = &ConnContext{specialKey, supportGzip, user, sync.Mutex{}}
    } else {
        ctx = &ConnContext{specialKey, supportGzip, "", sync.Mutex{}}
    }

    keyString := ctx.AsHashKey()

    // If a connection with the same key already exists, close it and replace it.
    if oldConn, ok := ctxHashMap.Load(keyString); ok {
        wsh.OnClose(oldConn.(*websocket.Conn), ctx)
        oldConn.(*websocket.Conn).Close()
    }
    ctxHashMap.Store(keyString, conn)
    return ctx, nil
}

// OnClose triggers when the underlying websocket has been closed down
func (wsh *WebSocketHandler) OnClose(conn *websocket.Conn, ctx *ConnContext) {
    //log.Info().Msg(("client close itself as " + ctx.String()))
    wsh.closeConnWithCtx(ctx)
}

// OnError triggers when a websocket connection breaks
func (wsh *WebSocketHandler) OnError(errMsg string) {
    //log.Error().Msg(errMsg)
}

// HandleConn happens when a user connects to us at the listening point. We ask
// the user to authenticate and then send the required HTTP Upgrade return code.
func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) {

    user := ""
    if r.URL.Path == "/websocket" {
        user = "TestUser" // authenticate however you want
        if user == "" {
            fmt.Println("UNAUTHENTICATED USER TRIED TO CONNECT TO WEBSOCKET FROM ", r.Header.Get("X-Forwarded-For"))
            return
        }
    }
    // Don't do this. You need to check the origin, but this is here as a place holder
    wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool {
        return true
    }

    conn, err := wsh.wsupgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Error().Msg("Failed to set websocket upgrade: " + err.Error())
        return
    }
    defer conn.Close()

    ctx, err := wsh.OnOpen(conn, r)
    if err != nil {
        // ctx is nil when OnOpen fails, so it must not be touched here.
        log.Error().Msg("Open connection failed " + err.Error() + r.URL.RawQuery)
        return
    }

    if user != "" {
        ctx.UserID = user
    }
    conn.SetPingHandler(func(message string) error {
        conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
        return nil
    })

    // Message pump for the underlying websocket connection.
    // ReadMessage blocks until data arrives, so this loop does not spin.
    for {
        t, msg, err := conn.ReadMessage()
        if err != nil {
            // Read errors are when the user closes the tab. Ignore.
            wsh.OnClose(conn, ctx)
            return
        }

        switch t {
        case websocket.TextMessage, websocket.BinaryMessage:
            wsh.OnMessage(conn, ctx, msg, t)
        case websocket.CloseMessage:
            wsh.OnClose(conn, ctx)
            return
        case websocket.PingMessage:
        case websocket.PongMessage:
        }

    }

}

func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) {
    keyString := ctx.AsHashKey()
    ctxHashMap.Delete(keyString)
}

func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {
    //log.Debug().Msg("CLIENT SAID " + string(msg))
    data := WebSocketMessage{}

    // try to turn this into data
    err := json.Unmarshal(msg, &data)

    // And try to get at the data underneath
    var raw = make(map[string]interface{})
    terr := json.Unmarshal(msg, &raw)

    if err == nil {
        // What kind of message is this?
        if receiveFunctionMap[data.MessageType] != nil {
            // We'll try to cast this message and call the handler for it
            if terr == nil {
                if v, ok := raw["message"].(map[string]interface{}); ok {
                    receiveFunctionMap[data.MessageType](conn, ctx, v)
                } else {
                    log.Debug().Msg("Nonsense sent over the websocket.")
                }
            } else {
                log.Debug().Msg("Nonsense sent over the websocket.")
            }
        }
    } else {
        // Received garbage from the transmitter.
    }
}

// SendJSONToSocket sends a specific message to a specific websocket
func (wsh *WebSocketHandler) SendJSONToSocket(socketID string, msg interface{}) {
    fields := strings.Split(socketID, ":")
    message, _ := json.Marshal(msg)

    ctxHashMap.Range(func(key interface{}, value interface{}) bool {
        // Note: ctx is rebuilt from the key, so ctx.mu is a fresh mutex on every call.
        if ctx, err := HashKeyAsCtx(key.(string)); err != nil {
            wsh.OnError(err.Error())
        } else {
            if ctx.SpecialKey == fields[0] {
                ctx.mu.Lock()
                if value != nil {
                    err = value.(*websocket.Conn).WriteMessage(websocket.TextMessage, message)
                }
                ctx.mu.Unlock()
            }
            if err != nil {
                ctx.mu.Lock() // We'll lock here even though we're going to destroy this
                wsh.OnClose(value.(*websocket.Conn), ctx)
                value.(*websocket.Conn).Close()
                ctxHashMap.Delete(key) // Remove the websocket immediately
                //wsh.OnError("WRITE ERR TO USER " + key.(string) + " ERR: " + err.Error())
            }
        }
        return true
    })
}
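
The comment in context.go says the mutex is there to lock writes; gorilla/websocket connections support only one concurrent writer. A mutex only serializes writes if every writer to a given connection locks the same mutex instance, and one way to guarantee that is to store the mutex next to the connection. The sketch below shows that idea with a `bytes.Buffer` standing in for the connection; `lockedWriter` and `writeConcurrently` are hypothetical names, not part of the listing above.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// lockedWriter is a hypothetical wrapper that keeps the mutex beside the
// thing being written to. The buffer stands in for a websocket connection.
type lockedWriter struct {
	mu sync.Mutex
	w  *bytes.Buffer
}

// WriteMessage appends one newline-terminated message while holding the
// lock, so concurrent callers cannot interleave their writes.
func (l *lockedWriter) WriteMessage(msg string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.w.WriteString(msg)
	l.w.WriteByte('\n')
}

// writeConcurrently sends n messages from n goroutines through the same
// lockedWriter and returns the number of complete lines written.
func writeConcurrently(n int) int {
	lw := &lockedWriter{w: &bytes.Buffer{}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			lw.WriteMessage(fmt.Sprintf("message %d", id))
		}(i)
	}
	wg.Wait()
	return bytes.Count(lw.w.Bytes(), []byte("\n"))
}

func main() {
	fmt.Println("lines written:", writeConcurrently(50))
}
```

In a real server the same shape could be applied by storing a struct holding both the connection and its mutex in the connection map, rather than the bare connection.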

Package wsocket

types.go

package wsocket

// Acknowledgement is for ACKing simple messages and sending errors.
// The JSON tag for ResponseID matches the responseId field the JavaScript client switches on.
type Acknowledgement struct {
    ResponseID string `json:"responseId"`
    Status     string `json:"status"`
    IPAddress  string `json:"ipaddress"`
    ErrorText  string `json:"errortext"`
}

wsocket.go

package wsocket

import (
    "fmt"
    server "project/serverws"

    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    // "github.com/mitchellh/mapstructure"
    "github.com/inconshreveable/log15"
)

var (
    WebSocket         *server.WebSocketHandler // So other packages can send out websocket messages
    WebSocketLocation string
    log               log15.Logger = log15.New("package", "wsocket")
)

func SetupWebsockets(r *gin.Engine, socket *server.WebSocketHandler, debugMode bool) {

    WebSocket = socket
    WebSocketLocation = "example.mydomain.com"
    //WebSocketLocation = "example.mydomain.com"
    r.GET("/websocket", func(c *gin.Context) {
        socket.HandleConn(c.Writer, c.Request)

    })

    socket.RegisterMessageType("Hello", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {

        response := Acknowledgement{
            ResponseID: "Hello",
            Status:     fmt.Sprintf("OK/%v", ctx.UserID),
            IPAddress:  conn.RemoteAddr().String(),
        }
        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in
        socket.SendJSONToSocket(ctx.AsHashKey(), &response)
    })

    socket.RegisterMessageType("start-job", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {

        response := Acknowledgement{
            ResponseID: "starting_job",
            Status:     fmt.Sprintf("%s is being dialed.", data["did"]),
            IPAddress:  conn.RemoteAddr().String(),
        }
        // mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in to a struct.
        socket.SendJSONToSocket(ctx.AsHashKey(), &response)

    })
}

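This implementation is for a web application. Below is a simplified version of the JavaScript client. You can handle many concurrent connections with this implementation. All you have to do to communicate is define objects/structs containing a responseId that matches a case in the switch below (it is basically one long switch statement), serialize them, and send them to the other side, which will acknowledge them. I have versions of this running in several production environments.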

websocket.js

$(() => {

    function wsMessage(object) {
        switch (object.responseId) {
            case "Hello": // HELLO! :-)
                console.log("Heartbeat received, we're connected.");
                break;

            case "Notification":
                if (object.errortext != "") {
                    $.notify({
                        // options
                        message: object.errortext,
                    }, {
                        // settings
                        type: 'danger',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                    });
                } else {
                    $.notify({
                        // options
                        message: object.status,
                    }, {
                        // settings
                        type: 'success',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                    });
                }
                break;
        }
    }

    $(document).ready(function () {
        function heartbeat() {
            if (!websocket) return;
            if (websocket.readyState !== 1) return;
            websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }");
            setTimeout(heartbeat, 24000);
        }

        //TODO: CHANGE TO WSS once tls is enabled.
        function wireUpWebsocket() {
            websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');

            websocket.onopen = function (event) {
                console.log("Websocket connected.");
                heartbeat();
                //if it exists
                if (typeof (wsReady) !== 'undefined') {
                    //execute it
                    wsReady();
                }
            };

            websocket.onerror = function (event) {
                console.log("WEBSOCKET ERROR " + event.data);
            };

            websocket.onmessage = function (event) {
                wsMessage(JSON.parse(event.data));
            };

            websocket.onclose = function () {
                // Don't close!
                // Replace key
                console.log("WEBSOCKET CLOSED");
                WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
                websocketreconnects++;
                if (websocketreconnects > 30) { // Too much, time to bounce
                    // location.reload(); Don't reload the page anymore, just re-connect.
                }
                setTimeout(function () {
                    wireUpWebsocket();
                }, 3000);
            };
        }

        wireUpWebsocket();
    });
});

function getCookie(name) {
    var value = "; " + document.cookie;
    var parts = value.split("; " + name + "=");
    if (parts.length == 2) return parts.pop().split(";").shift();
}

function setCookie(cname, cvalue, exdays) {
    var d = new Date();
    d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000));
    var expires = "expires=" + d.toUTCString();
    document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
}

Assigning the handler function over and over in an infinite loop is definitely not going to work.

https://github.com/gorilla/websocket


Disclaimer: this article is reprinted from stackoverflow. In case of infringement, contact study_golang@163.com to request removal.