Реактивные потоки в Kotlin: SharedFlow и StateFlow

Туториалы

Реактивные потоки в Kotlin: SharedFlow и StateFlow

В этой статье вы узнаете о реактивных потоках в Kotlin и напишите приложение, используя два типа потоков: SharedFlow и StateFlow.

Потоки событий стали стандартом для Android. В течение многих лет RxJava был стандартом для реактивных потоков. Теперь Kotlin предоставляет собственную реализацию реактивных потоков под названием Flow. Как и RxJava, Kotlin Flow может создавать потоки данных и реагировать на них. Также как и RxJava, потоки событий могут поступать от холодных или горячих издателей. Разница между ними проста: хотя холодные потоки генерируют события только при наличии подписчиков, горячие потоки могут генерировать новые события, даже если на них не реагируют никакие подписчики. В этом туториале вы узнаете о реализациях горячего потока Flow, которые называются SharedFlow и StateFlow. В частности, вы узнаете:

  • Что такое SharedFlow
  • Что такое StateFlow и как он связан с SharedFlow
  • Как эти горячие потоки работают в сравнении с RxJava, Channels и LiveData
  • Как использовать их в Android

Вы можете спросить себя: «Но зачем использовать Kotlin SharedFlow и StateFlow вместо RxJava?» Хотя RxJava хорошо справляется со своей задачей, некоторые любят описывать это как «использование базуки для убийства муравья». Другими словами, хотя фреймворк и работает, довольно легко увлечься всеми его возможностями. Это может привести к появлению слишком сложных решений и кода, которые будет трудно понять. Kotlin Flow предоставляет более прямые и конкретные реализации для реактивных потоков.

Начнем

Загрузите материалы проекта, нажав кнопку «Загрузить материалы».

Вы поработаете над приложением под названием CryptoStonks5000. У приложения 2 экрана: первый показывает пользователю несколько криптовалют, второй – изменение цены для криптовалюты за прошедшие 24 часа.

Для изучения общих потоков и потоков состояний вы:

  1. Имплементируете поток событий с SharedFlow, который генерирует события, общие для экранов
  2. Выполните рефакторинг CryptoStonks5000, чтобы использовать StateFlow для обработки состояния просмотра

Проект написан с использованием чистой архитектуры на паттерне MVVM 

 

Соберите и запустите проект, дабы убедиться, что все работает. После этого самое время изучать общие потоки!

SharedFlow

Перед тем, как начать кодить, важно понять, что такое SharedFlow.

Общий поток (Shared Flow) – это, по сути, Поток. Но с двумя главными отличиями от стандартной имплементации Потока. Он:

  1. Генерирует события, даже если вы не вызываете collect() на нем. В конце концов, это реализация горячего потока.
  2. Может иметь несколько подписчиков

Обратите внимание на термин «подписчики», используемый здесь вместо «сборщиков», как в обычном Flow. Это изменение именования связано с тем, что общие потоки никогда не завершаются. Другими словами, когда вы вызываете Flow.collect() в общем потоке, вы не собираете все его события. Вместо этого вы подписываетесь на события, которые генерируются, пока существует эта подписка.

Хоть это и означает, что обращение к Flow.collect() в общих потоках не завершаются нормально, подписку все же можно отменить. Как и следовало ожидать, эта отмена происходит путем отмены сопрограммы. 

Заметка

Операторы усечения потока, такие как Flow.take(count: Int), могут принудительно завершить общий поток.

Теперь с этими знаниями время кодить.

Обработка общих событий

Вы имплементируете ненастоящую систему уведомления о поддельных ценах, чтобы имитировать колебания стоимости монет. Она должна быть ненастоящей, потому что реальная слишком непостоянна. :]

Пользователи должны знать об этих вариантах независимо от того, на каком экране они находятся. Чтобы сделать это возможным, вы создадите общий поток в ViewModel, доступном для всех экранов.

В наборе presentation найдите и откройте CoinsSharedViewModel.kt.

Для начала вам нужно знать, как создать общий поток. Что ж, это ваш счастливый день, потому что вы собираетесь создать два подряд! Добавьте этот код в начало класса:

