太极安全监控系统1.0(Python)

news2024/10/22 13:14:04

一、项目介绍和功能介绍

1.项目介绍

安全监控系统 是一个综合性的安全监控和防护工具,旨在帮助系统管理员检测和应对网络中的可疑活动。该系统集成了多种安全技术和工具,包括日志分析、网络流量监控、机器学习模型、动态防火墙规则配置、蜜罐部署、沙箱管理和自动反击功能。通过这些功能,系统能够有效地识别和应对潜在的安全威胁。

2.兼容性

操作系统: Windows 10
依赖库: scapy, whois, numpy, scikit-learn, tensorflow, geopy, Evtx, psutil, PyQt5, matplotlib
权限: 确保运行程序的用户具有管理员权限,以便配置防火墙规则和捕获网络流量。

import os
import sys
import subprocess
import re
import datetime
import threading
import tkinter as tk
from tkinter import messagebox, simpledialog, ttk
import scapy.all as scapy
import whois
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
import json
import random
import socket
import pickle
from geopy.geocoders import Nominatim
from collections import defaultdict
import Evtx.Evtx as evtx
import tensorflow as tf
import psutil
import logging
import time
import multiprocessing
from PyQt5.QtWidgets import QApplication, QMainWindow, QTabWidget, QWidget, QVBoxLayout, QPushButton, QListWidget, QLabel, QTreeView, QFileSystemModel, QTableWidget, QTableWidgetItem, QComboBox, QProgressBar
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtGui import QPixmap, QImage
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas

# 配置日志
logging.basicConfig(filename=os.path.join('logs', 'security_system.log'), level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 确保 TensorFlow 使用 GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError as e:
        print(e)
        logging.error(f"TensorFlow GPU configuration error: {e}")

# 全局变量
suspicious_activities = []
packets = []
geolocator_cache = {}
whois_cache = {}
taiji_shield = None

# 配置防火墙规则
def configure_firewall():
    print("配置防火墙规则...")
    logging.info("配置防火墙规则...")
    # Windows 防火墙规则配置示例
    subprocess.run(["netsh", "advfirewall", "set", "currentprofile", "state", "on"])
    # 阻断已知恶意 IP 地址
    known_malicious_ips = ["192.168.1.100", "10.0.0.1"]
    for ip in known_malicious_ips:
        subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip])
        subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip])

