Kotlin - 协程

文章目录[x]
  1. 1:阻塞请求
  2. 2:使用回调
  3. 2.1:使用Retrofit的回调API
  4. 3:使用挂起函数
  5. 4:并发
  6. 5:结构化并发
  7. 5.1:使用外部scope的Context
  8. 6:Channels

Coroutines提供了异步和非阻塞行为,但又不缺乏可读性。 使用协程执行网络请求,而不会阻塞线程,也不用使用回调。对于网络请求库,Retrofit已经支持协程。

阻塞请求

如下是使用RetrofitGithub执行HTTP请求。 它允许请求给定组织下的repo列表,以及每个repo的贡献者列表:

interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
@Path("org") org: String
): Call<List<Repo>>

@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<User>>
}

如下定义的loadContributorsBlocking函数使用此API来获取给定组织的贡献者列表:

fun loadContributorsBlocking(req: RequestData) : List<User> {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgReposCall(req.org)
.execute()
.also { logRepos(req, it) }
.body() ?: listOf()

return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name)
.execute()
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}

首先,获得给定组织下的Repo List,并将其存储在repos中。 然后,对于每个repo,然后继续请求贡献者list,并将所有这些列表合并为一个最终的贡献者List

getOrgReposCallgetRepoContributorsCall都将Call类的实例返回。 调用Call.execute方法来执行请求。 execute方法将阻塞线程同步调用。

如下是扩展函数bodyList,如果出现错误则返回一个空的List<T>:

fun <T> Response<List<T>>.bodyList(): List<T> {
 return body() ?: listOf()
}
fun List<User>.aggregate(): List<User> =
 groupBy { it.login }
 .map { (login, group) -> User(login, group.sumBy { it.contributions }) }
 .sortedByDescending { it.contributions }

使用回调

在一个单独的线程中调用loadContributors:

thread { loadContributorsBlocking(req) }

如下是使用回调来执行方法:

fun loadContributorsBackground(req: RequestData, updateResults: (List<User>) -> Unit)

使用Retrofit的回调API

Retrofit回调API也可以实现。可以使用Call.enqueue函数,该函数启动HTTP请求并以回调作为参数。

fun loadContributorsCallbacks(req: RequestData, updateResults: (List<User>) -> Unit) {
 val service = createGitHubService(req.username, req.password)
 service.getOrgReposCall(req.org).onResponse { responseRepos -> 
 logRepos(req, responseRepos)
 val repos = responseRepos.bodyList()
 
 val allUsers = Collections.synchronizedList(mutableListOf<User>())
 val numberOfProcessed = AtomicInteger()
 for (repo in repos) {
 service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
 logUsers(repo, responseUsers)
 val users = responseUsers.bodyList()
 allUsers += users
 if (numberOfProcessed.incrementAndGet() == repos.size) {
 updateResults(allUsers.aggregate())
 }
 }
}
 }
}

使用挂起函数

Retrofit最近添加了对协程的支持,可以不再将Call<List<Repo>>返回,而是将API调用定义为挂起函数:

interface GitHubService {
 @GET("orgs/{org}/repos?per_page=100")
 suspend fun getOrgRepos(
 @Path("org") org: String
 ): List<Repo>
}

当使用挂起函数执行请求时,线程不会被阻塞,请注意,现在getOrgRepos会直接返回结果,而不是返回Call如果结果不成功,则引发异常。

Retrofit还允许将返回包装在Response的结果中。 在这种情况下,将提供结果主体,并且可以手动检查错误。

interface GitHubService {
// getOrgReposCall & getRepoContributorsCall declarations

@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): Response<List<Repo>>

