Kafka实战 - 03 Kafka生产者:将X平台的告警和事件数据接入到S平台

news2024/11/29 8:33:19

文章目录

      • 1. 项目背景
      • 2. 依赖和配置
      • 3. 生产者配置 KafkaConfiguration
      • 4. 同步数据Topic枚举 SyncDataTopicEnum
      • 5. 请求体 DataSyncQo
      • 6. 同步数据控制层 AppSyncDataController
      • 7. 同步数据业务层 XdrDataSyncServiceImpl

1. 项目背景

资产可能会遭受各种网络攻击,安全事件和安全告警就是已经被攻击的资产产生的日志,一条攻击链路可能会经过多个资产,由此产生的日志为安全事件,而具体某一个被攻击的资产产生的日志为安全告警。一个安全事件关联多个安全告警,安全事件存在数据库mongodb中,安全告警存在数据库ElasticSearch中。

SIR和XDR是两个不同的产品,SIR平台是安全事件协同响应平台,能够根据安全告警和安全事件日志对已经遭受攻击的资产进行处置闭环,处置完成后需要修改安全事件和安全告警日志的处置状态。

但是XDR产品无法对安全事件和安全告警处置闭环,因此需要将XDR平台的安全告警和安全事件数据接入到SIR平台处置闭环,待处置完成后将数据的处置状态同步给XDR平台,保证两个平台的数据的处置状态一致;

所以下面要做的就是XDR平台的数据上报:

将XDR平台的安全告警和安全事件数据发送到指定的topic中,SIR平台通过Python脚本从topic中取出数据将安全事件存在数据库mongodb中,安全告警存在数据库ElasticSearch中。
在这里插入图片描述

2. 依赖和配置

相关文章:Kafka实战 - 01 自定义 SpringBoot Starter 实现 Kafka 的自动配置

在 ngsoc-open 服务中引入自定义的 ngsoc-common-kafka 的依赖:

<dependency>
    <groupId>com.hh</groupId>
    <artifactId>ngsoc-common-kafka</artifactId>
    <version>3.0.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

kafka的相关配置:项目的配置中心使用的confd,#{{}}是相关配置文件语法,不影响

ngsoc:
  kafka:
    clusters:
      - name: ngsoc
        #{{$data := json (getv "/ngsoc/kafka/common/cluster/conn_info")}}
        bootstrap-servers: #{{range $data.route}}
          - '127.0.0.1:9092' #{{end}}
        topics:
          - name: NGSOC_APP_ALARM
            partition: 1
            replication: 1
          - name: NGSOC_APP_INCIDENT
            partition: 1
            replication: 1

3. 生产者配置 KafkaConfiguration

@Configuration
public class KafkaConfiguration {

    @Bean("jsonKafkaTemplate")
    public KafkaTemplate<String, Object> jsonKafkaTemplate(ProducerFactory<String, Object> pf) {
        Map<String, Object> config = Map.of(
                // 3次重试
                ProducerConfig.RETRIES_CONFIG, "3",
                // 5ms批量发送
                ProducerConfig.LINGER_MS_CONFIG, "500",
                // JSON序列化
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
        );

        return new KafkaTemplate<>(pf, config );
    }
}

4. 同步数据Topic枚举 SyncDataTopicEnum

public enum SyncDataTopicEnum {

    /**
     * 安全告警
     */
    ALARM(0, "NGSOC_APP_ALARM"),

    /**
     * 安全事件
     */
    INCIDENT(1, "NGSOC_APP_INCIDENT");

    private final int type;
    private final String name;

    SyncDataTopicEnum(int type, String name) {
        this.type = type;
        this.name = name;
    }

    public static String getTopicNameByType(int type) {
        for (SyncDataTopicEnum constants : SyncDataTopicEnum.values()) {
            if (type == constants.type) {
                return constants.name;
            }
        }
        return null;
    }

    public static boolean contains(int type) {
        for (SyncDataTopicEnum value : SyncDataTopicEnum.values()) {
            if (type == value.type) {
                return true;
            }
        }
        return false;
    }
}

5. 请求体 DataSyncQo

@Data
public class DataSyncQo implements ValidateAble {

    @ApiModelProperty("数据类型,0-安全事件,1-安全告警")
    @NotNull(message = "data.type.must.be.not.null")
    private Integer type;

    @ApiModelProperty("具体数据")
    @NotEmpty(message = "data.must.not.empty")
    private List<Object> data;

    private final int DATA_MAX_SIZE = 200;

