Python多进程与并发编程实践
多进程基础:os.fork 与 multiprocessing
在 Python 中,可通过 os.fork() 直接创建子进程(仅适用于 Unix/Linux/macOS,Windows 不支持)。父进程调用该方法后,系统将复制当前进程,fork() 会在父、子进程中各返回一次:在子进程中返回 0,在父进程中返回子进程的 PID,程序据此在两个进程中执行不同逻辑。
import os
if __name__ == '__main__':
    # Demonstrates os.fork(): both processes continue from the call site and
    # are told apart only by the value fork() returned to each of them.
    print(f"主进程ID: {os.getpid()}")
    try:
        pid = os.fork()
    except OSError:
        # os.fork() never returns a negative pid in Python; a failed fork is
        # reported by raising OSError, so that is what must be handled here.
        print("fork 失败")
    else:
        if pid == 0:
            # Child: fork() returned 0.
            print(f"子进程 {os.getpid()},父进程为 {os.getppid()}")
        else:
            # Parent: fork() returned the child's pid.
            print(f"主进程 {os.getpid()} 创建子进程 {pid}")
            # Reap the child so it does not linger as a zombie and so the
            # parent is still alive when the child reports its parent pid.
            os.waitpid(pid, 0)
由于 Jupyter 环境限制,建议将多进程代码保存为 .py 文件并通过命令行运行,以避免内核冲突。
使用 multiprocessing.Process 可实现更安全的多进程管理:
from multiprocessing import Process
import os
def worker_task(task_id):
    """Report which process is executing the given task.

    Args:
        task_id: Identifier of the task; interpolated into the message.
    """
    current_pid = os.getpid()
    print(f"任务 {task_id} 在进程 {current_pid} 中执行")
if __name__ == '__main__':
    # Build one Process per task id, then launch them all.
    children = [Process(target=worker_task, args=(task_id,)) for task_id in range(5)]
    for child in children:
        child.start()
    # Wait for every child to exit before announcing completion.
    for child in children:
        child.join()
    print("所有子进程完成")
进程池与任务调度:Pool 模块
multiprocessing.Pool 提供了高效的并行任务处理机制,支持固定数量的子进程同时工作。
from multiprocessing import Pool
import os
import time
def task_handler(task_name):
    """Simulate one unit of pool work, logging its start and its end.

    Args:
        task_name: Label of the task; interpolated into both messages.
    """
    worker_pid = os.getpid()
    print(f"任务 {task_name} 在进程 {worker_pid} 开始")
    # Short pause standing in for real work.
    time.sleep(0.1)
    print(f"任务 {task_name} 完成")
if __name__ == '__main__':
    with Pool(processes=3) as pool:
        # Submit all five tasks without blocking; three workers share them.
        results = [pool.apply_async(task_handler, (i,)) for i in range(5)]
        # close()/join() must run inside the ``with`` block: leaving the
        # block calls terminate(), which would kill tasks still in flight.
        pool.close()
        pool.join()
        # get() re-raises any exception raised inside a worker; without it
        # a failed task would go unnoticed.
        for result in results:
            result.get()
    print("全部任务结束")
进程间通信:Queue 与 Pipe
Queue 用于多个生产者-消费者场景,而 Pipe 适合点对点通信。
使用 Queue 传输数据:
from multiprocessing import Process, Queue
def producer(q, items):
    """Put every element of ``items`` on ``q``, logging each one.

    Args:
        q: Queue-like object exposing ``put``.
        items: Iterable of values to enqueue, in order.
    """
    for payload in items:
        q.put(payload)
        print(f"放入队列: {payload}")
def consumer(q, timeout=1):
    """Print items taken from ``q`` until it stays empty for ``timeout`` seconds.

    Args:
        q: Queue-like object whose ``get(timeout=...)`` raises
            ``queue.Empty`` when nothing arrives in time, as
            ``multiprocessing.Queue`` and ``queue.Queue`` do.
        timeout: Seconds to wait for the next item before giving up.
    """
    # Imported locally so the function does not rely on a file-level import.
    from queue import Empty

    while True:
        try:
            item = q.get(timeout=timeout)
        except Empty:
            # Only an idle queue ends the loop; any other error propagates
            # instead of being silently swallowed.
            break
        print(f"取出: {item}")
