本文共 1945 字,大约阅读时间需要 6 分钟。
import inspect
import ctypes
import time
import random
import traceback
from threading import Thread
import sys
# from utils.thread_killer import stop_thread
def _async_raise(tid, exctype):
"""raises the exception, performs cleanup if needed"""
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid,
ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
    """Ask ``thread`` to terminate by injecting ``SystemExit`` into it.

    Prints the thread object and its identifier, then passes the identifier
    to ``_async_raise``.

    Args:
        thread: The ``threading.Thread`` to stop.
    """
    print(f'\nt -> {thread}')
    ident = thread.ident
    print(f'\nthread.ident -> {ident}')
    _async_raise(ident, SystemExit)
def func1():
    """Print ``func1`` every 2 seconds until the thread is told to exit.

    Never returns normally: the loop ends only when ``SystemExit`` (or any
    other non-``Exception`` ``BaseException``) is raised inside it.
    """
    while True:
        try:
            print(f'func1\n')
            time.sleep(2)
        except SystemExit:
            # Stop request (e.g. injected by stop_thread): end this thread.
            sys.exit()
        except Exception:
            # Log ordinary errors and keep looping. A bare ``except:`` here
            # would also swallow KeyboardInterrupt and other injected
            # BaseExceptions, making the loop impossible to interrupt.
            print(traceback.format_exc())
def func2():
    """Print ``func2]`` every 1.3 seconds until the thread is told to exit.

    Never returns normally: the loop ends only when ``SystemExit`` (or any
    other non-``Exception`` ``BaseException``) is raised inside it.
    """
    while True:
        try:
            print(f'func2]\n')
            time.sleep(1.3)
        except SystemExit:
            # Stop request (e.g. injected by stop_thread): end this thread.
            sys.exit()
        except Exception:
            # Log ordinary errors and keep looping. A bare ``except:`` here
            # would also swallow KeyboardInterrupt and other injected
            # BaseExceptions, making the loop impossible to interrupt.
            print(traceback.format_exc())
def func3():
    """Thread entry point that runs ``func1`` one call level deeper.

    Shows that a ``SystemExit`` raised inside the nested call still
    terminates the thread when every level re-raises it.
    """
    try:
        func1()
    except SystemExit:
        # Propagate the stop request so the thread actually ends.
        sys.exit()
    except Exception:
        # Only ordinary errors are logged; a bare ``except:`` would also
        # trap KeyboardInterrupt and other BaseExceptions.
        print(traceback.format_exc())
class Test(object):
    """Demo driver: starts two worker threads and kills one of them by name."""

    def __init__(self):
        pass

    def engine(self):
        """Start the workers, then poll every 3 seconds and kill 'Thread-1'.

        Runs forever; once the victim has been removed from the pool the
        loop keeps polling the remaining workers. ``KeyboardInterrupt`` and
        ``SystemExit`` propagate to the caller instead of being swallowed.
        """
        victim_name = 'Thread-1'
        # Name the threads explicitly: auto-generated names depend on how
        # many threads were created before, and since Python 3.10 they also
        # carry the target name (e.g. 'Thread-1 (func3)'), so matching on
        # the default name is unreliable.
        thread_pool = [
            Thread(target=func3, name=victim_name, args=()),
            Thread(target=func2, name='Thread-2', args=()),
        ]
        for worker in thread_pool:
            worker.start()

        while True:
            try:
                time.sleep(3)
                # Iterate over a snapshot because the pool is mutated below;
                # removing from the list being iterated skips elements.
                for worker in list(thread_pool):
                    print(type(worker.name))
                    if worker.name == victim_name:
                        print(f'开始杀线程')
                        stop_thread(worker)
                        # The injected exception is only seen once the worker
                        # leaves its current sleep; wait up to 3 seconds but
                        # return as soon as it has actually died.
                        worker.join(timeout=3)
                        thread_pool.remove(worker)
                        print(f'thread_pool -> {thread_pool}')
                        print(worker.is_alive())
            except Exception:
                # Report failures instead of hiding them; keep monitoring.
                print(traceback.format_exc())
if __name__ == '__main__':
    # Script entry point: build the demo driver and run its monitoring loop.
    Test().engine()
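要注意:如果待杀线程内部使用了裸 `except:`(或 `except BaseException:`),就必须在它之前单独拦截 SystemExit 并重新抛出(例如调用 sys.exit()),否则注入的 SystemExit 会被吞掉,线程无法被杀死。另外,异常只有在线程回到 Python 字节码执行时才会生效,例如正在 time.sleep() 中的线程要等 sleep 返回后才会退出。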
关键字:python 线程 杀 动态
转载地址:http://oxhsl.baihongyu.com/