This page looks best with JavaScript enabled

理解 WorkManager 的实现

 ·  ☕ 6 min read

最近使用 Android Jetpack 中 WorkManager 组件做了一个上报一些做分析用途数据的需求, 用着感觉挺香的. 于是想看下其内部的实现原理.

背景

使用 WorkManager 的原因在于 Android 系统在每个版本上都对 App 运行后台任务加了限制, 不同版本有不同的限制, 其目的都是为了节省用户设备的电量消耗. 参考: https://developer.android.com/guide/background .

WorkManager 的功能

https://developer.android.com/topic/libraries/architecture/workmanager

  • 当用户设备满足一些条件的情况下才执行的任务, 比如用户手机使用的是无线网络,电量充足,储存空间充足的情况下才执行某些任务.

  • 向后兼容到 API 14

    • API 23+ 使用 JobScheduler
    • API 14-22 结合 BroadcastReceiver 和 AlarmManager
    • 向下兼容
  • 可以为任务添加约束, 例如无线网络可用或充电状态下才执行

  • 可以提交一次性或定期任务

  • 可以监视和管理提交的任务

  • 可以将多个任务链接在一起(并行、串行、组合)

  • 即使应用或设备重启也能保证任务的执行

  • 类似 Doze 模式一样节电

  • 运行在非主线程

简单例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 实现 Worker
class UploadWorker(appContext: Context, workerParams: WorkerParameters) : Worker(appContext, workerParams) {

    override fun doWork(): Result {
        // 这里运行在非主线程
        uploadImages()
        // 返回任务执行成功、失败、重试、取消
        return Result.success()
    }
}

// 创建约束条件
val constraints = Constraints.Builder()
        .setRequiresBatteryNotLow(true)                 // 电量不低
        .setRequiredNetworkType(NetworkType.CONNECTED)  // 连接了网络
        .setRequiresCharging(true)                      // 充电中
        .setRequiresStorageNotLow(true)                 // 储存空间不低
        .setRequiresDeviceIdle(true)                    // 设备空闲中
        .build()

// 定义输入数据
// 输入数据通过 key value 匹配, key 为 String, value 为基本数据类型和 String
val imageData = workDataOf(Constants.KEY_IMAGE_URI to imageUriString)

// 创建请求
val request = OneTimeWorkRequestBuilder<UploadWorker>()
        .setInputData(imageData)        // 输入数据
        .setConstraints(constraints)    // 约束条件
        .setBackoffCriteria(            // 重试任务时的策略
                BackoffPolicy.LINEAR,
                OneTimeWorkRequest.MIN_BACKOFF_MILLIS,
                TimeUnit.MILLISECONDS)
        .build()

//提交任务
WorkManager.getInstance(context).enque(request);

实现原理

在实现原理上, 这里先提出几个问题:

  • 如何确保任务一定会被执行, 即使在应用重启或手机重启之后
  • 如何确保是在非主线程运行的
  • 如何去匹配我们创建请求时提交的约束的
  • 如何监控和管理任务

整体流程

流程

从上图中可以看出一个 WorkRequest 被提交之后的流程:

  1. WorkRequest 的信息会经 Internal TaskExecutor 储存到数据库
  2. 当满足约束条件时, WorkFactory 从数据库拿出 WorkRequest 的信息构造出 Worker, 然后在 Executor 中执行 Worker 的 doWork 方法.

有几个组件:

  • WorkRequest: 一个接口, 定义了 Worker 的相关信息都在这个里面, 有两个实现类 OneTimeWorkRequestPeriodicWorkRequest 分别对应一次性任务和周期任务
  • Internal TaskExecutor: WorkManager 内部的线程池, 用来执行将提交的 WorkRequest 储存到数据库的动作
  • WorkerFactory: 根据 WorkRequest 里的信息创建 Worker 实例的工厂类
  • Worker: 我们自己实现的 Woker, 例子中的 UploadWorker
  • Executor: 执行 Worker doWork 方法的线程池, 默认是调用 Executors.newFixedThreadPool 创建的线程池, 也可以自己配置这个线程池

WorkManager 的初始化

WorkManger 将会是一个单例对象, 但是从例子代码中没有看的调用初始化的地方. 通过文档可知 WorkManager 有两种初始化方式:

  1. 应用启动之后, 它自己自动初始化
  2. 按需初始化, 到了需要用到的地方才初始化. 可以避免初始化影响应用的启动速度

自动初始化

利用 ContentProvider, ContentProvider 会在应用启动之后调用 onCreate 方法, 所以自动初始化就是在 onCreate 中执行 WorkManager 的初始化