# 读取和解析系统日志
def analyze_logs(log_file):
    print(f"分析日志文件 {log_file}...")
    logging.info(f"分析日志文件 {log_file}...")
    suspicious_activities = []
    
    try:
        with evtx.Evtx(log_file) as log:
            for record in log.records():
                xml = record.xml()
                if "IPTables-Input" in xml or "IPTables-Output" in xml or "Security-Audit" in xml:
                    match = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', xml)
                    if match:
                        ip_address = match.group(1)
                        timestamp = re.search(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', xml)
                        if timestamp:
                            timestamp = timestamp.group(0)
                            suspicious_activities.append((timestamp, ip_address, xml.strip()))
    except Exception as e:
        print(f"分析日志文件时发生错误: {e}")
        logging.error(f"分析日志文件时发生错误: {e}")
    
    return suspicious_activities

# 使用 Scapy 抓取特定端口的流量
def capture_traffic(interface, port):
    print(f"抓取 {interface} 上的 {port} 端口流量...")
    logging.info(f"抓取 {interface} 上的 {port} 端口流量...")
    packets = scapy.sniff(iface=interface, filter=f"port {port}", count=100)
    return packets

# 获取入侵者地理位置
def get_geolocation(ip_address, geolocator_cache):
    if ip_address in geolocator_cache:
        return geolocator_cache[ip_address]
    
    try:
        geolocator = Nominatim(user_agent="security_system")
        location = geolocator.geocode(ip_address)
        if location:
            geolocator_cache[ip_address] = f"{location.city}, {location.country}"
            return geolocator_cache[ip_address]
        else:
            geolocator_cache[ip_address] = "未知位置"
            return "未知位置"
    except Exception as e:
        geolocator_cache[ip_address] = f"获取地理位置失败: {str(e)}"
        logging.error(f"获取地理位置失败: {e}")
        return geolocator_cache[ip_address]

# 验证 IP 地址
def verify_ip(ip_address, whois_cache):
    if ip_address in whois_cache:
        return whois_cache[ip_address]
    
    try:
        w = whois.whois(ip_address)
        if w and w.get('nets'):
            whois_cache[ip_address] = w.nets[0].get('description', "未知描述")
            return whois_cache[ip_address]
        else:
            whois_cache[ip_address] = "未知描述"
            return "未知描述"
    except Exception as e:
        whois_cache[ip_address] = f"验证 IP 失败: {str(e)}"
        logging.error(f"验证 IP 失败: {e}")
        return whois_cache[ip_address]

# 生成报告
def generate_report(suspicious_activities, report_file, geolocator_cache, whois_cache):
    print(f"生成报告到 {report_file}...")
    logging.info(f"生成报告到 {report_file}...")
    os.makedirs(os.path.dirname(report_file), exist_ok=True)
    with open(report_file, 'w') as file:
        file.write("可疑活动报告\n")
        file.write("=" * 30 + "\n")
        file.write(f"生成时间: {datetime.datetime.now()}\n")
        file.write("\n")
        file.write("时间戳\tIP 地址\t地理位置\t描述\t日志条目\n")
        file.write("-" * 80 + "\n")
        for activity in suspicious_activities:
            geolocation = get_geolocation(activity[1], geolocator_cache)
            description = verify_ip(activity[1], whois_cache)
            file.write(f"{activity[0]}\t{activity[1]}\t{geolocation}\t{description}\t{activity[2]}\n")

# 生成沙箱环境
def create_sandbox(ip_address):
    print(f"为 IP 地址 {ip_address} 创建沙箱...")
    logging.info(f"为 IP 地址 {ip_address} 创建沙箱...")
    sandbox_dir = os.path.join('temp', f'sandbox_{ip_address}')
    os.makedirs(sandbox_dir, exist_ok=True)
    
    # 模拟多线程和多核处理
    def simulate_computation():
        for i in range(1000000):
            pass
    
    processes = []
    for _ in range(2):
        process = multiprocessing.Process(target=simulate_computation)
        processes.append(process)
        process.start()
    
    for process in processes:
        process.join()
    
    with open(os.path.join(sandbox_dir, "README.txt"), 'w') as file:
        file.write(f"沙箱环境用于 IP 地址: {ip_address}\n")
        file.write("此目录被隔离以防止潜在威胁。\n")
    
    return sandbox_dir

# 自适应防护机制(太极护盾)
class TaiJiShield:
    def __init__(self):
        self.isolation_forest = IsolationForest(contamination=0.01)
        self.one_class_svm = OneClassSVM(nu=0.01, kernel='rbf', gamma='scale')
        self.scaler = StandardScaler()
        self.data = []
        self.model_trained = False
        self.model_path = os.path.join('models', 'taiji_model.pkl')
    
    def train(self, new_data):
        self.data.extend(new_data)
        self.data = np.array(self.data)
        self.data = self.scaler.fit_transform(self.data)
        self.isolation_forest.fit(self.data)
        self.one_class_svm.fit(self.data)
        self.model_trained = True
        self.save_model()
        logging.info("模型训练完成并保存")
    
    def incremental_train(self, new_data):
        if not self.model_trained:
            self.train(new_data)
            return
        
        new_data = np.array(new_data)
        new_data = self.scaler.transform(new_data)
        self.isolation_forest.fit(new_data)
        self.one_class_svm.fit(new_data)
        self.save_model()
        logging.info("模型增量训练完成并保存")
    
    def predict(self, data):
        if not self.model_trained:
            return 1, 1
        data = np.array(data).reshape(1, -1)
        data = self.scaler.transform(data)
        isolation_forest_pred = self.isolation_forest.predict(data)
        one_class_svm_pred = self.one_class_svm.predict(data)
        return isolation_forest_pred[0], one_class_svm_pred[0]
    
    def save_model(self):
        model_data = {
            'isolation_forest': self.isolation_forest,
            'one_class_svm': self.one_class_svm,
            'scaler': self.scaler
        }
        with open(self.model_path, 'wb') as file:
            pickle.dump(model_data, file)
        logging.info("模型已保存到文件")
    
    def load_model(self):
        if os.path.exists(self.model_path):
            with open(self.model_path, 'rb') as file:
                model_data = pickle.load(file)
                self.isolation_forest = model_data['isolation_forest']
                self.one_class_svm = model_data['one_class_svm']
                self.scaler = model_data['scaler']
                self.model_trained = True
                logging.info("模型已从文件加载")

# 动态黑名单
def update_blacklist(ip_address):
    blacklist_path = os.path.join('temp', 'blacklist.conf')
    with open(blacklist_path, 'a') as file:
        file.write(f"{ip_address}\n")
    subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip_address])
    subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip_address])

