监控kafka topic,钉钉报警

news2024/10/6 22:24:18

1、需求的诞生

前几天公司我们部门需要演示一个应用,应用依赖kafka的数据,但是kafka的数据来自其他部门的投递。

一些原因导致数据无法给到,导致我们部门的演示也很有问题,所以想做一个简单的kafka topic的监控,在没有数据的时候及时发现并找兄弟部门沟通

这里记录下原因,因为机房的带宽只有500M,其他部门在做一些视频录制的工作,导致带宽满了,往kafka生产数据的producer无法发送到。

2、kafka监测

kafka的检测有很多方案,但是因为我们在测试环境使用,讲究一个轻量级,所以直接写一个小程序监控就得了。

kafka的监控没搞过,但是用过Offset Explorer,Offset中可以看到总的消息数量,延续这个思路,只要这个数值有变化,证明又在投递。这是思路

2.1 加入依赖

创建一个Springboot的项目,这个没难度,也不说了

这里把所有的依赖都放这了,主要是kafka和等会要用的钉钉的sdk

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>alibaba-dingtalk-service-sdk</artifactId>
    <version>2.0.0</version>
</dependency>

2.2 看下配置文件

application.yml

spring:
  kafka:
    bootstrap-servers: kafka地址
    consumer:
      auto-offset-reset: earliest
      group-id: test-xiangcai
dingding:
  webHook: https://oapi.dingtalk.com/robot/send?access_token=xxx
  topics: topic1,topic2
  sign: sign

2.3 写一个类获取topic的数据

@Component
public class KafkaTools {
    private static KafkaTools _this;
    @Autowired
    private ConsumerFactory<Long, String> consumerFactory;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    Consumer<Long, String> consumer;
    @PostConstruct
    public void init() {
        _this = this;
        _this.consumer = consumerFactory.createConsumer();
    }
    public static Long getTopicCount(String topic) {
        List<PartitionInfo> partitionInfos = _this.kafkaTemplate.partitionsFor(topic);
        long count = 0L;
        for (PartitionInfo str : partitionInfos) {
            TopicPartition topicPartition = new TopicPartition(topic, str.partition());
            long logEndOffset = _this.consumer.endOffsets(List.of(topicPartition)).get(topicPartition);
            _this.consumer.assign(List.of(topicPartition));
            long currentOffset = _this.consumer.position(topicPartition);
            count += (logEndOffset - currentOffset);
            System.err.println( "  topic  " + topic + "  最后的offset is  " + logEndOffset + "    currentOffset  " + currentOffset + "  totalCount  " + (logEndOffset - currentOffset));
        }
        return count;
    }
}

解释下重点代码

LOG-END-OFFSET :每个分区当前最新生产的消息的位移值

CURRENT-OFFSET :该消费者组当前最新消费消息的位移值

注:因为内部是使用的单机的kafka,并且也只有一个partition

3、发送到钉钉

想做一个提醒,最初的方案是发送的邮箱,没有养成看邮件的习惯,所以想要发送到微信或者钉钉,看了下方案,钉钉是最容易实现的,所以这里选择使用钉钉

3.1 钉钉webhook

创建一个钉钉群,点击右上角群设置,机器人,进入到机器人的设置界面,然后点击添加机器人,选择自定义机器人,进入界面就好

 

这里可以设置机器人的头像,机器人的名字,同时也展示webhook的地址,可以通过这个url 发送消息到群里。

这里选择的安全设置加签的方式,可以参考说明文档,等会代码展示

3.2 钉钉发送消息到群里

看下代码,这里设置的消息为文本,并且使用了at全体人

/**
 * 发送消息到钉钉
 *
 * @param topic
 */
