IO多路复用技术详解
一、IO多路复用概述
IO多路复用是一种通过单个线程同时管理多个IO通道的技术。其核心思想是:通过一种监控机制,能够同时监视多个文件描述符的状态变化,当某个描述符就绪(可读或可写)时,通知应用程序进行相应的处理。
需要特别说明的是,select、poll和epoll本质上属于同步IO范畴。这是因为在读写事件就绪后,应用程序需要自行负责实际的读写操作,这个过程是阻塞的。与之相对应的是异步IO,后者由内核或库负责将数据从内核空间拷贝到用户空间,应用程序无需介入。
在Linux操作系统中,select、poll和epoll是三种主流的IO多路复用实现机制。
1、select机制
select最早出现在1983年的4.2BSD系统中,通过单一的select()系统调用来监控一个包含多个文件描述符的数组。当select()返回时,该数组中已经就绪的文件描述符会被内核修改其标志位,进程可以据此获取这些描述符并进行后续的读写操作。
select在几乎所有平台上都得到支持,这是其重要的跨平台优势。然而,它也存在明显的局限性:单个进程能够监控的文件描述符数量存在上限,在Linux系统上默认限制为1024。虽然可以通过修改内核宏定义或重新编译内核来提升这一限制,但这种做法并不推荐。
此外,select()所维护的数据结构会随着监控文件描述符数量的增加而产生越来越大的内存复制开销。由于网络延迟等原因,大量TCP连接可能处于空闲状态,但select()仍会对所有socket进行线性扫描,这无疑是一种资源浪费。
2、poll机制
poll机制于1986年在System V Release 3中引入,与select相比没有本质区别,但去除了最大文件描述符数量的限制。
poll同样存在一个缺陷:包含大量文件描述符的结构体数组会在用户态和内核态之间整体复制,无论这些文件描述符是否已经就绪,复制开销都会随着文件描述符数量的增加而线性增长。
select()和poll()在通知进程哪些文件描述符已经就绪后,如果进程没有立即对其进行IO操作,那么在下次调用select()或poll()时,这些文件描述符会被再次报告为就绪状态。这种机制被称为水平触发(Level Triggered),其优势在于不会丢失就绪通知。
3、epoll机制
直到Linux 2.6内核版本,epoll才正式登场。epoll被公认为Linux下性能最优的IO多路复用方案,它几乎集合了之前所有方案的全部优点。
epoll支持两种触发模式:水平触发和边缘触发(Edge Triggered)。边缘触发仅在文件描述符状态发生变化时通知进程一次,如果进程没有及时处理,后续不会再收到通知。从理论上讲,边缘触发的性能更优秀,但实现复杂度也更高。
epoll只向应用程序报告已经就绪的文件描述符。调用epoll_wait()返回的不是具体的描述符列表,而是就绪描述符的数量。应用程序只需从epoll指定的数组中依次获取相应数量的描述符即可。epoll内部使用了内存映射(mmap)技术,彻底消除了文件描述符在系统调用时的复制开销。
epoll的另一个重要改进是采用基于事件的回调机制。在select/poll中,内核只有在进程主动调用查询方法时才会扫描所有监控的文件描述符。而epoll通过epoll_ctl()预先注册要监控的文件描述符,当某个描述符就绪时,内核会使用类似回调的机制迅速激活该描述符,进程调用epoll_wait()时即可获得通知。
Python中的IO多路复用
Python标准库中的select模块提供了select、poll和epoll三种方法的接口。需要注意的是,不同操作系统对这些方法的支持程度不同:
- Windows Python:仅支持select
- Mac Python:仅支持select
- Linux Python:支持select、poll和epoll三种方法
需要特别说明的是,网络操作、文件操作、终端操作等都属于IO操作。对于Windows平台,select仅支持Socket操作,无法检测普通文件内容的变化。
4、三种机制的区别
select、poll和epoll三种IO多路复用机制在性能和使用场景上存在明显差异,选择合适的方式可以显著提升程序的并发处理能力。
二、select实现
Python的select()方法直接调用操作系统的底层IO接口,它能够监控sockets、open files以及pipes等所有具有fileno()方法的对象何时变为可读、可写或发生通信错误。相比于在循环中逐一等待和监控多个客户端连接的方式,select的效率要高得多,因为它直接利用操作系统提供的C语言网络接口进行操作。
select函数监控的文件描述符分为三类:writefds(可写)、readfds(可读)和exceptfds(异常)。调用select后函数会阻塞,直到有描述符就绪或超时。当函数返回后,应用程序可以通过遍历fdset来找到就绪的描述符。
1、select语法
select(rlist, wlist, xlist, timeout=None)
select()方法接收并监控三个通信列表:第一个参数监控输入数据,第二个参数监控输出数据,第三个参数监控异常错误数据,第四个参数设置等待时间。
使用select需要创建两个列表来分别存放输入和输出信息,传递给select方法后由内核进行监控。
inputs = [server,]
outputs = []
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
select方法工作原理:
select方法用于监视文件句柄的状态变化,返回三个列表:
- 当第一个参数序列中的句柄发生可读事件(accept或read)时,该句柄被添加到返回值1序列中
- 当第二个参数序列中有句柄可写时,所有句柄被添加到返回值2序列中
- 当第三个参数序列中的句柄发生错误时,该句柄被添加到返回值3序列中
- 未设置超时时间时,select会一直阻塞;设置超时时间后,超过指定时间返回空列表
2、select服务端实现示例
import select
import socket
import queue
server = socket.socket()
server.bind(("localhost", 9000))
server.listen(1000)
server.setblocking(False)
msg_queue = {}
inputs = [server,]
outputs = []
while True:
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
for r in readable:
if r is server:
conn, addr = r.accept()
print("新连接:", addr)
inputs.append(conn)
msg_queue[conn] = queue.Queue()
else:
data = r.recv(1024)
print("接收数据:", data)
msg_queue[r].put(data)
outputs.append(r)
for w in writeable:
data_to_send = msg_queue[w].get()
w.send(data_to_send)
outputs.remove(w)
for e in exceptional:
if e in outputs:
outputs.remove(e)
inputs.remove(e)
del msg_queue[e]
三、epoll实现
epoll是一种高效的IO多路复用方式,但需要注意它在Windows平台下不被支持。Python的selectors模块默认使用epoll机制(Linux平台),如果是在Windows系统上使用,会自动回退到select。
selectors模块是Python 3.x新增的模块,Python 2中不存在。
1、selectors模块语法
sel = selectors.DefaultSelector()
sel.register(server, selectors.EVENT_READ, accept)
上述代码创建了一个默认的选择器对象,并注册了服务器socket的读事件回调函数accept。当有新连接到来时,select会自动调用该回调函数进行处理。
2、selectors服务端实现
import selectors
import socket
sel = selectors.DefaultSelector()
def accept_handler(sock, mask):
"""处理客户端连接请求"""
conn, addr = sock.accept()
print("接受连接:", conn, "来源:", addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read_handler)
def read_handler(conn, mask):
"""处理客户端数据接收"""
data = conn.recv(1024)
if data:
print("回显数据:", repr(data), "到", conn)
conn.send(data)
else:
print("关闭连接:", conn)
sel.unregister(conn)
conn.close()
server = socket.socket()
server.bind(('localhost', 9999))
server.listen(500)
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, accept_handler)
while True:
events = sel.select()
print("事件:", events)
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
运行效果示例
事件: [(SelectorKey(...), 1)]
接受连接 from ('127.0.0.1', 50281)
事件: [(SelectorKey(...), 1)]
回显 b'adas' to ...
事件: [(SelectorKey(...), 1)]
回显 b'HA' to ...
3、客户端实现
import socket
import sys
messages = [b'Hello ',
b'World ',
b'!']
server_address = ('localhost', 9999)
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]
print('连接服务器 %s port %s' % server_address)
for s in socks:
s.connect(server_address)
for message in messages:
for s in socks:
print('%s: 发送 "%s"' % (s.getsockname(), message))
s.send(message)
for s in socks:
data = s.recv(1024)
print('%s: 接收 "%s"' % (s.getsockname(), data))
if not data:
print('关闭socket', s.getsockname())
四、实用案例
1、监控终端输入
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import select
import sys
while True:
readable, _, _ = select.select([sys.stdin,], [], [], 1)
if sys.stdin in readable:
print('检测到输入:', sys.stdin.readline())
2、简单的Socket服务器
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import select
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk.bind(('127.0.0.1', 8002))
sk.listen(5)
sk.setblocking(0)
inputs = [sk]
while True:
readable_list, _, _ = select.select(inputs, [], inputs, 1)
for r in readable_list:
if sk == r:
print('接受连接')
client, address = r.accept()
client.setblocking(0)
inputs.append(client)
else:
received = r.recv(1024)
if received:
print('接收数据:', received)
else:
inputs.remove(r)
sk.close()
3、完整的服务器端实现
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import select
import socket
import sys
import Queue
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
server_address = ('localhost', 10000)
print('启动于 %s port %s' % server_address)
server.bind(server_address)
server.listen(5)
inputs = [server]
outputs = []
message_queues = {}
while inputs:
print('等待下一个事件...')
readable, writable, exceptional = select.select(inputs, outputs, inputs)
for s in readable:
if s is server:
connection, client_address = s.accept()
print('连接来自', client_address)
connection.setblocking(0)
inputs.append(connection)
message_queues[connection] = Queue.Queue()
else:
data = s.recv(1024)
if data:
print('收到 "%s" 来自 %s' % (data, s.getpeername()))
message_queues[s].put(data)
if s not in outputs:
outputs.append(s)
else:
print('关闭', client_address)
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
del message_queues[s]
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
print(s.getpeername(), '队列为空')
outputs.remove(s)
else:
print('发送 "%s" 到 %s' % (next_msg, s.getpeername()))
s.send(next_msg)
for s in exceptional:
print('异常情况于', s.getpeername())
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del message_queues[s]
上述代码展示了如何使用select构建一个完整的Socket服务器。该服务器能够同时处理多个客户端连接,当某个连接无数据可发时,服务器可以转而处理其他连接的请求。需要注意的是,如果每个请求的处理耗时较长,select版本的服务器仍然无法实现真正的并行处理。