golang实现带有心跳检测的tcp长连接

package main

// golang实现带有心跳检测的tcp长连接
// server
import (
	"fmt"
	"net"
	"time"
)

// Message layout: c#d
// (a one-byte message code from the list below, a '#' separator, then the data bytes)

// Message-type codes. The code in this file puts one of them in the first
// byte of every message it sends and dispatches on the first byte it reads.
var (
	Req_REGISTER byte = 1 // 1 --- client registers its id
	Res_REGISTER byte = 2 // 2 --- server's response to a registration

	Req_HEARTBEAT byte = 3 // 3 --- heartbeat request sent by the server
	Res_HEARTBEAT byte = 4 // 4 --- heartbeat response sent by the client

	Req byte = 5 // 5 --- data message (sent by client or server)
	Res byte = 6 // 6 --- acknowledgement of a data message (sent by client or server)
)

// CS is the per-client state the server keeps for one registered connection.
type CS struct {
	Rch chan []byte // data read from the client
	Wch chan []byte // messages queued to be written to the client
	Dch chan bool   // signals the connection handler to shut down
	u   string      // client id given at registration
}

// NewCs returns the state for the client identified by uid with every channel
// allocated.
//
// Dch must be allocated here: a receive on a nil channel blocks forever, so a
// handler waiting on an unallocated Dch could never be woken up. It has a
// buffer of one so that whoever signals shutdown does not block if the handler
// has already gone away.
func NewCs(uid string) *CS {
	return &CS{
		Rch: make(chan []byte),
		Wch: make(chan []byte),
		Dch: make(chan bool, 1),
		u:   uid,
	}
}

// CMap maps a registered client id to that client's CS. main allocates it,
// Handler adds entries, RHandler deletes them, and PushGRT and WHandler read it.
// NOTE(review): those accesses happen on different goroutines with no locking
// visible in this file — confirm whether a mutex is needed.
var CMap map[string]*CS

// main starts the server: it allocates the client registry, listens on
// 127.0.0.1:6666, starts the periodic push goroutine and runs the accept loop.
func main() {
	CMap = make(map[string]*CS)

	// Keyed fields keep the literal valid if net.TCPAddr ever gains or reorders
	// fields (go vet reports unkeyed literals of imported struct types).
	addr := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 6666}
	listen, err := net.ListenTCP("tcp", addr)
	if err != nil {
		fmt.Println("监听端口失败:", err.Error())
		return
	}
	fmt.Println("已初始化连接,等待客户端连接...")

	go PushGRT()
	Server(listen)

	// Keep the process alive should the accept loop ever return.
	select {}
}

// PushGRT loops forever: every 15 seconds it queues a "#push!" data message on
// the write channel of every client currently present in CMap.
func PushGRT() {
	const interval = 15 * time.Second

	for {
		time.Sleep(interval)

		for uid, client := range CMap {
			fmt.Println("push msg to user:" + uid)

			// A fresh slice per client: message code followed by "#push!".
			payload := append([]byte{Req}, "#push!"...)
			client.Wch <- payload
		}
	}
}

// Server accepts TCP connections on listen forever and serves each accepted
// connection on its own Handler goroutine. A failed accept is logged and the
// loop carries on with the next one.
func Server(listen *net.TCPListener) {
	for {
		client, acceptErr := listen.AcceptTCP()
		if acceptErr == nil {
			fmt.Println("客户端连接来自:", client.RemoteAddr().String())
			// One handler goroutine per connection.
			go Handler(client)
			continue
		}
		fmt.Println("接受客户端连接异常:", acceptErr.Error())
	}
}

// Handler serves one client connection.
//
// It first waits for a registration message (first byte Req_REGISTER, client
// id from byte 2 onward), answering anything else with an error response. Once
// registered it stores the client's CS in CMap, starts the writer, reader and
// worker goroutines, and then waits until either C.Dch is signalled or the
// reader goroutine returns. conn is closed when Handler returns.
func Handler(conn net.Conn) {
	defer conn.Close()

	data := make([]byte, 128)
	var uid string
	var C *CS
	for {
		n, err := conn.Read(data)
		if err != nil {
			// The peer went away before registering; looping again would only
			// spin on the same error.
			fmt.Println("read before register failed:", err)
			return
		}
		fmt.Println("客户端发来数据:", string(data[:n]))

		if n > 0 && data[0] == Req_REGISTER { // register
			if _, err := conn.Write([]byte{Res_REGISTER, 'o', 'k'}); err != nil {
				fmt.Println("write register response failed:", err)
				return
			}
			// Only the bytes actually received belong to the id; the rest of
			// the buffer is zero padding.
			if n > 2 {
				uid = string(data[2:n])
			}
			C = NewCs(uid)
			CMap[uid] = C
			break
		}

		if _, err := conn.Write([]byte{Res_REGISTER, 'e', 'r'}); err != nil {
			fmt.Println("write register response failed:", err)
			return
		}
	}

	// Writer: drains C.Wch onto the connection.
	go WHandler(conn, C)

	// Reader: its return marks the end of the connection's life, so it is
	// wrapped to expose that as a channel. Waiting on C.Dch alone is not
	// enough, because nothing in this handler's goroutines signals it.
	readerDone := make(chan struct{})
	go func() {
		defer close(readerDone)
		RHandler(conn, C)
	}()

	// Worker: produces business data for the writer.
	go Work(C)

	select {
	case <-C.Dch:
	case <-readerDone:
	}
	fmt.Println("close handler goroutine")
}

