Go项目推荐之即时通讯服务器 IM

Go语言精选

共 5877字,需浏览 12分钟

 · 2020-08-09


点击上方蓝色“Go语言中文网”关注我们,领全套Go资料,每天学习 Go 语言

简要介绍

im是一个即时通讯服务器,代码全部使用golang完成。主要功能

1.支持tcp,websocket接入

2.离线消息同步

3.单用户多设备同时在线

4.单聊,群聊,以及超大群聊天场景

5.支持服务水平扩展

gim和im有什么区别?gim可以作为一个im中台提供给业务方使用,而im可以作为以业务服务器的一个组件, 为业务服务器提供im的能力,业务服务器的user服务只需要实现user.int.proto协议中定义的GRPC接口,为im服务 提供基本的用户功能即可,其实以我目前的认知,我更推荐这种方式,这种模式相比于gim,我认为最大好处在于 以下两点:

1.im不需要考虑多个app的场景,相比gim,业务复杂度降低了一个维度

2.各个业务服务可以互不影响,可以做到风险隔离

使用技术:

数据库:MySQL+Redis

通讯框架:GRPC

长连接通讯协议:Protocol Buffers

日志框架:Zap

ORM框架:GORM

安装部署

1.首先安装MySQL,Redis

2.创建数据库im,执行sql/create_table.sql,完成初始化表的创建(数据库包含提供测试的一些初始数据)

3.修改config下配置文件,使之和你本地配置一致

4.分别切换到cmd的tcp_conn,ws_conn,logic,user目录下,执行go run main.go,启动TCP连接层服务器, WebSocket连接层服务器,逻辑层服务器,用户服务器

(注意:tcp_conn只能在linux下启动,如果想在其他平台下启动,请安装docker,执行run.sh)

项目目录简介

项目结构遵循 https://github.com/golang-standards/project-layout

cmd:          服务启动入口
config: 服务配置
internal: 每个服务私有代码
pkg: 服务共有代码
sql: 项目sql文件
test: 长连接测试脚本

服务简介

1.tcp_conn

维持与客户端的TCP长连接,心跳,以及TCP拆包粘包,消息编解码

2.ws_conn

维持与客户端的WebSocket长连接,心跳,消息编解码

3.logic

设备信息,好友信息,群组信息管理,消息转发逻辑

4.user

一个简单的用户服务,可以根据自己的业务需求,进行扩展

网络模型

TCP的网络层使用linux的epoll实现,相比golang原生,能减少goroutine使用,从而节省系统资源占用

单用户多设备支持,离线消息同步

每个用户都会维护一个自增的序列号,当用户A给用户B发送消息时,首先会获取A的最大序列号,设置为这条消息的seq,持久化到用户A的消息列表, 再通过长连接下发到用户A账号登录的所有设备,再获取用户B的最大序列号,设置为这条消息的seq,持久化到用户B的消息列表,再通过长连接下发 到用户B账号登录的所有设备。

假如用户的某个设备不在线,在设备长连接登录时,用本地收到消息的最大序列号,到服务器做消息同步,这样就可以保证离线消息不丢失。

读扩散和写扩散

首先解释一下,什么是读扩散,什么是写扩散

读扩散

简介:群组成员发送消息时,先建立一个会话,都将这个消息写入这个会话中,同步离线消息时,需要同步这个会话的未同步消息

优点:每个消息只需要写入数据库一次就行,减少数据库访问次数,节省数据库空间

缺点:一个用户有n个群组,客户端每次同步消息时,要上传n个序列号,服务器要对这n个群组分别做消息同步

写扩散

简介:在群组中,每个用户维持一个自己的消息列表,当群组中有人发送消息时,给群组的每个用户的消息列表插入一条消息即可

优点:每个用户只需要维护一个序列号和消息列表

缺点:一个群组有多少人,就要插入多少条消息,当群组成员很多时,DB的压力会增大

消息转发逻辑选型以及特点

普通群组:

采用写扩散,群组成员信息持久化到数据库保存。支持消息离线同步。

超大群组:

采用读扩散,群组成员信息保存到redis,不支持离线消息同步。

核心流程时序图

长连接登录


离线消息同步


心跳


消息单发

c1.d1和c1.d2分别表示c1用户的两个设备d1和d2,c2.d3和c2.d4同理


小群消息群发

c1,c2.c3表示一个群组中的三个用户


大群消息群发


错误处理,链路追踪,日志打印

系统中的错误一般可以归类为两种,一种是业务定义的错误,一种就是未知的错误,在业务正式上线的时候,业务定义的错误的属于正常业务逻辑,不需要打印出来, 但是未知的错误,我们就需要打印出来,我们不仅要知道是什么错误,还要知道错误的调用堆栈,所以这里我对GRPC的错误进行了一些封装,使之包含调用堆栈。

