用Python制作邮件检测器

news2024/11/16 17:34:23

github地址:
https://github.com/CaLlMeErIC/MailDetective

因为需求需要写一个简单的邮件检测系统的框架,这里记录下思路
首先第一反应,这个检测系统不应该是各个邮件收件系统都有自带的吗,于是搜索了下是否有相关的邮件检测开源软件,发现有两个比较靠谱的:
1.spamassassin
https://spamassassin.apache.org/

2.mmpi
https://github.com/a232319779/mmpi

3.其他一大堆使用机器学习的nlp方法,论文保存在git的misc文件下了

发现实际mmpi比较符合我的需求,但是看了下感觉规则有点少了而且是几年前的,
spamassassin规则很丰富,而且一直在更新,但是也有问题:它的规则很大一部分是根据英文关键词和域名设计的,如果是中文的话需要再做适配,还有就是他的很多规则需要联网或者自定义黑白名单,我的想法是尽量做一个离线也能使用的脚本框架,于是借鉴
spamassassin的方式,把它的正则表达式规则移植过来。它的中心思想是,如果邮件命中了规则的话就给邮件加上对应的报警分数,当分数达到一定阈值的时候就报警。

我是尝试使用了python自带的email包来读取邮件,然后把各个字段以字典的形式保存下来,到时候对应的检测的时候直接检测对应字段即可

import email.header
import os
from email.parser import Parser
import json
import re
from OSutils import loadJson


class MailReader(object):
    """
    用于读取eml文件,并把各个字段保存下来
    使用python3.7内置的email包
    """

    def __init__(self, eml_path="", debug=False):
        """
        初始化属性
        """
        self.raw_email = None
        self.email_content = None
        self.process_log = ""
        self.debug = debug
        self.attribute_dict = {}
        self.mail_text = ""
        self.all_links = []
        self.urls = []
        self.tag = set()
        self.flag = set()
        if eml_path:
            self.__MailReader(eml_path)

    @staticmethod
    def decodeHeader(header_str):
        """
        输入需要解码的header字符串,返回解码结果
        """
        temp = email.header.decode_header(header_str)
        result = email.header.make_header(temp)
        return result

    def addTag(self, tag):
        """
        给邮件添加标签,以字符串的形式存放在列表中
        """
        self.tag.add(tag)
        return self.tag

    def addFlag(self, flag):
        """
        给邮件添加自定义的检测标记
        tag用于输出,flag用于规则检测
        """
        self.flag.add(flag)
        return self.flag

    def toString(self):
        """
        打印整个邮件以及日志
        """
        print("email内容:", self.email_content)
        if self.debug:
            print("process_log:", self.process_log)
        return self.email_content

    def toDict(self):
        """
        把header转换为字典形式,From,To,Subject需要单独解码
        字典的键统一小写
        """
        each_key: str
        all_str = []
        if self.attribute_dict != {}:
            return self.attribute_dict

        for each_key in set(self.email_content.keys()):
            self.attribute_dict.update({each_key.lower(): self.email_content.get_all(each_key)})
            all_str += self.email_content.get_all(each_key)

        for each_key in ["From", "To", "Subject"]:
            temp = []
            if each_key not in self.attribute_dict:
                continue
            for each_str in self.attribute_dict.get(each_key):
                each_str = str(self.decodeHeader(each_str))
                temp.append(each_str)
            self.attribute_dict.update({each_key.lower(): temp})
        self.attribute_dict.update({'body': self.getContent()})
        self.attribute_dict.update({'url': self.getUrls()})
        self.attribute_dict.update({'all': all_str})
        return self.attribute_dict

    def toJson(self):
        """
        把字典转换为json格式
        """
        if self.attribute_dict == {}:
            self.attribute_dict = self.toDict()
        return json.dumps(self.attribute_dict)

    def __MailReader(self, eml_path):
        """
        读取邮件,有些邮件开头会混入无用字符,需要去除才能提取信息
        """
        try:
            if os.path.exists(eml_path):
                with open(eml_path, encoding='utf-8', errors='ignore') as fp:
                    self.raw_email = fp.read()
                cut_len = 0
                for each_line in self.raw_email.split('\n'):
                    if ':' not in each_line:
                        cut_len += len(each_line) + 1
                    else:
                        break
                if cut_len:
                    self.raw_email = self.raw_email[cut_len:]
                self.email_content = Parser().parsestr(self.raw_email)
        except Exception as e:
            self.process_log += "读取邮件失败:" + str(e)
            self.toString()
        return self

    def parseMail(self, eml_path):
        """
        输入邮件路径,用email库整理邮件
        """
        self.attribute_dict = {}
        return self.__MailReader(eml_path)

    def getContent(self):
        """
        循环遍历数据块并尝试解码,暂时只处理text数据
        """
        all_content = []
        for par in self.email_content.walk():

            if not par.is_multipart():  # 这里要判断是否是multipart,是的话,里面的数据是无用的
                str_charset = par.get_content_charset(failobj=None)  # 当前数据块的编码信息
                if str_charset is None:
                    self.addTag("没有获取到部分内容的charset")
                    self.addFlag("NO_CHARSET")
                    continue

                str_content_type = par.get_content_type()
                if str_content_type in ('text/plain', 'text/html'):
                    try:
                        content = par.get_payload(decode=True)
                        all_content.append(content.decode(str_charset))
                    except Exception as e:
                        print(e)
        self.mail_text = all_content
        return all_content

    def getUrls(self):
        """
        获取所有的url链接,与getLinks不一样的是,getUrls的返回值是一个字符串列表
        """
        if self.urls:
            return self.urls
        self.getLinks()
        return self.urls

    def getLinks(self):
        """
        通过正则表达式,匹配超链接以及
        显示的属性内容,格式如下
         [('https://rashangharper.com/wp-admin/user/welllz/display/login.html', 'wellsfargo.com')]
        """
        if self.all_links:
            return self.all_links
        all_links = []
        self.urls = []

        if self.mail_text == "":
            self.getContent()

        pattern = '<a.*?href="(.+)".*?>(.*?)</a>'
        for part in self.mail_text:
            links = re.findall(pattern, part, re.IGNORECASE)
            all_links += links

        self.all_links = all_links
        for each_link in all_links:
            self.urls += list(each_link)
        return all_links


