Kotlin协程通道(Channel)核心机制解析
一、Channel基础应用
1.1 Channel核心概念
Channel本质是协程间的数据传输队列,提供非阻塞的线程安全通信机制。与BlockingQueue不同,Channel使用挂起函数send()和receive()实现异步数据传递,必须在协程内使用。
1.2 基础操作示例
val dataPipe = Channel<Int>()
launch {
(1..5).forEach {
dataPipe.send(it * it)
}
}
repeat(5) {
println(dataPipe.receive())
}
println("传输完成")
1.3 通道迭代机制
直接遍历Channel会持续等待新数据:
val numChannel = Channel<Int>()
launch {
(1..5).forEach { numChannel.send(it) }
}
// 持续等待新数据导致阻塞
for (item in numChannel) {
println(item)
}
1.4 通道关闭策略
val dataFlow = Channel<Int>()
launch {
(1..5).forEach { dataFlow.send(it) }
dataFlow.close() // 显式关闭
}
dataFlow.consumeEach {
println(it)
}
println("处理完毕")
关闭后isClosedForSend立即为true,isClosedForReceive需等待缓冲清空。
1.5 热流特性
Channel属于热数据流,数据生产与消费状态无关,区别于冷流(Flow)的按需触发特性。
二、通道类型体系
2.1 发送与接收接口
Channel继承SendChannel和ReceiveChannel:
- SendChannel:提供
send()(挂起发送)、trySend()(非阻塞发送)、close() - ReceiveChannel:提供
receive()(挂起接收)、tryReceive()(非阻塞接收)、cancel()
2.2 通道类型分类
// 创建四种通道类型
val rendezvousChan = Channel<String>() // 默认0缓冲
val bufferedChan = Channel<String>(10) // 指定缓冲大小
val conflatedChan = Channel<String>(CONFLATED) // 仅保留最新元素
val unlimitedChan = Channel<String>(UNLIMITED) // 无限缓冲
三、协程通信实践
3.1 多协程共享通道
runBlocking {
val sharedChan = Channel<Int>()
listOf(1,2,3).forEach { id ->
launch {
sharedChan.consumeEach {
println("协程$id: $it")
}
}
}
(1..10).forEach { sharedChan.send(it) }
sharedChan.close()
}
3.2 生产者-消费者模式
生产者构建:
val producer = produce {
var count = 0
while (active) {
send(count++)
delay(100)
}
}
消费者构建:
val consumer = actor<Int> {
for (msg in channel) {
processData(msg)
}
}
3.3 扇入与扇出模式
- 扇出(Fan-out):多个消费者从同一通道接收
- 扇入(Fan-in):多个生产者向同一通道发送
3.4 广播通道(BroadcastChannel)
3.4.1 基础用法
val broadcast = BroadcastChannel<Int>(5)
val receiver = broadcast.openSubscription()
launch {
receiver.consumeEach {
println("接收: $it")
}
}
3.4.2 通道转换
val originChan = Channel<Int>()
val broadcastChan = originChan.broadcast(3)
3.4.3 替代方案
自Kotlin 1.5起,BroadcastChannel被标记为过时,推荐使用SharedFlow和StateFlow实现广播场景。