Kotlin 协程的实现原理简析

大纲

背景: 使用

Kotlin Coroutine 的使用方法,参照官方文档食用即可。这里只简单给出一些概念。

suspend

​使用 suspend 表示函数支持挂起操作,目的在于告诉编译器,该方法可能产生阻塞(因为普通方法也能使用 suspend 标记,但是实际不会起作用)。suspend 方法会被编译为继承 SuspendLambda 的类

创建协程

  • launch

    返回 *Job*,​能够获取和控制当前 Scope 下的协程的状态,比如取消(cancel)协程、获取是否运行(isActive)。

  • async

    ​返回 *Deferred*,Deferred 继承自 Job,它拥有 Job 的功能。另外 Deferred 还类似 Java UTC 中的 Future 接口,通过 Deferred 能够获取协程执行完毕之后的结果。

  • withContext

    ​切换代码块的执行上下文

结构化并发

​将多个协程放到一个可以管理的空间里面,这个空间的名字就叫 CoroutineScope。通过 Scope 就能够统一的管理内部的协程,方便对多个协程整体上做取消(cancel)、等待结果(join)等操作。

实现原理

Kotlin 的协程实现属于有限状态机编程, 有限状态机编程是编程范式的一种。是指利用有限状态机来进行编程实现。

Kotlin 在 JVM 上实现的协程机制,本身没有超脱出 JVM 的范畴,也不能够超脱,所以代码本质还是运行在操作系统级别的线程上的。所以 Kotlin Coroutine 的实现就是要在 JVM 上实现一套代码(任务)的挂起和恢复机制,而挂起和恢复刚好能抽取为两种状态,于是要实现代码运行的挂起和恢复的需求,就转变为了实现一种控制状态转移的需求。而有限状态下的编程,使用有限状态机编程范式再合适不过了。

Kotlin 协程本质还是运行在线程上的,所以如果从代码的运行角度来看,并没有太多的魔法,代码的运行机制和传统一样,虚拟机按行读取指令和操作数,然后执行操作。所以 Kotlin 协程的魔法更多是在编译期就开始了。

​执行的最小单元 CodeBlock

在 Kotlin 代码编译时,launch/asyncsuspend 方法中的代码会根据挂起点被拆分到多个 Code Block 中。Code Block 会被封装为 Runnable,被封装的 Runnable 会被 Dispatcher 执行。使用哪种 Dispatcher 则由传递给 launch/asyncCoroutineContext 参数决定。

例子代码:

class RunBlockingDemo {
    fun demo() {
        runBlocking {
            println(1) // block 1
            launch { // block 1
                println(3) // block 2
                suspendFunc() // block 2
                println(6) // block 3
            }
            println(2) // block 1
        }
    }

    private suspend fun suspendFunc() {
        println(4) // block 4
        delay(500) // block 4
        println(5) // block 5
    }
}

例子代码中,使用的 runBlocking 来创建协程,runBlocking 会为内部的协程提供一个阻塞当前线程的 EventLoop 队列,等队列中的所有协程都执行完之后,runBlocking 才会执行完毕。例子代码中的 launch 没有传递额外的 CoroutineContext 参数,所以它会继承 runBlocking 的 context 去使用。

例子代码的执行过程可以简述为:

运行流 说明
1. 调用 runBlockingblock 1 入队,然后开始循环从队列中消费任务
2. block 1 出队执行,输出 1
3. launch 被调用,block 2 入队
4. 输出 3
5. block 1 执行完成,执行继续从队列中取下一个任务
6. block 2 出队执行,输出 3,调用 suspendFunc 方法
8. suspendFunc 方法在编译之后,会在调用时先持有它自己的代码执行完成之后将要继续执行的代码块(block 3)的引用(在它自己的代码执行完成之后,就会恢复执行 block 3)
9. 执行 suspendFunc ,输出 4
10. 调用 delayblock 5 会被封装为一个 delay task 并入队
11. block 2block 4 执行完毕,执行继续从队列中取下一个任务
12. 这时队列中存在的任务是 block 5 ,循环会一直循环等待,直到到满足了 block 5 delay 的时间时就将 block 5 出队
13. block 5 出队执行,输出 5
14. block 5 执行完成就代表 suspendFunc 执行完毕了,就会恢复执行 suspendFunc 在进入时持有的 block 3
15. block 3 执行,输出 6
16. runBlocking 中的所有代码执行完毕,程序执行完毕

通过上面这个例子能看出,虽然所有代码都是在同一个线程执行的,但是 Kotlin 协程却实现了非阻塞的运行(println(2) 不会被 println(3) 阻塞),而协程内部又是按照同步的方式执行的(println(5)delay 完成之后才会被执行)。

