Python学习笔记(1)

Python中的进程和线程.

线程,是操作系统分发给CPU的最小执行单位,至少一条线程组成进程。

要想实现多任务,自然就会有以下三种方法:

  • 多进程模式
  • 多线程模式
  • 多线程+多进程模式

Python对于线程和进程的支持都不错,在以上三种模式中,前两种较为常见。

多进程

os模块中封装了OS的fork调用,通过这个可以非常方便的创建子进程.(pid = process id)

1
2
3
4
5
6
7
import os
print("Process (%s) is running..." % os.getpid())# os.getpid() 可以获得当前进程的id
pid = os.fork()
if pid == 0:
print("I am a child process (%s) and my parent is (%s)" % (os.getpid(), os.getppid()))# os.getppid() 可以获得当前进程的父进程的id
else:
print("Create a child process (%s)" % pid)

需要注意的是,fork调用只有Unix/Linux才有,由于Mac是基于Unix的,所以也可以,那么如何在Windows上使用python编写多进程程序呢?

Python提供了multiprocessing模块来实现跨平台的多进程。

这个模块中提供了 ProcessPool, QueuePipe 这四个类

开辟子进程–Process

先从Process开始:

通过Process()来构造一个子进程实例,它接受的参数有:(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)

一个个来分析:

  • group 是不需要赋值的,这是一个保留参数,日后Python实现了ThreadGroup类,这个参数便可以发挥作用
  • target 是可以被run()方法调用的一个可被调用的对象,其实就是子进程要执行的代码
  • name 子进程的名字
  • args target的参数 tuple型
  • kwargs target的参数 dict型
  • daemon 是否为守护进程(前台进程/后台进程)

现在来看一个实例:

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process
import os

def run_proc(name): # 声明子进程任务
print("Run child process %s (%s)..." % (name, os.getpid()))
if __name__ == '__main__':
print("Parent process is %s" % os.getpid())
p = Process(target=run_proc, args=("test",)) # 构造子进程
print("Child process will start...")
p.start() # 准备就绪
p.join() # 等待执行结束
print('Child process end')

其中,start()方法表示进程准备就绪,等待CPU的调用
join()方法使得主进程等待子进程的结束再执行后续,通常用于进程间的同步

run()方法和start()方法的区别:run()方法相当于是简单调用了一下子进程的target方法,仅仅是一般调用,而start()方法才是真的开启了进程执行。
可以尝试执行两个分别打印100次数字和字母的进程,分别调用**run()start()试试,你会发现使用run()调用的结果是先打印完100个数字(字母),再打印字母(数字),而start()调用的结果是混合打印。(计算机处理的速度很快,最好加上time.sleep(random.random())**,就能看到非常明显的结果)

进程池–Pool

在需要批量建立许多子进程的场景下,可以使用进程池。

构造一个进程池的标准方法是Pool([processes[,initializer[,intargs[,maxtasksperchild[,context]]]]])

其中常用的就应该只有processes这个参数了,表示构造的子进程的数量,缺省值是os.cpu_count()的值,也就是你CPU的核数。

