介绍
传统的聊天室主要是基于c/s架构,需要有一个服务端完成各个客户端的聊天转发。今天我们使用golang+grpc+protobuf,设计一个去中心化、局域网自发现的聊天客户端。
完整代码地址在 github.com/alpsmonaco/proximity-chat
模块
协议
我们先定义proto消息格式 message/message.proto
syntax = "proto3"; option go_package = "proximity-chat/message"; package message; service chat { rpc newnode (stream noderequest) returns (stream nodereply){ } } message noderequest { string msg = 1; } message nodereply { string msg = 1; }
聊天软件一般需要全双工保证时效性,所以这边使用了 stream noderequest
和 stream nodereply
。 这边消息只有两个,请求和回复直接透传string就行。
执行
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative message\message.proto
会在相同目录下生成相关的go代码文件。在文件 message_grpc.pb.go
中会包含rpc的interface
type chatserver interface { newnode(chat_newnodeserver) error mustembedunimplementedchatserver() }
我们需要实现这个接口中的 newnode
服务。
交互
在 service/message.go 中实现 newnode(chat_newnodeserver) error
type messagewriter interface { write(string) } type message struct { writer messagewriter message.unimplementedchatserver } ... func (m *message) newnode(ss message.chat_newnodeserver) error { head, err := ss.recv() if err != nil { m.writer.write(fmt.sprint(err)) return err } addr := head.getmsg() if controller.ischatnodeexist(addr) { return nil } if !controller.addchatnode(&serverchatnode{s: ss}, addr) { return nil } err = ss.send(&message.nodereply{msg: "ok"}) if err != nil { return err } m.writer.write("new node " + addr + " has joined") for { msg, err := ss.recv() if err != nil { controller.removenode(addr) fmt.println(err) return err } m.writer.write(msg.getmsg()) } }
由于是去中心化,所以没有客户端服务端的概念,我们将它称为一个节点 node
。在同一个局域网内,node监听的ip+port做唯一key,用于避免重复进入聊天室。
上面的代码中 controller 模块主要是用来控制和管理断点的,后续会讲。
整体流程是先接收其他node发来的 ip+port
,判断是否已经加入过这个端点,如果没加入过就用controller绑定节点,进行后续的聊天请求,否则中止交互。
控制
在 controller/node.go ,我们使用map和读写锁来维护node的唯一性。
package controller import ( "sync" ) type chatnode interface { sendchatmsg(string) error recvchatmsg() (string, error) } var nodemap map[string]chatnode = make(map[string]chatnode) var nodemaplock sync.rwmutex func addchatnode(node chatnode, addr string) bool { nodemaplock.lock() defer nodemaplock.unlock() _, ok := nodemap[addr] if !ok { nodemap[addr] = node return true } return false } func removenode(addr string) { nodemaplock.lock() defer nodemaplock.unlock() delete(nodemap, addr) } func ischatnodeexist(addr string) bool { nodemaplock.rlock() defer nodemaplock.runlock() _, ok := nodemap[addr] return ok } func publish(s string) { nodemaplock.rlock() defer nodemaplock.runlock() for _, n := range nodemap { n.sendchatmsg(s) } }
发现
discover/discover.go 下定义如何发现相同网段上的其他服务。
这边使用 ipnetgen 库来获取相同网段下的所有ip。定期去遍历其他网段上的相同服务。 将自己的监听ip+端口发送给其他node,若返回'ok'则建立通讯。
func begindiscoverservice() { minport := config.getconfig().getminport() maxport := config.getconfig().getmaxport() if minport > maxport { minport = maxport } for { time.sleep(time.second) gen, err := ipnetgen.new(config.getconfig().getcidr()) if err != nil { panic(err) } for ip := gen.next(); ip != nil; ip = gen.next() { for i := minport; i <= maxport; i++ { addr := fmt.sprintf("%s:%d", ip.string(), i) if addr == getaddr() { continue } if controller.ischatnodeexist(addr) { continue } conn, err := grpc.dial(addr, grpc.withtransportcredentials(insecure.newcredentials())) if err != nil { fmt.printf("did not connect: %v\n", err) continue } client := message.newchatclient(conn) cli, err := client.newnode(context.background()) if err != nil { continue } err = cli.send(&message.noderequest{msg: getaddr()}) if err != nil { writer.write(fmt.sprint(err)) continue } resp, err := cli.recv() if err != nil { cli.closesend() writer.write(fmt.sprint(err)) continue } if resp.getmsg() != "ok" { cli.closesend() continue } if !controller.addchatnode(&service.clientchatnode{c: cli}, addr) { cli.closesend() continue } writer.write("discover " + addr) go func() { for { msg, err := cli.recv() if err != nil { writer.write(fmt.sprint(err)) controller.removenode(addr) return } writer.write(msg.getmsg()) } }() } } } }
配置
我们定义配置的获取方式,配置文件格式为json,定义配置获取的方式 config.go 。
package config type networkconfig struct { cidr string `json:"cidr"` maxport int `json:"max_port"` minport int `json:"min_port"` } func defaultnetworkconfig() *networkconfig { return &networkconfig{ "127.0.0.1/32", 4569, 4565, } } type constnetworkconfig struct { c *networkconfig } func (c *constnetworkconfig) getcidr() string { return c.c.cidr } func (c *constnetworkconfig) getmaxport() int { return c.c.maxport } func (c *constnetworkconfig) getminport() int { return c.c.minport } var config = &constnetworkconfig{defaultnetworkconfig()} func getconfig() *constnetworkconfig { return config } func setconfig(nc *networkconfig) { config = &constnetworkconfig{nc} }
这边最主要定义三个字段,内网的ip网段,服务的最小到最大的端口范围。这个配置主要用于搜寻同网段同端口上的相同服务。为了方便调试我们加一个 defaultnetworkconfig()
,监听127.0.0.1上的4565~4569。 同时还加了一个 constnetworkconfig
类,供其他模块访问全局配置,同时保护配置不被修改。
运行实例
编译后直接运行,会在指定的端口范围内尝试监听,无需指定端口。主线程中scanf阻塞获取输入。我们直接打开三个进程,在一个终端中输入数据发送,其他两个终端都能获取聊天数据。
以上就是go语言结合grpc和protobuf实现去中心化的聊天室的详细内容,更多关于go聊天室的资料请关注代码网其它相关文章!
发表评论