libwebsockets实现异步websocket客户端,服务端异常断开可重连

news2025/3/13 13:10:02

libwebsockets
websocket客户端基本流程网上都有,我只额外优化了重连机制。
在服务器异常断开时不触发LWS_CALLBACK_CLOSEDLWS_CALLBACK_CLIENT_CONNECTION_ERROR,导致无法自动重连
通过定时检查链接是否可写入判断链接是否有效

 // 判断wsi是否可用
    if ((std::chrono::duration_cast<std::chrono::seconds>(
             std::chrono::system_clock::now().time_since_epoch())
             .count() -
         last_time) > detect_dup)
    {
      if (lws_callback_on_writable(wsi) <= 0)
      {

        std::cerr
            << "[WebSocket] Connection failed, retrying in 3s..." << std::endl;
        wsi = nullptr;
        // continue;
      }
      last_time = std::chrono::duration_cast<std::chrono::seconds>(
                      std::chrono::system_clock::now().time_since_epoch())
                      .count();
    }

完整代码

#ifndef MYWSCLIENT_H
#define MYWSCLIENT_H

#pragma once

#include <iostream>
#include <thread>

#include <libwebsockets.h>
#include <atomic>
#include <functional>
#include <mutex>
#include <condition_variable>

/*
异步启动WebSocket
自动重连
*/
class MyWSClient
{
public:
  using MessageCallback = std::function<void(const std::string &)>;

  MyWSClient(const std::string &url, int port, const std::string &path, MessageCallback onMessage = nullptr);

  ~MyWSClient();

  void start();

  void stop();

  void sendMessage(const std::string &message);

private:
  std::string url;
  std::string path;
  int port;
  int detect_dup = 3; // s
  MessageCallback onMessageCallback;
  struct lws_context *context;
  struct lws *wsi;
  std::thread wsThread;
  std::atomic<bool> running;
  std::mutex sendMutex;
  std::vector<std::string> sendQueue;

  void run();
  void reconnect();
  static int callback(struct lws *wsi, enum lws_callback_reasons reason,
                      void *user, void *in, size_t len);
  static struct lws_protocols protocols[];
};

#endif
#include "MyWSClient.h"

#include "spdlog/spdlog.h"

using namespace std;

MyWSClient::MyWSClient(const std::string &url, int port, const std::string &path, MessageCallback onMessage)
    : url(url), port(port), path(path), onMessageCallback(onMessage), context(nullptr), wsi(nullptr), running(false) {}

MyWSClient::~MyWSClient() { stop(); }

void MyWSClient::start()
{
  running = true;
  wsThread = std::thread(&MyWSClient::run, this);
}

void MyWSClient::stop()
{

  running = false;
  if (context)
  {
    lws_context_destroy(context);
    context = nullptr;
  }
  if (wsThread.joinable())
  {
    wsThread.join();
  }
}

