今日指数项目项目集成RabbitMQ与CaffienCatch

news2025/1/1 20:33:38

今日指数项目项目集成RabbitMQ与CaffienCatch

一. 为什么要集成RabbitMQ

首先CaffeineCatch 是作为一个本地缓存工具 使用CaffeineCatch 能够大大较少I/O开销

股票项目 主要分为两大工程 --> job工程(负责数据采集) , backend(负责业务处理)

由于股票的实时性也就是说 , 对于股票来说像大盘数据 , 个股数据等都是每分钟进行更新的

而使用传统的采集以及业务处理方式 , 也就是说 数据采集后将数据保存到数据库中 , 然后客户从数据库中反复获取数据

当用户数量增多 , 数据库的I/O开销也会随之增大 , 会导致时效性的降低

所以这里我采用MQ加CaffeineCatch , 在job工程中采集数据后 写入数据库 , 同时通过MQ发送消息给backend工程 重新加载缓存

将数据库中的数据读取到CaffeineCatch 中

在这里插入图片描述

二. job工程代码实现

1. 导入mq依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2. 定义配置文件

spring:
  rabbitmq:
    host: 114.116.244.165 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: jixu
    password: 123321
    virtual-host: /

3. 编写服务端代码

package com.jixu.stock.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    // 定义大盘消息序列化方式
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    // 定义主题交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("stockExchange",true,false);
    }

    // 定义大盘队列
    @Bean
    public Queue stockMarketQueue(){
        return new Queue("marketQueue",true);
    }
    @Bean
    // 绑定大盘信息
    public Binding bindingStockeMarket(){
        // with( Routingkey 参数 --> 匹配的队列名称 )
        return BindingBuilder.bind(stockMarketQueue()).to(topicExchange()).with("inner.market");
    }
}

4. 定义客户端

package com.jixu.stock.config;

import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Date;

@Configuration
@Slf4j
public class MqConfig {
    // 定义大盘消息序列化方式
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    // 客户端接受信息
    @RabbitListener(queues = "marketQueue")
    public void stockMarketListener(Date date){
        long diffTime= DateTime.now().getMillis()-new DateTime(date).getMillis();
        //超过一分钟告警
        if (diffTime>60000) {
            log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),diffTime);
        }
    }
}

3. 修改业务层代码

在数据插入成功后发送消息给MQ

log.info("当前时间点{} , 数据插入成功", DateTime.now().toString("yyyy-MM-dd HH-mm-ss"));
rabbitTemplate.convertAndSend("stockExchange","inner.market",new Date());

三. backend工程代码实现

首先在实现业务逻辑之前需要导入相关依赖 , 以及配置MQ和CaffineCache

1. 配置MQ配置类

package com.jixu.stock.config;

import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Date;

@Configuration
@Slf4j
public class MqConfig {
    // 定义大盘消息序列化方式
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }


}

2. 配置CaffineCache配置类

 	/**
     * 配置CaffienCatch
     */
    @Bean
    public Cache<String,Object> caffeineCache(){
        Cache<String, Object> cache = Caffeine
                .newBuilder()
                .maximumSize(200)//设置缓存数量上限
//                .expireAfterAccess(1, TimeUnit.SECONDS)//访问1秒后删除
//                .expireAfterWrite(1,TimeUnit.SECONDS)//写入1秒后删除
                .initialCapacity(100)// 初始的缓存空间大小
                .recordStats()//开启统计
                .build();
        return cache;
    }

3. 创建客户端类接收信息

package com.jixu.stock.mq;

import com.jixu.stock.service.StockService;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.github.benmanes.caffeine.cache.Cache;
import java.util.Date;

/**
 * @program: stock_parent
 * @description:
 * @author: jixu
 * @create: 2024-10-01 12:45
 **/
@Component
@Slf4j
public class StockMarketMQ {

    @Autowired
    private Cache caffeineCache;

    @Autowired
    private StockService service;

    // 客户端接受信息
    @RabbitListener(queues = "marketQueue")
    public void stockMarketListener(Date date){
        long diffTime= DateTime.now().getMillis()-new DateTime(date).getMillis();
        //超过一分钟告警
        if (diffTime>60000) {
            log.error("采集国内大盘时间点:{},同步超时:{}ms",new DateTime(date).toString("yyyy-MM-dd HH:mm:ss"),diffTime);
        }


    }
}

在信息接受之后需要对业务层代码进行修改 --> 实现CaffineCache缓存

这里我们使用CaffineCache.get的方法 , 其中会传入两个参数 , 分别是要从CaffineCache中查询的数据的key ,以及如果key不存在使用的补救方法(从数据库中查询)

