kafak消费数据,webSocket实时推送数据到前端

news2025/1/14 1:06:17

1.导入webSocket依赖

 <!--websocket依赖包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.编写webSocket类

package com.skyable.device.config.webSocket;


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * @author Administrator
 */
@ServerEndpoint("/vehicle/{domainId}")
@Component
@Slf4j
public class WebSocketServer {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static final Set<Session> SESSIONS = new HashSet<>();


    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        log.info("webSocket link close");
    }

    /**
     * @param error
     */
    @OnError
    public void onError(Throwable error) {
        error.printStackTrace();
    }

    /**
     * 接收数据
     *
     * @param data
     */
    public static void sendDataToClients(String data) {
        for (Session session : SESSIONS) {
            try {
                session.getBasicRemote().sendText(data);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnOpen
    public void onOpen(Session session) {
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        /**
         * 接收domainId
         */
        SESSIONS.add(session);
        sendDataToClients();
    }


    public void sendDataToClients() {
        for (Session session : SESSIONS) {
            try {
                session.getBasicRemote().sendText("webSocket link succeed");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.skyable.device.config.webSocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author Administrator
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.kafak消费数据后调用webSocket方法

  /**
     * 获取kafka数据
     *
     * @param
     */
    @Override
    public void saveBatch(String jsonValue) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            //位置
            JsonNode jsonNode = objectMapper.readTree(jsonValue);
            if (jsonNode.has(VehicleConstant.LOCATION)) {
                RealTimePosition realTimePosition = new RealTimePosition();
                JsonNode locationNode = jsonNode.get("location");
                String vehicleId = locationNode.get("vehicleId").asText();
                double longitude = Double.parseDouble(locationNode.get("longitude").asText());
                double latitude = Double.parseDouble(locationNode.get("latitude").asText());
                long timeStamp = locationNode.get("timestamp").asLong();
                realTimePosition.setTimeStamp(timeStamp);
                realTimePosition.setLatitude(String.valueOf(latitude));
                realTimePosition.setLongitude(String.valueOf(longitude));
                realTimePosition.setVehicleId(vehicleId);
                VehicleLocationVo locationVo = deviceMapMapper.selectLonLat(vehicleId);
                if (!Objects.isNull(locationVo)) {
                    //计算距离
                    RedisUtil.addLocation(vehicleId, Double.parseDouble(locationVo.getLongitude()), Double.parseDouble(locationVo.getLatitude()), "l1");
                    RedisUtil.addLocation(vehicleId, longitude, latitude, "l2");
                    Double result = RedisUtil.calculateDistance(vehicleId, "l1", "l2");
                    Double meters = RedisUtil.convertMilesToKilometers(result);
                    DecimalFormat decimalFormat = new DecimalFormat("#.###");
                    String distance = decimalFormat.format(meters);
                    realTimePosition.setDistance(Double.parseDouble(distance));
                } else {
                    realTimePosition.setDistance(0);
                }
                //获取省份
                Map<String, Object> position = addressUtil.getPosition(longitude, latitude, null, null, null);
                Map data = (Map) position.get("data");
                String provinceName = data.get("shortname").toString().replaceAll("\"", "");
                realTimePosition.setArea(provinceName);
                deviceMapMapper.insertRealTimePosition(realTimePosition);
                RedisUtil.addZSetValue(VehicleConstant.VEHICLE_LOCATION, String.valueOf(vehicleId), timeStamp);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        try {
            //报警
            JsonNode jsonNode = objectMapper.readTree(jsonValue);
            if (jsonNode.has(VehicleConstant.ALERT)) {
                JsonNode alertNode = jsonNode.get("alert");
                String vehicleId = alertNode.get("vehicleId").asText();
                Integer alertType = alertNode.get("alertType").asInt();
                long timeStamp = alertNode.get("timestamp").asLong();
                Alerts alerts = new Alerts();
                alerts.setAlertType(alertType);
                alerts.setTimeStamp(timeStamp);
                alerts.setVehicleId(vehicleId);
                deviceMapMapper.insertAlerts(alerts);
                RedisUtil.addZSetValue(VehicleConstant.VEHICLE_ALERT, String.valueOf(vehicleId), timeStamp);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        //webSocket发送消息
        VehicleAllVo vehicles = vehicles();
        WebSocketServer.sendDataToClients(vehicles.toString());
    }

4.发送消息内容

VehicleAllVo vehicles = vehicles();
该方法就是发送的具体内容

5.kafak消费者

package com.skyable.device.listener.Vehicle;

import com.alibaba.fastjson.JSON;
import com.skyable.common.config.CloudApplicationContext;
import com.skyable.common.constants.kafka.KafkaTopicConstants;
import com.skyable.device.config.webSocket.WebSocketServer;
import com.skyable.device.entity.vehicle.Vehicle;
import com.skyable.device.service.DeviceMapService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Description:
 *
 * @author yangJun
 * @date: 2023-08-18-14:12
 */
@Service
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class VehicleDataKafkaListener {
    private final DeviceMapService deviceMapService;

    @KafkaListener(topics = KafkaTopicConstants.TOPIC_VEHICLE_RECORD, groupId = "rx_1_thing", containerFactory = "batchFactory")
    public void dealDeviceDataToScript(List<ConsumerRecord<String, String>> recordList) {
        recordList.parallelStream()
                .map(ConsumerRecord::value)
                .forEach(this::saveVehicleData);
    }

    private void saveVehicleData(String jsonValue) {
        log.info("kafka data:" + jsonValue);
        deviceMapService.saveBatch(jsonValue);
    }
}
package com.skyable.device.listener.Vehicle;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName KafkaConsumerConfig
 * @Description Kafka消费者配置
 * @Author gaoy
 * @Date 2021/2/25 15:02
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;


    /**
     * 批量消费工厂bean
     * @return
     */
    @Bean
    KafkaListenerContainerFactory batchFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new
                ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // 开启批量监听
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        // 设置手动提交ackMode
        // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public Map consumerConfigs() {
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //设置每次接收Message的数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //开启幂等性。
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        return props;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/927075.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

远程连接虚拟机中ubuntu报错:Network error:Connection refused

ping检测一下虚拟机 可以ping通&#xff0c;说明主机是没问题 #检查ssh是否安装&#xff1a; ps -e |grep ssh发现ssh没有安装 #安装openssh-server sudo apt-get install openssh-server#启动ssh service ssh startps -e |grep ssh检查一下防火墙 #防火墙状态查看 sudo ufw…

工控机驱动自助检票机,打造轨道交通的智慧未来!

随着城市化进程的加速和人口的不断增长&#xff0c;城市轨道交通建设正日益成为解决交通拥堵、提高交通工作效率的重要举措。然而&#xff0c;仅仅依靠传统的交通设施已经无法满足城市发展的需求&#xff0c;轨道交通智能系统建设成为了不可忽视的发展趋势。 AFC&#xff0c;即…

数据分享|R语言PCA主成分、lasso、岭回归降维分析近年来各国土地面积变化影响...

全文链接&#xff1a;http://tecdat.cn/?p31445 机器学习在环境监测领域的应用&#xff0c;着眼于探索全球范围内的环境演化规律&#xff0c;人类与自然生态之间的关系以及环境变化对人类生存的影响&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 课题着眼于…

电脑显示“Operating System not found”该怎么办?

“Operating System not found”是一种常见的电脑错误提示&#xff0c;这类错误会导致你无法成功启动Windows。那么电脑显示“Operating System not found”该怎么办呢&#xff1f; 方法1. 检查硬盘 首先&#xff0c;您可以测试硬盘是否存在问题。为此&#xff0c;您可以采取以…

.NET敏捷开发框架-RDIFramework.NET V6.0发布

1、RDIFramework.NET 敏捷开发框架介绍 RDIFramework.NET敏捷开发框架&#xff0c;是我司重磅推出的基于最新.NET6与.NET Framework的快速信息化系统开发、整合框架&#xff0c;为企业快速构建跨平台、企业级的应用提供了强大支持。 开发人员不需要开发系统的基础功能和公共模…

CentOS7安装jq命令

1. 安装依赖 yum install gmp-devel mpfr-devel libmpc-devel -y2. 安装gcc 2.1 离线环境 wget https://ftp.gnu.org/gnu/gcc/gcc-10.3.0/gcc-10.3.0.tar.gz tar -xzf gcc-10.3.0.tar.gz编译安装 yum -y install gcc c --skip-broken./configure --disable-multilib --enab…

Rust处理JSON

基本操作 Cargo.toml: [package]name "json"version "0.1.0"edition "2021"# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html[dependencies]serde { version "1", features …

uniapp小程序位置信息配置

uniapp 小程序获取当前位置信息报错 报错信息&#xff1a; getLocation:fail the api need to be declared in the requiredPrivateInfos field in app.json/ext.json 需要在manifest.json配置文件中进行配置&#xff1a;

喜欢单片机?嵌入式高薪在招手!

嵌入式技术作为热门行业之一&#xff0c;近年来得到了广泛的关注和追捧。在众多嵌入式技术中&#xff0c;单片机技术因其小巧、低功耗和强大性能而备受青睐。下面我们将探讨为何喜欢单片机&#xff0c;以及嵌入式领域高薪工作的前景。 作为嵌入式系统的核心&#xff0c;单片机具…

【java】LinkedList 和 ArrayList的简介与对比

Java LinkedList和 ArrayList 在使用上&#xff0c;几乎是一样的。由于LinkedList是基于双向链表的&#xff0c;会多出list.getFirst();获取头部元素等方法 链表&#xff08;Linked list&#xff09;是一种常见的基础数据结构&#xff0c;是一种线性表&#xff0c;但是并不会按…

24 WEB漏洞-文件上传之WAF绕过及安全修复

目录 WAF绕过上传参数名解析:明确哪些东西能修改?常见绕过方法&#xff1a;符号变异-防匹配( " ;)数据截断-防匹配(%00 ; 换行)重复数据-防匹配(参数多次)搜索引擎搜索fuzz web字典文件上传安全修复方案 WAF绕过 safedog BT(宝塔) XXX云盾 宝塔过滤的比安全狗厉害一些&a…

无涯教程-进程 - 创建终止

到现在为止&#xff0c;我们知道无论何时执行程序&#xff0c;都会创建一个进程&#xff0c;并且该进程将在执行完成后终止&#xff0c;如果我们需要在程序中创建一个进程&#xff0c;并且可能希望为其安排其他任务&#xff0c;该怎么办。能做到吗?是的&#xff0c;显然是通过…

测试神器!RunnerGo让你的测试工作更高效!

引言&#xff1a;在软件开发领域&#xff0c;测试是非常重要的一环。然而&#xff0c;传统的测试工具往往复杂且难以使用&#xff0c;让测试工作变得异常繁琐。为了解决这一问题&#xff0c;我们迎来了RunnerGo——一款轻量级、全栈式的测试平台&#xff0c;让你的测试工作更加…

【TI毫米波雷达笔记】UART串口外设配置及驱动(以IWR6843AOP为例)

【TI毫米波雷达笔记】UART串口外设初始化配置及驱动&#xff08;以IWR6843AOP为例&#xff09; 最基本的工程建立好以后 需要给SOC进行初始化配置 int main (void) {//刷一下内存memset ((void *)L3_RAM_Buf, 0, sizeof(L3_RAM_Buf));int32_t errCode; //存放SOC初…

卷积神经网络——中篇【深度学习】【PyTorch】【d2l】

文章目录 5、卷积神经网络5.5、经典卷积神经网络&#xff08;LeNet&#xff09;5.5.1、理论部分5.5.2、代码实现 5.6、深度卷积神经网络&#xff08;AlexNet&#xff09;5.6.1、理论部分5.6.2、代码实现 5.7、使用块的网络&#xff08;VGG&#xff09;5.7.1、理论部分5.7.2、代…

计网第四章(网络层)(二)

目录 IPV4地址编址 第一历史阶段&#xff08;分类编址&#xff09;&#xff1a; A类地址&#xff1a; B类地址&#xff1a; C类地址&#xff1a; D类地址&#xff08;多播地址&#xff09;&#xff1a; E类地址&#xff08;保留地址&#xff09;&#xff1a; 第二历史阶…

懵了,面试官问我Redis怎么测,我哪知道!

有些测试朋友来问我&#xff0c;redis要怎么测试&#xff1f;首先我们需要知道&#xff0c;redis是什么&#xff1f;它能做什么&#xff1f; redis是一个key-value类型的高速存储数据库。 redis常被用做&#xff1a;缓存、队列、发布订阅等。 所以&#xff0c;“redis要怎么测…

数据通信——OSPF基础

一&#xff0c;实验背景 公司盈利了&#xff0c;老总打算扩展公司规模&#xff0c;也发现了RIP协议的缺点带来的影响。身为工程师&#xff0c;老总让你替换更好的网络&#xff0c;顺带为拓展出的新部门进行新的网络部署&#xff0c;甚至买来很多设备。 此时你要用OSPF协议解决问…

推荐系统在线峰会来了,冷启动、推荐工程、模型训练…你都能找到答案

回顾推荐系统的发展历程&#xff0c;从 30 余年前的协同过滤算法起步&#xff0c;经历了深度学习的浪潮&#xff0c;到如今热火朝天的大模型&#xff0c;推荐系统一次又一次地焕发出新的活力。随着大模型的到来&#xff0c;推荐系统正处于变革的前夜&#xff0c;原有的系统模块…

计算机竞赛 基于大数据的时间序列股价预测分析与可视化 - lstm

文章目录 1 前言2 时间序列的由来2.1 四种模型的名称&#xff1a; 3 数据预览4 理论公式4.1 协方差4.2 相关系数4.3 scikit-learn计算相关性 5 金融数据的时序分析5.1 数据概况5.2 序列变化情况计算 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &…