본문 바로가기

코틀린

코루틴 채널 개념 및 예제

반응형

개념

채널은 코루틴을 연결해 주는 파이프 라인이다.

송신측 채널에서 send를 해서 데이터를 전해줄 수 있고

수신측 채널에서는 receive를 통해 데이터를 받을 수 있다.

 

ex)

fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            for (i in 0 until 10) {
                channel.send(i)
            }
        }

        repeat(10) {
            println(channel.receive())
        }
        println("end")
    }
}

 

출력)

0
1
2
3
4
5
6
7
8
9
end

 

기본 사용법

send receive가 suspension point이고 서로에게 의존적이기 때문에 같은 코루틴 내에서 사용할 경우 무한 대기가 이루어 짐으로 코루틴 내에서 launch 등을 통해 따로 실행해야 한다. 

ex)

fun main() {
    val channel = Channel<Int>()
    launch {
        for (ix in 0 until 10) {
            channel.send(i)
        }

        repeat(10) {
            println(channel.receive())
        }
        println("end")
    }
}

무한대기가 이루어 짐으로 에러가 발생한다.

 

fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            for (ix in 0 until 10) {
                channel.send(i)
            }

            println("end")
        }
        
        launch {
            repeat(10) {
                println(channel.receive())
            }
        }
    }
}

채널을 닫으면 for문은 자동으로 종료된다.

ex)

fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            for (i in 0 until 10) {
                channel.send(i)
            }
            channel.close()
        }

        for (i in channel) {
            println(i)
        }
        println("end")
    }
}

 

출력)

0
1
2
3
4
5
6
7
8
9
end

발행이 완료되기전에 close를 호출하면 오류가 난다.

ex)

fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            for (i in 0 until 10) {
                channel.send(i)
            }
        }

        for (i in channel) {
            if(i==5) channel.close()

            println(i)
        }
        println("end")
    }
}

 

출력)

0
1
2
3
4
5
end
Exception in thread "main" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed

 

파이프라인

개념

파이프 라인은 하나의 스트림을 프로듀서가 생성하고 다른 코루틴에서 해당 스트림을 읽어 새로운 스트림을 만드는 패턴이다.

ex)

fun CoroutineScope.produceNumbers() = produce {
    var i = 1
    while (true) {
        send(i++)
    }
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (i in numbers) send(i * i)
}


fun main() = runBlocking<Unit> {
    val numbers = produceNumbers()
    val stringNumbers = square(numbers) // 다른 코루틴에서 해당 스트림을 읽어 새로운 스트림을 만듬

    repeat(5) {
        println(stringNumbers.receive())
    }
    println("end")
    coroutineContext.cancelChildren()
}

 

출력)
1
4
9
16
25
end

팬 아웃

개념

여러 코루틴에서 동일한 채널을 통해 데이터를 받는다.

ex)

fun CoroutineScope.produceNumbers() = produce<Int> {
    var i = 1
    while (true) {
        send(i++)
        delay(100L)
    }
}

fun CoroutineScope.processNumber(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("프로세서 ${id}가 ${msg}을 받았습니다.")
    }
}


fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat (5) {
        processNumber(it, producer)
    }
    delay(1000L)
    producer.cancel()
}

 

출력)
프로세서 0가 1을 받았습니다.
프로세서 0가 2을 받았습니다.
프로세서 1가 3을 받았습니다.
프로세서 2가 4을 받았습니다.
프로세서 3가 5을 받았습니다.
프로세서 4가 6을 받았습니다.
프로세서 0가 7을 받았습니다.
프로세서 1가 8을 받았습니다.
프로세서 2가 9을 받았습니다.
프로세서 3가 10을 받았습니다.

팬 인

개념

여러 코루틴에서 한 채널을 통해 데이터를 발행한다.

ex)

suspend fun produceNumbers(channel: SendChannel<Int>, current: Int, time: Long) {
    var i = current
    while (true) { //1000L에 종료
        channel.send(i)
        delay(time)
        i += i
    }
}

fun CoroutineScope.processNumber(channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("${msg}을 받았습니다.")
    }
}


fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        produceNumbers(channel, 2, 100L) //2의 배수 100L마다 3의 배수 생산
    }
    launch {
        produceNumbers(channel, 3, 150L) //3의 배수 150L마다 3의 배수 생산
    }
    processNumber(channel)
    delay(1000L) // 1000L에 종료한다.
    println("end")
    coroutineContext.cancelChildren()
}

 

출력)
2을 받았습니다.
3을 받았습니다.
4을 받았습니다.
6을 받았습니다.
8을 받았습니다.
12을 받았습니다.
16을 받았습니다.
32을 받았습니다.
24을 받았습니다.
64을 받았습니다.
48을 받았습니다.
128을 받았습니다.
256을 받았습니다.
96을 받았습니다.
512을 받았습니다.
192을 받았습니다.
1024을 받았습니다.

 

참고

https://kotlinlang.org/docs/channels.html

반응형