# 部署蜜罐
def deploy_honeypot(port):
    def handle_client(client_socket):
        client_socket.send(b"Welcome to the honeypot!")
        client_socket.close()
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('0.0.0.0', port))
    server.listen(5)
    print(f"Honeypot listening on port {port}")
    logging.info(f"Honeypot listening on port {port}")
    
    while True:
        client_socket, addr = server.accept()
        print(f"Connection from {addr}")
        logging.info(f"Connection from {addr}")
        threading.Thread(target=handle_client, args=(client_socket,)).start()

# 动态端口跳跃
def dynamic_port_jumping(service_port):
    new_port = random.randint(1024, 65535)
    subprocess.run(["netsh", "interface", "portproxy", "add", "v4tov4", "listenport=" + str(service_port), "connectport=" + str(new_port), "connectaddress=127.0.0.1"])
    print(f"Service port {service_port} redirected to {new_port}")
    logging.info(f"Service port {service_port} redirected to {new_port}")

# 模拟攻击
def simulate_attack(ip_address):
    for _ in range(10):
        subprocess.run(["ping", "-n", "1", ip_address])

# 有限反击增强
def enhanced_limited_counterattack(ip_address):
    print(f"对 IP 地址 {ip_address} 进行增强有限反击...")
    logging.info(f"对 IP 地址 {ip_address} 进行增强有限反击...")
    update_blacklist(ip_address)
    deploy_honeypot(random.randint(1024, 65535))
    dynamic_port_jumping(22)
    simulate_attack(ip_address)

# 数据流识别与转化
def transform_traffic(packets):
    transformed_packets = []
    for packet in packets:
        if packet.haslayer(scapy.IP):
            packet[scapy.IP].src = "192.168.1.100"  # 模拟正常流量
            packet[scapy.IP].dst = "192.168.1.101"
        transformed_packets.append(packet)
    return transformed_packets

# 蜜罐虚拟小世界空间
def create_advanced_honeypot(ip_address):
    sandbox_dir = create_sandbox(ip_address)
    honeypot_port = random.randint(1024, 65535)
    threading.Thread(target=deploy_honeypot, args=(honeypot_port,)).start()
    return sandbox_dir, honeypot_port

# 对冲数据流反击
def release_counter_traffic(packets):
    transformed_packets = transform_traffic(packets)
    for packet in transformed_packets:
        scapy.sendp(packet, iface="以太网")  # 替换为实际的网络接口名称

# 失控次级蜜罐的定位、突破、引爆
def explode_honeypot(ip_address, sandbox_dir, honeypot_port):
    print(f"引爆蜜罐 {ip_address}:{honeypot_port}...")
    logging.info(f"引爆蜜罐 {ip_address}:{honeypot_port}...")
    subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip_address])
    subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip_address])

# 高级防护机制
def advanced_protection(ip_address, data_stream):
    print(f"对 IP 地址 {ip_address} 进行高级防护...")
    logging.info(f"对 IP 地址 {ip_address} 进行高级防护...")
    # 复制数据流
    copied_data = data_stream.copy()
    
    # 反向释放对冲数据流
    release_counter_traffic(copied_data)
    
    # 模拟学习入侵者攻击方法
    def simulate_attack(data):
        # 模拟学习过程
        model = Sequential()
        model.add(Dense(64, input_dim=len(data[0]), activation='relu'))
        model.add(Dropout(0.5))
        model.add(Dense(1, activation='sigmoid'))
        model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
        model.fit(data, np.zeros(len(data)), epochs=10, batch_size=32)
        return model
    
    model = simulate_attack(data_stream)
    
    # 形成沙盒等虚拟小空间进行捕捉、吞噬、拟合、强化、反击
    sandbox_dir, honeypot_port = create_advanced_honeypot(ip_address)
    
    # 发送即将失控的沙箱到对方并进行引爆
    explode_honeypot(ip_address, sandbox_dir, honeypot_port)