void MyWSClient::sendMessage(const string &message)
{

  if (!wsi)
  {
    std::cout << __func__ << " error send, ws server not connected" << std::endl;
    return;
  }

  std::lock_guard<std::mutex> lock(sendMutex);
  sendQueue.push_back(message);
  if (wsi)
  {
    int res = lws_callback_on_writable(wsi);
    std::cout << __func__ << " send :" << message << ", res:" << res << std::endl;
  }
}
void MyWSClient::run()
{
  struct lws_context_creation_info ctx_info = {};
  struct lws_client_connect_info conn_info = {};

  ctx_info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
  ctx_info.port = CONTEXT_PORT_NO_LISTEN;
  ctx_info.protocols = protocols;
  context = lws_create_context(&ctx_info);

  if (!context)
  {
    std::cerr << "[WebSocket] Failed to create context" << std::endl;
    return;
  }

  conn_info.context = context;
  conn_info.address = url.c_str();
  conn_info.port = port;
  conn_info.path = path.c_str();
  conn_info.host = url.c_str();
  conn_info.origin = url.c_str();
  conn_info.protocol = "ws";
  conn_info.ssl_connection = LCCSCF_USE_SSL |
                             LCCSCF_ALLOW_SELFSIGNED |
                             LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK |
                             LCCSCF_ALLOW_EXPIRED;
  conn_info.userdata = this;

  long last_time = std::chrono::duration_cast<std::chrono::seconds>(
                       std::chrono::system_clock::now().time_since_epoch())
                       .count();

  while (running)
  {
    if (!wsi)
    {
      std::this_thread::sleep_for(std::chrono::seconds(3));
      std::cout << "[WebSocket] Attempting to connect..." << std::endl;
      wsi = lws_client_connect_via_info(&conn_info);
    }

    // 判断wsi是否可用
    if ((std::chrono::duration_cast<std::chrono::seconds>(
             std::chrono::system_clock::now().time_since_epoch())
             .count() -
         last_time) > detect_dup)
    {
      if (lws_callback_on_writable(wsi) <= 0)
      {

        std::cerr
            << "[WebSocket] Connection failed, retrying in 3s..." << std::endl;
        wsi = nullptr;
        // continue;
      }
      last_time = std::chrono::duration_cast<std::chrono::seconds>(
                      std::chrono::system_clock::now().time_since_epoch())
                      .count();
    }

    if (lws_service(context, 0) < 0)
    {
      std::cerr
          << "[WebSocket] lws_service failed" << std::endl;
    }
  }

  lws_context_destroy(context);
}

void MyWSClient::reconnect()
{
  stop();
  std::cerr
      << "[WebSocket] reconnect, retrying in 3s..." << std::endl;
  run();
}

int MyWSClient::callback(lws *wsi, lws_callback_reasons reason, void *user, void *in, size_t len)
{
  MyWSClient *client = (MyWSClient *)lws_wsi_user(wsi);

  switch (reason)
  {
  case LWS_CALLBACK_CLIENT_ESTABLISHED:
  {
    std::cout << "[WebSocket] Connected to server" << std::endl;
    lws_callback_on_writable(wsi); // 请求写入
    break;
  }

  case LWS_CALLBACK_CLIENT_RECEIVE:
  {
    if (client->onMessageCallback)
    {
      client->onMessageCallback(std::string((char *)in, len));
    }
    break;
  }

  case LWS_CALLBACK_CLIENT_WRITEABLE:
  {
    std::lock_guard<std::mutex> lock(client->sendMutex);
    if (!client->sendQueue.empty())
    {
      std::string message = client->sendQueue.front();
      client->sendQueue.erase(client->sendQueue.begin());

      std::cout << __func__ << " send:" << message << std::endl;

      // 确保 LWS_PRE 字节已预留
      unsigned char buf[LWS_PRE + 1024] = {0};
      int msgLen = message.size();
      memcpy(buf + LWS_PRE, message.c_str(), msgLen);

      int sent = lws_write(wsi, buf + LWS_PRE, msgLen, LWS_WRITE_TEXT);
      if (sent < 0)
      {
        std::cerr << __func__ << "lws_write failed!" << std::endl;
      }

      // 如果还有数据,继续请求写入
      if (!client->sendQueue.empty())
      {
        lws_callback_on_writable(wsi);
      }
    }
    break;
  }

  case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
  {
    std::cerr << "[WebSocket] Connection error: " << (in ? (char *)in : "unknown") << std::endl;
    client->wsi = nullptr;
    // client->reconnect();
    break;
  }

  case LWS_CALLBACK_CLOSED:
  {
    std::cout << "[WebSocket] Connection closed" << std::endl;
    client->wsi = nullptr;
    // client->reconnect();
    break;
  }

  default:
    break;
  }
  return 0;
}

// 定义协议数组
struct lws_protocols MyWSClient::protocols[] = {
    {"ws", MyWSClient::callback, 0, 4096},
    {nullptr, nullptr, 0, 0}};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2314311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

轻量级模块化前端框架:快速构建强大的Web界面