// 正常写数据
// 定时检测 conn die => goroutine die
// WHandler is the write loop for one client: it writes every message received
// on C.Wch to conn. Every 20 seconds it checks whether the client is still
// present in CMap and returns once it is not.
func WHandler(conn net.Conn, C *CS) {
	ticker := time.NewTicker(20 * time.Second)
	// Release the ticker's resources when the loop ends.
	defer ticker.Stop()

	for {
		select {
		case d := <-C.Wch:
			// Keep draining C.Wch even after a failed write so that senders on
			// the unbuffered channel are not left blocked; the periodic CMap
			// check below is what ends the loop.
			if _, err := conn.Write(d); err != nil {
				fmt.Println("write to client failed:", err)
			}
		case <-ticker.C:
			if _, ok := CMap[C.u]; !ok {
				fmt.Println("conn die,close WHandler")
				return
			}
		}
	}
}

// 读客户端数据 + 心跳检测
// RHandler is the read loop for one client and also performs heartbeat
// detection.
//
// Each round waits up to 10 seconds for a message. A data message (Req) is
// acknowledged with Res; an acknowledgement (Res) is only logged. If the read
// fails, a heartbeat request is sent and the client gets 2 seconds to answer;
// any successful read counts as the answer. When the heartbeat cannot be sent
// or is not answered, the client is removed from CMap and RHandler returns.
func RHandler(conn net.Conn, C *CS) {
	for {
		data := make([]byte, 128)

		if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
			fmt.Println(err)
		}
		n, derr := conn.Read(data)
		if derr == nil {
			// Only the first n bytes were received; the rest of the buffer is
			// zero padding and must not be printed or interpreted.
			msg := data[:n]
			fmt.Println(msg)
			if n == 0 {
				continue
			}
			switch msg[0] {
			case Res:
				fmt.Println("recv client data ack")
			case Req:
				fmt.Println("recv client data")
				fmt.Println(msg)
				if _, werr := conn.Write([]byte{Res, '#'}); werr != nil {
					fmt.Println("write data ack failed:", werr)
				}
				// C.Rch <- msg
			}
			continue
		}

		// Nothing readable in time (or the read failed): probe the client.
		if _, werr := conn.Write([]byte{Req_HEARTBEAT, '#'}); werr != nil {
			fmt.Println("write heartbeat failed:", werr)
			delete(CMap, C.u)
			fmt.Println("delete user!")
			return
		}
		fmt.Println("send ht packet")

		if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil {
			fmt.Println(err)
		}
		if _, herr := conn.Read(data); herr != nil {
			delete(CMap, C.u)
			fmt.Println("delete user!")
			return
		}
		fmt.Println("resv ht packet ack")
	}
}

// Work is the demo business worker for one client: it waits 5 seconds and
// queues a data message on C.Wch, waits a further 15 seconds and queues a
// second one, then returns.
func Work(C *CS) {
	steps := []struct {
		wait    time.Duration
		payload []byte
	}{
		{wait: 5 * time.Second, payload: []byte{Req, 'l', 'o'}},
		{wait: 15 * time.Second, payload: []byte{Req, 'o'}},
	}

	for _, step := range steps {
		time.Sleep(step.wait)
		C.Wch <- step.payload
	}

	// Disabled: forward whatever arrives on the read channel to the write
	// channel, checking every 20 seconds whether the client is still in CMap.
	/*	ticker := time.NewTicker(20 * time.Second)
		for {
			select {
			case d := <-C.Rch:
				C.Wch <- d
			case <-ticker.C:
				if _,ok := CMap[C.u]; !ok {
					return
				}
			}

		}
	*/
}
package main

// golang实现带有心跳检测的tcp长连接
// client

import (
	"fmt"
	"net"
)

// Message-type codes. The code in this file puts one of them in the first
// byte of the messages it sends and dispatches on the first byte it reads.
var (
	Req_REGISTER byte = 1 // 1 --- client registers its id
	Res_REGISTER byte   = 2 // 2 --- server's response to a registration

	Req_HEARTBEAT byte = 3 // 3 --- heartbeat request sent by the server
	Res_HEARTBEAT byte = 4 // 4 --- heartbeat response sent by the client

	Req  byte = 5 // 5 --- data message (sent by client or server)
	Res  byte = 6 // 6 --- acknowledgement of a data message (sent by client or server)
)