if __name__ == '__main__':
    a = MailReader("fakeherf.eml").toDict().get('date')[0]

然后是规则设计部分,也是参考spamassassin的规则, 分为简单规则和复杂规则,简单规则,直接用正则表达式检查邮件对应的部分即可
在这里插入图片描述
复杂规则的话,我是写成了python类的形式,然后在运行的时候动态加载,放在metarules文件夹里,类似于下面这样的:

class CheckMail(object):
    """
    检查邮件中,From是否和存在的真实发件源不一样
    """

    def __init__(self, input_mail):
        self.reader = input_mail
        self.score = 2.5
        self.description = "检查邮件中,From和存在的真实发件源不一样"

    @staticmethod
    def list2str(data_list):
        """
        把字符串列表转换成字符串
        """
        if isinstance(data_list, list):
            result = ""
            for each_str in data_list:
                result += each_str + " "
            return result[:-1]
        return data_list

    def getReport(self):
        """
        检测邮件的From字段,option_sender中是几种可能的
        真实发件人字段
        """

        header_dict = self.reader.toDict()
        if header_dict.get('from'):
            mail_from = self.list2str(header_dict.get('from'))
        else:
            self.reader.addTag("发件人缺失")
            return True, [self.score, "未检测到发件人字段"]

        for option_sender in ["x-mail-from", "return-path", "x-qq-orgsender", "sender"]:
            if option_sender in header_dict:
                for each_option_sender in header_dict.get(option_sender):
                    if each_option_sender not in mail_from:
                        self.reader.addTag("疑似伪造的发件人")
                        return True, [self.score, self.description]
        return False, []

其他内容及readme详见github

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/376095.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

行测-判断推理-图形推理-样式规律-属性规律-对称性