轻量级模块化前端框架&#xff1a;快速构建强大的Web界面 在当今快节奏的Web开发环境中&#xff0c;选择一个高效且灵活的前端框架至关重要。UIkit 是一个轻量级的模块化前端框架&#xff0c;旨在帮助开发者快速构建功能强大且响应迅速的Web界面。 UIkit提供了丰富的组件和工…

qt+opengl 播放yuv视频

一、实现效果 二、pro文件 Qt widgets opengl 三、主要代码 #include "glwidget.h"GLWidget::GLWidget(QWidget *parent) : QOpenGLWidget(parent) {connect(&m_timer, &QTimer::timeout, this,[&](){this->update();});m_timer.start(1000/33); }v…

5G基本概念

作者:私语茶馆 1. 5G应用场景概述 1.1.5G应用场景 ITU域2015年定义了三大应用场景:eMBB(增强型移动宽带)、uRLLC(低时延高可靠通信)、mMTC(海量物联网通信); emBB:Enhanced Mobile Broadband ,移动互联网应用,是4G MBB(移动宽带)的升级,主要侧重于网络速率、带…

PH热榜 | 2025-03-12

1. Fluently 标语&#xff1a;开始说英语&#xff0c;就像说你的母语一样流利。 介绍&#xff1a;想象一下&#xff0c;有一个像人类一样的英语教练&#xff0c;全天候在线、价格却便宜15倍。这就是 Fluently &#x1f680; 纠正你的错误&#xff0c;提升你的词汇量、发音和语…

Python Web项目的服务器部署

一.部署运行 1.虚拟环境的安装&#xff1a;&#xff08;一行一行运行&#xff09; wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh bash miniconda.sh -b -p /opt/miniconda3 echo export PATH"/opt/miniconda3/bin:$PAT…

[项目]基于FreeRTOS的STM32四轴飞行器: 八.遥控器摇杆

基于FreeRTOS的STM32四轴飞行器: 八.遥控器摇杆 一.摇杆数据的扫描二.处理摇杆数据三.微调按键处理 一.摇杆数据的扫描 下面摇杆初始化时&#xff0c;启动了ADC-DMA进行了采集&#xff0c;已经开始转换直接将数据通过DMA存入buff数组中&#xff1a; static uint16_t buff[4] …

附下载 | 2024 OWASP Top 10 基础设施安全风险.pdf

《2024 OWASP Top 10 基础设施安全风险》报告&#xff0c;由OWASP&#xff08;开放网络应用安全项目&#xff09;发布&#xff0c;旨在提升企业和组织对基础设施安全风险、威胁与漏洞的意识&#xff0c;并提供高质量的信息和最佳实践建议。报告列出了2024年最重要的10大基础设施…

Pytorch的一小步,昇腾芯片的一大步

Pytorch的一小步&#xff0c;昇腾芯片的一大步 相信在AI圈的人多多少少都看到了最近的信息&#xff1a;PyTorch最新2.1版本宣布支持华为昇腾芯片&#xff01; 1、 发生了什么事儿&#xff1f; 在2023年10月4日PyTorch 2.1版本的发布博客上&#xff0c;PyTorch介绍的beta版本…

化工厂防爆气象站:为石油化工、天然气等领域提供安全保障

【TH-FB02】在石油化工、天然气等高危行业中&#xff0c;安全生产是至关重要的。这些行业常常面临着易燃易爆、有毒有害等潜在风险&#xff0c;因此&#xff0c;对气象条件的监测和预警显得尤为重要。化工厂防爆气象站作为一种专门设计用于这些特殊环境的气象监测设备&#xff…

《A Gentle Introduction to Graph Neural Networks》-GNN的综述性论文

目录 一、什么数据可以表示成一张图 &#xff08;1&#xff09;什么是图&#xff1f; &#xff08;2&#xff09;如何表示图的属性 &#xff08;3&#xff09;images as graphs&#xff08;将图片表示为图&#xff09; &#xff08;4&#xff09;text as graphs&#xff08…

[023-01-40].第40节:组件应用 - OpenFeign与 Sentinel 集成实现fallback服务降级

