netty之SpringBoot+Netty+Elasticsearch收集日志信息数据存储

news2025/1/24 17:34:50

前言


将大量的业务以及用户行为数据存储起来用于分析处理,但是由于数据量较大且需要具备可分析功能所以将数据存储到文件系统更为合理。尤其是一些互联网高并发级应用,往往数据库都采用分库分表设计,那么将这些分散的数据通过binlog汇总到一个统一的文件系统就显得非常有必要。

#开发环境
在这里插入图片描述
环境准备
windows安装包 下载
注意 es是以来java环境 所以需要安装jdk 支持1.7以上
es-hander下载可视化操作插件

@Document(indexName = "stack", type = "group_user")
public class User {

    @Id
    private String id;
    private String name;   //姓名
    private Integer age;   //年龄
    private String level;  //级别
    private Date entryDate;//时间
    private String mobile; //电话
    private String email;  //邮箱
    private String address;//地址


    public User(String id, String name, Integer age, String level, Date entryDate, String mobile, String email, String address) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.level = level;
        this.entryDate = entryDate;
        this.mobile = mobile;
        this.email = email;
        this.address = address;

    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getLevel() {
        return level;
    }

    public void setLevel(String level) {
        this.level = level;
    }

    public Date getEntryDate() {
        return entryDate;
    }

    public void setEntryDate(Date entryDate) {
        this.entryDate = entryDate;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}
@Service("myServerHandler")
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);

    @Autowired
    private UserService userService;

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        logger.info("链接报告开始");
        logger.info("链接报告信息:有一客户端链接到本服务端");
        logger.info("链接报告IP:{}", channel.localAddress().getHostString());
        logger.info("链接报告Port:{}", channel.localAddress().getPort());
        logger.info("链接报告完毕");
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("客户端断开链接{}", ctx.channel().localAddress().toString());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
        logger.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端接收到消息:" + JSON.toJSONString(msg));
        //接收数据写入到Elasticsearch
        TransportProtocol transportProtocol = (TransportProtocol) msg;
        userService.save((User) transportProtocol.getObj());
    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        logger.info("异常信息:\r\n" + cause.getMessage());
    }

}

@Component("nettyServer")
public class NettyServer {

    private Logger logger = LoggerFactory.getLogger(NettyServer.class);

    @Resource
    private MyChannelInitializer myChannelInitializer;

    //配置服务端NIO线程组
    private final EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    private final EventLoopGroup childGroup = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture bing(InetSocketAddress address) {
        ChannelFuture channelFuture = null;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)    //非阻塞模式
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(myChannelInitializer);

            channelFuture = b.bind(address).syncUninterruptibly();
            channel = channelFuture.channel();
        } catch (Exception e) {
            logger.error(e.getMessage());
        } finally {
            if (null != channelFuture && channelFuture.isSuccess()) {
                logger.info("itstack-demo-netty server start done. {关注明哥,获取源码}");
            } else {
                logger.error("itstack-demo-netty server start error. {关注明哥,获取源码}");
            }
        }
        return channelFuture;
    }

    public void destroy() {
        if (null == channel) return;
        channel.close();
        parentGroup.shutdownGracefully();
        childGroup.shutdownGracefully();
    }

    public Channel getChannel() {
        return channel;
    }

}


public interface UserService {

    void save(User user);

    void deleteById(String id);

    User queryUserById(String id);

    Iterable<User> queryAll();

    Page<User> findByName(String name, PageRequest request);

}

提供一个可拓展的操作实体表的接口


public interface UserRepository extends ElasticsearchRepository<User, String> {

    Page<User> findByName(String name, Pageable pageable);

}

@Service("userService")
public class UserServiceImpl implements UserService {

    private UserRepository dataRepository;

    @Autowired
    public void setDataRepository(UserRepository dataRepository) {
        this.dataRepository = dataRepository;
    }

    @Override
    public void save(User user) {
        dataRepository.save(user);
    }

    @Override
    public void deleteById(String id) {
        dataRepository.deleteById(id);
    }

