作业与 Awaitable 桥接
Valkarn Tasks 提供了一组桥接类型,将 Unity 的作业系统和 Awaitable API 连接到 ValkarnTask 管道。每个桥接都是一个薄薄的、最小化分配的层;没有任何隐藏的魔法。
此处描述的所有类型都在命名空间 UnaPartidaMas.Valkarn.Tasks.Bridge 中,并由 #if UNITY_5_3_OR_NEWER(或 #if UNITY_2023_1_OR_NEWER 用于 Awaitable 支持)守卫。
JobHandleExtensions——等待单个 JobHandle
最简单的桥接。在任何 JobHandle 上调用 .ToValkarnTask() 以获得一个在作业完成时完成的 ValkarnTask。
public static ValkarnTask ToValkarnTask(
this JobHandle handle,
PlayerLoopTiming timing = PlayerLoopTiming.Update,
CancellationToken cancellationToken = default)
工作原理
- 快速路径。 如果
handle.IsCompleted已经为 true,则立即调用handle.Complete()并返回ValkarnTask.CompletedTask——零分配,不注册 PlayerLoop。 - 普通路径。 租用一个池化的
JobHandlePromise,在给定的timing在 PlayerLoop 上注册,并包装在ValkarnTask中返回。每帧MoveNext()调用JobHandle.ScheduleBatchedJobs()(在编辑模式和批量模式下刷新作业队列)然后检查handle.IsCompleted。当 handle 完成时,promise 完成任务并将自身返回对象池。 - 取消。 如果
CancellationToken触发,handle 被强制完成(始终调用handle.Complete()以防止作业系统泄漏)并且任务转换到已取消状态。
基本用法
using UnaPartidaMas.Valkarn.Tasks;
using UnaPartidaMas.Valkarn.Tasks.Bridge;
using Unity.Jobs;
// 调度作业并立即等待它。
var handle = myJob.Schedule();
await handle.ToValkarnTask();
// 使用非默认时机和取消。
await handle.ToValkarnTask(PlayerLoopTiming.PreUpdate, destroyCancellationToken);
JobHandleWhenAll——并行等待多个 JobHandle
当你需要调度几个独立的作业并仅在所有作业完成后才恢复时,使用 JobHandleExtensions.WhenAll。
// 最简重载:在 Update 时机等待所有 handle。
public static ValkarnTask WhenAll(params JobHandle[] handles)
// 完整重载:可配置时机和取消。
public static ValkarnTask WhenAll(
JobHandle[] handles,
PlayerLoopTiming timing,
CancellationToken cancellationToken = default)
// 完整重载的扩展方法别名。
public static ValkarnTask ToValkarnTask(
this JobHandle[] handles,
PlayerLoopTiming timing = PlayerLoopTiming.Update,
CancellationToken cancellationToken = default)
工作原理
- 快速路径。 如果数组中的每个 handle 都已完成,所有 handle 被最终化,立即返回
ValkarnTask.CompletedTask。 - 空数组。 返回
ValkarnTask.CompletedTask。 - 普通路径。 创建一个池化的
JobHandleArrayPromise。内部从ArrayPool<JobHandle>.Shared租用一个JobHandle[](避免每次调用的堆分配),复制输入的 handle,并在 PlayerLoop 上注册。每帧使用紧凑的交换收缩循环仅迭代仍待处理的 handle,并调用JobHandle.ScheduleBatchedJobs()以保持工作线程运行。 - 取消。 所有剩余 handle 被强制完成,任务被取消。
用法
var handleA = jobA.Schedule();
var handleB = jobB.Schedule();
var handleC = jobC.Schedule();
// 等待所有三个。
await JobHandleExtensions.WhenAll(handleA, handleB, handleC);
// 或使用数组上的扩展方法。
JobHandle[] handles = { handleA, handleB, handleC };
await handles.ToValkarnTask();
TempNativeArrayScope——跨 await 点的 NativeArray 生命周期
问题
使用 Allocator.TempJob 分配的 NativeArray<T> 生命周期很短。如果你分配一个数组,调度作业,await 作业 handle,然后忘记释放数组,Unity 的安全系统会报告内存泄漏。在长异步方法中使用普通的 try/finally 可以工作,但容易出错。
TempNativeArrayScope<T> 是一个包装 NativeArray<T> 的结构体,在作用域结束时使用 using 语句释放它——RAII 模式应用于原生内存。
API
public struct TempNativeArrayScope<T> : IDisposable where T : struct
{
// 访问包装的数组。如果已释放则抛出 ObjectDisposedException。
public NativeArray<T> Array { get; }
// 如果作用域未被释放且数组已创建,则为 true。
public bool IsCreated { get; }
// 使用 Allocator.TempJob 分配新的 NativeArray<T> 并取得所有权。
public static TempNativeArrayScope<T> Create(int length);
// 取得已分配的 NativeArray<T> 的所有权。
public static TempNativeArrayScope<T> Wrap(NativeArray<T> existing);
// 释放数组。幂等:多次调用是安全的。
public void Dispose();
}
// 非泛型助手(类型推断便利)。
public static class TempNativeArrayScope
{
public static TempNativeArrayScope<T> Create<T>(int length) where T : struct;
public static TempNativeArrayScope<T> Wrap<T>(NativeArray<T> existing) where T : struct;
}
Dispose 使用普通 int 标志而不是 Interlocked,因为作用域设计为通过 using var 在主线程上单线程使用。
用法
using UnaPartidaMas.Valkarn.Tasks.Bridge;
using Unity.Collections;
using Unity.Jobs;
async ValkarnTask ProcessDataAsync(int count, CancellationToken ct)
{
// using 语句保证在作用域退出时调用 Dispose(),
// 无论是正常完成、异常还是取消。
using var inputScope = TempNativeArrayScope.Create<float>(count);
using var outputScope = TempNativeArrayScope.Create<float>(count);
NativeArray<float> input = inputScope.Array;
NativeArray<float> output = outputScope.Array;
// 填充输入,调度作业。
var job = new MyProcessingJob { Input = input, Output = output };
var handle = job.Schedule(count, 64);
// 不阻塞主线程地等待。
// NativeArray 保持有效——作业仍在运行。
await handle.ToValkarnTask(cancellationToken: ct);
// 作业完成。在这里读取结果。
float total = 0f;
for (int i = 0; i < count; i++)
total += output[i];
UnityEngine.Debug.Log($"总和: {total}");
// inputScope.Dispose() 和 outputScope.Dispose() 在这里自动运行。
}
你也可以包装已分配的数组:
var existing = new NativeArray<int>(1024, Allocator.TempJob);
using var scope = TempNativeArrayScope.Wrap(existing);
// scope 拥有 existing 并将释放它。
常见陷阱:没有作用域的 NativeArray 生命周期
这种模式是错误的,会导致安全系统错误:
// 错误:如果发生异常,数组可能超出作业的生命周期或泄漏。
var array = new NativeArray<float>(1024, Allocator.TempJob);
var handle = job.Schedule(1024, 64);
await handle.ToValkarnTask(); // 挂起点——数组必须保持存活
array.Dispose(); // 如果上面发生异常则永远不会到达
使用 TempNativeArrayScope 或 try/finally 来保证在所有代码路径上都执行释放。
AwaitableBridge——将 Unity Awaitable 转换为 ValkarnTask
AwaitableBridge 提供扩展方法,用于将 Unity 的 Awaitable 和 Awaitable<T> 类型(自 Unity 2023.1 起可用)转换为 ValkarnTask 兼容的等待器。
注意: Awaitable 也有自己的 GetAwaiter()。由于 C# 重载解析始终优先选择实例方法而不是扩展方法,在 async ValkarnTask 方法内写 await myAwaitable 已经可以正确工作——Unity 的等待器实现了 ICriticalNotifyCompletion,ValkarnTask 构建器接受它。只有当你想将 Awaitable 传给组合器(ValkarnTask.WhenAll、ValkarnTask.WhenAny)或将其存储为 ValkarnTask 变量时,才需要 .AsValkarnTask() 扩展方法。
此文件由 #if UNITY_2023_1_OR_NEWER 守卫。
API
// 将 Awaitable 转换为 ValkarnTask 兼容的等待器。
public static AwaitableValkarnTaskAwaiter AsValkarnTask(this Awaitable awaitable)
// 将 Awaitable<T> 转换为 ValkarnTask 兼容的等待器。
public static AwaitableValkarnTaskAwaiter<T> AsValkarnTask<T>(this Awaitable<T> awaitable)
两个等待器都实现了 ICriticalNotifyCompletion,跳过了 ExecutionContext 捕获。它们将 IsCompleted、GetResult 和 OnCompleted 直接委托给包装的 Unity 等待器。
用法
using UnaPartidaMas.Valkarn.Tasks.Bridge;
using UnityEngine;
// 直接等待——在 async ValkarnTask 方法中无需转换。
async ValkarnTask DirectExample()
{
await Awaitable.NextFrameAsync(); // 无需转换。
await Awaitable.WaitForSecondsAsync(1f);
}
// 显式转换——用于组合器和存储时需要。
async ValkarnTask CombinatorExample()
{
ValkarnTask a = Awaitable.NextFrameAsync().AsValkarnTask();
ValkarnTask b = Awaitable.WaitForSecondsAsync(2f).AsValkarnTask();
await ValkarnTask.WhenAll(a, b);
}
// 带结果类型的泛型版本。
async ValkarnTask<Texture2D> LoadTextureExample(string path)
{
Awaitable<Texture2D> loadOp = LoadTextureAsync(path);
return await loadOp.AsValkarnTask();
}
JobBridge——源码生成的包装器
JobBridge.cs 定义了 JobPromise<TJob>,这是源码生成器使用的泛型池化 promise 类型。它是一个实现细节;你通常不会自己实例化它。
// 每帧轮询 JobHandle。由源码生成的 ScheduleAsync 方法使用。
public sealed class JobPromise<TJob> : ValkarnTask.ISource, IPlayerLoopItem, IPoolNode<JobPromise<TJob>>
where TJob : struct
{
public static JobPromise<TJob> Create(JobHandle handle, CancellationToken ct, out uint token);
}
行为与 JobHandlePromise(参见 JobHandleExtensions)相同,只是它对作业类型是泛型的,以实现对象池隔离——每种作业类型都有自己的对象池。
源码生成器:JobBridgeGenerator
JobBridgeGenerator 是一个 Roslyn 增量源码生成器(类 UnaPartidaMas.Valkarn.Tasks.SourceGen.Generators.JobBridgeGenerator),自动为你的作业类型生成 ScheduleAsync 扩展方法。
检测内容
生成器扫描编译中所有实现以下之一的公共结构体:
Unity.Jobs.IJobUnity.Jobs.IJobParallelForUnity.Jobs.IJobFor
私有和内部作业结构体被跳过。如果结构体嵌套在非公共类型中,也会被跳过。
如果 UnaPartidaMas.Valkarn.Tasks.ValkarnTask 在编译中未找到,生成器不执行任何操作,因此在不引用 Valkarn Tasks 的程序集中是安全的。
生成内容
输出文件是 ValkarnTask.JobBridge.Generated.g.cs。对于每个检测到的作业类型,它生成一个 public static class __<TypeName>_AsyncExt,包含:
| 作业接口 | 生成的方法签名 |
|---|---|
IJob | public static ValkarnTask ScheduleAsync(this ref MyJob job, CancellationToken ct = default) |
IJobParallelFor | public static ValkarnTask ScheduleAsync(this ref MyJob job, int arrayLength, int innerLoopBatchCount, CancellationToken ct = default) |
IJobFor | public static ValkarnTask ScheduleAsync(this ref MyJob job, int arrayLength, CancellationToken ct = default) |
IJobFor | public static ValkarnTask ScheduleParallelAsync(this ref MyJob job, int arrayLength, int innerLoopBatchCount, CancellationToken ct = default) |
每个生成的方法使用标准 Unity 扩展方法调度作业,然后将生成的 JobHandle 包装在 JobPromise<TJob> 中并返回一个 ValkarnTask。
对于嵌套类型(例如,外部类中的作业结构体),生成的类名使用下划线:__Outer_Inner_AsyncExt。
生成方法的用法
// IJob 示例
public struct MyCalculationJob : IJob
{
public NativeArray<float> Data;
public void Execute() { /* ... */ }
}
// 生成器产生:
// public static ValkarnTask ScheduleAsync(this ref MyCalculationJob job, CancellationToken ct = default)
async ValkarnTask RunCalculation(CancellationToken ct)
{
using var scope = TempNativeArrayScope.Create<float>(1024);
var job = new MyCalculationJob { Data = scope.Array };
await job.ScheduleAsync(ct); // 生成的扩展方法
// 在这里从 scope.Array 读取结果。
}
// IJobParallelFor 示例
public struct MyParallelJob : IJobParallelFor
{
public NativeArray<float> Input;
public NativeArray<float> Output;
public void Execute(int index) { Output[index] = Input[index] * 2f; }
}
async ValkarnTask RunParallel(int length, CancellationToken ct)
{
using var inputScope = TempNativeArrayScope.Create<float>(length);
using var outputScope = TempNativeArrayScope.Create<float>(length);
var job = new MyParallelJob { Input = inputScope.Array, Output = outputScope.Array };
await job.ScheduleAsync(length, innerLoopBatchCount: 64, ct);
}
源码生成器:AwaitableBridgeGenerator
AwaitableBridgeGenerator 检测编译中是否存在 UnityEngine.Awaitable 和 UnityEngine.Awaitable<T>,并在存在时生成 AsValkarnTask() 扩展方法。
输出文件是 ValkarnTask.AwaitableBridge.Generated.g.cs。生成的代码位于类 AwaitableBridgeExtensions 下的 namespace UnaPartidaMas.Valkarn.Tasks.Bridge 中。
生成的方法:
// 当找到 UnityEngine.Awaitable 时生成:
public static async ValkarnTask AsValkarnTask(this Awaitable awaitable)
{
await awaitable;
}
// 当找到 UnityEngine.Awaitable<T> 时生成:
public static async ValkarnTask<T> AsValkarnTask<T>(this Awaitable<T> awaitable)
{
return await awaitable;
}
这些是 async ValkarnTask 方法,因此它们经过 Valkarn Tasks 的池化异步构建器——在预热的对象池上是零分配的。
生成器有守卫:如果 ValkarnTask 不在编译中,不会生成任何代码。这防止了在引用 Unity 但不引用 Valkarn Tasks 的程序集中出现 CS0246 错误。
完整工作示例:ECS 系统中的作业桥接
以下来自 Samples~/ECS/JobBridgeExample.cs,展示了从 ISystem 调度 Burst 并行作业、不阻塞地等待它并写回结果的完整模式。
#if UNITY_5_3_OR_NEWER && VTASKS_HAS_ENTITIES
using System.Threading;
using Unity.Burst;
using Unity.Collections;
using Unity.Entities;
using Unity.Jobs;
using UnaPartidaMas.Valkarn.Tasks;
using UnaPartidaMas.Valkarn.Tasks.Bridge;
using UnaPartidaMas.Valkarn.Tasks.ECS;
public partial struct JobBridgeExample : ISystem
{
public void OnCreate(ref SystemState state)
{
state.RequireForUpdate<HealthData>();
}
public void OnUpdate(ref SystemState state)
{
var worldCt = state.World.GetWorldCancellationToken();
// 在 OnUpdate 内同步提取所有实体数据。
// 异步方法不能有 ref 参数(CS1988),因此数据
// 必须在这里复制并按值传递给异步方法。
var query = SystemAPI.QueryBuilder().WithAll<HealthData>().Build();
var entityCount = query.CalculateEntityCount();
if (entityCount == 0) return;
var entities = query.ToEntityArray(Allocator.TempJob);
var healthArray = query.ToComponentDataArray<HealthData>(Allocator.TempJob);
var results = new NativeArray<float>(entityCount, Allocator.TempJob);
// 异步方法取得 NativeArray 的所有权并释放它们。
ProcessHealthAsync(state.EntityManager, entities, healthArray, results, worldCt).Forget();
state.Enabled = false;
}
public void OnDestroy(ref SystemState state) { }
static async ValkarnTask ProcessHealthAsync(
EntityManager entityManager,
NativeArray<Entity> entities,
NativeArray<HealthData> healthArray,
NativeArray<float> results,
CancellationToken ct)
{
try
{
// 阶段 1:调度 Burst 作业。
var job = new HealthProcessingJob
{
HealthInputs = healthArray,
ProcessedOutputs = results,
};
var handle = job.Schedule(entities.Length, batchSize: 64);
// 阶段 2:不阻塞主线程地等待完成。
await handle.ToValkarnTask(cancellationToken: ct);
// 阶段 3:应用结果。我们回到主线程了。
ct.ThrowIfCancellationRequested();
for (int i = 0; i < entities.Length; i++)
{
// 作业运行期间实体可能已被销毁。
if (!entityManager.SafeEntityExists(entities[i]))
continue;
entityManager.SetComponentData(entities[i], new HealthData
{
CurrentHealth = results[i],
});
}
}
finally
{
// 始终释放 NativeArray——在成功、异常或取消时运行。
if (entities.IsCreated) entities.Dispose();
if (healthArray.IsCreated) healthArray.Dispose();
if (results.IsCreated) results.Dispose();
}
}
[BurstCompile]
struct HealthProcessingJob : IJobParallelFor
{
[ReadOnly] public NativeArray<HealthData> HealthInputs;
[WriteOnly] public NativeArray<float> ProcessedOutputs;
public void Execute(int index)
{
var h = HealthInputs[index];
var newHealth = h.CurrentHealth + h.RegenRate;
if (newHealth > h.MaxHealth) newHealth = h.MaxHealth;
ProcessedOutputs[index] = newHealth;
}
}
struct HealthData : IComponentData
{
public float CurrentHealth;
public float MaxHealth;
public float RegenRate;
}
}
#endif
桥接类型摘要
| 类型 | 用途 | 分配情况 |
|---|---|---|
JobHandleExtensions.ToValkarnTask() | 等待单个 JobHandle | 快速路径零分配;否则池化 promise |
JobHandleExtensions.WhenAll() | 并行等待多个 JobHandle | 快速路径零分配;否则池化 promise + ArrayPool 租用 |
TempNativeArrayScope<T> | NativeArray 的 RAII 生命周期管理 | 无(结构体) |
AwaitableBridge.AsValkarnTask() | 将 Awaitable/Awaitable<T> 转换为 ValkarnTask | 无(结构体等待器) |
生成的 ScheduleAsync() | 直接等待类型化作业 | 池化 JobPromise<TJob> |