集成RocketChat至现有的.Net项目中,为ChatGPT铺路

news2024/10/1 7:30:43

文章目录

    • 前言
    • 项目搭建
      • 后端
      • 前端
    • 代理账号
    • 鉴权方式介绍
    • 登录校验模块
      • 前端鉴权方式
    • 后端鉴权方式
    • 登录委托
    • 使用登录委托
    • 处理聊天消息
      • 前端鉴权方式
      • 后端校验方式
    • 项目地址

前言

今天我们来聊一聊一个Paas的方案,如何集成到一个既有的项目中。
以其中一个需求为例子:在产品项目中,加入IM(即时通信)功能,开始徒手撸代码,会发现工作量很大,去github找开源项目,结果也可能事与愿违:功能不够强大,或者用不同的语言编写的,编译出来程序集无法集成到项目中。
可能当下最好的方案是利用独立的聊天功能组件,作为项目的中间件(Paas方案)。

  1. 组件是独立部署,独立运行的,功能的稳定性,搭建速度快,
  2. 作为基础设施服务,可以用在其他项目中,并且项目中的对接作为抽象层,可随时替换现有组件。

这个聊天组件就是RocketChat。
RocketChat 是一款免费,开源的聊天软件平台。
其主要功能是:群组聊天、相互通信、私密聊群、桌面通知、文件上传、语音/视频、截图等,实现了用户之间的实时消息转换。
https://github.com/RocketChat/Rocket.Chat

它本身是使用Meteor全栈框架以JavaScript开发的Web聊天服务器。本身带有一个精美的web端,甚至有开源的App端。
集成到一个既有的项目中我们是需要做减法的,然而在实际对接中,我们仍然需要解决一些问题:
首先是Rocket.Chat自己有一套独立的用户系统,其中登录鉴权逻辑,这一部分是我们不需要的。
第二是Rocket.Chat聊天功能依赖这个用户系统,需要简化流程同步用户信息,只保留用户,不需要权限,角色。

准备工作:搭建Rocket.Chat服务

Rocket.Chat有两套Api,一个是基于https的REST Api,和一个基于wss的Realtime API, https://developer.rocket.chat/reference/api/realtime-api
这两个Api都需要鉴权。

解决这个有两套方案,一个是通过完全的后端接管,两个Api都经过后端项目进行转发,另一个是后端只接管REST Api, Realtime API和Rocket.Chat服务直接通信

项目搭建

后端

新建一个.Net 6 Abp项目后,添加AbpBoilerplate.RocketChat库,AbpBoilerplate.RocketChat的具体介绍请参考https://blog.csdn.net/jevonsflash/article/details/128342430

dotnet add package AbpBoilerplate.RocketChat

在Domain层中创建IM项目,创建Authorization目录存放与IM鉴权相关的代码,ImWebSocket目录用于存放处理Realtime API相关的代码.

在搭建Rocket.Chat环节,还记得有一个设置管理员的步骤吗?在AdminUserName和AdminPassword配置中,指定这个管理员的密码,

管理员用于在用户未登录时,提供操作的权限主体,

  "Im": {
    "Provider": "RocketChat",
    "Address": "http://localhost:3000/",
    "WebSocketAddress": "ws://localhost:3000/",
    "AdminUserName": "super",
    "AdminPassword": "123qwe",
    "DefaultPassword": "123qwe"
  }

前端

用vue2来搭建一个简单的前端界面,需要用到以下库

  • element-UI库
  • axios
  • vuex
  • signalr
    新建一个vue项目,在package.json中的 "dependencies"添加如下:
"axios": "^0.26.1",
"element-ui": "^2.15.6",
"@microsoft/signalr": "^5.0.6"
"vuex": "^3.6.2"

代理账号

代理账号是一个管理员账号
在程序的启动时,要登录这个管理员账号,并保存Token,程序停止时退出登录这个账号。
我们需要一个cache存储管理员账号的登录信息(用户ID和Token)
在Threads目录下创建ImAdminAgentAuthBackgroundWorker,
并在ImModule中注册这个后台任务

private async Task LoginAdminAgent()
{
    var userName = rocketChatConfiguration.AdminUserName;
    var password = rocketChatConfiguration.AdminPassword;
    var loginResult = await imManager.Authenticate(userName, password);
    if (loginResult.Success && loginResult.Content != null)
    {
        var cache = imAdminAgentCache.GetCache("ImAdminAgent");
        await cache.SetAsync("UserId", loginResult.Content.Data.UserId);
        await cache.SetAsync("AuthToken", loginResult.Content.Data.AuthToken);
        await cache.SetAsync("UserName", userName);
    }
    else
    {
        throw new UserFriendlyException("无法登录IM服务Admin代理账号");
    }
}