正是由于编译器将来自不同协程的代码块相互交错的插入到事件循环队列中,才让仅使用一个线程就能实现代码块的挂起和恢复得以实现。

其他的 CoroutineContext 或许使用不同的 Dispatcher 在不同的线程上采用不同的策略去执行协程,但是其过程与这个例子是类似的。

​维护执行状态的 Continuation

到目前为止,我们已经了解了使代码块执行的逻辑概念了。但是,在该概念是如何实现上面还存在一些疑惑。

  1. 如何在已编译的 Java 字节码中实现这个逻辑概念?
  2. 如何跟踪代码块的执行状态?或者说如何决定一个代码块执行完成之后该执行的哪一个代码块?

生成的类

我尝试通过 JD-GUI 去反编译 RunBlockingDemo 类,但是 suspendFuncA 方法 JD-GUI 反编译失败了,没有显示出来。于是我先通过 javap RunBlockingDemo 查看了 RunBlockingDemo 类会有哪些方法,发现 suspendFuncA 被编译为了下面的样子:

final java.lang.Object suspendFuncA(kotlin.coroutines.Continuation<? super kotlin.Unit>);

然后我再使用 javap -c RunBlockingDemo 反编译得到编译之后的字节码:

suspendFuncA 方法的反编译字节码太长, 点击可展开

    Code:
       0: aload_1 // 加载第二个局部变量到栈顶, 这里的第二个局部变量就是方法的第一个参数 Continuation
       1: instanceof    #29                 // class RunBlockingDemo$suspendFuncA$1
       4: ifeq          39 //如果不是 RunBlockingDemo$suspendFuncA$1 的对象,就跳转到 39 行
       7: aload_1          //否则加载第二个局部变量到栈顶
       8: checkcast     #29                 // class RunBlockingDemo$suspendFuncA$1
      11: astore        5  //栈顶对象存入第5个局部变量,即方法的第一个参数通过了 checkcast 判断之后,存入第5个局部变量
      13: aload         5  //加载第5个局部变量到栈顶
      15: getfield      #33                 // Field RunBlockingDemo$suspendFuncA$1.label:I  // 获取 label 字段
      18: ldc           #34                 // int -2147483648 // 从常量池中加载第34个整形常量到栈顶,其值是 Integer.MIN_VALUE
      20: iand                              // 从栈中弹出两个整数,进行与运算,即将 label 和 Integer.MIN_VALUE 进行与运算
      21: ifeq          39                  // 如果结果为0,就跳转到 39 行
      24: aload         5                   // 否则加载第5个局部变量到栈顶
      26: dup                               // 复制栈顶元素
      27: getfield      #33                 // Field RunBlockingDemo$suspendFuncA$1.label:I // 加载 label 字段到栈顶
      30: ldc           #34                 // int -2147483648 // 从常量池加载数字 Integer.MIN_VALUE
      32: isub                              // label 减去 Integer.MIN_VALUE 的结果存入栈顶
      33: putfield      #33                 // Field RunBlockingDemo$suspendFuncA$1.label:I // 将栈顶元素存入 label
      36: goto          50                   // 跳转到 50 行
      39: new           #29                 // class RunBlockingDemo$suspendFuncA$1
      42: dup
      43: aload_0
      44: aload_1
      45: invokespecial #35                 // Method RunBlockingDemo$suspendFuncA$1."<init>":(LRunBlockingDemo;Lkotlin/coroutines/Continuation;)V
      48: astore        5
      50: aload         5                   // 加载第5个局部变量
      52: getfield      #39                 // Field RunBlockingDemo$suspendFuncA$1.result:Ljava/lang/Object;  // result 字段加载到栈顶
      55: astore        4                   // result 存到第4个局部变量中
      57: invokestatic  #45                 // Method kotlin/coroutines/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
      60: astore        6                   // 将  getCOUROUTINE_SUSPENDED() 的返回值存入第6个局部变量中
      62: aload         5                   // 加载第5个局部变量
      64: getfield      #33                 // Field RunBlockingDemo$suspendFuncA$1.label:I // 加载 label 字段到栈顶
      67: tableswitch   { // 0 to 1
                     0: 88          // label 为 0 就跳转到 88 行
                     1: 133         // label 为 1 就跳转到 133 行
               default: 165         // 默认跳转到 165 行
          }
      88: aload         4           // 加载 result 到栈顶
      90: invokestatic  #51                 // Method kotlin/ResultKt.throwOnFailure:(Ljava/lang/Object;)V
      93: iconst_4                  // 加载常量数字 4 到栈顶
      94: istore_2                  // 将栈顶数字(这里就是4)存到第二个局部变量中
      95: iconst_0
      96: istore_3                  // 将数字0 存到第3个局部变量中
      97: getstatic     #57                 // Field java/lang/System.out:Ljava/io/PrintStream;
     100: iload_2                   // 加载第2个局部变量,这里存的是 4
     101: invokevirtual #63                 // Method java/io/PrintStream.println:(I)V                           // 调用 println 输出栈顶的数字 4
     104: ldc2_w        #64                 // long 500l    // 加载常量池第63个长整形数字到栈顶,这里就是 500L
     107: aload         5
     109: aload         5
     111: aload_0
     112: putfield      #68                 // Field RunBlockingDemo$suspendFuncA$1.L$0:Ljava/lang/Object;  // 将 this 对象引用存储到 L$0 字段
     115: aload         5
     117: iconst_1
     118: putfield      #33                 // Field RunBlockingDemo$suspendFuncA$1.label:I             // label 赋值 1
     121: invokestatic  #74                 // Method kotlinx/coroutines/DelayKt.delay:(JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
     124: dup
     125: aload         6
     127: if_acmpne     149                 // 如果 delay 的返回值 != getCOROUTINE_SUSPEND() 的返回值,就跳转 149 行
     130: aload         6                   // 否则返回 getCOROUTINE_SUSPEND() 的返回值
     132: areturn
     133: aload         5
     135: getfield      #68                 // Field RunBlockingDemo$suspendFuncA$1.L$0:Ljava/lang/Object;
     138: checkcast     #2                  // class RunBlockingDemo
     141: astore_0
     142: aload         4
     144: invokestatic  #51                 // Method kotlin/ResultKt.throwOnFailure:(Ljava/lang/Object;)V
     147: aload         4
     149: pop
     150: iconst_5
     151: istore_2      // 5 存入第2个局部变量
     152: iconst_0
     153: istore_3      // 0 存入第3个局部变量
     154: getstatic     #57                 // Field java/lang/System.out:Ljava/io/PrintStream;
     157: iload_2
     158: invokevirtual #63                 // Method java/io/PrintStream.println:(I)V              // 输出 5
     161: getstatic     #80                 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
     164: areturn      // return Unit.INSTANCE
     165: new           #82                 // class java/lang/IllegalStateException
     168: dup
     169: ldc           #84                 // String call to 'resume' before 'invoke' with coroutine
     171: invokespecial #87                 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
     174: athrow        // 抛出异常

