协程的初次尝试与学习

协程的初次尝试与学习

九月 20, 2020

本文内容基于Kotlin 1.4,可能存在更低版本不包含的新特性或语法,可能存在更高版本已抛弃的语法或API。

你可以通过Gradle引入

1
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0-RC1'

概述

尽管kotlincn.net中已经有非常详细的文档了,但是个人感觉还是没有找到重点,就像一篇文章不知道它的中心思想一样,每一个用例感觉都懂了,但是又感觉都没有懂。所以这里留下个人学习的笔记。

概念

Kotlin-JVM由于Java的局限性,是没有办法做到类似于Go语言的协程的,而官网也承认了,这只是基于Java线程池的轻量级线程。
翻译过来,可以理解为,利用线程池,在线程基础上再次包装了一个协程任务框架。就像Runnable对于Thread的关系一样,但是它包含了更多的逻辑,可以抛开线程的概念,是一个异步任务的框架。这样理解,可能会更加清晰一些了。

基本组成

CoroutineScope

名字直译过来是协程域,可以理解为,这是一个管理器,一个容器。协程必须要在域内才能运行,并且域有义务管理协程的生命周期,做到最基本的等待以及取消协程等工作,就像ViewGroup一样。
它可以被继承,然后来实现不同场景的效果,它也是可以嵌套的。

Job

名字直译可以是作业,个人感觉理解成“任务”也是可以的,它表示一个协程,当创建一个协程时,就会得到一个Job对象,它代表了协程对象,可以用它来一定程度上控制协程。比如取消协程。但是注意,尽管官网没有重点提醒,但是这里需要重点说明一下。协程所谓的取消,指的是任务开始前取消执行,如果任务在运行过程中(CPU占用中)。那么是不能被取消的,也就是说,它本质上只是一个状态位,判定不通过的话不启动而已。因此,如果一个协程任务是可以被中止的,那么需要在任务中的逻辑节点中手动判定一下,任务内部又一个隐藏的变量isActive,如果它返回了False,那么表示协程被取消了,此时需要中断逻辑。另一方面,协程中断某些逻辑性任务时,会抛出一个任务取消的异常,那么也可以理解为,协程的中断依赖于Java的异常机制。

任务的创建可以在协程域之外,也有多种创建方式,但是最终的运行,仍然需要在域中。

CoroutineContext

名字直译可以理解为协程上下文,它是表示协程的行为集合,包括

  • Job的生命周期
  • CoroutineDispatcher协程调度器的指定
  • CoroutineName协程名称,主要用于调试
  • CoroutineExceptionHandler协程异常处理器,主要处理未捕获的异常

它会存在于Job中,他默认会从环境(域)中继承,并且在创建时确定下来。它可以Job创建时额外指定,以此来切换线程。

CoroutineDispatcher

从名字来看,是协程调度器,它的用处,其实更加明确的说法是:“指定协程运行的线程池”。是的,是线程池,这里需要说到一个注意点,协程是线程之上的结构化任务管理,那么它就希望使用者尽量抛弃线程概念,同时尽可能复用资源,那么就会存在一个可怕的事情,协程经过一次挂起,可以立即为“暂停”,那么下次继续运行的时候,极大可能就会变成了另一个线程,如果协程运行过程中,将一些数据关联到协程之外的线程上,那么数据可能就不在了,同时,如果运行时,通过线程信息来标示任务,那么也可能因为线程的切换导致标记失效。
同样,如果使用了错误的调度器,那么也会导致一些奇怪的问题发生,一般情况下,IO调度器,主要用于执行一些网络访问,数据库读写,文件读写等与读写有关的操作,据文档所说,协程会在一定程度上优化这个过程。而内存中的常规计算或者解析,比如Json的解析,排序,高斯模糊等,就建议在Default中了,他主要针对CPU计算做了些优化。

结构

协程的结构,可以根据他们的表现特性,来组建一下层次。
这里我说一下我的理解,协程域管理一组协程,可以通过控制来直接控制内部协程的取消与中止,还可以控制他们的开始时机等。
但是并不代表同一个的协程在同一个线程池运行,它更像是一个业务层的管理者,而协程的运行线程,取决于Job创建时指定的调度器,然后帮我们做好了回调函数,来多个任务间切换,看起来就像同步一样。同样,每个Job中,又再次嵌套了别的,这样,一个树状的协程结构就出来了。
尽管是包含关系的层次嵌套,但是因为运行线程不受这个结构约束,那么可以想象带来的好处就是,一个数据请求的业务需求,可以建立一个关联图形界面的生命周期,当他关闭时,停止所有操作。接着,在这里里面,创建网络连接的的任务,关联到IO调度器。再创建数据处理的任务,关联到Default调度器。因为协程的挂起能力,尽管同时创建,但是我们仍然可以让它在数据请求回来之后开始执行。最后数据处理结果返回,这时线程处于最初创建的中,位于UI线程,我们可以刷新界面了。
上述结构,只是基于前面概念描述的猜想,但是能不能做到呢?我们需要实际分析一下。