    @Override
    public void validate() throws ValidateException {
        if (!SyncDataTopicEnum.contains(type)) {
            throw new ValidateException("type.of.data.is.not.valid");
        }

        if (data.size() > DATA_MAX_SIZE) {
            throw new ValidateException("data.size.limit");
        }
    }
}

6. 同步数据控制层 AppSyncDataController

@RestController
@RequestMapping("/api/v1/app")
public class AppSyncDataController {

    @Setter(onMethod_ = @Autowired)
    private IXdrDataSyncService incidentUploadService;

    @OpenApi
    @PostMapping("/syncData")
    @CheckValidateAble
    @OperateLog(target = "app.data", action = "xdr.data.sync")
    public ApiResponse<Object> upload(@RequestHeader("Authorization") String key, @Validated @RequestBody DataSyncQo qo) {
        return incidentUploadService.upload(key, qo);
    }
}

7. 同步数据业务层 XdrDataSyncServiceImpl

@Slf4j
@Service
public class XdrDataSyncServiceImpl implements IXdrDataSyncService {

    @Autowired
    @Qualifier("jsonKafkaTemplate")
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public ApiResponse<Object> upload(String key, DataSyncQo qo) {
        // 根据同步数据类型获取topic名称
        String topicName = Objects.requireNonNull(SyncDataTopicEnum.getTopicNameByType(qo.getType()));
        // 将同步数据发送到topic中
        pushToTopic(topicName, qo);
        return ApiResponse.ok();
    }

