一个串行程序需要从每个I/O终端通道来检测用户的输入,然而程序在读取过程中不能阻塞,因为用户输入的到达时间的不确定,并且阻塞会妨碍其他I/O通道的处理。由于串行程序只有唯一的执行线程,因此它需要兼顾执行的多个任务,确保其中的某个任务不会占用过多的时间,并对用户的响应时间进行合理的分配。这种任务类型的串行程序的使用,往往造成非常复杂的控制流,难以维护。
多线程编程的本质就是异步,需要多个并发活动,每个活动的处理顺序不确定,或者说随机的。这种编程任务可以被组织或划分成多个执行流,其中每个执行流都有一个指定要完成的任务。根据应用的不同,这些子任务可能需要计算出中间结果,然后合并为最终的输出。使用多线程编程,以及类似的Queue的共享数据结构,这个编程任务可以规划成几个特定函数的线程。使用多线程编程来规划这种编程任务可以降低程序的复杂度,使其实现更加清晰、高校,简洁。
进程与线程
计算机程序只是存储在磁盘上的可执行二进制文件。只有把它们加载到内存中并操作系统调用,才能拥有其生命周期。进程则是一个执行中的程序。每个进程都有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。进程可以通过派生(fork或spawn)新的进程来执行其他任务,但是因为每个新进程也拥有自己的内存和数据栈等,所以只能采用进程间通信(IPC)的方式共享信息。
线程与进程类似,不过它们是在同一进程下执行的,并共享相同的上下文。一个进程中的各个线程与主线程共享同一片数据空间,因此相比于独立的进程而言,线程间的共享和通信更加容易。线程一般以并行方式执行,正是由于这种并行和数据共享,是的多任务的协作成为可能。但是线程建的数据共享可能引起多个线程访问同一片数据造成竞态条件,而且多个线程无法给与公平的执行时间。
全局解释锁
Python的代码执行是由Python虚拟机(解释器主循环)进行控制。在主循环中同时只有一个控制线程在执行,就像单核CPU系统中的多线程一样。内存中可以有许多程序,但是任意给定的时刻只能有一个程序在运行。同理,尽管Python解释其中可以运行多个线程,但任意时刻只有一个线程会被解释器执行。
对Python虚拟机的访问是由全局解释锁(GIL)控制的。这个锁就是用来保证同时只能有一个线程运行。在多线程环境中,Python虚拟机将按照以下方式执行:
1.设置GIL 2.切换到一个线程去运行 3.运行: b. 线程主动让出控制(调用time.sleep(0)) 4.把线程设置为睡眠状态 |
Python中的threading模块
Python提供了多个模块来支持来支持多线程编程,包括thread、threading和Queue模块等。然而建议避免使用thread模块,threading模块更加先进,有更好的线程支持,并且thread模块中一些属性会和threading冲突,另外低级别的thread模块拥有的同步原语和很少。更重要的是,在Python3中已经没有thread模块。
thread模块和锁对象:
thread模块的函数
start_new_thread(function, args[, kwargs]) | 派生一个新的线程 |
allocate_lock() | 分配LockType锁对象 |
exit() | 线程退出 |
LockType锁对象的方法
acquire(wait=None) | 尝试获取锁对象 |
locked() | 如果获取锁对象返回True,否则False |
release() | 释放锁 |
threading模块
threading模块的对象
对象 | 描述 |
Thread | 一个线程的执行对象 |
Lock | 锁原语对象 |
RLock | 可重入锁对象,使单一线程(再次)获得已持有的锁(递归锁) |
Condition | 条件变量对象,使得一个线程等待另一个线程满足特定的条件 |
Event | 条件变量的通用版本,任意数量的线程等待某个事件的发生,该事件发生后所有线程将激活 |
Semaphore | 为线程间共享的有限资源提供'计数器',如果没有可用资源时会被阻塞 |
BoundedSemaphore | 与Semaphore相似,但它不允许超过初始值 |
Timer | 与Thread相似,在运行前需要等待一段时间 |
Barrier | 创建一个'障碍',必须达到指定数量线程后才可以继续 |
Thread对象数据属性
name | 线程名 |
ident | 线程标识符 |
daemon | 布尔标志,表示这个线程是否是守护线程 |
Thread对象方法
__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None) | 实例化一个线程 |
start(self) | 开始执行该线程 |
run(self) | 定义线程功能的方法 |
join(self, timeout=None) | 直至启动的线程终止之前一直挂起,除非给出了timeout,否则一直阻塞 |
getName(self) | 返回线程名 |
setName(self, name) | 设定线程名 |
isAlive()/is_alive(self) | 布尔标志,表示线程是否还存活 |
isDaemon(self) | 如果是守护线程,返回True |
setDaemon(self, daemonic) | 把线程的守护标志设定为deamonic(必须在线程start()之前调用) |
使用Thread类,可以有很多方法创建线程,有三种常用的方法:
创建Thread的实例,传给它一个函数 创建Thread的实例,传给它一个可调用的类实例 派生Thread的子类,并创建子类的实例 |
创建Thread的实例,传给它一个函数:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
join()方法,可以等待所有线程执行完毕
'''
import threading
from time import sleep,ctime
loops = [4,2]
def loop(nloop,nsec):
print('start loop',nloop,'at:',ctime())
sleep(nsec)
print('loop',nloop,'done at:',ctime())
def main():
print('starting at:',ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=loop,args=(i,loops[i]))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print('all DONE at:',ctime())
if __name__ == '__main__':
main()
运行结果:
starting at: Wed May 10 12:30:28 2017
start loop 0 at: Wed May 10 12:30:28 2017
start loop 1 at: Wed May 10 12:30:28 2017
loop 1 done at: Wed May 10 12:30:30 2017
loop 0 done at: Wed May 10 12:30:32 2017
all DONE at: Wed May 10 12:30:32 2017
创建Thread的实例,传给它一个可调用的类实例:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
from time import sleep,ctime
loops = [4,2]
class ThreadFunc(object):
def __init__(self,func,args,name=''):
self.name = name
self.func = func
self.args = args
def __call__(self):
self.func(*self.args)
def loop(nloop,nsec):
print('start loop',nloop,'at:',ctime())
sleep(nsec)
print('loop',nloop,'done at:',ctime())
def main():
print('starting at:',ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=ThreadFunc(loop,(i,loops[i]),loop.__name__))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print('all DONE at:',ctime())
if __name__ == '__main__':
main()
运行结果:
starting at: Wed May 10 12:31:23 2017
start loop 0 at: Wed May 10 12:31:23 2017
start loop 1 at: Wed May 10 12:31:23 2017
loop 1 done at: Wed May 10 12:31:25 2017
loop 0 done at: Wed May 10 12:31:27 2017
all DONE at: Wed May 10 12:31:27 2017
派生Thread的子类,并创建子类的实例
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
from time import sleep,ctime
loops = (4,2)
class MyThread(threading.Thread):
def __init__(self,func,args,name=''):
threading.Thread.__init__(self)
self.name = name
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def loop(nloop,nsec):
print('start loop',nloop,'at:',ctime())
sleep(nsec)
print('loop',nloop,'done at:',ctime())
def main():
print('starting at:',ctime())
threads = []
nloops = range(len(loops))
for i in nloops:
t = MyThread(loop,(i,loops[i]),loop.__name__)
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print('all DONE at:',ctime())
if __name__ == '__main__':
main()
运行结果:
starting at: Wed May 10 10:43:10 2017
start loop 0 at: Wed May 10 10:43:10 2017
start loop 1 at: Wed May 10 10:43:10 2017
loop 1 done at: Wed May 10 10:43:12 2017
loop 0 done at: Wed May 10 10:43:14 2017
all DONE at: Wed May 10 10:43:14 2017
锁的使用
锁有两种状态:锁定和未锁定。而且它也只支持两个函数,获得锁和释放锁。当多线程争夺锁时,允许第一个获得锁的线程进入临界区,并执行代码。所有之后到达的线程将被阻塞,直到第一个线程之行结束,退出临界区,并释放锁。此时,其他等待的线程可以获得锁并进入临界区,不过那些被阻塞的线程进入临界区没有先后顺序,根据Python实现不同而有所区别。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from atexit import register
from random import randrange
from threading import Thread,Lock,current_thread
from time import sleep,ctime
class CleanOutputSet(set):
def __str__(self):
return ','.join(x for x in self)
lock = Lock()
loops = (randrange(2,5) for x in range(randrange(3,7)))
remaining = CleanOutputSet()
def loop(nsec):
myname = current_thread().name
lock.acquire()
remaining.add(myname)
print('[%s] started %s' %(ctime(),myname))
lock.release()
sleep(nsec)
lock.acquire()
remaining.remove(myname)
print('[%s] completed %s (%d secs)' %(ctime(),myname,nsec))
print('(remaining:%s)' %(remaining or 'None'))
lock.release()
def main():
for pause in loops:
Thread(target=loop,args=(pause,)).start()
@register
def _atexit():
print('all DONE at:',ctime())
if __name__ == '__main__':
main()
运行结果:
[Wed May 10 11:20:14 2017] started Thread-1
[Wed May 10 11:20:14 2017] started Thread-2
[Wed May 10 11:20:14 2017] started Thread-3
[Wed May 10 11:20:16 2017] completed Thread-3 (2 secs)
(remaining:Thread-2,Thread-1)
[Wed May 10 11:20:16 2017] completed Thread-2 (2 secs)
(remaining:Thread-1)
[Wed May 10 11:20:18 2017] completed Thread-1 (4 secs)
(remaining:None)
all DONE at: Wed May 10 11:20:18 2017
信号量的使用
信号量是最古老的同步原语之一。它是一个计数器,当资源消耗时递减,当资源释放时递增。信号量比锁更加灵活,因为可以有多个线程,每个线程拥有有限资源的一个实例。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from atexit import register
from random import randrange
from threading import BoundedSemaphore,Lock,Thread
from time import sleep,ctime
lock = Lock()
MAX = 5
candytray = BoundedSemaphore(MAX)
def refill():
lock.acquire()
print('Refilling candy...')
try:
candytray.release()
except ValueError:
print('full,skipping')
else:
print('OK')
lock.release()
def buy():
lock.acquire()
print('Buying candy...')
if candytray.acquire(False):
print('OK')
else:
print('empty,skipping')
lock.release()
def producer(loops):
for i in range(loops):
refill()
sleep(randrange(3))
def consumer(loops):
for i in range(loops):
buy()
sleep(randrange(3))
def main():
print('starting at:',ctime())
nloops = randrange(2,6)
print('THE CANDY MACHINE ( full with %d bars)'%MAX)
Thread(target=consumer,args=(randrange(nloops,nloops+MAX+2),)).start()
Thread(target=producer,args=(nloops,)).start()
@register
def _atexit():
print('all DONE at:',ctime())
if __name__ == '__main__':
main()
运行结果:
starting at: Wed May 10 11:42:34 2017
THE CANDY MACHINE ( full with 5 bars)
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full,skipping
Buying candy...
OK
Refilling candy...
OK
Refilling candy...
full,skipping
Buying candy...
OK
Refilling candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
OK
Buying candy...
empty,skipping
Buying candy...
empty,skipping
Buying candy...
empty,skipping
all DONE at: Wed May 10 11:42:44 2017
生产者-消费者问题和Queue/queue
queue模块的类
Queue(maxsize=0) | 创建一个先入先出队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列 |
LifoQueue(maxsize=0) | 创建一个后入先出队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列 |
PriorityQueue(maxsize=0) | 创建一个优先级队列。如果给定最大值,在队列没有空间时阻塞,否则为无限队列 |
queue异常
Empty | 当对空队列调用get()方法时抛出异常 |
Full | 当对满队列调用put()方法时抛出异常 |
queue对象方法
qsize() | 返回队列大小 |
empty() | 如果队列为空,则返回True |
full() | 如果队列为满,则返回True |
put(item,block=True,timeout=None) | 将item放入队列,如果block为True且timeout为None,则在有可调用空间之前阻塞;如果timeout为正值 ,则最多阻塞timeout秒,如果block为False,则抛出Empty异常 |
put_nowait(item) | 和put(item,False)相同 |
get(block=True,timeout=None) | 从队列中取得元素,如果给定block(非0),则一直阻塞到有可用元素为止 |
get_nowait(item) | 和get(False)相同 |
task_done() | 表示队列中的某个元素已经完成,该方法会被 join()使用 |
join() | 在队列所有元素执行完毕并调用task_done信号之前, 保持阻塞。 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
from queue import Queue
import random
class consumer(threading.Thread):
def __init__(self, que):
threading.Thread.__init__(self)
self.queue = que
def run(self):
while True:
if self.queue.empty():
break
item = self.queue.get()
#processing the item
time.sleep(item)
print(self.name,item)
self.queue.task_done()
return
que = Queue()
for x in range(10):
number = random.randint(1, 5)
print('random %d number is %d:' % (x, number))
que.put(number, True, None)
print('queue is:', que.queue)
consumers = [consumer(que) for x in range(5)]
for c in consumers:
c.start()
que.join()
运行结果:
random 0 number is 5:
random 1 number is 1:
random 2 number is 2:
random 3 number is 1:
random 4 number is 5:
random 5 number is 3:
random 6 number is 3:
random 7 number is 2:
random 8 number is 2:
random 9 number is 2:
queue is: deque([5, 1, 2, 1, 5, 3, 3, 2, 2, 2])
Thread-2 1
Thread-4 1
Thread-3 2
Thread-3 2
Thread-2 3
Thread-4 3
Thread-1 5
Thread-5 5
Thread-3 2
Thread-2 2