前言
在我们参数算法完全还原的情况,请求网站却提示身份认证失败,我们推测可能存在的情况如下:
- cookies
- http2.0
- tls指纹
其中什么是tls指纹?
TLS指纹,也有人叫JA3指纹。在创建TLS连接时,根据TLS协议在Client Hello阶段发送的数据包就是就是TLS指纹。不同浏览器、不同版本(不同框架)因为对协议的理解和应用不一样,所以发送的数据包内容也就不一样,所以就形成了TLS指纹。
JA3:git:https://github.com/salesforce/ja3
简介:JA3 是一种对传输层安全应用程序进行指纹识别的方法。它于 2017 年 6 月首次发布在 GitHub 上,是 Salesforce 研究人员 John Althouse、Jeff Atkinson 和 Josh Atkins 的作品。创建的 JA3 TLS/SSL 指纹可以在应用程序之间重叠,但仍然是一个很好的妥协指标 (IoC)。指纹识别是通过创建客户端问候消息的 5 个十进制字段的哈希来实现的,该消息在 TLS/SSL 会话的初始阶段发送。
原理 :TLS 及其前身 SSL 用于为常见应用程序和恶意软件加密通信,以确保数据安全,因此可以隐藏在噪音中。要启动 TLS 会话,客户端将在 TCP 3 次握手之后发送 TLS 客户端 Hello 数据包。此数据包及其生成方式取决于构建客户端应用程序时使用的包和方法。服务器如果接受 TLS 连接,将使用基于服务器端库和配置以及 Client Hello 中的详细信息制定的 TLS Server Hello 数据包进行响应。由于 TLS 协商以明文形式传输,因此可以使用 TLS Client Hello 数据包中的详细信息来指纹和识别客户端应用程序
按照ja开发者自述,是在三次握手之后,客户端向服务端发起client hello包,这个包里带了客户端这边的一些特征发给服务端,服务端拿来解析数据包,然后回发一个hello给客户端,之后再进行ssl数据交互,下面这个图,就是John Althouse自己画的
识别原理
- JA3 不是简单地查看使用的证书,而是解析在 SSL 握手期间发送的 TLS 客户端 hello 数据包中设置的多个字段。然后可以使用生成的指纹来识别、记录、警报和/或阻止特定流量。
- JA3 在 SSL 握手中查看客户端 hello 数据包以收集 SSL 版本和支持的密码列表。如果客户端支持,它还将使用所有支持的 SSL扩展、所有支持的椭圆曲线,最后是椭圆曲线点格式。这些字段以逗号分隔,多个值用短划线分隔(例如,每个支持的密码将在它们之间用短划线列出)
- JA3 方法用于收集 Client Hello 数据包中以下字段的字节的十进制值:版本、接受的密码、扩展列表、椭圆曲线和椭圆曲线格式。然后按顺序将这些值连接在一起,使用“,”分隔每个字段,使用“-”分隔每个字段中的每个值
怎么过TLS指纹?
1.hook tls 组件
目前hook方面,比较成熟的方案就是golang的cycletls,这个cycletls是我偶然找到的go库,也算是能解决大部分的平台。但是据我所知,已经有一些风控强的网站可以识别你hook修改过ja3指纹了
2.魔改openssl
用志远大佬的方案,重新编译openssl,在编译过程中,改c源码,使每次发起请求的tls指纹随机
个人觉得,按目前的行情,如果是校验很强的tls话,随机的可能不行
3.魔改socket
这个方案是@3301大佬魔改了一套socket收发包流程,是真的牛逼。
在几个月前我跟他私聊过,说好了一起研究的,因为种种原因我放他鸽子了,唉
不过他已经基本搞定,目前已开源:
https://github.com/zero3301/pyrequests/
作者自己说了,暂不支持HTTP2和tls1.3和动态tls指纹,他希望有人能跟他一起开发,感兴趣的可以跟他联系。
4.重编译chromium
根据我的研究,发现这个方案在针对tls方面,能改的不是很多,底层还是借助了浏览器,还不如直接rpc或者用selenium之类的
5.curl-impersonate+pycurl
这个方案相对来说最完美的方案。
大概过程:先编译curl-impersonate,编译成功后再继续编译 spike 大佬魔改过的pycurl,最后用编译好 pycurl 去访问 https://tls.peet.ws/api/all 来进行测试是否编译成功
大佬(帝国皇家近卫军),把编译好的直接打包发到了 pypi 社区。所以,我们只需要pip install 库,就可以直接用了
教程原链接:https://github.com/synodriver/pycurl/blob/master/special.markdown
Q佬的文章:python完美突破tls/ja3(docker版),文章链接:https://mp.weixin.qq.com/s/UZlLuzlQZrI7w82HI7zGuw
后面文章链接:https://blog.csdn.net/Y_morph/article/details/129487839?spm=1001.2014.3001.5502
实施1 - pycurl-antitls
准备环节
1.环境准备
vmware + ubuntu 22.04(建议直接使用ubuntu最新版。kali有问题,不建议使用,不支持window!!!)
2.上手环节
ubuntu22.04版本虽然自带了python3.10,但是没有pip,我们需要先更新下apt,然后下载pip。
apt update
apt install pip
然后直接一键安装大佬提供好的库
pip install pycurl-antitls==7.45.3rc1
安装好了以后还有一个简单的小步骤,需要移动一个文件到usr的lib文件夹下。大佬甚至贴心的附上了代码。
import sys
import os
base = os.path.join("/usr/local", "lib", "libcurl-impersonate-chrome.so")
with open(base, "rb") as inp, open("/usr/lib/libcurl-impersonate-chrome.so.4","wb") as out:
data = inp.read()
out.write(data)
libcurl-impersonate-chrome.so 这个文件是在 python3.10 同级目录下,然后我的 python3.10 是在/usr/local/lib 文件夹下,所以 libcurl-impersonate-chrome.so也在这。
然后就,就结束了…
因为我科学上网没弄好,没能成功访问测试网站。所以拿了猿人学练习平台的几道题进行了测试,直接通杀了,真是嘎嘎猛啊。
测试代码如下,请小伙伴们自行修改访问的网址
import pycurl
import json
from requests import Session
# 打印看下pycurl的版本是否和文章中的一致
print(pycurl.version)
result = 0
def my_func(data):
global result
d = json.loads(data)['data']
for i in d:
result += int(i['value'])
print(result)
headers = [
'Host: www.python-spider.com',
'accept: application/json, text/javascript, */*; q=0.01',
'accept-language: zh-CN,zh;q=0.9',
'content-type: application/x-www-form-urlencoded; charset=UTF-8',
'cookie: Cookie不能公开~',
'origin: https://www.python-spider.com',
'referer: https://www.python-spider.com/challenge/29',
'sec-ch-ua: ".Not/A)Brand";v="99", "Google Chrome";v="103", "Chromium";v="103"',
'sec-ch-ua-mobile: ?0',
'sec-ch-ua-platform: "Windows"',
'sec-fetch-dest: empty',
'sec-fetch-mode: cors',
'sec-fetch-site: same-origin',
'user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
'x-requested-with: XMLHttpRequest'
]
curl = pycurl.Curl()
curl.setopt(
curl.SSL_CIPHER_LIST,
'TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,ECDHE-RSA-AES128-SHA,ECDHE-RSA-AES256-SHA,AES128-GCM-SHA256,AES256-GCM-SHA384,AES128-SHA,AES256-SHA'
)
curl.setopt(curl.HTTP_VERSION, curl.CURL_HTTP_VERSION_2_0)
curl.setopt(curl.SSLVERSION, curl.SSLVERSION_TLSv1_2)
curl.setopt(curl.SSL_ENABLE_NPN, 0)
curl.setopt(curl.SSL_ENABLE_ALPS, 1)
# curl.setopt(curl.SSL_FALSESTART, 0)
curl.setopt(curl.SSL_CERT_COMPRESSION, "brotli")
curl.setopt(pycurl.HTTP2_PSEUDO_HEADERS_ORDER, "masp")
curl.setopt(pycurl.HTTPHEADER, headers)
# my_func是处理数据返回的回调事件
curl.setopt(pycurl.WRITEFUNCTION, my_func)
url = 'https://www.python-spider.com/api/challenge29'
for i in range(1, 101):
data = "page={}".format(i)
curl.setopt(pycurl.POSTFIELDS, data)
# curl.setopt(curl.PROXY, 'https://127.0.0.1:xxxx')
curl.setopt(pycurl.URL, url)
curl.perform()
curl.close()
实施2 - golang 之 ja3transport库突破ja3
https://www.qinglite.cn/doc/865064776b9220166
实施3 - curl_cffi: 支持原生模拟浏览器 TLS/JA3 指纹的 Python 库
from curl_cffi import requests
proxy = {
'PROXY_USER': "xxx",
'PROXY_PASS': "xxx",
'PROXY_SERVER': "http://ip:port"
}
def get_proxys():
proxy_host = proxy.get('PROXY_SERVER').rsplit(
':', maxsplit=1)[0].split('//')[-1]
proxy_port = proxy.get('PROXY_SERVER').rsplit(':', maxsplit=1)[-1]
proxy_username = proxy.get('PROXY_USER')
proxy_pwd = proxy.get('PROXY_PASS')
proxyMeta = "http://%(user)s:%(pass)s@%(host)s:%(port)s" % {
"host": proxy_host,
"port": proxy_port,
"user": proxy_username,
"pass": proxy_pwd,
}
proxies = {
'http': proxyMeta,
'https': proxyMeta,
}
return proxies
url = "https://www.infinigo.com/cps/299027a8-9e4d-11ea-b100-20040fe763d8"
# url = "https://www.infinigo.com/classify/08007"
# url = "https://www.infinigo.com/category/"
payload = {}
response = requests.get(url, data=payload, proxies=get_proxys(), allow_redirects=False, impersonate="chrome101")
print(response.text)
实施4 - Pyhttpx
https://github.com/zero3301/pyhttpx
pip install pyhttpx
import pyhttpx
proxies = {'https': 'ip:port', 'http': 'ip:port'}
proxy_auth = ("xxx", "xxxx")
url = "https://www.infinigo.com/cps/299027a8-9e4d-11ea-b100-20040fe763d8"
# url = "https://www.infinigo.com/classify/08007"
# url = "https://www.infinigo.com/category/"
payload={}
sess = pyhttpx.HttpSession(browser_type='chrome', http2=True)
response = sess.get(url, proxies=proxies, proxy_auth=proxy_auth, allow_redirects=False)
print(response.text)
实施5 - 修改urllib3 ssl_源码的DEFAULT_CIPHERS里的加密算法
requests版本
JA3特征值是把HTTPS通信过程中SSL/TLS的Client Hello报文信息中 SSL 版本、密码、扩展名、椭圆曲线和椭圆曲线点格式5个属性值进行md5加密的指纹, 服务器可以用此值来标识客户端
反爬解决思路(python): 修改urllib3 ssl_源码的DEFAULT_CIPHERS里的加密算法顺序
import requests, json, redis, re, time, random
from scrapy.selector import Selector
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context, DEFAULT_CIPHERS
import os, sys, numpy
BASE_DIR = os.path.dirname(
os.path.dirname(
os.path.dirname(
os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))))
sys.path.insert(0, BASE_DIR)
from Material.items.dataplatform import DetailDataplatformItem
from Material import settings
from Material.tools import configs
from Material.tools import utils
# DEFAULT_CIPHERS = 'EECDH+AESGCMEDH+AESGCM'
sess = requests.session()
sess.keep_alive = False
ORIGIN_CIPHERS_LIST = [
"ECDHE+AESGCM", "ECDHE+CHACHA20", "DHE+AESGCM", "DHE+CHACHA20",
"ECDH+AESGCM", "DH+AESGCM", "ECDH+AES", "DH+AES", "RSA+AESGCM", "RSA+AES"
]
class DESAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
# ORIGIN_CIPHERS = ':'.join(["ECDHE+AESGCM", "RSA+AESGCM"])
# ORIGIN_CIPHERS = ":".join(random.sample(ORIGIN_CIPHERS_LIST, 2))
ORIGIN_CIPHERS = ":".join(random.sample(ORIGIN_CIPHERS_LIST, random.randint(2, len(ORIGIN_CIPHERS_LIST))))
CIPHERS = ORIGIN_CIPHERS.split(':')
random.shuffle(CIPHERS)
CIPHERS = ':'.join(CIPHERS)
self.CIPHERS = CIPHERS + ':!aNULL:!eNULL:!MD5'
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).proxy_manager_for(*args, **kwargs)
class ParseInfinigoList:
def __init__(self):
self.host = 'https://www.infinigo.com'
self.ua = 'PostmanRuntime/7.28.4'
self.headers = {
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'User-Agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',
'Host': 'www.infinigo.com',
}
self.sess = sess
self.sess.mount('https://www.infinigo.com', DESAdapter())
self.sess.mount('http://', HTTPAdapter(max_retries=3))
self.sess.mount('https://', HTTPAdapter(max_retries=3))
def parse_list(self, list_url, list_item):
self.sess.headers = self.headers
nums = int(list_item.get('nums', '0'))
cur_page = re.search(r'page=(\d+)', list_url).group(1)
pages = nums // 5 + 1 if nums % 5 != 0 else nums // 5
for i in range(int(cur_page), pages + 1):
cur_url = re.sub(r'page=\d+', 'page={}'.format(str(i)), list_url)
try:
resp = self.sess.get(cur_url, timeout=60, allow_redirects=False)
html = Selector(text=resp.text)
list_title = html.xpath('//title/text()').extract_first()
if resp.status_code == 200 and 'Document' not in list_title:
lis_reg = '//*[@class="result-item"]//*[@class="model-item"]'
contents_reg = './/*[@class="item-content"]/*[contains(@class, "item-content")]'
ps_reg = './*'
text_reg = './/text()'
href_reg = './/*[@class="item-content"]/*[1]//a[1]/@href'
lis = html.xpath(lis_reg)
for li in lis:
list_json = dict()
contents = li.xpath(contents_reg)
for content in contents:
ps = content.xpath(ps_reg)
for p in ps:
text = ''.join([
pk.strip()
for pk in p.xpath(text_reg).extract()
if pk.strip()
])
kv = re.sub(r':', ':', text).split(':', 1)
key = kv[0].strip()
val = kv[1].strip() if len(kv) > 1 else ''
list_json[key] = val
href = li.xpath(href_reg).extract_first()
if href:
detailurl = href if re.search(
'http', href) else self.host + href
items = dict()
items['url'] = detailurl
items['list_json'] = list_json
items['category_name'] = list_item.get('category_name', '')
return items
except Exception as e:
print(e)
def parse_detail(self, detailurl, items):
try:
resp = self.sess.get(detailurl, timeout=30, allow_redirects=False)
html = Selector(text=resp.text)
detail_title = html.xpath('//title/text()').extract_first()
if resp.status_code == 200 and 'Document' not in detail_title:
detail_reg = '//*[@id="v-content"]'
content_fl_reg = './/*[contains(@class, "cps-content-box") and contains(@class, "fl")]'
content_fr_reg = './/*[contains(@class, "cps-fr") and contains(@class, "fr")]'
item_reg = './/*[@class="cps-item"]'
title_reg = './/*[contains(@class, "cps-fl-right")]//*[contains(@class, "cps-title")]/*[1]/@title'
pdf_reg = './/*[contains(@class, "cps-fl-right")]//*[contains(@class, "cps-title")]/a/@href'
img_reg = './/*[contains(@class, "cps-fl-box")]//*[@class="box"]//img/@src'
# 基础物料信息
parts_reg = './/*[contains(@class, "cps-fl-right")]//*[contains(@class, "cps-message")]//p'
part_text_reg = './/text()'
# 交易信息
elrow_reg = './/*[@class="el-row"]/*[1]//span'
elrow_span_reg = './preceding::text()[1]'
price_reg = './/*[@class="el-row"]/*[2]//*[contains(@class, "cps-price")]'
price_key_reg = './/*[contains(@class, "fl")]//p/text()'
price_val_reg = './/*[contains(@class, "fr")]//p/text()'
# 详细参数
item_trs_reg = './/*[contains(@class, "cps-item-args")]//table/tbody/tr'
item_td_text_reg = './td//text()'
detail = html.xpath(detail_reg)
content_fl = detail.xpath(content_fl_reg)
content_fr = detail.xpath(content_fr_reg)
item = html.xpath(item_reg)
title_x = content_fl.xpath(title_reg).extract_first()
if not title_x: return
title = title_x.strip()
pdf = content_fl.xpath(pdf_reg).extract_first()
if pdf:
pdf_url = pdf if re.search(
r'http', pdf) else self.host + pdf.strip('..')
else:
pdf_url = ''
img = content_fl.xpath(img_reg).extract_first()
if img and ('default.png' not in img):
img_url = img if re.search(
r'http', img) else self.host + img.strip('..')
else:
img_url = ''
all_json = dict()
all_json['基础物料信息'] = dict()
parts = content_fl.xpath(parts_reg)
for part in parts:
text = ''.join([
pk.strip() for pk in part.xpath(part_text_reg).extract()
if pk.strip()
])
kv = re.sub(r':', ':', text).split(':', 1)
key = kv[0].strip()
val = kv[1].strip() if len(kv) > 1 else ''
# if part_val in configs.REP_VUL: part_val = ''
if key: all_json['基础物料信息'][key] = val
brand_name = packing = descs = manner_packing = installation_type =''
for k, v in all_json['基础物料信息'].items():
if '品牌' in k: brand_name = v
if '封装' in k: packing = v
if '描述' in k: descs = v
all_json['交易信息'] = dict()
elrow_x = content_fr.xpath(elrow_reg)
elrows = []
for ex in elrow_x:
elrow_span = ex.xpath(elrow_span_reg).extract_first()
if elrow_span:
el_text = ex.xpath('./text()').extract_first()
val = elrow_span.strip() + el_text.strip(
) if el_text else elrow_span.strip()
elrows.append(val)
for elrow in elrows[:-2]:
elrow_split = re.sub(r':', ':', elrow).split(':', 1)
key = elrow_split[0].strip()
val = elrow_split[1].strip() if len(elrow_split) > 1 else ''
if '包装' in key: manner_packing = val
if key: all_json['交易信息'][key] = val
price_x = content_fr.xpath(price_reg)
price_k = [
pk.strip() for pk in price_x.xpath(price_key_reg).extract()
]
price_v = [
pv.strip() for pv in price_x.xpath(price_val_reg).extract()
]
all_json['交易信息']['price_json'] = dict()
for pk in price_k:
ind = price_k.index(pk)
if pk:
all_json['交易信息']['price_json'][pk] = price_v[ind]
# all_json['price_json'][pk] = price_v[ind]
all_json['详细参数'] = dict()
item_trs = item.xpath(item_trs_reg)
for tr in item_trs:
tds_text = [
tt.strip() for tt in tr.xpath(item_td_text_reg).extract()
]
kvs = numpy.array_split(tds_text, len(tds_text) // 2)
for kv in kvs:
key = kv[0].strip()
val = kv[1].strip()
if key: all_json['详细参数'][key] = val
if '安装类型' in key: installation_type = val
item = dict()
list_json = items.get('list_json')
if list_json:
item['list_json'] = list_json
all_json['list_json'] = list_json
if all_json['基础物料信息'] or all_json['交易信息'] or all_json['详细参数']:
# items['all_json'] = json.dumps(all_json, ensure_ascii=False)
item['all_json'] = all_json
item['title'] = title
item['brand_name'] = brand_name
item['packing'] = packing
item['manner_packing'] = manner_packing
item['installation_type'] = installation_type
item['descs'] = descs
item['raw_img_url'] = img_url
item['raw_pdf_url'] = pdf_url
item['url'] = detailurl
return item
except Exception as e:
print(e)
if __name__ == '__main__':
url = "https://www.infinigo.com/classify/01015?p=&f=&c=&n=&min=&max=&is_qty=1&page=1"
p = ParseInfinigoList()
p.parse_list(url, category)
# url = "https://www.infinigo.com/cps/00067f5d-7e63-4cd5-a58c-f1b34d601ff3"
# p = ParseInfinigoList()
# p.parse_detail(url, {})
scrapy版本
https://blog.51cto.com/u_15023263/3812458
测试案例:
https://www.marinetraffic.com/en/ais/details/ports/1253?name=SHANGHAI&country=China
经过验证发现浏览器,fidder,postman都正常访问,返回正常
但是python请求,返回异常
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import requests
import random
import requests, warnings
import urllib3
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
# 关闭警告
urllib3.disable_warnings()
warnings.filterwarnings("ignore")
ORIGIN_CIPHERS = (
'ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:RSA+3DES:!aNULL:'
'!eNULL:!MD5'
)
class DESAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
"""
A TransportAdapter that re-enables 3DES support in Requests.
"""
CIPHERS = ORIGIN_CIPHERS.split(':')
random.shuffle(CIPHERS)
CIPHERS = ':'.join(CIPHERS)
self.CIPHERS = CIPHERS + ':!aNULL:!eNULL:!MD5'
super().__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
context = create_urllib3_context(ciphers=self.CIPHERS)
kwargs['ssl_context'] = context
return super(DESAdapter, self).proxy_manager_for(*args, **kwargs)
url = "https://www.marinetraffic.com/en/ais/details/ports/1253?name=SHANGHAI&country=China"
#TODO 1 原始请求
response = requests.get(url )
#TODO 2 尝试修改指纹绕过
# sec=requests.session()
# sec.mount('https://www.marinetraffic.com', DESAdapter())
# response = sec.get(url)
#TODO 3 使用第三方模块
# pip3 install curl_cffi
# from curl_cffi import requests
# response = requests.get(url, impersonate="chrome101")
if 'Just a moment' in response.text:#tls指纹被检测到,会返回这个信息
print('被检测')
else:#
print('成功绕过')
上面案例提供了两种过tls的思路
第一种:修改tls加密算法,生成指纹
第二种:使用前辈已开源的,指定浏览器,模拟其指纹
经过验证,第一种没有过tls,第二种过了tls