# 动态端口监控
def monitor_ports():
    while True:
        open_ports = []
        for conn in psutil.net_connections(kind='inet'):
            if conn.status == psutil.CONN_LISTEN:
                open_ports.append(conn.laddr.port)
        
        # 检查是否有新的未授权端口
        authorized_ports = [22, 80, 443]  # 替换为实际的授权端口列表
        unauthorized_ports = set(open_ports) - set(authorized_ports)
        
        if unauthorized_ports:
            for port in unauthorized_ports:
                print(f"检测到未授权端口: {port}")
                logging.warning(f"检测到未授权端口: {port}")
                # 关闭未授权端口
                subprocess.run(["netsh", "interface", "portproxy", "delete", "v4tov4", "listenport=" + str(port)])
                subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockUnauthorizedPort", "dir=in", "action=block", "protocol=TCP", "localport=" + str(port)])
                subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockUnauthorizedPort", "dir=out", "action=block", "protocol=TCP", "localport=" + str(port)])
        
        # 休眠一段时间后再次检查
        time.sleep(60)

# 获取网络连接信息
def get_network_connections():
    connections = []
    for conn in psutil.net_connections(kind='inet'):
        if conn.raddr and conn.laddr:
            sent_bytes = conn.bytes_sent / 3  # 假设每3秒更新一次
            recv_bytes = conn.bytes_recv / 3  # 假设每3秒更新一次
            total_bytes = (conn.bytes_sent + conn.bytes_recv) / 3  # 假设每3秒更新一次
            connections.append({
                'pid': conn.pid,
                'laddr': f"{conn.laddr.ip}:{conn.laddr.port}",
                'raddr': f"{conn.raddr.ip}:{conn.raddr.port}",
                'status': conn.status,
                'sent_bytes': sent_bytes,
                'recv_bytes': recv_bytes,
                'total_bytes': total_bytes,
                'process': psutil.Process(conn.pid).name() if conn.pid else 'Unknown'
            })
    return connections

