记录一次gRpc流式操作(jedis版)

news2024/9/30 15:34:33

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

service方法定义
service MQDataService{
rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto);
rpc receiveFacebookAndroidMsg(empty)returns (stream google.protobuf.StringValue);
}

服务端写法


    @Override
    public void sendFacebookAndroidMsg(StringValue request, StreamObserver<ResultProto> responseObserver) {
        CacheKey cacheKey= AppKey.appReport;
        String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC)
                .replace("{APPTYPE}", "0");
        RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime());
        ResultProto.Builder builder = ResultProto.newBuilder();
        builder.setCode(ResultType.SUCCESS);
        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }
    
    @Override
    public void receiveFacebookAndroidMsg(empty request, StreamObserver<StringValue> responseObserver) {
        MQListener mqListener=new MQListener(responseObserver);
        try {
            CacheKey cacheKey= AppKey.appReport;
            String key=cacheKey.get_keyName().replace("{PLATFORM}", MqTopic.FB_TOPIC)
                    .replace("{APPTYPE}","0");

            RedissonFactory.getRedis().subscribe(mqListener,key);
        } catch (Exception e) {

        }
        finally {
            responseObserver.onCompleted();
        }
    }

// 消息监听响应
public class MQListener extends JedisPubSub {

    public MQListener(StreamObserver<StringValue> responseObserver)
    {
        _responseObserver=responseObserver;
    }

    private StreamObserver<StringValue> _responseObserver;
    // 取得订阅的消息后的处理
    public void onMessage(String channel, String message) {
        if(!StringUtil.isNullOrEmpty(message)){
            StringValue.Builder builder = StringValue.newBuilder();
            builder.setValue(message);
            _responseObserver.onNext(builder.build());
        }
    }

    // 初始化订阅时候的处理
    public void onSubscribe(String channel, int subscribedChannels) {
    ...
    }

    // 取消订阅时候的处理
    public void onUnsubscribe(String channel, int subscribedChannels) {
     ...
    }

    // 初始化按表达式的方式订阅时候的处理
    public void onPSubscribe(String pattern, int subscribedChannels) {
       ...
    }

    // 取消按表达式的方式订阅时候的处理
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
       ...
    }

    // 取得按表达式的方式订阅的消息后的处理
    public void onPMessage(String pattern, String channel, String message) {
     ...
    }
}

客户端写法

public static void receiveFacebookAndroidMsg() {
        try {
            log.info("facebook android msg");
            // 接收消息
            StreamObserver<StringValue> responseObserver = new StreamObserver<StringValue>() {
                @Override
                public void onNext(StringValue msgProto) {
                    try {
                        log.info("facebook android msg 接收到消息: {}", msgProto.getValue());
                        JSONObject jsonObject = JSONObject.parseObject(msgProto.getValue());
                        ...
                    } catch (Exception e) {
                        log.error("facebook ios msg 消费失败{}", e.getMessage());
                        // 发给mq重新消费
                       ...
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    System.err.println("Error occurred: " + throwable.getMessage());
                    log.info("facebook android Error occurred: {}", throwable.getMessage());
                }

                @Override
                public void onCompleted() {
                    System.out.println("Stream completed.");
                    log.info("facebook android Stream completed.");
                }
            };
            log.info("接收fb android msg 开始");
            ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);
            log.info("接收fb android msg 成功");
        } catch (Exception e) {
            log.info("出错了");
        }
    }

源码下载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2179977.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数字化智能工厂应用场景

数字化智能工厂的应用场景广泛&#xff0c;涵盖了多个行业和领域。以下是一些主要的应用场景&#xff1a; 一、制造业 汽车制造&#xff1a;数字化智能工厂在汽车制造业中得到了广泛应用。通过自动化生产线、机器人、物联网和人工智能等技术&#xff0c;汽车制造商能够实现高…

三分钟速览:Node.js 版本差异与关键特性解析

Node.js 是一个广泛使用的 JavaScript 运行时环境&#xff0c;允许开发者在服务器端运行 JavaScript 代码。随着技术的发展&#xff0c;Node.js 不断推出新版本&#xff0c;引入新特性和改进。了解不同版本之间的差异对于开发者来说至关重要。以下是一个快速指南&#xff0c;帮…

Redis篇(应用案例 - 附近商户)(持续更新迭代)

目录 一、GEO数据结构的基本用法 二、导入店铺数据到GEO 三、实现附近商户功能 一、GEO数据结构的基本用法 GEO就是Geolocation的简写形式&#xff0c;代表地理坐标。 Redis在3.2版本中加入了对GEO的支持&#xff0c;允许存储地理坐标信息&#xff0c;帮助我们根据经纬度来…

【高性能内存池】基本框架 + 固定长度内存池实现 1

高性能内存池 1. 基本框架2. 定长内存池的实现2.1 介绍定长内存池2.2 T* New()2.3 void Delete(T* obj) 3. 源码&#xff08;附赠测试&#xff09;4. 总结 1. 基本框架 高并发内存池主要由三个部分构成&#xff1a; 1.thread cache:用于小于256KB的内存的分配。线程缓存是每个…

opencv实战项目(三十):使用傅里叶变换进行图像边缘检测

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一&#xff0c;什么是傅立叶变换&#xff1f;二&#xff0c;图像处理中的傅立叶变换&#xff1a;三&#xff0c;傅里叶变换进行边缘检测&#xff1a; 一&#xff0c…

适合初学者的[JAVA]: 基础面试题