    @Override
    public User queryUserById(String id) {
        Optional<User> optionalUser = dataRepository.findById(id);
        return optionalUser.get();
    }

    @Override
    public Iterable<User> queryAll() {
        return dataRepository.findAll();
    }

    @Override
    public Page<User> findByName(String name, PageRequest request) {
        return dataRepository.findByName(name, request);
    }

}

@RestController
public class NettyController {

    @Resource
    private NettyServer nettyServer;

    @RequestMapping("/localAddress")
    public String localAddress() {
        return "nettyServer localAddress " + nettyServer.getChannel().localAddress();
    }

}

@SpringBootApplication
public class Application implements CommandLineRunner {

    private Logger logger = LoggerFactory.getLogger(Application.class);

    @Value("${netty.host}")
    private String host;
    @Value("${netty.port}")
    private int port;
    @Resource
    private NettyServer nettyServer;

    public static void main(String[] args) {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        InetSocketAddress address = new InetSocketAddress(host, port);
        ChannelFuture channelFuture = nettyServer.bing(address);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
        channelFuture.channel().closeFuture().syncUninterruptibly();
    }

}

## 服务端口
server.port = 8080

## Netty服务端配置
netty.host = 127.0.0.1
netty.port = 7397

## Elasticsearch配置{更换为自己的cluster-name、cluster-nodes}
spring.data.elasticsearch.cluster-name=es-itstack
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
spring.data.elasticsearch.repositories.enabled=true

ApiTest.java *Netty客户端,用于向服务端发送数据

public class ApiTest {

    public static void main(String[] args) {
        System.out.println("hi 微信公众号:关注明哥");
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    //对象传输处理
                    channel.pipeline().addLast(new ObjDecoder(TransportProtocol.class));
                    channel.pipeline().addLast(new ObjEncoder(TransportProtocol.class));
                    // 在管道中添加我们自己的接收数据实现方法
                    channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

                        }
                    });
                }
            });
            ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
            System.out.println("itstack-demo-netty client start done. {关注明哥,获取源码}");

            TransportProtocol tp1 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李小明", 1, "T0-1", new Date(), "13566668888", "184172133@qq.com", "北京"));
            TransportProtocol tp2 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "张大明", 2, "T0-2", new Date(), "13566660001", "huahua@qq.com", "南京"));
            TransportProtocol tp3 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李书鹏", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));
            TransportProtocol tp4 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "韩小雪", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));
            TransportProtocol tp5 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "董叔飞", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "河北"));
            TransportProtocol tp6 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "候明相", 2, "T5-1", new Date(), "13566660002", "xiaobai@qq.com", "下花园"));
            TransportProtocol tp7 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "田明明", 2, "T3-1", new Date(), "13566660002", "xiaobai@qq.com", "东平"));
            TransportProtocol tp8 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "王大伟", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "西湖"));
            TransportProtocol tp9 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李雪明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "南昌"));
            TransportProtocol tp10 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "朱小飞", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "吉林"));
            TransportProtocol tp11 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "牛大明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "长春"));
            TransportProtocol tp12 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "关雪儿", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "深圳"));

            //向服务端发送信息
            f.channel().writeAndFlush(tp1);
            f.channel().writeAndFlush(tp2);
            f.channel().writeAndFlush(tp3);
            f.channel().writeAndFlush(tp4);
            f.channel().writeAndFlush(tp5);
            f.channel().writeAndFlush(tp6);
            f.channel().writeAndFlush(tp7);
            f.channel().writeAndFlush(tp8);
            f.channel().writeAndFlush(tp9);
            f.channel().writeAndFlush(tp10);
            f.channel().writeAndFlush(tp11);
            f.channel().writeAndFlush(tp12);

            f.channel().closeFuture().syncUninterruptibly();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

好了到这里就结束了netty之SpringBoot+Netty+Elasticsearch收集日志信息数据存储的学习,大家一定要跟着动手操作起来。需要的源码的 可si我获取;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2188141.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第L9周:无监督学习|K-means聚类算法

