网络编程Day6-队列

前言

上此我们讲到了线程: 网络编程Day3-子线程 ,那么多线程如何应用呢?

就是我们今天要讲的主题了——队列。

队列,是用来解决线程安全的一种“利器”,讲述队列之前,我们需要对一个模型进行了解。


生产者消费者模型

生产者消费者模式是指通过一个容器来解决生产者和消费者的强耦合问题。

耦合: 用python代码来做比喻,如果更改一处代码,其他代码会随之受到影响的情景,我们就称之为耦合。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这是一个解耦的过程。


PS:python 的协程,就涉及到了生产者消费者模型的相关知识。
协程,又称微线程,纤程。英文名Coroutine。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行。(因此也被较为伪多线程)



队列

队列是用于线程安全的,提供了一个适用于多线程编程的数据结构。queue是python标准库中的线程安全的队列,默认基本FIFO队列(FIFO即First in First Out,先进先出)

Python Queue模块有三种队列及构造函数(返回一个数据容器):

  1. Python Queue模块的FIFO队列先进先出。
  2. LIFO类似于堆,即先进后出。 queue.LifoQueue(maxsize)
  3. 还有一种是优先级队列级别越低越先出来。 queue.PriorityQueue(maxsize)

队列的方法

q.put()

调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为True。如果队列当前为空且block为True,put()方法就使调用线程暂停,直到空出一个数据单元(等待其他线程做get操作)。如果block为False,put方法将引发Full异常(设置当队列塞不进数据时直接报错)。


q.get()

调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,

get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。


block 参数的作用:

设想一下,如果一个线程运行着 put 或 get 方法,当队列为空时,是不是一直保持着,等待状态,这跟我们之前讲过的 socket 一样,类似于它的 recv(一直等待消息),这时候,就处于阻塞状态。

put和get同样可以卡住,也是依赖于同步状态进行操作的。所以我们可以通过设置block值引发错误,然后通过异常捕获来避免这种情况发生。


其他方法

  • q.task_done(): 在每一次完成put或get时,就会发送一个信号
  • q.join(): 用于接收信号,只有全接收到信号时,才会执行下一步操作
  • 这两者的配合就达到了同步效果


队列的使用

假设现在有一个需求:开两个线程来删除某数据对象,这里我们使用一个列表来模拟。

运行之后发现报错了:原因很简单,remove是按值删除的,线程1删除掉的数据,列表已经没有这个值了,线程2想要删除这个值时,就报错。那么这个问题就是数据不安全导致的。


报错信息如下:


根据我们学到过的知识点,有这么两个解决方法:


方法一:加锁

  • 加了锁之后,虽然能够解决问题,但对于这种 CPU密集型的任务,其并不能完美胜任这个工作。

方法二:使用队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import queue
import threading
import time

the_list = [1,2,3,4,5]
q = queue.Queue()

def dl():

while len(the_list) >0:
data = q.get() # 从队列中拿数据
q.task_done() # 发送信号,表明已拿到数据
the_list.remove(data) # 接着将拿到的数据从列表中删除
print("%s 已经被删除了"%data)


def d2():
while len(the_list) >0:
a = the_list[-1] # 从列表中取出最后一个数据
q.put(a) # 将数据放入队列中
q.join() # 等待另一个线程的信号


t1 =threading.Thread(target=dl)
t2 =threading.Thread(target=d2)
t1.start()
t2.start()
  • 我们使用一个线程循环来完成这个需求,循环的条件时:列表不为空
  • 然后一个子线程不断的放、另一个子线程不断的取。最终达到删除列表数据的效果
  • 通过队列来完成解耦操作,保证了数据的安全性



1
队列的相关知识就讲到这儿了~  下期见,各位。