线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程,线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。
Threading模块提供线程相关的操作,Threading模块包含Thread,Lock,RLock,Event,Queue等组件;multiprocess模块完全模仿了threading模块的接口,二者在使用时,时极其相似的。
创建线程的两种方式:
示例1:
import time
from threading import Thread
def func(i):
time.sleep(1)
print("hello : %s"%i)
thread_l = []
# 开启多线程
for i in range(10):
t = Thread(target=func,args=(i,)) #实例化线程对象
t.start() # 激活线程
thread_l.append(t)
# 异步开启阻塞
for j in thread_l:
j.join() # 阻塞主线程,子线程执行完毕之后向下执行主线程代码
print("主线程")
结果:
hello : 2
hello : 0
hello : 1
hello : 3
hello : 5
hello : 4
hello : 7hello : 6hello : 9
hello : 8
主线程
示例2:使用类继承的方式创建线程
import time
from threading import Thread
class MyThread(Thread): # 继承Thread类
count = 0 # 子线程间会共享此静态属性
def __init__(self,arg1,arg2): # 通过init方法传递参数
super().__init__()
self.arg1 = arg1
self.arg2 = arg2
def run(self): # 必须实现run方法
MyThread.count += 1
time.sleep(1)
print("%s,hello!%s"%(self.arg1,self.arg2))
thread_l = []
for i in range(10):
t = MyThread('eric','jonny')
t.start()
thread_l.append(t)
for j in thread_l:
j.join()
print("conut: %s"%MyThread.count)
结果:
1,hello!jonny
0,hello!jonny
5,hello!jonny4,hello!jonny
3,hello!jonny
2,hello!jonny
6,hello!jonny9,hello!jonny
7,hello!jonny
8,hello!jonny
conut: 10
Thread的主要方法:
t.start() :激活线程
t.join():阻塞(等待子线程执行完毕,在向下执行),在每次激活线程后阻塞会使线程变为同步,所以要在线程激活完毕之后阻塞。
t.name :设置或获取线程名
t.getName():获取线程名
t.setName(NAME):设置线程名
t.is_alive() :判断线程是否激活
t.setDaemon() :设置守护线程,在激活线程之前设置,默认值为False
t.isDaemon() : 判断是否为守护线程
同一个进程内的线程是数据共享的,线程的GIL(全局解释性)锁是锁的线程调用CPU的时间,在第一个线程调用CPU操作共享数据的时候,时间轮转至第二个线程,第二个线程也要操作共享数据,这样就导致了数据的不一致,这是因为GIL不锁数据,这种情况下,线程锁的出现就能解决这个这个问题。
示例1:GIL锁发挥作用
from threading import Thread
def func():
global n
n -= 1
n = 1000
thread_l = []
for i in range(100):
t = Thread(target=func)
t.start()
thread_l.append(t)
for j in thread_l:
j.join()
print(n)
结果是:900
示例2:时间片轮转,GIL锁失效
import time
from threading import Thread
def func():
global n
# n -= 1
temp = n # 从进程中获取n
time.sleep(0.01) # 每个线程调用CPU的时间是很短的,制造时间片轮转的效果
n = temp -1 # 得到结果,再将修改过的数据返回给进程
n = 1000
thread_l = []
for i in range(100):
t = Thread(target=func)
t.start()
thread_l.append(t)
for j in thread_l:
j.join()
print(n)
结果是:998
示例3:互斥锁Lock,对数据加锁
import time
from threading import Thread
from threading import Lock
def func():
global n
# n -= 1
lock.acquire() # 上锁
temp = n # 从进程中获取n
time.sleep(0.01) # 每个线程调用CPU的时间是很短的,制造时间片轮转的效果
n = temp -1 # 得到结果,再将修改过的数据返回给进程
lock.release() # 释放
n = 1000
lock = Lock() # 实例化锁对象
thread_l = []
for i in range(100):
t = Thread(target=func)
t.start()
thread_l.append(t)
for j in thread_l:
j.join()
print(n)
结果是:900
互斥锁Lock后使数据一致,具有安全性,但是也带来了新的问题,因为锁的缘故,每一个线程都是串行的拿到锁,在释放;整个程序运行变成串行,效率降低。
示例4:递归锁Rlock
Lock在同一线程中只能被acquire一次,下一次的acquire必须等待release之后才可以;而RLock允许在同一线程中被多次acquire,但是acquire和release必须是成对存在。
from threading import Lock
from threading import RLock
lock = Lock() # 实例化出互斥锁对象
lock.acquire()
lock.acquire() # 在第二次acquire时,程序阻塞等待release之后拿到锁
print("死锁")
lock.release()
lock.release()
lock1 = RLock() # 实例化出递归锁对象
lock1.acquire()
lock1.acquire() # 可被多次acquire
print("running")
lock1.release()
lock1.release() # acquire与release成对出现
在多线程并发的情况下,同一个线程中,如果出现多次acquire,就可能发生死锁现象,使用RLock就不会出现死锁问题
线程的信号量与进程的信号量使用基本一致;信号量可以允许指定数量的线程操作上锁的数据,即一把锁有多个钥匙。对与有信号量限制的程序来说,信号量有几个任务就开启几个线程,在加锁阶段会限制程序运行数量,并不影响其它代码的并发。
示例:
import random
import time
from threading import Semaphore
from threading import Thread
def sing(i,sem):
sem.acquire() # 加锁
print('%s enter the ktv'%i)
time.sleep(random.randint(1,10))
print('%s leave the ktv'%i)
sem.release() # 释放
sem = Semaphore(4)
for i in range(20):
t = Thread(target=sing, args=(i, sem))
t.start()
事件:线程之间状态标记通信,使用方法与进程的基本一致
主要方法:
e = Event() # 实例化一个事件对象
e.set() # 标记变为非阻塞
e.wait() # 默认标记为阻塞,在等待的过程中,遇到非阻塞信号就继续执行
e.clear() # 标记变为阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞
示例:连接数据库
'''
连接数据库
每0.5秒连一次,连接三次
用事件来标志数据库的连接情况
如果连接成功,显示成功
否则报错
'''
import time
import random
from threading import Thread
from threading import Event
# 模拟检查连接,检查连接成功使事件标志位为非阻塞
def check_conn():
time.sleep(random.randint(1,3))
e.set()
# 在还没检查成功前等待,接到非阻塞信号则连接数据库
def conn_mysql():
count = 1
while not e.is_set():
if count > 3:
raise TimeoutError
print("尝试第 %s 次连接" % count)
count += 1
e.wait(0.5)
print("连接成功")
e = Event() # 实例化事件对象
Thread(target=check_conn).start()
Thread(target=conn_mysql).start()
定时器:定时开启一个线程,执行一个任务
示例:
from threading import Timer
def func():
print("hello")
'''
必须有两个参数
第一个是时间,单位为秒
第二个是要执行的函数
'''
Timer(1,func).start()
条件变量:条件包含递归锁RLock和事件Event中的wait()方法的功能。
五个方法:
acquire(): 递归锁
release(): 释放锁
wait(timeout): 等待通知,或者等到设定的超时时间;才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError异常。
notify(n=1): 通知其他线程,传入的参数必须时int类型的,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程
示例:
from threading import Condition
from threading import Thread
def run(n):
con.acquire()
con.wait()
print("run the thread: %s"%n)
con.release()
if __name__ == '__main__':
con = Condition()
for i in range(10):
t = Thread(target=run,args=(i,))
t.start()
while True:
msg = input(">>> ")
if msg == 'q':
break
con.acquire() # 递归锁
if msg == 'all':
con.notify_all() # 放行所有线程
else:
con.notify(int(msg)) # 传递信号,放行线程,参数是int类型的
con.release() # 释放锁
queue模块就是线程的队列,它是数据安全的。
主要方法:
q.put(1) # 将传入的数据放入队列
q.get() # 根据对象所属类的不同,取出队列中的数据
q.join() # 等队列为空时,在执行别的操作
q.qsize() # 返回队列的大小,不一定准确
q.empty() # 队列为空时,返回True,否则返回False,不一定准确
q.full() # 队列满时,返回True,否则返回False,不一定准确
Queue类的使用:先进先出
import queue
q = queue.Queue() # 实例化一个队列对象,可给出队列长度,先进先出
q.put(1) # 将传入的数据放入队列
q.put(2)
print(q.get()) # 先进先出,取出队列的第一个值
LifoQueue类的主要方法:后进先出
import queue
lfq = queue.LifoQueue() # 实例化一个对象,可给出长度,后进先出
lfq.put(1)
lfq.put(2)
print(lfq.get()) #后进先出,取出2
PriorityQueue类的主要方法:优先级
import queue
pq = queue.PriorityQueue() # 实例化一个队列对象,优先级队列,优先级值越小越优先
pq.put((10,'a'))
pq.put((5,'b'))
pq.put((1,'c'))
pq.put((15,'d'))
for i in range(4):
print(pq.get())
结果:
(1, 'c')
(5, 'b')
(10, 'a')
(15, 'd')
concurrent是用来操作池的模块,这个模块可创建进程池和线程池,其使用方法完全一致,统一了入口和方法,使用池更便捷,且python内置,导入便可使用。
主要方法:
submit(FUNC,ARGS):创建线程对象和激活线程,FUNC是要执行的任务,ARGS是参数
shutdown():shutdown方法封装了close和join方法,调用该方法时,不能在忘池中添加任务,且要等待池中任务执行结束
result():result方法取线程执行的函数返回值
map(FUNC,iter):map方法异步执行,需传入要执行的任务FUNC,以及一个可迭代对象iter,map方法无返回值
add_done_callback(call):回调函数
示例1:
import time
import random
from concurrent import futures
def func(n):
time.sleep(random.randint(1,3))
print(n)
return n*"*"
thread_pool = futures.ThreadPoolExecutor(20) # 实例化一个线程池对象,一般开启CPU核数*5
f_l = []
for i in range(10):
t = thread_pool.submit(func,i) # submit方法合并了创建线程对象和激活线程的功能
f_l.append(t)
thread_pool.shutdown() # shutdown方法封装了close和join方法,调用该方法时,不能在忘池中添加任务,且要等待池中任务执行结束
for j in f_l:
print(j.result()) # result方法取线程执行的函数返回值
示例2:回调
from concurrent import futures
def func(n):
print(n)
return n*"*"
def call(args):
print(args.result())
thread_pool = futures.ThreadPoolExecutor(20)
thread_pool.submit(func,1).add_done_callback(call)