方法

注意,此处标题我用的是方法,因为这些都不是Kotlin标准库中的关键字,而是协程扩展库中的顶级函数,看起来会像是关键字。

launch

从名字上看,这就是发射,启动的方法。它是提供的方法,用途是创建并且启动一个协程任务,同时返回一个Job对象,但是需要注意的是,它类似于我们的线程池execute方法,它并不会有返回值,就只是单纯的启动了一个协程任务。
它可以在常规方法中启动。

async

名字来看,这是异步的意思,它也是提供的方法,它的作用和launch相似,都是创建并且启动一个协程任务,但是它会存在返回值,也就是协程任务的返回值可以通过它的返回的对象取到。
它返回的是一个Deferred对象,可以通过它的await()方法,以阻塞的方式等待结果。
它不能从常规方法中启动。

runBlocking

这是一个顶层的方法,主要用途是基于当前线程创建一个。它的表现有以下几点:

  • 可以使用协程,不管是运行在当前线程还是其他线程
  • 它具有的基本特点,内部包含一个loop循环,保证方法不会在协程任务结束之前结束掉。(这在main方法中很有用)
  • 它可以有返回值
delay

这也是一个顶层函数,它主要用于协程中的挂起,它和线程的sleep是有区别的,它使用的是挂起,也就是说,不会一直占用线程。

coroutineScope

这是一个顶层方法,但是要求在一个中执行,它会创建另一个,可以理解为创建另一组任务。它将返回一个对象,可以取消内部的全部协程任务。

withContext

指定一个调度器来执行协程,并且会把结果作为返回值当作方法的返回值。它需要在中使用,并且他是会使协程挂起。

yield

这是一个协程中的表示当前任务让出线程,并且挂起。它类似于任务的重排,将任务放置到队列最后,直到有资源执行任务。让出挂起期间,可以被取消任务。
后遗症是,yield后的任务,可能运行在下一个线程中了。

suspend

这是一个关键字,前缀在fun之前,用于表示这是一个协程挂起方法。但是只有在导入协程库之后,才会有效。它的表现是:

  • 可以在方法中直接使用协程中的方法。
  • 只能在协程中调用这个方法
  • 调用时,协程处于挂起状态,直到它执行完毕,产生返回值
  • 如果方法中存在另一个协程方法,那么以此类推。

使用

上面介绍的都是一些概念性的东西,现在尝试使用一下它们。

开始

我们使用IDEA来编写Demo,记得项目建好后,导入协程仓库,就在文章开头。
首先,我们需要创建一个main方法,由于上面内容可以知道,协程需要在中,我们需要一个。我们可以这样:

1
2
3
fun main() = runBlocking<Unit> {

}

使用runBlocking在主线程创建一个,并且它自带了loop,我们可以不用担心协程在异步过程中,应用程序结束了。

接着,我们尝试让它做一个异步任务,就像这样:

1
2
3
4
5
6
7
fun main() = runBlocking<Unit> {
launch { // 在后台启动一个新的协程并继续
delay(1000L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
println("World!") // 在延迟后打印输出
}
println("Hello,") // 协程已在等待时主线程还在继续
}

这是官网上的demo,它会在中启动任务,并且等待全部执行完毕后关闭。
输出结果是:

1
2
3
4
Hello,
World!

Process finished with exit code 0

这里,我尝试了不按照官网的方式来写。比如这样:

1
2
3
4
5
6
7
fun main() = runBlocking<Unit> {
GlobalScope.launch() { // 在后台启动一个新的协程并继续
delay(1000L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
println("World!") // 在延迟后打印输出
}
println("Hello,") // 协程已在等待时主线程还在继续
}

输出结果是:

1
2
3
Hello,

Process finished with exit code 0

结果中,协程延迟1秒后的输出没有了,因为任务结束了。不是说runBlocking会等待所有任务结束吗?
这里我们理一下逻辑,runBlocking本质是在当前环境创建一个,那么它的等待功能来源于本身,而我们使用的是GlobalScope,这是一个全局,尽管代码是在runBlocking的里面,但是真正的协程却是放在了全局中。
那么我们可以得到一个结论,我们代码不管写在哪里,本质上还是需要看任务是放到了哪个中。
尽管看起来像是废话,但是需要注意的是,这样的结果就会引申出一些结论了:

  • 和我们的List没有本质区别,需要添加进去才会有效。
  • 任务嵌套与关联时,需要注意的关联与区别,否则可能发生逻辑错误但是又找不到原因。
  • 的任务等待以及任务管理功能,仅针对明确添加到中的任务,的嵌套只有使用coroutineScope创建的才会有效果,否则也会产生上面的效果。

方法

那么我们继续尝试它的其他特性,这里我们跳过那些看起来差不多的用例。我们直接尝试抽取协程到方法。

1
2
3
4
5
6
7
8
9
fun main() = runBlocking<Unit> {
val world = test()
println("Hello, $world")
}

suspend fun test(): String {
delay(1000)
return "World!"
}

输出结果是:

1
2
3
Hello, World!

Process finished with exit code 0

这里我们没有使用官网的案例了,可以看到,我们直接用suspend声明了一个方法,里面挂起了1000ms,然后main里面直接调用并且使用了返回值。
结果也符合我们的预期,那么发生了什么?

上面我们有说到,suspend会让调用者挂起,然后等待返回值,这里我们可以理解,它在逻辑上发生了阻塞,并且等待结果产生后,再恢复了逻辑。
就相当于,我们在test方法传入了一个回调函数,它的返回值传入了回调函数,而我们最后的println语句是写在回调函数中。

尽管如此,但是挂起这个词还是让人感觉很不安,因为到底是挂起了谁?又把谁停止了下来。

这里,我们再次回忆一下上面的概念,协程必须在中执行,就算是当前线程,那么也必须以一个当前线程的为起点,而我们说了这是一个管理器而已,内部有loop,那么挂起的就是这个了。
那么挂起的这个,会影响到外部吗?或者说,会产生什么影响吗?
我们其实可以直接试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
fun main() {
println("on start, ${System.currentTimeMillis()}")
runBlocking<Unit> {
val world = test()
println("Hello, $world, ${System.currentTimeMillis()}")
}
println("on end, ${System.currentTimeMillis()}")
}

suspend fun test(): String {
delay(1000)
return "World!"
}

输出结果是:

1
2
3
4
5
on start, 1600587921039
Hello, World!, 1600587922113
on end, 1600587922115

Process finished with exit code 0

我在打印时,带上了时间戳。并且runBlocking不再是整个main了。我们从打印结果就可以总结几个点:

  • 的内部循环,会阻塞所在线程(一般情况下)
  • 挂起指的是内部的loop挂起当前任务,加入到循环末尾,等待结果或者等待结束(这一点是依据JavaScript语言特点推论)

异步

上面那么多,其实我们似乎还是做不到最开始的猜想,那么我们再增加一点东西:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main() {
println("on start, ${System.currentTimeMillis()}")
runBlocking<Unit> {
val data = async(Dispatchers.IO) { getData() }
val world = async(Dispatchers.Default) { parse(data.await()) }
println("Hello, ${world.await()}, ${System.currentTimeMillis()}")
}
println("on end, ${System.currentTimeMillis()}")
}

suspend fun getData(): String {
println("getData, ${System.currentTimeMillis()}")
delay(1000)
return "{\"value\":\"World!\"}"
}

suspend fun parse(value: String): String {
println("parse, ${System.currentTimeMillis()}")
delay(1000)
val json = JSONObject(value)
return json.optString("value")?:""
}

输出结果是:

1
2
3
4
5
6
7
on start, 1600589874877
getData, 1600589875013
parse, 1600589876030
Hello, World!, 1600589877043
on end, 1600589877043

Process finished with exit code 0

可以看到,耗时仍然很多,但是,我们做了一开始的设想。
我们有一个获取数据的方法,它是一个耗时的操作
我们还有一个数据解析的方法,它也是一个耗时的操作
而我们的解析方法,需要获取数据方法的返回值
我们上面的操作就完成了这个过程。

但是在写的过程中,发现一个疑惑,data.await()到底应该放在哪里呢?
我想了很久,觉得还是应该放在async里面,因为这是一个阻塞当前协程的挂起方法,会等待返回值,如果我希望的是等待最终返回值,那么这个等待,应该是交给下一个处理方法来等待。尽管看起来总的耗时不变,但是语意和结果就变了,也许我发起parse的异步任务之后,还有其他任务呢?如果data.await()放在外面,那岂不是要全部后续方法都等着他?这不是我们希望的,所以最后还是放在了async里面。
我们可以写个demo验证一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
fun main() {
println("on start, ${System.currentTimeMillis()}")
runBlocking<Unit> {
val data = async(Dispatchers.IO) { getData() }
val world = async(Dispatchers.Default) { parse(data.await()) }
launch { other() }
println("Hello, ${world.await()}, ${System.currentTimeMillis()}")
}
println("on end, ${System.currentTimeMillis()}")
}

suspend fun getData(): String {
println("getData, ${System.currentTimeMillis()}")
delay(1000)
return "{\"value\":\"World!\"}"
}

suspend fun parse(value: String): String {
println("parse, ${System.currentTimeMillis()}")
delay(1000)
val json = JSONObject(value)
return json.optString("value")?:""
}

suspend fun other() {
println("other start, ${System.currentTimeMillis()}")
delay(1000)
println("other end, ${System.currentTimeMillis()}")
}

上面我又加了个异步任务,让我们看看结果:

1
2
3
4
5
6
7
8
9
on start, 1600590487126
getData, 1600590487219
other start, 1600590487227
other end, 1600590488232
parse, 1600590488234
Hello, World!, 1600590489248
on end, 1600590489248

Process finished with exit code 0

可以看到,需要等待的解析任务没有影响到另一个任务的执行。那么按照我们上面的分析,data.await()放到外面呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
fun main() {
println("on start, ${System.currentTimeMillis()}")
runBlocking<Unit> {
val data = async(Dispatchers.IO) { getData() }.await()
val world = async(Dispatchers.Default) { parse(data) }
launch { other() }
println("Hello, ${world.await()}, ${System.currentTimeMillis()}")
}
println("on end, ${System.currentTimeMillis()}")
}

suspend fun getData(): String {
println("getData, ${System.currentTimeMillis()}")
delay(1000)
return "{\"value\":\"World!\"}"
}

suspend fun parse(value: String): String {
println("parse, ${System.currentTimeMillis()}")
delay(1000)
val json = JSONObject(value)
return json.optString("value")?:""
}

suspend fun other() {
println("other start, ${System.currentTimeMillis()}")
delay(1000)
println("other end, ${System.currentTimeMillis()}")
}

可以看到,只改了一个.await()的位置,那么结果呢?

1
2
3
4
5
6
7
8
9
on start, 1600590621552
getData, 1600590621642
parse, 1600590622658
other start, 1600590622660
other end, 1600590623664
Hello, World!, 1600590623673
on end, 1600590623674

Process finished with exit code 0

可以看到,结果符合预期,它影响了我们后续的任务。这个挂起,只是相对线程来说的,可以释放挂起时的线程资源。但是对于内部来说,就是阻塞了。

Android

以上就是协程基本使用和一些特性了,那么Android中怎么使用呢?
我们知道,Android中,我们不用也不能自己写main方法,同时有UI线程,这是不能阻塞的,那么用协程,是怎么样的呢?我们一起体验一下。

开始

AndroidXLifecycleScope其实已经提供了包装好的并且关联生命周期的协程方法,但是此时我们是为了探究协程本身,因此,这里只使用协程的核心库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class MainActivity : AppCompatActivity() {

// 由于协程必须要有容器,因此,我们准备一个与Activity关联的域,它的对应调度器是主线程
// 表示这是一个基于主线程的域
private val activityScope = CoroutineScope(Dispatchers.Main)

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
setSupportActionBar(toolbar)

// 触发更新方法
update()
}

// 声明一个耗时的数据加载方法,我们让它延时1秒
private suspend fun loadData(arg: String): String {
delay(1000)
return "Hello $arg"
}

// 更新的方法,这里我们需要完成异步数据加载以及UI的更新
private fun update() {
// 用当前的域来启动一个协程
activityScope.launch {
val arg = "Kotlin"
// 切换到IO协程做数据加载,当前位置挂起
val result = withContext(Dispatchers.IO) {
loadData(arg)
}
// 得到数据后,为View更新数据
textView.text = result
}
}

override fun onDestroy() {
super.onDestroy()
// 清理协程任务
activityScope.cancel()
}

}

这个demo的布局文件是一个TextView。显示效果是应用打开1秒后,界面的文本发生了变化。
然后我们在onDestroy中清理了协程任务。

一切看起来没什么问题,运行起来也没有什么问题。但是细心的小伙伴是不是发现了什么问题?
为什么前面的main里面用创建出来的运行时,会阻塞后面的输出,我们说了,里面是有loop的,会阻塞线程的,但是这里为什么没有?甚至时间加长到10秒,都没有发生ANR呢?

这里我们似乎弄混了一件事情,前面的阻塞,是runBlocking方式创建的,而且是这个本身阻塞了。而我们这里,虽然也创建了一个,但是这个只是在主线程,并不代表它整个运行在主线程。可以理解为,它是一个Handler,挂起时是不占用线程的。

结束

上面还是一个非常基础的Demo,但是,我也不打算继续举例了,因为协程的案例非常多,官方或者第三方实现的集成库也非常多。弄明白了基本的逻辑及特性之后,只剩下了个人的喜好选择而已。因此,再写下去也没有了必要。

后记

参考文档