4. 完善业务代码

/**
     * 实现股票大盘数据查询
     * @return
     */
    @Override
    public R<ArrayList<InnerMarketDomain>> getInnerMarketDomain() {

        R<ArrayList<InnerMarketDomain>> msg = (R<ArrayList<InnerMarketDomain>>) caffeineCache.get("stockMarketMsg" , key -> {
            // 1. 获取最新时间数据
            Date curTime = DateTimeUtil.getLastDate4Stock(DateTime.now()).toDate();
            // 创建mock数据
            curTime = DateTime.parse("2022-01-02 09:32:00", DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toDate();

            // 2. 获取股票代码
            ArrayList<String> marketInfo = stockInfoConfig.getInner();

            // 3. dao层查询数据
            ArrayList<InnerMarketDomain> data = stockMarketIndexInfoMapper.getMarketInfo(curTime , marketInfo);

            return R.ok(data);
        });
        return msg;


    }

5. 完善StockMarketMQ类刷新数据

// 清除caffeineCache中的缓存
caffeineCache.invalidate("stockMarketMsg");
// 调用service重新获取
service.getInnerMarketDomain();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2183013.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Redis】Redis中的 AOF(Append Only File)持久化机制

目录 1、AOF日志 2、AOF 的执行顺序与潜在风险 3、如何优化 AOF&#xff1f;&#xff08;写入策略&#xff09; 4、AOF重写机制&#xff08;防止日志文件无限增长&#xff09; 1、AOF日志 想象一下&#xff0c;Redis 每次执行写操作的时候&#xff0c;都把这些操作以追加的…

SpringBoot项目 | 瑞吉外卖 | 短信发送验证码功能改为免费的邮箱发送验证码功能 | 代码实现

0.前情提要 之前的po已经说了单独的邮箱验证码发送功能怎么实现&#xff1a; https://blog.csdn.net/qq_61551948/article/details/142641495 这篇说下如何把该功能整合到瑞吉项目里面&#xff0c;也就是把原先项目里的短信发送验证码的功能改掉&#xff0c;改为邮箱发送验证…

World of Warcraft [CLASSIC][80][Grandel] /console cameraDistanceMaxZoomFactor 2

学习起来&#xff01;&#xff01;&#xff01; 调整游戏界面镜头距离&#xff0c;默认值为&#xff1a;2 /console cameraDistanceMaxZoomFactor 2 大于4&#xff0c;效果不明显了&#xff0c;鼠标滚轮向后滚&#xff0c;拉起来镜头 World of Warcraft [CLASSIC][80][Grandel…

Another redis desktop manager使用说明

Another redis desktop manager使用说明 概述界面介绍图示说明连接界面设置界面查看操作日志主界面信息进入redis-cli控制台更多 概述 Another Redis Desktop Manager是一个开源的跨平台 Redis 客户端&#xff0c;提供了简洁易用的图形用户界面&#xff08;GUI&#xff09;&am…

第5篇:勒索病毒自救指南----应急响应篇

经常会有一些小伙伴问&#xff1a;中了勒索病毒&#xff0c;该怎么办&#xff0c;可以解密吗&#xff1f; 第一次遇到勒索病毒是在早几年的时候&#xff0c;客户因网站访问异常&#xff0c;进而远程协助进行排查。登录服务器&#xff0c;在站点目录下发现所有的脚本文件及附件…

【JaveEE】——多线程中使用顺序表,队列,哈希表

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 一&#xff1a;多线程环境使用ArrayList 引入&#xff1a; 1&#xff1a;顺序表使用同步机制 2&…

Linux服务器配置anaconda3,下载torch

如图&#xff0c;vscode连接远程服务器后&#xff0c;如下所示&#xff1a; 下载 Anaconda 下载及安装 进入下载官网&#xff0c;点击linux&#xff0c; 下载方式有两种&#xff0c; 直接下载安装包&#xff0c;下载完上传服务器&#xff0c;并安装&#xff0c;安装执行b…

【算法系列-链表】移除链表元素

【算法系列-链表】移除链表元素 欢迎来到【算法系列】第二弹 &#x1f3c6; 链表&#xff0c;接下来我们将围绕链表这类型的算法题进行解析与练习&#xff01;一起加油吧&#xff01;&#xff01;( •̀ ω •́ )✧✨ 文章目录 【算法系列-链表】移除链表元素1. 算法分析&am…

Spring Data(学习笔记)

JPQL语句&#xff1f;&#xff1f;&#xff1f;&#xff08;Query括号中的就是JPQL语句&#xff09; 怎么又会涉及到连表查询呢&#xff1f; 用注解来实现表间关系。 分页是什么&#xff1f;为什么什么都有分页呢 &#xff1f; 继承&#xff0c;与重写方法的问题 Deque是什么 ?…

线程池:线程池的实现 | 日志

&#x1f308;个人主页&#xff1a; 南桥几晴秋 &#x1f308;C专栏&#xff1a; 南桥谈C &#x1f308;C语言专栏&#xff1a; C语言学习系列 &#x1f308;Linux学习专栏&#xff1a; 南桥谈Linux &#x1f308;数据结构学习专栏&#xff1a; 数据结构杂谈 &#x1f308;数据…

C++容器之vector模拟实现(代码纯享版!!!)

目录 前言 一、头文件 .h文件 总结 前言 本文是模拟实现vector部分功能的代码&#xff0c;可以直接拿去使用 一、头文件 .h文件 #include<assert.h> #include<iostream> using namespace std; namespace zz {template<class T>class vector{public:typedef…

C++ set,multiset与map,multimap的基本使用

1. 序列式容器和关联式容器 string、vector、list、deque、array、forward_list等STL容器统称为序列式容器&#xff0c;因为逻辑结构为线性序列的数据结构&#xff0c;两个位置存储的值之间一般没有紧密的关联关系&#xff0c;比如交换一下&#xff0c;他依旧是序列式容器。顺…

STM32器件支持包安装,STLINK/JLINK驱动安装

一、支持包安装 1、离线安装 先下载支持包之后&#xff0c;再进行安装。如下图要安装STM32F1系列&#xff0c;双击 出现如下&#xff0c;会自动锁定安装路径&#xff0c;然后点击下一步&#xff0c;直接安装。 2、在线安装 首先需要电脑联网。如下。先点击第一个红框绿色按钮…

常见的VPS或者独立服务器的控制面板推荐

随着越来越多的企业和个人转向VPS和独立服务器以获得更高的性能和灵活性&#xff0c;选择合适的控制面板变得尤为重要。一个好的控制面板可以大大简化服务器管理&#xff0c;提高工作效率。本篇文章将介绍2024年最值得推荐的VPS控制面板&#xff0c;帮助您做出明智的选择。 1.…

STL容器适配器

欢迎来到本期节目- - - STL容器适配器 适配器模式&#xff1a; 在C中&#xff0c;适配器是一种设计模式&#xff0c;有时也称包装样式&#xff1b; 通过将类自己的接口包裹在一个已存在的类中&#xff0c;使得因接口不兼容而不能在一起工作的类能在一起工作&#xff1b; 也就…

使用VBA快速生成Excel工作表非连续列图片快照

Excel中示例数据如下图所示。 现在需要拷贝A2:A15,D2:D15,J2:J15,L2:L15,R2:R15为图片&#xff0c;然后粘贴到A18单元格&#xff0c;如下图所示。 大家都知道VBA中Range对象有CopyPicture方法可以拷贝为图片&#xff0c;但是如果Range对象为非连续区域&#xff0c;那么将产生10…

详解DHCP服务工作原理及配置案例

一. DHCP概述 DHCP&#xff08;Dynamic Host Configuration Protocol&#xff0c;动态主机配置协议&#xff09;是一个主机IP简化分配管理的TCP/IP协议&#xff0c;用户通过DHCP服务器动态的分配给客户端IP地址及其他环境的配置工作&#xff0c;包括IP地址、子网掩码、网关和…

【NVIDIA】如何使用nvidia-smi命令管理和监控GPU

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…

KPConv: Flexible and Deformable Convolution for Point Clouds

Abstract Kernel Point Convolution&#xff08;KPConv&#xff09;是一种点云卷积方法&#xff0c;它可以直接在点云数据上进行操作&#xff0c;无需任何中间的表示形式。方法的核心在于使用核点来定义卷积权重&#xff0c;核点位于欧几里得空间中&#xff0c;并仅对靠近它们…

Spring DI 笔记

目录 1.什么是DI? 2.依赖注入的三种⽅式 2.1属性注⼊ 2.2构造⽅法注⼊ 2.3Setter 注⼊ 2.4三种注⼊优缺点分析 3.Autowired存在问题 1.什么是DI? DI: 依赖注⼊ 依赖注⼊是⼀个过程&#xff0c;是指IoC容器在创建Bean时, 去提供运⾏时所依赖的资源&#xff0c;⽽资源指的…