public override async void Stop()
{
    base.Stop();
    var cache = imAdminAgentCache.GetCache("ImAdminAgent");
    var token = (string)cache.Get("AuthToken", (i) => { return string.Empty; });
    var userId = (string)cache.Get("UserId", (i) => { return string.Empty; });

    if (string.IsNullOrEmpty(token) || string.IsNullOrEmpty(userId))
    {
        return;
    }

    using (_iocManager.IocContainer.BeginScope()) //extension method
    {
        _iocManager.Resolve<SessionContextDto>().Token = token;
        _iocManager.Resolve<SessionContextDto>().UserId = userId;
        _iocManager.Resolve<SessionContextDto>().IsAuthorized = true;


        try
        {
            await imManager.Logout();
        }
        catch (Exception ex)
        {

            throw;
        }
    }

}

SessionContextDto是一个会话上下文对象,在.net项目中,登录校验成功后写入,在请求Rocket.Chat的时候读取,并写入到请求头中。

在ImModule的PostInitialize方法中注册ImAdminAgentAuthBackgroundWorker

public override void PostInitialize()
{
    var workerManager = IocManager.Resolve<IBackgroundWorkerManager>();
    workerManager.Add(IocManager.Resolve<ImAdminAgentAuthBackgroundWorker>());
}

用户登录时,需要传用户名密码,用户名是跟.net项目中相同的,密码可以独立设置,也可以设定约定一个默认密码,那么新建用户和登录的时候,可以不用传密码,直接使用默认密码即可,用户成功登录后,将用户ID和Token回传给前端。

定义传输对象类AuthenticateResultDto

public class AuthenticateResultDto
{
    public string AccessToken { get; set; }
    public string UserId { get; set; }
}

在应用层中创建类ImAppService,创建应用层服务Authenticate,用于用户登录。

 private async Task<AuthenticateResultDto> Authenticate(MatoAppSample.Authorization.Users.User user, string password = null)
{
    var loginResult = await _imManager.Authenticate(user.UserName, password);

    if (loginResult.Success)
    {
        var userId = loginResult.Content.Data.UserId;
        var token = loginResult.Content.Data.AuthToken;
        this.imAuthTokenCache.Set(user.UserName, new ImAuthTokenCacheItem(userId, token), new TimeSpan(1, 0, 0));
    }
    else
    {
        this.imAuthTokenCache.Remove(user.UserName);
        throw new UserFriendlyException($"登录失败, {loginResult.Error}");

    }
    return new AuthenticateResultDto
    {
        AccessToken = loginResult.Content.Data.AuthToken,
        UserId = loginResult.Content.Data.UserId
    };
}

鉴权方式介绍

由于Rocket.Chat的Realtime API基于REST API基础上进行鉴权,在调用完成/api/v1/login接口后,需要在已经建立的Websocket连接中发送

{
    "msg": "method",
    "method": "login",
    "id": "42",
    "params":[
        { "resume": "auth-token" }
    ]
}

详见官方文档

在集成RocketChat时,对于Realtime API方案有二:

  1. 前端鉴权,前端通过Abp登录后,调用/api/v1/login接口,返回token之后存入前端Token缓存中,之后前端将与Rocketchat直接建立websocket联系,订阅的聊天消息和房间消息将被直接推送至前端。

    优点是消息订阅推送直接,效率较高,但前端需要同时顾及Abp的鉴权和RocketChat Realtime API鉴权,前端的代码逻辑复杂,代理账号逻辑复杂,后期扩展性差。小型项目适合此方式

  2. 后端鉴权,前端通过Abp登录后,调用/api/v1/login接口,返回token之后存入后端Token缓存中,由后端发起websocket连接,订阅的聊天消息和房间消息将被转发成signalR消息发送给前端,由后端缓存过期机制统一管理各连接的生命周期。

    优点是统一了前端的消息推送机制,架构更趋于合理,对于多用户端的大型项目,能够减少前端不必要的代码逻辑。但是后端的代码会复杂一些。适合中大型项目。

Realtime API 的前端鉴权

在这里插入图片描述
Realtime API 的后端鉴权

在这里插入图片描述

登录校验模块

前端鉴权方式

由于是从小程序,或者web端共用的所以要分别从Header和Cookie中获取登录信息,IHttpContextAccessor类型的参数用于从http请求上下文对象中访问Header或Cookie,

整个流程如下:

在这里插入图片描述

创建AuthorizedFrontendWrapper.cs,新建AuthorizationVerification方法,此方法是登录校验逻辑