# 主函数
class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("安全监控系统")
        
        # 创建 TabWidget
        self.tab_widget = QTabWidget()
        self.setCentralWidget(self.tab_widget)
        
        # 创建实时监控标签页
        self.monitoring_tab = QWidget()
        self.tab_widget.addTab(self.monitoring_tab, "实时监控")
        
        # 创建日志查看标签页
        self.log_tab = QWidget()
        self.tab_widget.addTab(self.log_tab, "日志查看")
        
        # 创建沙箱管理标签页
        self.sandbox_tab = QWidget()
        self.tab_widget.addTab(self.sandbox_tab, "沙箱管理")
        
        # 创建反击操作标签页
        self.counterattack_tab = QWidget()
        self.tab_widget.addTab(self.counterattack_tab, "反击操作")
        
        # 创建网络连接监控标签页
        self.network_tab = QWidget()
        self.tab_widget.addTab(self.network_tab, "网络连接监控")
        
        # 实时监控标签页
        self.listbox = QListWidget()
        self.refresh_button = QPushButton("刷新可疑活动")
        self.refresh_button.clicked.connect(self.refresh_suspicious_activities)
        
        layout = QVBoxLayout()
        layout.addWidget(self.listbox)
        layout.addWidget(self.refresh_button)
        self.monitoring_tab.setLayout(layout)
        
        # 日志查看标签页
        self.log_text = QLabel("日志内容")
        self.report_button = QPushButton("显示报告")
        self.report_button.clicked.connect(self.show_report)
        
        layout = QVBoxLayout()
        layout.addWidget(self.log_text)
        layout.addWidget(self.report_button)
        self.log_tab.setLayout(layout)
        
        # 沙箱管理标签页
        self.sandbox_listbox = QListWidget()
        self.refresh_sandboxes_button = QPushButton("刷新沙箱列表")
        self.refresh_sandboxes_button.clicked.connect(self.refresh_sandboxes)
        
        layout = QVBoxLayout()
        layout.addWidget(self.sandbox_listbox)
        layout.addWidget(self.refresh_sandboxes_button)
        self.sandbox_tab.setLayout(layout)
        
        # 反击操作标签页
        self.counterattack_entry = QLineEdit()
        self.counterattack_button = QPushButton("执行反击")
        self.counterattack_button.clicked.connect(self.perform_counterattack)
        
        layout = QVBoxLayout()
        layout.addWidget(self.counterattack_entry)
        layout.addWidget(self.counterattack_button)
        self.counterattack_tab.setLayout(layout)
        
        # 网络连接监控标签页
        self.tree = QTreeView()
        self.refresh_connections_button = QPushButton("刷新连接")
        self.refresh_connections_button.clicked.connect(self.refresh_connections)
        
        layout = QVBoxLayout()
        layout.addWidget(self.tree)
        layout.addWidget(self.refresh_connections_button)
        self.network_tab.setLayout(layout)
        
        # 初始化太极护盾
        self.taiji_shield = TaiJiShield()
        self.taiji_shield.load_model()
        
        # 启动后台任务
        self.background_thread = BackgroundThread()
        self.background_thread.suspicious_activities_signal.connect(self.update_suspicious_activities)
        self.background_thread.start()
    
    def refresh_suspicious_activities(self):
        log_file = os.path.join('C:', 'Windows', 'System32', 'winevt', 'Logs', 'Security.evtx')
        if not os.path.exists(log_file):
            raise FileNotFoundError(f"日志文件 {log_file} 不存在")
        
        global suspicious_activities
        suspicious_activities = analyze_logs(log_file)
        self.listbox.clear()
        for activity in suspicious_activities:
            geolocation = get_geolocation(activity[1], geolocator_cache)
            description = verify_ip(activity[1], whois_cache)
            self.listbox.addItem(f"{activity[0]} - {activity[1]} - {geolocation} - {description} - {activity[2]}")
    
    def show_report(self):
        report_file = os.path.join('temp', 'suspicious_activities_report.txt')
        with open(report_file, 'r') as file:
            report_content = file.read()
        self.log_text.setText(report_content)
    
    def refresh_sandboxes(self):
        self.sandbox_listbox.clear()
        for activity in suspicious_activities:
            ip_address = activity[1]
            sandbox_dir = os.path.join('temp', f'sandbox_{ip_address}')
            if os.path.exists(sandbox_dir):
                self.sandbox_listbox.addItem(f"{ip_address} - {sandbox_dir}")
    
    def perform_counterattack(self):
        ip_address = self.counterattack_entry.text()
        response = QMessageBox.question(self, "反击操作", f"您确定要对 IP 地址 {ip_address} 进行反击吗?", QMessageBox.Yes | QMessageBox.No)
        if response == QMessageBox.Yes:
            enhanced_limited_counterattack(ip_address)
            QMessageBox.information(self, "反击成功", f"已对 IP 地址 {ip_address} 进行反击。")
    
    def refresh_connections(self):
        model = QFileSystemModel()
        model.setRootPath('')
        self.tree.setModel(model)
        self.tree.setRootIndex(model.index(''))
    
    def update_suspicious_activities(self, activities):
        self.refresh_suspicious_activities()

class BackgroundThread(QThread):
    suspicious_activities_signal = pyqtSignal(list)
    
    def run(self):
        # 抓取流量
        interface = "以太网"  # 替换为实际的网络接口名称
        global packets
        packets = capture_traffic(interface, 22)
        
        # 配置防火墙
        configure_firewall()
        
        # 生成报告
        report_file = os.path.join('temp', 'suspicious_activities_report.txt')
        os.makedirs(os.path.dirname(report_file), exist_ok=True)
        generate_report(suspicious_activities, report_file, geolocator_cache, whois_cache)
        
        # 提取特征并训练模型
        features = []
        for activity in suspicious_activities:
            ip_address = activity[1]
            geolocation = get_geolocation(ip_address, geolocator_cache)
            description = verify_ip(ip_address, whois_cache)
            features.append([ip_address, geolocation, description])
        
        # 将特征转换为数值
        features = [[hash(ip), hash(geo), hash(desc)] for ip, geo, desc in features]
        taiji_shield.incremental_train(features)
        
        # 启动动态端口监控
        threading.Thread(target=monitor_ports).start()
        
        self.suspicious_activities_signal.emit(suspicious_activities)

