用 Python 和任何语言突然终止线程通常是一个错误的模式。考虑以下情况:
如果您负担得起的话(如果您要管理自己的线程),处理此问题的一种好方法是有一个 exit_request 标志,每个线程定期检查一次,以查看是否该退出。
例如:
import threading
class StoppableThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, *args, **kwargs):
super(StoppableThread, self).__init__(*args, **kwargs)
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
在此代码中,您希望在线程退出时调用其上的stop()
,然后使用join()
等待线程正确退出。线程应定期检查停止标志。
但是,在某些情况下,您确实需要杀死线程。一个示例是当您包装一个忙于长时间调用的外部库并且想要中断它时。
以下代码允许(有一些限制)在 Python 线程中引发 Exception:
def _async_raise(tid, exctype):
'''Raises an exception in the threads with id tid'''
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# "if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
raise SystemError("PyThreadState_SetAsyncExc failed")
class ThreadWithExc(threading.Thread):
'''A thread class that supports raising exception in the thread from
another thread.
'''
def _get_my_tid(self):
"""determines this (self's) thread id
CAREFUL : this function is executed in the context of the caller
thread, to get the identity of the thread represented by this
instance.
"""
if not self.isAlive():
raise threading.ThreadError("the thread is not active")
# do we have it cached?
if hasattr(self, "_thread_id"):
return self._thread_id
# no, look for it in the _active dict
for tid, tobj in threading._active.items():
if tobj is self:
self._thread_id = tid
return tid
# TODO: in python 2.6, there's a simpler way to do : self.ident
raise AssertionError("could not determine the thread's id")
def raiseExc(self, exctype):
"""Raises the given exception type in the context of this thread.
If the thread is busy in a system call (time.sleep(),
socket.accept(), ...), the exception is simply ignored.
If you are sure that your exception should terminate the thread,
one way to ensure that it works is:
t = ThreadWithExc( ... )
...
t.raiseExc( SomeException )
while t.isAlive():
time.sleep( 0.1 )
t.raiseExc( SomeException )
If the exception is to be caught by the thread, you need a way to
check that your thread has caught it.
CAREFUL : this function is executed in the context of the
caller thread, to raise an excpetion in the context of the
thread represented by this instance.
"""
_async_raise( self._get_my_tid(), exctype )
(基于 Tomer Filiba 的Killable Threads 。有关PyThreadState_SetAsyncExc
返回值的PyThreadState_SetAsyncExc
似乎来自旧版本的 Python 。)
如文档中所述,这不是灵丹妙药,因为如果线程在 Python 解释器之外很忙,它将无法捕获中断。
此代码的一个很好的用法模式是让线程捕获特定的异常并执行清理。这样,您可以中断任务并仍然进行适当的清理。
没有官方的 API 可以这样做。
您需要使用平台 API 杀死线程,例如 pthread_kill 或 TerminateThread。您可以通过 pythonwin 或 ctypes 访问此类 API。
请注意,这本质上是不安全的。如果被杀死的线程在被杀死时具有 GIL,则很可能导致无法收集的垃圾(来自成为垃圾的堆栈帧的局部变量),并可能导致死锁。
一个multiprocessing.Process
可以p.terminate()
在我想杀死一个线程,但又不想使用标志 / 锁 / 信号 / 信号量 / 事件 / 任何东西的情况下,我会将线程提升为完整进程。对于仅使用几个线程的代码,开销并不是那么糟糕。
例如,这样做很方便,可以轻松终止执行阻塞 I / O 的助手 “线程”
转换是微不足道的:在相关代码中,将所有threading.Thread
替换为multiprocessing.Process
,将所有queue.Queue
为multiprocessing.Queue
,并将p.terminate()
的必需调用添加到要杀死其子p
父进程中。