2022年 11月 4日

Python多线程

Python的多线程

目录

基本概念

并行与并发

进程与线程

进程与线程的特点

Python多线程编程

Threading模块

创建线程

方式一:直接继承Thread,改写对应的run方法

方式二:直接调用Thread

多线程问题

多线程加锁

多线程编程死锁

解决死锁的办法

多线程多进程对于cpu bound任务与I/O bound任务的性能对比

总结


一直想对Python的多线程和多进程做一个简单的总结,毕竟开发到后面始终都是会用到并发和并行的设计知识的,现简单总结如下。

基本概念

并行与并发

  • 并行:同时处理多个任务的能力,指的是任务数小于等于cpu核数,任务真的是一起执行的。
  • 并发:交替处理多个任务的能力。指的是任务数多于cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)

 

              并行                                                                        并发

进程与线程

进程与线程的特点

 

多线程

多进程

优点

  • 多线程能够有效提升I/O阻塞型程序的效率
  • 与进程相比,占用更少的系统资源
  • 线程间能够共享所述进程的资源,方便进行通信
  • 更好地利用多核处理器
  • 处理cpu密集型任务时比多线程要好
  • 单个进程的崩溃不会导致整个程序的崩溃

缺点

  • CPython中有全局解释器锁(GIL)的限制。抢占式多线程容易造成线程颠簸
  • 线程共享地址空间,一个线程非法操作共享数据崩溃后,会导致整个进程崩溃,继而多个线程崩溃
  • 进程之间没有共享资源,如果需要共享资源需要自己编码设计。实现进程间通信,继而完成数据传递或者数据共享。
  • 进程需要消耗更多的内存

Python多线程编程

Threading模块

Python3 通过两个标准库 _thread 和 threading 提供对线程的支持。

_thread 提供了低级别的、原始的线程以及一个简单的锁,它相比于 threading 模块的功能还是比较有限的。

threading 模块除了包含 _thread 模块中的所有方法外,还提供的其他方法:

threading.currentThread(): 返回当前的线程变量。

threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:

run(): 用以表示线程活动的方法。

start():启动线程活动。

join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。

isAlive(): 返回线程是否活动的。

getName(): 返回线程名。

setName(): 设置线程名。

创建线程

方式一:直接继承Thread,改写对应的run方法

  1. #!/usr/bin/python3
  2. import threading
  3. import time
  4. exitFlag = 0
  5. class myThread (threading.Thread):
  6.     def __init__(self, threadID, name, counter):
  7.         threading.Thread.__init__(self)
  8.         self.threadID = threadID
  9.         self.name = name
  10.         self.counter = counter
  11.     def run(self):
  12.         print ("开始线程:" + self.name)
  13.         print_time(self.name, self.counter, 5)
  14.         print ("退出线程:" + self.name)
  15. def print_time(threadName, delay, counter):
  16.     while counter:
  17.         if exitFlag:
  18.             threadName.exit()
  19.         time.sleep(delay)
  20.         print ("%s: %s" % (threadName, time.ctime(time.time())))
  21.         counter -= 1
  22. # 创建新线程
  23. thread1 = myThread(1, "Thread-1", 1)
  24. thread2 = myThread(2, "Thread-2", 2)
  25. # 开启新线程
  26. thread1.start()
  27. thread2.start()
  28. thread1.join()
  29. thread2.join()
  30. print ("退出主线程")

 

 

方式二:直接调用Thread

  1. # 多任务可以由多进程完成,也可以由一个进程内的多线程完成。
  2. # 我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程。
  3. # Python使用threading模块对线程进行操作
  4. import time, threading
  5. # 新线程执行的代码
  6. def loop():
  7.     print('thread %s is running...' % threading.current_thread().name)
  8.     n = 0
  9.     while n < 5:
  10.         n += 1
  11.         print('thread %s >>> %s ' % (threading.current_thread().name, n))
  12.         time.sleep(1)
  13.     print('thread %s ended.' % threading.current_thread().name)
  14. if __name__ == "__main__":
  15.     print('thread %s is running...' % threading.current_thread().name)
  16.     t = threading.Thread(target=loop, name='LoopThread')
  17.     t.start()
  18.     # 如果不写join,似乎对子线程的运行没有什么影响,只是主线程会提前结束。
  19.     # 主线程的结束不会导致子线程的结束
  20.     t.join()
  21. print('thread %s ended.' % threading.current_thread().name)

 

多线程问题

多线程加锁

当多线程中多个线程修改共享资源时,需要加锁,否则会出现线程不安全问题。

# 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,

# 而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

