Celery是一个基于Python开发的分布式异步消息任务队列,可以轻松的实现任务的异步处理
实例场景:
- 对100台机器执行一条批量命令,可能会花很长时间 ,但不想让你的程序等着结果返回,而是给你返回 一个任务ID,经过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,可以继续做其它的事情。
- 想做一个定时任务,比如每天检测一下所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福
Celery基本工作流程图:
Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis。
Celery默认broker是RabbitMQ,仅需一行配置:
broker_url = ‘amqp://guest:guest@localhost:5672//’
也可以使用Redis做broker
安装Celery:pip install celery
由于官方不支持windows安装和使用,需要借助Python第三方模块eventlet
,此模块主要作用通过协程实现并发
pip install eventlet
目录结构
celery对目录要求严格,如果不在目录下加入__init__.py,worker执行任务可能会出现NotRegistered的情况。
创建一个mycelery包:
包下创建tasks.py:
from celery import Celery
app = Celery('tasks', # 可以使用 __name__
broker='redis://:abc123@192.168.1.117:6379/1',
backend='redis://:abc123@192.168.1.117:6379/2')
@app.task
def add(x,y):
print("running ...",x,y)
return x + y
backend是用于返回结果的,broker是接收用户发送的任务的。
redis作为broker的配置:redis://:password@hostname:port/db_number
Redis默认使用的数据库是0号数据库(database 0)。整个Redis实例最多可包含16个数据库,从[0]到[15]。每个Redis实例都有16个数据库,不管它们是否被使用。
启动Celery wirker来开始监听并执行任务
celery -A tasks worker --loglevel=info
这样就启动了一个worker,可以启动多个这个的worker。
调用:python3:
>>>from tasks import add
>>>r = add.delay(5,7)
>>>r.get()
worker相当于一个个服务器,负责执行task任务,如这里的add,即以@app.task装饰的函数,就是实现多个服务器来执行任务,满足性能要求。实现分布式。
堡垒机:
在一个特定的网络环境下,为了保障网络和数据不受来自外部和内部用户的入侵和破坏,而运用各种技术手段监控和记录运维人员对网络内的服务器、网络设备、安全设备、数据库等设备的操作行为,以便集中报警、及时处理及审计定责。
实际工作中存在的各种问题(业务需求):
1、权限管理;权限分配混乱
2、用户行为审计;共用一套root账户。
堡垒机系统:齐治堡垒机。
架构及功能需求:
1、登录功能;2、账号管理;3、身份认证;4、资源授权;5、访问控制;6、操作审计
用户登录堡垒机,由堡垒机负责与各业务系统连接,并将连接返回给用户,堡垒机可以做各种权限限制管理和行为审计工作。
与各业务系统连接,一般不是使用图形界面,都是基于SSH的连接,Python实现的SSH2远程安全连接有一个模块,叫做Paramiko,支持认证及秘钥方式。可以实现远程命令执行、文件传输、之间SSH代理等功能,封装的层次更高,更贴近SSH协议的功能。
如下代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# author heaven
import paramiko
#创建ssh对象
ssh = paramiko.SSHClient()
#允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#链接服务器
ssh.connect(hostname='192.168.138.131',port=2704,username='user',password='pwsword')
while True:
try:
yourinput = input("请输入你要执行的命令: ").strip();
stdin,stdout,stderr = ssh.exec_command(yourinput);
if len(yourinput) == 0:
continue;
result = stdout.read().decode();
print(result);
except KeyboardInterrupt as e: #注意此处异常的写法,python2与python3中不一样,python3中必须用as
print('你已经退出登录! {}'.format(e));
break
#关闭链接
ssh.close()
通过这个模块,就能自动登录,并捕获用户的操作,进行记录以实现行为审计。
堡垒机的作用就是提供连接,这个链接可以是用户无需知道用户以及密码,并记录用户操作的系统。
系统的好坏,关键在于这个链接提供的操作界面的用户体验,如果能实现原系统,如linux的bash的体验,就是一个成功的系统。jumpserver,一个基于paramiko的类似系统。
基于Django的堡垒机设计:
创建一个Django项目,设计表结构:
Host表:保存主机;HostGroup表 :主机组;UserProfile表 : 堡垒机账号;HostUser表:主机登录账户。
用户登录堡垒机,就使用Django的用户管理,即Web用户验证,登录成功后,找到此用户关联的可登陆主机、登录用户和密码,然后以此用户、密码远程登录主机,将登录连接返回给用户,使其可以使用主机。
堡垒机用户交互程序:使用原生的ssh。
Python中使用subprocess模块
login_cmd = sshpass -p password ssh username@hostname(ip) -o "StrictHostKeyChecking no"
调用原生的ssh,很可能需要进行交互,而我们的需求是中间不能有交互,所以使用sshpass软件来进行ssh登录,这样,用户、密码都可以在参数中带入,无需交互,ssh的 -o "StrictHostKeyChecking no" 参数,跳过rsa等认证交互。
上面的命令,可以进行填充了,即password、username、hostname都可以从数据库中取到填充上。
然后:ssh_instance = subprocess.run(login_cmd,shell=True)
登录堡垒机时,直接进入选择管理机器程序,不能运行其他程序,更不能结束这个进程进入堡垒机的shell,确保堡垒机的安全:
修改堡垒机登录用户的.bashrc,确保一登录就运行程序,退出程序就退出堡垒机登录。
.bashrc:
xxxxxx......
python3 /usr/local/baoleiji.py
logout
审计:
首先是获取操作内容,linux中的strace-ttt -f -p pid -o FILENAME,可以跟踪输入操作,即键盘的操作,对文件进行分析整理,可以得到用户的操作。
在堡垒机上,跟踪每个登录的用户启动的连接进程。问题是如何获取进程号。
修改了ssh源代码,增加了参数选项,可以使启动的ssh进程带上一个特别参数和一串字符,以此来标识每个ssh进程。
审计需要的日志文件字段:堡垒机用户,登录的主机,主机用户,登录时间,操作
以上操作均是在堡垒机上,登录堡垒机,就启动了python3 /usr/local/baoleiji.py,然后登录管理的主机,也是在堡垒机上启动ssh,堡垒机上对这个进程跟踪,获取操作,保存于日志中。
缺陷:对于上传文件,看不到上传文件的内容,是安全隐患。可以禁止文件的上传。
批量任务:批量命令执行、批量文件上传、批量文件下载;(这里的批量,是同时对多个远端主机执行相同的任务)
批量命令:
使用了paramiko.SSHClient()进行批量命令的执行
s = paramiko.SSHClient()
s.connect(ip_addr,port,username,password,timeout=5) # 使用密码登录执行
######
# key = paramiko.RSAKey.from_private_key_file(RSA_PRIVATE_KEY_FILE)
# s.connect(ip_addr,port,username,pkey=key,timeout=5)
# 使用key方式登录执行
stdin,stdout,stderr = s.exec_command(cmd)
result = stdout.read(),stderr.read()
自动发布项目管理实现:(持续集成,自动完成项目的测试编译打包部署等过程。jekins,bamboo等)
TaskPlan 任务计划
Stage1 :
Job:1
1 、登录指定测试机器
2、从git下载最新代码
Job2:
3、build编译
4、打包发布
5、运行,自动测试脚本
Stage 2:
Job1 :
。。。
Job2:
。。。
设计models:
Plan()、Stage()、Job()、SSHTask()——SSH类任务的单独表、SCPTask()——scp类任务表、
类似功能系统:saltstack,ansible、puppet
CMDB:配置管理数据库系统 Configuration Management DataBase
对单位的 资产进行管理,管理的内容主要涉及
SN、hostname,ip,机房,机柜,配置,提供的服务等(软件资产)。
资产数据准确性、减少人工干预、资产自动汇报、硬件变更自动更新数据库。
自动化运维基石。
类似系统:nagios、zabbix、ganglia、open-falcon、cacti
ITIL —— Information Technology Infrastructure Library,信息技术基础架构库
事件管理、问题管理、配置管理、变更管理、发布管理。其中配置管理是核心
资产信息收集:
windows系统:
wmiobj = wmi.WMI()来获取windows系统的相关信息,如:
wmiobj.Win32_Processor() # 获取cpu信息
相应的还有其他获取如磁盘、内存、网卡等信息的方法,以Win32_开头的一系列方法。
获取到相关数据后,传递给管理服务器,保存到对应数据库中,作为资产进行管理。
需要注意的,如果是以前保存过的资产,需要在本地保存一个标识ID,传递的同时读取这个ID一同提交,新资产则需要从管理服务器获取ID,保存到本地,以此来区分是进行更新还是添加。
api接口认证:在服务器和客户端保存一个token值,传递过程中,使用用户名+时间戳+token+其他,然后进行MD5,传递这个MD5值进行认证。
RESTful api:Representational State Transfer,表现层状态转化
资源:Resources —— URI
表现层:Representation,相当于Content-Type
状态转化:State Transfer,GET、POST、PUT、DELETE
RESTful架构:
1、每一个URI代表一种资源;
2、客户端和服务器之间,传递这种资源的某种表现层;
3、客户端通过四个HTTP动词,对服务器资源进行操作,实现“表现层状态转化”
Django RESTful framwork
以上内容涉及linux,自己在虚拟机上捣鼓了很长时间,也没将环境搭建好。下一步主攻linux。