python多线程与多进程学习

学习多线程

多进程与多线程

程序:将一些代码放到文档中

进程:

程序运行的一个状态;每个进程有自己完全独立的运行环境,不共享数据

比如双击QQ,输入一个账号密码登录后,再次双击QQ,可以登录另一个QQ账号;这样就有了两个进程,彼此独立,不共享数据

线程:

一个进程的独立运行片段,一个进程可以由多个线程组成;
一个进程的多个线程共享数据和上下文运行环境

threading包

基本使用

import threading

t = threading.Thread(target=xxx, args=(xxx,))
#生成Thread实例
#target为函数名,args为传入函数的参数:若无参数,可省略args,若有一个参数,要在参数后加上逗号

t.start()
#启动多线程

t.join()
#等待多线程执行完成
#启动多线程后本程序就作为主线程存在,如果主线程执行完毕,则子线程可能也会终止
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
import threading

def loop1():
print('loop1开始运行\n')
time.sleep(4)
print('loop1运行完毕\n')

def loop2():
print('loop2开始运行\n')
time.sleep(2)
print('loop2运行完毕\n')

def main():
print('主程序开始运行\n')

t1 = threading.Thread(target=loop1, args=())
t1.start()

t2 = threading.Thread(target=loop2, args=())
t2.start()

print('主程序运行完毕\n')

if __name__ == '__main__':
main()

运行结果:

主程序开始运行

loop1开始运行

loop2开始运行

主程序运行完毕

loop2运行完毕

loop1运行完毕

可以看到,主线程运行完毕后子线程才运行完毕,而且由于子线程t2睡眠2s比子线程t1睡眠4s短而先运行完毕

等待线程

使用t.join(),让主线程等待子线程运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import threading

def loop1():
print('loop1开始运行\n')
time.sleep(4)
print('loop1运行完毕\n')

def loop2():
print('loop2开始运行\n')
time.sleep(2)
print('loop2运行完毕\n')

def main():
print('主程序开始运行\n')

t1 = threading.Thread(target=loop1, args=())
t1.start()

t2 = threading.Thread(target=loop2, args=())
t2.start()

t1.join()
t2.join()
print('主程序运行完毕\n')

if __name__ == '__main__':
main()

运行结果:

主程序开始运行

loop1开始运行

loop2开始运行

loop2运行完毕

loop1运行完毕

主程序运行完毕

守护线程

如果想让主线程运行完毕后,子线程就停止运行,那么可以将子线程设置为守护线程

方法:在子线程start()前,

t.setDeamon(True)

守护线程能否有效与运行环境有关,在jupyter运行发现守护线程设置失效,而pycharm设置成功了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
import threading


def loop1():
print('loop1开始运行\n')
time.sleep(4)
print('loop1运行完毕\n')


def loop2():
print('loop2开始运行\n')
time.sleep(2)
print('loop2运行完毕\n')


def main():
print('主程序开始运行\n')

t1 = threading.Thread(target=loop1, args=())
t1.setDaemon(True)
t1.start()

t2 = threading.Thread(target=loop2, args=())
t2.setDaemon(True)
t2.start()

print('主程序运行完毕\n')


if __name__ == '__main__':
main()

输出结果:

主程序开始运行

loop1开始运行

loop2开始运行
主程序运行完毕

常用属性

threading.currentThread:返回当前线程变量
threading.enuerate: 返回一个包含正在运行的线程的list
threading.activeCount: 返回正在运行的线程的数量
t1.setName(): 给线程命名
t1.getName(): 得到线程名称

直接继承threading.Thread

使用类的实例来启动多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading


class Th(threading.Thread):
def __init__(self, arg):
super(Th, self).__init__()
self.arg = arg

# 必须重写run函数,代表真正执行的功能
def run(self):
print(self.arg)


for i in range(5):
t = Th(i)
t.start()
t.join()

print('over')

事件

Event()可以使主进程控制其他线程的执行

Event()全局定义了一个标志flag;若flag为True,则当前线程执行Event.wait()方法时会阻塞,其他线程会等待;若flag为False,则不再阻塞

Event()的方法:

  • set():设置标记为True
  • clear():设置标记为False
  • wait():若标志为True则进行阻塞,标志为False则不进行阻塞
  • isSet():返回flag状态

使用类来进行线程的暂停、继续和终止操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import threading
import time


class Job(threading.Thread):

def __init__(self):
super(Job, self).__init__()
self.__flag = threading.Event() # 用于暂停线程的标识
self.__flag.set() # 设置为True
self.__running = threading.Event() # 用于停止线程的标识
self.__running.set() # 将running设置为True

def run(self):
while self.__running.isSet():
self.__flag.wait() # 为True时立即返回, 为False时阻塞直到内部的标识位为True后返回
print(time.time())
time.sleep(1)

def pause(self):
print('pause')
self.__flag.clear() # 设置为False, 让线程阻塞

def resume(self):
print('resume')
self.__flag.set() # 设置为True, 让线程停止阻塞

