经作者授权转发 原文链接:https://www.jianshu.com/p/c0db1567bfd9

Golang socket websocket

理论知识可以参考
网络信息怎么在网线中传播的 (转载自知乎)
Android 网络(一) 概念 TCP/IP Socket Http Restful
脑残式网络编程入门(一):跟着动画来学TCP三次握手和四次挥手
脑残式网络编程入门(二):我们在读写Socket时,究竟在读写什么?
TCP 粘包问题浅析及其解决方案,这个帖子里大家一顿喷粘包这个叫法
我工作五年的时候也不知道 “TCP 粘包”,继续吐槽

一、API
1.服务端通过Listen加Accept
    package main
    
    import (
        "fmt"
        "net"
        "os"
        "time"
    )
    
    func main() {
        //通过 ResolveTCPAddr 获取一个 TCPAddr
        //ResolveTCPAddr(net, addr string) (*TCPAddr, os.Error)
        
        //net参数是"tcp4"、"tcp6"、"tcp"中的任意一个,
        //分别表示 TCP(IPv4-only),TCP(IPv6-only)
        //或者 TCP(IPv4,IPv6 的任意一个)
        
        //addr 表示域名或者IP地址,
        //例如"www.google.com:80" 或者"127.0.0.1:22".
        service := ":7777"
        tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
        checkError(err)
        
        //ListenTCP(net string, laddr *TCPAddr) (l *TCPListener, err os.Error)
        listener, err := net.ListenTCP("tcp", tcpAddr)
        checkError(err)
        
        //func (l *TCPListener) Accept() (c Conn, err os.Error)
        for {
            conn, err := listener.Accept()
                if err != nil {
                continue
            }
            
            daytime := time.Now().String()
            // don't care about return value
            conn.Write([]byte(daytime)) 
            
            // we're finished with this client
            conn.Close() 
        }
    }
    
    func checkError(err error) {
        if err != nil {
            fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
            os.Exit(1)
        }
    }
    

上面的服务跑起来之后,它将会一直在那里等待,直到有新的客户端请求到达。当有新的客户端请求到达并同意接受 Accept 该请求的时候他会反馈当前的时间信息。值得注意的是,在代码中 for 循环里,当有错误发生时,直接 continue而不是退出,是因为在服务器端跑代码的时候,当有错误发生的情况下最好是由服务端记录错误,然后当前连接的客户端直接报错而退出,从而不会影响到当前服务端运行的整个服务。

上面的代码有个缺点,执行的时候是单任务的,不能同时接收多个请求,那么该如何改造以使它支持多并发呢?

    ...
    for {
        conn, err := listener.Accept()
        if err != nil {
            continue
        }
        go handlerClient(conn)
    }
    ...
    
    func handleClient(conn net.Conn) {
        defer conn.Close()
        daytime := time.Now().String()
        // don't care about return value
        conn.Write([]byte(daytime)) 
        
        // we're finished with this client
    }
    ...
    
2.客户端直接调用 Dial
    package main
        import (
            "fmt"
            "io/ioutil"
            "net"
            "os"
        )
        
        func main() {
            if len(os.Args) != 2 {
                fmt.Fprintf(os.Stderr, "Usage: %s host:port ", os.Args[0])
                os.Exit(1)
            }
        
            service := os.Args[1]
            tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
            checkError(err)
            
            conn, err := net.DialTCP("tcp", nil, tcpAddr)
            checkError(err)
            
            _, err = conn.Write([]byte("HEAD / HTTP/1.0\r\n\r\n"))
            checkError(err)
            
            result, err := ioutil.ReadAll(conn)
            checkError(err)
            
            fmt.Println(string(result))
            os.Exit(0)
        }
        
        func checkError(err error) {
            if err != nil {
            fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
            os.Exit(1)
        }
    }
    

首先程序将用户的输入作为参数 service 传入net.ResolveTCPAddr 获取一个 tcpAddr,然后把 tcpAddr 传入 DialTCP 后创建了一个 TCP连接 conn ,通过 conn 来发送请求信息,最后通过 ioutil.ReadAll 从 conn 中读取全部的文本,也就是服务端响应反馈的信息。

