close

這是 JsonChao 的第277期分享

Kotlin Flow 是基於 Kotlin 協程基礎能力搭建的一套數據流框架,從功能複雜性上看是介於 LiveData 和 RxJava 之間的解決方案。Kotlin Flow 擁有比 LiveData 更豐富的能力,但裁剪了 RxJava 大量複雜的操作符,做得更加精簡。並且在 Kotlin 協程的加持下,Kotlin Flow 目前是 Google 主推的數據流框架。

1. 為什麼要使用 Flow?

LiveData、Kotlin Flow 和 RxJava 三者都屬於可觀察的數據容器類,觀察者模式是它們相同的基本設計模式,那麼相對於其他兩者,Kotlin Flow 的優勢是什麼呢?

LiveData 是 androidx 包下的組件,是 Android 生態中一個的簡單的生命周期感知型容器。簡單即是它的優勢,也是它的局限,當然這些局限性不應該算 LiveData 的缺點,因為 LiveData 的設計初衷就是一個簡單的數據容器。對於簡單的數據流場景,使用 LiveData 完全沒有問題。

LiveData 只能在主線程更新數據:只能在主線程 setValue,即使 postValue 內部也是切換到主線程執行;

LiveData 數據重放問題:註冊新的訂閱者,會重新收到 LiveData 存儲的數據,這在有些情況下不符合預期(可以使用自定義的 LiveData 子類SingleLiveData或UnPeekLiveData解決,此處不展開);

LiveData 不防抖:重複 setValue 相同的值,訂閱者會收到多次onChanged()回調(可以使用distinctUntilChanged()解決,此處不展開);

LiveData 不支持背壓:在數據生產速度 > 數據消費速度時,LiveData 無法正常處理。比如在子線程大量postValue數據但主線程消費跟不上時,中間就會有一部分數據被忽略。

RxJava 是第三方組織 ReactiveX 開發的組件,Rx 是一個包括 Java、Go 等語言在內的多語言數據流框架。功能強大是它的優勢,支持大量豐富的操作符,也支持線程切換和背壓。然而 Rx 的學習門檻過高,對開發反而是一種新的負擔,也會帶來誤用的風險。

Kotlin 是 kotlinx 包下的組件,不是單純 Android 生態下的產物。那麼,Flow 的優勢在哪裡呢?

Flow 支持協程:Flow 基於協程基礎能力,能夠以結構化並發的方式生產和消費數據,能夠實現線程切換(依靠協程的 Dispatcher);

Flow 支持背壓:Flow 的子類 SharedFlow 支持配置緩存容量,可以應對數據生產速度 > 數據消費速度的情況;

Flow 支持數據重放配置:Flow 的子類 SharedFlow 支持配置重放 replay,能夠自定義對新訂閱者重放數據的配置;

Flow 相對 RxJava 的學習門檻更低:Flow 的功能更精簡,學習性價比相對更高。不過 Flow 是基於協程,在協程會有一些學習成本,但這個應該拆分來看。

當然 Kotlin Flow 也存在一些局限:

Flow 不是生命周期感知型組件:Flow 不是 Android 生態下的產物,自然 Flow 是不會關心組件生命周期。那麼我們如何確保訂閱者在監聽 Flow 數據流時,不會在錯誤的狀態更新 View 呢?這個問題在下文第 6 節再說。

UnPeekLiveData

https://github.com/KunMinX/UnPeek-LiveData

2. 冷數據流與熱數據流

Kotlin Flow 包含三個實體:數據生產方 - (可選的)中介者 - 數據使用方。數據生產方負責向數據流發射(emit)數據,而數據使用方從數據流中消費數據。根據生產方產生數據的時機,可以將 Kotlin Flow 分為冷流和熱流兩種:

普通 Flow(冷流):冷流是不共享的,也沒有緩存機制。冷流只有在訂閱者 collect 數據時,才按需執行發射數據流的代碼。冷流和訂閱者是一對一的關係,多個訂閱者間的數據流是相互獨立的,一旦訂閱者停止監聽或者生產代碼結束,數據流就自動關閉。

