在 MVI 模式中,数据的一致性和完整性主要依赖于 单向数据流(Unidirectional Data Flow, UDF)、不可变性(Immutability) 和 单一数据源(Single Source of Truth) 这三个核心原则。以下是具体的实现策略和最佳实践:
1. 核心原则:单一数据源(Single Source of Truth)
定义:应用中的所有数据都应来自一个统一的、权威的数据源,通常是 Model 层的 对象。如何确保:
State
View 不直接修改数据:View 只能通过发送 来触发数据变化,而不能直接修改
Intent 或数据源。Model 层集中管理状态:
State 的更新只能由 Model 层(通常是
State 或
ViewModel)完成,并且所有数据操作(如持久化、网络请求)都必须经过 Model 层。禁止直接访问数据源:View 和 ViewModel 不应直接访问数据库、SharedPreferences 等持久化存储,而应通过
Use Case 层进行操作。
Repository
示例:
// 正确:View 通过发送 Intent 触发数据保存
button.setOnClickListener {
viewModel.onIntent(SaveUserIntent(user))
}
// 错误:View 直接调用 Repository 保存数据
button.setOnClickListener {
userRepository.saveUser(user) // 违反单一数据源原则
}
2. 不可变性(Immutability)
定义: 对象和数据模型(如
State、
User)一旦创建,就不能被修改。任何状态变化都必须生成一个新的对象。如何确保:
Todo
使用不可变数据类:在 Kotlin 中,使用 并将所有属性声明为
data class。禁止在 View 或 ViewModel 中修改 State:
val 的更新只能通过
State 方法创建新实例。使用不可变集合:使用
copy()、
List 等不可变集合(避免使用
Map、
MutableList)。
MutableMap
示例:
// 不可变状态类
data class AppState(
val users: List<User> = emptyList(), // 不可变集合
val isLoading: Boolean = false,
val error: String? = null
)
// 正确:通过 copy() 创建新状态
val newState = currentState.copy(
users = currentState.users + newUser, // 生成新列表
isLoading = false
)
// 错误:直接修改状态
currentState.users.add(newUser) // 编译错误(List 是不可变的)
currentState.isLoading = false // 编译错误(val 属性不可修改)
3. 单向数据流(UDF)
定义:数据在应用中的流动方向是固定的,形成一个闭环:
View → Intent → ViewModel → Model → State → View
如何确保:
View 只发送 Intent:View 捕获用户操作(如点击按钮、输入文本),将其封装为 并发送给 ViewModel。ViewModel 处理 Intent:ViewModel 接收
Intent,调用相应的业务逻辑(如
Intent 或
Use Case),并更新
Repository。Model 层执行操作:
State 或
Use Case 负责执行具体的数据操作(如网络请求、持久化),并返回结果给 ViewModel。State 变化通知 View:ViewModel 将更新后的
Repository 发送给 View,View 根据新
State 渲染 UI。
State
示例:
// 1. View 发送 Intent
class UserActivity : AppCompatActivity() {
private val viewModel: UserViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
saveButton.setOnClickListener {
val user = User(name = nameInput.text.toString())
viewModel.onIntent(SaveUserIntent(user)) // 发送 Intent
}
}
}
// 2. ViewModel 处理 Intent 并更新 State
class UserViewModel(private val userUseCase: UserUseCase) : ViewModel() {
private val _state = MutableStateFlow(AppState())
val state: StateFlow<AppState> = _state.asStateFlow()
fun onIntent(intent: UserIntent) {
viewModelScope.launch {
when (intent) {
is SaveUserIntent -> saveUser(intent.user)
}
}
}
private suspend fun saveUser(user: User) {
_state.update { it.copy(isLoading = true) } // 加载中状态
try {
userUseCase.saveUser(user) // 调用业务逻辑
val updatedUsers = userUseCase.getUsers() // 获取更新后的数据
_state.update { it.copy(users = updatedUsers, isLoading = false) } // 更新状态
} catch (e: Exception) {
_state.update { it.copy(error = e.message, isLoading = false) } // 错误状态
}
}
}
// 3. Use Case 执行业务逻辑
class UserUseCase(private val userRepository: UserRepository) {
suspend fun saveUser(user: User) {
userRepository.saveUser(user) // 调用 Repository
}
suspend fun getUsers(): List<User> {
return userRepository.getUsers() // 调用 Repository
}
}
// 4. Repository 执行持久化操作
class UserRepositoryImpl(private val localDataSource: LocalDataSource) : UserRepository {
override suspend fun saveUser(user: User) {
localDataSource.saveUser(user) // 调用本地数据源
}
override suspend fun getUsers(): List<User> {
return localDataSource.getUsers() // 调用本地数据源
}
}
4. 错误处理与重试机制
定义:在数据操作(如网络请求、数据库读写)过程中,必须处理可能出现的错误(如网络异常、数据库错误),并确保状态的一致性。如何确保:
统一错误处理:在 或
Use Case 中捕获异常,并将错误信息传递给 ViewModel。状态中包含错误信息:
Repository 对象应包含
State 字段,用于存储错误信息。提供重试机制:View 可以根据错误状态显示重试按钮,用户点击后重新发送
error。
Intent
示例:
// State 中包含错误信息
data class AppState(
val users: List<User> = emptyList(),
val isLoading: Boolean = false,
val error: String? = null // 错误信息
)
// Use Case 中处理错误
class UserUseCase(private val userRepository: UserRepository) {
suspend fun saveUser(user: User) {
try {
userRepository.saveUser(user)
} catch (e: IOException) {
throw Exception("网络异常,请检查网络连接")
} catch (e: SQLException) {
throw Exception("数据库错误,请稍后重试")
}
}
}
// ViewModel 中更新错误状态
private suspend fun saveUser(user: User) {
_state.update { it.copy(isLoading = true, error = null) }
try {
userUseCase.saveUser(user)
val updatedUsers = userUseCase.getUsers()
_state.update { it.copy(users = updatedUsers, isLoading = false) }
} catch (e: Exception) {
_state.update { it.copy(error = e.message, isLoading = false) } // 更新错误状态
}
}
// View 中显示错误并提供重试机制
class UserActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launchWhenStarted {
viewModel.state.collect { state ->
if (state.error != null) {
errorTextView.text = state.error
retryButton.visibility = View.VISIBLE
} else {
errorTextView.text = ""
retryButton.visibility = View.GONE
}
}
}
retryButton.setOnClickListener {
val user = User(name = nameInput.text.toString())
viewModel.onIntent(SaveUserIntent(user)) // 重试
}
}
}
5. 事务管理(适用于复杂操作)
定义:对于需要原子性执行的多个数据操作(如同时保存用户和更新用户计数),应使用事务管理,确保要么所有操作都成功,要么都失败。如何确保:
数据库事务:如果使用 Room 数据库,可以使用 注解来标记事务方法。业务事务:在
@Transaction 中手动管理事务,确保多个操作的原子性。
Use Case
示例(Room 事务):
// DAO 中定义事务方法
@Dao
interface UserDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun insert(user: UserEntity)
@Query("UPDATE stats SET user_count = user_count + 1")
suspend fun incrementUserCount()
@Transaction // 标记为事务
suspend fun insertUserAndIncrementCount(user: UserEntity) {
insert(user)
incrementUserCount()
}
}
// Repository 中调用事务方法
class UserRepositoryImpl(private val userDao: UserDao) : UserRepository {
override suspend fun saveUser(user: User) {
userDao.insertUserAndIncrementCount(user.toEntity()) // 调用事务方法
}
}
6. 状态恢复与持久化
定义:在应用生命周期变化(如屏幕旋转、应用后台)时,应确保状态不丢失,并能从持久化存储中恢复。如何确保:
状态持久化:将关键状态(如用户列表、当前选中项)保存到持久化存储(如 Room、DataStore)中。状态恢复:在应用启动或 ViewModel 初始化时,从持久化存储中加载状态。
示例:
// ViewModel 初始化时从 Repository 加载状态
class UserViewModel(private val userUseCase: UserUseCase) : ViewModel() {
private val _state = MutableStateFlow(AppState())
val state: StateFlow<AppState> = _state.asStateFlow()
init {
viewModelScope.launch {
loadUsers() // 初始化时加载数据
}
}
private suspend fun loadUsers() {
_state.update { it.copy(isLoading = true) }
try {
val users = userUseCase.getUsers() // 从 Repository 加载数据
_state.update { it.copy(users = users, isLoading = false) }
} catch (e: Exception) {
_state.update { it.copy(error = e.message, isLoading = false) }
}
}
}
7. 测试与验证
定义:通过单元测试、集成测试验证数据的一致性和完整性。如何确保:
单元测试:测试 和
Use Case 的逻辑,确保数据操作的正确性。集成测试:测试整个数据流(View → Intent → ViewModel → Model → State),确保状态更新的一致性。
Repository
示例(单元测试):
// 测试 UserUseCase
class UserUseCaseTest {
private val mockUserRepository = mock<UserRepository>()
private val userUseCase = UserUseCase(mockUserRepository)
@Test
fun `saveUser should call repository saveUser`() = runTest {
val user = User("1", "John", "john@example.com")
userUseCase.saveUser(user)
verify(mockUserRepository).saveUser(user) // 验证调用了 Repository 的 saveUser 方法
}
}
总结
在 MVI 模式中,确保数据的一致性和完整性需要遵循以下核心策略:
单一数据源:所有数据操作都经过 Model 层,避免直接访问数据源。不可变性: 和数据模型不可修改,状态更新通过创建新对象实现。单向数据流:数据流动方向固定,形成闭环。错误处理:统一处理异常,确保状态的一致性。事务管理:对于复杂操作,使用事务确保原子性。状态恢复:通过持久化存储确保状态在生命周期变化时不丢失。测试验证:通过测试确保数据操作的正确性。
State
通过以上策略,可以构建一个数据一致性高、可维护性强的 MVI 应用。
在MVI模式中,如何处理数据的并发更新?
在 MVI 模式中处理数据的并发更新,核心思路是 “单一可信数据源 + 不可变状态 + 原子化状态更新”,同时结合协程、Flow 等工具确保并发安全。以下是具体的解决方案和实践:
一、核心原则:并发更新的 “安全基石”
单一数据源(Single Source of Truth)所有数据更新都必须通过 Model 层的 管理,禁止 View 或其他组件直接修改数据,避免并发修改冲突。不可变状态(Immutability)
State 对象及其内部数据(如列表、实体)必须是不可变的。任何更新都需创建新的
State 实例,确保并发场景下状态的一致性。原子化更新(Atomic Update)多个并发操作对同一状态的更新必须是原子的,避免部分更新导致的状态不一致(如同时添加和删除列表项)。
State
二、具体解决方案
1. 使用协程 +
StateFlow 确保并发安全
StateFlow
是 Kotlin 协程中用于状态管理的核心组件,其
StateFlow 方法(或
update)是原子操作,能有效处理并发更新。
compareAndSet
关键特性:
:以原子方式更新状态,内部通过 CAS(Compare-And-Set)机制确保线程安全。
StateFlow.update 是冷流,仅在有观察者时发射数据,避免不必要的计算。
StateFlow
示例:并发更新用户列表
// 不可变状态类
data class UserState(
val users: List<User> = emptyList(), // 不可变列表
val isLoading: Boolean = false,
val error: String? = null
)
// ViewModel 中使用 StateFlow 管理状态
class UserViewModel(
private val userRepository: UserRepository
) : ViewModel() {
private val _state = MutableStateFlow(UserState())
val state: StateFlow<UserState> = _state.asStateFlow()
// 并发添加用户(原子更新)
fun addUser(user: User) {
viewModelScope.launch {
_state.update { currentState ->
// 原子操作:基于当前状态创建新状态
currentState.copy(
users = currentState.users + user, // 不可变列表添加新元素(生成新列表)
isLoading = false
)
}
}
}
// 并发删除用户(原子更新)
fun deleteUser(userId: String) {
viewModelScope.launch {
_state.update { currentState ->
currentState.copy(
users = currentState.users.filter { it.id != userId }, // 生成新列表
isLoading = false
)
}
}
}
}
为什么安全? 会获取当前最新的状态,基于该状态生成新状态,整个过程是原子的。即使多个协程同时调用
_state.update 或
addUser,
deleteUser 也会确保每个更新都基于最新的状态,避免数据覆盖。
StateFlow
2. 使用
Mutex 处理复杂并发逻辑
Mutex
当并发更新涉及多个步骤(如 “先查询再更新”)时,仅靠 可能不够,需使用
StateFlow.update(互斥锁)确保临界区代码的原子执行。
Mutex
适用场景:
并发更新依赖前一个状态的计算(如 “用户点击点赞,需先判断是否已点赞”)。多个网络请求同时返回,需按顺序更新状态。
示例:并发点赞功能(避免重复点赞)
class PostViewModel(
private val postRepository: PostRepository
) : ViewModel() {
private val _state = MutableStateFlow(PostState())
val state: StateFlow<PostState> = _state.asStateFlow()
// 创建互斥锁
private val mutex = Mutex()
// 并发点赞(原子操作)
fun likePost(postId: String) {
viewModelScope.launch {
// 锁定临界区,确保同一时间只有一个协程执行
mutex.withLock {
val currentState = _state.value
val targetPost = currentState.posts.find { it.id == postId } ?: return@launch
// 避免重复点赞
if (targetPost.isLiked) return@launch
// 1. 调用Repository更新服务器数据
val updatedPost = postRepository.likePost(postId)
// 2. 原子更新本地状态
_state.update { state ->
state.copy(
posts = state.posts.map { p ->
if (p.id == postId) updatedPost else p
}
)
}
}
}
}
}
为什么安全? 会确保临界区内的代码(从 “查询当前状态” 到 “更新状态”)是原子执行的,即使多个协程同时调用
mutex.withLock,也不会出现 “同时点赞” 的情况。
likePost
3. 使用
Channel 序列化并发请求
Channel
当需要处理大量并发请求(如用户快速点击按钮)时,可使用 将请求序列化,避免并发冲突。
Channel
适用场景:
用户快速触发同一操作(如快速点击 “加载更多”)。多个网络请求需要按顺序处理(如分页加载数据)。
示例:序列化分页加载请求
class PagingViewModel(
private val pagingRepository: PagingRepository
) : ViewModel() {
private val _state = MutableStateFlow(PagingState())
val state: StateFlow<PagingState> = _state.asStateFlow()
// 创建 Channel 接收加载请求
private val loadChannel = Channel<Int>(Channel.UNLIMITED)
init {
// 启动协程消费 Channel 中的请求(序列化处理)
viewModelScope.launch {
loadChannel.consumeEach { page ->
loadPage(page)
}
}
}
// 触发加载下一页(发送请求到 Channel)
fun loadNextPage() {
val currentPage = _state.value.currentPage
loadChannel.trySend(currentPage + 1) // 非阻塞发送
}
// 实际加载数据的逻辑
private suspend fun loadPage(page: Int) {
val currentState = _state.value
if (currentState.isLoading || currentState.hasMoreData.not()) return
_state.update { it.copy(isLoading = true) }
try {
val newData = pagingRepository.loadPage(page)
_state.update { state ->
state.copy(
data = state.data + newData,
currentPage = page,
hasMoreData = newData.isNotEmpty(),
isLoading = false
)
}
} catch (e: Exception) {
_state.update { it.copy(error = e.message, isLoading = false) }
}
}
}
为什么安全? 会按发送顺序处理请求,即使用户快速点击 “加载更多”,
Channel.consumeEach 也会按页号顺序执行,避免 “同时加载同一页” 或 “加载顺序混乱” 的问题。
loadPage
4. 使用数据库事务处理持久化并发
当数据更新需要同步到本地数据库(如 Room)时,需使用数据库事务确保持久化操作的原子性。
示例:Room 事务处理并发更新
// DAO 中定义事务方法
@Dao
interface UserDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun insert(user: UserEntity)
@Delete
suspend fun delete(user: UserEntity)
// 标记为事务,确保原子执行
@Transaction
suspend fun updateUser(oldUser: UserEntity, newUser: UserEntity) {
delete(oldUser)
insert(newUser)
}
}
// Repository 中调用事务方法
class UserRepositoryImpl(
private val userDao: UserDao
) : UserRepository {
override suspend fun updateUser(oldUser: User, newUser: User) {
userDao.updateUser(oldUser.toEntity(), newUser.toEntity())
}
}
为什么安全?Room 的 注解会确保方法内的所有操作(删除旧用户 + 插入新用户)要么全部成功,要么全部失败,避免数据库处于 “部分更新” 的不一致状态。
@Transaction
三、总结:并发更新的 “最佳实践组合”
| 场景 | 推荐方案 | 核心优势 |
|---|---|---|
| 简单并发状态更新(如列表增删) | |
轻量、原子、无额外依赖 |
| 复杂并发逻辑(如依赖前状态的更新) | |
确保临界区原子执行,避免竞态条件 |
| 大量并发请求(如快速点击) | 序列化 |
按顺序处理请求,避免重复操作 |
| 持久化数据更新(如数据库操作) | 数据库事务(Room) | 确保持久化操作的原子性和一致性 |
通过以上方案,可在 MVI 模式中优雅处理数据的并发更新,确保状态的一致性和安全性。