二、实现一个可以接受不同命令的服务端

参考使用 Go 进行 Socket 编程
我们实现一个服务端, 它可以接受下面这些命令:

  • ping 探活的命令, 服务端会返回 “pong”
  • echo 服务端会返回收到的字符串
  • quit 服务端收到这个命令后就会关闭连接

具体的服务端代码如下所示:

    package main
    
    import (
        "fmt"
        "net"
        "strings"
    )
    
    func connHandler(c net.Conn) {
        if c == nil {
            return
        }
    
        buf := make([]byte, 4096)
    
        for {
            cnt, err := c.Read(buf)
            if err != nil || cnt == 0 {
                c.Close()
                break
            }
    
            inStr := strings.TrimSpace(string(buf[0:cnt]))
    
            inputs := strings.Split(inStr, " ")
    
            switch inputs[0] {
            case "ping":
                c.Write([]byte("pong\n"))
            case "echo":
                echoStr := strings.Join(inputs[1:], " ") + "\n"
                c.Write([]byte(echoStr))
            case "quit":
                c.Close()
                break
            default:
                fmt.Printf("Unsupported command: %s\n", inputs[0])
            }
        }
    
        fmt.Printf("Connection from %v closed. \n", c.RemoteAddr())
    }
    
    func main() {
        server, err := net.Listen("tcp", ":1208")
        if err != nil {
            fmt.Printf("Fail to start server, %s\n", err)
        }
    
        fmt.Println("Server Started ...")
    
        for {
            conn, err := server.Accept()
            if err != nil {
                fmt.Printf("Fail to connect, %s\n", err)
                break
            }
    
            go connHandler(conn)
        }
    }

客户端的实现

    package main
    
    import (
        "bufio"
        "fmt"
        "net"
        "os"
        "strings"
    )
    
    func connHandler(c net.Conn) {
        defer c.Close()
    
        reader := bufio.NewReader(os.Stdin)
        buf := make([]byte, 1024)
    
        for {
            input, _ := reader.ReadString('\n')
            input = strings.TrimSpace(input)
    
            if input == "quit" {
                return
            }
    
            c.Write([]byte(input))
    
            cnt, err := c.Read(buf)
            if err != nil {
                fmt.Printf("Fail to read data, %s\n", err)
                continue
            }
    
            fmt.Print(string(buf[0:cnt]))
        }
    }
    
    func main() {
        conn, err := net.Dial("tcp", "localhost:1208")
        if err != nil {
            fmt.Printf("Fail to connect, %s\n", err)
            return
        }
    
        connHandler(conn)
    }
    
三、解决golang开发socket服务时粘包半包bug

基础知识可以参考tcp是流的一些思考–拆包和粘包
tcp中有一个negal算法,用途是这样的:通信两端有很多小的数据包要发送,虽然传送的数据很少,但是流程一点没少,也需要tcp的各种确认,校验。这样小的数据包如果很多,会造成网络资源很大的浪费,negal算法做了这样一件事,当来了一个很小的数据包,我不急于发送这个包,而是等来了更多的包,将这些小包组合成大包之后一并发送,不就提高了网络传输的效率的嘛。这个想法收到了很好的效果,但是我们想一下,如果是分属于两个不同页面的包,被合并在了一起,那客户那边如何区分它们呢?
这就是粘包问题。从粘包问题我们更可以看出为什么tcp被称为流协议,因为它就跟水流一样,是没有边界的,没有消息的边界保护机制,所以tcp只有流的概念,没有包的概念。