private void sendMsgToDingDing(String topic) {

    try {
        String url = makeUrl(dingDingConfig.getWebHook(), dingDingConfig.getSign());
        DingTalkClient client = new DefaultDingTalkClient(url);
        OapiRobotSendRequest request = new OapiRobotSendRequest();
        request.setMsgtype("text");
        OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
        text.setContent("192.168.2.8 kafka  " + topic + "超过2分钟没有数据,检查下");

        OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
        // isAtAll类型如果不为Boolean,请升级至最新SDK
        at.setIsAtAll(true);
        request.setAt(at);
        request.setText(text);

        client.execute(request);
    } catch (Exception e) {
    }
}

/**
 * 组装url
 *
 * @param url
 * @param secret
 * @return
 * @throws Exception
 */
public static String makeUrl(String url, String secret) throws Exception {
    Long timestamp = System.currentTimeMillis();

    String stringToSign = timestamp + "\n" + secret;
    Mac mac = Mac.getInstance("HmacSHA256");
    mac.init(new SecretKeySpec(secret.getBytes("UTF-8"), "HmacSHA256"));
    byte[] signData = mac.doFinal(stringToSign.getBytes("UTF-8"));
    String sign = URLEncoder.encode(new String(Base64.encodeBase64(signData)), "UTF-8");

    return url + String.format("&timestamp=%d&sign=%s", timestamp, sign);
}

看下效果

 

4、定时器功能

定时器直接使用springboot 的 schedule,在application上添加 直接 @EnableScheduling

创建定时job,并且设置2分钟执行一次,两分钟发现没有数据更新就会发送消息到钉钉提醒

public static Map<String, Long> countMap = new HashMap<>();

//    @Scheduled(cron = "0/5 * *  * * ? ")   //每5秒执行一次
@Scheduled(cron = "0 0/2 * * * ?") //  2分钟执行一次
public void execute() {
    for (String topic : dingDingConfig.getTopics().split(",")) {
        Long topicCount = KafkaTools.getTopicCount(topic);
        Long orDefault = countMap.getOrDefault(topic, 0L);
        if (orDefault.equals(topicCount)) {
            if (System.currentTimeMillis() - preDingTime > 60 * 60 * 1000L) {
                sendMsgToDingDing(topic);
                preDingTime = System.currentTimeMillis();
            }
        } else {
            countMap.put(topic, topicCount);
        }
    }
    System.err.println("job 执行时间  " + new Date());
}

这里做了一些处理

第一个点就是 在内存中保存数量,没有使用数据库,主要考虑到轻量级,在测试环境

第二个就是钉钉消息的发送,防止出现消息喷涌,这里限制了1小时才可以发送一次

5、打包部署

看下dockerfile

#项目所依赖的jdk镜像
FROM openjdk:11-jdk
RUN mkdir -p /app
WORKDIR /app
#设置时区
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
#将maven构建好的jar添加到镜像中,第二个为别名
COPY start.sh start.sh
ADD target/kafkaMonitor.jar kafkaMonitor.jar
#镜像所执行的命令
ENTRYPOINT ["sh","start.sh"]

看下启动脚本

echo "192.168.2.8 kafka.tyjt.com" >> /etc/hosts
java -jar kafkaMonitor.jar
echo 'finish start!!'

很简单直接启动,这个程序主打就是一个轻便

5、总结

这只是一个小的需求,没有使用一些重量级的组件,尽可能的快速的解决问题。

赠人玫瑰,手留余香,感谢点赞,关注,评论。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/539604.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

内网渗透之权限维持-黄金白银票据隐藏账户远控-RustDeskGotoHTTP

0x01权限维持-隐藏用户 CreateHiddenAccount工具 CreateHiddenAccount -u test -p Psswrd用户管理能查看到&#xff0c;命令查看看不到&#xff0c;单机版无法删除(不在任何组)&#xff0c;域环境(在administrator组中)可以删除 0x02权限维持-黄金白银票据 ⻩⾦票据⽣成攻…

捷威信keithley吉时利2410数字源表 销售回收KEITHLEY2470新款源表

