メインコンテンツまでスキップ

チャネル

チャネルはプロデューサーとコンシューマーの間でデータを受け渡すための、スレッドセーフで非同期対応のパイプラインを提供します。バックグラウンドスレッド、イベントコールバック、ジョブ完了など一方から作業が生成され、メインスレッドやワーカープールなど別の場所で消費されるゲームシステムに特に適しています。

チャネルの作成

チャネルはValkarnTask.Channel静的ファクトリークラスを通じて作成されます。パブリックコンストラクターはありません。

無界チャネル

Channel<T> channel = ValkarnTask.Channel.CreateUnbounded<T>(bool multiConsumer = false);

無界チャネルには容量制限がありません。チャネルが完了していない限り、WriteAsyncTryWriteは常に即座に成功します。コンシューマーが読み取るまで、アイテムは内部のQueue<T>に蓄積されます。

パラメーター

パラメーターデフォルト説明
multiConsumerfalsetrueの場合、複数の並行ReadAsync呼び出しをサポート(競合コンシューマー)。falseの場合、一度に1つのリーダーのみがReadAsyncをawaitできます。

有界チャネル

Channel<T> channel = ValkarnTask.Channel.CreateBounded<T>(int capacity, bool multiConsumer = false);

有界チャネルは固定サイズのリングバッファーに最大capacity個のアイテムを保持します。バッファーが満杯の場合、WriteAsyncはスペースが空くまで呼び出しコードを非同期サスペンドします(バックプレッシャー)。TryWriteは満杯の時に待機せず即座にfalseを返します。

パラメーター

パラメーターデフォルト説明
capacity必須バッファーが保持できるアイテムの最大数。ゼロより大きくなければなりません。
multiConsumerfalse無界と同様 — 複数の並行リーダーを有効にします。

有界と無界の選択

バックプレッシャーを適用したい場合、つまりコンシューマーが遅れた場合にプロデューサーを自動的に遅くしたい場合はCreateBoundedを使用してください。プロデューサーのレートが自然に制限されている場合(入力イベントなど)や、メモリ成長を既に考慮している場合はCreateUnboundedを使用してください。


Channel<T>

Channel<T>はパイプラインの2つの側面を別々のオブジェクトとして公開するコンテナです。

public sealed class Channel<T>
{
public ChannelReader<T> Reader { get; }
public ChannelWriter<T> Writer { get; }
}

プロデューサー側にWriter参照を、コンシューマー側にReader参照を保持してください。同じスレッドである必要はありません。


ChannelWriter<T>

ChannelWriter<T>はチャネルの書き込み側です。channel.Writerから取得してください。

TryWrite

public abstract bool TryWrite(T item);

サスペンドせずにアイテムの書き込みを試みます。アイテムが受け入れられた場合trueを返します;チャネルが満杯(有界)または完了した場合falseを返します。

アイテムを破棄できるホットパスで使用するか、ループでポーリングして非同期ステートマシンからのアロケーションを避けたい場合に使用します。

if (!channel.Writer.TryWrite(item))
{
// チャネルが満杯またはクローズ済み — 適切に処理
}

WriteAsync

public abstract ValkarnTask WriteAsync(T item);

必要に応じて呼び出し元を非同期サスペンドしながらチャネルにアイテムを書き込みます。

  • 無界チャネル:チャネルがオープンである限り常に同期的に完了(ゼロアロケーション高速パス)。
  • 有界チャネル:バッファーにスペースがある場合は同期的に完了;バッファーが満杯の場合は呼び出し元をサスペンドし、コンシューマーがアイテムを読み取ってスロットを解放すると再開されます。

Complete()が書き込みの前または最中に呼ばれている場合はChannelClosedExceptionをスローします。

await channel.Writer.WriteAsync(item);

複数のプロデューサーが有界チャネルでWriteAsyncを並行して呼び出すことができます。各サスペンドされたライターはFIFO順でキューされ、スペースが空くとアンブロックされます。

Complete

public abstract void Complete();

これ以上アイテムが書き込まれないことをシグナルします。Complete()が呼ばれた後:

  • 既に書き込まれたアイテムはバッファーに残り、消費できます。
  • WriteAsyncまたはTryWriteの新しい呼び出しはChannelClosedExceptionで失敗します。
  • バッファーが完全にドレインされると、ChannelReader<T>.Completionが完了し、保留中または将来のReadAsync呼び出しはChannelClosedExceptionをスローします。