解决tcp粘包的方法:
客户端会定义一个标示,比如数据的前4位是数据的长度,后面才是数据。那么客户端只需发送 ( 数据长度+数据 ) 的格式数据就可以了,接收方根据包头信息里的数据长度读取buffer.
客户端:

    //客户端发送封包
    package main
    
    import (
        "fmt"
        "math/rand"
        "net"
        "os"
        "strconv"
        "strings"
        "time"
    )
    
    func main() {
    
        server := "127.0.0.1:5000"
        tcpAddr, err := net.ResolveTCPAddr("tcp4", server)
        if err != nil {
            fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
            os.Exit(1)
        }
    
        conn, err := net.DialTCP("tcp", nil, tcpAddr)
        if err != nil {
            fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
            os.Exit(1)
        }
    
        defer conn.Close()
    
        for i := 0; i < 50; i++ {
            //msg := strconv.Itoa(i)
            msg := RandString(i)
            msgLen := fmt.Sprintf("%03s", strconv.Itoa(len(msg)))
            //fmt.Println(msg, msgLen)
            words := "aaaa" + msgLen + msg
            //words := append([]byte("aaaa"), []byte(msgLen), []byte(msg))
            fmt.Println(len(words), words)
            conn.Write([]byte(words))
        }
    }
    
    /**
    *生成随机字符
    **/
    func RandString(length int) string {
        rand.Seed(time.Now().UnixNano())
        rs := make([]string, length)
        for start := 0; start < length; start++ {
            t := rand.Intn(3)
            if t == 0 {
                rs = append(rs, strconv.Itoa(rand.Intn(10)))
            } else if t == 1 {
                rs = append(rs, string(rand.Intn(26)+65))
            } else {
                rs = append(rs, string(rand.Intn(26)+97))
            }
        }
        return strings.Join(rs, "")
    }
    

服务端实例代码:

    package main
    
    import (
        "fmt"
        "io"
        "net"
        "os"
        "strconv"
    )
    
    func main() {
        netListen, err := net.Listen("tcp", ":5000")
        CheckError(err)
    
        defer netListen.Close()
    
        for {
            conn, err := netListen.Accept()
            if err != nil {
                continue
            }
    
            go handleConnection(conn)
        }
    }
    
    func handleConnection(conn net.Conn) {
        allbuf := make([]byte, 0)
        buffer := make([]byte, 1024)
        for {
            readLen, err := conn.Read(buffer)
            //fmt.Println("readLen: ", readLen, len(allbuf))
            if err == io.EOF {
                break
            }
            if err != nil {
                fmt.Println("read error")
                return
            }
    
            if len(allbuf) != 0 {
                allbuf = append(allbuf, buffer...)
            } else {
                allbuf = buffer[:]
            }
            var readP int = 0
            for {
                //fmt.Println("allbuf content:", string(allbuf))
    
                //buffer长度小于7
                if readLen-readP < 7 {
                    allbuf = buffer[readP:]
                    break
                }
    
                msgLen, _ := strconv.Atoi(string(allbuf[readP+4 : readP+7]))
                logLen := 7 + msgLen
                //fmt.Println(readP, readP+logLen)
                //buffer剩余长度>将处理的数据长度
                if len(allbuf[readP:]) >= logLen {
                    //fmt.Println(string(allbuf[4:7]))
                    fmt.Println(string(allbuf[readP : readP+logLen]))
                    readP += logLen
                    //fmt.Println(readP, readLen)
                    if readP == readLen {
                        allbuf = nil
                        break
                    }
                } else {
                    allbuf = buffer[readP:]
                    break
                }
            }
        }
    }
    
    func CheckError(err error) {
        if err != nil {
            fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
            os.Exit(1)
        }
    }
    
四、io包的ReadFull

对于第三部分的解决golang开发socket服务时粘包半包bug,有作者认为太复杂了,参见golang tcp拆包的正确姿势,他提出可以用ReadFull来简化。

关于io包基础知识,参考Golang io reader writer
关于ReadFull,可以参考达达的博客系列:
Go语言小贴士1 - io包
Go语言小贴士2 - 协议解析
Go语言小贴士3 - bufio包

原文不再转述,现在引用一下重点:

io.Reader的定义如下:

    type Reader interface {
            Read(p []byte) (n int, err error)
    }

其中文档的说明非常重要,文档中详细描述了Read方法的各种返回可能性。

文档描述中有一个要点,就是n可能小于等于len(p),也就是说Go在读IO的时候,是不会保证一次读取预期的所有数据的。如果我们要确保一次读取我们所需的所有数据,就需要在一个循环里调用Read,累加每次返回的n并小心设置下次Readp的偏移量,直到n的累加值达到我们的预期。

