Python高性能编程

news2024/12/23 22:13:56

 一、进程池和线程池

1.串行

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import time

import requests

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

start_time = time.time()

for url in url_lists:

    response = requests.get(url)

    print(response.text)

print("Runtime: {}".format(time.time()-start_time))

# Runtime: 1.95

  

2.多进程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import time

import requests

from multiprocessing import Process

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    print(response.text)

if __name__ == '__main__':

    p_list = []

    start_time = time.time()

    for url in url_lists:

        p = Process(target=task, args=(url,))

        p_list.append(p)

        p.start()

    for in p_list:

        p.join()

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 1.91

 

3.进程池(1)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import time

import requests

from concurrent.futures import ProcessPoolExecutor

"""

Py2里    没有线程池   但是有进程池

Py3里    有线程池     有进程池

"""

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    print(response.content)

if __name__ == '__main__':

    start_time = time.time()

    pool = ProcessPoolExecutor(10)

    for url in url_lists:

        pool.submit(task,url)

    pool.shutdown(wait=True)

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 2.00

  

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

# 进程池 + 回调函数

import time

import requests

from concurrent.futures import ProcessPoolExecutor

"""

Py2里    没有线程池   但是有进程池

Py3里    有线程池     有进程池

"""

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    return response.content

def callback(future):

    print(future.result())

if __name__ == '__main__':

    start_time = time.time()

    pool = ProcessPoolExecutor(10)

    for url in url_lists:

        v = pool.submit(task,url)

        v.add_done_callback(callback)

    pool.shutdown(wait=True)

    print("Runtime: {}".format(time.time() - start_time))

  

3.进程池(2)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

import time

import requests

from  multiprocessing import Pool

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    return response.content

def callBackFunc(content):

    print(content)

if __name__ == '__main__':

    start_time = time.time()

    pool = Pool(10)

    for url in url_lists:

        pool.apply_async(func=task,args=(url,),callback=callBackFunc)

    pool.close()

    pool.join()

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 1.96

 

2019-03-06 補充

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

# Parallel 底层調用的是 multiprocessing

import time

from joblib import Parallel, delayed

def func(idx):

    print(idx)

    time.sleep(1)

    return {'idx':idx}

start_ts = time.time()

results = Parallel(-1)(

    delayed(func)(x) for in range(4)

)

print(results)

print('Runtime : {}'.format(time.time()-start_ts))

4.多线程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import time

import requests

from threading import Thread

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    print(response.text)

if __name__ == '__main__':

    t_list = []

    start_time = time.time()

    for url in url_lists:

        t = Thread(target=task, args=(url,))

        t_list.append(t)

        t.start()

    for in t_list:

        t.join()

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 0.49

  

5.线程池

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import time

import requests

from concurrent.futures import ThreadPoolExecutor

"""

Py2里    没有线程池   但是有进程池

Py3里    有线程池     有进程池

"""

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    print(response.content)

if __name__ == '__main__':

    start_time = time.time()

    pool = ThreadPoolExecutor(10)

    for url in url_lists:

        pool.submit(task,url)

    pool.shutdown(wait=True)

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 0.51

  

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

# 线程池 + 回调函数

import time

import requests

from concurrent.futures import ThreadPoolExecutor

"""

Py2里    没有线程池   但是有进程池

Py3里    有线程池     有进程池

"""

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    return response.content

def callback(future):

    print(future.result())

if __name__ == '__main__':

    start_time = time.time()

    pool = ThreadPoolExecutor(10)

    for url in url_lists:

        v = pool.submit(task,url)

        v.add_done_callback(callback)

    pool.shutdown(wait=True)

    print("Runtime: {}".format(time.time() - start_time))

  

二、异步非阻塞

1

2

3

4

5

6

7

"""

异步非阻塞/异步IO

    非阻塞: 不等待

      异步: 回调函数

    本质:一个线程完成并发操作(前提是执行过程中一定得有IO,这样才能让线程空闲出来去执行下一个任务)

"""

