- 1:阻塞请求
- 2:使用回调
- 2.1:使用Retrofit的回调API
- 3:使用挂起函数
- 4:并发
- 5:结构化并发
- 5.1:使用外部scope的Context
- 6:Channels
Coroutines
提供了异步和非阻塞行为,但又不缺乏可读性。 使用协程执行网络请求,而不会阻塞线程,也不用使用回调。对于网络请求库,Retrofit已经支持协程。
阻塞请求
如下是使用Retrofit
对Github
执行HTTP
请求。 它允许请求给定组织下的repo
列表,以及每个repo
的贡献者列表:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
@Path("org") org: String
): Call<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<User>>
}
如下定义的loadContributorsBlocking
函数使用此API
来获取给定组织的贡献者列表:
fun loadContributorsBlocking(req: RequestData) : List<User> {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgReposCall(req.org)
.execute()
.also { logRepos(req, it) }
.body() ?: listOf()
return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name)
.execute()
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
首先,获得给定组织下的Repo List
,并将其存储在repos
中。 然后,对于每个repo
,然后继续请求贡献者list
,并将所有这些列表合并为一个最终的贡献者List
。
getOrgReposCall
和getRepoContributorsCall
都将Call
类的实例返回。 调用Call.execute
方法来执行请求。 execute
方法将阻塞线程同步调用。
如下是扩展函数bodyList
,如果出现错误则返回一个空的List<T>
:
fun <T> Response<List<T>>.bodyList(): List<T> {
return body() ?: listOf()
}
fun List<User>.aggregate(): List<User> =
groupBy { it.login }
.map { (login, group) -> User(login, group.sumBy { it.contributions }) }
.sortedByDescending { it.contributions }
使用回调
在一个单独的线程中调用loadContributors
:
thread { loadContributorsBlocking(req) }
如下是使用回调来执行方法:
fun loadContributorsBackground(req: RequestData, updateResults: (List<User>) -> Unit)
使用Retrofit的回调API
Retrofit
回调API
也可以实现。可以使用Call.enqueue
函数,该函数启动HTTP
请求并以回调作为参数。
fun loadContributorsCallbacks(req: RequestData, updateResults: (List<User>) -> Unit) {
val service = createGitHubService(req.username, req.password)
service.getOrgReposCall(req.org).onResponse { responseRepos ->
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = Collections.synchronizedList(mutableListOf<User>())
val numberOfProcessed = AtomicInteger()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
if (numberOfProcessed.incrementAndGet() == repos.size) {
updateResults(allUsers.aggregate())
}
}
}
}
}
使用挂起函数
Retrofit
最近添加了对协程的支持,可以不再将Call<List<Repo>>
返回,而是将API
调用定义为挂起函数:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): List<Repo>
}
当使用挂起函数执行请求时,线程不会被阻塞,请注意,现在getOrgRepos
会直接返回结果,而不是返回Call
。 如果结果不成功,则引发异常。
Retrofit
还允许将返回包装在Response
的结果中。 在这种情况下,将提供结果主体,并且可以手动检查错误。
interface GitHubService {
// getOrgReposCall & getRepoContributorsCall declarations
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): Response<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
suspend fun getRepoContributors(
@Path("owner") owner: String,
@Path("repo") repo: String
): Response<List<User>>
}
suspend fun loadContributorsSuspend(req: RequestData): List<User> {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
return repos.flatMap { repo ->
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
Response
的execute
,因为现在API
函数直接返回了Response
。 但这是特定于Retrofit
库的实现细节。 对于其他库,API
会有所不同,但是概念是相同的: 带有暂停功能的代码看起来惊人地类似于阻塞
版本。 它具有可读性,并准确表达了正在努力实现的目标。调用挂起方法:
launch {
val users = loadContributorsSuspend(req)
updateResults(users, startTime)
}
launch
将启动新的计算。 此计算负责加载数据并显示结果。 这种计算是可挂起的:在执行网络请求时,它被挂起并释放线程。 当网络请求返回结果时,恢复计算。 这种可挂起的计算称为协程, 协程是在线程顶部运行的计算,可以暂停。 所谓暂停,是指相应的计算可以暂停,从线程中删除并存储在内存中。 同时,线程可以自由地被其他活动占用:并发
与线程相比,Kotlin
协程非常轻量级。 每当要异步启动新计算时,都可以创建一个新协程。
要启动新的协程,可以使用主要的协程构建器:launch
,async
或runBlocking
。 async
启动一个新的协程,并返回一个Deferred
对象。 Deferred
代表其他名称(例如Future
或Promise
)所熟知的概念:它存储计算,但在获得最终结果时将延迟。 它有望在将来的某个时候产生结果。
async
与launch
之间的主要区别在于,launch
用于启动预计不会返回特定结果的计算。 launch
返回代表协程的Job
。通过调用Job.join()
,可以等到完成。
Deferred
是扩展Job
的通用类型。异步调用可以返回Deferred <Int>
或Deferred <CustomType>
,具体取决于lambda
返回的内容(lambda
中的最后一个表达式是结果)。
为了获得协程的结果,可以在Deferred
实例上调用await()
。 在等待结果时,调用的协程将暂停:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferred: Deferred<Int> = async {
loadData()
}
println("waiting...")
println(deferred.await())
}
suspend fun loadData(): Int {
println("loading...")
delay(1000L)
println("loaded!")
return 42
}
runBlocking
用作常规函数和挂起函数之间阻塞世界和非阻塞世界之间的桥梁。 它用作启动顶级主协程的适配器。
如果存在一个延迟对象列表,则可以调用awaitAll
来等待所有对象的结果:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferreds: List<Deferred<Int>> = (1..3).map {
async {
delay(1000L * it)
println("Loading $it")
it
}
}
val sum = deferreds.awaitAll().sum()
println("$sum")
}
获取贡献者列表可以改为如下代码 :
suspend fun loadContributorsConcurrent(req: RequestData): List<User> = coroutineScope {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
}
deferreds.awaitAll().flatten().aggregate()
UI
线程上运行。要想在公共线程池中的不同线程上运行协程非常容易。 指定Dispatchers.Default
作为异步函数的上下文参数:async(Dispatchers.Default) { ... }
CoroutineDispatcher
确定了相应的协程应在哪个或哪些线程上运行。 如果不指定,那么async
将使用外部作用域的调度程序。
Dispatchers.Default
表示JVM
上的共享线程池。 该池提供了一种并行执行的方法。 它包含与CPU
可用内核数量一样多的线程,但是如果只有一个内核,它仍然具有两个线程。
要仅在主UI
线程上运行协程,应指定Dispatchers.Main
作为参数:
</div>
<div>launch(Dispatchers.Main) {
updateResults()
}
launch(Dispatchers.Default) {
val users = loadContributorsConcurrent(service, req)
withContext(Dispatchers.Main) {
updateResults(users, startTime)
}
}
UI
线程上调用updateResults
,因此在Dispatchers.Main
的上下文中进行调用。 withContext
使用指定的协程上下文调用给定代码,挂起直到完成,然后返回结果。 另一种更冗长的表达方式是启动一个新的协程并显式等待(通过挂起)直到完成:launch(context) { ... }.join()
结构化并发
协程范围(Coroutine scope
)负责不同协程之间的结构和父子关系,总是在范围内开始新的协程。 协程上下文(Coroutine context
)存储用于运行给定协程的其他信息,例如调度程序指定应在其上调度协程的一个或多个线程。
使用launch
、async
或runBlocking
启动新协程时,它们会自动创建相应的作用域。 所有这些函数都将lambda
用作接收器的参数,隐式接收器类型为CoroutineScope
:
launch { /* this: CoroutineScope */
}
launch
和async
被声明为CoroutineScope
的扩展,因此在调用它们时必须始终传递隐式或显式接收器。 由runBlocking
启动的协程是唯一的例外:runBlocking
被定义为顶级函数。 但是因为它阻塞了当前线程,所以它主要用于主要功能并作为桥接功能进行测试。runBlocking
、launch
或async
中启动新的协程时,它将在范围内自动启动:import kotlinx.coroutines.*
fun main() = runBlocking { /* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}
当在runBlocking
内部调用launch
时,将其称为对CoroutineScope
类型的隐式接收器的扩展。 或者,可以显式编写this.launch
。
可以说嵌套的协程(launch
)是外部协程(runBlocking
)的子级。 这种父子关系通过作用域起作用:子协程从与父协程相对应的作用域开始。
可以使用GlobalScope.async
或GlobalScope.launch
从全局范围启动新的协程。 这将创建一个顶级的独立协程。
提供协程结构的机制称为结构化并发,结构化并发在全局范围内的好处如下:
scope
通常负责子协程,并且子协程的生命周期依附于scope
的生命周期。- 如果出现问题或用户只是改变主意并决定撤消操作,
scope
可以自动取消子协程操作。 scope
会自动等待所有子协程的完成。因此,如果scope
对应于一个协程,则父协程将不会完成,直到在其范围内启动的所有协程都执行完成为止。
使用GlobalScope.async
时,从全局范围定义的协程都是独立的。它们的寿命仅受整个应用程序寿命的限制。 可以存储从全局范围开始的协程的引用,等待其完成或显式取消它,但是它不会像结构化的那样自动发生。
使用外部scope的Context
由coroutineScope
或由coroutine
构建器创建的新作用域始终从外部作用域继承上下文。 在这种情况下,调用suspended loadContributorsConcurrent
的scope
是外部的scope
:
launch(Dispatchers.Default) { //外部scope
val users = loadContributorsConcurrent(service, req)
// ...
}
所有嵌套的协程都是从继承的上下文自动开始的。 dispatcher
就是这种情况的一部分。 这就是为什么async
启动的所有协程都使用默认dispatcher
的上下文启动的原因:
suspend fun loadContributorsConcurrent(req: RequestData): List<User> = coroutineScope {
// 该scope从外部scope继承上下文
// ...
async { // 嵌套协程从继承的上下文开始
// ...
}
// ...
}
使用结构化并发,当创建顶级协程时,可以一次指定主要上下文元素(例如dispatcher
)。 所有嵌套的协程都继承上下文并仅在需要时对其进行修改。
当为UI
应用程序(例如Android
)使用协程编写代码时,通常的做法是默认使用CoroutineDispatchers.Main
作为顶层协程,然后在需要在不同线程上运行代码时显式放置其他dispatcher
。
Channels
众所周知,编写具有共享可变状态的代码非常困难且容易出错。 通过通信共享信息而不是使用公共可变状态共享信息可以简化此过程。 协程可以通过channels
相互通信。
channels
允许在不同协程之间传递数据。 一个协程可以向channel
发送一些信息,而另一个协程可以从该channel
接收此信息:
发送(产生)信息的协程通常被称为生产者,而接收(消费)信息的协程被称为消费者。 在需要时,许多协程可以将信息发送到同一channel
,许多协程也可以从该channel
接收信息:
当许多协程从同一channel
接收信息时,每个元素仅由一个使用者处理一次;处理意味着从会从channel
中删除此元素。
可以认为channel
类似于元素集合(直接的模拟将是一个队列:将元素添加到一端并从另一端接收)。 但是,有一个重要的区别:与集合不同,即使在同步版本中,通道也可以暂停send
和receive
操作。 当channel
为空或已满(channel
的大小可能受到限制,可能已满)时,会发生这种情况。
Channel
通过三个不同的接口表示:SendChannel
、ReceiveChannel
和继承前两个接口的Channel
。 通常,创建一个channel
并将其作为SendChannel
实例提供给生产者,以便只有他们可以发送,并作为ReceiveChannel
实例提供给消费者,以便只有他们可以从中接收。
注意,send
和接收receive
都声明为suspend
方法
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
生产者可以关闭通道以指示不再有元素。
Library
中定义了几种类型的Channel
。 它们在内部可以存储多种元素,只是在send
调用是否可以挂起方面有所不同。 对于所有通道类型,receive
调用的行为方式相同:如果通道不为空,则接收元素,否则将挂起。
1、Unlimited channel
Unlimited channel
)是最接近队列的模拟:生产者可以将元素发送到此通道,并且它将无限增长。 send
方法将永远不会被挂起。 如果没有更多的内存,则会抛出OutOfMemoryException
。 和队列不同的是当使用者尝试从空通道接收消息并被挂起直到有一些新元素发送到该通道时继续使用。2、Buffered channel
Buffered channel
)的大小受指定数字的限制。 生产者可以将元素发送到此通道,直到达到最大限制。 所有元素都在内部存储。 通道已满时,下一个send
呼叫将被挂起,直到出现更多可用空间。Rendezvous channel
)是没有缓冲区的通道。 这与创建大小为零的缓冲通道(Buffered channel
)相同。 其中一个功能(send
或receive
)始终被挂起,直到调用另一个功能为止。 如果调用了send
函数,但消费者没有准备好处理该元素则receive
会挂起,并且send
也会被挂起。 同样,如果调用了receive
函数且通道为空,换句话说,没有准备好发送该元素的的send
被挂起-receive
也会被挂起。4、Conflated channel
Conflated channel
)的新元素将覆盖先前发送的元素,因此接收方将始终仅能获取最新元素。 send
调用将永远不会被挂起。val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
Rendezvous channel
)。import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2