因为上述需求实在太常见了,所以Go在io包中提供了一个ReadFull函数来做到一次读取要求的所有数据,通过阅读ReadFull函数的代码,也可以反过来帮助大家理解io.Reader是怎么运作的。

    //io.go源码
    func ReadFull(r Reader, buf []byte) (n int, err error) {
        return ReadAtLeast(r, buf, len(buf))
    }
    
    func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
        if len(buf) < min {
            return 0, ErrShortBuffer
        }
        for n < min && err == nil {
            var nn int
            nn, err = r.Read(buf[n:])
            n += nn
        }
        if n >= min {
            err = nil
        } else if n > 0 && err == EOF {
            err = ErrUnexpectedEOF
        }
        return
    }

在很多应用场景中,消息包的长度是不固定的,就像上面的字符串字段一样。我们一样可以用开头固定的几个字节来存放消息长度,在解析通讯协议的时候就可以从字节流中截出一个个的消息包了,这样的操作通常叫做协议分包或者粘包处理。
贴个从Socket读取消息包的伪代码(没编译):

    func ReadPacket(conn net.Conn) ([]byte, error) {
            var head [2]byte
    
            if _, err := io.ReadFull(conn, head[:]); err != nil {
                    return err
            }
    
            size := binary.BigEndian.Uint16(head)
            packet := make([]byte, size)
    
            if _, err := io.ReadFull(conn, packet); err != nil {
                    return err
            }
    
            return packet
    }

上面的代码就用到了前一个小贴士中说到的io.ReadFull来确保一次读取完整数据。

要注意,这段代码不是线程安全的,如果有两个线程同时对一个net.Conn进行ReadPacket操作,很可能会发生严重错误,具体逻辑请自行分析。

从上面结构体序列化和反序列化的代码中,大家不难看出,实现一个二进制协议是挺繁琐和容易出BUG的,只要稍微有一个数值计算错就解析出错了。所以在工程实践中,不推荐大家手写二进制协议的解析代码,项目中通常会用自动化的工具来辅助生成代码。

Leaf 游戏服务器框架简介的tcp_msg.go中,Read方法也使用了ReadFull这种方式来处理。

五、WebSocket

参考封装golang websocket
websocket是个二进制协议,需要先通过Http协议进行握手,从而协商完成从Http协议向websocket协议的转换。一旦握手结束,当前的TCP连接后续将采用二进制websocket协议进行双向双工交互,自此与Http协议无关。

可以通过这篇知乎了解一下websocket协议的基本原理:《WebSocket 是什么原理?为什么可以实现持久连接?》

1.粘包

我们开发过TCP服务的都知道,需要通过协议decode从TCP字节流中解析出一个一个请求,那么websocket又怎么样呢?

websocket以message为单位进行通讯,本身就是一个在TCP层上的一个分包协议,其实并不需要我们再进行粘包处理。但是因为单个message可能很大很大(比如一个视频文件),那么websocket显然不适合把一个视频作为一个message传输(中途断了前功尽弃),所以websocket协议其实是支持1个message分多个frame帧传输的。

我们的浏览器提供的编程API都是message粒度的,把frame拆帧的细节对开发者隐蔽了,而服务端websocket框架一般也做了同样的隐藏,会自动帮我们收集所有的frame后拼成messasge再回调,所以结论就是:

websocket以message为单位通讯,不需要开发者自己处理粘包问题。

更多参考Websocket需要像TCP Socket那样进行逻辑数据包的分包与合包吗?

2.golang实现

golang官方标准库里有一个websocket的包,但是它提供的就是frame粒度的API,压根不能用。

不过官方其实已经认可了一个准标准库实现,它实现了message粒度的API,让开发者不需要关心websocket协议细节,开发起来非常方便,其文档地址:https://godoc.org/github.com/gorilla/websocket

开发websocket服务时,首先要基于http库对外暴露接口,然后由websocket库接管TCP连接进行协议升级,然后进行websocket协议的数据交换,所以开发时总是要用到http库和websocket库。