Complete()は単一呼び出しに対してべき等です — 複数回呼び出しても安全(後続の呼び出しは無視されます)。

// 作業の終了をシグナル
channel.Writer.Complete();

ChannelReader<T>

ChannelReader<T>はチャネルの読み取り側です。channel.Readerから取得してください。

ReadAsync

public abstract ValkarnTask<T> ReadAsync();

チャネルから次のアイテムを読み取ります。現在アイテムが利用できない場合、呼び出し元は到着するまで非同期サスペンドされます。チャネルが完了かつ完全にドレインされた場合、ChannelClosedExceptionをスローします。

T item = await channel.Reader.ReadAsync();

シングルコンシューマーモード(デフォルト):一度に1つのReadAsyncのみがインフライトになれます。2つ目の並行ReadAsyncを開始しようとすると即座にスローします。この制限により内部のゼロアロケーション最適化が可能になります — リーダーコアは呼び出しごとのプールレンタルなしに、チャネル実装に直接埋め込まれます。

マルチコンシューマーモードmultiConsumer: true):任意の数のReadAsync呼び出しが同時に保留できます。各保留中の呼び出し元はキューされ、アイテムが利用可能になるとFIFO順で解決されます。

TryRead

public abstract bool TryRead(out T item);

サスペンドせずにアイテムの読み取りを試みます。アイテムが利用可能な場合trueを返してitemを設定します;チャネルが空の場合falseを返します(itemdefaultに設定)。

TryReadは空だが開いているチャネルと空でクローズされたチャネルを区別しません。ポーリングループでTryReadを使用している場合、クローズ状態を検出するにはCompletionを使用してください。

ReadAllAsync

public IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default);

チャネルが完了してドレインされるまですべてのアイテムを反復するIAsyncEnumerable<T>を返します。列挙はChannelClosedExceptionを伝播させずにクリーンに終了します。

await foreach (var item in channel.Reader.ReadAllAsync(ct))
{
Process(item);
}
// チャネルが完了して空の場合にここに到達

ReadAllAsyncに渡されたキャンセルトークンはフォールバックとして使用されます。GetAsyncEnumeratorにもトークンが渡される場合(await foreachWithCancellationで行うように)、foreach側のトークンが優先されます。

Completion

public abstract ValkarnTask Completion { get; }

Complete()が呼ばれた後にチャネルが完全にドレインされると完了するValkarnTask。具体的には:

  • Complete()が既に空のチャネルで呼ばれた場合、Completionは即座に解決されます。
  • Complete()がバッファーにアイテムが残っている間に呼ばれた場合、最後のアイテムが消費されてからCompletionが解決されます。

Completionをawaitすることがパイプラインの終了を待つ標準的な方法です。

channel.Writer.Complete();
await channel.Reader.Completion;
// すべてのアイテムが消費された

ChannelClosedException

public sealed class ChannelClosedException : InvalidOperationException

2つの状況でスローされます:

  1. 完了してドレインされたチャネルからの読み取り — チャネルが完了とマークされ、アイテムが残っていない場合にReadAsync()がスローします。
  2. 完了したチャネルへの書き込み — 書き込みの前にComplete()が呼ばれた場合にWriteAsync()がスローします。

ChannelClosedExceptionInvalidOperationExceptionから継承します。TryReadまたはTryWriteではスローされません;代わりにfalseを返します。

コンストラクター:

new ChannelClosedException()
new ChannelClosedException(string message)
new ChannelClosedException(Exception innerException)

有界チャネル:バックプレッシャーの詳細

有界チャネルのバッファーが満杯でWriteAsyncが呼ばれると、ライターはサスペンドされ、保留ライターレコードが内部にエンキューされます。ライターはアイテムを保持します。コンシューマーがReadAsyncまたはTryReadを呼び出してアイテムをデキューすると:

  1. 解放されたスロットは即座に最も古い保留ライターにクレームされます。
  2. そのライターのアイテムがバッファーに配置されます。
  3. ライターのawaitコードが再開されます。

これは満杯の有界チャネルがアイテムを失ったりバッファー容量を無駄にしたりしないことを意味します — スロットが空くことと、ブロックされたライターが再開されることの間には常に一対一の対応があります。バッファーが空の状態で保留ライターが待機中にリーダーが到着した場合、アイテムはバッファーに触れることなく直接ハンドオフされます。


