DNS处理模块 dnspython
- 标题介绍
- 安装dnspython 模块
- 常用方法介绍
- 实践:DNS域名轮询业务监控
标题介绍
Dnspython 是 Python 的 DNS 工具包。它可用于查询、区域传输、动态更新、名称服务器测试和许多其他事情。
dnspython 模块提供了大量的 DNS 处理方法,最常用的方法是域名查询。
dspython 提供了一个DNS 解析器类–resolve,使用它的 reslover方法来实现域名的查询功能。
安装dnspython 模块
采用pip的方式直接安装
pip install dnspython
dns.resolve 源码如下
def resolve(
self,
qname: Union[dns.name.Name, str],
rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
tcp: bool = False,
source: Optional[str] = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
lifetime: Optional[float] = None,
search: Optional[bool] = None,
) -> Answer:
resolution = _Resolution(
self, qname, rdtype, rdclass, tcp, raise_on_no_answer, search
)
start = time.time()
while True:
(request, answer) = resolution.next_request()
# Note we need to say "if answer is not None" and not just
# "if answer" because answer implements __len__, and python
# will call that. We want to return if we have an answer
# object, including in cases where its length is 0.
if answer is not None:
# cache hit!
return answer
assert request is not None # needed for type checking
done = False
while not done:
(nameserver, tcp, backoff) = resolution.next_nameserver()
if backoff:
time.sleep(backoff)
timeout = self._compute_timeout(start, lifetime, resolution.errors)
try:
response = nameserver.query(
request,
timeout=timeout,
source=source,
source_port=source_port,
max_size=tcp,
)
except Exception as ex:
(_, done) = resolution.query_result(None, ex)
continue
(answer, done) = resolution.query_result(response, None)
if answer is not None:
return answer
resolve 方法的定义如下:
参数解析:
● qname 指定查询的名称
● rdtype 指定查询的类型 默认是A记录
- A记录,将主机名转换为IP地址;
- MX记录,邮件交换记录,定义邮件服务器的域名;
- CNAME记录,指别名记录,实现域名间的映射;
- NS记录,标记区域的域名服务器及授权子域;
- PTR记录,反向解析,与A记录相反,将IP转换为主机名;
- SOA记录,SOA标记,一个起始授权区的定义。
● rdclass 指定查询的网络类型 ,可选的值有IN、CH与HS,默认是 IN
● tcp 是否启用tcp查询模式
● source 和source_port 指定 查询用的源地址和端口
● raise_on_no_answer 查询无结果的时候,是否需要抛出异常
● lifetime 声明周期配置参数,采用默认值
常用方法介绍
- A记录
import dns.resolver
domain = "www.baidu.com"
# 获取解析对象
query_object = dns.resolver.resolve(qname=domain, rdtype='A') # 指定查询记录为A
print('查询对象:{}'.format(query_object))
# 应答对象
response_object = query_object.response # 指定查询记录为 A
print('应答对象:{}'.format(response_object))
print("-"*20)
# 解析对象
answer_object = response_object.answer # 指定查询记录为A
print('解析对象:{}'.format(answer_object))
# print(len(answer_object))
print('解析对象1:{}'.format(answer_object[0]))
print('解析对象2:{}'.format(answer_object[1]))
print("-"*20)
for i in answer_object:
print(f"查询条目: {i}")
for j in i.items:
print(f"解析记录: {j}")
import dns.resolver
# 基本信息
domain = input("Please input domain: " )
dns_type = 'A'
query_object = dns.resolver.resolve(qname=domain, rdtype='A')
for query_item in query_object.response.answer:
for item in query_item.items:
print("{}的A记录解析地址有:{}".format(domain, item))
- MX邮件记录
import dns.resolver
# 基本信息
domain = "163.com"
dns_type = 'MX'
# A记录解析效果
query_object = dns.resolver.resolve(domain, dns_type) # 指定查询记录为MX
# 从应答的answer中获取查询记录
for query_item in query_object.response.answer:
# 获取解析的记录信息
for item in query_item.items:
print("邮件服务器权重:{},邮件服务器地址:{} ".format(domain,item))
import dns.resolver
# 基本信息
domain = "163.com"
dns_type = 'MX'
# 获取解析对象
# query_object1 = dns.resolver.query(domain,dns_type) # query 后续可能不在支持
query_object = dns.resolver.resolve(domain, dns_type) # 指定查询记录为A
print("MX记录的结果:{} ".format(query_object))
# 从应答的response中获取查询记录
resp_object = query_object.response
print("MX记录的结果:{}".format(resp_object))
# 查看answer对象
answer_object = resp_object.answer
print("解析对象:{}".format(answer_object))
# 查看解析条目对象
answer_object = resp_object.answer
for query_item in answer_object:
print("查询条目:{}".format(query_item))
print('-' * 20)
# 查看解析后的记录条目--详情
# 1. preference 权重, exchange 替换服务器主机
query_item = resp_object.answer[0]
for item in query_item.items:
print("邮件服务器权重:{},邮件服务器地址:{} ".format(item.preference,item.exchange))
- NS 记录结构
NS(Name Server)域名服务器记录。用来表明由哪台服务器对该域名进行解析。在注册域名时,总有默认的DNS服务器,每个注册的域名都是由一个DNS域名服务器来进行解析的。但是需要注意的是只能输入一级域名,如:baidu.com;对于二级以及多级域名,如www.baidu.com、wenku.baidu.com则是错误的。
import dns.resolver
# 基本信息
domain = "jdy.com"
dns_type = 'NS'
# 获取解析对象
query_object = dns.resolver.resolve(domain, dns_type) # 指定查询记录为NS
print("NS记录的结果:{} ".format(query_object))
print('-' * 20)
# 从应答的response中获取查询记录
resp_object = query_object.response
print("NS记录的结果:{}".format(resp_object))
print('-' * 20)
# 查看answer对象
answer_object = resp_object.answer
print("解析对象:{}".format(answer_object))
print('-' * 20)
# 查看解析条目对象
answer_object = resp_object.answer
for query_item in answer_object:
print("查询条目:{}".format(query_item))
print('-' * 20)
# 查看解析后的记录条目 - 2
query_item = resp_object.answer[0]
for item in query_item:
print("查询条目:{}".format(item))
print("数据格式:{}".format(type(item)))
print('-' * 20)
query_item = resp_object.answer[0]
for item in query_item:
print("查询条目:{}".format(item.to_text()))
print("数据格式:{}".format(type(item.to_text())))
import dns.resolver
# 基本信息
domain = input("Please input domain: " )
dns_type = 'NS'
# NS记录解析效果
query_object = dns.resolver.resolve(domain, dns_type) # 指定查询记录为A
# 从应答的answer中获取查询记录
for query_item in query_object.response.answer:
# 获取解析的记录信息
for item in query_item.items:
print("{}的NS记录解析结果有:{} ".format(domain,item))
- CNAME记录结构
import dns.resolver
# 基本信息
domain = "www.baidu.com"
dns_type = 'CNAME'
# 获取解析对象
query_object = dns.resolver.resolve(domain, dns_type) # 指定查询记录为CNAME
print("CNAME记录的结果:{} ".format(query_object))
print('-' * 20)
# 从应答的response中获取查询记录
resp_object = query_object.response
print("CNAME记录的结果:{}".format(resp_object))
print('-' * 20)
# 查看answer对象
answer_object = resp_object.answer
print("解析对象:{}".format(answer_object))
print('-' * 20)
# 查看解析条目对象
answer_object = resp_object.answer
for query_item in answer_object:
print("查询条目:{}".format(query_item))
print('-' * 20)
# 查看解析后的记录条目 - 2
query_item = resp_object.answer[0]
for item in query_item:
print("查询条目:{}".format(item))
import dns.resolver
# 基本信息
domain = input("Please input domain: " )
dns_type = 'CNAME'
# NS记录解析效果
query_object = dns.resolver.resolve(domain, dns_type) # 指定查询记录为A
# 从应答的answer中获取查询记录
for query_item in query_object.response.answer:
# 获取解析的记录信息
for item in query_item.items:
print("{}的CNAME记录解析结果有:{} ".format(domain,item))
实践:DNS域名轮询业务监控
大部分的 DNS 解析都是一个域名对应一个 IP 址,但是通过 DNS 轮循技术可以做到个域名对应多个IP,从而实现最简单且高效的负载平衡,
不过此方案最大的弊端是目标主机不可用时无法被自动剔除,因此做好业务主机的服务可用监控至关重要。
本示例通过分析当前域名的解析 IP,再结合服务端口探测来实现自动监控,在域名解析中添加、删除 IP 时无须对监控脚本进行更改。
- 步骤
- 实现域名的解析,获取域名所有的A记录解析IP列表;
- 对IP列表进行HTTP级别的探测。
"""
代码思路:
1. 获取有效IP列表
获取记录类型
将获取的值添加到列表中
2. 对IP列表里面的值进行检测
创建IP地址的连接
尝试发起请求
对请求的响应内容进行判断,并输出结果
3. 测试主程序
先获取IP列表,然后对IP进行测试
"""
import dns.resolver
import http.client
import re
# 准备工作
iplist = [] #定义域名IP列表
appdomain = 'www.baidu.com' #定义业务域名
# 判断解析的数据是不是IP
def is_ipv4(ip):
# 使用正则表达式检查是否为IPv4地址
pattern = r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
return re.match(pattern, ip) is not None
def get_iplist(domain=""):
try:
dns_type = 'A'
ip_result = dns.resolver.resolve(domain, dns_type)
except Exception as e:
print("dns解析失败:{}".format(str(e)))
return False
# 将获取的值添加到列表中
for rdata in ip_result:
if is_ipv4(rdata.address): #要判断是不是IP需要 加上address
iplist.append(rdata.address)
print(type(data)) #<class 'dns.rdtypes.IN.A.A'>
print(type(data.address)) #<class 'str'>
return True
def checkip(ip):
conn = http.client.HTTPConnection(ip, port=80, timeout=10)
try:
conn.request("GET", "/", headers={"Host": appdomain})
response = conn.getresponse()
if response.status == 200:
first_bytes = response.read(6)
if first_bytes.startswith(b"<html>") or first_bytes.startswith(b"<!DOCT"):
print("{} 状态: Ok".format(ip))
else:
print("{} 状态: 可能不是HTML内容".format(ip))
else:
print("{} 状态码: {}".format(ip, response.status))
except Exception as e:
print("网络超时或者地址访问异常,请重试: {}".format(str(e)))
finally:
conn.close()
# 测试主程序
if __name__ == '__main__':
if get_iplist(appdomain) and len(iplist) > 0:
for ip in iplist:
# print("{} 的正常解析IP有: {}".format(appdomain,ip))
checkip(ip)
else:
print("dns 解析后的IP地址有误")
- A记录解析的IP地址必须能够被ping正常测试,否则失败
- 正常的126.com虽然可以正常,但是测试失败。
因为格式不一致,测试失败。
将二进制格式转为普通的utf-8格式
str(b’sdfads’,“utf-8”)
for rdata in ip_result:
if is_ipv4(rdata.address): #要判断是不是IP需要 加上address
iplist.append(rdata.address)
print(type(data)) #<class 'dns.rdtypes.IN.A.A'>
print(type(data.address)) #<class 'str'>