SharedFlow / StateFlow(熱流):熱流是共享的,有緩存機制的。無論是否有訂閱者 collect 數據,都可以生產數據並且緩存起來。熱流和訂閱者是一對多的關係,多個訂閱者可以共享同一個數據流。當一個訂閱者停止監聽時,數據流不會自動關閉(除非使用WhileSubscribed策略,這個在下文再說)。

3. 普通 Flow(冷流)

普通 Flow 是冷流,數據是不共享的,也沒有緩存機制。數據源會延遲到消費者開始監聽時才生產數據(如終端操作 collect{}),並且每次訂閱都會創建一個全新的數據流。一旦消費者停止監聽或者生產者代碼結束,Flow 會自動關閉。

val coldFlow: Flow<Int> = flow { // 生產者代碼 while(true) { // 執行計算 emit(result) delay(100) } // 生產者代碼結束,流將被關閉}.collect{ data -> }

冷流 Flow 主要的操作如下:

創建數據流 flow{}:Flow 構造器會創建一個新的數據流。flow{} 是 suspend 函數,需要在協程中執行;

發送數據 emit():emit() 將一個新的值發送到數據流中;

終端操作 collect{}:觸發數據流消費,可以獲取數據流中所有的發出值。Flow 是冷流,數據流會延遲到終端操作 collect 才執行,並且每次在 Flow 上重複調用 collect,都會重複執行 flow{} 去觸發發送數據動作(源碼位置:AbstractFlow)。collect 是 suspend 函數,需要在協程中執行。

異常捕獲 catch{}:catch{} 會捕獲數據流中發生的異常;

協程上下文切換 flowOn():更改上流數據操作的協程上下文 CoroutineContext,對下流操作沒有影響。如果有多個 flowOn 運算符,每個 flowOn 只會更改當前位置的上游數據流;

狀態回調 onStart:在數據開始發送之前觸發,在數據生產線程回調;

狀態回調 onCompletion:在數據發送結束之後觸發,在數據生產線程回調;

狀態回調 onEmpty:在數據流為空時觸發(在數據發送結束但事實上沒有發送任何數據時),在數據生產線程回調。

普通 Flow 的核心代碼在 AbstractFlow 中,可以看到每次調用終端操作 collect,collector 代碼塊都會執行一次,也就是重新執行一次數據生產代碼:

AbstractFlow.kt

public abstract class AbstractFlow<T> : Flow<T> { @InternalCoroutinesApi public final override suspend fun collect(collector: FlowCollector<T>) { // 1. 對 flow{} 的包裝 val safeCollector = SafeCollector(collector, coroutineContext) try { // 2. 執行 flow{} 代碼塊 collectSafely(safeCollector) } finally { // 3. 釋放協程相關的參數 safeCollector.releaseIntercepted() } } public abstract suspend fun collectSafely(collector: FlowCollector<T>)}private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { collector.block() }}4. SharedFlow —— 高配版 LiveData

下文要講的 StateFlow 其實是 SharedFlow 的一個子類,所以我們先講 SharedFlow。SharedFlow 和 StateFlow 都屬於熱流,無論是否有訂閱者(collect),都可以生產數據並且緩存。它們都有一個可變的版本 MutableSharedFlow 和 MutableStateFlow,這與 LiveData 和 MutableLiveData 類似,對外暴露接口時,應該使用不可變的版本。

4.1 SharedFlow 與 MutableSharedFlow 接口

直接對着接口講不明白,這裡先放出這兩個接口方便查看:

public interface SharedFlow<out T> : Flow<T> { // 緩存的重放數據的快照 public val replayCache: List<T>}public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> { // 發射數據(注意這是個掛起函數) override suspend fun emit(value: T) // 嘗試發射數據(如果緩存溢出策略是 SUSPEND,則溢出時不會掛起而是返回 false) public fun tryEmit(value: T): Boolean // 活躍訂閱者數量 public val subscriptionCount: StateFlow<Int> // 重置重放緩存,新訂閱者只會收到註冊後新發射的數據 public fun resetReplayCache()}4.2 構造一個 SharedFlow

我會把 SharedFlow 理解為一個高配版的 LiveData,這點首先在構造函數就可以體現出來。SharedFlow 的構造函數允許我們配置三個參數:

SharedFlow.kt

public fun <T> MutableSharedFlow( // 重放數據個數 replay: Int = 0, // 額外緩存容量 extraBufferCapacity: Int = 0, // 緩存溢出策略 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): MutableSharedFlow<T> { val bufferCapacity0 = replay + extraBufferCapacity val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)}public enum class BufferOverflow { // 掛起 SUSPEND, // 丟棄最早的一個 DROP_OLDEST, // 丟棄最近的一個 DROP_LATEST}

參數

描述reply重放數據個數,當新訂閱者註冊時會重放緩存的 replay 個數據extraBufferCapacity額外緩存容量,在 replay 之外的額外容量,SharedFlow 的緩存容量 capacity = replay + extraBufferCapacity(實在想不出額外容量有什麼用,知道可以告訴我)onBufferOverflow緩存溢出策略,即緩存容量 capacity 滿時的處理策略(SUSPEND、DROP_OLDEST、DROP_LAST)

SharedFlow 默認容量 capacity 為 0,重放 replay 為 0,緩存溢出策略是 SUSPEND,發射數據時已註冊的訂閱者會收到數據,但數據會立刻丟棄,而新的訂閱者不會收到歷史發射過的數據。

為什麼我們可以把 SharedFlow 理解為 「高配版」 LiveData,拿 SharedFlow 和 LiveData 做個簡單的對比就知道了:

容量問題:LiveData 容量固定為 1 個,而 SharedFlow 容量支持配置 0 個到 多個;

背壓問題:LiveData 無法應對背壓問題,而 SharedFlow 有緩存空間能應對背壓問題;

重放問題:LiveData 固定重放 1 個數據,而 SharedFlow 支持配置重放 0 個到多個;

線程問題:LiveData 只能在主線程訂閱,而 SharedFlow 支持在任意線程(通過協程的 Dispatcher)訂閱。

當然 SharedFlow 也並不是完勝,LiveData 能夠處理生命周期安全問題,而 SharedFlow 不行(因為 Flow 本身就不是純 Android 生態下的組件),不合理的使用會存在不必要的操作和資源浪費,以及在錯誤的狀態更新 View 的風險。不過別擔心,這個問題可以通過第 6 節的 Lifecycle API 來解決。

4.3 普通 Flow 轉換為 SharedFlow

前面提到過,冷流是不共享的,也沒有緩存機制。使用Flow.shareIn或Flow.stateIn可以把冷流轉換為熱流,一來可以將數據共享給多個訂閱者,二來可以增加緩衝機制。

Share.kt

public fun <T> Flow<T>.shareIn( // 協程作用域範圍 scope: CoroutineScope, // 啟動策略 started: SharingStarted, // 控制數據重放的個數 replay: Int = 0): SharedFlow<T> { val config = configureSharing(replay) val shared = MutableSharedFlow<T>( replay = replay, extraBufferCapacity = config.extraBufferCapacity, onBufferOverflow = config.onBufferOverflow ) @Suppress("UNCHECKED_CAST") scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T) return shared.asSharedFlow()}public companion object { // 熱啟動式:立即開始,並在 scope 指定的作用域結束時終止 public val Eagerly: SharingStarted = StartedEagerly() // 懶啟動式:在註冊首個訂閱者時開始,並在 scope 指定的作用域結束時終止 public val Lazily: SharingStarted = StartedLazily() public fun WhileSubscribed( stopTimeoutMillis: Long = 0, replayExpirationMillis: Long = Long.MAX_VALUE ): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)}

sharedIn 的參數 scope 和 replay 不需要過多解釋,主要介紹下 started: SharingStarted 啟動策略,分為三種:

Eagerly(熱啟動式):立即啟動數據流,並保持數據流(直到 scope 指定的作用域結束);

Lazily(懶啟動式):在首個訂閱者註冊時啟動,並保持數據流(直到 scope 指定的作用域結束);

WhileSubscribed():在首個訂閱者註冊時啟動,並保持數據流直到在最後一個訂閱者註銷時結束(或直到 scope 指定的作用域結束)。通過 WhildSubscribed() 策略能夠在沒有訂閱者的時候及時停止數據流,避免引起不必要的資源浪費,例如一直從數據庫、傳感器中讀取數據。

whileSubscribed() 還提供了兩個配置參數:

stopTimeoutMillis 超時時間(毫秒):最後一個訂閱者註銷訂閱後,保留數據流的超時時間,默認值 0 表示立刻停止。這個參數能夠幫助防抖,避免訂閱者臨時短時間註銷就馬上關閉數據流。例如希望等待 5 秒後沒有訂閱者則停止數據流,可以使用 whileSubscribed(5000)。

replayExpirationMillis 重放過期時間(毫秒):停止數據流後,保留重放數據的超時時間,默認值 Long.MAX_VALUE 表示永久保存(replayExpirationMillis 發生在停止數據流後,說明 replayExpirationMillis 時間是在 stopTimeoutMillis 之後發生的)。例如希望希望等待 5 秒後停止數據流,再等待 5 秒後的數據視為無用的陳舊數據,可以使用 whileSubscribed(5000, 5000)。

Flow#shareIn

https://link.juejin.cn/?target=https%3A%2F%2Fkotlin.github.io%2Fkotlinx.coroutines%2Fkotlinx-coroutines-core%2Fkotlinx.coroutines.flow%2Fshare-in.html

Flow#stateIn

https://link.juejin.cn/?target=https%3A%2F%2Fkotlin.github.io%2Fkotlinx.coroutines%2Fkotlinx-coroutines-core%2Fkotlinx.coroutines.flow%2Fstate-in.html

5. StateFlow —— LiveData 的替代品

StateFlow 是 SharedFlow 的子接口,可以理解為一個特殊的 SharedFlow。不過它們的繼承關係只是接口上有繼承關係,內部的實現類SharedFlowImpl和StateFlowImpl其實是分開的,這裡要留個印象就好。

5.1 StateFlow 與 MutableStateFlow 接口

這裡先放出這兩個接口方便查看:

public interface StateFlow<out T> : SharedFlow<T> { // 當前值 public val value: T}public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> { // 當前值 public override var value: T // 比較並設置(通過 equals 對比,如果值發生真實變化返回 true) public fun compareAndSet(expect: T, update: T): Boolean}5.2 構造一個 StateFlow

StateFlow 的構造函數就簡單多了,有且僅有一個必選的參數,代表初始值:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)5.3 特殊的 SharedFlow

StateFlow 是 SharedFlow 的一種特殊配置,MutableStateFlow(initialValue) 這樣一行代碼本質上和下面使用 SharedFlow 的方式是完全相同的:

val shared = MutableSharedFlow( replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)shared.tryEmit(initialValue) // emit the initial valueval state = shared.distinctUntilChanged() // get StateFlow-like behavior

有初始值:StateFlow 初始化時必須傳入初始值;

容量為 1:StateFlow 只會保存一個值;

重放為 1:StateFlow 會向新訂閱者重放最新的值;

不支持 resetReplayCache() 重置重放緩存:StateFlow 的 resetReplayCache() 方法拋出UnsupportedOperationException

緩存溢出策略為 DROP_OLDEST:意味着每次發射的新數據會覆蓋舊數據;

總的來說,StateFlow 要求傳入初始值,並且僅支持保存一個最新的數據,會向新訂閱者會重放一次最新值,也不允許重置重放緩存。說 StateFlow 是 LiveData 的替代品一點不為過。除此之外,StateFlow 還額外支持一些特性:

數據防抖:意味着僅在更新值並且發生變化才會回調,如果更新值沒有變化不會回調 collect,其實就是在發射數據時加了一層攔截:

StateFlow.kt

public override var value: T get() = NULL.unbox(_state.value) set(value) { updateState(null, value ?: NULL) }override fun compareAndSet(expect: T, update: T): Boolean = updateState(expect ?: NULL, update ?: NULL)private fun updateState(expectedState: Any?, newState: Any): Boolean { var curSequence = 0 var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it synchronized(this) { val oldState = _state.value if (expectedState != null && oldState != expectedState) return false // CAS support if (oldState == newState) return true // 如果新值 equals 舊值則攔截, 但 CAS 返回 true _state.value = newState ... return true }}

CAS 操作:原子性的比較與設置操作,只有在舊值與 expect 相同時返回 ture。

5.4 普通 Flow 轉換為 StateFlow

跟 SharedFlow 一樣,普通 Flow 也可以轉換為 StateFlow:

Share.kt

public fun <T> Flow<T>.stateIn( // 共享開始時所在的協程作用域範圍 scope: CoroutineScope, // 共享開始策略 started: SharingStarted, // 初始值 initialValue: T): StateFlow<T> { val config = configureSharing(1) val state = MutableStateFlow(initialValue) scope.launchSharing(config.context, config.upstream, state, started, initialValue) return state.asStateFlow()}6. 安全地觀察 Flow 數據流

前面也提到了,Flow 不具備 LiveData 的生命周期感知能力,所以訂閱者在監聽 Flow 數據流時,會存在生命周期安全的問題。Google 推薦的做法是使用Lifecycle#repeatOnLifecycleAPI:

// 從 2.4.0 開始支持 Lifecycle#repeatOnLifecycle APIimplementation "androidx.lifecycle:lifecycle-runtime-ktx:2.4.1"

LifecycleOwner#addRepeatingJob:在生命周期到達指定狀態時,自動創建並啟動協程執行代碼塊,在生命周期低於該狀態時,自動取消協程。因為 addRepeatingJob 不是掛起函數,所以不遵循結構化並發的規則。目前已經廢棄,被下面的 repeatOnLifecycle() 替代了(廢棄 addRepeatingJob 的考量見設計 repeatOnLifecycle API 背後的故事);

Lifecycle#repeatOnLifecycle:repeatOnLifecycle 的作用相同,區別在於它是一個 suspend 函數,需要在協程中執行;

Flow#flowWithLifecycle:Flow#flowWithLifecycle 的作用相同,內部基於 repeatOnLifecycle API。

class LocationActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) lifecycleOwner.addRepeatingJob(Lifecycle.State.STARTED) { locationProvider.locationFlow().collect { // update UI } } }}class LocationActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // repeatOnLifecycle 是 suspends 函數,所以需要在協程中執行 lifecycleScope.launch { // 當 lifecycleScope 的生命周期高於 STARTED 狀態時,啟動一個新的協程並執行代碼塊 // 當 lifecycleScope 的生命周期低於 STARTED 狀態時,取消該協程 repeatOnLifecycle(Lifecycle.State.STARTED) { // 當前生命周期一定高於 STARTED 狀態,可以安全地從數據流中取數據,並更新 View locationProvider.locationFlow().collect { // update UI } } // 結構化並發:生命周期處於 DESTROYED 狀態時,切換回調用 repeatOnLifecycle 的協程繼續執行 } }}class LocationActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) locationProvider.locationFlow() .flowWithLifecycle(this, Lifecycle.State.STARTED) .onEach { // update UI } .launchIn(lifecycleScope) }}

如果不使用Lifecycle#repeatOnLifecycleAPI,具體會出現什麼問題呢?

Activity.lifecycleScope.launch:立即啟動協程,並在 Activity 銷毀時取消協程;

Fragment.lifecycleScope.launch:立即啟動協程,並在 Fragment 銷毀時取消協程;

Fragment.viewLifecycleOwner.lifecycleScope.launch:立即啟動協程,並在 Fragment 中視圖銷毀時取消協程。

可以看到,這些協程 API 只有在最後組件 / 視圖銷毀時才會取消協程,當視圖進入後台時協程並不會被取消,Flow 會持續生產數據,並且會觸發更新視圖。

LifecycleContinueScope.launchWhenX:在生命周期到達指定狀態時立即啟動協程執行代碼塊,在生命周期低於該狀態時掛起(而不是取消)協程,在生命周期重新高於指定狀態時,自動恢復該協程。

可以看到,這些協程 API 在視圖離開某個狀態時會掛起協程,能夠避免更新視圖。但是 Flow 會持續生產數據,也會產生一些不必要的操作和資源消耗(CPU 和內存)。雖然可以在視圖進入後台時手動取消協程,但很明顯增寫了模板代碼,沒有 repeatOnLifecycle API 來得簡潔。

class LocationActivity : AppCompatActivity() { // 協程控制器 private var locationUpdatesJob: Job? = null override fun onStart() { super.onStart() locationUpdatesJob = lifecycleScope.launch { locationProvider.locationFlow().collect { // update UI } } } override fun onStop() { // 在視圖進入後台時取消協程 locationUpdatesJob?.cancel() super.onStop() }}

回過頭來看,repeatOnLifecycle 是怎麼實現生命周期感知的呢?其實很簡單,是通過 Lifecycle#addObserver 來監聽生命周期變化:

RepeatOnLifecycle.kt

suspendCancellableCoroutine<Unit> { cont -> // Lifecycle observers that executes `block` when the lifecycle reaches certain state, and // cancels when it falls below that state. val startWorkEvent = Lifecycle.Event.upTo(state) val cancelWorkEvent = Lifecycle.Event.downFrom(state) val mutex = Mutex() observer = LifecycleEventObserver { _, event -> if (event == startWorkEvent) { // Launch the repeating work preserving the calling context launchedJob = this@coroutineScope.launch { // Mutex makes invocations run serially, // coroutineScope ensures all child coroutines finish mutex.withLock { coroutineScope { block() } } } return@LifecycleEventObserver } if (event == cancelWorkEvent) { launchedJob?.cancel() launchedJob = null } if (event == Lifecycle.Event.ON_DESTROY) { cont.resume(Unit) } } this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)}

設計 repeatOnLifecycle API 背後的故事

http://events.jianshu.io/p/88008aa77550

Lifecycle#repeatOnLifecycle

https://developer.android.google.cn/reference/androidx/lifecycle/RepeatOnLifecycleKt

Flow#flowWithLifecycle

https://developer.android.google.cn/reference/androidx/lifecycle/FlowExtKt

7. Channel 通道

在協程的基礎能力上使用數據流,除了上文提到到 Flow API,還有一個Channel API。Channel 是 Kotlin 中實現跨協程數據傳輸的數據結構,類似於 Java 中的 BlockQueue 阻塞隊列。不同之處在於 BlockQueue 會阻塞線程,而 Channel 是掛起線程。Google 的建議是優先使用 Flow 而不是 Channel,主要原因是 Flow 會更自動地關閉數據流,而一旦 Channel 沒有正常關閉,則容易造成資源泄漏。此外,Flow 相較於 Channel 提供了更明確的約束和操作符,更靈活。

Channel 主要的操作如下:

創建 Channel:通過 Channel(Channel.UNLIMITED) 創建一個 Channel 對象,或者直接使用 produce{} 創建一個生產者協程;

關閉 Channel:Channel#close();

發送數據:Channel#send() 往 Channel 中發送一個數據,在 Channel 容量不足時 send() 操作會掛起,Channel 默認容量 capacity 是 1;

接收數據:通過 Channel#receive() 從 Channel 中取出一個數據,或者直接通過 actor 創建一個消費者協程,在 Channel 中數據不足時 receive() 操作會掛起。

廣播通道 BroadcastChannel(廢棄,使用 SharedFlow):普通 Channel 中一個數據只會被一個消費端接收,而 BroadcastChannel 允許多個消費端接收。

public fun <E> Channel( // 緩衝區容量,當超出容量時會觸發 onBufferOverflow 拒絕策略 capacity: Int = RENDEZVOUS, // 緩衝區溢出策略,默認為掛起,還有 DROP_OLDEST 和 DROP_LATEST onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, // 處理元素未能成功送達處理的情況,如訂閱者被取消或者拋異常 onUndeliveredElement: ((E) -> Unit)? = null): Channel<E>

ChannelAPI

https://kotlinlang.org/docs/channels.html

Google的建議

https://juejin.cn/post/6844904153181847566#heading-0

8. 淺嘗一下

到這裡,LiveData、Flow 和 Channel 我們都講了一遍了,實際場景中怎麼使用呢,淺嘗一下。

事件(Event):事件是一次有效的,新訂閱者不應該收到舊的事件,因此事件數據適合用 SharedFlow(replay=0);

狀態(State):狀態是可以恢復的,新訂閱者允許收到舊的狀態數據,因此狀態數據適合用 StateFlow。

示例代碼如下,不熟悉 MVI 模式的同學可以移步:Android UI 架構演進:從 MVC 到 MVP、MVVM、MVI

BaseViewModel.kt

interface UiStateinterface UiEventinterface UiEffectabstract class BaseViewModel<State : UiState, Event : UiEvent, Effect : UiEffect> : ViewModel() { // 初始狀態 private val initialState: State by lazy { createInitialState() } // 頁面需要的狀態,對應於 MVI 模式的 ViewState private val _uiState = MutableStateFlow<State>(initialState) // 對外接口使用不可變版本 val uiState = _uiState.asStateFlow() // 頁面狀態變更的 「副作用」,類似一次性事件,不需要重放的狀態變更(例如 Toast) private val _effect = MutableSharedFlow<Effect>() // 對外接口使用不可變版本 val effect = _effect.asSharedFlow() // 頁面的事件操作,對應於 MVI 模式的 Intent private val _event = MutableSharedFlow<Event>() init { viewModelScope.launch { _event.collect { handleEvent(it) } } } // 初始狀態 protected abstract fun createInitialState(): State // 事件處理 protected abstract fun handleEvent(event: Event) /** * 事件入口 */ fun sendEvent(event: Event) { viewModelScope.launch { _event.emit(event) } } /** * 狀態變更 */ protected fun setState(newState: State) { _uiState.value = newState } /** * 副作用 */ protected fun setEffect(effect: Effect) { _effect.send(effect) }}參考資料

協程Flow 最佳實踐 |基於 Android 開發者峰會應用—— Android 官方文檔

https://juejin.cn/post/6844904153181847566

設計 repeatOnLifecycleAPI 背後的故事—— Android 官方文檔

http://events.jianshu.io/p/88008aa77550

使用更為安全的方式收集Android UI 數據流—— Android 官方文檔

https://mp.weixin.qq.com/s?__biz=Mzk0NDIwMTExNw%3D%3D&idx=1&mid=2247494116&scene=21&sn=6bd12ff9d62eb2a71fa74060afcac996

Flow 操作符shareIn 和 stateIn 使用須知—— Android 官方文檔

https://juejin.cn/post/6998066384290709518

從 LiveData遷移到 Kotlin 數據流—— Android 官方文檔

https://juejin.cn/post/6979008878029570055

用 Kotlin Flow 解決開發中的痛點—— 都梁人 著

https://mp.weixin.qq.com/s/q-j8FRj0LeBlWcWd3Egz_g

抽絲剝繭Kotlin -協程中繞不過的Flow —— 九心 著

https://juejin.cn/post/6914802148614242312

Kotlin flow實踐總結!—— 入魔的冬瓜 著

https://mp.weixin.qq.com/s/VxTeiyU0CtH7v-vFVed0Bw

Android—kotlin-Channel超詳細講解—— hqk 著

https://juejin.cn/post/7041835887897870373

END


往期推薦



這兩年,我打造了一份具備競爭壁壘的 Android 跨平台 通關秘籍

貝塞爾Loading——化學風暴

從原理到實戰,全面總結 Android HTTPS 抓包

餓了麼絲滑無縫過度搜索欄的實現

Android Deep Link 深度鏈接,看看你在第幾層?

點擊下方卡片關注JsonChao,為你構建一套

大廠青睞的 T 型人才系統


▲點擊上方卡片關注JsonChao,構建一套

大廠青睞的 T 型人才知識體系

歡迎把文章分享到朋友圈

大廠T 型人才成長社群

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()