パターン

基本的なプロデューサー/コンシューマー

var channel = ValkarnTask.Channel.CreateUnbounded<WorkItem>();

// プロデューサー(例:バックグラウンドスレッドやコールバックで実行)
async ValkarnTask ProduceAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var work = await FetchNextWorkItemAsync(ct);
await channel.Writer.WriteAsync(work);
}
channel.Writer.Complete();
}

// コンシューマー(呼び出し先を選択して実行)
async ValkarnTask ConsumeAsync()
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
await ProcessAsync(item);
}
}

複数プロデューサー、シングルコンシューマー

複数のプロデューサーがそれぞれchannel.Writerへの参照を保持し、WriteAsyncを並行して呼び出すことができます。すべてのチャネル操作は内部ロックで保護されているため、これは安全です。

var channel = ValkarnTask.Channel.CreateBounded<Event>(capacity: 64);

// 複数のプロデューサーが並行して書き込む
ValkarnTask ProducerA() => ProduceFrom(sourceA, channel.Writer);
ValkarnTask ProducerB() => ProduceFrom(sourceB, channel.Writer);
ValkarnTask ProducerC() => ProduceFrom(sourceC, channel.Writer);

// シングルコンシューマー(デフォルト — multiConsumerフラグ不要)
async ValkarnTask ConsumerAsync()
{
await foreach (var ev in channel.Reader.ReadAllAsync())
HandleEvent(ev);
}

複数プロデューサーで有界チャネルを使用する場合、Complete()を慎重に調整してください — すべてのプロデューサーが書き込みを終了した後にのみ呼び出してください。そうでないと一部のライターがChannelClosedExceptionを受け取る可能性があります。

複数プロデューサー、複数コンシューマー

// multiConsumer: trueで複数のコンシューマーからの並行ReadAsyncが有効になる
var channel = ValkarnTask.Channel.CreateUnbounded<Job>(multiConsumer: true);

async ValkarnTask WorkerAsync(int id, CancellationToken ct)
{
try
{
while (true)
{
var job = await channel.Reader.ReadAsync();
await ExecuteJobAsync(job, ct);
}
}
catch (ChannelClosedException)
{
// チャネルが完了 — グレースフルに終了
}
}

各アイテムはちょうど1つのコンシューマーに届きます。コンシューマーはFIFO順でアイテムを競います(最も長く待っているコンシューマーが次の利用可能なアイテムを取得します)。

グレースフルシャットダウン

推奨されるシャットダウンシーケンスは:

  1. すべてのプロデューサーに停止をシグナルします(例:CancellationTokenをキャンセル)。
  2. すべてのプロデューサーが書き込みを停止した後、channel.Writer.Complete()を呼び出します。
  3. すべてのアイテムが消費されたことを確認するためchannel.Reader.Completionをawaitします。
cts.Cancel();                        // プロデューサーを停止
await allProducersTask; // それらが終了するのを待つ
channel.Writer.Complete(); // チャネルを封印
await channel.Reader.Completion; // 残りのアイテムをドレイン

ReadAllAsyncを使用している場合、ステップ3は自動的に発生します — await foreachループはチャネルが完了して空になると終了します。


System.Threading.Channelsとの比較

機能ValkarnチャネルSystem.Threading.Channels
戻り値型ValkarnTask / ValkarnTask<T>ValueTask / ValueTask<T>
アロケーション(ホットパス)ゼロ(シングルコンシューマー無界)ほぼゼロ
WaitToReadAsyncなし — ReadAsyncまたはReadAllAsyncを使用あり
TryComplete(Exception)なし — Complete()を使用あり
Count / CanCount公開なし一部のチャネル型に存在
満杯時のドロップポリシー非対応 — WriteAsyncがブロックDropWriteDropNewestDropOldestWait
非同期列挙可能ReadAllAsync()ReadAllAsync()
スレッド安全性完全(ロックベース)完全(ロックベース)

主要な違いは、ValkarnチャネルがUnityビルドでValkarnTaskとネイティブに統合してオーバーヘッドゼロのawaitを実現し、シングルコンシューマーパスはReadAsync呼び出しごとのプールレンタル/返却を、完了コアをチャネルに直接埋め込むことで回避している点です。