吉时利Keithley 2410 /2470高压源表/数字源表 产品概览 Keithley 2410 高压源表专为需要紧密耦合源和测量的测试应用而设计。Keithly 2410 提供精密电压和电流源以及测量功能。它既是高度稳定的直流电源&#xff0c;又是真正的仪器级 5-1/2 数字万用表。电源特性包括低噪声、…

three.js实现拖拽生成模型场景

THREE.js 可以实现很多web 3D的效果&#xff0c;最近想实现一个拖拽生成场景的功能&#xff0c;主要用到拖拽API,draggable 的几个事件来实现&#xff0c;拖拽模型时记录模型属性&#xff0c;释放到画布时生成当前屏幕坐标对应的焦点上&#xff0c;最后生成模型。 <!DOCTYP…

增量更新和原始快照

三色标记法 2.1 基本算法 要找出存活对象&#xff0c;根据可达性分析&#xff0c;从GC Roots开始进行遍历访问&#xff0c;可达的则为存活对象&#xff1a; 最终结果&#xff1a;A/D/E/F/G 可达 我们把遍历对象图过程中遇到的对象&#xff0c;按“是否访问过”这个条件标记成…

【LeeCode】每日一题:复制带随机指针的链表

​&#x1f47b;内容专栏&#xff1a;《LeetCode刷题专栏》 &#x1f428;本文概括&#xff1a; 138.复制带随机指针的链表 &#x1f43c;本文作者&#xff1a;花 碟 &#x1f438;发布时间&#xff1a;2023.5.18 ​复制带随机指针的链表 力扣链接-> 138.复制带随机指针的链…

VS2019新建WebService/Web服务/asmx并通过IIS实现发布和调用

场景 对接第三方系统提供接口文档中显示为asmx接口 访问接口返回数据格式为 xml中的数据格式为json数据。 需要在本地新建并模拟调试环境。 注&#xff1a; 博客&#xff1a;霸道流氓气质的博客_CSDN博客-C#,架构之路,SpringBoot领域博主 实现 1、WebService WebService…

LeetCode刷题 --- 链表

定义一个node节点 class ListNode {int val;ListNode next;ListNode() {}ListNode(int val) {this.val val;}ListNode(int val, ListNode next) {this.val val;this.next next;} } 206 反转链表 题目&#xff1a;给你单链表的头节点 head &#xff0c;请你反转链表&#x…

【FAQ】视频编辑服务常见问题及解答

Q1问题描述 1、 访问贴纸等素材的时候提示“网络异常&#xff0c;请重试”怎么办&#xff1f; 2、 使用AI能力时&#xff0c;提示“errorCode:20124 errorMsg:Method not Allowed”&#xff1f; 解决方案 请做以下检查&#xff1a; 1、 在代码中检查鉴权信息是否已设置。如…

AI法律助手:ChatGPT如何提供智能化解决方案

日常生活中&#xff0c;民事纠纷不可避免&#xff0c;涉及到多种问题&#xff0c;如合同纠纷、劳动纠纷、婚姻家事、民间借贷、交通事故、工伤赔偿、房屋租赁等。解决这些问题&#xff0c;需要花费大量时间和精力&#xff0c;但现在&#xff0c;我们有了一种全新的解决方案&…

蓝库云:建立智慧零售,零代码技术能起到什么作用

随着科技的进步更多智能化的技术及应用融入我们生活中&#xff0c;例如零售行业在现代零售和传统零售的区别不仅在于营销策略、销售方式、销售渠道以及运营模式等方面&#xff0c;更多是在现代零售则更注重数字化营销&#xff0c;发挥社交媒体和电子商务渠道的作用&#xff0c;…

“前端”工匠系列(二):合格的工匠,怎么做好价值落地 | 京东云技术团队

