Unix进程管理与IPC机制实践
1. 僵尸进程与孤儿进程
1.1 背景
Unix系统中进程呈树状结构,所有子进程均由父进程派生。子进程的生命周期与父进程异步,当子进程终止时,父进程可能尚未感知。此时需通过wait()或waitpid()主动回收子进程状态信息。
1.2 僵尸进程
子进程终止后,若父进程未及时回收其退出状态,该进程实体仍驻留于进程表,形成僵尸状态(Zombie)。此类进程不占用CPU,但消耗进程号等内核资源,大量积聚将导致系统无法创建新进程。
典型场景:IDE关闭后,关联调试进程残留于系统。
解决方案
- 终止父进程,使僵尸进程被init接管回收
- 调用
join()同步等待:内部封装wait(),确保子进程资源释放 - 信号机制:注册
SIGCHLD处理器异步回收
from multiprocessing import Process
import os, time
def worker():
print(f"worker pid: {os.getpid()}")
time.sleep(2)
if __name__ == '__main__':
p = Process(target=worker)
p.start()
p.join() # 阻塞等待,内部调用 wait() 回收
print(f"after join, pid still visible: {p.pid}") # 仅数字残留,内核已释放
print("main done")
1.3 孤儿进程
父进程先于子进程退出,子进程失去父进程关联,由init进程(PID 1)收养。init会自动调用wait(),故孤儿进程无危害。
1.4 危害对比
僵尸进程危害显著:持续占用进程表项,且父进程若忽略处理将无限累积;孤儿进程因init接管而安全。
2. 守护进程机制
2.1 定义
守护进程(Daemon)为后台常驻服务进程,脱离终端控制,生命周期随系统启停。典型实例:Web服务器、日志服务、定时任务调度器。
2.2 基础行为对比
默认情况下,主进程退出不影响子进程继续执行:
from multiprocessing import Process
import time
def background_task(tid):
print(f"task {tid} launched")
time.sleep(2)
print(f"task {tid} finished")
def spawn():
p = Process(target=background_task, args=(1,))
p.start()
if __name__ == '__main__':
print("main start")
spawn()
print("main end")
# 输出顺序:main start → main end → task 1 launched → task 1 finished
2.3 守护进程设置
方式一:构造参数
from multiprocessing import Process
import time
def service(tid):
print(f"daemon {tid} running")
time.sleep(5)
print(f"daemon {tid} done") # 可能无法执行
def launch():
p = Process(target=service, args=(1,), daemon=True)
p.start()
if __name__ == '__main__':
launch()
time.sleep(1)
print("main exits") # 守护进程随主进程终止
方式二:属性赋值
p = Process(target=service, args=(1,))
p.daemon = True # 必须在 start() 之前设置
p.start()
错误示例
p.start()
p.daemon = True # RuntimeError: 进程已启动无法更改
3. 进程间通信理论基础
3.1 IPC核心概念
进程拥有独立地址空间,需借助内核提供的机制交换数据。multiprocessing模块封装了队列与管道两种主要方式。
3.2 管道(Pipe)
半双工字节流通道,os.pipe()返回读写端文件描述符。单向传输特性决定其适用场景有限,且需自行处理同步。
3.3 队列(Queue)
基于管道与锁实现的线程/进程安全FIFO容器,内置同步机制,为多生产者-多消费者场景的首选。
4. 队列操作详解
4.1 基础用法
import queue
# 无界队列
q1 = queue.Queue()
# 有界队列,容量3
q2 = queue.Queue(maxsize=3)
4.2 阻塞与超时控制
import queue
q = queue.Queue(2)
q.put("alpha")
q.put("beta")
# 阻塞写入,2秒超时
try:
q.put("gamma", block=True, timeout=2)
except queue.Full:
print("queue saturated")
# 非阻塞写入,立即异常
# q.put("gamma", block=False)
# 读取同理
val = q.get(block=True, timeout=1)
4.3 状态检测
q.empty() # 判空(多线程下不可靠)
q.full() # 判满
q.get_nowait() # 等价于 get(block=False)
q.put_nowait(x) # 等价于 put(x, block=False)
5. 队列实现进程通信
5.1 主从进程交互
常见错误:在join()前读取队列,导致子进程阻塞。
from multiprocessing import Process, Queue
def handler(q):
msg = q.get()
print(f"received: {msg}")
q.put(f"echo: {msg}")
def flawed():
q = Queue()
q.put("hello")
p = Process(target=handler, args=(q,))
p.start()
# 错误:此时子进程可能尚未写入
print(q.get()) # 可能取到"hello"而非响应
p.join()
# 修正:join后读取
def correct():
q = Queue()
q.put("hello")
p = Process(target=handler, args=(q,))
p.start()
p.join()
print(q.get()) # 确保子进程已完成写入
5.2 子进程间直接通信
from multiprocessing import Queue, Process
def emitter(q, name):
print(f"[{name}] producing")
q.put(f"data from {name}")
def collector(q, name):
print(f"[{name}] waiting")
data = q.get()
print(f"[{name}] got: {data}")
def orchestrate():
q = Queue()
p1 = Process(target=emitter, args=(q, "producer"))
p2 = Process(target=collector, args=(q, "consumer"))
p1.start()
p1.join() # 确保数据已入队
p2.start()
p2.join()
if __name__ == '__main__':
orchestrate()
6. 生产者-消费者模型
6.1 基础模型缺陷
from multiprocessing import Queue, Process
def producer(q, items):
for i in range(items):
q.put(f"item-{i}")
print("producer done")
def consumer(q):
while True:
item = q.get() # 队空时永久阻塞
print(f"consumed: {item}")
def run_flawed():
q = Queue()
p = Process(target=producer, args=(q, 3))
c = Process(target=consumer, args=(q,))
p.start(); c.start()
p.join(); c.join() # 消费者永不退出,死锁
6.2 终止信号方案
import time
from multiprocessing import Queue, Process
def producer_v2(q, items, sentinel):
for i in range(items):
q.put(f"product-{i}")
q.put(sentinel) # 发送终止标记
def consumer_v2(q, sentinel):
while True:
item = q.get()
if item == sentinel:
break
print(f"processing: {item}")
time.sleep(0.1)
def run_with_sentinel():
q = Queue()
sentinel = None # 或使用 object() 保证唯一性
p = Process(target=producer_v2, args=(q, 5, sentinel))
c = Process(target=consumer_v2, args=(q, sentinel))
p.start(); c.start()
p.join(); c.join()
7. JoinableQueue高级应用
7.1 核心机制
JoinableQueue扩展自Queue,通过计数器跟踪未完成任务:
task_done():消费者标记单个项目处理完毕join():生产者阻塞等待所有项目被标记
7.2 单进程演示
from multiprocessing import JoinableQueue
jq = JoinableQueue()
jq.put("a")
jq.put("b")
print(jq.get()) # a
jq.task_done()
print(jq.get()) # b
jq.task_done()
jq.join() # 立即返回,所有项目已处理
print("all cleared")
7.3 完整生产消费实现
import time
from multiprocessing import JoinableQueue, Process
def chef(name, dish, q):
for n in range(2):
meal = f"{name}'s {dish} #{n}"
q.put(meal)
print(f"produced: {meal}")
q.join() # 等待自己生产的全部被消费
print(f"{name} finished")
def diner(name, q):
while True:
meal = q.get()
print(f"[{name}] eating {meal}")
time.sleep(0.5)
q.task_done() # 通知队列该条已处理
def run_jq_model():
q = JoinableQueue()
chefs = [
Process(target=chef, args=("Alice", "Pizza", q)),
Process(target=chef, args=("Bob", "Pasta", q))
]
diners = [
Process(target=diner, args=("Charlie", q), daemon=True),
Process(target=diner, args=("David", q), daemon=True)
]
for c in chefs: c.start()
for d in diners: d.start()
for c in chefs: c.join() # 主进程等待生产者完成
# 生产者join返回时,队列已空,守护消费者随主进程终止
if __name__ == '__main__':
run_jq_model()
7.4 常见误区
| 错误配置 | 后果 |
|---|---|
| 消费者既daemon又join | 逻辑矛盾,join要求主进程等待,daemon要求随主进程终止 |
| 生产者不join,消费者不daemon | 主进程快速退出,消费者阻塞于空队列 |
| task_done次数超过get | 引发ValueError异常 |
8. 管道通信实践
双工管道需成对使用,注意关闭未使用端避免阻塞:
from multiprocessing import Pipe, Process
def sender(conn, items):
rx, tx = conn
rx.close() # 关闭接收端
for i in items:
tx.send(f"packet-{i}")
tx.close()
def receiver(conn, name):
rx, tx = conn
tx.close() # 关闭发送端
try:
while True:
data = rx.recv()
print(f"[{name}] received: {data}")
except EOFError:
rx.close()
def run_pipe():
parent_conn, child_conn = Pipe()
p = Process(target=sender, args=((parent_conn, child_conn), range(3)))
c = Process(target=receiver, args=((parent_conn, child_conn), "worker"))
p.start(); c.start()
p.join(); c.join()
if __name__ == '__main__':
run_pipe()