使用Python实现自动化查询IP威胁情报

news2024/11/18 23:26:53

解决的问题

一名网络安全从业人员在做日常网络安全运营分析时,就从防火墙、IDP、WAF等安全设备的日志分析计算,基本都会面对成千上万条日志,好一点的可能会有态势感知之类安全产品提供辅助分析,如果没有的就只能单条进行分析,造成的工作量必定是很大,也有可能存在从业人员的投入与产出形成差异。

遇到类似的问题要怎么办?我们可以采用现有开放的资源提供辅助手段,这里使用微步在线情报查询平台进行操作,本文包含IP信誉、IP分析功能,注册账号后IP信誉功能个人每天可以查询50次,通过工作认证后IP分析功能总共有1000次查询次数,可以按需采用不同的功能进行查询。

当然以上的说明只是实现了基本的功能(相当于是造个小轮子),详见第四部分扩展功能。

官方接口文档及示例

IP信誉功能

针对入站场景的IP进行分析,能够提供IP的地理位置、ASN信息,通过判定规则精准判别IP是否恶意、风险严重级别、可信度级别;识别威胁类型,如:漏洞利用(exploit)、傀儡机(Zombie)、代理(Proxy)、可疑(Suspicious)等及相关安全事件或团伙标签。

import requests
url = "https://api.threatbook.cn/v3/scene/ip_reputation"
query = {
"apikey":"请替换apikey",
"resource":"159.203.93.255"
}
response = requests.request("GET", url, params=query)
print(response.json())

IP分析

可针对业务日志,以及从防火墙、WAF等安防设备中获取的外部IP,进行分析。获取IP相关地理位置、ASN信息,综合判定威胁类型如:
远控(C2)、傀儡机(Zombie)、失陷主机(Compromised)、扫描(Scanner)、钓鱼(Phishing)等,相关攻击团伙或安全事件标签,原始情报,相关样本信息等。

import requests
url = "https://api.threatbook.cn/v3/ip/query"
query = {
"apikey":"请替换apikey",
"resource":"159.203.93.255"
}
response = requests.request("GET", url, params=query)
print(response.json())

部分代码功能说明

定义威胁类型字典

参考平台的API文档对威胁类型进行整理,因包含恶意、非恶意的威胁类型,我们分别定义了全量威胁类型、非恶意威胁类型两个字典。

# 定义全量威胁类型字典

threat_type = {
'C2': '远控', 'Botnet': '僵尸网络', 'Hijacked': '劫持', 'Phishing': '钓鱼', 'Malware': '恶意软件',
'Exploit': '漏洞利用', 'Scanner': '扫描', 'Zombie': '傀儡机', 'Spam': '垃圾邮件', 'Suspicious': '可疑',
'Compromised': '失陷主机', 'Whitelist': '白名单', 'Brute Force': '暴力破解', 'Proxy': '代理',
'MiningPool': '矿池', 'CoinMiner': '私有矿池', 'suspicious_application': '可疑恶意软件',
'suspicious_website': '可疑恶意站点', 'Fakewebsite': '仿冒网站', 'Sinkhole C2': '安全机构接管 C2',
'SSH Brute Force': 'SSH暴力破解', 'FTP Brute Force': 'FTP暴力破解', 'SMTP Brute Force': 'SMTP暴力破解',
'Http Brute Force': 'HTTP AUTH暴力破解', 'Web Login Brute Force': '撞库', 'HTTP Proxy': 'HTTP Proxy',
'HTTP Proxy In': 'HTTP代理入口', 'HTTP Proxy Out': 'HTTP代理出口', 'Socks Proxy': 'Socks代理',
'Socks Proxy In': 'Socks代理入口', 'Socks Proxy Out': 'Socks代理出口', 'VPN': 'VPN代理', 'VPN In': 'VPN入口',
'VPN Out': 'VPN出口', 'Tor': 'Tor代理', 'Tor Proxy In': 'Tor入口', 'Tor Proxy Out': 'Tor出口', 'Bogon': '保留地址',
'Full Bogon': '未启用IP', 'Gateway': '网关', 'IDC': 'IDC服务器', 'Dynamic IP': '动态IP', 'Edu': '教育',
'DDNS': '动态域名', 'Mobile': '移动基站', 'Search Engine Crawler': '搜索引擎爬虫', 'CDN': 'CDN服务器',
'Advertisement': '广告', 'DNS': 'DNS服务器', 'BTtracker': 'BT服务器', 'Backbone': '骨干网', 'ICP': 'ICP备案',
'IoT Device': '物联网设备', 'Web Plug Deployed': '部署网站插件', 'Gameserver': '游戏服务器'}

