通过 NIO + 多线程 提升硬件设备与系统的数据传输性能

news2024/9/19 22:15:59

一、项目展示

下图(模拟的数据可视化大屏)中数据是动态显示的

在这里插入图片描述

二、项目简介

描述:使用Client模拟了硬件设备,比如可燃气体浓度检测器。Client通过Socket与Server建立连接,Server保存数据到txt文件,并使用WebSocket将数据推送到数据可视化大屏

工作:通过多线程+NIO优化了Server性能

原理图:

在这里插入图片描述

三、代码实现

Server

NioSocketServerService.java

package com.example.server;

import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Service
public class NioSocketServerService {

    private static final int PORT = 8081;
    private static final int TIMEOUT = 5000;
    private static final BlockingQueue<String> writeQueue = new LinkedBlockingQueue<>();

    @PostConstruct
    public void startServer() {
        for (int i = 0; i < 4; i++) {
            new Thread(new FileWriterTask(writeQueue)).start();
        }

        new Thread(() -> {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.bind(new InetSocketAddress(PORT));
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

                System.out.println("Server is listening on port " + PORT);

                while (true) {
                    if (selector.select(TIMEOUT) == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();

                        try {
                            if (key.isAcceptable()) {
                                handleAccept(key, selector);
                            } else if (key.isReadable()) {
                                handleRead(key);
                            }
                        } catch (IOException e) {
                            key.cancel();               // 取消键的注册,这意味着该通道不再被选择器监视
                            key.channel().close();      // 关闭通道,释放资源
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        SocketChannelHandler.addBuffer(socketChannel);
        System.out.println("New client connected: " + socketChannel.getRemoteAddress());
    }

    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        SocketChannelHandler.readFromChannel(socketChannel, writeQueue);
    }
}

SocketChannelHandler.java

package com.example.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class SocketChannelHandler {

    private static final String DIRECTORY = "data/";
    private static final int BUFFER_SIZE = 2048;
    private static final Map<SocketChannel, ByteBuffer> bufferMap = new ConcurrentHashMap<>();
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void addBuffer(SocketChannel socketChannel) {
        bufferMap.put(socketChannel, ByteBuffer.allocateDirect(BUFFER_SIZE));
    }

    public static void readFromChannel(SocketChannel socketChannel, BlockingQueue<String> writeQueue) throws IOException {
        ByteBuffer buffer = bufferMap.get(socketChannel);
        buffer.clear();
        int bytesRead;

        try {
            bytesRead = socketChannel.read(buffer);
        } catch (IOException e) {
            System.err.println("Error reading from socket: " + e.getMessage());
            socketChannel.close();
            bufferMap.remove(socketChannel);
            return;
        }

        if (bytesRead == -1) {          // 读取到-1表示客户端已关闭连接,移除缓冲区
            socketChannel.close();
            bufferMap.remove(socketChannel);
        } else if (bytesRead > 0) {
            buffer.flip();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            String message = new String(data);

            String[] dataParts = message.split(" : ", 2);
            if (dataParts.length == 2) {
                String deviceId = dataParts[0].trim();
                String deviceData = dataParts[1].trim();
                String currentTime = LocalDateTime.now().format(dateTimeFormatter);
                String dataToWrite = DIRECTORY + deviceId + ".txt : " + currentTime + " : " + deviceData;

                writeQueue.add(dataToWrite);

                WebSocketServer.sendMessage(deviceId + " : " + currentTime + " : " + deviceData);
            }
        }
    }
}

FileWriterTask.java

package com.example.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FileWriterTask implements Runnable {

    private static final int BATCH_SIZE = 10;

    /**
     * BlockingQueue是JUC包中的一个接口,提供了线程安全的队列操作
     * 支持阻塞的put和take操作,当队列满时put会阻塞,直到队列有空位;当队列空时take会阻塞,直到队列有元素
     * 其主要实现包括:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue
     */
    private final BlockingQueue<String> writeQueue;

    public FileWriterTask(BlockingQueue<String> writeQueue) {
        this.writeQueue = writeQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                List<String> dataList = new ArrayList<>();

                // 读取BATCH_SIZE条数据,或等待超时后退出循环
                while (dataList.size() < BATCH_SIZE) {
                    String data = writeQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (data != null) {
                        dataList.add(data);
                    } else {
                        break;
                    }
                }

                // 如果读取到数据,则将其写入文件
                if (!dataList.isEmpty()) {
                    for (String data : dataList) {
                        String[] dataParts = data.split(" : ");
                        if (dataParts.length == 3) {
                            String fileName = dataParts[0].trim();
                            try (FileChannel fileChannel = FileChannel.open(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
                                ByteBuffer buffer = ByteBuffer.wrap((data + System.lineSeparator()).getBytes());
                                fileChannel.write(buffer);
                            }
                        }
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Client

MultiThreadedSocketClient.java

package com.example.client;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreadedSocketClient {
    public static void main(String[] args) {
        String hostname = "localhost";
        int port = 8081;
        int numberOfDevices = 1000;
        ExecutorService executor = Executors.newFixedThreadPool(numberOfDevices);

        for (int i = 1; i <= numberOfDevices; i++) {
            String deviceId = "Device" + i;
            executor.submit(new DeviceClient(hostname, port, deviceId));
        }

        executor.shutdown();
    }
}

DeviceClient.java

package com.example.client;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.TimeUnit;

class DeviceClient implements Runnable {
    private String hostname;
    private int port;
    private String deviceId;
    private Random random = new Random();
    private static final int MAX_RETRIES = 15;
    private static final int RETRY_DELAY_MS = 1000;

    public DeviceClient(String hostname, int port, String deviceId) {
        this.hostname = hostname;
        this.port = port;
        this.deviceId = deviceId;
    }

    @Override
    public void run() {
        int attempt = 0;
        boolean connected = false;

        while (attempt < MAX_RETRIES && !connected) {
            try {
                Thread.sleep(random.nextInt(15000));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            try (Socket socket = new Socket(hostname, port);
                 PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {

                connected = true;

                while (true) {
                    try {
                        String data = deviceId + " : " + random.nextInt(50000);
                        out.println(data);
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } catch (UnknownHostException e) {
                System.err.println("Unknown host: " + hostname);
                break;
            } catch (IOException e) {
                attempt++;
                int randomDelay = random.nextInt(10000);
                System.err.println(deviceId + "\tAttempt " + attempt + " - Connection refused. Retrying in " + (RETRY_DELAY_MS + randomDelay) + "ms...");
                try {
                    Thread.sleep(RETRY_DELAY_MS + randomDelay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        if (!connected) {
            System.err.println("Failed to connect after " + MAX_RETRIES + " attempts.");
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1693977.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【CAN】STM32新能源汽车CAN通信实现过程

【CAN】STM32新能源汽车CAN通信实现过程 文章目录 前言一、软件1.PA11、PA12口配置2.PB8、PB9口配置 二、接线图三、硬件原理图四、上位机总结 前言 【电机控制】直流有刷电机、无刷电机汇总——持续更新 使用工具&#xff1a; 1.控制器——STM32F103C8T6 2.仿真器——STLINK …

SQL面试题练习 —— 计算次日留存率

题目 现有用户登录记录表&#xff0c;已经按照用户日期进行去重处理。以用户登录的最早日期作为新增日期&#xff0c;请计算次日留存率是多少。 样例数据 ----------------------- | user_id | login_date | ----------------------- | aaa | 2023-12-01 | | bbb …

Python 拼图游戏

拼图游戏(puzzle)是一种常见的益智游戏&#xff0c;玩家通过拖动图块来正确拼接成完整的图片。 由一张原图&#xff0c;分割成图块&#xff0c;拼图块的大小将会根据行列数自动调整&#xff0c;然后随机打乱&#xff0c;玩家通过拖拽图块&#xff0c;最后复原原图。 &#x1f…

在linux下的ROS中下载超级终端Terminator ROS开发得力助手

在一般我们运行机器人包时要打开三个终端来运行&#xff0c;关闭时还要一个一个关闭&#xff08;ctrlc&#xff09;过于麻烦 现在下载用了terminator后&#xff0c;就支持一键关闭多个终端了&#xff0c;很方便&#xff0c;具体操作如下&#xff1a; sudo apt install termin…

推荐个免费天气接口

http://www.tianqiapi.com/index/doc?versionmonthhttp://www.tianqiapi.com/index/doc?versionmonth 个人博客使用足够了&#xff01;

实验一:通过路由器实现内外网互联

通过路由器实现内外网互联 一、实验拓扑 相关配置详见下图&#xff0c;内网区域为AR2以内设备&#xff0c;外网区域以AR1和PC1代替进行实验测试。 二、实验要求 通过路由器实现内外网互联&#xff1a; 1.各内网PC可自动获取ip地址&#xff1b; 2.各内网PC可ping通外网PC&…

Mysql插入中文内容报错解决及其Mysql常用的存储引擎说明

一、问题描述 我们在Mysql数据库的表中插入带有中文内容时报错,提示【1366 - Incorrect string value: \xE5\x8C\x97\xE4\xBA\xAC... for column UserDealer at row 1】,如下图所示: 二、问题分析 一般来说插入中文内容有问题我们首先想到的就是编码问题;我们可以查看该表使…

01_尚硅谷JavaWeb最新版笔记

尚硅谷JAVAWEB概述 课程概述 计划学习时间&#xff1a;1周以内

负反馈系统中运放的相位裕度仿真、环路增益的stb仿真

这里没目录标题 一、引言二、巴克豪森判据、最坏情况下的相位裕度、相位裕度三、相位裕度与开环&#xff0c;环路&#xff0c;闭环增益的关系四、环路增益、闭环增益和相位的仿真4.1 运放为双入单出时4.1.1 系统的闭环增益4.1.2 stb仿真系统的环路增益和相位裕度&#xff08;环…

talib 安装

这里写自定义目录标题 talib 安装出错 talib 安装出错 https://github.com/cgohlke/talib-build/releases 这里找到轮子 直接装。

最新文章合集

GitHub宝藏项目&#xff1a;每天一个&#xff0c;让你的技术库增值不停&#xff01; STORM、SuperMemory、Awesome Chinese LLM、AI写作助手、资料搜集、文章生成、视角问题引导、模拟对话策略、内容导入、浏览器插件、资源库、开源微调模型 开发者必看&#xff1a;Linux终端…

world machine学习笔记(3)

打开 可以打开场景设置&#xff0c;项目设置平铺构建设置 场景设置&#xff1a; 输出范围 设置中心点和范围 设置分辨率 项目设置&#xff1a; 设置地图颜色&#xff0c;单位&#xff0c;最高地形高度 点击这个图形进行预览设置 该按钮还有其他的功能 world machine基础流程…

基于51单片机的数字频率计(电路图+pcb+论文+仿真+源码)

于51单片机的数字频率计 设计的频率计范围能够达到1HZ-1MHZ(实际上51单片机达不到这个范围&#xff0c;不要在实验环境下进行)&#xff0c;这个是课设来着&#xff0c;用Proteus仿真实现的&#xff0c;给有需要的同学参考一下 仿真原理图如下&#xff08;proteus仿真工程文件可…

【算法设计与分析】基于Go语言实现动态规划法解决TSP问题

本文针对于最近正在学习的Go语言&#xff0c;以及算法课实验所需内容进行Coding&#xff0c;一举两得&#xff01; 一、前言 由于这个实验不要求向之前的实验一样做到那种连线的可视化&#xff0c;故可以用图形界面不那么好实现的语言进行编写&#xff0c;考虑到Go语言的…

基于.net开发的博客系统

基于.net开发可以批量上传md文件生成文章的博客系统 .NET 个人博客 基于.net开发的博客系统 个人博客系统&#xff0c;采用.net core微服务技术搭建&#xff0c;采用传统的MVC模式&#xff0c;使用EF core来对mysql数据库(sqlite数据库)进行CRUD操作项目 为什么要自己开发博客…

uniapp微信小程序解决type=“nickname“获取昵称,v-model绑定值为空问题!

解决获取 type"nickname"值为空问题 文章目录 解决获取 type"nickname"值为空问题效果图Demo解决方式通过表单收集内容通过 uni.createSelectorQuery 效果图 开发工具效果图&#xff0c;真机上还会显示键盘输入框 Demo 如果通过 v-model 结合 blur 获取不…

使用梦畅闹钟,结合自定义bat、vbs脚本等实现定时功能

梦畅闹钟-每隔一段时间运行一次程序 休息五分钟bat脚本&#xff08;播放音乐视频&#xff0c;并锁屏&#xff09; chcp 65001 echo 回车开始休息5分钟 pause explorer "https://www.bilibili.com/video/BV1RT411S7Tk/?p47" timeout /t 3 /nobreak rundll32.exe use…

Font shape `U/rsfs/m/n‘ in size <29.86> not available size <24.88>

解决方法&#xff1a;mathrsfs 删除这个包 其他可以参考&#xff1a;koma script - Size substitution with fontsize14 - TeX - LaTeX Stack Exchange

若依框架对于后端返回异常后怎么处理?

1、后端返回自定义异常serviceException 2、触发该异常后返回json数据 因为若依对请求和响应都封装了&#xff0c;所以根据返回值response获取不到Code值但若依提供了一个catch方法用来捕获返回异常的数据 3、处理的方法

python给图片加上图片水印

python给图片加上图片水印 作用效果代码 作用 给图片加上图片水印图片水印的透明度&#xff0c;位置可自定义 效果 原始图片&#xff1a; 水印图片&#xff1a; 添加水印后的图片&#xff1a; 代码 from PIL import Image, ImageDraw, ImageFontdef add_watermark(in…