python多线程使用
文章目录
- python多线程使用
- 一、案例
- 二、说明
- 1、针对第一种是有返回值的 ,可以通过future.result() 去拿到每个线程返回值
- 2、无返回值问题
- 3、我们可以重写这个Thread类
- 重写了__init__、run方法和join方法,主要是start()涉及的方法太多了
- 而run()却相对简单
- 4、重写后的run()
一、案例
def compute_ceph_func(self, i, ceph_ip):
    """Per-host worker that the threading examples below run concurrently.

    The article leaves the body out; only the signature and the parameter
    descriptions are shown, so as written this returns None.

    Args:
        i: Host info for the host being processed.
        ceph_ip: IP address of the Ceph host.
    """
# 第一种 有返回值
def compute_ceph_port_check(self, region, ceph_ip):
    """Run compute_ceph_func for every entry of os_services in a thread pool.

    Args:
        region: Not used by the code shown in this snippet.
        ceph_ip: Forwarded as the second argument of each compute_ceph_func call.

    Returns:
        list: Every task's result concatenated, in the order the tasks finished.
    """
    import concurrent.futures

    tmp_datas = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
        # NOTE(review): os_services is not defined in the visible snippet;
        # presumably it is an iterable of host-info records - confirm.
        to_do = [
            executor.submit(self.compute_ceph_func, i, ceph_ip)
            for i in os_services
        ]
        for future in concurrent.futures.as_completed(to_do):
            # result() hands back the value returned inside the worker thread
            # (and re-raises whatever exception the worker raised).
            tmp_datas.extend(future.result())
    # Return the collected data so the per-thread results are not discarded.
    return tmp_datas
# 另外一种 无返回值
# Plain threading.Thread variant: one thread per entry of os_services.
tmp_datas = []
threads = []
for host_info in os_services:
    worker = threading.Thread(
        target=self.compute_ceph_func,
        args=(host_info, ceph_ip),
    )
    threads.append(worker)
    worker.start()
# Wait for every worker to finish.
for worker in threads:
    worker.join()
# Thread.join() has no return value, so this yields one None per thread;
# that is the "no return value" problem this variant demonstrates.
tmp_datas = [worker.join() for worker in threads]
logging.info(tmp_datas)
# 另外一种
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
def func(x):
    """Return *x* unchanged; a stand-in for real per-task work."""
    return x


if __name__ == '__main__':
    # The with-block shuts the pool down on exit, even when a task raises,
    # so no explicit pool.shutdown() call is needed.
    with ThreadPoolExecutor(max_workers=500) as pool:
        futures = [pool.submit(func, i) for i in range(1, 500)]
        # Walk the futures in submission order; result() blocks until that
        # particular task has finished.
        for future in futures:
            print(future.result())
import os
from multiprocessing.pool import ThreadPool
def func(x):
    """Print the current process id together with x + 1, then return x + 1."""
    print(f"Process Id:{os.getpid()} res:{x+1}")
    return x + 1


if __name__ == '__main__':
    pool = ThreadPool(processes=10)
    try:
        pending = [pool.apply_async(func, args=(i,)) for i in range(1, 500)]
        # get() blocks until the matching task has produced its result.
        for async_result in pending:
            print(async_result.get())
    finally:
        # close() stops new submissions; join() then waits for the worker
        # threads to exit so none are left behind, even after an error.
        pool.close()
        pool.join()
二、说明
1、第一种方式是有返回值的,可以通过 future.result() 拿到每个线程的返回值
2、无返回值问题
对于第二种方法的无返回值问题:
可以重写 join 方法:先在 run 方法中给线程对象设置一个 _return 属性,用来保存线程的执行结果,最后在 join 方法中把它 return 出来。
我们可以详细看下
# 每个线程无返回值问题
tmp_datas = []
threads = []
for host_info in os_services:
    current = threading.Thread(
        target=self.compute_ceph_func,
        args=(host_info, ceph_ip),
    )
    threads.append(current)
    current.start()  # start() is the method examined next