# 定义非恶意威胁类型字典

infos = {'Bogon', 'Full Bogon', 'Gateway', 'IDC', 'Dynamic IP', 'Edu', 'DDNS', 'Mobile', 'Search Engine Crawler', 'CDN',
'Advertisement', 'DNS', 'BTtracker', 'Backbone', 'ICP', 'IoT Device', 'Web Plug Deployed', 'Gameserver',
'Whitelist', 'Whitelist', 'Info'}

威胁类型转换

向API接口发起请求后返回的数据格式为json格式,里面包含了所查询IP的威胁类型(详见官方响应示例)属于英文,而输出的结果是需要中文。实现方法就是定义一个转换类型的类进行实现,也包含了排除非恶意类型的函数。

class ThreatType:
    def __init__(self):
        self.threat_type = threat_type
        self.infos = infos

    # 威胁类型字典转换
    def get_threat_type(self, original_list):
        # 使用列表推导式将每个元素替换为相应的中文名称
        new_list = [self.threat_type[item] for item in original_list]
        return new_list

    # 排除非恶意的类型
    def non_malicious(self, info_type):
        # 去除重复的info_type
        # info_type = list(set(str(info_type)))
        # 遍历info_type、infos中的每个值,对比后删除info_type和infos相同的值
        for info in self.infos:
            if info in info_type:
                info_type.remove(info)
        return info_type

调用API接口

参照官方接口文档调用IP信誉、IP分析接口,同时定义状态码及其返回的错误提示。上面介绍到过IP信誉个人每天可以查询50次,当然也可以采用多个账号进行查询(一个账号对应一个key),详细代码如下:

# 调用API接口

class ThreatBookApi:
    def __init__(self):
        # 输入key
        self.apikey = '主key'
        # 定义key列表
        self.apikey_box = ['备key0', '备key1', '备key2', '备key3', '.....']

    # 状态码判断
    def get_code_query(self, text, re_code):
        re_codes = [-1, -2, -3, -5]
        if re_code in re_codes:
            get_json = {'response_code': re_code}
            if get_json['response_code'] == -1:
                print(str(text) + '查询失败,API权限受限或请求出错.')
            if get_json['response_code'] == -2:
                print(str(text) + '查询失败,请求无效.')
            if get_json['response_code'] == -3:
                print(str(text) + '查询失败,请求参数缺失.')
            if get_json['response_code'] == -5:
                print(str(text) + '查询失败,系统错误.')
        else:
            pass

    # IP信誉
    def ip_reputation(self, ip):
        for i in range(len(self.apikey_box)):
            url = "https://api.threatbook.cn/v3/scene/ip_reputation"
            query = {"apikey": self.apikey, "resource": ip}
            response = requests.get(url, params=query)
            if response.json()['response_code'] == -4:
                self.apikey_backup = self.apikey_box[i]
                i -= 1
                query = {"apikey": self.apikey_backup, "resource": ip}
                # self.progress_bar()
                response = requests.get(url, params=query)
                if response.json()['response_code'] == 0:

                    return response.json()
                else:
                    continue
            else:
                return response.json()

    # IP分析
    def ip_query(self, ip):
        url = "https://api.threatbook.cn/v3/ip/query"
        self.apikey = '主key'
        query = {"apikey": self.apikey, "resource": ip}
        response = requests.request("GET", url, params=query)
        if response.json()['response_code'] == -4:
            print('IP分析查询次数已使用完成(共1000次)')
        return response.json()

IP信誉功能