# 上述所说的即为线程不安全问题

  1. import time, threading
  2. balance = 0
  3. lock = threading.Lock()
  4. def change_it(n):
  5.     # 先存后取,结果应该为0
  6.     global balance
  7.     balance += n
  8.     balance -= n
  9. def run_thread(n):
  10.     # 在跑线程的时候不加锁,就和之前写的Beringei操作数据库代码一样,由于不是原子性操作
  11.     # 两个线程并行交叉运行多条语句,所以会出现线程不安全,需要枷锁
  12.     for i in range(100000):
  13.         # lock.acquire()
  14.         # try:
  15.         change_it(n)
  16.         # finally:
  17.         #     lock.release()
  18. if __name__ == "__main__":
  19.     t1 = threading.Thread(target=run_thread, args=(5,))
  20.     t2 = threading.Thread(target=run_thread, args=(8,))
  21.     t1.start()
  22.     t2.start()
  23.     t1.join()
  24.     t2.join()
  25.     print(balance)

# 锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码

# 实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,

# 可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止

以上代码段的结果由于没有加锁,不一定是0, 如下

所以需要对run_thread这个函数进行小小的改动

  1. def run_thread(n):
  2.     # 在跑线程的时候不加锁,就和之前写的Beringei操作数据库代码一样,由于不是原子性操作
  3.     # 两个线程并行交叉运行多条语句,所以会出现线程不安全,需要枷锁
  4.     for i in range(100000):
  5.         lock.acquire()
  6.         try:
  7.             change_it(n)
  8.         finally:
  9.             lock.release()

 

 

多线程编程死锁

如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源正在使用,所以这两个线程如果不人为终止,将一直等待下去。死锁示意图如下:

 

死锁示例代码与死锁结果:

  1. import time
  2. import threading
  3. class Mythread(threading.Thread):
  4.     def __init__(self):
  5.         threading.Thread.__init__(self)
  6.     def run(self):
  7.         self.foo()
  8.         self.bar()
  9.     def foo(self):
  10.         locka.acquire()
  11.         print('I am %s got locka---%s' % (self.name, time.ctime()))
  12.         lockb.acquire()
  13.         print('I am %s got lockb---%s' % (self.name, time.ctime()))
  14.         lockb.release()
  15.         locka.release()
  16.     def bar(self):
  17.         lockb.acquire()
  18.         print('I am %s got lockb---%s' % (self.name, time.ctime()))
  19.         locka.acquire()
  20.         print('I am %s got locka---%s' % (self.name, time.ctime()))
  21.         locka.release()
  22.         lockb.release()
  23. if __name__ == '__main__':
  24.     locka = threading.Lock()
  25.     lockb = threading.Lock()
  26.     for i in range(2):
  27.         t = Mythread()
  28.         t.start()

 

 

解决死锁的办法

将lock换为递归锁,即Rlock()

  1. import time
  2. import threading
  3. class Mythread(threading.Thread):
  4.     def __init__(self):
  5.         threading.Thread.__init__(self)
  6.     def run(self):
  7.         self.foo()
  8.         self.bar()
  9.     def foo(self):
  10.         lock.acquire()
  11.         print('I am %s got locka---%s' % (self.name, time.ctime()))
  12.         lock.acquire()
  13.         print('I am %s got lockb---%s' % (self.name, time.ctime()))
  14.         lock.release()
  15.         lock.release()
  16.     def bar(self):
  17.         lock.acquire()
  18.         print('I am %s got lockb---%s' % (self.name, time.ctime()))
  19.         lock.acquire()
  20.         print('I am %s got locka---%s' % (self.name, time.ctime()))
  21.         lock.release()
  22.         lock.release()
  23. if __name__ == '__main__':
  24.     lock = threading.RLock()
  25.     for i in range(2):
  26.         t = Mythread()
  27.         t.start()

