Python复习(七)-多线程和多进程

进程与线程

  • 进程(process) 是指计算机已运行的程序,你可以理解成一个任务就是一个进程,比如打开浏览器,启动音乐播放器等
  • 线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程

1.随着多核处理器主见成为主流而非特例,与以前相比,将处理在和分布到多台处理器上(以便充分利用所有可用的处理器资源)变得更吸引人,也更具有可行性。有两种方法可用对工作和在进行分布:多进程、多线程。

2.使用多进程,也就是运行多个单独的程序,每个进程都是独立运行的,这是的对并发性进行处理的所有任务都是由底层的操作系统完成的。不足之处在于,程序与各单独进程间通讯与数据共享可能不是很方便。在UNIX系统上,这可以使用exec与fork来完成;但是对跨平台程序,就必须使用其他解决方案。最简单的,也是在这里进行展示的,就是由调用程序为其运行的进程提供数据进行处理。一种更灵活的方法是使用网络,并可以极大地简化这种双向通信。当然很多情况下,这种通信并不是必要的——我们只需要从一个负责协调的程序来运行一个或多个其他程序。

3.一种将工作和在分布到独立进程上的替代方法是创建现成话程序,并将工作和在分布到独立的线程上处理。优势在于:通信可以简单地通过共享数据(前提是要确保共享数据一次只能由一个线程进行存取)完成,但同时也将并发性管理等任务留给了程序员。

-《python3程序开发指南第二版》

# 使用threading 模块
# 单线程执行
import time

def sayhello():
    print("Hello boy!")
    time.sleep(1)
start = time.time()
for i in range(5):
    sayhello()
end = time.time()
print(end-start)
Hello boy!
Hello boy!
Hello boy!
Hello boy!
Hello boy!
5.0062620639801025
# 使用threading 模块
# 多线程执行
import threading
import time

def sayhello():
    print("Hello boy!")
    time.sleep(1)
start = time.time()
for i in range(5):
    t = threading.Thread(target=sayhello)
    t.start()
end = time.time()
print(end-start)
Hello boy!
Hello boy!
Hello boy!
Hello boy!
Hello boy!
0.012967824935913086
# 主线程会等待所有的子进程结束后才结束
import threading
from time import sleep, ctime

def sing():
    for i in range(3):
        print("sing...")
        sleep(1)
def play():
    for i in range(3):
        print("play...")
        sleep(1)
print("Start: %s", ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=play)

t1.start()
t2.start()

print("End: %s", ctime())
Start: %s Thu Jan 23 20:20:40 2020
sing...
play...
End: %s Thu Jan 23 20:20:40 2020
play...sing...

play...sing...

# 查看线程数量
import threading
from time import sleep, ctime

def sing():
    for i in range(3):
        print("sing...")
        sleep(1)

def play():
    for i in range(3):
        print("Play...")
        sleep(1)

print("Start: %s" % ctime())

t1 = threading.Thread(target = sing)
t2 = threading.Thread(target = play)

t1.start()
t2.start()

while True:
    length = len(threading.enumerate())
    print("当前线程数为: %d" % length)
    if length <= 1:
        break
    sleep(2)
Start: Thu Jan 23 20:31:05 2020
sing...
Play...
当前线程数为: 7
sing...
Play...
sing...
Play...
---------------------------------------------------------------------------

KeyboardInterrupt                         Traceback (most recent call last)

<ipython-input-21-718a3722149c> in <module>
     26     if length <= 1:
     27         break
---> 28     sleep(2)
KeyboardInterrupt: 
# 线程执行代码的封装
# 继承 threading.Thread类,重写run方法

import threading
import time

class Mythread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(0.5)
            print(self.name+'@'+ str(i))

for i in range(5):
    t = Mythread()
    t.start()
Thread-25@0
Thread-24@0
Thread-22@0Thread-23@0

Thread-21@0
Thread-23@1Thread-24@1
Thread-25@1

Thread-22@1
Thread-21@1
Thread-25@2Thread-24@2Thread-21@2Thread-23@2
Thread-22@2

