文章目录
- 前言
- https改造-python https 改造
- 1.1. https 配置信任库
- 2. 客户端带证书 https 发送、服务端关闭主机、IP 验证
前言
如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
https改造-python https 改造
这里说一下要改造的内容:
1、https 配置信任库;
2、客户端带证书 https 发送、服务端关闭主机、IP 验证;
代码结构,下图红框处:
1.1. https 配置信任库
SslConfig.py:
import ssl
def _addSsl(cert, key, hundle):
    """Build the SSL context used by the HTTPS server.

    Args:
        cert: Path to the server certificate (PEM).
        key: Path to the private key matching ``cert`` (PEM).
        hundle: Path to the CA bundle used to verify peer certificates.

    Returns:
        An ``ssl.SSLContext`` created for ``ssl.Purpose.CLIENT_AUTH`` with
        ``verify_mode`` set to ``CERT_REQUIRED`` and hostname checking off.
    """
    server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

    # Identity presented by this side of the connection.
    server_ctx.load_cert_chain(cert, key)
    # Trust store used when validating the peer's certificate.
    server_ctx.load_verify_locations(hundle)

    # Hostname verification is disabled; the peer must still present a
    # certificate (client authentication is required).
    server_ctx.check_hostname = False
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    return server_ctx
import configparser
import logging
import pathlib
from common.SslConfig import _addSsl
from flask import Flask
app = Flask(__name__)

# SSL configuration: the certificate, private key and CA bundle are read from
# the "ssl" directory that sits next to this script.
current_dir = pathlib.Path(__file__).parent
ssl_dir = current_dir / 'ssl'
cert = ssl_dir / 'psbc_crt.pem'
key = ssl_dir / 'psbc_key.pem'
bundle = ssl_dir / 'psbc_full.pem'
context = _addSsl(cert, key, bundle)
@app.route('/data')
def data():
    """Handle ``/data`` by returning a fixed greeting."""
    greeting = 'hello world'
    return greeting
if __name__ == '__main__':
    # Serve over HTTPS on every interface, port 8007, using the context
    # prepared above.
    app.run(
        ssl_context=context,
        port=8007,
        host='0.0.0.0',
    )
测试:
访问:
https://127.0.0.1:8007/data
2. 客户端带证书 https 发送、服务端关闭主机、IP 验证
SslConfig.py:
import ssl
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
# SSL 配置
class SSLAdapter(HTTPAdapter):
    """Transport adapter that injects a custom SSL context into the pool.

    The context presents an optional client certificate, loads an optional
    CA bundle, and has hostname checking and certificate verification
    switched off.
    """

    def __init__(self, cert=None, key=None, bundle=None, **kwargs):
        """Remember the TLS file paths, then initialise the base adapter.

        Args:
            cert: Client certificate path, or None.
            key: Private key path for ``cert``, or None.
            bundle: CA bundle path, or None.
            **kwargs: Forwarded unchanged to ``HTTPAdapter.__init__``.
        """
        # Assigned before the base initialiser runs, as in the original.
        # NOTE(review): HTTPAdapter.__init__ is expected to call
        # init_poolmanager, which reads these attributes - confirm against
        # the installed requests version before reordering.
        self.cert, self.key, self.bundle = cert, key, bundle
        super().__init__(**kwargs)

    def _build_context(self):
        """Create the SSL context handed to the pool manager."""
        tls_ctx = create_urllib3_context()

        # A client certificate is only loaded when both halves are given.
        if self.cert and self.key:
            tls_ctx.load_cert_chain(certfile=self.cert, keyfile=self.key)
        if self.bundle:
            tls_ctx.load_verify_locations(cafile=self.bundle)

        # check_hostname is cleared first, then verification is turned off
        # (same order as the original).
        # NOTE(review): with CERT_NONE the server certificate is not
        # validated, so the bundle loaded above has no effect on trust.
        tls_ctx.check_hostname = False
        tls_ctx.verify_mode = ssl.CERT_NONE
        return tls_ctx

    def init_poolmanager(self, *args, **kwargs):
        """Initialise the pool manager with this adapter's SSL context."""
        kwargs['ssl_context'] = self._build_context()
        return super().init_poolmanager(*args, **kwargs)
def _addSsl(cert, key, hundle):
    """Build the SSL context used by the HTTPS server.

    Args:
        cert: Path to the server certificate (PEM).
        key: Path to the private key matching ``cert`` (PEM).
        hundle: Path to the CA bundle used to verify peer certificates.

    Returns:
        An ``ssl.SSLContext`` created for ``ssl.Purpose.CLIENT_AUTH`` with
        ``verify_mode`` set to ``CERT_REQUIRED`` and hostname checking off.
    """
    server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

    # Identity presented by this side of the connection.
    server_ctx.load_cert_chain(cert, key)
    # Trust store used when validating the peer's certificate.
    server_ctx.load_verify_locations(hundle)

    # Hostname verification is disabled; the peer must still present a
    # certificate (client authentication is required).
    server_ctx.check_hostname = False
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    return server_ctx
def _ssl_session(cert, key, hundle):
    """Create a ``requests`` session whose HTTPS traffic uses ``SSLAdapter``.

    Args:
        cert: Client certificate path handed to the adapter.
        key: Private key path handed to the adapter.
        hundle: CA bundle path handed to the adapter as ``bundle``.

    Returns:
        A ``requests.Session`` with certificate verification disabled and
        an ``SSLAdapter`` mounted for the ``https://`` prefix.
    """
    client = requests.Session()
    # Certificate verification is turned off for the whole session.
    client.verify = False
    client.mount('https://', SSLAdapter(cert=cert, key=key, bundle=hundle))
    return client