if __name__ == '__main__':
    # Two producers feed one shared queue that a single consumer drains.
    q = Queue()
    p1 = Process(target=producer, args=(q, ['A', 'B', 'C']))
    p2 = Process(target=producer, args=(q, ['X', 'Y']))
    c = Process(target=consumer, args=(q,))
    for proc in (p1, p2, c):
        proc.start()
    p1.join()
    p2.join()
    # The consumer stops by itself once the queue has been idle for its
    # timeout, so wait for it. Terminating it here could kill it before it
    # has printed the remaining items.
    c.join()
使用 Pipe 进行点对点通信(Pipe() 默认创建双工连接,本例仅演示单向发送):
from multiprocessing import Process, Pipe
import time
def sender(conn, messages, delay=0.2):
    """Send each message over ``conn``, then an end-of-stream marker.

    Args:
        conn: Connection end exposing ``send``.
        messages: Iterable of picklable messages. ``None`` is reserved as
            the end-of-stream marker and must not appear in it.
        delay: Seconds to pause after each message.
    """
    for msg in messages:
        conn.send(msg)
        time.sleep(delay)
    # Tell the receiver explicitly that nothing more is coming. Waiting for
    # EOF is not reliable here: recv() only raises EOFError once every
    # handle on the sending end is closed, and other processes may still
    # hold one open.
    conn.send(None)