private static void AuthorizationVerification(IHttpContextAccessor _httpContextAccessor, bool useAdminIfNotAuthorized, out StringValues? token, out StringValues? userId)
{
    var isCommonUserLoginPassed = true;
    token = _httpContextAccessor.HttpContext?.Request.Headers["X-Auth-Token"];
    userId = _httpContextAccessor.HttpContext?.Request.Headers["X-User-Id"];
    if (!ValidateToken(token, userId))
    {

        token = _httpContextAccessor.HttpContext?.Request.Cookies["chat_token"];
        userId = _httpContextAccessor.HttpContext?.Request.Cookies["chat_uid"];
        if (!ValidateToken(token, userId))
        {
            isCommonUserLoginPassed = false;
        }
    }

    var cache = Manager.GetCache("ImAdminAgent");
    if (!isCommonUserLoginPassed)
    {
        if (useAdminIfNotAuthorized)
        {
            //若不存在则取admin作为主体
            token = (string)cache.Get("AuthToken", (i) => { return string.Empty; });
            userId = (string)cache.Get("UserId", (i) => { return string.Empty; });
            if (!ValidateToken(token, userId))
            {
                throw new UserFriendlyException("操作未取得IM服务授权, 当前用户未登录,且初始代理用户未登录");
            }
        }
        else
        {
            throw new UserFriendlyException("操作未取得IM服务授权, 当前用户未登录");
        }
    }
    else
    {
        if ((string)cache.Get("UserId", (i) => { return string.Empty; }) == userId.Value)
        {
            token = (string)cache.Get("AuthToken", (i) => { return string.Empty; });
            if (!ValidateToken(token, userId))
            {
                throw new UserFriendlyException("操作未取得IM服务授权, 初始代理用户未登录");
            }
        }
    }
}

后端鉴权方式

整个流程如下:

在这里插入图片描述

创建AuthorizedBackendWrapper.cs,新建AuthorizationVerification方法,登录校验代码如下

public void AuthorizationVerification(out string token, out string userId)
{
    User user = null;
    try
    {
        user = userManager.FindByIdAsync(abpSession.GetUserId().ToString()).Result;
    }
    catch (Exception)
    {
    }

    var userName = user != null ? user.UserName : rocketChatConfiguration.AdminUserName;
    var password = user != null ? ImUserDefaultPassword : rocketChatConfiguration.AdminPassword;
    var userIdAndToken = imAuthTokenCache.Get(userName, (i) => { return default; });
    if (userIdAndToken == default)
    {
        var loginResult = imManager.Authenticate(userName, password).Result;
        if (loginResult.Success && loginResult.Content != null)
        {
            userId = loginResult.Content.Data.UserId;
            token = loginResult.Content.Data.AuthToken;
            var imAuthTokenCacheItem = new ImAuthTokenCacheItem(userId, token);
            imAuthTokenCache.Set(userName, imAuthTokenCacheItem, new TimeSpan(1, 0, 0));
            var userIdentifier = abpSession.ToUserIdentifier();
            if (userIdentifier != null)
            {
                Task.Run(async () =>
                {
                    await Login(imAuthTokenCacheItem, userIdentifier, userName);
                });
            }
        }
        else
        {
            var adminUserName = rocketChatConfiguration.AdminUserName;
            var adminPassword = rocketChatConfiguration.AdminPassword;
            var adminLoginResult = imManager.Authenticate(adminUserName, adminPassword).Result;
            if (adminLoginResult.Success && adminLoginResult.Content != null)
            {
                userId = adminLoginResult.Content.Data.UserId;
                token = adminLoginResult.Content.Data.AuthToken;
                if (!ValidateToken(token, userId))
                {
                    throw new UserFriendlyException("操作未取得IM服务授权, 无法登录账号" + userName);
                }
            }
            else
            {
                throw new UserFriendlyException("账号登录失败:" + adminLoginResult.Error);

            }

        }

    }
    else
    {
        userId = userIdAndToken.UserId;
        token = userIdAndToken.Token;
    }
    if (!ValidateToken(token, userId))
    {
        throw new UserFriendlyException("操作未取得IM服务授权, 登录失败");
    }
}

登录委托

在AuthorizedFrontendWrapper(或AuthorizedBackendWrapper)中

写一个登录委托AuthorizedChatAction,用于包装一个需要登录之后才能使用的操作

public static async Task AuthorizedChatAction(Func<Task> func, IocManager _iocManager)
{
    if (_iocManager.IsRegistered<SessionContextDto>())
    {
        string token, userId;
        AuthorizationVerification(out token, out userId);

        using (_iocManager.IocContainer.Begin()) //extension method
        {
            _iocManager.Resolve<SessionContextDto>().Token = token;
            _iocManager.Resolve<SessionContextDto>().UserId = userId;
            _iocManager.Resolve<SessionContextDto>().IsAuthorized = true;
            try
            {
                await func();
            }
            catch (Exception ex)
            {
                throw;
            }
        }
    }
    else
    {
        throw new UserFriendlyException("没有注册即时通信会话上下文对象");
    }
}