if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())

三、代码功能及改进方向

1.代码功能

(1)日志分析

功能: 读取和解析系统日志文件,提取可疑活动。
实现: 使用 Evtx 库解析 Windows 安全日志文件,提取包含 IPTables-Input、IPTables-Output 和 Security-Audit 的日志条目。

(2)网络流量监控

功能: 抓取指定端口的网络流量,进行实时监控。
实现: 使用 Scapy 库抓取网络流量,支持动态获取网络接口名称。

(3)地理位置和 IP 验证

功能: 获取入侵者的地理位置和 IP 地址的详细信息。
实现: 使用 geopy 库获取地理位置,使用 whois 库验证 IP 地址。

(4)报告生成

功能: 生成可疑活动报告,记录时间戳、IP 地址、地理位置、描述和日志条目。
实现: 将分析结果写入文本文件,方便查看和存档。

(5)沙箱管理

功能: 为可疑 IP 地址创建隔离的沙箱环境,防止潜在威胁。
实现: 使用 os 库创建沙箱目录,并模拟多线程和多核处理。

(6)自适应防护机制(太极护盾)

功能: 使用机器学习模型(Isolation Forest 和 One-Class SVM)进行异常检测,并支持增量训练。
实现: 训练模型并保存到文件,支持增量训练以提高模型的准确性和适应性。

(7)动态黑名单

功能: 将已知恶意 IP 地址添加到黑名单,并配置防火墙规则进行阻断。
实现: 使用 subprocess 库配置 Windows 防火墙规则。

(8)蜜罐部署

功能: 部署蜜罐以吸引和分析攻击行为。
实现: 使用 socket 库创建监听端口,模拟简单的响应。

(9)动态端口跳跃

功能: 动态改变服务端口,增加攻击难度。
实现: 使用 subprocess 库配置端口代理。

(10)模拟攻击

功能: 模拟简单的网络攻击,用于测试和验证系统反应。
实现: 使用 subprocess 库发送 ICMP 请求。

(11)有限反击增强

功能: 对可疑 IP 地址进行增强的反击操作,包括更新黑名单、部署蜜罐、动态端口跳跃和模拟攻击。
实现: 综合使用上述功能,提供多层次的反击手段。

(12)数据流识别与转化

功能: 识别和转化网络数据流,模拟正常流量。
实现: 使用 Scapy 库修改数据包的源和目标地址。

(13)高级防护机制

功能: 对可疑 IP 地址进行高级防护,包括反向释放对冲数据流、模拟学习攻击方法、形成虚拟沙箱和引爆蜜罐。
实现: 综合使用上述功能,提供全面的防护措施。

(14)动态端口监控

功能: 监控系统开放的端口,检测和阻止未授权端口。
实现: 使用 psutil 库获取网络连接信息,配置防火墙规则。

(15)网络连接监控

功能: 显示当前系统的网络连接信息,包括进程、PID、本地地址、远程地址、状态和传输字节数。
实现: 使用 psutil 库获取网络连接信息,通过 QTreeView 显示。

(16)用户界面

功能: 提供图形用户界面,方便用户操作和查看系统状态。
实现: 使用 PyQt5 库创建图形用户界面,支持多个标签页,包括实时监控、日志查看、沙箱管理、反击操作和网络连接监控。

2.改进方向

(1)动态获取网络接口名称

当前实现: 硬编码了网络接口名称为“以太网”。
改进方向: 使用 Scapy 或其他库动态获取当前系统的网络接口名称,并在配置中选择合适的接口。

(2)性能优化

当前实现: 模拟多线程和多核处理较为简单。
改进方向: 使用更高效的计算方法或并行计算库(如 multiprocessing)进行优化。

(3)用户界面优化

当前实现: 当前的 GUI 较为基础。
改进方向: 使用更高级的 GUI 库(如 PyQt 或 Kivy),增加图表、动画等元素,提升用户体验。

(4)安全性增强

当前实现: 反击措施较为简单。
改进方向: 引入更多的安全工具和库,实现更高级的反击措施,如释放病毒、强制断开连接等。