1.asyncio示例1

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

import asyncio

@asyncio.coroutine

def func1():

    print('before...func1......')

    yield from asyncio.sleep(2)

    print('end...func1......')

@asyncio.coroutine

def func2():

    print('before...func2......')

    yield from asyncio.sleep(1)

    print('end...func2......')

@asyncio.coroutine

def func3():

    print('before...func3......')

    yield from asyncio.sleep(3)

    print('end...func3......')

tasks = [func1(), func2(), func3()]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

### 结果 ###

before...func3......

before...func2......

before...func1......

end...func2......

end...func1......

end...func3......

2.asyncio示例2

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

####################################################################################

# async/await 是 python3.5中新加入的特性,将异步从原来的yield 写法中解放出来,变得更加直观;

# async 写在def前,替代了装饰器@asyncio.coroutine;await 替换了yield from;

####################################################################################

import asyncio

async def func1():

    print('before...func1......')

    await asyncio.sleep(2)

    print('end...func1......')

async def func2():

    print('before...func2......')

    await asyncio.sleep(1)

    print('end...func2......')

async def func3():

    print('before...func3......')

    await asyncio.sleep(3)

    print('end...func3......')

tasks = [func1(), func2(), func3()]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

  

3.asyncio示例3

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

import asyncio

@asyncio.coroutine

def fetch_async(host, url='/'):

    print(host, url)

    reader, writer = yield from asyncio.open_connection(host, 80)

    request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)

    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)

    yield from writer.drain()

    text = yield from reader.read()

    print(host, url, text)

    writer.close()

task_list = [

    fetch_async('www.cnblogs.com''/standby/'),

    fetch_async('www.cnblogs.com''/standby/p/7739797.html'),

    fetch_async('www.cnblogs.com''/wupeiqi/articles/6229292.html')

]

loop = asyncio.get_event_loop()

results = loop.run_until_complete(asyncio.gather(*task_list))

loop.close()

  

4.asyncio+aiohttp示例1

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import asyncio

import aiohttp

import async_timeout

async def fetch(session, url):

    with async_timeout.timeout(10):

        async with session.get(url) as response:

            return await response.text()

async def fetch_async(url):

    async with aiohttp.ClientSession() as session:

        html = await fetch(session, url)

        print(html)

tasks = [

    fetch_async('https://api.github.com/events'),

    fetch_async('http://aiohttp.readthedocs.io/en/stable/'),

    fetch_async('http://aiohttp.readthedocs.io/en/stable/client.html')]

event_loop = asyncio.get_event_loop()

results = event_loop.run_until_complete(asyncio.gather(*tasks))

event_loop.close()

或者

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

import asyncio

import aiohttp

import async_timeout

async def fetch_async(url):

    async with aiohttp.ClientSession() as session:

        with async_timeout.timeout(10):

            async with session.get(url) as resp:

                print(resp.status)

                print(await resp.text())

tasks = [

    fetch_async('https://api.github.com/events'),

    fetch_async('http://aiohttp.readthedocs.io/en/stable/'),

    fetch_async('http://aiohttp.readthedocs.io/en/stable/client.html')]

event_loop = asyncio.get_event_loop()

results = event_loop.run_until_complete(asyncio.gather(*tasks))

event_loop.close()

5.asyncio+requests示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

import asyncio

import requests

@asyncio.coroutine

def fetch_async(func, *args):

    loop = asyncio.get_event_loop()

    future = loop.run_in_executor(None, func, *args)

    response = yield from future

    print(response.url, response.content)

tasks = [

    fetch_async(requests.get'http://aiohttp.readthedocs.io/en/stable/'),

    fetch_async(requests.get'https://api.github.com/events')

]

loop = asyncio.get_event_loop()

results = loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

补充:

1

2

3

4

5