本文为365天深度学习训练营 中的学习记录博客原作者&#xff1a;K同学啊 任务描述&#xff1a; ●学会调用sklearn实现KMeans算法。 ●了解误差平方和与轮廓系数。 1.聚类算法是什么&#xff1f; 聚类就是将一个庞杂数据集中具有相似特征的数据自动归类到一起&#xff0c;称为…

Leetcode 1498. 满足条件的子序列数目

1.题目基本信息 1.1.题目描述 给你一个整数数组 nums 和一个整数 target 。 请你统计并返回 nums 中能满足其最小元素与最大元素的 和 小于或等于 target 的 非空 子序列的数目。 由于答案可能很大&#xff0c;请将结果对 109 7 取余后返回。 1.2.题目地址 https://leet…

【优选算法之队列+宽搜/优先级队列】No.14--- 经典队列+宽搜/优先级队列算法

文章目录 前言一、队列宽搜示例&#xff1a;1.1 N 叉树的层序遍历1.2 ⼆叉树的锯⻮形层序遍历1.3 ⼆叉树最⼤宽度1.4 在每个树⾏中找最⼤值 二、优先级队列&#xff08;堆&#xff09;示例&#xff1a;2.1 最后⼀块⽯头的重量2.2 数据流中的第 K ⼤元素2.3 前 K 个⾼频单词2.4 …

气象网格数据与卫星轨道数据如何匹配??

&#x1f3c6;本文收录于《全栈Bug调优(实战版)》专栏&#xff0c;主要记录项目实战过程中所遇到的Bug或因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&am…

IDEA里面的长截图插件

1.我的悲惨经历 兄弟们啊&#xff0c;我太惨了&#xff0c;我刚刚在准备这个继承和多态的学习&#xff0c;写博客的时候想要截图代码&#xff0c;因为这个代码比较大&#xff0c;一张图截取不下来&#xff0c;所以需要长截图&#xff0c;之前使用的qq截图突然间拉胯&#xff0…

栈和队列相互实现(Java)

本篇任务 前篇我们分别介绍了栈和队列&#xff0c;并对其进行了简单的自我实现&#xff0c;本篇我们将通过栈和队列的相互实现来进一步熟悉和运用栈和队列&#xff0c;如下是我们将要完成的题目&#xff1a; 用队列实现栈https://leetcode-cn.com/problems/implement-stack-u…

【2022工业图像异常检测文献】CFLOW-AD: 通过条件归一化流实现实时无监督定位异常检测

CFLOW-AD: Real-Time Unsupervised Anomaly Detection with Localization via Conditional Normalizing Flows 1、Background 虽然最近提出针对此类数据设置的模型在准确性指标上取得了很高的成绩&#xff0c;但它们的复杂性限制了实时处理的能力。 CFLOW-AD由一个经过判别式…

区块链+Web3学习笔记

学习资料来源于B站&#xff1a; 17小时最全Web3教程&#xff1a;ERC20&#xff0c;NFT&#xff0c;Hardhat&#xff0c;CCIP跨链_哔哩哔哩_bilibili 该课程提供的Github代码地址&#xff0c;相关资料详见README.md&#xff1a; Web3_tutorial_Chinese/README.md at main sm…

Netty系列-8 Netty处理粘包和半包问题

1.半包和粘包问题 TCP协议是基于字节流的数据通讯协议&#xff0c;数据被看做是一连串的字节流&#xff1b;不具备边界信息&#xff0c;给接收方带来半包和粘包问题。 半包&#xff1a;TCP传输时&#xff0c;将数据切割成一个个数据包进行传输。接收方一次读取操作&#xff0c…

吉他弹唱打谱软件哪个好用 吉他弹唱制谱教程

吉他这门乐器一直受到大众的欢迎&#xff0c;究其原因&#xff0c;还是因为其成本低廉、易上手的特性。但是吉他是一个入门容易精通难的乐器&#xff0c;想要成为一个资深的吉他玩家&#xff0c;那么就少不了用到一些吉他弹唱打谱软件。今天我们就来说一说吉他弹唱打谱软件哪个…

学习 CSS 新的属性 conic-gradient 实现环形进度条