private val _sharedViewEffects = MutableSharedFlow() // 1

val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2

В этом коде:

  1. Вы вызываете MutableSharedFlow. Он создает изменяемый общий поток, который генерирует события типа SharedViewEffects, который представляет собой простой запечатанный класс для моделирования возможных событий. Обратите внимание, что это частное свойство. Вы будете использовать его внутри, чтобы генерировать события, открывая неизменяемый общий поток, чтобы сделать их видимыми извне.
  2. Создаете публичный неизменяемый общий поток, упомянутый выше, вызывая asSharedFlow() в изменяемом общем потоке. Таким образом, неизменяемое открытое свойство всегда отражает значение изменяемого частного.

Наличие этих двух свойств - хорошая практика. Это не только дает вам свободу генерировать все, что вы хотите внутри, через _sharedViewEffects, но также позволяет внешнему коду реагировать на эти изменения только путем подписки на sharedViewEffects. Таким образом, код подписки не имеет права изменять общий поток, что является изящным способом создания надежной конструкции и разделения задач, а также предотвращения ошибок изменяемости.

Генерация событий с SharedFlow

Хорошо, у вас есть потоки. Теперь необходимо с ними что-то сделать: изменение цены. CoinsSharedViewModel вызывает getPriceVariations() в его блоке init, но сам метод ничего не делает пока что.

Добавьте этот код в getPriceVariations():

viewModelScope.launch { // 1
  for (i in 1..100) { // 2
    delay(5000) // 3
    _sharedViewEffects.emit(SharedViewEffects.PriceVariation(i)) // 4
  }
}

Этот код делает несколько разных вещей. Он:

  1. Запускает сопрограмму
  2. Выполняет цикл for от 1 до 100 включительно.
  3. Задерживает сопрограмму на пять секунд. delay() проверяет отмену, поэтому он остановит цикл, если задание будет отменено.
  4. Вызывает emit на изменяемом общем потоке, передавая ему экземпляр PriceVariation, который является событием SharedViewEffects.

Этот emit(value: T) является одним из двух методов передачи событий, которые вы можете вызвать в общем потоке. Альтернативой является использование tryEmit(value: T).

Разница между ними в том, что emit — это функция приостановки, а tryEmit - нет. Эта небольшая разница приводит к огромному поведенческому контрасту между двумя методами. Однако, чтобы объяснить это, вам нужно углубиться в кеш воспроизведения и буферизацию общего потока. Пристегнитесь!

Воспроизведение и буферизация

MutableSharedFlow() принимает три параметра:

public fun  MutableSharedFlow(
  replay: Int = 0, // 1
  extraBufferCapacity: Int = 0, // 2
  onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND // 3
): MutableSharedFlow

Вот для чего они используются:

  1. replay: число значений, воспроизведенных для новых подписчиков. Не может быть отрицательным и по умолчанию равно нулю
  2. extraBufferCapacity: число буферизованных значений. Не может быть отрицательным и по умолчанию равен нулю. Сумма этого значения и replay состоит из общий буфера этого потока
  3. onBufferOverflow: действие, предпринимаемое при достижении переполнения буфера. Оно может иметь три значения: BufferOverflow.SUSPEND, BufferOverflow.DROP_OLDEST или BufferOverflow.DROP_LATEST. По умолчанию это BufferOverflow.SUSPEND.

Стандартное поведение

По началу может быть сложно понять, поэтому вот небольшая гифка возможного взаимодействия с общим потоком стандартных значений. Учтите что поток использует emit (value: T).