有时候会遇到 RuntimeError: Event loop is closed 这个错误

参考:https://stackoverflow.com/questions/45600579/asyncio-event-loop-is-closed

在 fetch_async 函数里添加一下语句即可

asyncio.set_event_loop(asyncio.new_event_loop())

6.gevent+requests示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

import gevent

import requests

from gevent import monkey

from gevent.pool import Pool

monkey.patch_all()

def fetch_async(method, url, req_kwargs):

    print(method, url, req_kwargs)

    response = requests.request(method=method, url=url, **req_kwargs)

    print(response.url, response.content)

# ##### 发送请求 #####

# gevent.joinall([

#     gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),

#     gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),

#     gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),

#     gevent.spawn(fetch_async, method='get', url='https://api.github.com/events', req_kwargs={}),

# ])

# ##### 发送请求(协程池控制最大协程数量) #####

# pool = Pool(None)

pool = Pool(3)

gevent.joinall([

    pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://api.github.com/events', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.baidu.com', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.ibm.com', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.intel.com', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.iqiyi.com', req_kwargs={}),

])

使用gevent协程并获取返回值示例:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

def get_single_chart_data_flux(single_data_param):

    import gevent

    from gevent import monkey

    from gevent.pool import Pool as gPool

    monkey.patch_socket()

    ip,port,timestamp,time_length,type_name,subtype_name,filter_str_list,appid,legend = single_data_param

    ModelClass = get_model_class(type_name)

    func = apps.get_app_config('serverdata').service.get_single_chart_data

    pool = gPool(len(filter_str_list))

    func_li = []

    for filter_str in filter_str_list:

        func_li.append(pool.spawn(func,ModelClass,ip,port,timestamp,time_length,subtype_name,filter_str,appid,legend))

    ret_li = gevent.joinall(func_li)

    # view_logger.debug(ret_li[0].get('value'))

    result_li = [{'filter_con':item.get('value')[2], 'value':item.get('value')[1], 'legend':item.get('value')[3]} for item in ret_li]

    return result_li

7.Twisted示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

from twisted.web.client import getPage, defer

from twisted.internet import reactor

def all_done(arg):

    reactor.stop()

def callback(contents,url):

    print(url,contents)

deferred_list = []

url_list = [

    'http://www.bing.com',

    'http://www.baidu.com',

    'https://www.python.org',

    'https://www.yahoo.com',

    'https://www.github.com'

]

start_time = time.time()

for url in url_list:

    deferred = getPage(bytes(url, encoding='utf8'))

    deferred.addCallback(callback,url)

    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)

dlist.addBoth(all_done)

reactor.run()

8.Tornado示例

以上均是Python内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率;

而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】 

三、自定义异步非阻塞模块

1.简单示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

"""

异步非阻塞/异步IO

    非阻塞: 不等待

      异步: 回调函数

    本质:一个线程完成并发操作(前提是执行过程中一定得有IO,这样才能让线程空闲出来去执行下一个任务)

     

     

IO 多路复用  +  socket

    - IO多路复用:  select epoll 用于检测socket对象是否发生变化(是否连接成功,是否有数据到来)

    - socket  :  socket客户端

     

    - IO请求是不占用CPU的,计算型的才占用CPU

"""

import socket

import select

conn_list = []

input_list = []

for url in range(20):

    client = socket.socket()

    client.setblocking(False)

    try:

        client.connect(('61.135.169.121',80))

    except BlockingIOError as e:

        pass

    conn_list.append(client)

    input_list.append(client)

while True:

    rlist, wlist, errlist = select.select(input_list, conn_list, [], 0.05)

    for sock in wlist:

        sock.sendall(b"GET / HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n")

        conn_list.remove(sock)

    for sock in rlist:

        data = sock.recv(8192)

        sock.close()

        input_list.remove(sock)

        print(data)

    if not input_list:

        break

2.自定义异步非阻塞模块

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