LogConfig.py
import logging
from logging import handlers
def _logging(log, f_name, backup_count, level_tag):
    """Attach a daily-rotating file handler and a console handler to ``log``.

    Args:
        log: The ``logging.Logger`` to configure; it is modified in place.
        f_name: Path of the log file written by the rotating handler.
        backup_count: Number of rotated files to keep.
        level_tag: Level name such as ``'DEBUG'``, ``'INFO'``, ``'WARNING'``,
            ``'ERROR'`` or ``'CRITICAL'`` (case-insensitive). Unrecognised
            values, and ``'NOTSET'``, fall back to DEBUG.

    Returns:
        The same ``log`` object, with both handlers added and its level set.
    """
    # Resolve the level name. Previously only 'INFO' was recognised, so
    # 'WARNING' / 'ERROR' / 'CRITICAL' silently became DEBUG.
    level = logging.DEBUG
    resolved = logging.getLevelName(str(level_tag).strip().upper())
    # getLevelName returns an int for known names and a string otherwise.
    # NOTSET is treated as "not configured" so the logger never ends up
    # inheriting its effective level from a parent.
    if isinstance(resolved, int) and resolved > logging.NOTSET:
        level = resolved

    # Shared formatter for both handlers.
    datefmt = '[%Y-%m-%d %H:%M:%S]'
    fmt = '%(asctime)s - [%(levelname)s] - %(filename)s [line:%(lineno)d] - %(message)s'
    formatter = logging.Formatter(fmt, datefmt)

    # File handler: rotates once per day and keeps ``backup_count`` files.
    file_handler = handlers.TimedRotatingFileHandler(
        filename=f_name,
        when='D',
        interval=1,
        backupCount=backup_count,
        encoding='utf-8',
    )
    # Console handler.
    console_handler = logging.StreamHandler()

    for handler in (file_handler, console_handler):
        handler.setFormatter(formatter)
        handler.setLevel(level)
        log.addHandler(handler)

    log.setLevel(level)
    return log
httpsDemo.py
import configparser
import logging
import pathlib
from common.LogConfig import _logging
from common.SslConfig import _addSsl,_ssl_session
import requests
from flask import Flask
app = Flask(__name__)

# Load the INI configuration stored under ./config next to this script.
current_dir = pathlib.Path(__file__).parent
config_file = current_dir / 'config' / 'httpsDemo.ini'
config = configparser.ConfigParser()
with config_file.open('r', encoding='utf-8') as f:
    config.read_file(f)

# Logging settings come from the [logging] section.
logging_section = config['logging']
log_file = current_dir / logging_section['log_file']
log_level = logging_section.get('log_level', 'INFO').upper()
log_back_up_days = logging_section.getint('log_back_up_days', 5)  # keep 5 files by default

# Initialise logging; the log directory is created first if it is missing.
log_file.parent.mkdir(parents=True, exist_ok=True)
log = logging.getLogger(__name__)
logger = _logging(log, log_file, log_back_up_days, log_level)

# NOTE(review): https_flag is read here but not referenced anywhere else in
# the visible code.
https_flag = config['params'].getint('https_flag', 0)

# SSL configuration: server-side context plus an outbound client session,
# both built from the files in ./ssl.
ssl_dir = current_dir / 'ssl'
cert = ssl_dir / 'psbc_crt.pem'
key = ssl_dir / 'psbc_key.pem'
bundle = ssl_dir / 'psbc_full.pem'
context = _addSsl(cert, key, bundle)
session = _ssl_session(cert, key, bundle)
@app.route('/data')
def data():
    """Handle ``/data``: log the call and return a fixed greeting."""
    greeting = 'hello world'
    logger.info('======================== psot data hello world')
    return greeting
@app.route('/index')
def index():
    """Handle ``/index`` by relaying the body of a fixed upstream HTTPS URL.

    Returns:
        The upstream response text, or the literal string ``'error'`` when
        the request fails for any reason.
    """
    msg = 'error'
    url = 'https://127.0.0.1:8180/api/https/a'
    logger.info(f'======================== get:{url}')
    try:
        # The module-level session is the one built by
        # _ssl_session(cert, key, bundle). A timeout is passed explicitly:
        # without one, requests waits indefinitely on an unresponsive
        # upstream and this handler would never return.
        response = session.get(url, timeout=10)
        msg = response.text
    except Exception as e:
        # Best-effort endpoint: any failure is reported as 'error' to the
        # caller. The traceback is logged so the cause is not lost.
        logger.exception(e)
    logger.info(f'response:{msg}')
    return msg
if __name__ == '__main__':
    # Serve over HTTPS on every interface, port 8007, using the server-side
    # SSL context built during module setup.
    app.run(
        ssl_context=context,
        port=8007,
        host='0.0.0.0',
    )