    private void pushToTopic(String topicName, DataSyncQo qo) {
 log.info("[XDR数据上报]XDR数据[{}]正在进行上报,数据长度为:{}", topicName, qo.getData().size());
        for (Object data : qo.getData()) {
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, data);
            future.addCallback(new ListenableFutureCallback<>() {
                @Override
                public void onFailure(@NotNull Throwable t) {
                    log.error("[XDR数据上报]设备上传的数据打入Kafka异常:", t);
                }

                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    // 正确不做记录
                }
            });
        }
        log.info("[XDR数据上报]XDR数据[{}]上报行为执行完毕", topicName);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/87931.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]Python计算机毕业设计SSM基于JAVA语言的宠物寄养管理(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

1,4-丁炔二醇BYD物料中含有大量铜离子、二氧化硅等杂质怎么办?

1,4-丁炔二醇BYD&#xff08;but-2-yne-1,4-diol&#xff09;是一种重要的中间体化工原料&#xff0c;广泛应用于生产丁二醇及其下游产品、维生素B6的主要原料&#xff0c;还可以用于镀镍的增亮剂、防腐抑制剂等领域。 1,4&#xff0d;丁二醇&#xff08;BDO&#xff09;是一种…

[附源码]Python计算机毕业设计SSM基于web的社团管理系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

免费开源的图片修复工具Lama Cleaner

什么是 Lama Cleaner &#xff1f; Lama Cleaner 是由 SOTA AI 模型提供支持的图像修复工具。可以从图片中移除任何不需要的物体、缺陷和人&#xff0c;或者擦除并替换&#xff08;powered by stable diffusion&#xff09;图片上的任何东西。 看看官方提供的视频&#xff0c;应…

图片怎么转换成PDF格式?这两种方法都可以实现转换

怎么把图片转换成PDF格式呢&#xff1f;大家在日常中也会经常使用到图片&#xff0c;不管是出门游玩还是办公学习&#xff0c;图片都会给我们带来极大的便利。但是一旦图片的数量多了起来&#xff0c;我们又不能删除&#xff0c;那么这些图片的存放就是一个关键的问题&#xff…

[附源码]计算机毕业设计的小说阅读系统Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis MavenVue等等组成&#xff0c;B/S模式…

“数据湖存储”冠军杯足球赛开幕,腾讯云存储出征!

“数据湖存储”冠军杯是数据湖领域内的世界级赛事&#xff0c;随着云上“数据湖存储”产品理念的逐步普及&#xff0c;今年的比赛也获得了国内外众多球迷的关注。腾讯云以COS、GooseFS、GooseFSx、元数据加速器、COS加速器等球员组成的球队一路披荆斩棘&#xff0c;成为最闪耀的…

化工集团公司安全风险智能化管控平台

加快数字化发展&#xff0c;大力推进信息化、工业化融合&#xff0c;是国家新时代、新阶段作出的重要决策部署&#xff0c;是化工集团公司打造世界领先企业的必由之路。要充分认识加快数字化发展的重要性紧迫性。要锚定集团公司数字化转型升级的总目标&#xff0c;坚持顶层设计…

Java 开发如何通过 IoT 边缘 ModuleSDK 进行协议转换

操作场景 使用 ModuleSDK 开发插件应用&#xff0c;接入其他协议设备&#xff08;如 HTTP 请求数据&#xff09;&#xff0c;将其他协议的数据转化为 MQTT 协议 JSON 数据上报到 IoTDA。 代码解析 项目结构如下 ModbusDriver 代码解析 片段一 通过 DriverClient.createFromEnv…

[附源码]Nodejs计算机毕业设计基于Web课堂签到管理系统Express(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置&#xff1a; Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分…

web开发框可以为提升办公效率赋能吗?

提升办公效率可以引用专业的web开发框架&#xff0c;值得一提的是&#xff0c;随着数字化时代的到来&#xff0c;要想做好数据管理&#xff0c;低代码开发平台功不可没&#xff0c;在做好数据管理的过程中发挥了重要的应用价值。研发低代码开发平台&#xff0c;流辰信息一直都以…

从BI到ABI,守正创新的思迈特软件持续推进国产BI产业创新

‍‍数据智能产业创新服务媒体——聚焦数智 改变商业近年来&#xff0c;国内外数字化转型加速渗透&#xff0c;企业客户已不再满足“用上”数据&#xff0c;能否“用好”数据、提升管理效率成为企业数字化转型的核心诉求。在提升企业管理效率的工具中&#xff0c;商业智能&…

Linux C编程一站式学习笔记1

Linux C编程一站式学习笔记 chap1程序的基本概念 打算重学计算机&#xff0c;重学C语言 这本书的前言写的真好 实在是惭愧… 文章目录Linux C编程一站式学习笔记 chap1程序的基本概念一.程序和编程语言1.什么是程序2.程序由指令组成3.编程语言编译执行过程解释执行过程本节总结…

力扣(LeetCode)1697. 检查边长度限制的路径是否存在(C++)

并查集离线查询 由于评测系统对 vectorvectorvector 的排序可能较慢&#xff0c;使用结构体保存 vectorvectorvector &#xff0c;接下来的查询和边集就对结构体操作。 结构体的属性 aaa 点 、 bbb 点 、ccc 长度、 ddd 顺序。重载 <<< &#xff0c;排序时按照 ccc …

ssm+Vue计算机毕业设计校园图书漂流系统(程序+LW文档)

ssmVue计算机毕业设计校园图书漂流系统&#xff08;程序LW文档&#xff09; 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技…

C++基础篇之什么是 数据结构

&#x1f4d2;博客主页&#xff1a; ​​开心档博客主页​​ &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐留言&#x1f4dd; &#x1f4cc;本文由开心档原创&#xff01; &#x1f4c6;51CTO首发时间&#xff1a;&#x1f334;2022年12月12日&#x1f334; ✉…

12.1、后渗透测试--提权

meterpreter提权方式&#xff1a; getsystem自动提权bypassuac提权migrate1、getsystem自动提权 meterpreter > getsystemgetsystem工作原理&#xff1a;getsystem创建一个新的Windows服务&#xff0c;设置为SYSTEM运行&#xff0c;当它启动时连接到一个命名管道。getsystem…

Java 线程池详解

线程池&#xff08;thread pool&#xff09;&#xff1a;一种线程使用模式。线程过多会带来调度开销&#xff0c;进而影响缓存局部性和整体性能。而线程池维护着多个线程&#xff0c;对线程统一管理。 使用线程池的优势 提高效率&#xff0c;创建好一定数量的线程放在池中&am…

技术分享 | 被测系统架构与数据流分析

深入了解测试过程中被测系统的架构与数据流&#xff0c;有助于理解业务逻辑&#xff0c;梳理业务用例以及促进部门协同。 更深的理解业务逻辑是指要分析公司是做什么的&#xff0c;公司的重要的商务决策是什么&#xff0c;公司内部数据流是怎么运行的&#xff0c;有哪些常见的…

数字孪生重点商业实践展示

数字孪生是一个系统或一组对象的虚拟表示。数字孪生背后的技术旨在准确反映系统的生命周期和应用程序&#xff0c;并使用机器学习、模拟和人工智能的组合来帮助在现实生活中运行之前对使用、问题或效率进行建模。如今&#xff0c;数字孪生技术的爆火离不开人们对其商业价值的看…