多线程多进程对于cpu bound任务与I/O bound任务的性能对比

  1. import requests
  2. import time
  3. from threading import Thread
  4. from multiprocessing import Process
  5. # 定义cpu密集型的计算函数
  6. def count(x, y):
  7.     c = 0
  8.     while c < 50000:
  9.         c += 1
  10.         x += x
  11.         y += y
  12. # 定义io密集型的文件读写函数
  13. def write():
  14.     f = open("test.txt", "w")
  15.     for x in range(5000000):
  16.         f.write("testwrite\n")
  17.     f.close()
  18. def read():
  19.     f = open("test.txt", "r")
  20.     lines = f.readlines()
  21.     f.close()
  22. # 定义网络请求函数
  23. _head = {
  24.     'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36'}
  25. url = "http://www.tieba.com"
  26. def http_request():
  27.     try:
  28.         webPage = requests.get(url, headers=_head)
  29.         html = webPage.text
  30.         return {"context": html}
  31.     except Exception as e:
  32.         return {"error": e}
  33. # 测试单进程单线程IO密集型、cpu密集型、网络请求密集型操作所需要的时间
  34. def line():
  35.     # CPU密集操作
  36.     t = time.time()
  37.     for x in range(10):
  38.         count(1, 1)
  39.     print("Line cpu", time.time() - t)
  40.     # IO密集操作
  41.     t = time.time()
  42.     for x in range(10):
  43.         write()
  44.         read()
  45.     print("Line IO", time.time() - t)
  46.     # 网络请求密集型操作
  47.     t = time.time()
  48.     for x in range(10):
  49.         http_request()
  50.     print("Line Http Request", time.time() - t)
  51. def multi_thread_cpu():
  52.     counts = []
  53.     t = time.time()
  54.     for x in range(10):
  55.         thread = Thread(target=count, args=(1, 1))
  56.         counts.append(thread)
  57.         thread.start()
  58.     e = counts.__len__()
  59.     while True:
  60.         for th in counts:
  61.             if not th.is_alive():
  62.                 e -= 1
  63.         if e <= 0:
  64.             break
  65.     print(time.time() - t)
  66. def io():
  67.    write()
  68.     read()
  69. def multi_thread_io():
  70.     t = time.time()
  71.     ios = []
  72.     t = time.time()
  73.     for x in range(10):
  74.         thread = Thread(target=count, args=(1, 1))
  75.         ios.append(thread)
  76.         thread.start()
  77.     e = ios.__len__()
  78.     while True:
  79.         for th in ios:
  80.             if not th.is_alive():
  81.                 e -= 1
  82.         if e <= 0:
  83.             break
  84.     print(time.time() - t)
  85. def multi_thread_http():
  86.     t = time.time()
  87.     ios = []
  88.     t = time.time()
  89.     for x in range(10):
  90.         thread = Thread(target=http_request)
  91.         ios.append(thread)
  92.         thread.start()
  93.     e = ios.__len__()
  94.     while True:
  95.         for th in ios:
  96.             if not th.is_alive():
  97.                e -= 1
  98.         if e <= 0:
  99.             break
  100.     print("Thread Http Request", time.time() - t)
  101. def mp_cpu():
  102.     counts = []
  103.     t = time.time()
  104.     for x in range(10):
  105.         process = Process(target=count, args=(1, 1))
  106.         counts.append(process)
  107.         process.start()
  108.     e = counts.__len__()
  109.     while True:
  110.         for th in counts:
  111.             if not th.is_alive():
  112.                 e -= 1
  113.         if e <= 0:
  114.             break
  115.     print("Multiprocess cpu", time.time() - t)
  116. def mp_io():
  117.     t = time.time()
  118.     ios = []
  119.     t = time.time()
  120.     for x in range(10):
  121.         process = Process(target=io)
  122.         ios.append(process)
  123.         process.start()
  124.     e = ios.__len__()
  125.     while True:
  126.         for th in ios:
  127.             if not th.is_alive():
  128.                 e -= 1
  129.         if e <= 0:
  130.             break
  131.     print("Multiprocess IO", time.time() - t)
  132. def mp_http():
  133.     t = time.time()
  134.     httprs = []
  135.     ios = []
  136.     t = time.time()
  137.     for x in range(10):
  138.         process = Process(target=http_request)
  139.         ios.append(process)
  140.         process.start()
  141.     e = httprs.__len__()
  142.     while True:
  143.         for th in httprs:
  144.             if not th.is_alive():
  145.                 e -= 1
  146.         if e <= 0:
  147.             break
  148.     print("Multiprocess Http Request", time.time() - t)
  149. if __name__ == '__main__':
  150.     mp_cpu()
  151.     mp_io()
  152.     mp_http()

 

CPU密集型/s

IO密集型/s

HTTP密集型/s

Line(单进程单线程)

94.91825

22.462

7.3296

多线程

101.17

24.8605

0.5053333

多进程

53.89

12.784

0.5045

总结

多线程在IO密集型的操作下似乎也没有很大的优势(也许IO操作的任务再繁重一些就能体现出优势),在CPU密集型的操作下明显地比单线程线性执行性能更差,但是对于网络请求这种忙等阻塞线程的操作,多线程的优势便非常显著了

 

多进程无论是在CPU密集型还是IO密集型以及网络请求密集型(经常发生线程阻塞的操作)中,都能体现出性能的优势。不过在类似网络请求密集型的操作上,与多线程相差无几,但却更占用CPU等资源,所以对于这种情况下,我们可以选择多线程来执行

1、对于CPU密集型的程序,顺序执行比并发执行效率更高,这是因为并发时GIL带来了切换线程额外的开销。

   (循环处理、计数等)

2. 对于IO密集型的程序,Python的多线程并发执行能够提高效率的,因为,Python线程在等待IO时,会释放GIL;

  (文件处理、网络爬虫)

3、多核多线程比单核多线程更差,单核下的多线程,每次释放GIL,唤醒的那个线程都能获取到GIL锁,所以能够无缝执行,但多核下,CPU0释放GIL后,其他CPU上的线程都会进行竞争,但GIL可能会马上又被CPU0拿到,导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度状态,这样会造成线程颠簸(thrashing),导致效率更低。