我们在工作中用到环形进度条的时候&#xff0c;一般都是使用组件库提供的&#xff0c;那么你有没有想过这是怎么实现的呢&#xff1f; <divclass"progress"style"--progress: 80%; --last: 20%"data-progress"80%"></div><style …

【宽搜】2. leetcode 102 二叉树的层序遍历

题目描述 题目链接&#xff1a;二叉树的层序遍历 根据上一篇文章的模板可以直接写代码&#xff0c;需要改变的就是将N叉树的child改为二叉树的left和right。 代码 class Solution { public:vector<vector<int>> levelOrder(TreeNode* root) {vector<vector&…

k8s的学习和使用

为什么用k8s&#xff0c;不用docker&#xff1f; k8s更适合复杂的微服务架构和大规模的容器应用。 Pods(Pod) Pod是k8s最小可部署单元&#xff0c;他包含一个或多个相关容器。这些容器共享网络命名空间和存储卷&#xff0c;他们通常协同工作来构成一个应用程序。 Serv…

开启AI新篇章:探索GPT-4与大模型!订阅方案!简单支付!

开启AI新篇章&#xff1a;探索GPT-4的无限可能 随着人工智能技术的飞速发展&#xff0c;我们正处于一个前所未有的变革时代。作为人工智能领域的领导者&#xff0c;OpenAI 推出的GPT-4&#xff0c;以其卓越的自然语言处理能力和强大的计算潜力&#xff0c;引发了行业内外的广泛…

深入浅出MySQL

深入浅出MySQL 以下内容参考自 《MySQL是怎样运行的&#xff1a;从根儿上理解MySQL》一书&#xff0c;强烈推荐 存储引擎 对于不同的表可以设置不同的存储引擎 CREATE TABLE tableName (xxxx ) ENGINE 引擎名称; # 修改 ALTER TABLE tableName ENGINE xxx; 编码格式 my…

(C语言贪吃蛇)10.贪吃蛇向右自行行走

目录 前言 本节内容 实现效果 修改后的代码 其他封装函数&#xff1a; 运行效果 总结 前言 我们上节讲解了关于贪吃蛇撞墙然后死翘翘重新初始化蛇身的操作&#xff0c;主要是关于程序初始化释放内存的操作&#xff0c;不理解的再去看看&#x1f618;(贪吃蛇撞墙找死详解)。…

SpringBoot技术栈:构建高效古典舞交流平台

第二章 相关技术介绍 2.1Java技术 Java是一种非常常用的编程语言&#xff0c;在全球编程语言排行版上总是前三。在方兴未艾的计算机技术发展历程中&#xff0c;Java的身影无处不在&#xff0c;并且拥有旺盛的生命力。Java的跨平台能力十分强大&#xff0c;只需一次编译&#xf…

openpnp - 吸嘴校正失败的opencv参数分析

文章目录 openpnp - 吸嘴校正失败的opencv参数分析概述笔记阶段验证 - N2吸嘴校验完NT1NT2 阶段验证 - 底部相机高级校验完NT1NT2 参数比对保存 “阶段验证 - N2吸嘴校验完” 的NT1/NT2图像重建参数检测环境NT1ok的3个参数值NT1err的3个参数值NT2ok的3个参数值NT2err的3个参数值…

如何入门运动规划算法? 50篇教程教你手把手推导公式! 实现代码!

经常听到有想入门规划算法的同学说: 各路教程不成体系, 不知从何学起? 网上的规划算法教程资料确实很多. 但是东一篇frenet, 西一篇QP优化, 大部分都是各路大佬写给自己看的学习笔记, 杂乱无章不成体系. 有没有给小白看的, 完整成体系的运动规划算法教程呢? 穷学生囊中羞…

Redis入门第四步:Redis发布与订阅

欢迎继续跟随《Redis新手指南&#xff1a;从入门到精通》专栏的步伐&#xff01;在本文中&#xff0c;我们将深入探讨Redis的发布与订阅&#xff08;Pub/Sub&#xff09;模式。这是一种强大的消息传递机制&#xff0c;适用于各种实时通信场景&#xff0c;如聊天应用、实时通知和…