目录 说明 前言 String/StringBuffer/StringBuilder区别 第一点: 第二点: 总结&#xff1a; 反射机制 JVM内存结构 运行时数据区域被划分为5个主要组件&#xff1a; 方法区&#xff08;Method Area&#xff09; 堆区&#xff08;Heap Area&#xff09; 栈区&#x…

局部整体(七)利用python绘制圆形嵌套图

局部整体&#xff08;七&#xff09;利用python绘制圆形嵌套图 圆形嵌套图&#xff08; Circular Packing&#xff09;简介 将一组组圆形互相嵌套起来&#xff0c;以显示数据的层次关系&#xff0c;类似于矩形树图。数据集中每个实体都由一个圆表示&#xff0c;圆圈大小与其代…

Spring Task 调度任务

Spring Task是调度任务框架&#xff0c;通过配置&#xff0c;程序可以按照约定的时间自动执行代码逻辑&#xff0c;基于注解方式实现需要如下注解&#xff1a; Component 任务调度类交给Spring IOC容器管理EnableScheduling 启用 Spring 的定时任务&#xff08;Scheduling&…

专业学习|随机规划概观(内涵、分类以及例题分析)

一、随机规划概览 &#xff08;一&#xff09;随机规划的定义 随机规划是通过考虑随机变量的不确定性来制定优化决策的一种方法。其基本思想是在决策过程中&#xff0c;目标函数和约束条件可以包含随机因素。 &#xff08;1&#xff09;重点 随机规划的中心问题是选择参数&am…

最新版ingress-nginx-controller安装 使用host主机模式

最新版ingress-nginx-controller安装 使用host主机模式 文章目录 最新版ingress-nginx-controller安装 使用host主机模式单节点安装方式多节点高可用安装方式 官方参考链接&#xff1a; https://github.com/kubernetes/ingress-nginx/ https://kubernetes.github.io/ingress-ng…

05_中断与数码管动态显示

中断是单片机系统重点中的重点&#xff0c;因为有了中断&#xff0c;单片机就具备了快速协调多模块工作的能力&#xff0c;可以完成复杂的任务。本章将首先带领大家学习一些必要的 C 语言基础知识&#xff0c;然后讲解数码管动态显示的原理&#xff0c;并最终借助于中断系统来完…

VS code user setting 与 workspace setting 的区别

VS code user setting 与 workspace setting 的区别 引言正文引言 相信有不少开始接触 VS code 的小伙伴会有疑问,user setting 与 workspace setting 有什么区别呢?这里我们来说明一下 正文 首先,当我们使用 Ctrl + Shift + P 打开搜索输入 setting 后,可以弹出 4 个se…

SSM+Vue家教平台系统

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 spring-mybatis.xml3.5 spring-mvc.xml3.5 Vue 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平台Java领域优质创作…

网站建设中,https协议和http协议分别是什么,有什么区别?

HTTP&#xff08;超文本传输协议&#xff09;和 HTTPS&#xff08;安全超文本传输协议&#xff09;是互联网通信中两种非常关键的协议&#xff0c;它们在安全性、性能以及证书等方面存在区别。以下是具体分析&#xff1a; 安全性 HTTP&#xff1a;数据传输以明文形式进行&#…

宝塔搭建nextcould 30docker搭建onlyoffic8.0

宝塔搭建nextcould 宝塔搭建nextcould可以参考这两个博文 我搭建的是30版本的nextcould&#xff0c;服务组件用的是下面这些&#xff0c;步骤是一样的&#xff0c;只是版本不一样而已 nginx 1.24.0 建议选择nginx&#xff0c;apache没成功。 MySQL 8.0以上都可以 php 8.2.…

“你好BOE”重磅亮相首届上海国际光影节 打造“艺术x科技”顶级影像盛宴

黄浦江畔,北外滩胜地。作为首届上海国际光影节虹口区分会场的重点项目之一,9月29日-10月5日,BOE(京东方)年度标杆性品牌巡展IP“你好BOE”Super O SPACE影像科技展在上海北外滩滨江5米平台盛大启幕,BOE(京东方)携手上海电影、上影元、OUTPUT、新浪微博、海信、OPPO、京东等众多…

信创产品测试报告有什么作用?测试依据是什么?

一、信创产品测试报告是什么&#xff1f; 针对于某一款具体的软件产品或硬件产品进行的产品测试&#xff0c;验证其是否符合信创的要求。这一类产品&#xff0c;主要分为四类&#xff1a; 三类九款产品&#xff08;计算机终端、操作系统、数据库&#xff09;&#xff1b;通用…

【Python快速学习笔记02】基础语法学习(变量等)

目录 1.标识符与代码书写注意点 2.变量类型 1.标识符与代码书写注意点 &#xff08;1&#xff09;组成&#xff1a;字母&#xff0c;下划线&#xff0c;数字 &#xff08;2&#xff09;注意点&#xff1a;但是不能由数字开头&#xff0c;区分大小写 &#xff08;3&#xff…

AltiumDesigner脚本开发-DIP封装制作

1.点击工具栏的运行工具(蓝色向右三角图标)可以执行脚本程序&#xff1b; 2.点击菜单栏Run->Run可以执行脚本程序&#xff1b; 3.在脚本编辑器中&#xff0c;按键盘的F9键可以执行脚本程序&#xff1b; 4.通过菜单栏执行脚本程序&#xff08;需要将程序添加到菜单栏中&am…

Qt多线程操作sqlite数据库

问题 就是为了多线程操作sqlite数据库,为什么,因为数据库是耗时的操作,一条数据的插入,差不多200ms,如果是数据插入多了,界面会有明显的卡顿,因此必须,多线程操作数据库。 问题是这样的: 插入数据之后,接着更新界面;然而,插入数据是比较耗时的操作,尤其插入数据…