当前位置:首页 > 技术 > 正文内容

Python多进程与并发编程实践

访客 技术 2026年6月9日 1

多进程基础:os.fork 与 multiprocessing 在 Python 中,可通过 os.fork() 实现轻量级进程创建。父进程调用该方法后,系统将复制当前进程,子进程从返回值开始执行不同逻辑。

import os

if __name__ == '__main__':
    print(f"主进程ID: {os.getpid()}")
    pid = os.fork()
    if pid < 0:
        print("fork 失败")
    elif pid == 0:
        print(f"子进程 {os.getpid()},父进程为 {os.getppid()}")
    else:
        print(f"主进程 {os.getpid()} 创建子进程 {pid}")

由于 Jupyter 环境限制,建议将多进程代码保存为 .py 文件并通过命令行运行,以避免内核冲突。

使用 multiprocessing.Process 可实现更安全的多进程管理:

from multiprocessing import Process
import os

def worker_task(task_id):
    print(f"任务 {task_id} 在进程 {os.getpid()} 中执行")

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = Process(target=worker_task, args=(i,))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    print("所有子进程完成")

进程池与任务调度:Pool 模块 multiprocessing.Pool 提供了高效的并行任务处理机制,支持固定数量的子进程同时工作。

from multiprocessing import Pool
import os
import time

def task_handler(task_name):
    print(f"任务 {task_name} 在进程 {os.getpid()} 开始")
    time.sleep(0.1)
    print(f"任务 {task_name} 完成")

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        results = [pool.apply_async(task_handler, (i,)) for i in range(5)]
        pool.close()
        pool.join()
        print("全部任务结束")

进程间通信:Queue 与 Pipe Queue 用于多个生产者-消费者场景,而 Pipe 适合点对点通信。

使用 Queue 传输数据:

from multiprocessing import Process, Queue

def producer(q, items):
    for item in items:
        q.put(item)
        print(f"放入队列: {item}")

def consumer(q):
    while True:
        try:
            item = q.get(timeout=1)
            print(f"取出: {item}")
        except:
            break

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q, ['A', 'B', 'C']))
    p2 = Process(target=producer, args=(q, ['X', 'Y']))
    c = Process(target=consumer, args=(q,))
    
    p1.start(); p2.start(); c.start()
    p1.join(); p2.join()
    c.terminate()

使用 Pipe 进行双工通信:

from multiprocessing import Process, Pipe
import time

def sender(conn, messages):
    for msg in messages:
        conn.send(msg)
        time.sleep(0.2)

def receiver(conn):
    while True:
        try:
            msg = conn.recv()
            print(f"接收: {msg}")
        except:
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(parent_conn, ['Msg1', 'Msg2', 'Msg3']))
    p2 = Process(target=receiver, args=(child_conn,))
    
    p1.start(); p2.start()
    p1.join(); p2.join()

线程与协程:并发模型对比 Python 的线程通过 threading 模块实现,适用于高 I/O 密集型任务。

import threading
import time

def worker(name, urls):
    print(f"线程 {name} 启动")
    for url in urls:
        print(f"{name} -> {url}")
        time.sleep(0.1)

if __name__ == '__main__':
    t1 = threading.Thread(target=worker, args=('Thread-A', ['u1', 'u2']))
    t2 = threading.Thread(target=worker, args=('Thread-B', ['u3', 'u4']))
    t1.start(); t2.start()
    t1.join(); t2.join()

协程使用 gevent 库实现非阻塞并发,特别适合网络请求密集场景:

import gevent
import urllib.request

def fetch_url(url):
    print(f"访问: {url}")
    response = urllib.request.urlopen(url)
    data = response.read()
    print(f"{len(data)} 字节来自 {url}")

urls = ['https://httpbin.org/delay/1', 'https://httpbin.org/delay/1']
jobs = [gevent.spawn(fetch_url, url) for url in urls]
gevent.joinall(jobs)

分布式任务管理:远程队列 通过 multiprocessing.managers.BaseManager 实现跨机器的任务分发与结果收集。

# server.py
from multiprocessing.managers import BaseManager
import queue

task_queue = queue.Queue()
result_queue = queue.Queue()

class TaskManager(BaseManager): pass

TaskManager.register('get_task_queue', callable=lambda: task_queue)
TaskManager.register('get_result_queue', callable=lambda: result_queue)

if __name__ == '__main__':
    manager = BaseManager(address=('127.0.0.1', 8001), authkey=b'qiye')
    manager.start()
    
    # 添加任务
    for i in range(10):
        task_queue.put(f"Image_{i}")
    
    # 获取结果
    for _ in range(10):
        print(result_queue.get())
    
    manager.shutdown()

客户端连接并处理任务:

# client.py
from multiprocessing.managers import BaseManager

class TaskManager(BaseManager): pass

TaskManager.register('get_task_queue')
TaskManager.register('get_result_queue')

if __name__ == '__main__':
    manager = TaskManager(address=('127.0.0.1', 8001), authkey=b'qiye')
    manager.connect()
    
    task_queue = manager.get_task_queue()
    result_queue = manager.get_result_queue()
    
    while not task_queue.empty():
        item = task_queue.get()
        print(f"处理: {item}")
        result_queue.put(f"{item} -> 已完成")

网络编程:TCP 与 UDP 通信 TCP 服务端(多线程):

import socket
import threading

def handle_client(client_socket, address):
    print(f"连接来自 {address}")
    client_socket.send(b"欢迎连接")
    while True:
        data = client_socket.recv(1024)
        if not data or data.decode() == "exit":
            break
        print(f"收到: {data.decode()}")
        client_socket.send(f"回显: {data.decode()}".encode())
    client_socket.close()

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('127.0.0.1', 9999))
server_socket.listen(5)

print("等待连接...")
while True:
    client, addr = server_socket.accept()
    thread = threading.Thread(target=handle_client, args=(client, addr))
    thread.start()

UDP 服务端:

import socket

udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_server.bind(('127.0.0.1', 9999))

print("UDP 服务启动...")
while True:
    data, addr = udp_server.recvfrom(1024)
    print(f"来自 {addr}: {data.decode()}")
    udp_server.sendto(f"Echo: {data.decode()}".encode(), addr)

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。