# IP信誉查询
class ThreatReputation:
    def __init__(self, input_ip):
        # 初始化IP
        self.ip = input_ip
        # 调用接口
        self.BookApi = ThreatBookApi()
        # 调用IP信誉函数
        self.ip_json = self.BookApi.ip_reputation(ip=self.ip)
        # 执行函数
        self.threat_reputation()

    def threat_reputation(self):
    # 调用状态码判断,获取状态码并提示信息
    self.BookApi.get_code_query(text='IP信誉', re_code=self.ip_json['response_code'])
    if 'data' not in self.ip_json:
        print('威胁情报查询结束!')
        return None
    # 按照update_time字段对数据进行排序
    sorted_data = sorted(self.ip_json['data'].values(), key=lambda x: x['update_time'], reverse=True)
    # 获取最新的数据并提取所需的字段
    latest_data = sorted_data[0]
    # 可信度。分"low(低)","medium(中)","high(高)" 三档来标识
    confidence_level = latest_data['confidence_level']
    # 提取出威胁类型
    judgments = latest_data['judgments']
    # 是否为恶意IP。布尔类型,true代表恶意,false代表非恶意。
    latest_identity = self.ip_json['data'][self.ip]['is_malicious']
    # 取judgments列表第一位、latest_identity、confidence_level综合判断是否为恶意类型
    if (latest_identity is True) and (judgments[0] in threat_type) and (confidence_level == 'high'):
        # 删除无效的非恶意类型
        ThreatType().non_malicious(info_type=judgments)
        # 调用get_threat_type将intel_types翻译为中文
        type_cn = ThreatType().get_threat_type(original_list=set(judgments))
        print(
            "[IP信誉-可信度:{}]-源IP'{}'属于恶意IP,威胁类型包括'{}';".format(confidence_level, self.ip, ','.join(type_cn)))
    else:
        # 调用get_threat_type将intel_types翻译为中文
        # type_cn = ThreatType().get_threat_type(original_list=set(judgments))
        print("[IP信誉-可信度:{}]-源IP'{}'不属于恶意地址.".format(confidence_level, self.ip))

IP分析功能

# IP分析查询

class ThreatQuery:
    def __init__(self, input_ip):
        # 定义变量
        self.ip = input_ip # 定义IP
        self.BookApi = ThreatBookApi()  # 调用API接口
        self.ip_json = self.BookApi.ip_query(ip=self.ip)  # 调用IP分析函数
        self.threat_type = threat_type
        self.infos = infos
        self.valid_intel = []
        self.valid_box = []
        self.confidence_box = []
        self.intel_types_box = []
        # 执行函数
        self.json_query()

    def json_query(self):
        # 调用状态码判断,获取状态码并提示信息
        self.BookApi.get_code_query(text='IP分析', re_code=self.ip_json['response_code'])
        if 'data' not in self.ip_json:
            print('威胁情报查询结束!')
            return None
        # 遍历键值为False(情报有效)的所有信息
        valid_json = self.ip_json['data'][self.ip]["intelligences"]["threatbook_lab"]
        # 当expired为False时,获取包含confidence,intel_types的值
        for j in valid_json:
            if not j['expired']:
                self.valid_intel.append(
                    {'confidence': j['confidence'], 'intel_types': j['intel_types'],
                     'expired': j['expired']})
        for i in range(len(self.valid_intel)):
            valid = (self.valid_intel[i]['expired'])
            self.valid_box.append(valid)
            confidence = (self.valid_intel[i]['confidence'])
            self.confidence_box.append(confidence)
            intel_types = (self.valid_intel[i]['intel_types'])
            self.intel_types_box.append(intel_types)
        # self.valid_box, self.confidence_box, self.intel_types_box = self.get_param(valid_intel=self.valid_intel)
        box_data = [self.valid_box, self.confidence_box, self.intel_types_box]
        new_box = list(box_data)
        # 获取有效性
        new_valid = new_box[0][0]
        # 获取可信度
        new_confidence = max(set(new_box[1]))
        # 获取威胁类型
        intel_types_1 = new_box[2]
        intel_types_2 = list(set([item for sublist in intel_types_1 for item in sublist]))
        # print(new_valid, new_confidence, intel_types_2)
        # 调用Non_Malicious函数去除非恶意类型
        result = ThreatType().non_malicious(info_type=intel_types_2)
        # 用valid、威胁类型长度综合判断
        if new_valid is False and len(result) != 0:
            # 调用get_threat_type将intel_types翻译为中文
            cn_type = ThreatType().get_threat_type(original_list=set(result))
            # print(cn_type)
            print(
                "[IP分析-可信度:{}%]-源IP'{}'属于恶意IP,威胁类型包括'{}';".format(new_confidence, self.ip, ','.join(cn_type)))
        if len(result) == 0:
            print("[IP分析-可信度:{}%]-源IP'{}'不属于恶意地址.".format(new_confidence, self.ip))

初步运行结果

在项目当前目录下新建test.txt文件,输入需要查询的IP,查询结果如下。
 

image


与在线查询进行多次对比,查询结果基本无较大的差异,当然还是要根据可信度以及该IP的其他行为进行综合分析。
 

image

image

扩展功能

以上的源IP为自动进行录入,回到开头提到的问题在日常网络安全运营工作中成千上万的日志要如何做分析,这里提出以下几点方向:

(1)获取边界安全设备的源IP(含防火墙、WAF、IDP及IPS等);
(2)将获取的源IP参数传递给上述程序进行处理;
(3)确认后联动安全设备进行处理或封禁。
(4)获取源IP时可以从安全设备导出,也可以调用安全设备的API接口进行提取,若采用后者则会面临安全设备API泄露风险,需谨慎进行操作。