#!/usr/bin/python3.5

# -*- coding:utf-8 -*-

"""

异步非阻塞/异步IO

    非阻塞: 不等待

      异步: 回调函数

    本质:一个线程完成并发操作(前提是执行过程中一定得有IO,这样才能让线程空闲出来去执行下一个任务)

     

     

IO 多路复用  +  socket

    - IO多路复用:  select epoll 用于检测socket对象是否发生变化(是否连接成功,是否有数据到来)

    - socket  :  socket客户端

     

    - IO请求是不占用CPU的,计算型的才占用CPU

"""

import socket

import select

from urllib import parse

class Request():

    def __init__(self,sock,url,callback):

        """

        初始化

        :param sock: client's socket

        :param callback: callback function

        :param url: page url which wanna crawling

        """

        self.sock = sock

        self.url = url

        self.callback = callback

    def fileno(self):

        return self.sock.fileno()

    @property

    def host(self):

        domain = parse.urlparse(self.url)

        return domain.netloc

    @property

    def pathinfo(self):

        domain = parse.urlparse(self.url)

        return domain.path

def async_request(url_list):

    conn_list = []

    input_list = []

    for li in url_list:

        sock = socket.socket()

        sock.setblocking(False)

        obj = Request(sock, li[0], li[1])

        try:

            sock.connect((obj.host,80))

        except BlockingIOError as e:

            pass

        conn_list.append(obj)

        input_list.append(obj)

    while True:

        # 监听socket是否已经发生变化 [request_obj,request_obj....request_obj]

        # 如果有请求连接成功:wlist = [request_obj,request_obj]

        # 如果有响应的数据:  rlist = [request_obj,request_obj....client100]

        rlist, wlist, errlist = select.select(input_list, conn_list, [], 0.05)

        for obj in wlist:

            # print("链接成功,发送请求...")

            # obj.sock.sendall("GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n".format(obj.pathinfo,obj.host).encode('utf-8'))

            obj.sock.sendall(bytes("GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n".format(obj.pathinfo,obj.host),encoding='utf-8'))

            conn_list.remove(obj)

        for obj in rlist:

            # print("获取响应...")

            data = obj.sock.recv(8192)

            obj.callback(data)

            obj.sock.close()

            input_list.remove(obj)

        if not input_list:

            break

if __name__ == '__main__':

    def callback1(data):

        print("cnblogs...", data)

    def callback2(data):

        print("csdn...", data)

    def callback3(data):

        print("tornadoweb...", data)

    url_list = [

        ['http://www.cnblogs.com/standby/p/7589055.html', callback1],

        ['http://www.cnblogs.com/wupeiqi/articles/6229292.html', callback1],

        ['http://blog.csdn.net/vip_wangsai/article/details/51997882', callback2],

        ['http://blog.csdn.net/hjhmpl123/article/details/53378068', callback2],

        ['http://blog.csdn.net/zcc_0015/article/details/50688145', callback2],

        ['http://www.tornadoweb.org/en/stable/guide.html', callback3],

        ['http://www.tornadoweb.org/en/stable/guide/async.html', callback3],

        ['http://www.tornadoweb.org/en/stable/guide/coroutines.html#python-3-5-async-and-await', callback3]

    ]

    async_request(url_list)

  

3.牛逼的异步IO模块

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

import select

import socket

import time

class AsyncTimeoutException(TimeoutError):

    """

    请求超时异常类

    """

    def __init__(self, msg):

        self.msg = msg

        super(AsyncTimeoutException, self).__init__(msg)

