引言:
在网络安全攻防实践中,资产测绘是红队作战与蓝队安全运营的第一步,其本质都是通过系统性信息采集实现攻击面管理。
当前普遍存在的痛点在于,当企业级资产规模呈指数级增长时,传统基于规则引擎的低效批量处理方式导致信息价值密度显著下降。
这种情况下,红队人员往往陷入"线索过载"困境——在繁冗的开放数据与私有资产中定位关键攻击向量,因为涉及多维度风险评估(包括资产暴露度、漏洞可利用性、业务关联性等要素)及攻击路径成本核算,其时间成本与人力投入常呈非线性增长。
而对于企业安全运营而言,单纯的资产发现已无法满足风险管理需求。
总的来说,典型挑战体现在三个维度:
-
威胁情报融合:需建立多源异构数据的关联分析机制
-
风险量化评估:构建包含CVSS评分、业务影响因子、修复优先级的动态评估模型
-
防御体系映射:通过攻击链逆向推导实现纵深防御体系的闭环验证
这篇文章所提出的解决方案,是基于AI大模型(以DeepSeek为例)构建智能信息处理引擎,结合Neo4j图数据库实现威胁情报的知识图谱化构建。主要分为两个层面的处理机制:
• 前端:通过AI大模型的上下文感知能力,自动提取与资产属性标签,根据专业经验预测可行的APT攻击模式特征
• 后端:基于Neo4j的图遍历算法生成攻击面拓扑视图,从而支持攻击路径模拟与防御策略推演。
接下来,我们逐步阐明思路和方法。
【注意】
本文所述技术方案均为网络安全领域的学术探讨,旨在促进防御体系的技术演进,严禁任何形式的非法利用。作者及研究团队:
1. 不提供、不支持、不鼓励将文中方法用于未授权测试或攻击行为
2. 不承担因技术误用导致的任何法律及道德责任
3. 不公开任何可能降低攻击门槛的模型细节(如exploit生成模块的奖励函数设计)相关技术实施应严格遵守《网络安全法》《数据安全法》及所在国法律法规,建议在隔离测试环境中验证学术猜想。
4. 技术锋芒的指向应是加固系统而非突破防线——这是所有安全研究者不可逾越的伦理基线。
01
研究背景与价值定位
传统资产测绘的痛点
紧接上文,传统的资产测绘方法面临着诸多挑战,这些挑战严重影响了安全团队的工作效率和效果。具体来说,主要痛点包括:
-
多源数据孤岛难以形成攻击面全景视图:通过对企业内外部的资产进行勘测,多类工具、多种手段往往带来繁荣的数据;另一方面,企业内部通常存在多个不同的数据源,如网络设备日志、应用日志、安全设备日志等。以上的数据往往分散在不同的系统中,形成了数据孤岛,使得安全团队难以整合这些数据,构建一个全面的攻击面视图。
-
人工分析效率低下:在渗透测试项目中,安全团队需要处理大量的资产节点,人工分析不仅耗时费力,还容易出现遗漏和错误,降低了整体工作效率。
本文希望达成的测绘体系的核心价值
为了解决上述痛点,本文探索DeepSeek或AI加持下,帮助红队人员或企业安全团队实现更智能、直观的测绘能力。核心价值主要体现在以下几个方面:
-
红队作战视角:
-
攻击路径预构建:利用知识图谱技术,测绘体系能够自动推导攻击链可能性。这有助于红队提前识别潜在的攻击路径,制定更为有效的防御策略。
-
目标优先级判定:基于动态权重算法,识别出最关键的突破点。这种优先级判定机制使得红队能够集中资源和精力,对最具威胁的目标进行重点防护。
-
成果可视化呈现:系统自动生成符合MITRE ATT&CK框架的战术路径图,直观展示攻击路径和关键节点。这不仅提高了分析结果的可读性,还便于红队成员之间的沟通和协作。
-
-
成本效益分析:
-
扫描耗时降低:通过目标优先级划分与筛选,动态频率调整和协议识别技术,在保证扫描质量的前提下,大幅减少扫描时间。这对于快速响应和处置安全事件具有重要意义。
-
漏洞验证效率提升:借助AI辅助PoC生成技术,系统能够自动化生成漏洞验证代码,助力漏洞验证的效果提升。
-
报告编制辅助:通过资产测绘知识图谱的提供,使用代码或原生指令,快速生成详细的渗透测试逻辑路径,减少人工编写工作。
-
02
隐侠具体实践
接下来,简单打样,调通数据逻辑,简单体现DeepSeek或其他AI大模型赋能的可能性和效果。整体架构为:
graph TD
A[探测层] -->|存活资产| B(知识图谱构建)
A -->|流量特征| C(动态资产捕获)
B --> D{风险计算}
C --> D
D --> E[可视化平台]
在下面的实验环境中,环境:Python 3.11 + Neo4j 5.26.1
步骤一:资产扫描
简单来说,调用nmap库扫描IP资产即可,不管是直接运行nmap还是相关工具或代码,建议都将命令行切到root权限。
# nmap扫描模块
def nmap_scan(target):
nm = nmap.PortScanner()
# 采用混合扫描策略(SYN+ACK+UDP)
nm.scan(hosts=target, arguments='-sS -sU -T4 --min-rate 100')
assets = []
for host in nm.all_hosts():
asset = {
"ip": host,
"mac": nm[host]['addresses'].get('mac', 'Unknown'),
"os": nm[host].get('osmatch', [{}])[0].get('name', 'Unknown'),
"ports": [],
"last_seen": datetime.now().isoformat()
}
for proto in nm[host].all_protocols():
ports = nm[host][proto].keys()
for port in ports:
service = nm[host][proto][port].get('name', 'unknown')
asset["ports"].append({
"port": port,
"protocol": proto,
"service": service,
"version": nm[host][proto][port].get('version', '')
})
assets.append(asset)
return assets
扫描结果:
当然,这里扫描的范围后续根据需要可以扩展到IPv6、域名、主域名等,需要类型判断和处理,这里简单提供示例代码:
import re
import ipaddress
from idna import encode as idna_encode
from typing import Dict, Union
classInvalidTargetException(ValueError):
"""用于标识非法目标格式 """
def__init__(self, message: str):
super().__init__(f"Invalid target format: {message}")
defvalidate_ipv4(addr: str) -> bool:
"""
验证IPv4地址合法性(支持CIDR)
"""
try:
ipaddress.IPv4Network(addr, strict=False)
returnTrue
except ValueError:
returnFalse
defvalidate_ipv6(addr: str) -> bool:
"""
验证IPv6地址合法性(支持缩写格式)
"""
try:
ipaddress.IPv6Network(addr, strict=False)
returnTrue
except ValueError:
returnFalse
defvalidate_domain(domain: str) -> bool:
"""
验证域名合法性(支持国际化域名IDNA)
遵循RFC 5891规范:
- 单个标签长度1~63字符
- 总长度不超过253字符
- 允许字母、数字、连字符(不能以连字符开头/结尾)
"""
try:
# 转换为Punycode格式进行规范验证
encoded = idna_encode(domain).decode('ascii')
except UnicodeError:
returnFalse
pattern = r"^([a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z0-9]{2,63}$"
return re.fullmatch(pattern, encoded) isnotNone
deftarget_adapter(target: str) -> Dict[str, Union[str, None]]:
"""
param target: 输入目标(IPv4/IPv6/域名)
"""
# 去除首尾空白及端口号(如果存在)
target = target.strip().split(":", 1)[0]
# 类型判断逻辑
if validate_ipv4(target):
return {
"type": "ipv4",
"value": str(ipaddress.IPv4Network(target, strict=False).network_address)
}
elif validate_ipv6(target):
# IPv6地址规范化(扩展缩写形式)
normalized = ipaddress.IPv6Address(target).exploded
return {
"type": "ipv6",
"value": normalized
}
elif validate_domain(target):
# 国际化域名编码转换
try:
encoded_domain = idna_encode(target).decode('ascii')
return {
"type": "domain",
"value": encoded_domain.lower() # 域名统一小写处理
}
except UnicodeError as e:
raise InvalidTargetException(f"IDNA encoding failed: {e}")
else:
raise InvalidTargetException(f"Unsupported target type: {target}")
# 测试用例
if __name__ == "__main__":
test_cases = [
"192.168.1.1",
"2001:db8::1",
"example.com",
"例子.中国", # 国际化域名
"invalid..domain",
"123.456.789.000"
]
for case in test_cases:
try:
print(f"Input: {case}\nOutput: {target_adapter(case)}\n")
except InvalidTargetException as e:
print(f"Input: {case}\nError: {e}\n")
通过步骤一我们已经实现了对特定目标进行信息搜集,从结果图片就可以看到,已经有大量IP、端口、服务结果。
步骤二:基于DeepSeek/AI做资产风险识别
基于步骤一结果,这边我们基于DeepSeek/AI做资产风险识别,该部分也是本文的核心。
那本部分的核心,就是prompt的设置了,规定数据的输入和输出格式,以便AI大模型更好的进行数据分析。这部分我写的细一些,把思考的过程展示一下。
初始的时候,只考虑了对资产进行分析和风险估算,因此主要内容为:
# 构建结构化提示词
prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析:
## 资产特征
- IP地址:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口数:{len(asset_data['ports'])}
- 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))}
- 最后活动时间:{asset_data['last_seen']}
## 分析要求
1. 漏洞风险评估(CVSS评分+漏洞类型分类)
2. 建议的ATT&CK攻击路径(包含战术编号)
3. 资产关键性评分(1-10分制,需说明评分依据)"""
这里的资产特征,主要是来源于python使用nmap库扫描完成后的结果输出。
而在AI大模型输出结果后,发现格式不一,因为只给了分析要求,没给结果模板。于是通过反复调试和对接(过程中步骤三已经在尝试了),最后的prompt为:
# 结构化输出模板(强制API返回格式)
ANALYSIS_TEMPLATE = """请严格按照以下Markdown模板生成分析报告:
### 网络安全分析报告
#### 一、漏洞风险评估
{% for vuln in vulnerabilities %}
**{{ loop.index }}. {{ vuln.service }} (端口{{ vuln.port }})**
- 风险等级: {{ vuln.risk_level }}
- CVSS评分: {{ vuln.cvss }} ({{ vuln.vector }})
- 漏洞类型: {{ vuln.types|join('、') }}
{% endfor %}
#### 二、建议的ATT&CK攻击路径
{% for tactic in tactics %}
**战术编号**: {{ tactic.id }} - {{ tactic.name }}
- 描述: {{ tactic.description }}
{% endfor %}
#### 三、资产关键性评分
**评分**: {{ criticality }}/10
**评分依据**:
{% for reason in criticality_reasons %}
{{ loop.index }}. {{ reason }}
{% endfor %}"""
prompt = f"""作为网络安全分析师,请严格按模板分析以下数据:
### 输入数据
- IP:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口:{', '.join(f"{p['port']}/{p['protocol']}({p['service']})"for p in asset_data['ports'])}
- 最后活动时间:{asset_data['last_seen']}
### 分析要求
{ANALYSIS_TEMPLATE}"""
在AI大模型的选择方面,其实DeepSeek、ChatGPT还是混元大模型等等,没什么太大的区别,都是通用AI大模型,关键还是咱上面prompt设置的好。
调用DeepSeek的api,进行分析:
# DeepSeek连接
defdeepseek_analysis(asset_data) -> str:
# 初始化官方SDK客户端
client = OpenAI(
api_key=DEEPSEEK_API_KEY,
base_url="https://api.deepseek.com/v1",
timeout=30
)
# 构建结构化提示词
prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析:
## 资产特征
- IP地址:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口数:{len(asset_data['ports'])}
- 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))}
- 最后活动时间:{asset_data['last_seen']}
## 分析要求
1. 漏洞风险评估(CVSS评分+漏洞类型分类)
2. 建议的ATT&CK攻击路径(包含战术编号)
3. 资产关键性评分(1-10分制,需说明评分依据)"""
#print(prompt)
try:
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一名经验丰富的网络安全分析师,擅长漏洞挖掘和攻击路径分析"},
{"role": "user", "content": prompt}
],
temperature=0.2,
max_tokens=1500,
stream=False
)
# 增强响应解析健壮性
if response.choices and response.choices[0].message:
return response.choices[0].message.content
else:
raise ValueError("Empty response from API")
except Exception as e:
# 异常处理
error_msg = f"DeepSeek API调用失败: {str(e)}"
print(error_msg)
return error_msg
余额不足,报错:
暂时无法充值:
那么,这里转而使用腾讯混元大模型的api,用的是OpenAI的SDK:
defhunyuan_analysis(asset_data) -> str:
# 初始化OpenAI兼容客户端
client = OpenAI(
api_key=HUNYUAN_API_KEY,
base_url="https://api.hunyuan.cloud.tencent.com/v1", # 官方API端点
timeout=30# 网络超时保护
)
# 结构化提示词
prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析:
## 资产特征
- IP地址:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口数:{len(asset_data['ports'])}
- 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))}
- 最后活动时间:{asset_data['last_seen']}
## 分析要求
1. 漏洞风险评估(CVSS评分+漏洞类型分类)
2. 建议的ATT&CK攻击路径(包含战术编号)
3. 资产关键性评分(1-10分制,需说明评分依据)"""
try:
response = client.chat.completions.create(
model="hunyuan-pro", # 付费模型(免费可用hunyuan-lite[1](@ref))
messages=[
{"role": "system", "content": "你是一名经验丰富的网络安全分析师,擅长漏洞挖掘和攻击路径分析"},
{"role": "user", "content": prompt}
],
temperature=0.3, # 平衡严谨性与创造性
max_tokens=1200,
extra_body={ # 腾讯增强参数
"enable_enhancement": True, # 启用知识检索增强
"enable_speed_search": False# 关闭极速模式保证准确性
}
)
# 增强型响应解析
if response.choices and response.choices[0].message:
return response.choices[0].message.content
else:
raise ValueError("API返回空响应")
except Exception as e:
在使用最初的prompt时,得到如下响应:
分析报告:### 网络安全分析报告
#### 一、漏洞风险评估
**1.1 MySQL (开放端口)**
- **风险等级**: 高
- **CVSS评分**: 8.0 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H)
- **漏洞类型**: 未授权访问、SQL注入
**1.2 HTTPS (开放端口)**
- **风险等级**: 中
- **CVSS评分**: 6.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:L/I:L/A:N)
- **漏洞类型**: 中间人攻击(MITM)、弱加密
**1.3 HTTP (开放端口)**
- **风险等级**: 高
- **CVSS评分**: 7.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:N)
- **漏洞类型**: 跨站脚本攻击(XSS)、跨站请求伪造(CSRF)
**1.4 Intermapper (开放端口)**
- **风险等级**: 低
- **CVSS评分**: 4.0 (CVSS:3.0/AV:N/AC:M/PR:N/UI:N/S:U/C:N/I:N/A:N)
- **漏洞类型**: 信息泄露
**1.5 HTTP-Proxy (开放端口)**
- **风险等级**: 中
- **CVSS评分**: 6.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:L/I:L/A:N)
- **漏洞类型**: 代理绕过、未授权访问
**1.6 FTP (开放端口)**
- **风险等级**: 高
- **CVSS评分**: 8.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H)
- **漏洞类型**: 未授权访问、弱密码
#### 二、建议的ATT&CK攻击路径
**T1190 - 利用目标共享**
- 攻击者通过未授权访问FTP服务获取敏感文件。
**T1203 - Exploit Public-Facing Application**
- 利用HTTP/HTTPS服务的漏洞(如XSS、CSRF)进行攻击。
**T1192 - 利用目标协议漏洞**
- 利用Intermapper或HTTP-Proxy的协议漏洞进行信息收集或未授权访问。
**T1206 - Exploit SQL Database**
- 通过MySQL的未授权访问或SQL注入漏洞获取数据库中的敏感数据。
**T1021.002 - 远程桌面协议(RDP)**
- 虽然未直接提及RDP,但攻击者可能利用其他服务的漏洞获取RDP访问权限。
#### 三、资产关键性评分
**评分**: 8分
**评分依据**:
1. **开放端口多**: 共有6个开放端口,提供了多个潜在的攻击面。
2. **高风险服务多**: 包括MySQL、HTTP、FTP等高风险服务,这些服务历史上常见多种漏洞。
3. **敏感服务暴露**: HTTPS和HTTP服务的暴露增加了数据传输过程中的风险。
4. **未知操作系统**: 操作系统的不确定性增加了评估难度,可能隐藏更多未知风险。
5. **最后活动时间较近**: 资产在2025年仍有活动,表明其仍在使用中,攻击价值较高。
### 总结
该资产存在多个高风险漏洞,建议立即进行详细的漏洞扫描和渗透测试,并采取相应的安全措施,如修补已知漏洞、加强访问控制、定期更新系统和软件等。同时,建议对资产的关键性进行持续监控和评估,确保网络安全。
sh-3.2#
获得响应后,对markdown文档进行解析,解析代码为:
def_parse_hunyuan_response(text: str) -> Dict:
"""解析结构化响应"""
# 漏洞解析
vuln_pattern = (
r"\*\*(\d+)\.\s*(.+?)\s*$端口(\d+)$\*\*\s*\n"# 允许空格和换行差异
r"-+\s*风险等级\s*[:-]+\s*(\S+)\s*\n" # 兼容中英文冒号
r"-+\s*CVSS评分\s*[:-]+\s*([\d.]+)\s*$([^)]+)$\s*\n"
r"-+\s*漏洞类型\s*[:-]+\s*(.+?)\s*(\n|$)"
)
print("[DEBUG] 原始分析报告:\n", text)
matches = re.findall(vuln_pattern, text)
print("[DEBUG] 匹配到的漏洞条目:", matches)
vulnerabilities = [
{
"service": match[1],
"port": int(match[2]),
"risk_level": match[3],
"cvss": float(match[4]),
"vector": match[5],
"types": match[6].split("、")
} for match in re.findall(vuln_pattern, text)
]
# 攻击路径解析
tactic_pattern = r"\*\*战术编号\*\*: (T\d+) - (.+?)\n- 描述: (.+)"
tactics = [
{
"id": match[0],
"name": match[1],
"description": match[2]
} for match in re.findall(tactic_pattern, text)
]
# 关键性评分解析
criticality_match = re.search(r"评分: (\d+)/10", text)
criticality = int(criticality_match.group(1)) if criticality_match else5
return {
"vulnerabilities": vulnerabilities,
"tactics": tactics,
"criticality": criticality
}
最终得到结构化的分析数据:
{'vulnerabilities': [], 'tactics': [{'id': 'T1190', 'name': '利用FTP', 'description': '攻击者通过未授权访问或弱密码获取FTP服务器的控制权,进而进行数据窃取或上传恶意文件。'}, {'id': 'T1192', 'name': '利用HTTP', 'description': '攻击者通过SQL注入、XSS等漏洞获取敏感信息或执行恶意代码。'}, {'id': 'T1195', 'name': '利用HTTPS', 'description': '攻击者通过中间人攻击或证书伪造获取加密数据的明文内容。'}, {'id': 'T1203', 'name': '利用MySQL', 'description': '攻击者通过未授权访问或SQL注入获取数据库中的敏感信息。'}, {'id': 'T1190', 'name': '利用Intermapper', 'description': '攻击者通过信息泄露获取系统配置或网络拓扑信息,为进一步攻击做准备。'}], 'criticality': 5}
因为nmap所扫描的漏洞比较局限,对于最新关注的漏洞的扫描能力是缺失的,因此上方结果中漏洞内容为空,只有可能能产生危害的攻击策略或者风险的防护策略,即tactics。
步骤三:数据存储与展示
当我们已经通过AI大模型得到分析后,即可根据现有结构化数据,做数据输入。
这里简单介绍Neo4j的使用,如果在本地实验,下载Neo4j Desktop,起一个本地数据库即可:
检查Neo4j连通性:
import nmap
import requests
from neo4j import GraphDatabase
from datetime import datetime
# 配置参数
#DEEPSEEK_API_KEY = "your_deepseek_api_key"
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "your_password"
# 测试neo4j连通性
deftest_neo4j_connection():
try:
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
with driver.session() as session:
# 版本监测
version_data = session.execute_read(
lambda tx: list(tx.run("CALL dbms.components() YIELD versions RETURN versions[0] AS version"))
)
node_data = session.execute_read(
lambda tx: list(tx.run("MATCH (n) RETURN count(n) AS count"))
)
# 分析数据
db_version = version_data[0]["version"]
node_count = node_data[0]["count"]
print("\n=== Neo4j连接状态 ===")
print(f"连接成功!服务状态:正常")
print(f"数据库版本: {db_version}")
print(f"当前节点总数: {node_count}")
print("====================")
except Exception as e:
print(f"\n!!! 连接异常: {str(e)}")
if"Unable to connect"in str(e):
print("⇒ 检查Neo4j服务是否启动(neo4j.bat console)")
elif"authentication failure"in str(e):
print("⇒ 密码错误,尝试重置: ALTER USER neo4j SET PASSWORD 'newpassword'")
test_neo4j_connection()
正常情况下,输出结果为:
接着我们只需要设计节点类型和关系类型,目前根据我们有的数据,可以创建ip(资产)、端口服务、漏洞、攻击策略等类型节点,响应就可创建“ip拥有端口服务”、“端口服务存在漏洞”和“针对该ip有哪些攻击策略”的关系。
坦言之,我好久没碰Neo4j和正则了,上文的代码和下面的代码我整了两天。。
我有点没眼看了,少侠们根据我的注释即可了解代码逻辑。
classThreatIntelligenceGraph:
def__init__(self):
self.driver = GraphDatabase.driver(
NEO4J_URI,
auth=(NEO4J_USER, NEO4J_PASSWORD),
encrypted=False,
max_connection_pool_size=20
)
self._clean_legacy_indexes()
self._init_constraints()
def_clean_legacy_indexes(self):
"""清理与约束冲突的遗留索引"""
with self.driver.session() as session:
# 合规查询
result = session.run("""
SHOW INDEXES
YIELD name, type, labelsOrTypes, properties
WHERE labelsOrTypes IN ['Asset', 'Port', 'Vulnerability']
AND (
properties = ['ip']
OR properties = ['number', 'protocol']
OR properties = ['cve_id']
)
""")
for record in result:
print(f"删除遗留索引: {record['name']}")
session.run(f"DROP INDEX {record['name']}")
def_init_constraints(self):
"""创建带名称的约束"""
with self.driver.session() as session:
session.run("""
CREATE CONSTRAINT asset_ip_unique IF NOT EXISTS
FOR (a:Asset)
REQUIRE a.ip IS UNIQUE
""")
session.run("""
CREATE CONSTRAINT port_identifier IF NOT EXISTS
FOR (p:Port)
REQUIRE (p.number, p.protocol) IS UNIQUE
""")
session.run("""
CREATE CONSTRAINT vuln_cve_unique IF NOT EXISTS
FOR (v:Vulnerability)
REQUIRE v.cve_id IS UNIQUE
""")
session.run("""
CREATE CONSTRAINT tactic_id_unique IF NOT EXISTS
FOR (t:AttackTactic)
REQUIRE t.id IS UNIQUE
""")
defstore_analysis(self, asset: Dict, analysis: Dict):
"""全量存储入口"""
with self.driver.session() as session:
# 资产节点
session.execute_write(
self._merge_asset,
asset["ip"],
asset["os"],
analysis["criticality"],
asset["last_seen"]
)
# 端口及漏洞
for port in asset["ports"]:
session.execute_write(
self._link_port,
asset["ip"],
port["port"],
port["protocol"],
port["service"]
)
for vuln in analysis.get("vulnerabilities", []):
if vuln["port"] == port["port"]:
session.execute_write(
self._link_vulnerability,
port["port"],
port["protocol"],
vuln
)
for tactic in analysis.get("tactics", []):
session.execute_write(
self._link_tactic,
asset['ip'],
tactic
)
# 攻击路径
for tactic in analysis.get("tactics", []):
session.execute_write(
self._link_attack_tactic,
tactic,
[v["types"] for v in analysis["vulnerabilities"]]
)
@staticmethod
def_merge_asset(tx, ip: str, os: str, criticality: int, last_seen: str):
tx.run("""
MERGE (a:Asset {ip: $ip})
SET a.os = $os,
a.criticality = $criticality,
a.last_seen = datetime($last_seen),
a.updated = datetime()
""",
ip=ip, os=os, criticality=criticality, last_seen=last_seen)
@staticmethod
def_link_port(tx, ip: str, port: int, protocol: str, service: str):
tx.run("""
MATCH (a:Asset {ip: $ip})
MERGE (p:Port {number: $port, protocol: $protocol})
SET p.service = $service
MERGE (a)-[r:HAS_PORT]->(p)
""",
ip=ip, port=port, protocol=protocol, service=service)
@staticmethod
def_link_vulnerability(tx, port: int, protocol: str, vuln: Dict):
"""漏洞关系创建"""
try:
# 设置ID
cve_id = f"DFYX-VUL-{datetime.now().year}-{port}-{protocol.upper()}"
tx.run("""
// 确保Port存在
MERGE (p:Port {number: $port, protocol: $protocol})
// 创建或更新漏洞
MERGE (v:Vulnerability {cve_id: $cve_id})
ON CREATE SET
v.risk_level = $risk,
v.cvss_score = $cvss,
v.vector = $vector,
v.types = $types
ON MATCH SET
v.last_updated = datetime()
// 创建关系
MERGE (p)-[r:HAS_VULNERABILITY]->(v)
""",
port=port,
protocol=protocol,
cve_id=cve_id,
risk=vuln["risk_level"],
cvss=vuln["cvss"],
vector=vuln["vector"],
types=vuln["types"])
print(f"成功创建漏洞关系: {port}/{protocol} → {cve_id}")
except Exception as e:
print(f"关系创建失败: {str(e)}")
raise
@staticmethod
def_link_tactic(tx, ip: int, tactic: Dict):
"""漏洞关系创建"""
try:
tx.run("""
// 确保ip存在
MERGE (a:Asset {ip: $ip})
// 创建或更新策略
MERGE (t:AttackTactic {id: $id})
ON CREATE SET
t.name = $name,
t.description = $description
ON MATCH SET
t.last_updated = datetime()
// 创建关系
MERGE (a)-[r:HAS_TACTIC]->(t)
""",
ip=ip,
id=tactic["id"],
name=tactic["name"],
description=tactic["description"]
)
#print("成功创建TACTIC关系")
except Exception as e:
print(f"关系创建失败: {str(e)}")
raise
# 正则表达式
def_parse_hunyuan_response(text: str) -> Dict:
# 漏洞解析
vuln_pattern = r"\*\*(\d+)\. (.+?) $端口(\d+)$\*\*\n- 风险等级: (.+?)\n- CVSS评分: (.+?) $(.+?)$\n- 漏洞类型: (.+)"
vulnerabilities = [
{
"service": match[1],
"port": int(match[2]),
"risk_level": match[3],
"cvss": float(match[4]),
"vector": match[5],
"types": match[6].split("、")
} for match in re.findall(vuln_pattern, text)
]
@staticmethod
def_link_attack_tactic(tx, tactic: Dict, vuln_types: List[List[str]]):
tx.run("""
MERGE (t:AttackTactic {id: $id})
SET t.name = $name,
t.description = $desc,
t.last_observed = datetime()
WITH t
UNWIND $vuln_types AS types
MATCH (v:Vulnerability)
WHERE ANY(type IN v.types WHERE type IN types)
MERGE (t)-[r:EXPLOITS]->(v)
""",
id=tactic["id"],
name=tactic["name"],
desc=tactic["description"],
vuln_types=vuln_types)
效果咋样呢,请看:
从图中可以清晰查看当前ip开放了什么端口,ip有哪些攻击策略,因为此时没有漏洞数据所以没有出现漏洞类型节点,另外ip和端口的关系、ip和攻击策略之间的关系也很清晰,所有关系、节点都可以查看详细的属性值。
当然,攻击策略和端口关联会更好,我实在不想去调整prompt和正则了,太痛苦了。
另外,当前模型未建立"攻击战术→漏洞类型"的关系,后续可以补充CYPHER语句:
MATCH (t:AttackTactic), (v:Vulnerability)
WHEREANY(type IN v.types WHERE type IN t.required_conditions)
MERGE (t)-[r:EXPLOITS]->(v)
现有criticality评分未体现在节点属性,也可以在Neo4j中设置节点大小映射:
:paramcriticality_scale: {8: 2.0, 5: 1.5, 3: 1.0}
MATCH (a:Asset) SET a.size = $criticality_scale[a.criticality]
步骤四:整体连通
那么整体效果咋样呢,拭目以待,我们清除一下测试数据,正式扫描个C端看看:
最终代码为:
import nmap
import requests
from neo4j import GraphDatabase
from datetime import datetime
from openai import OpenAI
import os
import re
from typing import Dict, Any, List
# 配置参数(建议使用环境变量)
HUNYUAN_API_KEY = "API_KEY_value"
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "PASSWORD_value"
# nmap扫描模块
def nmap_scan(target):
nm = nmap.PortScanner()
# 采用混合扫描策略(SYN+ACK+UDP)
nm.scan(hosts=target, arguments='-sS -sU -T4 --min-rate 100')
assets = []
for host in nm.all_hosts():
asset = {
"ip": host,
"mac": nm[host]['addresses'].get('mac', 'Unknown'),
"os": nm[host].get('osmatch', [{}])[0].get('name', 'Unknown'),
"ports": [],
"last_seen": datetime.now().isoformat()
}
for proto in nm[host].all_protocols():
ports = nm[host][proto].keys()
for port in ports:
service = nm[host][proto][port].get('name', 'unknown')
asset["ports"].append({
"port": port,
"protocol": proto,
"service": service,
"version": nm[host][proto][port].get('version', '')
})
assets.append(asset)
return assets
# 结构化输出模板(强制API返回格式)
ANALYSIS_TEMPLATE = """请严格按照以下Markdown模板生成分析报告:
### 网络安全分析报告
#### 一、漏洞风险评估
{% for vuln in vulnerabilities %}
**{{ loop.index }}. {{ vuln.service }} (端口{{ vuln.port }})**
- 风险等级: {{ vuln.risk_level }}
- CVSS评分: {{ vuln.cvss }} ({{ vuln.vector }})
- 漏洞类型: {{ vuln.types|join('、') }}
{% endfor %}
#### 二、建议的ATT&CK攻击路径
{% for tactic in tactics %}
**战术编号**: {{ tactic.id }} - {{ tactic.name }}
- 描述: {{ tactic.description }}
{% endfor %}
#### 三、资产关键性评分
**评分**: {{ criticality }}/10
**评分依据**:
{% for reason in criticality_reasons %}
{{ loop.index }}. {{ reason }}
{% endfor %}"""
# 腾讯混元API增强调用
def hunyuan_analysis(asset_data: Dict) -> Dict:
"""结构化漏洞分析"""
client = OpenAI(
api_key=HUNYUAN_API_KEY,
base_url="https://api.hunyuan.cloud.tencent.com/v1",
timeout=30
)
# 强制结构化提示词
prompt = f"""作为网络安全分析师,请严格按模板分析以下数据:
### 输入数据
- IP:{asset_data['ip']}
- 操作系统:{asset_data['os']}
- 开放端口:{', '.join(f"{p['port']}/{p['protocol']}({p['service']})" for p in asset_data['ports'])}
- 最后活动时间:{asset_data['last_seen']}
### 分析要求
{ANALYSIS_TEMPLATE}"""
try:
response = client.chat.completions.create(
model="hunyuan-pro",
messages=[
{"role": "system", "content": "你是一名严格遵循输出格式的网络安全专家"},
{"role": "user", "content": prompt}
],
temperature=0.1, # 降低随机性
max_tokens=1500,
extra_body={"enable_enhancement": True}
)
return _parse_hunyuan_response(response.choices[0].message.content)
except Exception as e:
return {"error": str(e)}
def _parse_hunyuan_response(text: str) -> Dict:
"""解析结构化响应"""
# 漏洞解析
vuln_pattern = (
r"\*\*(\d+)\.\s*(.+?)\s*$端口(\d+)$\*\*\s*\n" # 允许空格和换行差异
r"-+\s*风险等级\s*[:-]+\s*(\S+)\s*\n" # 兼容中英文冒号
r"-+\s*CVSS评分\s*[:-]+\s*([\d.]+)\s*$([^)]+)$\s*\n"
r"-+\s*漏洞类型\s*[:-]+\s*(.+?)\s*(\n|$)"
)
#print("[DEBUG] 原始分析报告:\n", text)
matches = re.findall(vuln_pattern, text)
print("[DEBUG] 匹配到的漏洞条目:", matches)
vulnerabilities = [
{
"service": match[1],
"port": int(match[2]),
"risk_level": match[3],
"cvss": float(match[4]),
"vector": match[5],
"types": match[6].split("、")
} for match in re.findall(vuln_pattern, text)
]
# 攻击路径解析
tactic_pattern = r"\*\*战术编号\*\*: (T\d+) - (.+?)\n- 描述: (.+)"
tactics = [
{
"id": match[0],
"name": match[1],
"description": match[2],
} for match in re.findall(tactic_pattern, text)
]
# 关键性评分解析
criticality_match = re.search(r"评分: (\d+)/10", text)
criticality = int(criticality_match.group(1)) if criticality_match else 5
return {
"vulnerabilities": vulnerabilities,
"tactics": tactics,
"criticality": criticality
}
# Neo4j增强存储模块
class ThreatIntelligenceGraph:
def __init__(self):
self.driver = GraphDatabase.driver(
NEO4J_URI,
auth=(NEO4J_USER, NEO4J_PASSWORD),
encrypted=False,
max_connection_pool_size=20
)
self._clean_legacy_indexes()
self._init_constraints()
def _clean_legacy_indexes(self):
"""清理与约束冲突的遗留索引"""
with self.driver.session() as session:
# 合规查询
result = session.run("""
SHOW INDEXES
YIELD name, type, labelsOrTypes, properties
WHERE labelsOrTypes IN ['Asset', 'Port', 'Vulnerability']
AND (
properties = ['ip']
OR properties = ['number', 'protocol']
OR properties = ['cve_id']
)
""")
for record in result:
print(f"删除遗留索引: {record['name']}")
session.run(f"DROP INDEX {record['name']}")
def _init_constraints(self):
"""创建带名称的约束"""
with self.driver.session() as session:
session.run("""
CREATE CONSTRAINT asset_ip_unique IF NOT EXISTS
FOR (a:Asset)
REQUIRE a.ip IS UNIQUE
""")
session.run("""
CREATE CONSTRAINT port_identifier IF NOT EXISTS
FOR (p:Port)
REQUIRE (p.number, p.protocol) IS UNIQUE
""")
session.run("""
CREATE CONSTRAINT vuln_cve_unique IF NOT EXISTS
FOR (v:Vulnerability)
REQUIRE v.cve_id IS UNIQUE
""")
session.run("""
CREATE CONSTRAINT tactic_id_unique IF NOT EXISTS
FOR (t:AttackTactic)
REQUIRE t.id IS UNIQUE
""")
def store_analysis(self, asset: Dict, analysis: Dict):
"""全量存储入口"""
with self.driver.session() as session:
# 资产节点
session.execute_write(
self._merge_asset,
asset["ip"],
asset["os"],
analysis["criticality"],
asset["last_seen"]
)
# 端口及漏洞
for port in asset["ports"]:
session.execute_write(
self._link_port,
asset["ip"],
port["port"],
port["protocol"],
port["service"]
)
for vuln in analysis.get("vulnerabilities", []):
if vuln["port"] == port["port"]:
session.execute_write(
self._link_vulnerability,
port["port"],
port["protocol"],
vuln
)
for tactic in analysis.get("tactics", []):
session.execute_write(
self._link_tactic,
asset['ip'],
tactic
)
# 攻击路径
for tactic in analysis.get("tactics", []):
session.execute_write(
self._link_attack_tactic,
tactic,
[v["types"] for v in analysis["vulnerabilities"]]
)
@staticmethod
def _merge_asset(tx, ip: str, os: str, criticality: int, last_seen: str):
tx.run("""
MERGE (a:Asset {ip: $ip})
SET a.os = $os,
a.criticality = $criticality,
a.last_seen = datetime($last_seen),
a.updated = datetime()
""",
ip=ip, os=os, criticality=criticality, last_seen=last_seen)
@staticmethod
def _link_port(tx, ip: str, port: int, protocol: str, service: str):
tx.run("""
MATCH (a:Asset {ip: $ip})
MERGE (p:Port {number: $port, protocol: $protocol})
SET p.service = $service
MERGE (a)-[r:HAS_PORT]->(p)
""",
ip=ip, port=port, protocol=protocol, service=service)
@staticmethod
def _link_vulnerability(tx, port: int, protocol: str, vuln: Dict):
"""漏洞关系创建"""
try:
# 生成更合理的ID(加入协议)
cve_id = f"DFYX-VUL-{datetime.now().year}-{port}-{protocol.upper()}"
tx.run("""
// 确保Port存在
MERGE (p:Port {number: $port, protocol: $protocol})
// 创建或更新漏洞
MERGE (v:Vulnerability {cve_id: $cve_id})
ON CREATE SET
v.risk_level = $risk,
v.cvss_score = $cvss,
v.vector = $vector,
v.types = $types
ON MATCH SET
v.last_updated = datetime()
// 创建关系
MERGE (p)-[r:HAS_VULNERABILITY]->(v)
""",
port=port,
protocol=protocol,
cve_id=cve_id,
risk=vuln["risk_level"],
cvss=vuln["cvss"],
vector=vuln["vector"],
types=vuln["types"])
print(f"成功创建漏洞关系: {port}/{protocol} → {cve_id}")
except Exception as e:
print(f"关系创建失败: {str(e)}")
raise
@staticmethod
def _link_tactic(tx, ip: int, tactic: Dict):
"""策略关系创建"""
try:
tx.run("""
// 确保ip存在
MERGE (a:Asset {ip: $ip})
// 创建或更新策略
MERGE (t:AttackTactic {id: $id})
ON CREATE SET
t.name = $name,
t.description = $description
ON MATCH SET
t.last_updated = datetime()
// 创建关系
MERGE (a)-[r:HAS_TACTIC]->(t)
""",
ip=ip,
id=tactic["id"],
name=tactic["name"],
description=tactic["description"]
)
#print("成功创建TACTIC关系")
except Exception as e:
print(f"关系创建失败: {str(e)}")
raise
# 修正正则表达式
def _parse_hunyuan_response(text: str) -> Dict:
# 漏洞解析(修正$符号问题)
vuln_pattern = r"\*\*(\d+)\. (.+?) $端口(\d+)$\*\*\n- 风险等级: (.+?)\n- CVSS评分: (.+?) $(.+?)$\n- 漏洞类型: (.+)"
vulnerabilities = [
{
"service": match[1],
"port": int(match[2]),
"risk_level": match[3],
"cvss": float(match[4]),
"vector": match[5],
"types": match[6].split("、")
} for match in re.findall(vuln_pattern, text)
]
@staticmethod
def _link_attack_tactic(tx, tactic: Dict, vuln_types: List[List[str]]):
tx.run("""
MERGE (t:AttackTactic {id: $id})
SET t.name = $name,
t.description = $desc,
t.last_observed = datetime()
WITH t
UNWIND $vuln_types AS types
MATCH (v:Vulnerability)
WHERE ANY(type IN v.types WHERE type IN types)
MERGE (t)-[r:EXPLOITS]->(v)
""",
id=tactic["id"],
name=tactic["name"],
desc=tactic["description"],
vuln_types=vuln_types)
# 主流程
if __name__ == "__main__":
# 模拟扫描数据
assets = nmap_scan("xx.xx.xx.xx/24")
for asset in assets:
# 获取分析结果
analysis_result = hunyuan_analysis(asset)
print(analysis_result)
if "error" not in analysis_result:
# 存储到知识图谱
graph = ThreatIntelligenceGraph()
graph.store_analysis(asset, analysis_result)
print("数据存储成功!")
else:
print(f"分析失败:{analysis_result['error']}")
04
资产与威胁图谱应用
其实这里就是畅想应用场景,以此也能得出后续的针对性补强动作需求。
场景一:基于高危端口清单梳理当前资产暴露面
以22端口、3306端口为例,企业安全管理中一般禁止这样的高危服务端口对公网开放,容易遭受暴力破解、密码泄漏、数据泄露等攻击或损失,因此可以通过筛选条件,直接得到高危服务子图:
场景二:构建动态的渗透攻击序列模型
基于场景一的工作来做也可以,或者按照渗透的思路,比如我现在需要提取所有web资产,因为当前没有漏洞扫描的结果,那就下一步导出结果给nuclei进行扫描:
值得注意的是,具体ip或者端口与Tactic(攻击策略)也有关联,利用策略可以减少无谓的无用功,针对可行方向进行针对性漏洞扫描或者渗透测试,类似于根据指纹匹配对应的PoCs来进行测试。
漏洞扫描完成后,可再次利用AI大模型对识别出的脆弱点进行智能化的攻击路径编排。具体而言,将结合漏洞的CVSS评分、利用可行性、资产权重(如业务系统等级、数据敏感度)及网络拓扑关系,构建动态的渗透攻击序列模型。
通过集成MITRE ATT&CK知识库的攻击模式特征,采用相关机器学习算法对攻击路径进行概率推演,智能生成包含横向移动策略、权限提升路线和载荷注入节点的渗透方案。还可通过prompt指定匹配Metasploit、Cobalt Strike等工具链的攻击模块,预演漏洞组合利用的叠加效应,最终输出包含POC验证脚本、修复优先级建议和应急响应预案的渗透测试报告,实现从脆弱性识别到攻击模拟验证的闭环安全处置流程。
通过一些公开的渗透文章为例,可以有如下应用案例:
实例1:Web应用漏洞链式攻击
漏洞扫描结果发现电商系统存在:
-
Apache Struts2 RCE(CVE-2023-XXXX,CVSS 9.8)
-
后台管理弱口令(admin/admin)
-
PostgreSQL数据库未授权访问(CVE-2022-4567,CVSS 8.5)
智能化编排过程
-
优先级判定
-
根据CVSS评分和资产权重(电商系统属于核心业务),优先利用Struts2 RCE漏洞
-
结合MITRE ATT&CK T1190(漏洞利用)和T1059(命令执行)生成攻击链
-
-
攻击路径推演
# 计算攻击成功率
if 漏洞可利用性 > 90% and 目标在DMZ区:
生成Payload: msfvenom反向Shell → 植入Cobalt Strike Beacon
if 获取Web服务器权限:
探测内网PostgreSQL数据库(T1046网络发现)
利用弱口令横向移动(T1078合法凭证)
3. 工具链自动化
-
-
Metasploit调用
exploit/multi/http/struts2_code_exec
模块 -
通过数据库未授权访问执行
pg_read_file('/etc/passwd')
提取敏感信息 -
输出POC验证脚本:自动生成Struts2漏洞的HTTP请求包和数据库爆破脚本
-
实例2:内网横向移动渗透
初始入口通过边界打印机服务漏洞(CVE-2022-XXXX,CVSS 7.2)获取内网据点
智能化决策过程
-
网络拓扑分析
-
识别目标为研发部门子网(资产权重高)
-
检测到子网间存在防火墙隔离(仅开放SMB端口)
-
-
攻击链生成
攻击序列:
1. 打印机漏洞获取立足点(Initial Access)
2. 使用PsExec横向移动(T1570 Lateral Tool Transfer)
3. 窃取研发服务器RDP凭据(T1552 Credential Access)
4. 通过Mimikatz注入域控(T1484 Domain Policy Modification)
3.组合漏洞利用
-
-
结合打印机服务的低危漏洞(CVSS 7.2)与域控服务器的Kerberos协议漏洞(CVSS 8.8)
-
自动化生成Kerberoasting攻击脚本
-
实例3:供应链攻击模拟
漏洞环境某企业OA系统存在:
-
第三方组件漏洞(CVE-2023-XXXX,CVSS 9.0)
-
文件上传功能未鉴权(CVSS 8.0)
-
开发环境与生产环境未隔离
渗透方案生成
-
ATT&CK战术映射
-
初始访问(T1195):通过供应链组件漏洞植入WebShell
-
权限提升(T1068):利用文件上传漏洞获取系统权限
-
横向移动(T1210):通过开发环境JumpServer跳板攻击代码仓库
-
-
自动化工具联动
-
调用Nuclei验证组件漏洞:
nuclei -t cve-2023-5678.yaml
-
使用Chisel建立穿透隧道绕过网络隔离
-
生成修复建议
-
04
总结
本文主要介绍了DeepSeek或其他AI大模型可以在资产测绘、渗透路径规划方面的赋能方向,并通过一些基础代码做了基础常识,给出了较为基础的prompt案例,希望能助力师傅们往技术更深处思考。
值得一提,企业的安全体系建设可以遵循"攻防同步进化,边作战边建设"的原则:在持续完善IT基础设施(如自动化资产发现平台、威胁情报管道)的同时,配套开展安全团队能力建设(包括ATT&CK框架适配训练、红蓝对抗演习机制),在实战化对抗中实现技术优势向防御效能的有效转化。
否则,业内的这项看家本领,还真可能被AI替代。