本文共 30463 字,大约阅读时间需要 101 分钟。
协成又称为微线程
CPU是无法识别协程的,只能识别是线程,协成是由开发人员自己控制的。协成可以在单线程下实现并发的效果(实际计算还是串行的方式)。如果使用线程在多个函数之间进行上下文切换,那么这个上下文的逻辑位置是保存在CPU中的,而协程也有上下文切换的操作,但是协成的上下文逻辑位置不是通过CPU保存的,所以使用协成的好处就是更少的占用了CPU。
线程之间修改共享数据时,需要锁;而协成不需要,因为协成在线程中是串行的方式来修改数据的,所以不需要锁。
协程可以做到高并发、高扩展、低成本资源(一个CPU上万个协成都没问题)。
协程的缺点:
因为是在单线程中,所以无法利用多核CPU的资源;协程如果要使用多核CPU的话,那么就需要先启多个进程,在每个进程下启一个线程,然后在线程下在启协程。在单线程下实现的并发效果,就是协程。
在单线程中的协程,实现并发:import timedef consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield #返回到conn.__next__() print("[%s] is eating baozi %s" % (name, new_baozi)) # time.sleep(1)def producer(): r = con.__next__() #执行生成器 r = con2.__next__() n = 0 while n < 5: n += 1 con.send(n) #返回到conn.__next__()后,通过send发送参数给yield con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" % n)if __name__ == '__main__': con = consumer("c1") #定义成生成器 con2 = consumer("c2") p = producer()执行结果:--->starting eating baozi...--->starting eating baozi...[c1] is eating baozi 1[c2] is eating baozi 1[producer] is making baozi 1[c1] is eating baozi 2[c2] is eating baozi 2[producer] is making baozi 2[c1] is eating baozi 3[c2] is eating baozi 3[producer] is making baozi 3[c1] is eating baozi 4[c2] is eating baozi 4[producer] is making baozi 4[c1] is eating baozi 5[c2] is eating baozi 5[producer] is making baozi 5##可以感觉到瞬间就执行完成了,我们通过自己写的方式实现了协程并发的效果(效果只是假象)##之所以产生了并发的效果,是因为执行代码时没有任何卡顿的地方,如果使用sleep的话,就不一样了。
import timeimport queuedef consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name, new_baozi)) # time.sleep(1)def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n += 1 con.send(n) con2.send(n) time.sleep(1) ##这里加一个sleep print("\033[32;1m[producer]\033[0m is making baozi %s" % n)if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()执行结果:--->starting eating baozi...--->starting eating baozi...[c1] is eating baozi 1[c2] is eating baozi 1[producer] is making baozi 1[c1] is eating baozi 2[c2] is eating baozi 2[producer] is making baozi 2[c1] is eating baozi 3[c2] is eating baozi 3[producer] is making baozi 3[c1] is eating baozi 4[c2] is eating baozi 4[producer] is making baozi 4[c1] is eating baozi 5[c2] is eating baozi 5[producer] is making baozi 5##加了sleep后,执行明显就卡顿了; ##假如此时协程1通过CPU处理后,有一个IO(耗时)的操作(比如传数据),这个耗时需要30秒,那么我们当前代码就得等上30秒后,才会用CPU去处理另一个协程的代码。 但是要知道,处理IO操作时是不需要CPU的,那么此时CPU有30秒时间期间没有给其他协程做计算。## 实际的协程并发就是,当处理协程1时,遇到30秒的IO操作时不需要等待30秒就切换到其他协程去做计算,这样就实现了并发的效果。## 正常情况下每个协程IO的操作时间都不同,那么我们什么时候进行上下文的切换来切换到之前的协程呢,提前和延后都不合适,提前了IO操作还没完成,不能进行下一步计算,延后的话就对该协程造成了延迟,所以就需要来识别IO操作什么时候完成,当完成之后立刻切换到之前的协程进行下一步CPU的计算。
greenlet模块#之前我们是自己用yield的方式自己实现的协程;而greenlet模块是已经封装好的协程。#greenlet需要在cmd中通过 pip install greenlet 来安装from greenlet import greenletdef test1(): print (12) gr2.switch() #切换到函数test2() print(34) gr2.switch() #这里会切换到test2中的gr1.switch()位置,继续向下执行def test2(): print (56) gr1.switch() #切换到test1(),这里切换到test1后不是从头开始执行函数了,而是从记录了gr2.switch() 的位置继续执行 print(78)gr1 = greenlet(test1) #启动一个协程gr2 = greenlet(test2)gr1.switch() #调用test1开始执行,类似yield的next切换执行结果:12563478## 执行效果和yield类似,需要手动定义switch()才会切换
gevent模块#cmd中pip install gevent#gevent用于协程之间的自动切换import geventdef foo(): print ('Running in foo') gevent.sleep(2) #gevent.sleep用来模拟IO操作,这里模拟2秒钟 print ('Explicit context switch to foo again')def bar(): print ('Explicit context to bar') gevent.sleep(1) print ('Implicit Context switch back to bar')gevent.joinall([ gevent.spawn(foo), #spawn是生成的意思,这里生成协程foo gevent.spawn(bar),])执行结果:Running in fooExplicit context to bar上面两行内容相当于并发一起执行的Implicit Context switch back to bar #这里在上2行内容后隔了1秒被执行Explicit context switch to foo again #这里在上1行内容后隔了1秒被执行##通过gevent.spawn(foo)执行了协程,执行到函数def foo()中的geven.sleep后,就会切换到另一个协程,另一个协程执行到geven.sleep后也会切换,所以当前两个协程只要IO操作还没执行完成就会不断的切换来确认IO是否执行完成;## 因为def bar中只sleep(1),执行的较快,所以'Implicit Context switch back to bar'就先于'Explicit context switch to foo again'被打印出来。
import geventdef foo(): print ('Running in foo') gevent.sleep(2) print ('Explicit context switch to foo again')def bar(): print ('Explicit context to bar') gevent.sleep(1) print ('Implicit Context switch back to bar')def func3(): print ('Running func3') gevent.sleep(0) #这里设置0秒只是为了进行协程间的切换 print ('Running func3 again')gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), gevent.spawn(func3),])执行结果:Running in fooExplicit context to barRunning func3Running func3 againImplicit Context switch back to barExplicit context switch to foo again##这里总体执行时间用了2秒左右的时间,实现了并发的效果。##协程之间的切换是轮询的,也就是串行的方式切换。##gevent模块中封装了手动切换的内容,利用手动切换的代码来实现自动切换的。
下载网页from urllib import requestdef f(url): print('GET: %s' % url) resp = request.urlopen(url) #建立一个实例resp,用来请求指定的链接 data = resp.read() #将请求的链接读取出来,并赋值给data; 这个data就是下载下来的网页 f = open('url.html','wb') f.write(data) #将读取的内容写入到文件中 f.close() print('%d bytes received from %s.' % (len(data), url))f("https://www.baidu.com")##通过上面代码来获取www.baidu.com网页执行结果:GET: https://www.baidu.com227 bytes received from https://www.baidu.com.这里的内容不对,是百度设置了反爬虫。下面我们弄个其他网页 用基本的爬虫功能,爬取了网页的内容爬虫网页肯定不是一个网页的去爬,肯定是大范围的,所以可以利用协程来进行大范围爬网页。
串行爬虫:from urllib import requestimport timedef f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url))urls = ['https://www.python.org/', 'https://www.yahoo.com/', 'https://github.com/']time_start = time.time()for url in urls: f(url)print ("同步cost",time.time() - time_start)执行结果:D:\python3.6.4\python.exe E:/python/代码练习/A1.pyGET: https://www.python.org/48844 bytes received from https://www.python.org/.GET: https://www.yahoo.com/529760 bytes received from https://www.yahoo.com/.GET: https://github.com/52239 bytes received from https://github.com/.异步cost 22.685065507888794##通过串行的方式来爬取网页,可以看到时间大概是22秒
并行爬虫:from urllib import requestimport gevent,timedef f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url))async_time = time.time()gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), #这里加了参数,启动三个协程都执行f这个函数 gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'),])print ('异步cost',time.time()- async_time)执行结果:D:\python3.6.4\python.exe E:/python/代码练习/A1.pyGET: https://www.python.org/48844 bytes received from https://www.python.org/.GET: https://www.yahoo.com/530851 bytes received from https://www.yahoo.com/.GET: https://github.com/52239 bytes received from https://github.com/.异步cost 21.431119441986084##看到时间是21秒的样子(根据网速会有波动),而且执行过程中发现用的也是串行的方式来执行的。##依然使用串行的方式来执行是因为,gevent没有识别urllib的IO操作
from urllib import requestimport gevent,timefrom gevent import monkeymonkey.patch_all()#通过monkey.patch_all()可以自动识别urllib中有可能是IO的所有操作,然后在操作之前打一个标记,实现阻塞的效果(类似gevent.sleep)的效果。#所以一旦gevent发现阻塞的效果,就会进行协程之间的切换,然后就可以实现协程并行的效果了。def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url))async_time = time.time()gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), #这里加了参数,启动三个协程都执行f这个函数 gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'),])print ('异步cost',time.time()- async_time)执行结果:GET: https://www.python.org/GET: https://www.yahoo.com/GET: https://github.com/52239 bytes received from https://github.com/.48844 bytes received from https://www.python.org/.519508 bytes received from https://www.yahoo.com/.异步cost 6.967620849609375##可以看到明显执行耗费的时间变短了很多。
server端:import sysimport socketimport timeimport geventfrom gevent import socket, monkeymonkey.patch_all()def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) #最大500个连接 while True: cli, addr = s.accept() #等待请求;默认收到请求是交给协程,不过下面设置交给协程。 gevent.spawn(handle_request, cli) #启动协程并调用函数,将请求连接的实例cli交给handle_request函数def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) ##conn.shutdown将客户端关闭。socket.SHUT_WR发一个信号。 ##上一行代码可以用break代替 except Exception as ex: print(ex) finally: conn.close()if __name__ == '__main__': server(8001)client端:import socketHOST = 'localhost' # The remote hostPORT = 8001 # The same port as used by the servers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s.connect((HOST, PORT))while True: msg = bytes(input(">>:"), encoding="utf8") s.sendall(msg) data = s.recv(1024) # print(data) print('Received', data)s.close()从执行结果来看,我们启了3个client分别向server发送信息,然后在server端可以看到通过协程同时处理了3个client的数据。
在此之前协程之间的切换,会在IO操作完成之前在协程之间不断的切换,但目前依然没有解决要等待IO操作完成之后才切换的功能。
通常,我们写服务器处理模型的程序时,有以下几种模型:
(1)每收到一个请求,创建一个新的进程,来处理该请求;(2)每收到一个请求,创建一个新的线程,来处理该请求;(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求(就是以事件驱动的方式来处理)上面的几种方式,各有千秋,第(1)种方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式之前接触到ThreadingTCPServer,这表示启动多线程
还有一个叫ForkingTCPServer,启动多进程现在主流的网络服务模型用的就是事件驱动
在UI编程中,常常要对鼠标点击进行相应,首先如何获得鼠标点击呢?
方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点:方式二:就是事件驱动模型
目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:事件驱动模型就是根据事件做出相应的反应,比如点下文档的'X'就关闭文档,点击 '-' 就最小化文档。
下面讨论的是基于linux的IO
用户空间与内核空间
现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核(操作系统需要使用部分内存空间来运行,这就是内核空间),独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限(比如访问网卡、音响声卡都是通过内核访问的,而不是用户程序)。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。进程切换
进程切换就是上下文的切换进程阻塞
正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态(比如socket server等不到client的数据就会阻塞)。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
文件描述符相当于一个索引,通过索引打开真正的内容。
缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。
缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。打开一个文件默认不是在用户的内存空间,而是放入了内核的缓存中,然后在从内核的缓存拷贝到用户的内存空间; 传数据也是一样,先是到内核缓存中,然后才会拷贝到用户的内存空间; 使用内核是很耗CPU的,耗CPU是指拷贝到内存的这个指令,如果有大量数据需要从内核缓存拷贝到用户内存空间,那么就会有大量的指令会消耗CPU资源
访问网卡、声卡等只能通过内核实现,而用户空间是无法直接访问内核空间的,所以需要通过内核缓存的空间将内容拷贝到用户的内存空间,然后用户才可以使用。
刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
正是因为这两个阶段,linux系统产生了下面五种网络模式的方案。
注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
recve时接收端会阻塞,直到系统接收到数据,系统接收到数据后此时也是阻塞的,会从内核缓存copy到用户内存,然后返回一个OK才是用户真正接收到了数据。当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了(等数据的阶段和从内核拷贝给用户的阶段)。
linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call(receive的动作),那么它马上就将数据拷贝到了用户内存,然后返回。
所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。
IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
在单线程且又是阻塞模式下,是没法实现多个IO一起执行的,因为当接收数据时,一直没有接收到的话就会一直卡住。
在单线程下非阻塞模式下,假如此时有10个IO,对这10个IO进行for循环来接收数据,先接收其中2个IO的数据,如果这2个IO没有接收到数据就会返回err,此时就不会在阻塞了,然后继续进行for循环,此时其他IO如果有数据就会将数据接收过来,然后就这样不断的receive,发现err就不阻塞,有数据则接收。
使用非阻塞模式就可以处理多个socket,对于用户来说就已经是并发了。但是要注意的是第一阶段不卡了,但是此时第二阶段依然会卡,如果从内核copy到用户内存的数据不大,则很快会copy完成,但是如果数据很大的话,第二阶段就会一直在copy数据,直到数据copy完成,但相应的在第二阶段卡的时间也会很久。当用户进程调用了select,那么整个进程会被block,假如此时有100个socket的IO,那么kernel会监视所有select负责的socket,当任何一个socket中的数据准备好了(kernel的数据准备好),select就会返回。这个时候用户进程在调用read操作,将数据kernelcopy到用户进程。
所以,I/O多路复用的特点是通过一种机制 一个进程能同时等待多个文件描述符,而这些文件描述符(socket连接)其中的任意一个进入读就绪状态,select()函数就可以返回。 多路复用和阻塞模式的区别就是,阻塞模式监视一个socket,有数据则接收;而多路复用就是可以通过select监视N个socket,只要其中任何一个有数据,则进行select返回,然后receive数据(第二阶段数据过大的话,依然会有阻塞)。假如此时有10000个socket连接,监视到有数据后kernel就会告诉返回给用户进程,但kernel不会告诉用户进程具体是哪个socket连接,所以用户就会循环着10000个socket连接,但是即使其中只有2个socket有数据,用户程序也会去循环着10000个socket连接,这就造成了大量的多余循环操作。
select
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
poll
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
epoll
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
epool会告诉用户进程具体哪个socket连接有数据了,所以用户进程不需要在将所有socket 连接全都循环一次才发现具体哪个有数据。
Windows不支持epool,支持select
inux下的asynchronous IO其实用得很少。先看一下它的流程:
用户进程发起read操作之后,立刻就可以开始去做其它的事(不需要等待kernel拷贝数据到用户)。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后kernel主动将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了(没有任何阻塞)。异步因为实现比较复杂,所以使用的较少,使用较多的还是epool多路复用。
阻塞模式:
原理:内核阻塞,直到接收到完整的数据后,在将数据copy给用户内存,copy完成后会返回一个OK给当前server的进程,然后进程接触阻塞继续向下执行(执行代码)。server等待接收数据时,会处于阻塞的状态,当前进程不会再往下执行,除非接收到数据以后才会;假如当前有3个IO,目前第1个IO处于接收数据状态,除非第1个IO内核接收完成,否则第2和第3个内核IO都不会开始数据的接收;内核准备好数据还要讲数据copy给用户内存。
非阻塞模式:
原理:内核不阻塞,用户内存接收数据会阻塞(直到接收完成);多个IO时,内核会同时接收多个IO数据,用户进程会轮询的方式read内核是否准备好数据,如果没有准备好的话内核返回err给用户进程,此时用户进程会问内核其他IO是否准备好数据,没准备好内核返回err给用户进程,准备的话就将数据copy给用户内存。server等待接收数据,假如当前有3个IO,内核的第1个IO数据没有准备好,用户进程向进程询问第1个IO数据是否准备好,内核返回err给用户进程,用户进程去问内核第2个IO数据是否准备好,准备好了就会将数据copy到用户内存,用户进程以此类推的循环去询问内核; 需要不断的read和返回err,开销较大。
IO多路复用:
select:有多个socket IO时,通过select来负责所有socket IO,然后内核会监视所有select负责的socket,用户进程会循环所有IO,当任何一个socket IO中的数据准备好了(相当于内核的数据准备好了)且刚好被用户进程循环到,select就会返回datagram ready给用户进程(此时用户内存处于阻塞状态),用户进程将该IO数据copy到用户内存,copy完成后返回OK给用户进程,告知解除用户内存,让其他IO在内核内存准备好的数据可以copy到用户内存(如果当前从内核copy到用户内存的数据较大的话,只能等待数据copy完成,也就是该阶段依然是阻塞,除非数据copy完成后,才会为其他IO的数据进行copy操作)。使用select不需要内核返回大量的err,但是用户进程依然需要循环所有IO,假如10000个IO,其中只有2个IO数据准备好了,那么用户进程依然需要去循环这10000个IO来发现这两个已经准备好数据的IO。select还是存在大量循环的操作。
poll:
poll和select基本相似,select支持的文件描述符(相当于每个IO的索引地址,用户进程需要根据具体的地址来进行制定的数据操作)为1024,poll则没有文件描述符的限制。
epoll:
不需要用户进程去循环,当内核数据准备好后会立即告知用户进程具体的哪个socket IO数据准备好了,且只会说一遍,不会再次告知,用户进程不需要循环所有socket IO ,不过epoll的代码实现相当复杂。
异步IO:
用户进程发起read后就立刻开始做其他事情,不需要等内核将数据copy到用户;而内核接到read后会返回信息给用户进程,不会让用户进程产生阻塞(这样第二阶段不会阻塞),然后当内核准备好数据后,内核会主动将数据copy给用户内存(其他模式是用户主动从内核copy数据),当数据copy完成后内核会发送一个signal给用户进程,告诉用户进程read操作完成了(没有任何阻塞)。异步因为实现比较复杂,所以使用的较少,使用较多的还是epool多路复用。
各个IO Model的比较如下图所示:
socket server 只有在非阻塞的模式下,才可以实现单线程下的多路复用
server端:import selectimport socketimport queueserver = socket.socket()server.bind(('localhost',9000))server.listen(1000)server.setblocking(False) #设置socket server为非阻塞模式inputs = [server,]#程序运行时就需要监视连接,但server启动时不会有client马上或者正好连接过来,所以这里启动server程序时监视server自己的连接#监视自己的连接后,就发现了有连接活动,然后就会阻塞状态outputs = []readable,writeable,exceptional = select.select(inputs,outputs,inputs) #设置select## 第一个inputs是告诉操作系统监视哪些连接,任何一个有数据就会返回;## 第三个值,也就是第二个inputs用来监视被监视所有连接中,哪些有问题(比如100个连接其中有4个连接断开了),就会返回有问题的##将监视到第一个inputs的连接赋值给readable,将outputs(这个后面会说到)赋值给writeable,将第二个inputs赋值给exceptionalprint (readable,writeable,exceptional) #运行server端后,在去运行client端,就会打印三个值server.accept() #设置非阻塞模式后,这里就不会在阻塞了,如果没有数据就会报错。client端:import socketHOST = 'localhost'PORT = 9000c = socket.socket()c.connect((HOST,PORT))while True: msg = bytes(input(">>:"),encoding='utf-8') c.sendall(msg) data = c.recv(1024) print ('Received',data)c.close()server端执行结果:[] [] []##readable是client的连接;而writeable,exceptional则为两个空列表
修改server端:import selectimport socketimport queueserver = socket.socket()server.bind(('localhost',9000))server.listen(1000)server.setblocking(False)inputs = [server,]outputs = []readable,writeable,exceptional = select.select(inputs,outputs,inputs)print (readable,writeable,exceptional)for r in readable: # print (r) conn,addr = server.accept() print (conn,addr) #可以看到conn实例,和addr的client地址 print ("recv:",conn.recv(1024)) ##目前代码到这里就会报错,因为server希望收到client的数据,但是client没有发送数据; ##server没有收到数据,也不会阻塞,这里就会报错。 ##此时就希望select可以监视client的连接,当监视到client的对应的连接发送数据了以后,server再去接收数据,这样接收数据就不为空了。
修改server端:import selectimport socketimport queueserver = socket.socket()server.bind(('localhost',9000))server.listen(1000)server.setblocking(False)inputs = [server,]outputs = []while True: #死循环用来不断的去select监视 readable,writeable,exceptional = select.select(inputs,outputs,inputs) print (readable,writeable,exceptional) for r in readable: # print (r) if r is server: #这里确定通过server自己已经建立好连接,处于阻塞状态 conn,addr = server.accept() #与client建立连接 print ("新连接:",addr) inputs.append(conn) ##想要实现client发数据来时,server端能知道客户端发送数据过来,就需要select能够再检测这个conn ##将conn加入到inputs中,此时inputs=[server,conn],select发现有活动的连接就会返回,但此时不知道活动的连接是谁; ##所以select会循环inputs,如果发现是conn活动就说明数据发送过来了,如果是server活动说明有其他client建立了新连接。 else: #执行这里就说明r不等于server本身的连接,而是等于conn这个连接 data = conn.recv(1024) #接收conn连接的数据 print ("收到数据:",data)server端执行结果:[] [] []新连接: ('127.0.0.1', 56201)[ ] [] []收到数据: b'123'[ ] [] []新连接: ('127.0.0.1', 56205)[ ] [] []收到数据: b'456'##两次client建立连接,分别发送数据123和456,从server执行结果也可以看到两个client的连接和发送的数据;##但此时去看client端的话,两个client端都是卡主的状态。
server端:import selectimport socketimport queueserver = socket.socket()server.bind(('localhost',9000))server.listen(1000)server.setblocking(False)inputs = [server,]outputs = []while True: readable,writeable,exceptional = select.select(inputs,outputs,inputs) print (readable,writeable,exceptional) for r in readable: # print (r) if r is server: conn,addr = server.accept() print ("新连接:",addr) inputs.append(conn) else: data = conn.recv(1024) print ("收到数据:",data) #conn.send(data) #print ("send done!")client端:import socketHOST = 'localhost'PORT = 9000c = socket.socket()c.connect((HOST,PORT))while True: msg = bytes(input(">>:"),encoding='utf-8') c.sendall(msg) # data = c.recv(1024) # # print ('Received',data)c.close()执行结果:执行步骤:1、启动server端;2、启动client1,并发送数据;3、启动client2并发送数据;4、client1再次发送数据,此时发现server端报错了。server报错内容如下:Traceback (most recent call last): File "E:/python/代码练习/A1.py", line 31, indata = conn.recv(1024)BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。#client1建立连接后,server端的data是用client1的连接接收数据;client2建立连接后,此时server端用的是client2的连接接收数据; 此时server端data的conn依然是client2的连接,所以这时client1在发送数据的话,server端就会报错。
修改server端:server = socket.socket()server.bind(('localhost',9000))server.listen(1000)server.setblocking(False)inputs = [server,]outputs = []while True: readable,writeable,exceptional = select.select(inputs,outputs,inputs) print (readable,writeable,exceptional) for r in readable: # print (r) if r is server: conn,addr = server.accept() print ("新连接:",addr) inputs.append(conn) else: data = r.recv(1024) #这里conn改成r,这时因为for循环时获取的是动态的活动连接,此时用client1发送数据,那么for循环整个inputs,然后发现活动的是client1的连接,所以此时r就等于client1的连接,然后用client1的连接来接收数据就不会出问题了。 print ("收到数据:",data) # r.send(data) # print ("send done!")
server端:server = socket.socket()server.bind(('localhost',9000))server.listen(1000)server.setblocking(False)msg_dic = {} #建立空字典,用于存储每个client连接发过来的数据inputs = [server,]outputs = []while True: readable,writeable,exceptional = select.select(inputs,outputs,inputs) print (readable,writeable,exceptional) for r in readable: # print (r) if r is server: conn,addr = server.accept() print ("新连接:",addr) inputs.append(conn) msg_dic[conn] = queue.Queue() #初始化一个队列,后面存要返回给客户端的数据 else: data = r.recv(1024) print ("收到数据:",data) msg_dic[r].put(data) #将data这个数据放入字典中对应的key,也就是r是key,data是value outputs.append(r) #放入返回的连接队列 ##当前将r这个连接已经放入outputs中了,所以下次select时会检查outputs中这个连接(这里并没有在接收到数据时直接发送数据回去,其实使用send也可以) for w in writeable: #要返回给客户端的连接列表 data_to_client = msg_dic[w].get() #此时w等于r,都是一个连接实例 w.send(data_to_client) #返回给客户端原数据 outputs.remove(w) #删除当前连接的数据,确保下次循环时不返回上次处理完连接的数据client端:import socketHOST = 'localhost'PORT = 9000c = socket.socket()c.connect((HOST,PORT))while True: msg = bytes(input(">>:"),encoding='utf-8') c.sendall(msg) data = c.recv(1024) print ('Received',data)c.close()server执行结果:[] [] []新连接: ('127.0.0.1', 58058)[ ] [] []收到数据: b'hello'[] [ ] [][ ] [] []收到数据: b'aa'[] [ ] [][ ] [] []新连接: ('127.0.0.1', 58073)[ ] [] []收到数据: b'bbb'##可以看到已经可以实现多连接并发了。
server端:import selectimport socketimport queueserver = socket.socket()server.bind(('localhost',9000))server.listen(1000)server.setblocking(False)msg_dic = {} #建立空字典,用于存储每个client连接发过来的数据inputs = [server,]outputs = []while True: readable,writeable,exceptional = select.select(inputs,outputs,inputs) print (readable,writeable,exceptional) for r in readable: # print (r) if r is server: conn,addr = server.accept() print ("新连接:",addr) inputs.append(conn) msg_dic[conn] = queue.Queue() #初始化一个队列,后面存要返回给客户端的数据 else: data = r.recv(1024) print ("收到数据:",data) msg_dic[r].put(data) #将data这个数据放入字典中对应的key,也就是r是key,data是value outputs.append(r) #放入返回的连接队列 ##当前将r这个连接已经放入outputs中了,所以下次select时会检查outputs中这个连接 for w in writeable: #要返回给客户端的连接列表 data_to_client = msg_dic[w].get() #此时w等于r,都是一个连接实例 w.send(data_to_client) #返回给客户端原数据 outputs.remove(w) #删除当前连接的数据,确保下次循环时不返回上次处理完连接的数据 for e in exceptional: #如果连接断开,那么就需要从inputs和outpus中移除 if e in outputs: outputs.remove(e) #移除outputs中的连接 inputs.remove(e) #移除inputs中的连接 del msg_dic[e] #移除队列中的数据client端:import socketHOST = 'localhost'PORT = 9000c = socket.socket()c.connect((HOST,PORT))while True: msg = bytes(input(">>:"),encoding='utf-8') c.sendall(msg) data = c.recv(1024) print ('Received',data)c.close()
之前讲的有些算是底层的东西,如果要了解epoll底层的东西可以访问连接:
Windows不支持epoll封装好的epoll(不是底层代码)这里封装好的代码在linux中会自动使用epoll,但在Windows中因为不支持epoll会自动使用select。server端:import selectorsimport socketsel = selectors.DefaultSelector() #生成一个select对象def accept(sock, mask): conn, addr = sock.accept() # 建立连接 print('accepted', conn, 'from', addr) conn.setblocking(False) #将连接设置为非阻塞模式 sel.register(conn, selectors.EVENT_READ, read) ## 不立刻收数据,将新建立的连接(conn)注册到sel这个对象中; ## 新连接活动,再一次while循环后,如果数据发过来就回调read函数; ## accept函数执行完成后,回到第while循环中的callback(key.fileobj, mask)def read(conn, mask): data = conn.recv(1000) # 接收数据 if data: #如果有数据 print('echoing', repr(data), 'to', conn) conn.send(data) # 返回数据 else: #如果没有数据,表示client断开连接了 print('closing', conn) sel.unregister(conn) #取消注册(从相关的列表中删除链接信息) conn.close() #关闭连接sock = socket.socket()sock.bind(('localhost', 10000))sock.listen(100)sock.setblocking(False)sel.register(sock, selectors.EVENT_READ, accept)## 将写好的socket这个server注册到sel对象中,让其监听socket## 只要来一个新连接就会回调函数accept(不是在这调用,这只是定义)while True: events = sel.select() #根据系统调用epoll或select;默认是阻塞模式,有活动连接就返回活动的连接列表 for key, mask in events: #循环活动的连接列表 callback = key.data #callback相当于定义函数(不是调用) callback(key.fileobj, mask) #调用函数传参数 ## key.fileobj是文件句柄,相当于还没有建立好连接的实例client端:import socketHOST = 'localhost'PORT = 10000c = socket.socket()c.connect((HOST,PORT))while True: msg = bytes(input(">>:"),encoding='utf-8') c.sendall(msg) data = c.recv(1024) print ('Received',data)c.close()server执行结果:acceptedfrom ('127.0.0.1', 53817)echoing b'111' to accepted from ('127.0.0.1', 53822)echoing b'22' to echoing b'1111111' to echoing b'222222222' to ##多个client并发,发送数据
server端:import selectorsimport socketsel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # 建立连接 print('accepted', conn, 'from', addr) conn.setblocking(False) #将连接设置为非阻塞模式 sel.register(conn, selectors.EVENT_READ, read)def read(conn, mask): data = conn.recv(1000) if data: print('echoing', repr(data), 'to', conn) conn.send(data) else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket()sock.bind(('localhost', 10000))sock.listen(100)sock.setblocking(False)sel.register(sock, selectors.EVENT_READ, accept)while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) client端:import socketimport sysmessages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] #定义3条数据server_address = ('localhost', 10000)# Create a TCP/IP socketsocks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ] #定义4个client去连接server; 这里可以定义N个client# Connect the socket to the port where the server is listeningprint('connecting to %s port %s' % server_address)for s in socks: s.connect(server_address) #4个client去连接serverfor message in messages: # Send messages on both sockets for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) #s.getsockname()是客户端的地址(是服务器返回的) s.send(message) #4个链接发送数据,一共3条数据,4个client一共发送了12次 # Read responses on both sockets for s in socks: data = s.recv(1024) #发完数据后就接收数据。 print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: #如果没有数据 print('closing socket', s.getsockname() ) ## 打印客户端要关闭了server端执行结果:acceptedfrom ('127.0.0.1', 54652)accepted from ('127.0.0.1', 54653)accepted from ('127.0.0.1', 54654)accepted from ('127.0.0.1', 54655)echoing b'This is the message. ' to echoing b'This is the message. ' to echoing b'This is the message. ' to echoing b'This is the message. ' to echoing b'It will be sent ' to echoing b'It will be sent ' to echoing b'It will be sent ' to echoing b'It will be sent ' to echoing b'in parts.' to echoing b'in parts.' to echoing b'in parts.' to echoing b'in parts.' to closing closing closing closing
修改client:import socketimport sysmessages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 10000)socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(4)]#定义4个client去连接server; 这里可以定义N个clientprint('connecting to %s port %s' % server_address)for s in socks: s.connect(server_address) for message in messages: for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print('closing socket', s.getsockname() )执行结果与上面代码相同,只是定义4个client方式变了
在linux中运行,linux默认默认支持socket连接为1024,需要修改一下
这里最大不止65535,也可以改成100000等数字,不是按照端口数量来限制的socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(3000)]#client的代码修改一下,修改client链接数量(默认linux支持1024)linux使用epoll,最后执行很快
转载于:https://blog.51cto.com/daimalaobing/2087355