class HttpContext(object):

    """封装请求和相应的基本数据"""

    def __init__(self, sock, host, port, method, url, data, callback, timeout=5):

        """

        sock: 请求的客户端socket对象

        host: 请求的主机名

        port: 请求的端口

        port: 请求的端口

        method: 请求方式

        url: 请求的URL

        data: 请求时请求体中的数据

        callback: 请求完成后的回调函数

        timeout: 请求的超时时间

        """

        self.sock = sock

        self.callback = callback

        self.host = host

        self.port = port

        self.method = method

        self.url = url

        self.data = data

        self.timeout = timeout

        self.__start_time = time.time()

        self.__buffer = []

    def is_timeout(self):

        """当前请求是否已经超时"""

        current_time = time.time()

        if (self.__start_time + self.timeout) < current_time:

            return True

    def fileno(self):

        """请求sockect对象的文件描述符,用于select监听"""

        return self.sock.fileno()

    def write(self, data):

        """在buffer中写入响应内容"""

        self.__buffer.append(data)

    def finish(self, exc=None):

        """在buffer中写入响应内容完成,执行请求的回调函数"""

        if not exc:

            response = b''.join(self.__buffer)

            self.callback(self, response, exc)

        else:

            self.callback(self, None, exc)

    def send_request_data(self):

        content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (

            self.method.upper(), self.url, self.host, self.data,)

        return content.encode(encoding='utf8')

class AsyncRequest(object):

    def __init__(self):

        self.fds = []

        self.connections = []

    def add_request(self, host, port, method, url, data, callback, timeout):

        """创建一个要请求"""

        client = socket.socket()

        client.setblocking(False)

        try:

            client.connect((host, port))

        except BlockingIOError as e:

            pass

            # print('已经向远程发送连接的请求')

        req = HttpContext(client, host, port, method, url, data, callback, timeout)

        self.connections.append(req)

        self.fds.append(req)

    def check_conn_timeout(self):

        """检查所有的请求,是否有已经连接超时,如果有则终止"""

        timeout_list = []

        for context in self.connections:

            if context.is_timeout():

                timeout_list.append(context)

        for context in timeout_list:

            context.finish(AsyncTimeoutException('请求超时'))

            self.fds.remove(context)

            self.connections.remove(context)

    def running(self):

        """事件循环,用于检测请求的socket是否已经就绪,从而执行相关操作"""

        while True:

            r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)

            if not self.fds:

                return

            for context in r:

                sock = context.sock

                while True:

                    try:

                        data = sock.recv(8096)

                        if not data:

                            self.fds.remove(context)

                            context.finish()

                            break

                        else:

                            context.write(data)

                    except BlockingIOError as e:

                        break

                    except TimeoutError as e:

                        self.fds.remove(context)

                        self.connections.remove(context)

                        context.finish(e)

                        break

            for context in w:

                # 已经连接成功远程服务器,开始向远程发送请求数据

                if context in self.fds:

                    data = context.send_request_data()

                    context.sock.sendall(data)

                    self.connections.remove(context)

            self.check_conn_timeout()