话不多说,在一个实例中感受一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Pool
import os, time, random
def long_time_task(): # 声明子进程任务
print("Starting run %s (%s)" % (name, os.getpid()))
start = time.time()
time.sleep(random.random()*3)
end = time.time()
print("Task %s runs %0.2f seconds." % (name, (end - start)))
if __name__ = "__main__":
print("Parent process is %s" % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print("Waiting for all subprocesses done...")
p.close()
p.join()
print("All subprocesses done.")

其中,我们建立了一个四进程的进程池,使用apply_async(func[,args[,kwds[,callback[,error_callback]]])来为子进程调用一个函数。

后面使用close()使得进程池中不会再被提交进其他的任务,一旦等到所有的任务完成,工作进程就会退出。

在调用join()之前必须要先调close()terminate()方法

子进程中的IO–新的模块subprocess

1
2
3
4
import subprocesses
print("# ls -l -a -h")
ec = subprocesses.call(["ls", "-l", "-a", "-h"]) # call方法返回一个returncode对象
print("Exit Code:",ec)

运行该代码,结果和直接在命令行键入命令是同等效果。

接下来我们来看一下subprocess中的Popen类,该类用于在一个新进程中执行一个子程序。

Popen的构造器接受19个参数,很可怕…,不过常用的也就前几个,其他用缺省值就好,需要用时再查Doc,

Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=(), *, encoding=None, errors=None)

  • args 也就是在Shell中手工输入命令了,如果是string,直接执行,如果是sequence,就把第一个元素作为命令。
  • stdin,stdout,stderr 在Popen.communicate()很常用 也就是流的位置了
1
2
3
4
5
6
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

重点来说一下communicate()这个方法,它接受两个参数,其中一个就是相当于我们在Shell中直接输入的命令,二就是timeout参数

进程间的通信–Queue&Pipe

进程间通信的方式有很多种:

  • 管道(pipe)及有名管道(named pipe):管道可用于具有亲缘关系的父子进程间的通信,有名管道除了具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。

  • 信号(signal):信号是在软件层次上对中断机制的一种模拟,它是比较复杂的通信方式,用于通知进程有某事件发生,一个进程收到一个信号与处理器收到一个中断请求效果上可以说是一致的。

  • 消息队列(message queue):消息队列是消息的链接表,它克服了上两种通信方式中信号量有限的缺点,具有写权限得进程可以按照一定得规则向消息队列中添加新信息;对消息队列有读权限得进程则可以从消息队列中读取信息。

  • 共享内存(shared memory):可以说这是最有用的进程间通信方式。它使得多个进程可以访问同一块内存空间,不同进程可以及时看到对方进程中对共享内存中数据得更新。这种方式需要依靠某种同步操作,如互斥锁和信号量等。

  • 信号量(semaphore):主要作为进程之间及同一种进程的不同线程之间得同步和互斥手段。

  • 套接字(socket):这是一种更为一般得进程间通信机制,它可用于网络中不同机器之间的进程间通信,应用非常广泛。

先从Queue开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Queue, Process
import os, time, random
def write(q):
for n in ["A", "B", "C"]:
q.put(n)
time.sleep(random.random())
def read(q):
while True:
value = q.get(True)
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,)) # 创建写进程
pr = Process(target=read, args=(q,)) # 创建读进程
pw.start()
pr.start()
pw.join()
pr.terminate()

然后是Pipe:

1
2
3
4
5
6
7
8
9
from multiprocessing import Pipe, Process
def func(conn):
conn.send(["A","B","C"])
conn.close
if __name__ = "__main__":
parent_conn,child_conn = Pipe()
p = Process(target=func, args=(child_conn,)) #子进程绑定发送任务
print(parent_conn.recv()) # ["A","B","C"] 父进程接受到子进程发来的信息
p.join()

多线程

Python所提供的线程,不是虚拟出来的而是,真实的POSIX线程。

标准库中提供两个模块:_thread,threading。其中_thread是低级模块,threading是对_thread的封装,是一个高级模块。

还是用一个例子来切入吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time, threading
def loop():
print('thread (%s) is running...' % threading.current_thread().name)
n = 0
while n < 5:
n += 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep()
print("thread %s ended" % threading.current_thread().name)
print("thread %s is running ..." % threading.current_thread().name)
t = threading.Thread(target=loop, name="LoopThread")
t.start()
t.join()
print("All done!")

基本上,thread的API和process的很相似的,构造器参数是一模一样的,线程的启动,等待也和process类似。

上述代码中,出现最多的接口就是threading.current_thread()了,这个方法永远返回当前运行的线程实例,每个进程都会有至少一个线程,这个线程叫主线程,主线程可以开启新的子线程,主线程的名字为MainThread,子线程的名字在我们创建实例的时候制定,若为空,则Python会自动为其命名。

线程和进程最大的区别就是,每一条进程的资源是拷贝过来的,因此他们互不影响,而线程不同,所有的变量是线程共享的。

