当前位置: 代码网 > it编程>前端脚本>Golang > Go语言结合grpc和protobuf实现去中心化的聊天室

Go语言结合grpc和protobuf实现去中心化的聊天室

2024年05月18日 Golang 我要评论
介绍传统的聊天室主要是基于c/s架构,需要有一个服务端完成各个客户端的聊天转发。今天我们使用golang+grpc+protobuf,设计一个去中心化、局域网自发现的聊天客户端。完整代码地址在 git

介绍

传统的聊天室主要是基于c/s架构,需要有一个服务端完成各个客户端的聊天转发。今天我们使用golang+grpc+protobuf,设计一个去中心化、局域网自发现的聊天客户端。

完整代码地址在 github.com/alpsmonaco/proximity-chat

模块

协议

我们先定义proto消息格式 message/message.proto

syntax = "proto3";

option go_package = "proximity-chat/message";

package message;

service chat {
    rpc newnode (stream noderequest) returns (stream nodereply){ }
}

message noderequest {
    string msg = 1;
}

message nodereply {
    string msg = 1;
}

聊天软件一般需要全双工保证时效性,所以这边使用了 stream noderequeststream nodereply。 这边消息只有两个,请求和回复直接透传string就行。

执行

protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative message\message.proto

会在相同目录下生成相关的go代码文件。在文件 message_grpc.pb.go 中会包含rpc的interface

type chatserver interface {
	newnode(chat_newnodeserver) error
	mustembedunimplementedchatserver()
}

我们需要实现这个接口中的 newnode 服务。

交互

在 service/message.go 中实现 newnode(chat_newnodeserver) error

type messagewriter interface {
	write(string)
}

type message struct {
	writer messagewriter
	message.unimplementedchatserver
}
...
func (m *message) newnode(ss message.chat_newnodeserver) error {
	head, err := ss.recv()
	if err != nil {
		m.writer.write(fmt.sprint(err))
		return err
	}
	addr := head.getmsg()
	if controller.ischatnodeexist(addr) {
		return nil
	}
	if !controller.addchatnode(&serverchatnode{s: ss}, addr) {
		return nil
	}
	err = ss.send(&message.nodereply{msg: "ok"})
	if err != nil {
		return err
	}
	m.writer.write("new node " + addr + " has joined")
	for {
		msg, err := ss.recv()
		if err != nil {
			controller.removenode(addr)
			fmt.println(err)
			return err
		}
		m.writer.write(msg.getmsg())
	}
}

由于是去中心化,所以没有客户端服务端的概念,我们将它称为一个节点 node。在同一个局域网内,node监听的ip+port做唯一key,用于避免重复进入聊天室。

上面的代码中 controller 模块主要是用来控制和管理断点的,后续会讲。

整体流程是先接收其他node发来的 ip+port ,判断是否已经加入过这个端点,如果没加入过就用controller绑定节点,进行后续的聊天请求,否则中止交互。

控制

在 controller/node.go ,我们使用map和读写锁来维护node的唯一性。

package controller

import (
	"sync"
)

type chatnode interface {
	sendchatmsg(string) error
	recvchatmsg() (string, error)
}

var nodemap map[string]chatnode = make(map[string]chatnode)
var nodemaplock sync.rwmutex

func addchatnode(node chatnode, addr string) bool {
	nodemaplock.lock()
	defer nodemaplock.unlock()
	_, ok := nodemap[addr]
	if !ok {
		nodemap[addr] = node
		return true
	}
	return false
}

func removenode(addr string) {
	nodemaplock.lock()
	defer nodemaplock.unlock()
	delete(nodemap, addr)
}

func ischatnodeexist(addr string) bool {
	nodemaplock.rlock()
	defer nodemaplock.runlock()
	_, ok := nodemap[addr]
	return ok
}

func publish(s string) {
	nodemaplock.rlock()
	defer nodemaplock.runlock()
	for _, n := range nodemap {
		n.sendchatmsg(s)
	}
}

发现

discover/discover.go 下定义如何发现相同网段上的其他服务。

这边使用 ipnetgen 库来获取相同网段下的所有ip。定期去遍历其他网段上的相同服务。 将自己的监听ip+端口发送给其他node,若返回'ok'则建立通讯。

func begindiscoverservice() {
	minport := config.getconfig().getminport()
	maxport := config.getconfig().getmaxport()
	if minport > maxport {
		minport = maxport
	}
	for {
		time.sleep(time.second)
		gen, err := ipnetgen.new(config.getconfig().getcidr())
		if err != nil {
			panic(err)
		}
		for ip := gen.next(); ip != nil; ip = gen.next() {
			for i := minport; i <= maxport; i++ {
				addr := fmt.sprintf("%s:%d", ip.string(), i)
				if addr == getaddr() {
					continue
				}
				if controller.ischatnodeexist(addr) {
					continue
				}
				conn, err := grpc.dial(addr, grpc.withtransportcredentials(insecure.newcredentials()))
				if err != nil {
					fmt.printf("did not connect: %v\n", err)
					continue
				}
				client := message.newchatclient(conn)
				cli, err := client.newnode(context.background())
				if err != nil {
					continue
				}
				err = cli.send(&message.noderequest{msg: getaddr()})
				if err != nil {
					writer.write(fmt.sprint(err))
					continue
				}
				resp, err := cli.recv()
				if err != nil {
					cli.closesend()
					writer.write(fmt.sprint(err))
					continue
				}
				if resp.getmsg() != "ok" {
					cli.closesend()
					continue
				}
				if !controller.addchatnode(&service.clientchatnode{c: cli}, addr) {
					cli.closesend()
					continue
				}
				writer.write("discover " + addr)
				go func() {
					for {
						msg, err := cli.recv()
						if err != nil {
							writer.write(fmt.sprint(err))
							controller.removenode(addr)
							return
						}
						writer.write(msg.getmsg())
					}
				}()
			}
		}
	}
}

配置

我们定义配置的获取方式,配置文件格式为json,定义配置获取的方式 config.go 。

package config

type networkconfig struct {
	cidr    string `json:"cidr"`
	maxport int    `json:"max_port"`
	minport int    `json:"min_port"`
}

func defaultnetworkconfig() *networkconfig {
	return &networkconfig{
		"127.0.0.1/32", 4569, 4565,
	}
}

type constnetworkconfig struct {
	c *networkconfig
}

func (c *constnetworkconfig) getcidr() string { return c.c.cidr }
func (c *constnetworkconfig) getmaxport() int { return c.c.maxport }
func (c *constnetworkconfig) getminport() int { return c.c.minport }

var config = &constnetworkconfig{defaultnetworkconfig()}

func getconfig() *constnetworkconfig { return config }
func setconfig(nc *networkconfig)    { config = &constnetworkconfig{nc} }

这边最主要定义三个字段,内网的ip网段,服务的最小到最大的端口范围。这个配置主要用于搜寻同网段同端口上的相同服务。为了方便调试我们加一个 defaultnetworkconfig(),监听127.0.0.1上的4565~4569。 同时还加了一个 constnetworkconfig 类,供其他模块访问全局配置,同时保护配置不被修改。

运行实例

编译后直接运行,会在指定的端口范围内尝试监听,无需指定端口。主线程中scanf阻塞获取输入。我们直接打开三个进程,在一个终端中输入数据发送,其他两个终端都能获取聊天数据。

以上就是go语言结合grpc和protobuf实现去中心化的聊天室的详细内容,更多关于go聊天室的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com