(5)模型训练和迭代

当前实现: 模型在每次启动时重新训练。
改进方向: 在每次检测到新的可疑活动时进行增量训练,以提高模型的准确性和适应性。

(6)日志文件解析

当前实现: 只解析包含 IPTables-Input、IPTables-Output 和 Security-Audit 的日志条目。
改进方向: 增加更多的正则表达式匹配条件,解析不同类型的安全日志。

(7)自动化和脚本化

当前实现: 手动操作较多。
改进方向: 增加自动化脚本,减少手动干预,提高效率。

(9)多平台支持

当前实现: 主要针对 Windows 10。
改进方向: 增加对其他操作系统的支持,如 Linux 和 macOS。

(9)日志记录和报警

当前实现: 基本的日志记录功能。
改进方向: 增加更详细的日志记录,支持日志归档和报警功能,如通过邮件或短信通知管理员。

(10)模块化和可扩展性

当前实现: 功能较为集中。
改进方向: 将各个功能模块化,便于维护和扩展,支持插件化设计。
通过这些改进方向,可以进一步提升系统的性能、安全性和用户体验,使其成为一个更加全面和强大的安全监控和防护工具。

3.注意事项

防火墙规则配置: 确保用户具有管理员权限,否则防火墙规则配置可能失败。
日志文件解析: 确保 Security.evtx 文件存在并且可读。
网络连接监控: 确保 psutil 库已正确安装。
蜜罐和沙箱: 确保 scapy 库已正确安装,并且用户具有足够的权限捕获网络流量。
机器学习模型: 确保 scikit-learn 和 tensorflow 库已正确安装,并且 GPU 配置正确(如果使用 GPU)。
日志记录: 确保日志文件路径正确,并且用户具有写入权限。
希望这些改进和增强措施能够帮助你更好地保护系统安全。如果有任何进一步的问题或需要更多帮助,请随时告诉我。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2220822.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

离散数学 第二讲 特殊集合和集合间关系 笔记 [电子科大]王丽杰