使用登录委托

我们在创建IM相关方法的时候,需要用AuthorizedFrontendWrapper(或AuthorizedBackendWrapper),来包装登录校验的逻辑。

public async Task<bool> DeleteUser(long userId)
{
    var user = await _userManager.GetUserByIdAsync(userId);
    var result = await AuthorizedBackendWrapper.AuthorizedChatAction(() =>
    {
        return _imManager.DeleteUser(user.UserName);
    }, _iocManager);

    if (!result.Success || !result.Content)
    {
        throw new UserFriendlyException($"删除失败, {result.Error}");
    }
    return result.Content;
}

处理聊天消息

前端鉴权方式

新建messageHandler_frontend_auth.ts处理程序

客户端支持WebSocket的浏览器中,在创建socket后,可以通过onopen、onmessage、onclose和onerror四个事件对socket进行响应。

我已经封装好了一个WebSocket 通信模块\web\src\utils\socket.ts,Socket对象是一个WebSocket抽象,后期将扩展到uniapp小程序项目上使用的WebSocket。通过这个对象可以方便的进行操作。

创建一个Socket对象wsConnection,用于接收和发送基于wss的Realtime API消息

const wsRequestUrl: string = "ws://localhost:3000/websocket";

const socketOpt: ISocketOption = {
  server: wsRequestUrl,
  reconnect: true,
  reconnectDelay: 2000,
};

const wsConnection: Socket = new Socket(socketOpt);

WebSocket的所有操作都是采用事件的方式触发的,这样不会阻塞UI,是的UI有更快的响应时间,有更好的用户体验。

连接建立后,客户端和服务器就可以通过TCP连接直接交换数据。我们订阅onmessage事件触发newMsgHandler处理信息

wsConnection.$on("message", newMsgHandler);

当链接打开后,立即发送{"msg":"connect","version":"1","support":["1","pre2","pre1"]}报文

wsConnection.$on("open", (newMsg) => {
    console.info("WebSocket Connected");
    wsConnection.send({
      msg: "connect",
      version: "1",
      support: ["1"],
    });
  });

建立链接后,会从Rocket.Chat收到connected消息,此时需要发送登录请求的消息到Rocket.Chat
接收到报文

"{"msg":"connected","session":"cMvzWpCNSCR24bwCf"}"

发送报文

{"msg":"method","method":"login","params":[{"resume":"wY67O8rJFyf2FrqD5vxpQjIUs5tdThmyfW_VaA7MrsG"}],"id":"1"}

接下来,在newMsgHandler方法中,根据msg类型,处理一系列的消息

const newMsgHandler: Function = (newMsgRaw) => {
  if (!getIsNull(newMsgRaw)) {
    if (newMsgRaw.msg == "ping") {
      wsConnection.send({
        msg: "pong",
      });
    } else if (newMsgRaw.msg == "connected") {
      let newMsg: ConnectedWsDto = newMsgRaw
      let session = newMsg.session;
      if (
        wsConnection.isConnected
      ) {
        wsConnection.send({
          msg: "method",
          method: "login",
          params: [
            {
              resume: UserModule.chatToken,
            },
          ],
          id: "1",
        });
      }
    } else if (newMsgRaw.msg == "added") {
      subEvent("stream-notify-user", "message");
      subEvent("stream-notify-user", "subscriptions-changed");
      subEvent("stream-notify-user", "rooms-changed");
    } else if (newMsgRaw.msg == "changed") {
      let newMsg: SubChangedWsDto = newMsgRaw
      if (newMsg.collection == "stream-notify-user") {
        let fields = newMsg.fields;
        if (fields.eventName.indexOf("/") != -1) {
          let id = fields.eventName.split('/')[0];
          let eventName = fields.eventName.split('/')[1];
          if (eventName == "subscriptions-changed") {
            let args = fields.args;
            let msg: ISubscription = null;
            let method: string;
            args.forEach((arg) => {
              if (typeof arg == "string") {
                if (arg == "remove" || arg == "insert") {
                  method = arg;
                }
              }
              else if (typeof arg == "object") {
                msg = arg
              }
            });
            $EventBus.$emit("getRoomSubscriptionChangedNotification", { msg, method });
          }
          else if (eventName == "rooms-changed") {
            let args = fields.args;
            let msg: RoomMessageNotificationDto = null;
            args.forEach((arg) => {
              if (typeof arg == "object") {
                msg = arg
              }
            });
            $EventBus.$emit("getRoomMessageNotification", msg.lastMessage);

          }
        }
        else {
          let id = fields.eventName
        }


      }
      else if (newMsg.collection == "stream-room-messages") {
        let fields = newMsg.fields;

        let id = fields.eventName
        let msg: MessageItemDto = fields.args;

        $EventBus.$emit("getRoomMessageNotification", msg);
      }
    }
  }
}