所以接下来,我们来看一下线程的锁。

Lock in Thread

先来看一下在没有锁机制的情况下,多线程之间是怎样把数据改乱的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time, threading
balance = 0 # 关键数据
def change(n):
global balance
balance += n
balance -= n
# 相当于没做操作,值不变
def run_thread(n):
for i in range(1000000):
change(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance) # 不一定是0了 实际测试中我出现了 -8 13 5 11等值

为什么会有这样的结果呢,原因是,balance += n 这一步看似简单的语句中,执行顺序是temp = balance + n –> balance = temp,这样的步骤,而两条线程又是混搭执行的,所以可能会出现:

1
2
3
4
5
6
7
8
temp1 = balance + n
temp2 = balance + n
balance = temp1 # 5
balance = temp2 # 8
temp1 = balance - n
balance = temp1 # 3
temp2 = balance - n
balance = temp2 - n # -5

为了解决这个问题,我们引入锁机制。

1
2
3
4
5
6
7
8
9
10
11
12
balance = 0
lock = threading.Lock() # 取得锁实例
def run_thread(n):
for i in range(1000000):
# 先要获取锁:
lock.acquire()
try:
# 此时就没问题
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()

这种简单的锁非常容易造成死锁的.

就比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> import threading
>>> lock = threading.Lock()
>>> def run():
... lock.acquire()
... print("要被锁住了.")
... lock.require()
... print("(这一句执行不到)啊!被锁住了!")
>>> t = threading.Thread(target=run)
>>> t.start()
要被锁住了
>>>
>>> exit()
(main_thread已经被锁住了, 无法退出.)

这显然不是我们要的结果.所以在比较资源控制复杂的情况下, 更应该使用Python提供的递归锁(RLock)

原理很简单, 递归锁对象由一个计数器,每加锁一次,该计数器加一,每解锁一次,该计数器减一.

除了锁,上面提到的信号量也是解决资源冲突问题的一个方式:

信号量就可以简单的控制临界区资源的最大线程数.

使用起来和锁几乎是一样的, 其实信号量就是锁的进一步封装.

1
2
3
4
5
6
7
8
9
10
11
import threading
import time
data = "secretData"
semaphore = threading.BoundedSemaphore(5)
def run():
semaphore.acquire()
print(data)
time.sleep(1)
semaphore.release()
for i in range(20):
t = threading.Thread(target=run)

接着, 你会看到每1s最多也就有5条线程在访问临界区资源.

线程的队列也可以达到锁的作用, 这里的队列和上面的进程队列不一样, 上面的队列是为了进行通信, 而线程之间本身就可以通信.( 当然了,他们的API设计基本是一样的 )

注意啦:在终端下直接跑的Python是单线程的,所以当队列的元素超过了maxsize以及在Queue.qsize()为0的时候get都会造成线程堵塞,然后就GG了.

一共有三种Queue, 除了上面的最基本的Queue,还有LifoQueue和PriorityQueue.

见名知意, LIFO就是(Last in First out)后进先出的队列, 而P就是优先队列, 后进先出队列和基本的队列的使用的方法没有什么不同.

而优先队列在put的时候, 要加入权重, 接着在取出的时候就会按照权重的大小从小开始排列.

1
2
3
4
5
6
7
8
9
10
11
import queue
q = queue.priorityQueue()
q.put((10, "Justin"))
q.put((45, "tin"))
q.put((5, "Jus"))
q.get()
(5, "Jus")
q.get()
(10, "Justin")
q.get()
(45, "tin")

解决了全局变量的干扰问题,我们想到,为什么不用局部变量呢?

如果使用局部变量,那么每一条线程间的调用就只能通过函数传参来实现,很蠢.

所以,下一个登场的新角色–ThreadLocal!

ThreadLocal

ThreadLocal使用起来很方便,只要先声明一个全局的ThreadLocal实例,接着把需要传递的参数作为该实例的属性添加上去就好。

调用时,只要读属性就好。

就像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
std = local_school.student # 获取值
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
local_school.student = name # 绑定值
process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()