Идем по шагам:

  1. Этот общий поток имеет три события и двух подписчиков. Первое событие генерируется, когда еще нет подписчиков, поэтому оно теряется навсегда.
  2. К тому времени, когда общий поток генерирует второе событие, у него уже есть один подписчик, который получает это событие.
  3. Перед достижением третьего события появляется еще один подписчик, но первый приостанавливается и остается таким до достижения события. Это означает, что emit() не сможет доставить третье событие этому подписчику. Когда это происходит, у общего потока есть два варианта: он либо буферизует событие и отправляет его приостановленному подписчику, когда оно возобновляется, либо достигает переполнения буфера, если для события не осталось достаточно буфера.
  4. В этом случае общий буфер равен нулю - replay + extraBufferCapacity. Другими словами, переполнение буфера. Поскольку onBufferOverflow установлен с BufferOverflow.SUSPEND, поток будет приостановлен до тех пор, пока он не сможет доставить событие всем подписчикам.
  5. Когда подписчик возобновляет работу, также возобновляется поток, доставляя событие всем подписчикам и продолжая свою работу.

Заметка

Спецификация общего потока запрещает вам использовать что-либо, кроме onBufferOverflow = BufferOverflow.SUSPEND, когда общее значение буфера равно нулю. Поскольку tryEmit(value: T) не приостанавливается, он не будет работать, если вы используете его со значениями replay и extraBufferCapacity по умолчанию. Другими словами, единственный способ генерировать события с помощью tryEmit(value: T) - иметь, по крайней мере, общий буфер, равный единице.

С повтором

Окей, это было неплохо. Что случится при наличии буфера? Вот пример с replay = 1:

Разобьём по пунктам:

  1. Когда общий поток достигает первого события без активных подписчиков, он больше не приостанавливается. При replay = 1 общий размер буфера теперь равен единице. Таким образом, поток буферизует первое событие и продолжает работу.
  2. Когда он достигает второго события, в буфере больше нет места, поэтому он приостанавливается.
  3. Поток остается приостановленным, пока подписчик не возобновит работу. Как только это произойдет, он получит первое событие в буфере вместе с последним вторым событием. Общий поток возобновляется, и первое событие исчезает навсегда, потому что второе теперь занимает свое место в кэше воспроизведения.
  4. Перед третьим событием появляется новый подписчик. Благодаря replay он также получает копию последнего события.
  5. Когда поток, наконец, достигает третье событие, оба подписчика получают его копию.
  6. Общий поток буферизует это третье событие, отбрасывая предыдущее. Позже, когда появляется третий подписчик, он также получает копию третьего события.

С extraBufferCapacity и onBufferOverflow

Процесс крайне похож на extraBufferCapacity, но без поведения с повторением. Этот третий пример показывает общий поток при extraBufferCapacity = 1 и onBufferOverflow = BufferOverflow.DROP_OLDEST:

В этом примере:

  1. Сначала поведение одинаково: с приостановленным подписчиком и общим размером буфера, равным единице, общий поток буферизует первое событие.
  2. Другое поведение начинается при втором выпуске события. С onBufferOverflow = BufferOverflow.DROP_OLDEST общий поток отбрасывает первое событие, буферизует второе и продолжается. Также обратите внимание, что второй подписчик не получает копию буферизованного события: помните, что этот общий поток имеет extraBufferCapacity = 1, но replay = 0.
  3. Поток в конечном итоге достигает третьего события, которое получает активный подписчик. Затем поток буферизует это событие, отбрасывая предыдущее.
  4. Вскоре после этого приостановленный подписчик возобновляет работу, инициируя общий поток для передачи ему буферизованного события и очистки буфера.

Подписка на рассылку событий

Прекрасно, мы неплохо продвинулись! Теперь вы знаете, как создать общий поток и настроить его поведение. Осталась лишь одна вещь  – подписка на этот поток.

В коде перейдите в набор coinhistory внутри presentation и откройте CoinHistoryFragment.kt. В самом верху класса объявите и проинициализируйте общий ViewModel:

private val sharedViewModel: CoinsSharedViewModel by activityViewModels { CoinsSharedViewModelFactory }

Вы хотите, чтобы общий поток генерировался независимо от того, на каком экране вы находитесь, поэтому вы не можете привязать эту ViewModel к конкретному Fragment. Вместо этого вы хотите, чтобы он был привязан к Activity, чтобы он выжил, когда вы переходите от одного Fragment к другому. Вот почему в коде используется делегат by activityViewModels. Что касается CoinsSharedViewModelFactory, не беспокойтесь об этом: каждая фабрика ViewModel в приложении уже подготовлена для правильного внедрения любых зависимостей.