image

黑客&网络安全如何学习

今天只要你给我的文章点赞,我私藏的网安学习资料一样免费共享给你们,来看看有哪些东西。

 1.学习路线图 

 攻击和防守要学的东西也不少,具体要学的东西我都写在了上面的路线图,如果你能学完它们,你去就业和接私活完全没有问题。

2.视频教程

网上虽然也有很多的学习资源,但基本上都残缺不全的,这是我自己录的网安视频教程,上面路线图的每一个知识点,我都有配套的视频讲解。

内容涵盖了网络安全法学习、网络安全运营等保测评、渗透测试基础、漏洞详解、计算机基础知识等,都是网络安全入门必知必会的学习内容。

(都打包成一块的了,不能一一展开,总共300多集)

因篇幅有限,仅展示部分资料,需要保存下方图片,微信扫码即可前往获取

3.技术文档和电子书

技术文档也是我自己整理的,包括我参加大型网安行动、CTF和挖SRC漏洞的经验和技术要点,电子书也有200多本,由于内容的敏感性,我就不一一展示了。 

 因篇幅有限,仅展示部分资料,需要保存下方图片,微信扫码即可前往获取

4.工具包、面试题和源码

“工欲善其事必先利其器”我为大家总结出了最受欢迎的几十款款黑客工具。涉及范围主要集中在 信息收集、Android黑客工具、自动化工具、网络钓鱼等,感兴趣的同学不容错过。 

 还有我视频里讲的案例源码和对应的工具包,需要的话也可以拿走。

因篇幅有限,仅展示部分资料,需要保存下方图片,微信扫码即可前往获取

最后就是我这几年整理的网安方面的面试题,如果你是要找网安方面的工作,它们绝对能帮你大忙。

这些题目都是大家在面试深信服、奇安信、腾讯或者其它大厂面试时经常遇到的,如果大家有好的题目或者好的见解欢迎分享。

参考解析:深信服官网、奇安信官网、Freebuf、csdn等

内容特点:条理清晰,含图像化表示更加易懂。

内容概要:包括 内网、操作系统、协议、渗透测试、安服、漏洞、注入、XSS、CSRF、SSRF、文件上传、文件下载、文件包含、XXE、逻辑漏洞、工具、SQLmap、NMAP、BP、MSF…

 因篇幅有限,仅展示部分资料,需要保存下方图片,微信扫码即可前往获取 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1838662.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

彻底卸载Ubuntu双系统

操作系统 文章目录 操作系统前言一、把开机启动项设为默认Windows启动二、删除Ubuntu系统分区三、删除开机启动引导项 前言 我们卸载Ubuntu双系统,可能出于以下原因: 1、Ubuntu系统内核损坏无法正常进入 2、Ubuntu系统分配空间不足,直接扩区…

如何规范信息技术课堂纪律

在信息技术课堂上,为了营造一个良好的学习环境,确保学生能够专注于学习任务,我们需要采取一系列措施来规范课堂纪律。以下是一些具体而详细的建议: 一、明确课堂规则 上课座位固定:学生的座位应固定,未经…

分享一个dnslog在线平台

DNSLog Platform 页面只有两个按钮,点击Get Subdomain可以随机生成一个dnslog 点击Refresh Record,刷新这个dnslog的记录。可以查看到这条dnslog的IP地址和创建时间。

vue elementui table给表格中满足条件的每一条记录添加计时器

需求: 在前端给表格中给满足条件的每一条记录增加一个计时器,用于计算工作时长。 1.数据库中存储的有每条记录的作业开始时间,将当前时间和作业开始时间计算一个差值,作为作业时长的初始值; 2.把满足条件的每条记录绑…

为什么很多Java程序员会下意识觉得Java的就是最好的?

在开始前刚好我有一些资料,是我根据网友给的问题精心整理了一份「Java的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!!做为一个真正热爱编程&#…

信捷PLC与上位机通讯-以太网通讯dll

应用场景 最近做项目,电气部分PLC选用了国产的信捷PLC,需要考虑上位机与信捷PLC通讯的问题,直接读写寄存器或线圈。 解决方案 信捷官网找资料,介绍的各种通讯方法,感觉都不是很好理解,而且也没办法直接拿…

二刷算法训练营Day27 (Day26 休息) | 回溯算法(3/6)