1
2
3
4
5
6
7
8
public class WorkManagerInitializer extends ContentProvider {
    @Override
    public boolean onCreate() {
        // Initialize WorkManager with the default configuration.
        WorkManager.initialize(getContext(), new Configuration.Builder().build());
        return true;
    }
}

按需初始化

因为 WorkManagerInitializer 这个 ContentProvider 会被 WorkManager 库注册到应用的 manifest.xml 中, 所以如果要使用按需初始化, 需要主动移除这个 ContentProvider 并让 Application 实现 Configuration.Provider 接口:

1
2
3
4
5
<provider
    android:name="androidx.work.impl.WorkManagerInitializer"
    android:authorities="${applicationId}.workmanager-init"
    tools:node="remove"
    android:exported="false" />
1
2
3
4
5
6
7
public final class MApp extends Application implements Configuration.Provider {

    @Override
    public Configuration getWorkManagerConfiguration() {
        return new Configuration.Builder().build();
    }
}

这样之后, 就不会在应用启动时调用 WorkManagerInitializer 的 onCreate 了. 但是每次获取实例就需要使用 getInstance(Context ctx) 带参数的那个, 因为要在 getInstance 里做初始化.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public static @NonNull WorkManagerImpl getInstance(@NonNull Context context) {
    synchronized (sLock) {
        WorkManagerImpl instance = getInstance();
        if (instance == null) {
            Context appContext = context.getApplicationContext();
            if (appContext instanceof Configuration.Provider) {
                initialize(appContext, ((Configuration.Provider) appContext).getWorkManagerConfiguration());
                instance = getInstance(appContext);
            } else {
                
            }
        }
        return instance;
    }
}

初始化