从代码和执行结果我们可以看出,多线程程序的执行顺序是不确定的。当执行到sleep语句时,线程将被阻塞(Blocked),到sleep结束后,线程进入就绪(Runnable)状态,等待调度。而线程调度将自行选择一个线程执行。上面的代码中只能保证每个线程都运行完整个run函数,但是线程的启动顺序、run函数中每次循环的执行顺序都不能确定。

# 多线程-共享全局变量
import threading
import time

share_num = 100

def work1():
    global share_num
    for i in range(3):
        share_num += 1
    print("work1... share_num = %d" % share_num)
def work2():
    global share_num
    print("work2... share_num = %d" % share_num)

print("first num = %d" % share_num)
t1 = threading.Thread(target=work1)
t1.start()

time.sleep(1)

t2 = threading.Thread(target=work2)
t2.start()
first num = 100
work1... share_num = 103
work2... share_num = 103
# 列表当作实参传递到线程中
from threading import Thread
import time

def work1(nums):
    nums.append(4)
    print("work1 list: ", nums)
def work2(nums):
    time.sleep(1)
    print("work2 list: ", nums)

nums = [1, 2, 3]

t1 = Thread(target = work1, args = (nums,))
t1.start()

t2 = Thread(target = work2, args = (nums,))
t2.start()
work1 list:  [1, 2, 3, 4]
work2 list:  [1, 2, 3, 4]
  • 在一个进程内的所有线程共享全局变量,很方便在多个线程间共享数据
  • 缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)
# 多线程-共享全局变量问题
from threading import Thread
import time

g_num = 0

def work1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("work1.... g_num is %d" % g_num)
def work2(num):
    global g_num
    for i in range(num):
        g_num += 1
print("first g_num = %d" % g_num)

t1 = Thread(target=work1, args = (1000,))
t1.start()

t2 = Thread(target=work2, args = (1000,))
t2.start()
print("End..... g_num = %d" % g_num)
first g_num = 0
work1.... g_num is 1000
End..... g_num = 2000
# 多线程-共享全局变量问题
from threading import Thread
import time

g_num = 0

def work1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("work1.... g_num is %d" % g_num)
def work2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("work2.... g_num is %d" % g_num)
print("first g_num = %d" % g_num)

t1 = Thread(target=work1, args = (1000000,))
t1.start()

t2 = Thread(target=work2, args = (1000000,))
t2.start()

print("End..... g_num = %d" % g_num)
# 如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确
first g_num = 0
End..... g_num = 325248
work1.... g_num is 1000000
work2.... g_num is 1384982
# 解决以上问题,进行线程同步即可(同步锁)
# 同步就是协同步调, 按预定的先后次序进行运行,如:你说完,我再说
# "同"字从字面上容易理解为一起动作-其实不是,"同"字应是指协同、协助、互相配合。
from threading import Thread, Lock
import time

g_num = 0

def work1(num):
    global g_num
    for i in range(num):
        lock.acquire()
        g_num += 1
        lock.release()
    print("work1.... g_num is %d" % g_num)
    
def work2(num):
    global g_num
    for i in range(num):
        lock.acquire()
        g_num += 1
        lock.release()
    print("work2.... g_num is %d" % g_num)

print("first g_num = %d" % g_num)
lock = Lock()

t1 = Thread(target=work1, args = (1000000,))
t1.start()
t1.join()

t2 = Thread(target=work2, args = (1000000,))
t2.start()
t2.join()

print("End..... g_num = %d" % g_num)
first g_num = 0
work1.... g_num is 1000000
work2.... g_num is 2000000
End..... g_num = 2000000

死锁的问题

通俗的理解:死锁就是多个线程各自已经占据了一些资源,但是同时又需要其他线程占据的互斥资源,才得以继续执行下去。多方都不愿意放弃已有的资源,但是又得不到想要的其他资源。由此达到了一种僵持的状态,使得程序无法继续执行下去。

参考:

多进程

程序:例如xxx.py这是程序,是一个静态的

进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。

不仅可以通过线程完成多任务,进程也是可以的
multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情

# 2 个 while循环一起执行
from multiprocess import Process
import time

def run_proc():
    " 子进程要执行的代码 "
    while True:
        print("- - - 2 - - -")
        time.sleep(1)