def receiver(conn):
    """Print messages read from ``conn`` until the stream ends.

    The stream ends when the ``None`` marker sent by ``sender`` arrives or
    when the other end of the connection is closed.

    Args:
        conn: Connection end exposing ``recv``.
    """
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            # Other end closed with nothing left to read.
            break
        if msg is None:
            break
        print(f"接收: {msg}")


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(parent_conn, ['Msg1', 'Msg2', 'Msg3']))
    p2 = Process(target=receiver, args=(child_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
线程与协程:并发模型对比
Python 的线程通过 threading 模块实现,适用于 I/O 密集型任务;受全局解释器锁(GIL)限制,它不适合 CPU 密集型计算。
import threading
import time
def worker(name, urls):
    """Announce the thread, then log each URL with a short pause after it.

    Args:
        name: Label used as the prefix of every message.
        urls: Iterable of URL strings to walk through in order.
    """
    print(f"线程 {name} 启动")
    for target in urls:
        print(f"{name} -> {target}")
        # Pause standing in for the time a real request would take.
        time.sleep(0.1)
if __name__ == '__main__':
    # One (name, urls) argument tuple per thread.
    assignments = (
        ('Thread-A', ['u1', 'u2']),
        ('Thread-B', ['u3', 'u4']),
    )
    threads = [threading.Thread(target=worker, args=job) for job in assignments]
    for thread in threads:
        thread.start()
    # Wait for both threads to finish.
    for thread in threads:
        thread.join()
协程使用 gevent 库实现非阻塞并发,特别适合网络请求密集场景:
# Patch the standard library before anything else imports the socket
# machinery. Without this, urlopen() blocks the whole thread and the
# greenlets below run one after another instead of concurrently.
from gevent import monkey
monkey.patch_all()

import gevent
import urllib.request


def fetch_url(url):
    """Download ``url`` and print how many bytes were received.

    Args:
        url: Address to fetch with ``urllib.request.urlopen``.
    """
    print(f"访问: {url}")
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(url) as response:
        data = response.read()
    print(f"{len(data)} 字节来自 {url}")


urls = ['https://httpbin.org/delay/1', 'https://httpbin.org/delay/1']
# One greenlet per URL; joinall() returns when all of them have finished.
jobs = [gevent.spawn(fetch_url, url) for url in urls]
gevent.joinall(jobs)
分布式任务管理:远程队列
通过 multiprocessing.managers.BaseManager 实现跨机器的任务分发与结果收集。
# server.py
from multiprocessing.managers import BaseManager
import queue
task_queue = queue.Queue()
result_queue = queue.Queue()


def _get_task_queue():
    """Return the queue of pending tasks."""
    return task_queue


def _get_result_queue():
    """Return the queue of finished results."""
    return result_queue


class TaskManager(BaseManager):
    """Manager that exposes the task and result queues to remote clients."""


# Module-level functions are used instead of lambdas because lambdas cannot
# be pickled, which the "spawn" start method requires.
TaskManager.register('get_task_queue', callable=_get_task_queue)
TaskManager.register('get_result_queue', callable=_get_result_queue)

if __name__ == '__main__':
    # The typeids are registered on TaskManager, so that class (not a bare
    # BaseManager) has to be instantiated.
    manager = TaskManager(address=('127.0.0.1', 8001), authkey=b'qiye')
    manager.start()
    try:
        # start() runs the server in a separate process, so the queues that
        # clients see are the ones reached through these proxies, not the
        # module-level objects in this process.
        tasks = manager.get_task_queue()
        results = manager.get_result_queue()
        # Publish the tasks.
        for i in range(10):
            tasks.put(f"Image_{i}")
        # Collect one result per task.
        for _ in range(10):
            print(results.get())
    finally:
        # Stop the manager process even if the loop above is interrupted.
        manager.shutdown()
客户端连接并处理任务:
# client.py
from multiprocessing.managers import BaseManager
class TaskManager(BaseManager):
    """Client-side manager used to reach the queues served remotely."""


# Only the typeids are registered here; the callables live on the server.
TaskManager.register('get_task_queue')
TaskManager.register('get_result_queue')

if __name__ == '__main__':
    # Imported locally so the script does not rely on a file-level import.
    from queue import Empty

    manager = TaskManager(address=('127.0.0.1', 8001), authkey=b'qiye')
    manager.connect()
    task_queue = manager.get_task_queue()
    result_queue = manager.get_result_queue()
    while True:
        try:
            # A timed get() replaces "check empty(), then get()": another
            # client could take the last task between the two calls and
            # leave this one blocked forever.
            item = task_queue.get(timeout=1)
        except Empty:
            break
        print(f"处理: {item}")
        result_queue.put(f"{item} -> 已完成")
网络编程:TCP 与 UDP 通信
TCP 服务端(多线程):
import socket
import threading
def handle_client(client_socket, address):
    """Greet one TCP client, echo its messages, then close the connection.

    The loop ends when the peer closes its side or sends ``exit``.

    Args:
        client_socket: Connected socket for this client; closed on return.
        address: Peer address, used only in the log message.
    """
    print(f"连接来自 {address}")
    # ``with`` closes the socket even when recv/send raises.
    with client_socket:
        # A bytes literal may only contain ASCII characters, so the greeting
        # is written as str and encoded.
        client_socket.sendall("欢迎连接".encode())
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            # TCP is a byte stream, so a chunk may end in the middle of a
            # multi-byte character; replace undecodable bytes rather than
            # letting the handler die with UnicodeDecodeError.
            text = data.decode(errors="replace")
            if text == "exit":
                break
            print(f"收到: {text}")
            client_socket.sendall(f"回显: {text}".encode())
if __name__ == '__main__':
    # Guarded like the other examples so importing the module does not start
    # a blocking accept loop; ``with`` closes the listening socket on exit.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Allow an immediate restart while old connections are in TIME_WAIT.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('127.0.0.1', 9999))
        server_socket.listen(5)
        print("等待连接...")
        while True:
            client, addr = server_socket.accept()
            # One thread per client; daemon threads do not keep the process
            # alive after the accept loop is interrupted.
            thread = threading.Thread(
                target=handle_client, args=(client, addr), daemon=True
            )
            thread.start()
UDP 服务端:
import socket
if __name__ == '__main__':
    # Guarded like the other examples so importing the module does not start
    # a blocking receive loop; ``with`` closes the socket on exit.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_server:
        udp_server.bind(('127.0.0.1', 9999))
        print("UDP 服务启动...")
        while True:
            data, addr = udp_server.recvfrom(1024)
            # Datagrams come from arbitrary senders; replace undecodable
            # bytes so one malformed packet cannot crash the server.
            text = data.decode(errors="replace")
            print(f"来自 {addr}: {text}")
            udp_server.sendto(f"Echo: {text}".encode(), addr)