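Contents
- 1. Introduction
- 1.1 Features
- 1.2 Supported platforms
- 1.3 Installation
- 2. Getting started with gevent
- 2.1 File I/O
- 2.2 MySQL
- 2.3 redis
- 2.4 time
- 2.5 requests
- 2.6 socket
- 2.7 Concurrently scraping text
- 2.8 Concurrently downloading images
- 2.9 Producer-consumer
- 3. More gevent examples
- 3.1 StreamServer
- 3.2 WSGI server
- 3.3 flask
- 3.4 websocket
- 3.5 udp
- Conclusion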
1. Introduction
Official sites:
https://www.gevent.org/
https://github.com/gevent/gevent
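gevent is a coroutine-based Python networking library. It uses greenlet to provide a high-level synchronous API on top of the libev or libuv event loop.
1.1 Features
- Fast event loop based on libev or libuv.
- Lightweight execution units based on greenlets.
- An API that reuses concepts from the Python standard library (for example, events and queues).
- Cooperative sockets with SSL support.
- Cooperative DNS queries performed through a threadpool, dnspython, or c-ares.
- Monkey patching utilities that make third-party modules cooperative.
- TCP/UDP/HTTP servers.
- Subprocess support (through gevent.subprocess).
- Thread pools.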
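The features list says gevent reuses standard-library concepts. The minimal sketch below illustrates this with gevent.event.Event, which mirrors threading.Event but suspends only the waiting greenlet rather than an OS thread. The waiter and setter functions are made up for this example.

```python
import gevent
from gevent.event import Event

ready = Event()
log = []


def waiter(name):
    # Suspends only this greenlet; the hub keeps running the others
    ready.wait()
    log.append(name)


def setter():
    gevent.sleep(0.05)  # give both waiters time to start waiting
    log.append("setter")
    ready.set()  # wakes every greenlet blocked in ready.wait()


gevent.joinall([
    gevent.spawn(waiter, "w1"),
    gevent.spawn(waiter, "w2"),
    gevent.spawn(setter),
])
print(log)
```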
1.2 Supported platforms
gevent is tested on Windows, macOS, and Linux, and should run on most other Unix-like operating systems (such as FreeBSD and Solaris).
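1.3 Installation
pip install gevent
- If you get ModuleNotFoundError: No module named 'gevent.wsgi':
The gevent.wsgi module was deprecated and was removed in the gevent 1.3 release. Its replacement is the gevent.pywsgi module, which has been available for a long time. Replace
from gevent.wsgi import WSGIServer
with: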
from gevent.pywsgi import WSGIServer
2. Getting started with gevent
2.1 File I/O
from gevent import monkey
monkey.patch_all()
import gevent
import os
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
def func(fn):
    logging.info("func: start " + fn)
    with open(fn, "w") as f:
        f.write("*" * 100000000)
    with open(fn) as f:
        print(len(f.read()))
    logging.info("func: end " + fn)
    gevent.sleep(0.1)
g1 = gevent.spawn(func, "text1")
g2 = gevent.spawn(func, "text2")
g3 = gevent.spawn(func, "text3")
g1.join()
g2.join()
g3.join()
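In gevent, file I/O does not cause a greenlet switch. Operating systems do not offer non-blocking I/O on regular files, so each read() and write() blocks the whole thread until it finishes, and the three greenlets above effectively run one after another. You can try wrapping the file object returned by open() in gevent.fileobject.FileObjectThread, which performs the blocking calls in a thread pool.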
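Here is a minimal sketch of the FileObjectThread approach. It assumes the wrapped object is an ordinary file returned by open(), with the mode passed again to the wrapper. The helper name write_then_count is hypothetical, and the file size is reduced to 1,000,000 characters so the sketch runs quickly. No monkey patching is needed here, because FileObjectThread uses gevent's thread pool directly.

```python
import os
import tempfile

import gevent
from gevent.fileobject import FileObjectThread


def write_then_count(path, size):
    # Wrap an ordinary file object: the blocking write()/read() calls are
    # handed to gevent's thread pool, so the hub can run other greenlets.
    f = FileObjectThread(open(path, "w"), "w")
    try:
        f.write("*" * size)
    finally:
        f.close()
    f = FileObjectThread(open(path, "r"), "r")
    try:
        return len(f.read())
    finally:
        f.close()


tmpdir = tempfile.mkdtemp()
jobs = [
    gevent.spawn(write_then_count, os.path.join(tmpdir, "text%d" % i), 1000000)
    for i in range(1, 4)
]
gevent.joinall(jobs)
print([job.value for job in jobs])
```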
2.2 MySQL
from gevent import monkey
monkey.patch_all()
import gevent
import os
import MySQLdb
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
def func(no, name):
    logging.info("func: start " + no)
    conn = MySQLdb.connect(host="localhost", user="root", passwd="root", db="employees")
    cur = conn.cursor()
    cur.execute("insert into departments (dept_no, dept_name) values(%s, %s)", (no, name,))
    conn.commit()
    logging.info("func: end " + no)
    gevent.sleep(1)
g1 = gevent.spawn(func, "a001", "test1")
g2 = gevent.spawn(func, "a002", "test2")
g3 = gevent.spawn(func, "a003", "test3")
gevent.joinall([g1, g2, g3])
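The MySQL calls here are blocking. MySQLdb (mysqlclient) is a C extension that does its network I/O inside the MySQL C client library, not through Python's socket module, so the socket monkey patch has no effect on it.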
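If you have to keep a C driver such as MySQLdb, one workaround is to run each blocking call in gevent's thread pool, so only the calling greenlet waits. The sketch below assumes a pool of 3 threads. fake_c_query is a hypothetical stand-in that blocks with the unpatched time.sleep, the way a C client library would, so the sketch runs without a database.

```python
import time

import gevent
from gevent.threadpool import ThreadPool

pool = ThreadPool(3)  # up to three real OS threads for blocking calls


def fake_c_query(no):
    # Hypothetical stand-in for a blocking C-driver call such as
    # cursor.execute(): time.sleep is NOT monkey-patched here, so it
    # really blocks the OS thread it runs in.
    time.sleep(0.2)
    return "done " + no


def func(no):
    # apply() runs fake_c_query in a pool thread; only this greenlet waits
    return pool.apply(fake_c_query, (no,))


start = time.time()
jobs = [gevent.spawn(func, no) for no in ("a001", "a002", "a003")]
gevent.joinall(jobs)
elapsed = time.time() - start
print([job.value for job in jobs], "%.2fs" % elapsed)
```

A pure-Python driver such as PyMySQL is another option, since it goes through the socket module that monkey.patch_all() replaces.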
2.3 redis
from gevent import monkey
monkey.patch_all()
import gevent
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
import redis
r = redis.Redis(host="localhost", port=6379)
def func(key):
    logging.info("func: start " + key)
    v = r.get(key)
    logging.info("func: end " + key)
    gevent.sleep(0.1)
g1 = gevent.spawn(func, "a001")
g2 = gevent.spawn(func, "a002")
g3 = gevent.spawn(func, "a003")
gevent.joinall([g1, g2, g3])
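monkey.patch_all() makes socket cooperative (non-blocking as far as the event loop is concerned). redis-py is written in Python and opens its connections through the standard socket module, so its requests become cooperative as well.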
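The patch must happen before redis, or anything else that uses socket, is imported, so it can help to confirm that it actually took effect. A minimal check using gevent.monkey.is_module_patched:

```python
from gevent import monkey
monkey.patch_all()  # must run before importing redis or other socket users

import socket

import gevent.socket

# True once patch_all() has replaced the stdlib socket module's contents
patched = monkey.is_module_patched("socket")
# After patching, the stdlib name refers to gevent's cooperative socket class
same_class = socket.socket is gevent.socket.socket
print(patched, same_class)
```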
2.4 time
from gevent import monkey
monkey.patch_all()
import gevent
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
import time
def func(key):
    logging.info("func: start " + key)
    time.sleep(3)
    logging.info("func: end " + key)
    gevent.sleep(0.1)
g1 = gevent.spawn(func, "a001")
g2 = gevent.spawn(func, "a002")
g3 = gevent.spawn(func, "a003")
gevent.joinall([g1, g2, g3])
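monkey.patch_all() also patches the time module. After monkey.patch_all(), time.sleep is equivalent to gevent.sleep, so the three 3-second sleeps above overlap instead of running one after another.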
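A quick way to observe this: after patching, three greenlets that each call time.sleep(0.2) finish in roughly 0.2 s in total rather than 0.6 s. The nap function is a hypothetical name used only for this sketch.

```python
from gevent import monkey
monkey.patch_all()  # patches time.sleep, among others

import time

import gevent


def nap(key):
    time.sleep(0.2)  # after patching this is gevent.sleep: it yields to the hub
    return key


start = time.time()
jobs = [gevent.spawn(nap, key) for key in ("a001", "a002", "a003")]
gevent.joinall(jobs)
elapsed = time.time() - start
print([job.value for job in jobs], "%.2fs" % elapsed)
```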
2.5 requests
from gevent import monkey
monkey.patch_all()
import gevent
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
import requests
def func(url):
    logging.info("func: start " + url)
    requests.get(url, timeout=3)
    logging.info("func: end " + url)
    gevent.sleep(0.1)
g1 = gevent.spawn(func, "http://www.bing.com")
g2 = gevent.spawn(func, "http://www.baidu.com")
g3 = gevent.spawn(func, "http://www.google.com")
gevent.joinall([g1, g2, g3])
- or
from gevent import monkey; monkey.patch_all()
import gevent
import requests
def get_url(url):
    res = requests.get(url)
    print(url, res.status_code, len(res.text))
url_l = [
    'http://www.baidu.com',
    'http://www.python.org',
    'http://www.cnblogs.com'
]
g_l = []
for i in url_l:
    g_l.append(gevent.spawn(get_url, i))
gevent.joinall(g_l)
- or
# -*- coding: utf-8 -*-
from gevent import monkey
monkey.patch_all()
import gevent
import requests
from datetime import datetime
def func(url):
    print(f'time: {datetime.now()}, GET: {url}')
    resp = requests.get(url)
    # resp.content is the raw body, so its length is a byte count
    print(f'time: {datetime.now()}, {len(resp.content)} bytes received from {url}.')
gevent.joinall([
    gevent.spawn(func, 'https://www.python.org/'),
    gevent.spawn(func, 'https://www.yahoo.com/'),
    gevent.spawn(func, 'https://github.com/'),
])
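When there are many URLs, spawning one greenlet per URL at once can overload the target sites. gevent.pool.Pool caps how many greenlets run at the same time. In this sketch, fetch is a hypothetical stand-in that calls gevent.sleep instead of requests.get, so it runs offline and can record the peak concurrency.

```python
import gevent
from gevent.pool import Pool

active = 0  # greenlets currently inside fetch()
peak = 0    # highest value of active observed


def fetch(url):
    # Hypothetical stand-in for requests.get(url): gevent.sleep simulates
    # waiting on the network.
    global active, peak
    active += 1
    peak = max(peak, active)
    gevent.sleep(0.05)
    active -= 1
    return url, len(url)


urls = [
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'https://github.com/',
    'http://www.bing.com',
]
pool = Pool(2)                   # at most 2 greenlets run at the same time
results = pool.map(fetch, urls)  # blocks until done; results keep input order
print(results, "peak concurrency:", peak)
```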
2.6 socket
import gevent
from gevent import socket
urls = ['www.baidu.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)
result = [job.value for job in jobs]
print(result)
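gevent.socket.gethostbyname() has the same interface as the standard socket.gethostbyname(), but it does not block the whole interpreter. Other greenlets keep running while the lookups are in flight. Because joinall() is called with timeout=2, any lookup that has not finished within 2 seconds leaves its job.value as None.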
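The same cooperative behavior applies to ordinary reads and writes on gevent.socket objects. In this sketch, which needs no network, two greenlets talk over a socketpair(): recv() yields to the hub until the writer greenlet sends data. The reader and writer names are hypothetical.

```python
import gevent
from gevent import socket


def reader(sock):
    # recv() suspends only this greenlet until data arrives
    return sock.recv(1024)


def writer(sock):
    gevent.sleep(0.05)  # the reader is already waiting by now
    sock.sendall(b"hello from another greenlet")


a, b = socket.socketpair()
r = gevent.spawn(reader, a)
w = gevent.spawn(writer, b)
gevent.joinall([r, w])
a.close()
b.close()
print(r.value)
```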
2.7 Concurrently scraping text
from gevent import monkey
monkey.patch_all()
import requests
import gevent
import io
import sys
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
# Work around garbled non-ASCII characters in the console by forcing UTF-8 output
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
class Douban(object):
    """A class containing interface test method of Douban object"""
    def __init__(self):
        self.host = 'movie.douban.com'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) Gecko/20100101 Firefox/61.0',
            'Referer': 'https://movie.douban.com/',
        }
    def get_response(self, url, data):
        resp = requests.post(url=url, data=data, headers=self.headers).content.decode('utf-8')
        return resp
    def test_search_tags_movie(self):
        logging.info("func: start ")
        method = 'search_tags'
        url = 'https://%s/j/%s' % (self.host, method)
        post_data = {
            'type': 'movie',
            'source': 'index'
        }
        resp = self.get_response(url=url, data=post_data)
        logging.info("func: end " + resp)
        return resp
if __name__ == '__main__':
    douban = Douban()
    jobs = []
    for i in range(6):
        job = gevent.spawn(douban.test_search_tags_movie)
        jobs.append(job)
    gevent.joinall(jobs)
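This works. Because requests is cooperative after monkey.patch_all(), the six requests are in flight at the same time.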
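gevent.joinall() does not raise when a greenlet fails; by default the error is only printed. To collect the return values and find the failures afterwards, each greenlet can be inspected with successful(), value and exception. In this sketch, search is a hypothetical stand-in for douban.test_search_tags_movie(), and job 3 fails on purpose.

```python
import gevent


def search(i):
    # Hypothetical stand-in for a scraping call; job 3 raises on purpose
    gevent.sleep(0.01)
    if i == 3:
        raise ValueError("bad response for job %d" % i)
    return "result %d" % i


jobs = [gevent.spawn(search, i) for i in range(6)]
gevent.joinall(jobs)  # returns normally even though one greenlet raised
ok = [job.value for job in jobs if job.successful()]
failed = {i: job.exception for i, job in enumerate(jobs) if not job.successful()}
print(ok)
print(failed)
```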
2.8 Concurrently downloading images
from gevent import monkey
monkey.patch_all()
from urllib.parse import urljoin
import requests
import gevent
from lxml import etree
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
def downloader(img_name, img_url):
    logging.info("downloader: " + img_name + ", " + img_url)
    req = requests.get(img_url)
    img_content = req.content
    with open(img_name, "wb") as f:
        f.write(img_content)
def main():
    r = requests.get('https://huaban.com/')
    if r.status_code == 200:
        img_src_xpath = '//img/@src'
        s_html = etree.HTML(text=r.text)
        all_img_src = s_html.xpath(img_src_xpath)
        jobs = []
        for count, img_src in enumerate(all_img_src, start=1):
            # Resolve relative or protocol-relative src values against the page URL
            url = urljoin(r.url, img_src)
            jobs.append(gevent.spawn(downloader, f"{count}.png", url))
        # Join once, after every download has been spawned, so they run concurrently
        gevent.joinall(jobs)
if __name__ == '__main__':
    main()
This works.
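A single unresponsive image server can keep joinall() waiting indefinitely. gevent.Timeout can bound each download. Passing False as the second argument makes the with-block exit silently when time runs out, instead of raising. In this sketch, download is a hypothetical stand-in for downloader() that simulates a slow server with gevent.sleep, and the 0.1 s limit is an arbitrary choice.

```python
import gevent
from gevent import Timeout


def download(name, delay):
    # Hypothetical stand-in for downloader(); gevent.sleep simulates the server
    with Timeout(0.1, False):  # False: leave the block silently on timeout
        gevent.sleep(delay)
        return name, "ok"
    return name, "timed out"


jobs = [
    gevent.spawn(download, "1.png", 0.01),
    gevent.spawn(download, "2.png", 1.0),
]
gevent.joinall(jobs)
print([job.value for job in jobs])
```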
2.9 Producer-consumer
from gevent import monkey
monkey.patch_all()
from gevent.queue import Queue
import gevent
import random
task_queue = Queue(3)
def producer(index=1):
    while True:
        print(f'Producer [{index}] ', end='')
        item = random.randint(0, 99)
        task_queue.put(item)
        print(f"produced ---> {item}")
def consumer(index=1):
    while True:
        print(f'Consumer [{index}] ', end='')
        item = task_queue.get()
        print(f"consumed ---> {item}")
def main():
    job1 = gevent.spawn(producer)
    job2 = gevent.spawn(consumer)
    job3 = gevent.spawn(consumer, 2)
    thread_list = [job1, job2, job3]
    gevent.joinall(thread_list)
if __name__ == '__main__':
    main()
- or
import gevent
from gevent.queue import Queue
tasks = Queue()
def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)
    print('Quitting time!')
def boss():
    for i in range(1, 25):
        tasks.put_nowait(i)
gevent.spawn(boss).join()
gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])
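The first version above runs forever, and the second relies on the queue being filled before the workers start. gevent.queue.JoinableQueue offers another way to finish cleanly: consumers call task_done(), and join() waits until every item has been processed. This is a sketch with an assumed bound of 3 items and 10 produced items; the consumed list exists only to check the result.

```python
import gevent
from gevent.queue import JoinableQueue

tasks = JoinableQueue(3)  # bounded: put() waits while 3 items are pending
consumed = []


def consumer(index):
    while True:
        item = tasks.get()
        consumed.append((index, item))
        tasks.task_done()  # mark this item as fully processed


def producer(n):
    for item in range(n):
        tasks.put(item)


workers = [gevent.spawn(consumer, i) for i in (1, 2)]
gevent.spawn(producer, 10).join()
tasks.join()             # wait until every item has been marked done
gevent.killall(workers)  # consumers loop forever, so stop them explicitly
print(sorted(item for _, item in consumed))
```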
3. More gevent examples
3.1 StreamServer
from gevent.server import StreamServer
def handle(socket, address):
    # Python 3 sockets send bytes, not str
    socket.sendall(b"Hello from a telnet!\n")
    for i in range(5):
        socket.sendall(str(i).encode() + b'\n')
    socket.close()
server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()
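To exercise a StreamServer without telnet, start it with start() (non-blocking) on port 0, which lets the OS choose a free port, and connect from the same script with gevent's socket.create_connection. This is a sketch that repeats the handler above with the parameter renamed to sock.

```python
from gevent import socket
from gevent.server import StreamServer


def handle(sock, address):
    # Same behavior as the handler above
    sock.sendall(b"Hello from a telnet!\n")
    for i in range(5):
        sock.sendall(str(i).encode() + b"\n")
    sock.close()


server = StreamServer(("127.0.0.1", 0), handle)  # port 0: the OS picks a free port
server.start()  # returns immediately, unlike serve_forever()

client = socket.create_connection(("127.0.0.1", server.server_port))
chunks = []
while True:
    data = client.recv(1024)  # yields to the hub while waiting, so handle() can run
    if not data:  # empty bytes: the server closed the connection
        break
    chunks.append(data)
client.close()
server.stop()

text = b"".join(chunks).decode()
print(text)
```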
3.2 WSGI server
gevent provides two WSGI servers for serving HTTP content:
- gevent.wsgi.WSGIServer (the gevent.wsgi module was deprecated and removed in the gevent 1.3 release)
- gevent.pywsgi.WSGIServer
3.3 flask
from gevent.pywsgi import WSGIServer
from flask import Flask
app = Flask(__name__)
@app.route('/', methods=['GET'])
def home():
    return 'hello 爱看书的小沐!'
if __name__ == "__main__":
    WSGIServer(('127.0.0.1', 5000), app).serve_forever()
- or
from gevent import monkey
monkey.patch_all()
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello():
    return 'Hello World, 爱看书的小沐!'
if __name__ == '__main__':
    from gevent import pywsgi
    server = pywsgi.WSGIServer(('127.0.0.1', 5000), app)
    server.serve_forever()
- or
#!/usr/bin/python
"""WSGI server example"""
from gevent.pywsgi import WSGIServer
def application(env, start_response):
    if env['PATH_INFO'] == '/':
        start_response('200 OK', [('Content-Type', 'text/html')])
        return [b"<b>hello world</b>"]
    start_response('404 Not Found', [('Content-Type', 'text/html')])
    return [b'<h1>Not Found</h1>']
if __name__ == '__main__':
    print('Serving on 5000...')
    WSGIServer(('127.0.0.1', 5000), application).serve_forever()
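Because pywsgi only calls the WSGI application, the application itself can be checked without starting any server. This sketch builds a minimal environ with the standard library's wsgiref.util.setup_testing_defaults. call_app is a hypothetical helper; application is copied from the example above.

```python
from wsgiref.util import setup_testing_defaults


def application(env, start_response):
    # Same WSGI app as in the example above
    if env['PATH_INFO'] == '/':
        start_response('200 OK', [('Content-Type', 'text/html')])
        return [b"<b>hello world</b>"]
    start_response('404 Not Found', [('Content-Type', 'text/html')])
    return [b'<h1>Not Found</h1>']


def call_app(app, path):
    # Build a minimal but valid WSGI environ, then override the path
    environ = {}
    setup_testing_defaults(environ)
    environ['PATH_INFO'] = path
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured['status'] = status
        return lambda data: None  # legacy write() callable, unused here

    body = b"".join(app(environ, start_response))
    return captured['status'], body


print(call_app(application, '/'))
print(call_app(application, '/missing'))
```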
3.4 websocket
- ws_server.py
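# Simple gevent-websocket server (needs the third-party package: pip install gevent-websocket)
import json
import random
from gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandler
from geventwebsocket.exceptions import WebSocketError
class WebSocketApp(object):
    '''Send random data to the websocket'''
    def __call__(self, environ, start_response):
        ws = environ.get('wsgi.websocket')
        if ws is None:
            # Plain HTTP request without a WebSocket upgrade
            start_response('400 Bad Request', [('Content-Type', 'text/plain')])
            return [b'Expected a WebSocket connection']
        x = 0
        try:
            while True:
                data = json.dumps({'x': x, 'y': random.randint(1, 5)})
                ws.send(data)
                x += 1
                sleep(0.5)
        except WebSocketError:
            # The browser closed the connection; end this handler
            pass
        return []
server = pywsgi.WSGIServer(
    ("127.0.0.1", 9090), WebSocketApp(),
    handler_class=WebSocketHandler
)
server.serve_forever()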
- ws_client.html
<html>
<head>
    <title>Minimal websocket application</title>
    <script type="text/javascript" src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js"></script>
    <script type="text/javascript">
    $(function() {
        // Open up a connection to our server
        var ws = new WebSocket("ws://localhost:9090/");
        // What do we do when we get a message?
        ws.onmessage = function(evt) {
            $("#placeholder").append('<p>' + evt.data + '</p>');
        }
        // Just update our conn_status field with the connection status
        ws.onopen = function(evt) {
            $('#conn_status').html('<b>Connected</b>');
        }
        ws.onerror = function(evt) {
            $('#conn_status').html('<b>Error</b>');
        }
        ws.onclose = function(evt) {
            $('#conn_status').html('<b>Closed</b>');
        }
    });
    </script>
</head>
<body>
    <h1>WebSocket Example</h1>
    <div id="conn_status">Not Connected</div>
    <div id="placeholder" style="width:600px;height:300px;"></div>
</body>
</html>
3.5 udp
- udp_server.py:
# Copyright (c) 2012 Denis Bilenko. See LICENSE for details.
"""A simple UDP server.
For every message received, it sends a reply back.
You can use udp_client.py to send a message.
"""
from gevent.server import DatagramServer
class EchoServer(DatagramServer):
    def handle(self, data, address): # pylint:disable=method-hidden
        print('%s: got %r' % (address[0], data))
        self.socket.sendto(('Received %s bytes' % len(data)).encode('utf-8'), address)
if __name__ == '__main__':
    print('Receiving datagrams on :9000')
    EchoServer(':9000').serve_forever()
- udp_client.py:
"""Send a datagram to localhost:9000 and receive a datagram back.
Usage: python udp_client.py MESSAGE
Make sure you're running a UDP server on port 9000 (see udp_server.py).
There's nothing gevent-specific here.
"""
from __future__ import print_function
import sys
from gevent import socket
address = ('127.0.0.1', 9000)
message = ' '.join(sys.argv[1:])
sock = socket.socket(type=socket.SOCK_DGRAM)
sock.connect(address)
print('Sending %s bytes to %s:%s' % ((len(message), ) + address))
sock.send(message.encode())
data, address = sock.recvfrom(8192)
print('%s:%s: got %r' % (address + (data, )))
sock.close()
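Server and client can also be combined into a single script as a quick self-test. This sketch starts the DatagramServer with start() on port 0, which lets the OS pick a free port, sends one datagram with gevent's socket, and reads the reply. The 2-second client timeout is an arbitrary safety choice.

```python
from gevent import socket
from gevent.server import DatagramServer


class EchoServer(DatagramServer):
    def handle(self, data, address):  # pylint:disable=method-hidden
        # Same reply format as udp_server.py above
        self.socket.sendto(('Received %s bytes' % len(data)).encode('utf-8'), address)


server = EchoServer(('127.0.0.1', 0))  # port 0: the OS picks a free port
server.start()                         # non-blocking, unlike serve_forever()

sock = socket.socket(type=socket.SOCK_DGRAM)
sock.settimeout(2)  # fail instead of hanging if no reply arrives
sock.sendto(b'hello', ('127.0.0.1', server.server_port))
reply, _ = sock.recvfrom(8192)  # yields to the hub so the server can answer
sock.close()
server.stop()
print(reply)
```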
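Conclusion
If you found this approach or code even a little useful, you can give the author a like or buy them a coffee;
╮( ̄▽ ̄)╭
If you think the approach or code isn't very good //(ㄒoㄒ)//, leave a comment and the author will keep improving it;
o_O???
If you need custom development of related functionality, leave a comment or send the author a private message;
(✿◡‿◡)
Thanks to everyone for your support!
( ´ ▽´ )ノ ( ´ ▽´)っ!!!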