// Dch is signalled by RHandler when it stops reading; main waits on it before exiting.
var Dch chan bool
// Rch carries the data portion of received data messages from RHandler to Work.
var Rch chan []byte
// Wch carries outgoing messages from Work to WHandler.
var Wch chan []byte

// main starts the client: it allocates the package-level channels, connects to
// the server at 127.0.0.1:6666, starts Handler on the connection and then
// blocks until Dch is signalled. The connection is closed on return.
func main() {
	Dch = make(chan bool)
	Rch = make(chan []byte)
	Wch = make(chan []byte)

	addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:6666")
	if err != nil {
		// Report the resolve error itself instead of letting DialTCP fail
		// later on a nil address with a less useful message.
		fmt.Println("连接服务端失败:", err.Error())
		return
	}
	conn, err := net.DialTCP("tcp", nil, addr)
	// Alternative: conn, err := net.Dial("tcp", "127.0.0.1:6666")
	if err != nil {
		fmt.Println("连接服务端失败:", err.Error())
		return
	}
	fmt.Println("已连接服务器")
	defer conn.Close()

	go Handler(conn)

	<-Dch
	fmt.Println("关闭连接")
}

// Handler registers this client with the server and then starts the reader,
// writer and worker goroutines.
//
// It keeps sending the registration message until a reply whose first byte is
// Res_REGISTER arrives. If the connection fails while registering, it signals
// Dch so that main can exit, and returns without starting the goroutines.
func Handler(conn *net.TCPConn) {
	data := make([]byte, 128)
	for {
		// Layout is code, '#', id: the server takes the id from byte 2 onward,
		// so the separator must be present for the id "2" to be seen.
		if _, err := conn.Write([]byte{Req_REGISTER, '#', '2'}); err != nil {
			fmt.Println("send register failed:", err)
			Dch <- true
			return
		}
		n, err := conn.Read(data)
		if err != nil {
			// Retrying on a dead connection would loop forever.
			fmt.Println("read register response failed:", err)
			Dch <- true
			return
		}
		if n > 0 && data[0] == Res_REGISTER {
			break
		}
	}

	go RHandler(conn)
	go WHandler(conn)
	go Work()
}

// RHandler is the client's read loop.
//
// A heartbeat request is answered with a Res_HEARTBEAT message. For a data
// message, the bytes after the two-byte header are printed and handed to Work
// through Rch, and a Res acknowledgement is sent back. When a read fails or
// returns no data, or a reply cannot be written, Dch is signalled and the
// loop ends.
func RHandler(conn *net.TCPConn) {
	for {
		// A fresh buffer per message: the payload slice passed on Rch below
		// aliases it, so it must not be overwritten by the next read.
		data := make([]byte, 128)
		n, err := conn.Read(data)
		if err != nil || n == 0 {
			Dch <- true
			return
		}

		switch data[0] {
		case Req_HEARTBEAT:
			fmt.Println("recv ht pack")
			// Reply with the heartbeat response code, not the register one.
			if _, err := conn.Write([]byte{Res_HEARTBEAT, 'h'}); err != nil {
				fmt.Println("send ht pack ack failed:", err)
				Dch <- true
				return
			}
			fmt.Println("send ht pack ack")
		case Req:
			fmt.Println("recv data pack")
			// Only bytes 2..n were received as payload; anything beyond n is
			// zero padding. A message of two bytes or fewer has no payload.
			var payload []byte
			if n > 2 {
				payload = data[2:n]
			}
			fmt.Printf("%v\n", string(payload))
			Rch <- payload
			if _, err := conn.Write([]byte{Res, '#'}); err != nil {
				fmt.Println("send data ack failed:", err)
				Dch <- true
				return
			}
		}
	}
}

// WHandler is the client's write loop: it takes each message from Wch, prints
// its first byte and the remaining bytes as text, and writes the whole message
// to conn. It never returns.
func WHandler(conn net.Conn) {
	for {
		out := <-Wch

		fmt.Println(out[0])
		fmt.Println("send data after: " + string(out[1:]))
		conn.Write(out)
	}
}

// Work is the client's demo worker: for every payload received on Rch it
// prints the payload and queues a fixed "xx" data message on Wch. It never
// returns.
func Work() {
	for {
		received := <-Rch

		fmt.Println("work recv " + string(received))
		Wch <- []byte{Req, 'x', 'x'}
	}
}

相关文章

Golang的文档和社区资源:为什么它可以帮助开发人员快速上手...
Golang:AI 开发者的实用工具
Golang的标准库:为什么它可以大幅度提高开发效率?
Golang的部署和运维:如何将应用程序部署到生产环境中?
高性能AI开发:Golang的优势所在
本篇文章和大家了解一下go语言开发优雅得关闭协程的方法。有...