一、基础功能介绍
# coding=utf-8
import paramiko
from time import sleep
# 建立通信
transport = paramiko.Transport(('192.168.0.7', 22))
print(transport) # <paramiko.Transport at 0x5745ed0 (unconnected)>
# 建立连接
transport.connect(username='root', password='123456')
print(transport) # <paramiko.Transport at 0x5745ed0 (cipher aes128-ctr, 128 bits) (active; 0 open channel(s))>
# 注意 active 此时为0
# 开启一个信道
channel = transport.open_session()
print(channel) # <paramiko.Channel 0 (open) window=0 -> <paramiko.Transport at 0x5745ed0 (cipher aes128-ctr, 128 bits) (active; 1 open channel(s))>>
# 调用open_session后,active为1,表示已打开channel。此时只能下发命令,收不到回显
# 设置信道获取信息的超时时间。因为在调用 channel.recv(65535) 方法时 会阻塞执行,不设置就会卡死。
channel.timeout = 10
# 开启终端,进入交互模式
channel.get_pty()
channel.invoke_shell()
sleep(2) # 是为了一次能显示完所有回显,如果不等待2秒,获取的回显可能不完整。
# 检查通道是否有数据。若没有,则返回False,注意:不能用来判断已回显完。当下发命令后,执行出现卡顿,在卡顿期间信道是没有数据的。
channelStatus = channel.recv_ready()
print(channelStatus) # 此时返回True
# 获取返回的数据。此时返回的是登陆信息。注意:当信道没有数据是,若直接获取,则会处于阻塞状态
backMsg = channel.recv(65535).decode('utf-8') # 使用recv读取in-buffer内容,65535表示预读取内容大小,若该值小于in-buffer值,则会读取不完全
print(backMsg)
# 下发命令,命令后需追加\n表示发送命令
channel.send("10.27.0.7 \r")
sleep(2)
backMsg = channel.recv(65535).decode('utf-8')
print(backMsg)
channel.send("mysql22001 \r")
sleep(2)
backMsg = channel.recv(65535).decode('utf-8')
print(backMsg)
channel.send("quit; \r")
sleep(2)
backMsg = channel.recv(65535).decode('utf-8')
print(backMsg)
# 关闭信道
channel.close()
# 关闭连接
transport.close()
二、简单封装
import paramiko
import time
class SSH(object):
def __init__(self):
self.__transport = None
self.__channel = None
self.__default_end_check_infos = ["[root@"]
def connect(self, host, port, username, password, **kwargs):
# 建立通信
self.__transport = paramiko.Transport((host, port))
# 建立连接
self.__transport.connect(username=username, password=password)
# 开启一个信道
self.__channel = self.__transport.open_session()
self.__channel.timeout = 10
# 开启终端,进入交互模式
self.__channel.get_pty()
self.__channel.invoke_shell()
return self.__echo(**kwargs)
def disconnect(self):
self.__channel.close()
self.__transport.close()
def exec_cmd(self, cmd, **kwargs):
self.__channel.send("%s \r" % cmd)
return self.__echo(**kwargs)
def set_echo_end_check_info(self, end_check_info=None):
""" 设置回显校验信息 """
if end_check_info is not None:
if isinstance(end_check_info, str):
self.__default_end_check_infos.append(end_check_info)
if isinstance(end_check_info, list):
self.__default_end_check_infos = self.__default_end_check_infos + end_check_info
return self.__default_end_check_infos
def __echo(self, timeout=10, interval=0.5):
""" 获取回显 """
t0 = time.time()
echo = ""
while time.time()-t0 < timeout:
if self.__channel.recv_ready() is True:
_echo = self.__channel.recv(65535).decode('utf-8')
echo += _echo
for end_check_info in self.__default_end_check_infos:
if end_check_info in echo:
print(echo)
time.sleep(0.5)
return echo
time.sleep(interval)
self.disconnect()
raise RuntimeError("获取预期回显超时!")
if __name__ == '__main__':
ssh_session = SSH()
ssh_session.set_echo_end_check_info(["Opt>"])
ssh_session.set_echo_end_check_info("MySQL")
ssh_session.connect('192.168.0.7', 22, 'root', '123456')
ssh_session.exec_cmd("10.27.0.7")
ssh_session.exec_cmd("mysql22001")
ssh_session.exec_cmd("quit;")
ssh_session.disconnect()
执行结果:
源码等资料获取方法
各位想获取源码等教程资料的朋友请点赞 + 评论 + 收藏,三连!
三连之后我会在评论区挨个私信发给你们~