自从 2010 年提出这个问题以来,如何使用带有map和pool 的 python 进行简单的多线程处理已经有了真正的简化。
下面的代码来自一篇文章 / 博客文章,你绝对应该检查(没有隶属关系) - 一行中的并行性:日常线程任务的更好模型 。我将在下面总结 - 它最终只是几行代码:
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
哪个是多线程版本:
results = []
for item in my_array:
results.append(my_function(item))
描述
Map 是一个很酷的小函数,是轻松将并行性注入 Python 代码的关键。对于那些不熟悉的人来说,地图是从像 Lisp 这样的函数式语言中解脱出来的。它是一个在序列上映射另一个函数的函数。
Map 为我们处理序列上的迭代,应用函数,并将所有结果存储在最后的方便列表中。
履行
地图函数的并行版本由两个库提供:多处理,以及它鲜为人知,但同样出色的步骤子:multiprocessing.dummy。
multiprocessing.dummy
与多处理模块完全相同, 但使用线程 ( 一个重要的区别 - 对 CPU 密集型任务使用多个进程; 为 IO(和 IO 期间)使用线程 ):
multiprocessing.dummy 复制多处理的 API,但只不过是线程模块的包装器。
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
]
# make the Pool of workers
pool = ThreadPool(4)
# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
# close the pool and wait for the work to finish
pool.close()
pool.join()
时间结果如下:
Single thread: 14.4 seconds
4 Pool: 3.1 seconds
8 Pool: 1.4 seconds
13 Pool: 1.3 seconds
传递多个参数 ( 仅在 Python 3.3 及更高版本中起作用):
要传递多个数组:
results = pool.starmap(function, zip(list_a, list_b))
或传递常量和数组:
results = pool.starmap(function, zip(itertools.repeat(constant), list_a))
如果您使用的是早期版本的 Python,则可以通过此变通方法传递多个参数。
(感谢user136036的有用评论)
这是一个简单的示例:您需要尝试一些备用 URL 并返回第一个要响应的内容。
import Queue
import threading
import urllib2
# called by each thread
def get_url(q, url):
q.put(urllib2.urlopen(url).read())
theurls = ["http://google.com", "http://yahoo.com"]
q = Queue.Queue()
for u in theurls:
t = threading.Thread(target=get_url, args = (q,u))
t.daemon = True
t.start()
s = q.get()
print s
这是一种将线程用作简单优化的情况:每个子线程都在等待 URL 解析和响应,以便将其内容放在队列中; 每个线程都是一个守护进程(如果主线程结束,则不会保持进程 - 这种情况比较常见); 主线程启动所有的子线程,做一个get
队列等待,直到其中一人已经做了put
,然后发出的结果,并终止(这需要下可能仍在运行的所有子线程,因为他们是守护线程)。
在 Python 中正确使用线程总是连接到 I / O 操作(因为 CPython 不使用多个内核来运行 CPU 绑定任务,因此线程的唯一原因是在等待某些 I / O 时没有阻止进程)。顺便说一句,队列几乎总是将工作分配到线程和 / 或收集工作结果的最佳方式,并且它们本质上是线程安全的,因此它们可以避免担心锁,条件,事件,信号量和其他相互关联线程协调 / 通信概念。
注意 :对于 Python 中的实际并行化,您应该使用多处理模块来分叉并行执行的多个进程(由于全局解释器锁定,Python 线程提供交错但实际上是串行执行,而不是并行执行,并且仅在交错 I / O 操作)。
但是,如果您只是在寻找交错(或者正在进行可以并行化的 I / O 操作,尽管全局解释器锁定),那么线程模块就是起点。作为一个非常简单的例子,让我们通过并行求和子范围来考虑求和大范围的问题:
import threading
class SummingThread(threading.Thread):
def __init__(self,low,high):
super(SummingThread, self).__init__()
self.low=low
self.high=high
self.total=0
def run(self):
for i in range(self.low,self.high):
self.total+=i
thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result
请注意,上面是一个非常愚蠢的例子,因为它绝对没有 I / O,并且由于全局解释器锁,它将在 CPython 中以串行方式执行,尽管是交错的(带有上下文切换的额外开销)。