目录 详细布置: 1. 39. 组合总和 2. 40. 组合总和 II 3. 131. 分割回文串 详细布置: 1. 39. 组合总和 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target ,找出 candidates 中可以使数字和为目标数 target 的 所有 不同…

【数据结构】线性表之《无头单链表》超详细实现

单链表 一.链表的概念及结构二.顺序表与链表的区别与联系三.单链表的实现1.创建单链表2.初始化单链表3.购买节点4.打印单链表5.插入操作1.头插2.尾插3.给定位置之前插入 6.删除操作1.头删2.尾删3.删除给定位置的结点 7.查找数据8.修改数据9.求单链表长度10.清空单链表11.销毁单…

python代码

# 请在______处使用一行代码或表达式替换# 注意:请不要修改其他已给出代码s input("请输入一个字符串:") print("{:*^30}".format(s))# 请在______处使用一行代码或表达式替换 # # 注意:请不要修改其他已给出代码a, b 0, 1 while …

程序员们,能告诉我你们为什么选择arch linux吗?

在开始前刚好我有一些资料,是我根据网友给的问题精心整理了一份「linux的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!! Arch Linux 受到程序员青…

Qt|海康摄像头多个页面展示问题

为大家分享一个使用海康摄像头的小功能,希望对大家有用~ 使用场景: 在程序中多个不同功能页面需要展示摄像头的实时预览画面,该如何高效的展示呢? 对于海康摄像头的实时预览接口调用流程,如下所示: 按照流…

GD32F4xx 移植agile_modbus软件包与电能表通信

目录 1. agile_modbus1.1 简介1.2 下载2. agile_modbus使用2.1 源码目录2.2 移植3. 通信调试3.1 代码3.3 通信测试1. agile_modbus 1.1 简介 agile_modbus是一个轻量级的Modbus协议栈,主要特点: 支持RTU和TCP协议,采用纯C语言开发,不涉及任何硬件接口,可直接在任何形式的…

Java学习 (一) 环境安装及入门程序

一、安装java环境 1、获取软件包 https://www.oracle.com/java/technologies/downloads/ .exe 文件一路装过去就行,最好别装c盘 ,我这里演示的时候是云主机只有C盘 2、配置环境变量 我的电脑--右键属性--高级系统设置--环境变量 在环境变量中添加如下配…

有路网整体布局

有路网地址 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><style>.…

昇思25天学习打卡营第1天 | 快速入门

内容介绍&#xff1a;通过MindSpore的API来快速实现一个简单的深度学习模型。 具体内容&#xff1a; 1. 导包 import mindspore from mindspore import nn from mindspore.dataset import vision, transforms from mindspore.dataset import MnistDataset 2. 处理数据 fro…

Gobject tutorial 六

Instantiatable classed types Initialization and destruction 类型的实例化是通过函数g_tpye_create_instance()实现的。这个函数首先会查找与类型相关的GTypeInfo结构体&#xff0c;之后&#xff0c;查询结构体中的instance_size和 instance policy即 n_preallocs(在 2.10版…

Nuxt3页面开发实战探索

title: Nuxt3页面开发实战探索 date: 2024/6/19 updated: 2024/6/19 author: cmdragon excerpt: 摘要&#xff1a;这篇文章是关于Nuxt3页面开发实战探索的。它介绍了Nuxt3的基础入门&#xff0c;安装与配置&#xff0c;项目结构&#xff0c;内置组件与功能&#xff0c;以及页…

持续集成jenkins+gitee

首先要完成gitee部署&#xff0c;详见自动化测试git的使用-CSDN博客 接下来讲如何从git上自动拉取代码&#xff0c;实现jenkins无人值守&#xff0c;定时执行测试&#xff0c;生成测试报告。 需要这三个安装包 由于目前的jenkins需要至少java11到java17的版本&#xff0c;所以…

深度解析消费者最关心的车联网核心问题

随着科技的迅猛发展&#xff0c;车联网&#xff08;V2X&#xff09;或智能网联汽车成为了提供车辆非视距信息的独特解决方案。它们是传感器技术的关键补充&#xff0c;通过车联网&#xff08;V2X&#xff09;&#xff0c;交通工具可以与其他车辆或基础设施进行信息交流。车联网…

upload-labs第十三关教程

upload-labs第十三关教程 第十三关一、源代码分析代码审计 二、绕过分析1&#xff09;0x00绕过a.上传eval.pngb.使用burpsuite进行拦截修改之前&#xff1a;修改之后&#xff1a;进入hex模块&#xff1a; c.放包上传成功&#xff1a; d.使用中国蚁剑进行连接 2&#xff09;%00绕…