一、“技术鄙视链&#xff1f;” 如果你是一个技术人&#xff0c;相信都知道技术圈有个相互的鄙视链&#xff0c;这个链条从技术人自己认知的角度在以业务价值为中心嵌套的一层一层的环&#xff0c;就像洋葱&#xff0c;具体的描述这里不赘述了。 出门左拐随便抓住一个人问一…

RabbitMQ应用问题——消息补偿机制以及代码示例

RabbitMQ应用问题——消息补偿机制以及代码示例 RabbitMQ应用问题 消息可靠性的保障 消息补偿机制 详细说明 这里使用了简单的代码进行演示&#xff0c;订单的消费者没有写&#xff0c;在订单的消费同时&#xff0c;发送一条增加积分消息到积分队列。 详细流程途中都有注明…

STM32+ov7725+ESP8266实现无线图传-完成上位机图像显示

一、需求 stm32f407探索者开发板和STM32F103ZET6战舰开发板。接正点原子ov5640、OV7725、OV2640摄像头,通过esp8266Wi-Fi模块(透传模式)将摄像头采集到的rgb565格式图片通过tcp/ip协议上传到上位机显示。 二、设计思路 【1】使用QT开发上位机,建立TCP服务器,接收ESP8266…

DistilPose: Tokenized Pose Regression with Heatmap Distillation

论文名字&#xff1a;DistilPose&#xff1a;使用热图蒸馏的令牌化姿势回归 论文地址&#xff1a;2303.02455.pdf (arxiv.org)https://arxiv.org/pdf/2303.02455.pdf项目地址&#xff1a;yshMars/DistilPose: Implementation for: DistilPose: Tokenized Pose Regression with…

科幻风的卡片视频播放

上一篇博文展示了卡片中的VR展示&#xff0c;那篇主要是卡片的3D转动来展示未显示的部分图片。这篇&#xff0c;我们来点科幻的。 我们在卡片中播放视频的同时来拖动卡片或转动它。像下面那样&#xff1a; 这个主要依赖了两个库&#xff0c;具体代码如下&#xff1a; <!D…

智能问答支持自定义问答

# -*- coding: utf-8 -*- # Time : 2023-5-12 14:15 # Author : shenzh # FileName: chat_bot_v1.py # Software: PyCharm """Description:一期智能机器人设计完成&#xff0c;支持自定义问题和答案随时增加功能""" import json import jie…

NOV Diagram for .NET Crack

NOV Diagram for .NET Crack 增加了对Microsoft.NET 7.0的支持-NOV现在完全支持.NET Core 7.0&#xff0c;此外还支持Microsoft.NET Framework 4.7.2、.NET Core 5.0和.NET Core 6.0的内部版本。 添加了对读取Microsoft Visio 2003-2010绘图(VSD文件)的支持。 改进了SVG导出。 …

哈希表应用——位图

应用场景&#xff1a;海量数据处理&#xff08;这里的海量是指一般数据量非常大如以亿为单位的数据量&#xff09; 目录 面试题 位图概念 位图的实现 位图的应用 应用一 应用二 位图应用变形 面试题 给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&…

Java之多线程进阶

目录 一.上节内容复习 1.线程池的实现 2.自定义一个线程池,构造方法的参数及含义 3.线程池的工作原理 4.拒绝策略 5.为什么不推荐系统提供的线程池 二.常见的锁策略 1.乐观锁和悲观锁 2.轻量级锁和重量级锁 3.读写锁和普通互斥锁 4.自旋锁和挂起等待锁 5.可重入锁和…

精彩回顾 | Fortinet Accelerate 2023·中国区巡展厦门站

Fortinet Accelerate 2023中国区 5月16日&#xff0c;Fortinet Accelerate 2023中国区巡展来到魅力“鹭岛”——厦门&#xff0c;技术、产品和业务专家&#xff0c;携手亚马逊云科技、唯一网络等云、网、安合作伙伴&#xff0c;与交通、物流、金融等各行业典型代表客户&#x…