中心对称&#xff1a;可以看作&#xff0c;图上的每一个点&#xff0c;都能关于中心点&#xff0c;在图上找到另一个对称的点五个图都是轴对称图形&#xff0c;只有答案C是轴对称图形选C都是中心对称图形选A1 3 5中心对称2 4 6轴对称中心对称选B对称轴顺时针45旋转选A对称轴的数…

极智项目 | 实战pytorch arcface人脸识别

欢迎关注我的公众号 [极智视界]&#xff0c;获取我的更多经验分享 大家好&#xff0c;我是极智视界&#xff0c;本文介绍 实战pytorch arcface人脸识别&#xff0c;并提供完整项目源码。 本文介绍的实战arcface人脸识别项目&#xff0c;提供完整的可以一键训练、测试的项目工程…

微信公众号历史作品定向采集

最近有遇到微信公众号历史作品采集的需求,这里做一下记录, 登录自己注册好的的微信公众号后台进入创作界面,点击右上角的引用: 弹出如下界面: 选择查找公众号文章,输入要查找的公众号: 回车: 同时就可以打开F12开始抓包,选择公众号点击进入: appmsg?action=li…

golang 整合antlr语法校验

1. 背景 在项目中我们可能会遇到表达式检索的场景&#xff0c;例如&#xff0c;输入以下表达式检索&#xff0c;需要解析表达式并得到检索结果。 ip"192.168.1.3" && (port"80" || protocol"http")此时&#xff0c;我们需要对语法进行…

Linux启动过程

theme: channing-cyan 两种启动方式 传统启动方式&#xff08;LEGACYMBR&#xff09; 指传统BIOS启动方式&#xff0c;存在一些不足&#xff1a;比如最大只支持2TB磁盘&#xff0c;磁盘最多四个分区&#xff0c;且不支持图形操作 UEFIGPT方式 是新式的启动方式&#xff0c…

数学小课堂:三次方程(定理发明的过程)

文章目录 引言I 一元三次方程1.1 通解发明权之争1.2 费拉里-塔尔塔利亚公式1.3 Mathematica1.4 数学定理发明的过程引言 学习数学,最重要的是把实际问题变成数学问题,然后知道如何利用各种软件工具来解决。 方程是一个能把具体问题,等量转化成类型问题的好工具。 一元三次方…

Jetson AGX Orin安装Anaconda、Cuda、Cudnn、Pytorch最全教程

文章目录一&#xff1a;Anaconda安装二&#xff1a;Cuda、Cudnn安装三&#xff1a;Pytorch安装一&#xff1a;Anaconda安装 Jetson系列边缘开发板&#xff0c;其架构都是arm64&#xff0c;而不是传统PC的amd64&#xff0c;深度学习的环境配置方法大不相同。想要看amd64的相关环…

智能家居项目(六)之摄像头模块

目录 一、树莓派mipg-streamer实现监控功能调试 1、实现基本思路 2、安装摄像头模块 2.1、在安装sudo apt-get install libv4l-dev 的命令时报错 3、开启摄像头 以下内容是针对树莓派是stretch版本的修改办法&#xff1a; 一、树莓派mipg-streamer实现监控功能调试 1、…

有哪些前端面试题是必须要掌握的

对浏览器的缓存机制的理解 浏览器缓存的全过程&#xff1a; 浏览器第一次加载资源&#xff0c;服务器返回 200&#xff0c;浏览器从服务器下载资源文件&#xff0c;并缓存资源文件与 response header&#xff0c;以供下次加载时对比使用&#xff1b; 下一次加载资源时&#x…

Flow API搭建指南

搭建Flow API&#xff0c;首先需要安装知行之桥EDI系统&#xff0c;注意&#xff0c;Flow API为新增功能&#xff0c;仅在2022版本&#xff08;8336&#xff09;及以后支持&#xff0c;如果你发现正在使用的产品没有这个功能&#xff0c;可以在我们官网下载最新版本或者联系我们…

将企业文件共享解决方案与数据丢失防护配对