初始化工作的 initialize 方法里:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public static void initialize(@NonNull Context context, @NonNull Configuration configuration) {
    synchronized (sLock) {
        if (sDelegatedInstance == null) {
            context = context.getApplicationContext();
            if (sDefaultInstance == null) {
                // 创建单例
                sDefaultInstance = new WorkManagerImpl(
                        context,
                        configuration,
                        new WorkManagerTaskExecutor(configuration.getTaskExecutor()));
            }
            sDelegatedInstance = sDefaultInstance;
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public WorkManagerImpl(
        @NonNull Context context,
        @NonNull Configuration configuration,
        @NonNull TaskExecutor workTaskExecutor,
        boolean useTestDatabase) {
    Context applicationContext = context.getApplicationContext();
    // 创建数据库
    WorkDatabase database = WorkDatabase.create(applicationContext, configuration.getTaskExecutor(), useTestDatabase);
    Logger.setLogger(new Logger.LogcatLogger(configuration.getMinimumLoggingLevel()));
    // 创建 schedulers
    List<Scheduler> schedulers = createSchedulers(applicationContext, workTaskExecutor);
    // 创建 processor
    Processor processor = new Processor(
            context,
            configuration,
            workTaskExecutor,
            database,
            schedulers);
    internalInit(context, configuration, workTaskExecutor, database, schedulers, processor);
}

创建任务

通过例子中的代码能看到最终组装出来的是一个 WorkRequest 对象, 那么这个对象其实就是定义了我们要执行的任务信息, 每个组装信息都能通过 Builder 构建.

WorkRequest

WorkRequest 的 Buidler 是一个抽象类, 一次性和周期任务分别实现了自己的 Buidler.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// WorkRequest.Builder
public abstract static class Builder<B extends Builder, W extends WorkRequest> {
    boolean mBackoffCriteriaSet = false;
    UUID mId;
    WorkSpec mWorkSpec;
    Set<String> mTags = new HashSet<>();

    Builder(@NonNull Class<? extends ListenableWorker> workerClass) {
        mId = UUID.randomUUID();        // 分配一个 uuid
        mWorkSpec = new WorkSpec(mId.toString(), workerClass.getName()); // 创建 WorkSpec
        addTag(workerClass.getName());  // 传入的 Worker 的 className 作为一个 tag
    }

    public final W build() {
        W returnValue = buildInternal(); // 子类实现 buildInternal 方法
        // Create a new id and WorkSpec so this WorkRequest.Builder can be used multiple times.
        mId = UUID.randomUUID();
        mWorkSpec = new WorkSpec(mWorkSpec);
        mWorkSpec.id = mId.toString();
        return returnValue;
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// OneTimeWorkRequest.Builder
public static final class Builder extends WorkRequest.Builder<Builder, OneTimeWorkRequest> {

    public Builder(@NonNull Class<? extends ListenableWorker> workerClass) {
        super(workerClass);
        // 为 inputMergerClassName 单独赋个值
        mWorkSpec.inputMergerClassName = OverwritingInputMerger.class.getName();
    }

    @Override
    OneTimeWorkRequest buildInternal() {
        if (mBackoffCriteriaSet
                && Build.VERSION.SDK_INT >= 23
                && mWorkSpec.constraints.requiresDeviceIdle()) {
            throw new IllegalArgumentException("Cannot set backoff criteria on an idle mode job");
        }
        return new OneTimeWorkRequest(this);
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// PeriodicWorkRequest.Builder
public static final class Builder extends WorkRequest.Builder<Builder, PeriodicWorkRequest> {
    public Builder(
            @NonNull Class<? extends ListenableWorker> workerClass,
            long repeatInterval,
            @NonNull TimeUnit repeatIntervalTimeUnit) {
        super(workerClass);
        // 周期任务的间隔执行时间
        mWorkSpec.setPeriodic(repeatIntervalTimeUnit.toMillis(repeatInterval));
    }

    PeriodicWorkRequest buildInternal() {
        if (mBackoffCriteriaSet
                && Build.VERSION.SDK_INT >= 23
                && mWorkSpec.constraints.requiresDeviceIdle()) {
            throw new IllegalArgumentException("Cannot set backoff criteria on an idle mode job");
        }
        return new PeriodicWorkRequest(this);
    }
}

WorkSpec

查看 WorkSpec 的定义能发现其利用 Room 实现的数据库字段和类字段的映射

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class WorkSpec {
    @ColumnInfo(name = "id") @PrimaryKey @NonNull public String id;
    @ColumnInfo(name = "state") @NonNull public WorkInfo.State state = ENQUEUED;
    @ColumnInfo(name = "worker_class_name") @NonNull public String workerClassName;
    @ColumnInfo(name = "input_merger_class_name") public String inputMergerClassName;
    @ColumnInfo(name = "input") @NonNull public Data input = Data.EMPTY;
    @ColumnInfo(name = "output") @NonNull public Data output = Data.EMPTY;
    @ColumnInfo(name = "initial_delay") public long initialDelay;
    @ColumnInfo(name = "interval_duration") public long intervalDuration;
    @ColumnInfo(name = "flex_duration") public long flexDuration;
    @Embedded @NonNull public Constraints constraints = Constraints.NONE;
    @ColumnInfo(name = "run_attempt_count") @IntRange(from = 0) public int runAttemptCount;
    @ColumnInfo(name = "backoff_policy") @NonNull public BackoffPolicy backoffPolicy = BackoffPolicy.EXPONENTIAL;
    @ColumnInfo(name = "backoff_delay_duration") public long backoffDelayDuration = WorkRequest.DEFAULT_BACKOFF_DELAY_MILLIS;
}

在创建任务的时候, 每个任务会:

  • 分配一个 uuid
  • 创建一个 WorkSpec
  • 添加 Worker 的类名为 tag
  • 一次性任务设置 inputMergerClassName
  • 周期任务设置周期执行的间隔时间

提交任务

看下第一个问题: 如何确保任务一定会被执行, 即使在应用重启或手机重启之后.

在应用重启或手机重启之后依然能执行提交的任务, 那说明任务肯定需要存储到磁盘的. 看流程图中也能看出有数据库的参与, 带着这个猜想看下源码. 任务的提交从调用 enqueue 开始:

1
2
WorkManager.getInstance(context).enqueue(request);
// getInstance 返回的是 WorkManagerImpl 单列对象
1
2
3
4
5
6
7
8
//WorkManager#enqueue(WorkRequest)
public final Operation enqueue(@NonNull WorkRequest workRequest) {
    // 单个 workReeust 会被装到 List 中, 后续方法都接收 List<WorkReuest>
    return enqueue(Collections.singletonList(workRequest));
}

//最终调用到抽象方法 enqueue, 也就是调用 WorkManagerImpl 的实现
public abstract Operation enqueue(@NonNull List<? extends WorkRequest> requests);
1
2
3
4
5
6
7
8
//WorkManagerImpl#enqueue
public Operation enqueue(List<? extends WorkRequest> workRequests) {
    if (workRequests.isEmpty()) {
        throw new IllegalArgumentException("enqueue needs at least one WorkRequest.");
    }
    // 后续工作交给 WorkContinuationImpl
    return new WorkContinuationImpl(this, workRequests).enqueue();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
//WorkContinuationImpl#enqueue
public Operation enqueue() {
    // 防止重复提交, mEnqueued 会在 EnqueueRunnable 中被标记为 true
    if (!mEnqueued) {
        EnqueueRunnable runnable = new EnqueueRunnable(this);
        // EnqueueRunnable 被提交到线程池执行
        mWorkManagerImpl.getWorkTaskExecutor().executeOnBackgroundThread(runnable);
        mOperation = runnable.getOperation();
    } else {
        Logger.get().warning(TAG, String.format("Already enqueued work ids (%s)", TextUtils.join(", ", mIds)));
    }
    return mOperation;
}

EnqueueRunnable

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
//EnqueueRunnable#run
public void run() {
    try {
        // 判断 Worker 是否有循环引用
        if (mWorkContinuation.hasCycles()) {
            throw new IllegalStateException(String.format("WorkContinuation has cycles (%s)", mWorkContinuation));
        }
        // 储存到数据库, 并返回是否需要执行
        boolean needsScheduling = addToDatabase();
        if (needsScheduling) {
            // 需要执行:
            // 启用 RescheduleReceiver: 一个广播接收器
            final Context context = mWorkContinuation.getWorkManagerImpl().getApplicationContext();
            PackageManagerHelper.setComponentEnabled(context, RescheduleReceiver.class, true);
            // 在后台执行 work
            scheduleWorkInBackground();
        }
        mOperation.setState(Operation.SUCCESS);
    } catch (Throwable exception) {
        mOperation.setState(new Operation.State.FAILURE(exception));
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
//EnqueueRunnable#addToDatabase
public boolean addToDatabase() {
    WorkManagerImpl workManagerImpl = mWorkContinuation.getWorkManagerImpl();
    WorkDatabase workDatabase = workManagerImpl.getWorkDatabase();
    // 开始一个事务
    workDatabase.beginTransaction();
    try {
        boolean needsScheduling = processContinuation(mWorkContinuation);
        workDatabase.setTransactionSuccessful();
        return needsScheduling;
    } finally {
        // 关闭事务
        workDatabase.endTransaction();
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// addToDatabase 经过几个调用
// EnqueueRunnable#processContinuation
// EnqueueRunnable#enqueueContinuation
// 最终会走到: EnqueueRunnable#enqueueWorkWithPrerequisites
private static boolean enqueueWorkWithPrerequisites(
    WorkManagerImpl workManagerImpl,
    @NonNull List<? extends WorkRequest> workList,
    String[] prerequisiteIds,
    String name,
    ExistingWorkPolicy existingWorkPolicy) {
    boolean needsScheduling = false;
    // prerequisiteIds 这里会为空, 与 WorkContinuation 的构造函数的 parent 参数有关
    //...
    for (WorkRequest work : workList) {
        // 获取 WorkRequest 中的 WorkSpec
        WorkSpec workSpec = work.getWorkSpec();
        if (hasPrerequisite && !hasCompletedAllPrerequisites) {
            //...
        } else {
            if (!workSpec.isPeriodic()) {
                // 如果是周期任务, 则将周期执行的开始时间设为当前时间
                workSpec.periodStartTime = currentTimeMillis;
            } else {
                workSpec.periodStartTime = 0L;
            }
        }
        // 在 Api 23~35 将带有电量和存储空间约束的 Worker 代理给 ConstraintTrackingWorker
        if (Build.VERSION.SDK_INT >= WorkManagerImpl.MIN_JOB_SCHEDULER_API_LEVEL
                && Build.VERSION.SDK_INT <= 25) {
            tryDelegateConstrainedWorkSpec(workSpec);
        } else if (Build.VERSION.SDK_INT <= WorkManagerImpl.MAX_PRE_JOB_SCHEDULER_API_LEVEL
                && usesScheduler(workManagerImpl, Schedulers.GCM_SCHEDULER)) {
            tryDelegateConstrainedWorkSpec(workSpec);
        }
        if (workSpec.state == ENQUEUED) {
            needsScheduling = true;
        }
        // 将 WorkSpace 插入到 WorkSpec 表
        workDatabase.workSpecDao().insertWorkSpec(workSpec);
        for (String tag : work.getTags()) {
            // 将 tag 和 worker 的 uuid 绑定, 插入到 WorkTag 表
            workDatabase.workTagDao().insert(new WorkTag(tag, work.getStringId()));
        }
        if (isNamed) {
            // 如果是命名 Worker, 将 name 和 worker 的 uuid 绑定, 插入到 WorkName 表
            workDatabase.workNameDao().insert(new WorkName(name, work.getStringId()));
        }
    }
    return needsScheduling;
}
DB 表 WorkSpec WorkTag

整体流程图

执行任务

//TODO


Yang
WRITTEN BY
Yang
Developer