Собираем SharedFlow

Теперь при наличии sharedViewModel вы можете ее использовать. Найдите subscribeToSharedViewEffects(). Подпишитесь под общий поток, добавив:

viewLifecycleOwner.lifecycleScope.launchWhenStarted { // 1
  sharedViewModel.sharedViewEffects.collect { // 2
    when (it) {
      // 3
      is SharedViewEffects.PriceVariation -> notifyOfPriceVariation(it.variation)
    }
  }
}

У этого кода есть несколько важных деталей:

  1. Область действия сопрограммы ограничена представлением, а не фрагментом. Это гарантирует, что сопрограмма жива, только пока жив View, даже если Fragment переживает его. Код создает сопрограмму с launchWhenStarted вместо наиболее распространенного launch. Таким образом, сопрограмма запускается только тогда, когда жизненный цикл находится по крайней мере в состоянии STARTED, приостанавливается, когда он находится по крайней мере в состоянии STOPPED, и отменяется, когда область видимости уничтожается. Использование запуска здесь может привести к потенциальным сбоям, поскольку сопрограмма будет продолжать обрабатывать события даже в фоновом режиме
  2. Как видно, подписка на общий поток аналогична подписке на обычный поток. Код вызывает функцию collect() в SharedFlow для подписки на новые события
  3. Подписчик реагирует на событие общего потока

Всегда помните, что даже при использовании launchWhenStarted общий поток будет продолжать генерировать события без подписчиков. Таким образом, вам всегда нужно учитывать потраченные впустую ресурсы. В этом случае код создания события довольно безвреден. Но все может измениться, если превратить холодные потоки в горячие с помощью чего-то вроде shareIn.

Примечание. Превращение холодных потоков в горячие выходит за рамки этого туториала - по правде говоря, оно заслуживает отдельного руководства. Если вам интересно, просмотрите последний раздел туториала, чтобы найти ссылки по этой теме.

Применяем данные потока во View

И снова в код: можете увидеть, что notifyOfPriceVariation() еще не создан. Добавьте:

private fun notifyOfPriceVariation(variation: Int) {
  val message = getString(R.string.price_variation_message, variation)
  showSnackbar(message)
}

Легче легкого. Соберите и запустите приложение. Теперь, когда вы перейдете на экран истории монет, вы увидите несколько периодических сообщений Snackbar внизу. Однако общий поток начнет работать только тогда, когда вы перейдете на этот экран. Даже если экземпляр CoinsSharedViewModel привязан к Activity, он создается только при первом посещении экрана истории монет.

Вы заинтересованы в том, чтобы все экраны были в курсе изменений цены, так что пока еще не идеально. Чтобы это исправить, проведите изменения в CoinListFragment:

  1. Создайте экземпляр CoinsSharedViewModel схожим образом
  2. Добавьте код в subscribeToSharedViewEffects()
  3. Создайте notifyOfPriceVariation()

Соберите и запустите приложение. Теперь вы будете видеть периодические сообщения Snackbar в CoinListFragment. При переключении экранов вы заметите, что сообщения всегда показывают следующее событие, а не предыдущие. MutableSharedFlow() в CoinsSharedViewModel использует параметры по умолчанию. Но не стесняйтесь изменить что-то, чтобы увидеть, как он влияет на общий поток!

SharedFlow и Channels

Подобно общим потокам каналы (channels) представляют собой горячие потоки. Но это не значит, что общий поток заменит API каналов. Ну, или не полностью.)

SharedFlow сделан таким образом, чтобы полностью заменить BroadcastChannel. Он не только проще и быстрее при использовании, но и более гибок, чем BroadcastChannel. Держите в уме, что, хотя другие элементы из API каналов могут и должны использоваться, когда для этого имеется смысл.

StateFlow

Поток состояния сделан по образу и подобию общего потока. Вот почему StateFlow ничего более, как специализация SharedFlow. На деле вы можете создать общий поток, который будет вести себя в точности, как поток состояния:

val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(InitialState()) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

Этот код создает общий поток, который создает последнее значение только новым подписчикам. Благодаря этому distinctUntilChanged сгенерирует любое значение, если оно отличается от предыдущего. Вот, что делает поток состояния, и это делает его отличным для удержания и обработки состояния.

Обработка состояния приложения

Есть более простые способы создания потоков состояния, которыми вы будете пользоваться. Раскройте набор coinlist и откройте внутри CoinListFragmentViewModel.kt. Эта простая ViewModel использует LiveData для предоставления view класса состояния CoinListFragment. Сам класс состояния также довольно прост, и у него есть значения по умолчанию, соответствующие начальному состоянию просмотра:

data class CoinListFragmentViewState(
    val loading: Boolean = true,
    val coins: List = emptyList()

Затем Fragment использует текущее состояние для обновления представления, наблюдая за LiveData:

// Code in CoinListFragment.kt
private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewModel.viewState.observe(viewLifecycleOwner) { updateUi(it, adapter) }
}

Начните рефакторинг с изменения MutableLiveData на MutableStateFlow. Таким образом, в CoinListFragmentViewModel перейдите из:

private val _viewState = MutableLiveData(CoinListFragmentViewState())

В:

private val _viewState = MutableStateFlow(CoinListFragmentViewState())

Не забудьте добавить необходимый импорт для MutableStateFlow. Вот как вы создаете изменяемый поток состояний. В отличие от общих потоков, потокам состояний требуется начальное значение или, другими словами, начальное состояние. Но, поскольку поток состояний – это конкретная реализация общего потока, у вас нет возможности настроить такие вещи, как replay или extraBufferCapacity. Несмотря на это, общие правила и ограничения для общих потоков все еще применяются.

Далее обновите неизменяемую LiveData с:

val viewState: LiveData get() = _viewState

На:

val viewState: StateFlow get() = _viewState

Конечно же, также можете добавить:

val viewState = _viewState.asStateFlow()

Добавьте импорт для StateFlow. Будь то общий поток или поток состояний, вы можете создать неизменяемый поток с обоими параметрами. Преимущество использования asStateFlow() или asSharedFlow() заключается в том, что вы получаете дополнительную безопасность за счет явного создания неизменяемой версии потока. Это позволяет избежать таких вещей, как создание другой изменяемой версии по ошибке.

Генерация событий с помощью StateFlow

Стоящая разница между общим потоком и потоком состояния – генерация событий. Вы все еще можете использовать emit и tryEmit в потоке состояния, но лучше не стоит :)

Вместо этого сделайте:

mutableState.value = newState

Причина в том, что обновления value всегда объединяются, а это означает, что даже если вы обновляете его быстрее, чем подписчики могут его использовать, они получат только самое последнее значение. Следует иметь в виду, что все, что вы присваиваете value, должно быть совершенно другим объектом, чем то, что было раньше. Например:

data class State(
  var name: String = "",
  var age: Int = -1
)

val mutableState = MutableStateFlow(State())

// ...

// newState and mutableState.value will reference the same object
val newState = mutableState.value 

// Reference is the same, so this is also changing mutableState.value!
newState.name = "Marc"

mutableState.value = newState

В этом случае поток состояния не сгенерирует новое значение. Так как ссылаемый объект тот же самый, равенство сравнения вернет true, и поток будет считать, что состояние не изменилось.

Чтобы заработало, нудно использовать immutable объекты. Например:

data class State(
  val name: String = "",
  val age: Int = -1
)

val mutableState = MutableStateFlow(State())

// ...

mutableState.value = State(name = "Marc")

Таким способом поток напрямую создаст обновление состояния. Неизменяемость сэкономила кучу времени :)

Обратимся снова к коду: клевая вещь касательно замены LiveData н StateFlow заключается в том, что и то и то используют свойство value, и ничего менять здесь не придется.

Последнее изменение необходимо сделать в CoinListFragmentViewModel, внутри метода requestCoinList(). Теперь можете обновить if условие в начале на:

if (viewState.value.coins.isNotEmpty()) return

Вам не нужен ? больше, потому что value не будет null. Кроме того, вы инвертируете условие, используя isNotEmpty() вместо isNullOrEmpty() и отбрасывая ! в начале. Это делает код более читабельным.

Если вы попытаетесь создать приложение, вы получите сообщение об ошибке CoinListFragment о том, что существует неразрешенная ссылка observe. StateFlow не имеет метода observe, поэтому вам также необходимо провести рефакторинг.

Подписка на обновления состояния

Откройте CoinListFragment.kt. Найдите там observeViewStateUpdates() и обновите на:

private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewLifecycleOwner.lifecycleScope.launchWhenStarted {
    viewModel.viewState.collect { updateUi(it, adapter) }
  }
}

Этот код очень похож на то, что вы делали с SharedFlow, в том смысле, что используется та же логика. Несмотря на это, вы можете забеспокоиться о том, что поток состояний генерирует значения, когда приложение находится в фоновом режиме. Но в этом нет необходимости. Это правда, что, поскольку он ограничен viewModelScope, он все равно будет генерировать даже без каких-либо подписчиков, пока существует ViewModel. Тем не менее, генерация потока состояний - это легковесные операции: они просто обновляют value и уведомляют всех подписчиков. Кроме того, вы, вероятно, хотите, чтобы приложение показывало вам последнее состояние пользовательского интерфейса, когда дело доходит до переднего плана.

Соберите и запустите приложение. Все должно работать как раньше, потому что вы отредактировали код. Хорошая работа по использованию StateFlow!

StateFlow и Channels

Подобно тому, как SharedFlow может полностью заменить BroadcastChannel, StateFlow может полностью заменить ConflatedBroadcastChannel. На это есть несколько причин. StateFlow проще и эффективнее, чем ConflatedBroadcastChannel. Он также лучше различает изменчивость и неизменяемость с MutableStateFlow и StateFlow.

Горячие потоки, RxJava и LiveData

Теперь вы знаете, как работают SharedFlow и StateFlow. Но полезны ли они вообще на Android?

Хотя они могут не принести ничего «нового», они предоставляют более прямые и эффективные альтернативы. Например, где бы вы ни использовали PublishSubject из RxJava, вы можете использовать SharedFlow. Или, где бы вы ни использовали BehaviorSubject, вы, вероятно, можете использовать StateFlow. Фактически, если генерация горячих событий не является проблемой, StateFlow может даже легко заменить LiveData.

Примечание. Вы также можете преобразовать объекты SharedFlow и StateFlow в LiveData с помощью библиотеки AndroidX lifecycle-liveata-ktx. Библиотека предоставляет метод расширения asLiveData(), который позволяет преобразовывать поток и предоставлять его как LiveData для использования в вашем view. Дополнительные сведения см. В разделе StateFlow, Flow и LiveData статьи StateFlow и SharedFlow для разработчиков Android.

Итак, резюмируя:

  1. Если у вас есть какое-то управление состоянием, вы можете использовать StateFlow
  2. Если у вас есть поток событий, и нет проблемы в том, что события не обрабатываются всеми возможными подписчиками или прошлые события могут не обрабатываться вовсе, вы можете использовать SharedFlow.

Challenge: использование SharedFlow для обработки экранных событий

Поздравляю! Вот и конец статьи. Если вы хотите еще попрактиковаться, также можете пообрабатывать специфичные для экрана события, смоделированные в классах CoinListFragmentViewEffects и CoinHistoryFragmentViewEffects, используя общие потоки. Это события, которые следует обрабатывать ровно один раз, что означает, что простой канал подойдет лучше - помните, что общие потоки отбрасывают события, когда нет подписчиков. Тем не менее, вы можете сделать это с помощью общих потоков для практики. Если вам интересно, последний проект в материалах проекта имеет образец реализации.

Оригинал статьи.

Комментарии

Сообщить об опечатке

Текст, который будет отправлен нашим редакторам: