본문 바로가기

코틀린

코틀린 StateFlow 및 SharedFlow

반응형

스트림

콜드 스트림인 Flow와 달리 StateFlow와 SharedFlow는 핫 스트림이다.

Flow에서 콜드 스트림이란 collect 시점과 상관 없이 emit 된 모든 값들을 받아오는 것 을 말한다.

핫 스트림이란 collect한 시점 이후 발생한 emit 값만 받아 오는 것을 말한다.

ex)

lateinit var flowTest : Flow<Int>

val stateFlowTest = MutableStateFlow(0)

val sharedFlowTest = MutableSharedFlow<Int>(0)

someScope.launch {
    launch {
        flowTest = flow {
            for (i in 0 until 10) {
                emit(i)
                delay(1000)
            }
        }
    }

    launch {
        for (i in 0 until 10) {
            stateFlowTest.emit(i)
            sharedFlowTest.emit(i)
            delay(1000)
        }
    }
    delay(5000)

    launch {
        flowTest.collect {
            Log.i("flowTest", it.toString())
        }
    }

    launch {
        stateFlowTest.collect {
            Log.i("stateFlowTest", it.toString())
        }
    }

    launch {
        sharedFlowTest.collect {
            Log.i("sharedFlowTest", it.toString())
        }
    }
}

아래와 같이 콜드 스트림인 flow는 0부터 9까지 전부 출력하고 핫 스트림인 StateFlow와 SharedFlow는 5초 이후 부터 출력하는걸 볼 수 있다.

출력)

flowTest: 0
stateFlowTest: 4
stateFlowTest: 5
sharedFlowTest: 5
flowTest: 1
stateFlowTest: 6
sharedFlowTest: 6
flowTest: 2
stateFlowTest: 7
sharedFlowTest: 7
flowTest: 3
stateFlowTest: 8
sharedFlowTest: 8
flowTest: 4
stateFlowTest: 9
sharedFlowTest: 9
flowTest: 5
flowTest: 6
flowTest: 7
flowTest: 8
flowTest: 9

StateFlow

StateFlow는 Livedata와 사용용도가 비슷하다.

databinding 그리고 collectAsState()등을 통해 compose의 상태로 이용가능하다.

StateFlow는 Livedata와 달리 초기값을 필요로 한다.

Flow와 달리 중복 값을 필터링 한다.

SharedFlow

SharedFlow는 StateFlow와 달리 중복 값을 필터링 하지 않으며 다양한 매개변수를 통해 동작을 정의할 수 있다.

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

replay를 사용하면 핫 흐름시 놓치는 데이터를 가져올 수 있다.

ex) 위의 출력을 기준으로

//replay = 1일 경우
sharedFlowTest: 3 // 놓쳤던 첫 번째 이전 흐름 부터
sharedFlowTest: 4 // 시작 흐름
...
sharedFlowTest: 9

//replay = 2일 경우
sharedFlowTest: 2 // 놓쳤던 두 번째 이전 흐름 부터
sharedFlowTest: 3
sharedFlowTest: 4 // 시작 흐름
...
sharedFlowTest: 9

extraBufferCapacity

여러 Flow가 emit을 해줄 경우 collect 하지 못하고 버퍼가 가득찬 부분에 대한 Capacity이다.

onBufferOverflow

설정한 extraBufferCapacity의 Capacity를 넘어 섰을 때 적용할 정책을 지정할 수 있다. (defualt는 BufferOverflow.SUSPEND 이다.)

BufferOverflow.SUSPEND 버퍼가 가득 찬 경우 버퍼에 여유공간이 생길때 까지 suspend 한다.

BufferOverflow.DROP_LATEST 버퍼가 가득 찬 경우 가장 최근 데이터를 drop 하고 새로운 데이터를 버퍼에 넣는다.

BufferOverflow.DROP_OLDEST 버퍼가 가득 찬 경우 가장 오래된 데이터를 drop하고 새로운 데이터를 버퍼에 넣는다.

stateIn 및 sharedIn

콜드 스트림인 Flow를 핫 스트림으로 바꾸어 줄 수 있다. (Flow.stateIn -> StateFlow, Flow.sharedIn -> SharedFlow)

public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> 

scope

해당 flow를 실행할 scope를 지정해 줄 수 있다.

started

public val Eagerly: SharingStarted = StartedEagerly()

/**
 * Sharing is started when the first subscriber appears and never stops.
 */
public val Lazily: SharingStarted = StartedLazily()

/**
 * Sharing is started when the first subscriber appears, immediately stops when the last
 * subscriber disappears (by default), keeping the replay cache forever (by default).
 *
 * It has the following optional parameters:
 *
 * * [stopTimeoutMillis] &mdash; configures a delay (in milliseconds) between the disappearance of the last
 *   subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).
 * * [replayExpirationMillis] &mdash; configures a delay (in milliseconds) between the stopping of
 *   the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the [shareIn] operator
 *   and resets the cached value to the original `initialValue` for the [stateIn] operator).
 *   It defaults to `Long.MAX_VALUE` (keep replay cache forever, never reset buffer).
 *   Use zero value to expire the cache immediately.
 *
 * This function throws [IllegalArgumentException] when either [stopTimeoutMillis] or [replayExpirationMillis]
 * are negative.
 */
@Suppress("FunctionName")
public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
    StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)

위와 같이 Eagerly share, Lazily, WhileSubscribed 세가지로 나뉜다.

Eagerly

share(동작)가 즉시 시작되고 중간에 멈추지 않는다.

Lazily

처음 collect가 이루어 진 후 share(동작)를 시작하며 중간에 멈추지 않는다.

WhileSubscribed

처음 collect가 이루어 진 후 share(동작)를 시작하며 마지막 구독자가 사라지면 즉시 중지하며 재생 캐시를 영구적으로 유지한다.

  • stopTimeourMillis는 마지막 구독자가 사라질 때 코루틴이 중지를 지연시키는 시간이다.
  • replayExpirationMillis는 코루틴 중지와 재생 캐시 재설정 사이의 지연 시간이다.

출처

https://developer.android.com/kotlin/flow/stateflow-and-sharedflow?hl=ko

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow

[kotlinx.coroutines.flow

Flow — asynchronous cold stream of elements.

kotlin.github.io](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow)

반응형