当前位置:首页 > 技术 > 正文内容

IO多路复用技术详解

访客 技术 2026年6月22日 1

一、IO多路复用概述

IO多路复用是一种通过单个线程同时管理多个IO通道的技术。其核心思想是:通过一种监控机制,能够同时监视多个文件描述符的状态变化,当某个描述符就绪(可读或可写)时,通知应用程序进行相应的处理。

需要特别说明的是,select、poll和epoll本质上属于同步IO范畴。这是因为在读写事件就绪后,应用程序需要自行负责实际的读写操作,这个过程是阻塞的。与之相对应的是异步IO,后者由内核或库负责将数据从内核空间拷贝到用户空间,应用程序无需介入。

在Linux操作系统中,select、poll和epoll是三种主流的IO多路复用实现机制。

1、select机制

select最早出现在1983年的4.2BSD系统中,通过单一的select()系统调用来监控一个包含多个文件描述符的数组。当select()返回时,该数组中已经就绪的文件描述符会被内核修改其标志位,进程可以据此获取这些描述符并进行后续的读写操作。

select在几乎所有平台上都得到支持,这是其重要的跨平台优势。然而,它也存在明显的局限性:单个进程能够监控的文件描述符数量存在上限,在Linux系统上默认限制为1024。虽然可以通过修改内核宏定义或重新编译内核来提升这一限制,但这种做法并不推荐。

此外,select()所维护的数据结构会随着监控文件描述符数量的增加而产生越来越大的内存复制开销。由于网络延迟等原因,大量TCP连接可能处于空闲状态,但select()仍会对所有socket进行线性扫描,这无疑是一种资源浪费。

2、poll机制

poll机制于1986年在System V Release 3中引入,与select相比没有本质区别,但去除了最大文件描述符数量的限制。

poll同样存在一个缺陷:包含大量文件描述符的结构体数组会在用户态和内核态之间整体复制,无论这些文件描述符是否已经就绪,复制开销都会随着文件描述符数量的增加而线性增长。

select()和poll()在通知进程哪些文件描述符已经就绪后,如果进程没有立即对其进行IO操作,那么在下次调用select()或poll()时,这些文件描述符会被再次报告为就绪状态。这种机制被称为水平触发(Level Triggered),其优势在于不会丢失就绪通知。

3、epoll机制

直到Linux 2.6内核版本,epoll才正式登场。epoll被公认为Linux下性能最优的IO多路复用方案,它几乎集合了之前所有方案的全部优点。

epoll支持两种触发模式:水平触发和边缘触发(Edge Triggered)。边缘触发仅在文件描述符状态发生变化时通知进程一次,如果进程没有及时处理,后续不会再收到通知。从理论上讲,边缘触发的性能更优秀,但实现复杂度也更高。

epoll只向应用程序报告已经就绪的文件描述符。调用epoll_wait()返回的不是具体的描述符列表,而是就绪描述符的数量。应用程序只需从epoll指定的数组中依次获取相应数量的描述符即可。epoll内部使用了内存映射(mmap)技术,彻底消除了文件描述符在系统调用时的复制开销。

epoll的另一个重要改进是采用基于事件的回调机制。在select/poll中,内核只有在进程主动调用查询方法时才会扫描所有监控的文件描述符。而epoll通过epoll_ctl()预先注册要监控的文件描述符,当某个描述符就绪时,内核会使用类似回调的机制迅速激活该描述符,进程调用epoll_wait()时即可获得通知。

Python中的IO多路复用

Python标准库中的select模块提供了select、poll和epoll三种方法的接口。需要注意的是,不同操作系统对这些方法的支持程度不同:

  • Windows Python:仅支持select
  • Mac Python:仅支持select
  • Linux Python:支持select、poll和epoll三种方法

需要特别说明的是,网络操作、文件操作、终端操作等都属于IO操作。对于Windows平台,select仅支持Socket操作,无法检测普通文件内容的变化。

4、三种机制的区别

select、poll和epoll三种IO多路复用机制在性能和使用场景上存在明显差异,选择合适的方式可以显著提升程序的并发处理能力。

二、select实现

Python的select()方法直接调用操作系统的底层IO接口,它能够监控sockets、open files以及pipes等所有具有fileno()方法的对象何时变为可读、可写或发生通信错误。相比于在循环中逐一等待和监控多个客户端连接的方式,select的效率要高得多,因为它直接利用操作系统提供的C语言网络接口进行操作。

select函数监控的文件描述符分为三类:writefds(可写)、readfds(可读)和exceptfds(异常)。调用select后函数会阻塞,直到有描述符就绪或超时。当函数返回后,应用程序可以通过遍历fdset来找到就绪的描述符。

1、select语法

select(rlist, wlist, xlist, timeout=None)

select()方法接收并监控三个通信列表:第一个参数监控输入数据,第二个参数监控输出数据,第三个参数监控异常错误数据,第四个参数设置等待时间。

使用select需要创建两个列表来分别存放输入和输出信息,传递给select方法后由内核进行监控。

inputs = [server,]
outputs = []
readable, writeable, exceptional = select.select(inputs, outputs, inputs)