下面是我根据反编译出来的字节码推测出的 RunBlockingDemo 类的 suspendFuncA 方法经过编译器处理之后的源码:

final Object suspendFuncA(Continuation<?> continuation){
    if(!continuation istanceOf RunBlockingDemo$suspendFuncA$1 || 
        (RunBlockingDemo$suspendFuncA$1)continuation.label & Integer.MIN_VALUE == 0 ){
        continuation = new RunBlockingDemo$suspendFuncA$1(this);
    }
    continuation.label = contination.label - Integer.MIN_VALUE;
    Object obj = IntrinsicsKt.getCOROUTINE_SUSPEND();
    switch(continuation.label) {
        case 0: 
            println(4);
            continuation.L$0 = this;
            continuation.label = 1;
            if(delay(500, continuation) == obj) {
                return obj;
            }
        case 1:
            ResultKt.throwOnFailure(continuation.result);
            println(5);
            return Unit.INSTANCE;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
}

从源码中能得出两个有趣的地方:

  1. 例子中的 supendFuncA 是没有参数的,但是编译之后,编译器为它增加了一个 Continuation 类型的参数
  2. suspendFuncA 内部通过一个 switch-case 去调度执行不同的代码块,而 switch 的参数则是传入的 Continuation 对象的 label 字段

因此不难推测出,编译器为 suspendFuncA 方法增加的 Continuation 参数就是用来跟踪协程运行状态的对象,而且运行状态就保存在一个简单的 int 类型的 label 字段中。

再回头看看 suspendFuncA 方法是如何被调用的。

  • 首先编译器会为 suspend 方法生成一个实现了 ContinuationImpl 的包装类,然后在包装类的 invokeSuspend 方法中调用 suspendFuncA。 下面就是编译器为 suspendFuncA 方法生成的包装类(使用 JD-GUI 查看):

    class RunBlockingDemo$suspendFuncA$1 extends ContinuationImpl {
    int label;
    
    Object L$0;
    
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return RunBlockingDemo.this.suspendFuncA(this);
    }
    
    RunBlockingDemo$suspendFuncA$1(Continuation paramContinuation) {
        super(paramContinuation);
    }
    }
    

继承关系