1.2 特殊集合与集合间关系 空集 不含任何元素的集合叫做空集(empty set),记作∅. 空集可以符号化为 ∅ { x ∣ x ≠ x } ∅ \{ x|x ≠ x\} ∅{x∣xx} . 空集是绝对唯一的。 全集 针对一个具体范围,我们考虑的所有对象的集合叫做全集(universal se…

基于springboot招聘信息管理系统设计与实现(源码+定制+开发)

博主介绍: ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台…

视频美颜平台是如何搭建的?基于直播美颜SDK源码的开发技术详解

今天,笔者将详细讲解如何基于直播美颜SDK源码搭建视频美颜平台的技术路径。 一、理解视频美颜技术 视频美颜技术主要通过图像处理算法对视频流进行实时处理,包括肤色优化、瑕疵修复、面部特征增强等。实现这一目标需要高效的图像处理算法和稳定的实时渲…

5个最流行的图像嵌入模型对比

最近需要研究图像相似性搜索。我想知道基于架构训练方法的嵌入之间是否存在差异。但是,很少有博客比较几种模型之间的嵌入。因此,在这篇博客中,我将使用 Flickr 数据集 [6] 比较 EfficientNet [1]、ViT [2]、DINO-v2 [3]、CLIP [4] 和 BLIP-2…

Matlab软件进行金融时间序列数据的描述性统计代码

1、数据S&P500的收盘价格,return100*log(pt/pt-1) 方法1:用python代码 import numpy as np import pandas as pddef calculate_log_returns(prices):"""计算价格序列的对数收益率。参数:prices (numpy.array): 价格序列。返回:log_…

Mongodb基础用法【总结】

关系型数据库和非关系型数据库的区别 关系型数据库 1.在关系型数据库中,数据都是存储在表中的,对存储的内容有严格的要求 2.因为我们在创建表的时候久已经规定了表中的字段 存储的数据类型 是否为空 唯一标识等规则 3.由于操作的都是结构化的数据&#…

家政小程序搭建,数字化市场发展下的意义

家政服务行业作为当下社会生活中不可或缺的行业,需求量在逐渐增加,行业发展也趋向多样化。 随着数字化的浪潮,家政行业逐渐向数字化、智能化升级发展,推动行业高质量发展,迎合现代化发展趋势,这一转型为行…

83.【C语言】数据结构之顺序表的尾部插入和删除

目录 3.操作顺序表 2."伪"插入顺序表的元素 分析尾部插入函数SLPushBack 代码示例 SeqList.h main.c free(指针)出错的几种可能的原因 3."伪"删除顺序表元素 2.分析尾部删除函数SLPopBack 代码示例 错误检查 两种解决办法 1.判断size是否为负…

004-按照指定功能模块名称分组

按照指定功能模块名称分组 一、说明1.现在有一个需求:2.具体做法 二、代码案例三、效果展示 一、说明 1.现在有一个需求: 需要把一个功能模块的几个功能点放在同一个文档目录下,这几个功能点分布在不同的 Controller 2.具体做法 需要把他…

如何将markdown文件转换为pdf

最近笔者在用vscode写markdown,但是提交时往往需要交pdf。所以就涉及到如何将markdown转化为pdf格式。 首先,需要在vscode上安装插件 markdown Preview Enhanced 之后在vscode的右上角即可看到下述图标,点击,vscode右半面就会显示…

C++数据结构-图的存储及邻接矩阵的代码实现

1. 什么是图 图论(graph theory) 是数学的一个分支,它以 图 为研究的对象。 图论本身是应用数学的一部分,历史上图论曾经被很多数学家各自独立建立过。关于图论的最早文字记载最早出现在欧拉 1736 年的论著中,也就是…

2024年有哪些开放式耳机值得入手?开放式耳机排行榜10强

随着技术的不断进步与消费者需求的日益多样化,开放式耳机凭借其独特的优势——如保持对周围环境的感知、减少对耳道的压力等,逐渐成为市场上的一大热点。尤其是在健康意识不断提升的今天,开放式耳机不仅为音乐爱好者提供了全新的聆听体验&…

【C++语言】全面掌握const的用法

一、const 需要怎么理解?? const修饰的变量不能够再作为左值,初始化完成之后,值不能被修改 1.1 C语言的const const 修饰的量,可以不用初始化,不叫常量,叫做常变量。 void main() {const int…

Windows git 配置

需要在git-bash的目录下,配置.ssh 的配置文件 要 .ssh 目录下的配置无法使用

Modbus TCP报错:Response length is only 0 bytes

问题描述: 使用modbus_tk库,通过Modbus tcp连接PLC时,python中的一个报错信息: Response length is only 0 bytes报错原因: 与Modbus TCP 服务端建立连接后没有断开,继续作为长连接使用,客户端…

一文掌握Cephadm部署Ceph存储集群

📚 博客主页: StevenZeng学堂 🎉 本文专栏: 一文读懂Kubernetes一文读懂Harbor云原生安全实战指南云原生存储实践指南 ❤️ 摘要:随着企业数据量的增长和存储需求的复杂化,Ceph因其高可扩展性和灵活性,能…

AI劳动力崛起:人将面临失业危机?

场景 第一眼看到这个网站的时候,AI员工官网(好像是部署在美国),我觉得很好奇,真的可以让AI替代人类完成工作吗?替代到什么程度呢?能以自然语言直接驱动吗? 正好手上在做爬虫项目&am…

HCIP-HarmonyOS Application Developer 习题(十六)

(判断)1、HiLink通过分布式软总线的方式连接所有设备,强能力设备可对弱能力设备进行设备虚拟化,将弱设备当做本机设备直接调用。 答案:错误 分析:HiLink 主要针对的是应用开发者与第三方设备开发者&#xf…

医院排队叫号系统

医院分诊排队叫号系统是一种广泛应用于服务行业的智能化管理系统,系统可有效地解决病人就诊时排队的无序、医生工作量的不平衡、就诊环境嘈杂等问题,它主要用于改善服务流程,提高服务效率,优化客户体验。这种系统通常包括以下几个…

HarmonyOS Next应用开发——多种方式实现图片解码

【高心星出品】 图片解码 图片处理就是将设备中保存的图片进行编辑处理然后再存储下来,整个过程需要先图片解码,图片处理,最后在图片编码保存。 图片解码指将所支持格式的存档图片解码成统一的PixelMap,以便在应用或系统中进行…