select方法工作原理:

select方法用于监视文件句柄的状态变化,返回三个列表:

  • 当第一个参数序列中的句柄发生可读事件(accept或read)时,该句柄被添加到返回值1序列中
  • 当第二个参数序列中有句柄可写时,所有句柄被添加到返回值2序列中
  • 当第三个参数序列中的句柄发生错误时,该句柄被添加到返回值3序列中
  • 未设置超时时间时,select会一直阻塞;设置超时时间后,超过指定时间返回空列表

2、select服务端实现示例

import select
import socket
import queue

server = socket.socket()
server.bind(("localhost", 9000))
server.listen(1000)
server.setblocking(False)

msg_queue = {}
inputs = [server,]
outputs = []

while True:
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    
    for r in readable:
        if r is server:
            conn, addr = r.accept()
            print("新连接:", addr)
            inputs.append(conn)
            msg_queue[conn] = queue.Queue()
        else:
            data = r.recv(1024)
            print("接收数据:", data)
            msg_queue[r].put(data)
            outputs.append(r)
    
    for w in writeable:
        data_to_send = msg_queue[w].get()
        w.send(data_to_send)
        outputs.remove(w)
    
    for e in exceptional:
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del msg_queue[e]

三、epoll实现

epoll是一种高效的IO多路复用方式,但需要注意它在Windows平台下不被支持。Python的selectors模块默认使用epoll机制(Linux平台),如果是在Windows系统上使用,会自动回退到select。

selectors模块是Python 3.x新增的模块,Python 2中不存在。

1、selectors模块语法

sel = selectors.DefaultSelector()
sel.register(server, selectors.EVENT_READ, accept)

上述代码创建了一个默认的选择器对象,并注册了服务器socket的读事件回调函数accept。当有新连接到来时,select会自动调用该回调函数进行处理。

2、selectors服务端实现

import selectors
import socket

sel = selectors.DefaultSelector()

def accept_handler(sock, mask):
    """处理客户端连接请求"""
    conn, addr = sock.accept()
    print("接受连接:", conn, "来源:", addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read_handler)

def read_handler(conn, mask):
    """处理客户端数据接收"""
    data = conn.recv(1024)
    if data:
        print("回显数据:", repr(data), "到", conn)
        conn.send(data)
    else:
        print("关闭连接:", conn)
        sel.unregister(conn)
        conn.close()

server = socket.socket()
server.bind(('localhost', 9999))
server.listen(500)
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, accept_handler)

while True:
    events = sel.select()
    print("事件:", events)
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

运行效果示例

事件: [(SelectorKey(...), 1)]
接受连接 from ('127.0.0.1', 50281)
事件: [(SelectorKey(...), 1)]
回显 b'adas' to ...
事件: [(SelectorKey(...), 1)]
回显 b'HA' to ...

3、客户端实现

import socket
import sys

messages = [b'Hello ',
            b'World ',
            b'!']
server_address = ('localhost', 9999)

socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]

print('连接服务器 %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:
    for s in socks:
        print('%s: 发送 "%s"' % (s.getsockname(), message))
        s.send(message)
    
    for s in socks:
        data = s.recv(1024)
        print('%s: 接收 "%s"' % (s.getsockname(), data))
        if not data:
            print('关闭socket', s.getsockname())

四、实用案例

1、监控终端输入

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import select
import sys

while True:
    readable, _, _ = select.select([sys.stdin,], [], [], 1)
    if sys.stdin in readable:
        print('检测到输入:', sys.stdin.readline())

2、简单的Socket服务器

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import select

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk.bind(('127.0.0.1', 8002))
sk.listen(5)
sk.setblocking(0)

inputs = [sk]

while True:
    readable_list, _, _ = select.select(inputs, [], inputs, 1)
    
    for r in readable_list:
        if sk == r:
            print('接受连接')
            client, address = r.accept()
            client.setblocking(0)
            inputs.append(client)
        else:
            received = r.recv(1024)
            if received:
                print('接收数据:', received)
            else:
                inputs.remove(r)

sk.close()

3、完整的服务器端实现

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import select
import socket
import sys
import Queue

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

server_address = ('localhost', 10000)
print('启动于 %s port %s' % server_address)
server.bind(server_address)

server.listen(5)

inputs = [server]
outputs = []
message_queues = {}

while inputs:
    print('等待下一个事件...')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    
    for s in readable:
        if s is server:
            connection, client_address = s.accept()
            print('连接来自', client_address)
            connection.setblocking(0)
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print('收到 "%s" 来自 %s' % (data, s.getpeername()))
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print('关闭', client_address)
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
    
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            print(s.getpeername(), '队列为空')
            outputs.remove(s)
        else:
            print('发送 "%s" 到 %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    
    for s in exceptional:
        print('异常情况于', s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]

上述代码展示了如何使用select构建一个完整的Socket服务器。该服务器能够同时处理多个客户端连接,当某个连接无数据可发时,服务器可以转而处理其他连接的请求。需要注意的是,如果每个请求的处理耗时较长,select版本的服务器仍然无法实现真正的并行处理。

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。