store/chat.ts文件中,定义了ChatState用于存储聊天信息,当有消息收到,或者房间信息变更时,更新这些存储对象

export interface IChatState {
  currentChannel: ChannelDto;
  channelList: Array<ChannelDto>;
  currentMessage: MessageDto;
}

后端校验方式

Login时将生成webSocket对象,并发送connect消息

public async Task Login(ImAuthTokenCacheItem imAuthTokenCacheItem, UserIdentifier userIdentifier, string userName)
{
    using (var webSocket = new ClientWebSocket())
    {
        webSocket.Options.RemoteCertificateValidationCallback = delegate { return true; };
        var url = Flurl.Url.Combine(rocketChatConfiguration.WebSocketHost, "websocket");
        await webSocket.ConnectAsync(new Uri(url), CancellationToken.None);
        if (webSocket.State == WebSocketState.Open)
        {

            var model = new ImWebSocketConnectRequest()
            {
                Msg = "connect",
                Version = "1",
                Support = new string[] { "1" }
            };
            var jsonStr = JsonConvert.SerializeObject(model);
            var sendStr = Encoding.UTF8.GetBytes(jsonStr);
            await webSocket.SendAsync(sendStr, WebSocketMessageType.Text, true, CancellationToken.None);
            await Echo(webSocket, imAuthTokenCacheItem, userIdentifier, userName);
        }
    }
}

每次接收指令时,将判断缓存中的Token值是否合法,若不存在,或过期(session变化),将主动断开websocket连接
在接收Realtime API消息后,解析方式同前端鉴权逻辑
在拿到数据后,做signalR转发。

private async Task Echo(WebSocket webSocket, ImAuthTokenCacheItem imAuthTokenCacheItem, UserIdentifier userIdentifier, string userName)
{
    JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
    {
        NullValueHandling = NullValueHandling.Ignore
    };
    var buffer = new byte[1024 * 4];
    var receiveResult = await webSocket.ReceiveAsync(
        new ArraySegment<byte>(buffer), CancellationToken.None);
    string session=string.Empty;
    ImAuthTokenCacheItem im;
    while (!receiveResult.CloseStatus.HasValue)
    {
        im = imAuthTokenCache.GetOrDefault(userName);
        if (im == null)
        {
            await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
                "缓存超时自动退出",
                CancellationToken.None);
            Console.WriteLine(userName + "超时主动断开IM连接");

            break;


        }
        else
        {
            if (!string.IsNullOrEmpty(session) && im.Session!=session)
            {
                await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
                    "缓存更新自动退出",
                    CancellationToken.None);
                Console.WriteLine(userName + "缓存更新主动断开IM连接");

                break;
            }
        }
        var text = Encoding.UTF8.GetString(buffer.AsSpan(0, receiveResult.Count));
        if (!string.IsNullOrEmpty(text))
        {
            dynamic response = JsonConvert.DeserializeObject<dynamic>(text);
            if (response.msg == "ping")
            {

                var model = new ImWebSocketCommandRequest()
                {
                    Msg = "pong",
                };

                var jsonStr = JsonConvert.SerializeObject(model, serializerSettings);
                var sendStr = Encoding.UTF8.GetBytes(jsonStr);
                await webSocket.SendAsync(sendStr, WebSocketMessageType.Text, true, CancellationToken.None);

            }
            if (response.msg == "connected")
            {
                session = response.session;

                var model = new ImWebSocketCommandRequest()
                {
                    Msg = "method",
                    Method = "login",
                    Params = new object[]{
                        new {
                            resume = imAuthTokenCacheItem.Token,
                        }
                    },
                    Id = "1"
                };
                imAuthTokenCacheItem.Session = session;
                imAuthTokenCache.Set(userName, imAuthTokenCacheItem, new TimeSpan(1, 0, 0));

                var jsonStr = JsonConvert.SerializeObject(model, serializerSettings);
                var sendStr = Encoding.UTF8.GetBytes(jsonStr);
                await webSocket.SendAsync(sendStr, WebSocketMessageType.Text, true, CancellationToken.None);

            }
            else if (response.msg == "added")
            {
                await SubEvent(webSocket, imAuthTokenCacheItem, "stream-notify-user", "message");
                await SubEvent(webSocket, imAuthTokenCacheItem, "stream-notify-user", "subscriptions-changed");
                await SubEvent(webSocket, imAuthTokenCacheItem, "stream-notify-user", "rooms-changed");
            }
            else if (response.msg == "changed")
            {
                var newMsg = response;
                if (newMsg.collection == "stream-notify-user")
                {
                    var fields = newMsg.fields;
                    var fullEventName = fields.eventName.ToString();
                    if (fullEventName.IndexOf("/") != -1)
                    {
                        var id = fullEventName.Split('/')[0];
                        var eventName = fullEventName.Split('/')[1];
                        if (eventName == "subscriptions-changed")
                        {
                            var args = fields.args;
                            dynamic msg = null;
                            var method = string.Empty;

                            foreach (var arg in args as IEnumerable<dynamic>)
                            {

                                if (arg.ToString() == "remove" || arg.ToString() == "insert")
                                {
                                    method = arg.ToString();
                                }

                                else
                                {
                                    msg = arg;
                                }
                            }

                            await signalREventPublisher.PublishAsync(userIdentifier, "getRoomSubscriptionChangedNotification", new { msg, method });
                        }
                        else if (eventName == "rooms-changed")
                        {
                            var args = fields.args;
                            dynamic msg = null;
                            var method = string.Empty;
                            foreach (var arg in args as IEnumerable<dynamic>)
                            {

                                if (arg.ToString() == "updated")
                                {
                                    method = arg.ToString();
                                }

                                else
                                {
                                    msg = arg;
                                }
                            };

                            var jobject = msg.lastMessage as JObject;

                            await signalREventPublisher.PublishAsync(userIdentifier, "getRoomMessageNotification", jobject);

                        }
                    }
                    else
                    {
                        var id = fields.eventName;
                    }
                }

            }
            else if (response.collection == "stream-room-messages")
            {
                var fields = response.fields;
                var id = fields.eventName;
                var msg = fields.args;
                var jobject = msg as JObject;
                await signalREventPublisher.PublishAsync(userIdentifier, "getRoomMessageNotification", jobject);
            }
        }
        try
        {
            receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
        }
        catch (Exception ex)
        {
            Console.WriteLine(userName + "异常断开IM连接");

            break;
        }

    }

    try
    {
        await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
    }
    catch (Exception ex)
    {
    }

    imAuthTokenCache.Remove(userName);

}