您的企业文件共享解决方案是否足够&#xff1f;企业文件共享解决方案已经是一种加密移动中敏感数据的好方法&#xff0c;但仅加密是不够的。 您能否确保不会意外传输敏感信息&#xff1f;您是否可以审核谁发送了什么&#xff1f;最后但并非最不重要的一点是&#xff0c;您是否…

【Autoware】2小时安装Autoware1.13(保姆级教程)

前言&#xff1a;ROS的出现使得机器人软件开发更加快速和模块化&#xff0c;在此基础上&#xff0c;Autoware.ai开源项目可以让我们很容易地将一套完整的自动驾驶软件部署到我们的测试车辆上&#xff0c;并见证它跑起来&#xff01; 文章目录1.Autoware简介2.电脑软硬件配置要求…

爆肝更新 Python 100道基础入门练习题(附答案)

前言 大家早好、午好、晚好吖 ❤ ~ 更多精彩内容、资源皆可点击文章下方名片获取此处跳转 实例001&#xff1a;数字组合 题目&#xff1a; 有四个数字&#xff1a;1、2、3、4&#xff0c;能组成多少个互不相同且无重复数字的三位数&#xff1f;各是多少&#xff1f; 程序分…

合宙入门教程之luat开发教程

合宙入门教程准备工作连接电脑建工程与烧录测试demo之main.lua实验现象准备工作 1.开发板&#xff08;1块&#xff09; 2.Luatools_v2.exe &#xff08;烧录固件软件&#xff09; 3.USB驱动 跳转合宙官网链接 连接电脑 1.首先安装合宙开发工具&#xff0c;其次安装USB驱动。…

【JAVA】一个项目如何预先加载数据?

这里写目录标题需求实现AutowiredPostConstruct实例CommandLineRunner实例ApplicationListener实例参考需求 一般我们可能会有一些在应用启动时加载资源的需求&#xff0c;局部或者全局使用&#xff0c;让我们来看看都有哪些方式实现。 实现 Autowired 如果是某个类里需求某…

山东大学机器学习期末2022

接力&#xff1a;山东大学机器学习期末2021 本来是不想写的&#xff0c;因为不想回忆起考试时啥也不会的伤痛&#xff0c;没想到最后给分老师海底捞&#xff0c;心情好了一些&#xff0c;还是一块写完 备考建议&#xff1a;多看ppt&#xff0c;多看ppt&#xff0c;多看ppt 山东…

关于 Android 线程优化这些知识你都该了解

前言在实际项目开发中会频繁的用到线程&#xff0c;线程使用起来是很简单&#xff0c;但是滥用线程会带来性能问题&#xff0c; 比如启动一个线程至少 占用16kb的内存、线程过多会导致cpu的频繁切换而cpu切换成本是很高的、消耗大量用户电量等问题&#xff0c; 所以应该让app的…

Set集合、HashSet集合、LinkedHashSet集合

1、Set集合的特点 无序&#xff0c;不重复、无索引 Set集合的方法上基本和Collection的API一致 2、Set集合的实现类特点 HashSet&#xff1a;无序、不重复、无索引 LinkedHashList&#xff1a;有序、不重复、无索引 TreeSet&#xff1a;可排序、不重复、无索引 public s…

taobao.fulfillment.order.assemble( 拆合单结果回传接口 )

&#xffe5;免费必须用户授权 拆合单结果回传接口 公共参数 请求地址: HTTP地址 http://gw.api.taobao.com/router/rest 公共请求参数: 公共响应参数: 请求参数 响应参数 点击获取key和secret 请求示例 TaobaoClient client new DefaultTaobaoClient(url, appkey, secr…

centos7合并home分区到root分区

最近在尝试通过物理机安装 CentOS&#xff0c;官方镜像默认安装时&#xff0c;如果没有手动分区&#xff0c;默认设置是会将 home 单独分区&#xff0c;系统分区默认为 50 GB&#xff0c;这里提供方法将 home 分区合并到 root 分区。 1.查看当前系统分区情况 输入命令&#x…