@GET("repos/{owner}/{repo}/contributors?per_page=100")
suspend fun getRepoContributors(
@Path("owner") owner: String,
@Path("repo") repo: String
): Response<List<User>>
}
suspend fun loadContributorsSuspend(req: RequestData): List<User> {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()

return repos.flatMap { repo ->
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
 不再需要调用之前返回了Responseexecute,因为现在API函数直接返回了Response。 但这是特定于Retrofit库的实现细节。 对于其他库,API会有所不同,但是概念是相同的: 带有暂停功能的代码看起来惊人地类似于阻塞版本。 它具有可读性,并准确表达了正在努力实现的目标。

调用挂起方法:

launch {
 val users = loadContributorsSuspend(req)
 updateResults(users, startTime)
}
launch将启动新的计算。 此计算负责加载数据并显示结果。 这种计算是可挂起的:在执行网络请求时,它被挂起并释放线程。 当网络请求返回结果时,恢复计算。 这种可挂起的计算称为协程, 协程是在线程顶部运行的计算,可以暂停所谓暂停,是指相应的计算可以暂停,从线程中删除并存储在内存中。 同时,线程可以自由地被其他活动占用:
当准备好继续计算时,它会返回到线程(但不一定要返回到同一线程)。

并发

  与线程相比,Kotlin协程非常轻量级。 每当要异步启动新计算时,都可以创建一个新协程。

要启动新的协程,可以使用主要的协程构建器launchasyncrunBlocking。   async启动一个新的协程,并返回一个Deferred对象。 Deferred代表其他名称(例如FuturePromise)所熟知的概念:它存储计算,但在获得最终结果时将延迟。 它有望在将来的某个时候产生结果。

asynclaunch之间的主要区别在于,launch用于启动预计不会返回特定结果的计算。 launch返回代表协程的Job。通过调用Job.join(),可以等到完成。

Deferred是扩展Job的通用类型。异步调用可以返回Deferred <Int>Deferred <CustomType>,具体取决于lambda返回的内容(lambda中的最后一个表达式是结果)。

为了获得协程的结果,可以在Deferred实例上调用await()。 在等待结果时,调用的协程将暂停:

import kotlinx.coroutines.*

fun main() = runBlocking {
val deferred: Deferred<Int> = async {
loadData()
}
println("waiting...")
println(deferred.await())
}

suspend fun loadData(): Int {
println("loading...")
delay(1000L)
println("loaded!")
return 42
}

    runBlocking用作常规函数和挂起函数之间阻塞世界和非阻塞世界之间的桥梁。 它用作启动顶级主协程的适配器。

如果存在一个延迟对象列表,则可以调用awaitAll来等待所有对象的结果:

import kotlinx.coroutines.*

fun main() = runBlocking {
val deferreds: List<Deferred<Int>> = (1..3).map {
async {
delay(1000L * it)
println("Loading $it")
it
}
}
val sum = deferreds.awaitAll().sum()
println("$sum")
}

获取贡献者列表可以改为如下代码 :

suspend fun loadContributorsConcurrent(req: RequestData): List<User> = coroutineScope {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
}
deferreds.awaitAll().flatten().aggregate()
 所有协程仍在主UI线程上运行。要想在公共线程池中的不同线程上运行协程非常容易。 指定Dispatchers.Default作为异步函数的上下文参数:
async(Dispatchers.Default) { ... }

CoroutineDispatcher确定了相应的协程应在哪个或哪些线程上运行。 如果不指定,那么async将使用外部作用域的调度程序。

Dispatchers.Default表示JVM上的共享线程池。 该池提供了一种并行执行的方法。 它包含与CPU可用内核数量一样多的线程,但是如果只有一个内核,它仍然具有两个线程。

要仅在主UI线程上运行协程,应指定Dispatchers.Main作为参数:

</div>
<div>launch(Dispatchers.Main) {
updateResults()
}
如果在主线程上启动新协程时主线程正忙,则协程将被挂起并计划在此线程上执行。 协程仅在线程空闲时恢复。应该从外部范围使用调度程序:
launch(Dispatchers.Default) {
 val users = loadContributorsConcurrent(service, req)
 withContext(Dispatchers.Main) {
 updateResults(users, startTime)
 }
}
 应该在主UI线程上调用updateResults,因此在Dispatchers.Main的上下文中进行调用。 withContext使用指定的协程上下文调用给定代码,挂起直到完成,然后返回结果。 另一种更冗长的表达方式是启动一个新的协程并显式等待(通过挂起)直到完成:
launch(context) { ... }.join()

结构化并发

  协程范围(Coroutine scope)负责不同协程之间的结构和父子关系,总是在范围内开始新的协程。 协程上下文(Coroutine context)存储用于运行给定协程的其他信息,例如调度程序指定应在其上调度协程的一个或多个线程。

使用launchasyncrunBlocking启动新协程时,它们会自动创建相应的作用域。 所有这些函数都将lambda用作接收器的参数,隐式接收器类型为CoroutineScope:

launch { /* this: CoroutineScope */
}
新协程只能在范围内启动。 launchasync被声明为CoroutineScope的扩展,因此在调用它们时必须始终传递隐式或显式接收器。 由runBlocking启动的协程是唯一的例外:runBlocking被定义为顶级函数。 但是因为它阻塞了当前线程,所以它主要用于主要功能并作为桥接功能进行测试。
 在runBlockinglaunchasync中启动新的协程时,它将在范围内自动启动:
import kotlinx.coroutines.*

fun main() = runBlocking { /* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}

当在runBlocking内部调用launch时,将其称为对CoroutineScope类型的隐式接收器的扩展。 或者,可以显式编写this.launch

可以说嵌套的协程(launch)是外部协程(runBlocking)的子级。 这种父子关系通过作用域起作用:子协程从与父协程相对应的作用域开始。

可以使用GlobalScope.asyncGlobalScope.launch从全局范围启动新的协程。 这将创建一个顶级的独立协程。

提供协程结构的机制称为结构化并发,结构化并发在全局范围内的好处如下:

  • scope通常负责子协程,并且子协程的生命周期依附于scope的生命周期。
  • 如果出现问题或用户只是改变主意并决定撤消操作,scope可以自动取消子协程操作。
  • scope会自动等待所有子协程的完成。因此,如果scope对应于一个协程,则父协程将不会完成,直到在其范围内启动的所有协程都执行完成为止。

使用GlobalScope.async时,从全局范围定义的协程都是独立的。它们的寿命仅受整个应用程序寿命的限制。 可以存储从全局范围开始的协程的引用,等待其完成或显式取消它,但是它不会像结构化的那样自动发生。

使用外部scope的Context

coroutineScope或由coroutine构建器创建的新作用域始终从外部作用域继承上下文。 在这种情况下,调用suspended loadContributorsConcurrentscope是外部的scope

launch(Dispatchers.Default) { //外部scope
 val users = loadContributorsConcurrent(service, req)
 // ...
}

所有嵌套的协程都是从继承的上下文自动开始的。 dispatcher就是这种情况的一部分。 这就是为什么async启动的所有协程都使用默认dispatcher的上下文启动的原因:

suspend fun loadContributorsConcurrent(req: RequestData): List<User> = coroutineScope {
 // 该scope从外部scope继承上下文
 // ... 
 async { // 嵌套协程从继承的上下文开始
 // ...
 }
 // ...
}

使用结构化并发,当创建顶级协程时,可以一次指定主要上下文元素(例如dispatcher)。 所有嵌套的协程都继承上下文并仅在需要时对其进行修改。

当为UI应用程序(例如Android)使用协程编写代码时,通常的做法是默认使用CoroutineDispatchers.Main作为顶层协程,然后在需要在不同线程上运行代码时显式放置其他dispatcher

Channels

   众所周知,编写具有共享可变状态的代码非常困难且容易出错。 通过通信共享信息而不是使用公共可变状态共享信息可以简化此过程。 协程可以通过channels相互通信。

channels允许在不同协程之间传递数据。 一个协程可以向channel发送一些信息,而另一个协程可以从该channel接收此信息:

发送(产生)信息的协程通常被称为生产者,而接收(消费)信息的协程被称为消费者。 在需要时,许多协程可以将信息发送到同一channel,许多协程也可以从该channel接收信息:

当许多协程从同一channel接收信息时,每个元素仅由一个使用者处理一次;处理意味着从会从channel中删除此元素。

可以认为channel类似于元素集合(直接的模拟将是一个队列:将元素添加到一端并从另一端接收)。 但是,有一个重要的区别:与集合不同,即使在同步版本中,通道也可以暂停sendreceive操作。 当channel为空或已满(channel的大小可能受到限制,可能已满)时,会发生这种情况。

Channel通过三个不同的接口表示:SendChannelReceiveChannel和继承前两个接口的Channel。 通常,创建一个channel并将其作为SendChannel实例提供给生产者,以便只有他们可以发送,并作为ReceiveChannel实例提供给消费者,以便只有他们可以从中接收。

注意,send和接收receive都声明为suspend方法

interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}

interface ReceiveChannel<out E> {
suspend fun receive(): E
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

生产者可以关闭通道以指示不再有元素。

Library中定义了几种类型的Channel。 它们在内部可以存储多种元素,只是在send调用是否可以挂起方面有所不同。 对于所有通道类型,receive调用的行为方式相同:如果通道不为空,则接收元素,否则将挂起。

1、Unlimited channel

无限制通道(Unlimited channel)是最接近队列的模拟:生产者可以将元素发送到此通道,并且它将无限增长。 send方法将永远不会被挂起。 如果没有更多的内存,则会抛出OutOfMemoryException。 和队列不同的是当使用者尝试从空通道接收消息并被挂起直到有一些新元素发送到该通道时继续使用。

2、Buffered channel

缓冲通道(Buffered channel)的大小受指定数字的限制。 生产者可以将元素发送到此通道,直到达到最大限制。 所有元素都在内部存储。 通道已满时,下一个send呼叫将被挂起,直到出现更多可用空间。
3、Rendezvous channel
"约定"通道(Rendezvous channel)是没有缓冲区的通道。 这与创建大小为零的缓冲通道(Buffered channel)相同。 其中一个功能(sendreceive)始终被挂起,直到调用另一个功能为止。 如果调用了send函数,但消费者没有准备好处理该元素则receive会挂起,并且send也会被挂起。 同样,如果调用了receive函数且通道为空,换句话说,没有准备好发送该元素的的send被挂起-receive也会被挂起。

4、Conflated channel

发送到合并通道(Conflated channel)的新元素将覆盖先前发送的元素,因此接收方将始终仅能获取最新元素。 send调用将永远不会被挂起。
创建通道时,指定其类型或缓冲区大小(如果需要缓冲的通道):
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
   默认情况下,会创建一个"约定"通道(Rendezvous channel)。
  在以下示例中,将创建一个"约定"通道,两个生产者协程和一个消费者协程:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}

fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
以上将会打印如下结果:
[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2
点赞

发表评论

昵称和uid可以选填一个,填邮箱必填(留言回复后将会发邮件给你)
tips:输入uid可以快速获得你的昵称和头像

Title - Artist
0:00