Python 多线程编程小记

threading模块

python3 中对于多线程,提供了两个模块 _threadthreading 模块,其中 threading 是对 _thread 的封装。

实现多线程有两种方法,一是直接实例化一个 threading.Thread 对象,实例化时,传入需要执行的函数和参数。

二是自定义一个多线程类继承 threading.Thread 类,并重写其中的 run() 方法。因为当调用 start() 方法后子线程就是去执行 run() 方法。

Thread 类中包含的方法

方法和属性 说明
start() 启动线程,等待CPU调度
run() 线程被cpu调度后自动执行的方法
getName()、setName()和name 用于获取和设置线程的名称。
setDaemon() 设置为后台线程或前台线程(默认是False,前台线程)。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程执行完成后,程序才停止。
join([timeout]) 调用该方法将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束。
isDaemon()方法和daemon属性 是否为守护线程
is_alive() 判断线程是否是激活的(alive)。从调用start()方法启动线程,到run()方法执行完毕或遇到未处理异常而中断这段时间内,线程是激活的。
ident 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。

Queue队列 | 生产者消费者模式

以一个局域网 Ping 扫描存活主机工具(扫C段)为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# -*- coding: utf8 -*-
import os
import platform
import re
import threading
from queue import Queue


def ping_scan(ip): # 进行ping检测
cmd = 'ping -{} 1 {}'.format(get_os(), ip)
data = os.popen(cmd).read()
message = re.findall(r'TTL=.+', data, re.IGNORECASE)
if len(message) == 1: # 通过 TTL 的值判断是否存在该主机
print('[OK] '+ip)


def get_os(): # 判断操作系统
if platform.system() == 'Windows':
sys = 'n'
else:
sys = 'c'
return sys


class Mythread(threading.Thread): # 这里继承了Thread类,并重写了run方法

def __init__(self, queue):
self.queue = queue
threading.Thread.__init__(self)

def run(self):
while True:
ip = self.queue.get() # 从队列中取出 IP 进行扫描,如果是 0 则标志 IP 结束,则将 0 放回队列中,告诉下一个进程,所有 IP 都扫描结束了
if ip != 0:
ping_scan(ip)
else:
queue.put(0)
break


if __name__ == '__main__':
ip = input('请输入要扫描的网段(示例:192.168.1.0):\n')
num = int(input('请输入线程数:\n'))
ip = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.', ip)[0]
queue = Queue(255) # 这里生产者消费者模式实现进程调用,但这里先把254个IP放入到队列中,然后由每个线程在进行扫描时来取
for i in range(1, 255):
queue.put(ip+str(i))
queue.put(0) # 最后放入一个标志位 0,表示结束
threads = []
for i in range(0, num):
threads.append(Mythread(queue))
for i in threads:
i.start()

因为这里可以指定任意个线程数,一开始的想的是一共 254 个 IP ,给每个线程平均分配若干个IP进行测试,但测试后发现如何把 254 个 IP 平均分配给若干个线程好像这个分配有点难…还有一个问题就是如果某一个线程提前完成任务了,它并不能去帮忙其他线程完成任务…会出现有线程围观的情况。

后来通过查询,发现了线程的生产者消费者模式。生产线程负责生产数据,数据生产完后放入到缓存中,消费线程每次需要数据直接去缓存中取,这样就解决线程围观的问题。在 Python 中,这个数据缓存,可以用 Queue 类来实现。Queue 类已经提供了对线程同步的支持。所以也不会有线程锁之类的线程同步问题。(图片来源网络)

生产者消费者模式

Queue 类中包含的方法

方法和属性 说明
put(self, item, block=True, timeout=None) 将一个值放入队列中,item放入队列中的值,block是否阻塞,timeout超时时间
get(self, block=True, timeout=None) 获取队列中的一个值,block是否阻塞,timeout超时时间
task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
qsize() 返回队列的大小
join() 等到队列为空,再执行别的操作
empty() 如果队列为空,返回True,反之False
full() 如果队列满了,返回True,反之False
get_nowait() 非阻塞获取队列中的值
put_nowait() 非阻塞将值放入队列中

join() 和 task_done()

1
2
3
4
5
6
7
8
from queue import Queue

q = Queue(5)
q.put(1)
q.get()
q.task_done() # 从队列中取出一个值后向队列发送一个信号,告诉队列任务已完成,否则,join还是会处于阻塞状态
q.join() # 这里会一直等待队列中的值为空之后才会继续执行后面的值
print('hello world')

执行结果

hello world

每次 get() 取完值后都要调用 task_done() 告知队列完成任务,这样当队列内容为空后,join() 才不会处于阻塞状态,若没有调用 task_done() ,则 get() 取完所有值后,join() 还是会处于阻塞状态

互斥锁 threading.Lock()

Lock()类中包含的方法

方法和属性 说明
acquire() 加锁
release() 释放锁

不加锁示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading

num = 0
def run():
global num
for i in range(1, 10000000):
num = num + 10
num = num - 10

threa1 = threading.Thread(target=run)
threa2 = threading.Thread(target=run)
threa1.start()
threa2.start()
threa1.join()
threa2.join()
print(num)

运行结果:

-50

因为线程之间的全局变量是共享的,当多个线程同时对 num 进行操作时会产生脏数据,导致最后的结果不是我们想要的,所以此时就需要对操作共享数据的代码段,进行加锁操作,避免同时有多个线程修改共享数据。

加锁示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading

num = 0
lock = threading.Lock()

def run(lock):
global num
for i in range(1, 10000000):
lock.acquire() # 加锁
num = num + 10 # 在锁中间的这部分代码,在运行时其实是单线程运行,因为在同一时刻,只能有一个线程对这段代码进行操作
num = num - 10
lock.release() # 释放锁

threa1 = threading.Thread(target=run, args=(lock,))
threa2 = threading.Thread(target=run, args=(lock,))
threa1.start()
threa2.start()
threa1.join()
threa2.join()
print(num)

运行结果:

0