# First pass: block until all threads are done.
for current in threads:
    current.join()
# Second pass: collect what join() gives back, which is None for a plain Thread.
tmp_datas = [current.join() for current in threads]
# 1、首先看start()方法
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
# Refuse to run on an object whose __init__ never set _initialized.
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
# The _started event doubles as the "already started" flag.
if self._started.is_set():
raise RuntimeError("threads can only be started once")
# Record this thread in _limbo while holding _active_limbo_lock.
with _active_limbo_lock:
_limbo[self] = self
try:
# Hand self._bootstrap (not the user's target directly) to the new thread.
_start_new_thread(self._bootstrap, ())
except Exception:
# Launch failed: take the _limbo entry back out, then re-raise.
with _active_limbo_lock:
del _limbo[self]
raise
# Wait on the _started event before returning. There is no return statement,
# so start() always gives back None regardless of what the target returns.
self._started.wait()
# 其实不难看出,start() 方法并没有返回值;并且从下面的 __init__ 中可以看出,target 的返回值也没有被保存下来
class Thread:
"""A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways
to specify the activity: by passing a callable object to the constructor, or
by overriding the run() method in a subclass.
"""
_initialized = False
# Note: none of the attributes assigned in __init__ below holds the target's return value.
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
"""This constructor should always be called with keyword arguments. Arguments are:
*group* should be None; reserved for future extension when a ThreadGroup
class is implemented.
*target* is the callable object to be invoked by the run()
method. Defaults to None, meaning nothing is called.
*name* is the thread name. By default, a unique name is constructed of
the form "Thread-N" where N is a small decimal number.
*args* is the argument tuple for the target invocation. Defaults to ().
*kwargs* is a dictionary of keyword arguments for the target
invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke
the base class constructor (Thread.__init__()) before doing anything
else to the thread.
"""
assert group is None, "group argument must be None for now"
# None stands in for "no keyword arguments"; a fresh dict is made per instance.
if kwargs is None:
kwargs = {}
# The callable and its arguments are kept on the instance under private names.
self._target = target
self._name = str(name or _newname())
self._args = args
self._kwargs = kwargs
# Without an explicit daemon flag, copy the daemon setting of current_thread().
if daemon is not None:
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon
self._ident = None
if _HAVE_THREAD_NATIVE_ID:
self._native_id = None
self._tstate_lock = None
# Event that start() checks with is_set() and waits on.
self._started = Event()
self._is_stopped = False
# Instance-level flag overriding the class default of False; start() checks it.
self._initialized = True
# Copy of sys.stderr used by self._invoke_excepthook()
self._stderr = _sys.stderr
self._invoke_excepthook = _make_invoke_excepthook()
# For debugging and _after_fork()
_dangling.add(self)
3、我们可以重写这个Thread类
重写了 __init__、run 和 join 方法;之所以不去重写 start(),主要是因为 start() 涉及的方法太多了,
而 run() 却相对简单
4、重写后的run()
class ThreadReturnValueHanler(Thread):
    """Thread subclass whose join() hands back the target's return value.

    run() stores the value returned by the target on the instance, and join()
    returns that stored value once the thread has been joined. The class
    relies on the private ``_target``, ``_args`` and ``_kwargs`` attributes
    that Thread.__init__ sets.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as Thread and prepare the result slot."""
        super().__init__(*args, **kwargs)
        # Pre-set the slot so join() never hits a missing attribute when
        # there is no target, the target raised, or a timed join expired.
        self._return = None

    def run(self):
        """Call the target, if any, and remember what it returned."""
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, timeout=None):
        """Wait for the thread like Thread.join() and return the target's result.

        Args:
            timeout: Optional number of seconds to wait, as for Thread.join().

        Returns:
            The value the target returned, or None if there was no target,
            the target raised, or the thread has not finished yet.
        """
        super().join(timeout)
        return self._return
# 当然,直接使用 concurrent.futures(如 ThreadPoolExecutor)更为方便