private async Task SubEvent(WebSocket webSocket, ImAuthTokenCacheItem imAuthTokenCacheItem, string name, string type)
{
    var eventstr = $"{imAuthTokenCacheItem.UserId}/${type}";
    var id = RandomHelper.GetRandom(100000).ToString().PadRight(5, '0');

    var model = new ImWebSocketCommandRequest()
    {
        Msg = "sub",
        Params = new object[]{eventstr,
            new {
                useCollection= false,
                args = new string[]{ }
            }
        },
        Id = id,
        Name = name,
    };
    var jsonStr = JsonConvert.SerializeObject(model);
    var sendStr = Encoding.UTF8.GetBytes(jsonStr);
    await webSocket.SendAsync(sendStr, WebSocketMessageType.Text, true, CancellationToken.None);
}

SignalREventPublisher.cs 中的PublishAsync,将消息转发给对应的用户。

public async Task PublishAsync(IUserIdentifier userIdentifier, string method, object message)
{

    try
    {
        var onlineClients = _onlineClientManager.GetAllByUserId(userIdentifier);
        foreach (var onlineClient in onlineClients)
        {
            var signalRClient = _hubContext.Clients.Client(onlineClient.ConnectionId);
            if (signalRClient == null)
            {
                Logger.Debug("Can not get user " + userIdentifier.ToUserIdentifier() + " with connectionId " + onlineClient.ConnectionId + " from SignalR hub!");
                continue;
            }

            await signalRClient.SendAsync(method, message);
        }
    }
    catch (Exception ex)
    {
        Logger.Warn("Could not send notification to user: " + userIdentifier.ToUserIdentifier());
        Logger.Warn(ex.ToString(), ex);
    }

}

前端代码则要简单得多
新建messageHandler_backend_auth.ts处理程序

import * as signalR from "@microsoft/signalr";

创建一个HubConnection对象hubConnection,用于接收SignalR消息

const baseURL = "http://localhost:44311/"; // url = base url + request url
const requestUrl = "signalr";
let header = {};
if (UserModule.token) {
  header = {
    "X-XSRF-TOKEN": UserModule.token,
    Authorization: "Bearer " + UserModule.token,
  };
}

//signalR config
const hubConnection: signalR.HubConnection = new signalR.HubConnectionBuilder()
  .withUrl(baseURL + requestUrl, {
    headers: header,
    accessTokenFactory: () => getAccessToken(),
    transport: signalR.HttpTransportType.WebSockets,
    logMessageContent: true,
    logger: signalR.LogLevel.Trace,
  })
  .withAutomaticReconnect()
  .withHubProtocol(new signalR.JsonHubProtocol())
  .build();