func WrapError(err error) error {  if err == nil {    return nil  }

s := &spb.Status{ Code: int32(codes.Unknown), Message: err.Error(), Details: []*any.Any{ { TypeUrl: TypeUrlStack, Value: util.Str2bytes(stack()), }, }, } return status.FromProto(s).Err()}

// Stack 获取堆栈信息func stack() string { var pc = make([]uintptr, 20) n := runtime.Callers(3, pc)

var build strings.Builder for i := 0; i < n; i++ { f := runtime.FuncForPC(pc[i] - 1) file, line := f.FileLine(pc[i] - 1) n := strings.Index(file, name) if n != -1 { s := fmt.Sprintf(" %s:%d \n", file[n:], line) build.WriteString(s) } } return build.String()}

这样,不仅可以拿到错误的堆栈,错误的堆栈也可以跨RPC传输,但是,但是这样你只能拿到当前服务的堆栈,却不能拿到调用方的堆栈,就比如说,A服务调用 B服务,当B服务发生错误时,在A服务通过日志打印错误的时候,我们只打印了B服务的调用堆栈,怎样可以把A服务的堆栈打印出来。我们在A服务调用的地方也获取 一次堆栈。

func WrapRPCError(err error) error {  if err == nil {    return nil  }  e, _ := status.FromError(err)  s := &spb.Status{    Code:    int32(e.Code()),    Message: e.Message(),    Details: []*any.Any{      {        TypeUrl: TypeUrlStack,        Value:   util.Str2bytes(GetErrorStack(e) + " --grpc-- \n" + stack()),      },    },  }  return status.FromProto(s).Err()}
func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { err := invoker(ctx, method, req, reply, cc, opts...) return gerrors.WrapRPCError(err)}
var LogicIntClient pb.LogicIntClient
func InitLogicIntClient(addr string) { conn, err := grpc.DialContext(context.TODO(), addr, grpc.WithInsecure(), grpc.WithUnaryInterceptor(interceptor)) if err != nil { logger.Sugar.Error(err) panic(err) }
LogicIntClient = pb.NewLogicIntClient(conn)}

像这样,就可以获取完整一次调用堆栈。错误打印也没有必要在函数返回错误的时候,每次都去打印。因为错误已经包含了堆栈信息

// 错误的方式if err != nil {  logger.Sugar.Error(err)  return err}
// 正确的方式if err != nil { return err}

然后,我们在上层统一打印就可以

func startServer() {  extListen, err := net.Listen("tcp", conf.LogicConf.ClientRPCExtListenAddr)  if err != nil {    panic(err)  }  extServer := grpc.NewServer(grpc.UnaryInterceptor(LogicClientExtInterceptor))  pb.RegisterLogicClientExtServer(extServer, &LogicClientExtServer{})  err = extServer.Serve(extListen)  if err != nil {    panic(err)  }}
func LogicClientExtInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { defer func() { logPanic("logic_client_ext_interceptor", ctx, req, info, &err) }()
resp, err = handler(ctx, req) logger.Logger.Debug("logic_client_ext_interceptor", zap.Any("info", info), zap.Any("ctx", ctx), zap.Any("req", req), zap.Any("resp", resp), zap.Error(err))
s, _ := status.FromError(err) if s.Code() != 0 && s.Code() < 1000 { md, _ := metadata.FromIncomingContext(ctx) logger.Logger.Error("logic_client_ext_interceptor", zap.String("method", info.FullMethod), zap.Any("md", md), zap.Any("req", req), zap.Any("resp", resp), zap.Error(err), zap.String("stack", gerrors.GetErrorStack(s))) } return}

这样做的前提就是,在业务代码中透传context,golang不像其他语言,可以在线程本地保存变量,像Java的ThreadLocal,所以只能通过函数参数的形式进行传递,im中,service层函数的第一个参数 都是context,但是dao层和cache层就不需要了,不然,显得代码臃肿。

最后可以在客户端的每次请求添加一个随机的request_id,这样客户端到服务的每次请求都可以串起来了。

func getCtx() context.Context {  token, _ := util.GetToken(1, 2, 3, time.Now().Add(1*time.Hour).Unix(), util.PublicKey)  return metadata.NewOutgoingContext(context.TODO(), metadata.Pairs(    "app_id", "1",    "user_id", "2",    "device_id", "3",    "token", token,    "request_id", strconv.FormatInt(time.Now().UnixNano(), 10)))}

github

https://github.com/alberliu/im


来源:

https://www.toutiao.com/a6853605017463554572/



推荐阅读


学习交流 Go 语言,扫码回复「进群」即可


站长 polarisxu

自己的原创文章

不限于 Go 技术

职场和创业经验


Go语言中文网

每天为你

分享 Go 知识

Go爱好者值得关注


浏览 17
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报