SpringCloud学习大纲 一、需求说明&#xff1a; 需求1&#xff1a;通过fallback属性进行统一配置 a.问题分析&#xff1a; 1.需要实现cloudalibaba-consumer-nacos-order83模块通过OpenFeign调用cloudalibaba-provider-payment9001 83服务通过OpenFeign调用 9001微服务&…

设计模式-结构型模式-装饰器模式

概述 装饰器模式 : Decorator Pattern : 是一种结构型设计模式. 作用 &#xff1a; 允许你动态地给对象添加功能或职责&#xff0c;而无需修改其原始类的代码,非常的符合 开闭原则。 实现思路 &#xff1a;通过创建一个包装对象&#xff08;即装饰器&#xff09;&#xff0c;来…

visual studio配置opencv

文章目录 step1 下载opencvstep2 配置包含目录step 3 配置链接器step4 配置环境变量并重启vs2022step5 检查代码 step1 下载opencv 下载 opencv-4.8.0-windows.exe https://cloud.189.cn/web/share?codefUnqEb7naUra step2 配置包含目录 step 3 配置链接器 step4 配置环境变…

docker修改daemon.json文件后无法启动

1.问题描述 使用阿里云docker镜像安装的docker&#xff0c;安装成功后默认可以启动。但是修改daemon.json配置后docker服务无法启动&#xff0c;提示如下错误&#xff1a; 从上图发现&#xff0c;docker服务默认使用阿里docker镜像仓库 2.解决方法 根据提示找到docker服务目…

Linux网络:网络与操作系统1

本文是介绍网络的基本结构&#xff0c;以及和OS之间有什么关系 OSI七层模型 引入 使用网络是为了解决信息的长距离传送&#xff0c;那就需要解决四个问题&#xff1a; 接收方如何使用数据传输的可靠性主机如何定位数据包在局域网如何转发 人们选择用网络协议&#xff08;t…

姚安娜新剧瘦了一圈,《仁心俱乐部》急诊医生顾诗宜在线上岗

《仁心俱乐部》在芒果 TV 播出&#xff0c;湖南卫视金鹰独播剧场也随之播出&#xff0c;这一剧集受到了不少观众的关注。姚安娜在剧中饰演的急诊科医生顾诗宜&#xff0c;她为患者检查身体时动作娴熟&#xff0c;与患者沟通时展现出的耐心和专注&#xff0c;都展现出很高的专业…

串口数据记录仪DIY,体积小,全开源

作用 产品到客户现场出现异常情况&#xff0c;这个时候就需要一个日志记录仪、黑匣子&#xff0c;可以记录产品的工作情况&#xff0c;当出现异常时&#xff0c;可以搜集到上下文的数据&#xff0c;从而判断问题原因。 之前从网上买过&#xff0c;但是出现过丢数据的情况耽误…

51单片机Proteus仿真速成教程——P1-软件与配置+Proteus绘制51单片机最小系统+新建程序模版

前言&#xff1a;本文主要围绕 51 单片机最小系统的绘制及程序模板创建展开。首先介绍了使用 Proteus 绘制 51 单片机最小系统的详细步骤&#xff0c;包括软件安装获取途径、工程创建、器件添加&#xff08;如单片机 AT89C51、晶振、电容、电阻、按键等&#xff09;、外围电路&…

使用 pytesseract 进行 OCR 识别:以固定区域经纬度提取为例

引言 在智能交通、地图定位等应用场景中&#xff0c;经常会遇到需要从图像中提取经纬度信息的需求。本篇文章将介绍如何利用 Python 的 pytesseract 库结合 PIL 对图像进行预处理&#xff0c;通过固定区域裁剪&#xff0c;来有效地识别出图像上显示的经纬度信息。 1. OCR 与 …

【51单片机】程序实验15.DS18B20温度传感器

主要参考学习资料&#xff1a;B站【普中官方】51单片机手把手教学视频 开发资料下载链接&#xff1a;http://www.prechin.cn/gongsixinwen/208.html 单片机套装&#xff1a;普中STC51单片机开发板A4标准版套餐7 目录 DS18B20介绍主要特性内部结构控制时序初始化时序写时序读时序…