p = Process(target=run_proc)
p.start()
while True:
    print("- - - 1 - - -")
    time.sleep(1)
# 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动
# 进程pid
from multiprocess import Process
import os
import time

def run_proc():
    print("子进程运行中... pid = %d" % os.getpid())
    print("子进程要结束")

print("父进程pid = %d " % os.getpid())
p = Process(target = run_proc)
p.start()
p.join()
父进程pid = 3100 

多进程

多进程使用subprocess模块

# child.py 无多进程
import sys
import os
import time
input_msg = sys.stdin.readline() # sys.stdin是一个标准化输入的方法
msg = '当前获取到的参数: {0},当前进程号:{1}'.format(input_msg,os.getpid())
print(msg) #子进程是标准输出不能直接在控制台看到
with open(r'tao.txt', 'a+', encoding='utf8') as f:
    f.write(msg)
    f.write('\n')
time.sleep(5)
当前获取到的参数: ,当前进程号:10192
# father.py
import subprocess
import sys
import os
child_path = os.path.join(os.path.dirname(__file__),'child.py')
command = [sys.executable, child_path] # sys.execcutable就是Python命令的路径

pipes = []
n = 0 
while n < 5:
    pipe = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    pipe.stdin.write(str(n).encode('utf-8')) # 这里只接受字节串输入,可以查看源码
    pipe.stdin.close()
    pipes.append(pipe)
    n += 1 
while pipes:
    pipe = pipes.pop()
    pipe.wait()

总结:可以看到主进程启动了子进程,子进程的标准输出不能在控制台看到(如果需要看到子进程的数据,可以通过获取子进程的输出来查看);主进程将每个子进程进行了wait()操作,可以使得主进程的继续需要子进程全部结束才会继续,且主进程的中断会使得子进程全部中断。如果子进程需要看到输出结果的话,可以采用子进程输出日志进行监控进程数据

使用多进程模块multiprocessing

import multiprocessing
import os
import time

def run_case(*text):
    print("入参:{0},当前进程号:{1}".format(text.getpid()))
    time.sleep(20)
if __name__ == "__main__": # windows必须使用这句话,不要问我为什么,我也不知道
    print("当前是父进程,进程号:{0}".format(os.getpid()))
    child = multiprocessing.Process(target=run_case,args=(1,2,3,))
    print('子进程启动')
    child.start()
    child.join()
    print('进程结束')
当前是父进程,进程号:10192
子进程启动
进程结束

Pool

进程池:定义了一个池子,在里面放上固定数量的进程,有需求来了,就拿这个池中的一个进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有许多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行

# pool.py
import multiprocessing
import os
import time

def run_case(*text):
    print("入参:{0},当前进程号:{1}".format(text,os.getpid()))
    time.sleep(1)
if __name__ == '__main__':
    pool_num = multiprocessing.Pool(4)
    print('主进程:{0}'.format(os.getpid()))
    print('子进程开始')
    for i in range(20):
        pool_num.apply_async(run_case,args=(i,))
    pool_num.close()
    pool_num.join()
    print('子进程结束了')
    print('主进程结束了')
# 发现了吗?子进程只启动了4个,然后这个4个进程结束了,这4个进程的进程号没有变化?看到了吗
# 总结:进程池的优势在于不会立即销毁进程,不会重新启动新的进程。效率更高
主进程:10192
子进程开始

进程间通讯

我们知道进程是独立的,多进程和多线程的最大的不同就在于,多线程共享同样的变量;但对于多进程而言,进程间的变量是独立的。

# multiprocess_queue.py
import muitlprocessing
import os

class A:
    def __init__(self, num):
        self.num = num
        pass
    def get(self):
        return self.num
one = A(1)
two = A(2)

def put_msg(que):
    print("写数据:我的进程号是{0}".format(os.getpid()))
    for i in [one,two]:
        que.put(i)
def get_msg(que):
    print("读数据:我的进程号是{0}".format(os.getpid()))
    while True:
        msg = que.get()
        print('{0}拿到的数据:{1}'.format(os.getpid(),msg.get()))
....

参考:

本文链接:

https://www.betao.cn/archives/python-review07.html
1 + 1 =
快来做第一个评论的人吧~