我们只需要响应后端程序中定义好的signalR消息的methodName就可以了

hubConnection.on("getRoomMessageNotification", (n: MessageItemDto) => {
  console.info(n.msg)
  if (ChatModule.currentChannel._id != n.rid) {
    ChatModule.increaseChannelUnread(n.rid);
  } else {
    if (n.t == null) {
      n.from =
        n.u.username == UserModule.userName
          ? constant.MSG_FROM_SELF
          : constant.MSG_FROM_OPPOSITE;
    } else {
      n.from = constant.MSG_FROM_SYSTEM;
    }
    ChatModule.appendMessage(n);
  }
});

hubConnection.on("getRoomSubscriptionChangedNotification", (n) => {
  console.info(n.method, n.msg)

  if (n.method == "insert") {
    console.info(n.msg + "has been inserted!");

    ChatModule.insertChannel(n.msg);

  }
  else if (n.method == "update") {

  }
});

至此,完成了所有的集成工作。

此文目的是介绍一种思路,使用缓存生命周期管理的相关机制,规避第三方用户系统对现有项目的用户系统的影响。举一反三,可以用到其他Paas的方案集成中。最近ChatGPT很火,可惜没时间研究怎么接入,有闲工夫的同学们可以尝试着写一个ChatGPT聊天机器人,欢迎大家评论留言!

最终效果如图
在这里插入图片描述

项目地址

Github:matoapp-samples

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380708.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Vue展示数据(动态查询)

学习内容来源&#xff1a;视频P4 本篇文章进度接着之前的文章进行续写 精简前后端分离项目搭建 Vue基础容器使用 目录选择组件修改表格组件修改分页组件增加后端接口前端请求数据接口页面初始化请求数据点击页码请求数据选择组件 在官方文档中选择现成的组件&#xff0c;放在页…

大数据技术——概述

根据IBM前首席执行官郭士纳的观点&#xff0c;IT领域每隔十五年就会迎来一次重大变革三次信息化浪潮1.存储设备容量不断增加2.CPU处理能力大幅提升3.网络带宽不断增加运营式系统阶段数据库的出现使得数据管理的复杂度大大降低,数据往往伴随着一定的运营活动而产生并记录在数据库…

手把手搭建springboot项目06-springboot整合RabbitMQ及其原理和应用场景