上述websocket文档中对开发websocket服务有明确的注意事项要求,主要是指:

  • 读和写API不是并发安全的,需要启动单个goroutine串行处理。
  • 关闭API是线程安全的,一旦调用则阻塞的读和写API会出错返回,从而终止处理。
六、心跳实现

Golang 心跳的实现
在多客户端同时访问服务器的工作模式下,首先要保证服务器的运行正常。因此,Server和Client建立通讯后,确保连接的及时断开就非常重要。否则,多个客户端长时间占用着连接不关闭,是非常可怕的服务器资源浪费。会使得服务器可服务的客户端数量大幅度减少。因此,针对短链接和长连接,根据业务的需求,配套不同的处理机制。

  • 短连接:一般建立完连接,就立刻传输数据。传输完数据,连接就关闭。服务端根据需要,设定连接的时长。超过时间长度,就算客户端超时。立刻关闭连接。
  • 长连接:建立连接后,传输数据,然后要保持连接,然后再次传输数据。直到连接关闭。

socket读写可以通过 SetDeadline、SetReadDeadline、SetWriteDeadline设置阻塞的时间。

    func (*IPConn) SetDeadline  
    func (c *IPConn) SetDeadline(t time.Time) error  
    
    func (*IPConn) SetReadDeadline  
    func (c *IPConn) SetReadDeadline(t time.Time) error  
    
    func (*IPConn) SetWriteDeadline 
    func (c *IPConn) SetWriteDeadline(t time.Time) error

如果做短连接,直接在Server端的连接上设置SetReadDeadline。当你设置的时限到达,无论客户端是否还在继续传递消息,服务端都不会再接收。并且已经关闭连接。

    func main() {
        server := ":7373"
        netListen, err := net.Listen("tcp", server)
        if err != nil{
            Log("connect error: ", err)
            os.Exit(1)
        }
        Log("Waiting for Client ...")
        for{
            conn, err := netListen.Accept()
            if err != nil{
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                continue
            }
    
            //设置短连接(10秒)
            conn.SetReadDeadline(time.Now().Add(time.Duration(10)*time.Second))
    
            Log(conn.RemoteAddr().String(), "connect success!")
            ...
        }
    }
    

这就可以了。在这段代码中,每当10秒中的时限一道,连接就终止了。

根据业务需要,客户端可能需要长时间保持连接。但是服务端不能无限制的保持。这就需要一个机制,如果超过某个时间长度,服务端没有获得客户端的数据,就判定客户端已经不需要连接了(比如客户端挂掉了)。做到这个,需要一个心跳机制。在限定的时间内,客户端给服务端发送一个指定的消息,以便服务端知道客户端还活着。

    func sender(conn *net.TCPConn) {
        for i := 0; i < 10; i++{
            words := strconv.Itoa(i)+" Hello I'm MyHeartbeat Client."
            msg, err := conn.Write([]byte(words))
            if err != nil {
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                os.Exit(1)
            }
            Log("服务端接收了", msg)
            time.Sleep(2 * time.Second)
        }
        for i := 0; i < 2 ; i++ {
            time.Sleep(12 * time.Second)
        }
        for i := 0; i < 10; i++{
            words := strconv.Itoa(i)+" Hi I'm MyHeartbeat Client."
            msg, err := conn.Write([]byte(words))
            if err != nil {
                Log(conn.RemoteAddr().String(), "Fatal error: ", err)
                os.Exit(1)
            }
            Log("服务端接收了", msg)
            time.Sleep(2 * time.Second)
        }
    
    }

这段客户端代码,实现了两个相同的信息发送频率给服务端。两个频率中间,我们让运行休息了12秒。然后,我们在服务端的对应机制是这样的。

    func HeartBeating(conn net.Conn, bytes chan byte, timeout int) {
        select {
        case fk := <- bytes:
            Log(conn.RemoteAddr().String(), "心跳:第", string(fk), "times")
            conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
            break
    
            case <- time.After(5 * time.Second):
                Log("conn dead now")
                conn.Close()
        }
    }
    

每次接收到心跳数据就 SetDeadline 延长一个时间段 timeout。如果没有接到心跳数据,5秒后连接关闭。