if __name__ == '__main__':

    def callback_func(context, response, ex):

        """

        :param context: HttpContext对象,内部封装了请求相关信息

        :param response: 请求响应内容

        :param ex: 是否出现异常(如果有异常则值为异常对象;否则值为None)

        :return:

        """

        print(context, response, ex)

    obj = AsyncRequest()

    url_list = [

        {'host''www.google.com''port': 80, 'method''GET''url''/''data''''timeout': 5,

         'callback': callback_func},

        {'host''www.baidu.com''port': 80, 'method''GET''url''/''data''''timeout': 5,

         'callback': callback_func},

        {'host''www.bing.com''port': 80, 'method''GET''url''/''data''''timeout': 5,

         'callback': callback_func},

    ]

    for item in url_list:

        print(item)

        obj.add_request(**item)

    obj.running()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/385911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

项目实战典型案例13——学情页面逻辑问题

学情页面逻辑问题一&#xff1a;背景介绍二&#xff1a;学情页面逻辑问题分析逻辑问题缓存滥用的问题三&#xff1a;LocalStorage基础知识数据结构特性应用场景localStorage常用方法四&#xff1a;总结一&#xff1a;背景介绍 本篇博客是对项目开发中出现的学情页面逻辑问题进…

buu [INSHack2017]rsa16m 1

题目描述&#xff1a; 打开的 rsa_16m 文件 &#xff1a; &#xff08;在此我只想说神人才找得到 c 的位置&#xff09; &#xff0c;这位置是真的难找啊 题目分析&#xff1a; 首先打开 description.md 文件&#xff0c;得到&#xff1a; 翻译下来&#xff1a; 当您需要真正…

青岛诺凯达机械盛装亮相2023济南生物发酵展,3月与您相约

BIO CHINA生物发酵展&#xff0c;作为生物发酵产业一年一度行业盛会&#xff0c;由中国生物发酵产业协会主办&#xff0c;上海信世展览服务有限公司承办&#xff0c;2023第10届国际生物发酵展&#xff08;济南&#xff09;于2023年3月30-4月1日在山东国际会展中心&#xff08;济…

王道C语言督学营OJ练习全解【24考研最新版】

前言 本篇博客是在博主参加王道408专业课前置课程-----C语言督学营的学习笔记&#xff0c;包含了从第一节课到最后一节课的所有OJ习题题解&#xff0c;文章中每一题都给出了详尽的代码&#xff0c;并在每一题的关键部位加上了注释&#xff0c;记录下来的目的是方便自己以后进行…

ElasticSearch从0到1——基础知识

1.ES是什么&#xff1f; 是一个开源的高扩展的分布式全文检索引擎&#xff0c;它可以近乎实时的存储、检索数据&#xff1b;本身扩展性很好&#xff0c;可以扩展到上百台服务器&#xff0c;处理PB级别的数据使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能&…

【ElasticSearch8.X】学习笔记(二)

【ElasticSearch8.X】学习笔记四、基础操作4.1、索引操作4.1.1、创建索引4.1.2、查询指定索引4.1.3、查询所有索引4.1.4、 删除索引4.2、文档操作4.2.1、创建文档4.2.2、查询文档4.2.3、修改文档4.2.4、删除文档4.2.5、查询所有文档4.3、数据搜索4.3.1、匹配查询文档4.3.2、匹配…

LeetCode题目笔记——1487. 保证文件名唯一

文章目录题目描述题目链接题目难度——中等方法一&#xff1a;哈希表代码/Python代码/C总结题目描述 给你一个长度为 n 的字符串数组 names 。你将会在文件系统中创建 n 个文件夹&#xff1a;在第 i 分钟&#xff0c;新建名为 names[i] 的文件夹。 由于两个文件 不能 共享相同…

JUC并发编程之JMM_synchronized_volatile

目录 JUC并发编程之JMM_synchronized_volatile 什么是JMM模型&#xff1f; JMM和JVM的区别 JMM不同于JVM内存区域模型 主内存 工作内存 Java内存模型与硬件内存架构的关系 JMM存在的必要性 数据同步八大原子操作 同步规则分析 并发编程的可见性&#xff0c;原子性与有序…

【代码编辑器记录一】vue项目中如何实现代码高亮效果+输入

文章目录1-1 代码高亮显示但不可以实现编辑输入vue-highlightjs1-1-1 vue31-1-2 vue21-2 编辑输入高亮代码&#xff0c;进行格式规范code-mirror1-2-1 展示1-2-2 基本配置1-2-3 使用1-1 代码高亮显示但不可以实现编辑输入vue-highlightjs 1-1-1 vue3 安装依赖 npm install -…

企业电子招投标采购系统之系统的首页设计

​​ 功能模块&#xff1a; 待办消息&#xff0c;招标公告&#xff0c;中标公告&#xff0c;信息发布 描述&#xff1a; 全过程数字化采购管理&#xff0c;打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力&#xff0c;为…

【知识点】OkHttp 原理 8 连问

前言OkHttp可以说是Android开发中最常见的网络请求框架,OkHttp使用方便&#xff0c;扩展性强&#xff0c;功能强大&#xff0c;OKHttp源码与原理也是面试中的常客但是OKHttp的源码内容比较多&#xff0c;想要学习它的源码往往千头万绪&#xff0c;一时抓不住重点.本文从几个问题…

ThreadLocal 理解及面试

一、ThreadLocal 引用关系 图解关系说明&#xff1a; 每个线程拥有自己的 ThreadLocalMap 属性&#xff1b;ThreadLocalMap 的存储结构为 Entry[] 数组&#xff1b;Entry的Key是ThreadLocal类型且弱引用指向ThreadLocal对象&#xff0c;Value是我们自己定义的泛型值对象&#…

链表的替代品--Vector组件

概述 在之前的一篇文章中&#xff0c;作者写了一个事件组件-- 超精简的订阅发布事件组件–SPEvent&#xff0c;这个组件是采用链表建立所有事件节点的关系的。链表的优缺点&#xff1a; 优点&#xff1a;①链表上的元素在空间存储上内存地址不连续&#xff1b;②在插入和删除操…

注解开发定义bean

注解开发定义bean 使用Component定义bean在核心配置文件中通过组件扫描加载bean&#xff0c;需要指定扫描包的范围 当然也可以使用Component的衍生注解&#xff0c;可以更加形象的表示 纯注解的开发模式 使用java类来代替了以前的 配置文件&#xff0c;在java类中&#xff…

渗透测试之巧用工具搞定sharepoint

背景 在一次实战演练中 goby扫描到一个sharepoint的getshell漏洞 &#xff0c;漏洞cve编号为CVE-2019-0604&#xff0c;本想着一把梭&#xff0c;直接渗透内网&#xff0c;没想到有waf之类的防护&#xff0c;最后还是想办法解决了。 现在网络上各类漏洞利用工具很多&#xff…

项目中用到的知识点回顾---JWT(JSON Web Token)

1.JWT原理&#xff1a; JWT 的原理是&#xff0c;服务器认证以后&#xff0c;生成一个 JSON 对象&#xff0c;发回给用户&#xff0c;如下&#xff1b; {"姓名": "张三","角色": "管理员","到期时间": "2018年7月1日…

【调试】ftrace(一)基本使用方法

简介 Ftrace是Linux Kernel的官方tracing系统&#xff0c;支持Function trace、静态tracepoint、动态Tracepoint的跟踪&#xff0c;还提供各种Tracer&#xff0c;用于统计最大irq延迟、最大函数调用栈大小、调度事件等。 Ftrace还提供了强大的过滤、快照snapshot、实例&#…

数据结构的一些基础概念

一 基本术语 数据&#xff1a;是描述客观事物的符号&#xff0c;是计算机中可以操作的对象&#xff0c;是能被计算机识别&#xff0c;并输入给计算机处理的符号集合。 数据元素&#xff1a;是组成数据的&#xff0c;有一定意义的基本单位&#xff0c;在计算机中通常作为整体处…

【Docker】docker | 迁移docker目录

一、场景说明1、物理机磁盘空间不够用了2、docker的镜像、容器、卷等资料的默认路径为&#xff1a; /var/lib/docker3、增加了数据盘挂在&#xff0c;需要将docker的全部资料更换个目录二、操作确认是否满足切换条件1&#xff09;服务是否能够暂停&#xff0c;如果可以就OK2&am…

新一代骨传导机皇重磅发布:南卡Neo骨传导运动耳机,性能全面提升

近日&#xff0c;中国最强骨传导品牌NANK南卡发布了最新一代骨传导耳机——南卡Neo骨传导耳机&#xff01;该款耳机与运动专业性更强的南卡runner Pro4略微不同&#xff0c;其主要定位于轻运动风格&#xff0c;所以这款耳机的音质和佩戴舒适度达到了令人咂舌的地步&#xff01;…