def stop(self):
print('stop')
self.__flag.set() # 将线程从暂停状态恢复, 如已经暂停的话
self.__running.clear() # 设置为False


a = Job()
a.start()

time.sleep(3)

a.pause()
time.sleep(3)
a.resume()
time.sleep(3)
a.pause()
time.sleep(2)
a.stop()

参考:

https://fishc.com.cn/thread-104073-1-7.html
https://www.cnblogs.com/yoyoketang/p/8341972.html

共享变量

当多个线程同时访问一个变量的时候,会产生共享变量的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading

sums = 0

def Add():
global sums
for i in range(1,1000000):
sums += 1

def Minus():
global sums
for i in range(1,1000000):
sums -= 1


if __name__ == '__main__':
t1 = threading.Thread(target=Add,args=())
t2 = threading.Thread(target=Minus,args=())

t1.start()
t2.start()

t1.join()
t2.join()

print(sums)

结果:输出随机的数,正数或负数

解决方法:锁(锁互斥的资源)

1.是一个标志,表示一个线程在占用一些资源
2.使用时要上锁,用完后还要释放锁

使用锁保证了在同一时间只有一个线程在真正访问这个变量

定义锁:

lock = threading.Lock()

申请锁:

lock.aquire()

释放锁:

lock.release()

使用锁之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading

sums = 0
lock = threading.Lock()

def Add():
global sums
for i in range(1,1000000):
lock.acquire()
sums += 1
lock.release()

def Minus():
global sums
for i in range(1,1000000):
lock.acquire()
sums -= 1
lock.release()


if __name__ == '__main__':
t1 = threading.Thread(target=Add,args=())
t2 = threading.Thread(target=Minus,args=())

t1.start()
t2.start()

t1.join()
t2.join()

print(sums)

输出结果:

0

使用锁的时候应注意防止死锁情况

timeout参数
lock1 = threading.Lock()
lock1.acquire(timeout=2)  #如果锁空闲,则立即申请到,否则最多等待2s

lock2 = threading.Lock()
rst = lock2.acquire(timeout=2) #若在2s内申请成功,返回true,否则false

线程数量

semaphore:允许一个资源最多可以让几个线程同时使用

1
2
3
4
5
6
7
8
9
10
11
12
import threading
semaphore = threading.Semaphore(3) #最多允许三个线程同时使用

def func():
if semaphore.acquire():
print(threading.CurrentThread().getName())
semaphore.release()
print(threading.CurrentThread().getName())

for i in range(5):
t = threading.Thread(target=func)
t.start()

设置线程启动时间

设置6s之后启动线程:

def func():
    print('xxx')

t = threading.Timer(6, func)
t.start()

multiprocessing包

基本使用

multiprocessing模块提供了一个Process类来代表一个进程对象

getpid():获得当前进程的编号。系统每开辟一个新进程就会为它分配一个进程号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import multiprocessing
import os


# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = multiprocessing.Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join() # 等待子进程
print('Child process end.')

进程池

启动大量的子进程:

1
2
3
4
5
6
7
import multiprocessing

p = multiprocessing.Pool(4)
p.apply_async(target=func,args=())
p.close()
p.join()
# apply_async :不用等待当前进程执行完毕,随时根据系统调度来进行进程切换。

首先主进程开始运行,碰到子进程后,主进程仍可以先运行,等到操作系统进行进
程切换的时候,再交给子进程运行。可以做到不等待子进程执行完毕,主进程就已经
执行完毕,并退出程序。

同线程池一样,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

Pool的默认大小是CPU的核数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os, time, random


def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
time.sleep(1)
print('Task %s runs over' % name)


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(2)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting...')
p.close()
p.join()
print('done.')

电脑cpu核数不同,输出也不同

线程池

使用的multiprocessing包与进程使用的模块一样,但

from multiprocessing import Pool 这样导入的Pool表示的是进程池; 

from multiprocessing.dummy import Pool 这样导入的Pool表示的是线程池。

创建线程池:

ThreadPool(10)  # 创建10个容量的线程池并发执行

map()是 Python 内置的高阶函数,它接收一个函数 f 和一个 list,并通过把函数 f 依次作用在 list 的每个元素上,得到一个新的 list 并返回。

同样,在使用线程池并发时,用到函数与多个参数,因此也用到了map:

pool.map(func, list)  #func为函数名,list为参数列表

在使用

pool.join()

等待之前,应先调用

pool.close()

例子-python多线程MD5截断脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import hashlib
from multiprocessing.dummy import Pool


def md5(t):
lists = t.split(':')
t1 = int(lists[0])
t2 = int(lists[1])
for i in range(t1, t2):
if hashlib.md5(str(i).encode('utf-8')).hexdigest()[:6] == '75fc82':
print(i)
break


if __name__ == '__main__':
pool = Pool(5)
lists = ['1:100000', '100000:200000', '200000:300000', '300000:400000', '400000:500000']
pool.map(md5, lists)
pool.close()
pool.join()
print('over')