目录前言工作流程-灵魂画手名词解释交换机类型一、安装1.1 [RabbitMQ官网安装](https://www.rabbitmq.com/download.html)1.2 Docker安装并启动二、食用教程2.1.导入依赖2.2 添加配置2.3 代码实现2.3.1 直连&#xff08;Direct&#xff09;类型2.3.2 引入消息手动确认机制2.3.2…

【保姆级】Java后端查询数据库结果导出xlsx文件+打印xlsx表格

目录前言一、需求一&#xff1a;数据库查询的数据导出成Excel表格1.1 Vue前端实现导出按钮点击事件1.2 后端根据数据库查询结果生成xlsx文件二、需求二&#xff1a;对生成的xlsx文件调用打印机打印2.1 Vue前端实现按钮事件2.2 后端实现打印前言 最近在弄一个需求&#xff0c;需…

低代码如何推动自动化未来

一项全球研究表明&#xff0c;企业平均每个月有60个小时的工作是手动完成的&#xff0c;也就是每个员工每天花3个小时完成文件归档、数据输入和报告整合&#xff0c;而这些工作都是可以通过自动化的方式完成的。 组织实现数字化转型的关键环节就是自动化。通过自动化&#xff…

温控负荷的需求响应潜力评估及其协同优化管理研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

登录Oracle数据库遇到ORA-01017密码错误的解决办法

文章目录症状分析解决办法欢迎加下方我的微信&#x1f447;&#xff0c;拉你入学习群我们在登录Oracle数据库时可能会遇到ORA-01017错误&#xff0c;这里分析原因并提供解决办法。点击试看博主的专著《MySQL 8.0运维与优化》&#xff08;清华大学出版社&#xff09; 症状 图像…

Linux rpm安装mysql

个人记录 第一步&#xff1a;卸载已安装的mysql rpm -qa | grep -i mysql 查询已安装的mysql1、确认停止mysql服务 2、删除卸载mysql –nodeps&#xff1a;表示强制卸载&#xff0c;如果因为依赖关系导致卸载不成功&#xff0c;加上强制卸载选项–nodeps rpm -ev mysql-com…

C盘爆满?两个超简单的解决办法

我们在使用电脑的过程中&#xff0c;经常容易出现C盘爆红&#xff0c;反而其他盘还有大量可用空间的情况。为什么会这样呢&#xff1f;其实主要就两种原因&#xff1a;一是电脑使用习惯不好&#xff0c;不管什么软件都默认安装在C盘&#xff0c;大文件又喜欢放在桌面&#xff0…

Python(青铜时代)——列表

列表 在Python中&#xff0c;所有 非数字型变量 都支持以下特点&#xff1a; 都是一个序列 sequence, 也可以理解为 容器 取值 [] 遍历 for in 计算长度、最大/最小值、比较、删除 链接 和 重复 * 切片 列表的定义 List (列表) 是Python 中使用 最频繁 的数据类型&#…

6.3 负反馈放大电路的方块图及一般表达式

一、负反馈放大电路的方块图表示法 任何负反馈放大电路都可以用图6.3.1所示的方块图来表示&#xff0c;上面一个方块是负反馈放大电路的基本放大电路&#xff0c;下面一个方块是反馈放大电路的反馈网络。负反馈放大电路的基本放大电路是在断开反馈且考虑了反馈网络的负载效应的…

让您的客户了解您的制造过程“VR云看厂实时数字化展示”

一、工厂云考察&#xff0c;成为市场热点虚拟现实&#xff08;VR&#xff09;全景技术问世已久&#xff0c;但由于应用范围较为狭窄&#xff0c;一直未得到广泛应用。国外客户无法亲自到访&#xff0c;从而导致考察难、产品取样难等问题&#xff0c;特别是对于大型制造企业来说…

剑指 Offer 14-剪绳子

摘要 ​​​​​​剑指 Offer 14- I. 剪绳子 剑指 Offer 14- II. 剪绳子 II 343. 整数拆分 一、动态规划解析 这道题给定一个大于1的正整数n&#xff0c;要求将n 拆分成至少两个正整数的和&#xff0c;并使这些正整数的乘积最大化&#xff0c;返回最大乘积。令x是拆分出的第…

Spark Transformation转换算子和Action行动算子

1、Transformation转换算子 RDD整体上分为Value类型、双Value类型和Key-Value类型 1.1&#xff0c;Value类型 1.1.1&#xff0c;map()映射 object value01_map {def main(args: Array[String]): Unit {//1.创建SparkConf并设置App名称val conf new SparkConf().setAppName(…

c语言入门-5-字符串

c语言入门-5-字符串正文1、字符串怎么用方式一方式二2、字符串的长度深度解析1 字符串的特性2 \0 的含义3 ascii码表下一篇正文 1、字符串怎么用 方式一 // 字符串的标准使用方式&#xff0c;用char类型的数组表示字符串 #include<stdio.h> int main() {char arr[] &…

语音识别技术对比分析

文章目录一、语音识别产品对比二、百度语音识别产品1、套餐及价格&#xff1a;2、官网3、调研结果三、华为语音识别产品四、阿里云语音识别产品1、套餐及价格&#xff1a;2、官网地址3、调研结果五、科大讯飞语音识别产品1、套餐及价格&#xff1a;2、官网3、调研结果六、有道语…

一、Redis入门概述(是什么,能干嘛,去哪下,怎么玩)

一. redis是什么&#xff1f; Redis:REmote Dictionary Server(远程字典服务器)官方解释&#xff1a; Remote Dictionary Server(远程字典服务)是完全开源的&#xff0c;使用ANSIC语言编写遵守BSD协议&#xff0c;是一个高性能的Key-Value数据库提供了丰富的数据结构&#xff…

何谓dB , dB怎么理解?

dB 是什么单位 ?愈低愈好吗?对于声频 ( 声学及电子声学 ) 方面的单位&#xff0c;它是以分贝(decibel &#xff0c;dB ) 来做结果的。斯多里一生专注于科学,1876 发明电话&#xff0c;我们都知道贝尔发明了电话&#xff0c;然而重要的是&#xff0c;他发现我们人类耳朵对声音…

一文带你了解什么是PACS系统源码

▷ 运维级带三维重建和还原的医院PACS系统有源码&#xff0c;有演示&#xff0c;带使用手册和操作说明书。 ▷ PACS系统及影像存取与传输系统( Picture Archiving and Communication System)&#xff0c;为以实现医学影像数字化存储、诊断为核心任务&#xff0c;从医学影像设备…

uniapp小程序接入腾讯地图sdk

新建一个项目。配置uniapp配置文件设置小程序的appid注意&#xff1a;匿名用户可能存在地理定位失效。查uniapp官网官网->apiuni.getLocation(OBJECT) 获取当前的地理位置、速度。属性&#xff1a;success匿名函数返回值&#xff1a;uni.getLocation({type: gcj02,success: …