Перейти к основному содержимому

Каналы

Каналы предоставляют потокобезопасный, асинхронный конвейер для передачи данных между производителями и потребителями. Они особенно подходят для игровых систем, где работа генерируется с одной стороны (фоновые потоки, коллбэки событий, завершения заданий) и должна потребляться с другой (главный поток, пулы рабочих потоков).

Создание канала

Каналы создаются через статический фабричный класс ValkarnTask.Channel. Публичного конструктора нет.

Неограниченный канал

Channel<T> channel = ValkarnTask.Channel.CreateUnbounded<T>(bool multiConsumer = false);

Неограниченный канал не имеет ограничения ёмкости. WriteAsync и TryWrite всегда немедленно успешно выполняются, пока канал не завершён. Элементы накапливаются во внутренней Queue<T> до тех пор, пока потребитель их не прочитает.

Параметры

ПараметрПо умолчаниюОписание
multiConsumerfalseЕсли true, поддерживаются несколько параллельных вызовов ReadAsync (конкурирующие потребители). Если false, только один читатель может одновременно ожидать ReadAsync.

Ограниченный канал

Channel<T> channel = ValkarnTask.Channel.CreateBounded<T>(int capacity, bool multiConsumer = false);

Ограниченный канал хранит не более capacity элементов в кольцевом буфере фиксированного размера. Когда буфер полон, WriteAsync асинхронно приостанавливает вызывающего, пока не освободится место (обратное давление). TryWrite немедленно возвращает false, когда канал полон, вместо ожидания.

Параметры

ПараметрПо умолчаниюОписание
capacityобязательныйМаксимальное количество элементов в буфере. Должно быть больше нуля.
multiConsumerfalseТо же, что и для неограниченного — включает нескольких параллельных читателей.

Выбор между ограниченным и неограниченным

Используйте CreateBounded, когда нужно применить обратное давление — то есть когда хотите, чтобы производители автоматически замедлялись, если потребители отстают. Используйте CreateUnbounded, когда скорость производителя естественно ограничена (например, события ввода) и неограниченная буферизация приемлема, или когда вы уже учли рост памяти.


Channel<T>

Channel<T> — это контейнер, открывающий две стороны конвейера как отдельные объекты.

public sealed class Channel<T>
{
public ChannelReader<T> Reader { get; }
public ChannelWriter<T> Writer { get; }
}

Держите ссылку на Writer на стороне производителя и ссылку на Reader на стороне потребителя. Они не обязаны находиться в одном потоке.


ChannelWriter<T>

ChannelWriter<T> — это сторона записи канала. Получите его через channel.Writer.

TryWrite

public abstract bool TryWrite(T item);

Пытается записать элемент без приостановки. Возвращает true, если элемент принят; возвращает false, если канал полон (ограниченный) или завершён.

Используйте TryWrite на горячих путях, где вы можете позволить себе отбросить элементы, или когда опрашиваете в цикле и хотите избежать аллокаций от асинхронных конечных автоматов.

if (!channel.Writer.TryWrite(item))
{
// канал полон или закрыт — обработайте соответственно
}

WriteAsync

public abstract ValkarnTask WriteAsync(T item);

Записывает элемент в канал, при необходимости асинхронно приостанавливая вызывающего.

  • Неограниченные каналы: всегда завершается синхронно (быстрый путь без аллокаций), пока канал открыт.
  • Ограниченные каналы: завершается синхронно при наличии места в буфере; приостанавливает вызывающего и ставит в очередь запись в ожидании, когда буфер полон. Вызывающий возобновляется, как только потребитель прочитает элемент и освободит слот.

Бросает ChannelClosedException, если канал был завершён через Complete() до или во время записи.

await channel.Writer.WriteAsync(item);

Несколько производителей могут параллельно вызывать WriteAsync на ограниченном канале. Каждый приостановленный писатель ставится в очередь и разблокируется в порядке FIFO по мере освобождения места.

Complete

public abstract void Complete();

Сигнализирует, что больше элементов записываться не будет. После вызова Complete():

  • Все уже записанные элементы остаются в буфере и могут быть потреблены.
  • Новые вызовы WriteAsync или TryWrite завершатся с ChannelClosedException.
  • После полного опустошения буфера ChannelReader<T>.Completion завершается, и все ожидающие или будущие вызовы ReadAsync бросают ChannelClosedException.

Complete() идемпотентен при однократном вызове — многократный вызов безопасен (последующие вызовы игнорируются).

// Сигнализировать об окончании работы
channel.Writer.Complete();

ChannelReader<T>

ChannelReader<T> — это сторона чтения канала. Получите его через channel.Reader.

ReadAsync

public abstract ValkarnTask<T> ReadAsync();

Читает следующий элемент из канала. Если элемент недоступен, вызывающий асинхронно приостанавливается до его появления. Когда канал завершён и полностью опустошён, бросает ChannelClosedException.

T item = await channel.Reader.ReadAsync();

Режим одного потребителя (по умолчанию): одновременно может выполняться только один ReadAsync. Попытка запустить второй параллельный ReadAsync немедленно бросает исключение. Это ограничение позволяет внутреннюю оптимизацию без аллокаций — ядро читателя встраивается непосредственно в реализацию канала, а не выделяется из пула при каждом вызове.

Режим нескольких потребителей (multiConsumer: true): любое количество вызовов ReadAsync может быть в ожидании одновременно. Каждый ожидающий вызывающий ставится в очередь и разрешается в порядке FIFO по мере поступления элементов.

TryRead

public abstract bool TryRead(out T item);

Пытается прочитать элемент без приостановки. Возвращает true и заполняет item, если элемент доступен; возвращает false, если канал пуст (устанавливает item в default).

TryRead не различает пустой-но-открытый канал и пустой-и-закрытый канал. Используйте Completion для определения закрытого состояния при использовании TryRead в цикле опроса.

ReadAllAsync

public IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default);

Возвращает IAsyncEnumerable<T>, итерирующий все элементы до завершения и опустошения канала. Перечисление завершается аккуратно без распространения ChannelClosedException — оно просто останавливается.

await foreach (var item in channel.Reader.ReadAllAsync(ct))
{
Process(item);
}
// Сюда попадаем, когда канал завершён и пуст

Токен отмены, переданный в ReadAllAsync, используется как запасной вариант. Если GetAsyncEnumerator также передан токен (что делает await foreach через WithCancellation), токен со стороны foreach имеет приоритет.

Completion

public abstract ValkarnTask Completion { get; }

ValkarnTask, завершающийся когда канал полностью опустошён после вызова Complete(). В частности:

  • Если Complete() вызывается на уже пустом канале, Completion разрешается немедленно.
  • Если Complete() вызывается пока в буфере ещё есть элементы, Completion разрешается только после потребления последнего элемента.

Ожидание Completion — канонический способ дождаться завершения конвейера.

channel.Writer.Complete();
await channel.Reader.Completion;
// Все элементы потреблены

ChannelClosedException

public sealed class ChannelClosedException : InvalidOperationException

Бросается в двух случаях:

  1. Чтение из завершённого, опустошённого каналаReadAsync() бросает, когда канал помечен завершённым и элементов не осталось.
  2. Запись в завершённый каналWriteAsync() бросает, когда Complete() был вызван до записи.

ChannelClosedException наследует от InvalidOperationException. Он не бросается TryRead или TryWrite, которые вместо этого возвращают false.

Конструкторы:

new ChannelClosedException()
new ChannelClosedException(string message)
new ChannelClosedException(Exception innerException)

Ограниченный канал: обратное давление подробно

Когда буфер ограниченного канала полон и вызывается WriteAsync, писатель приостанавливается и запись в ожидании внутренне ставится в очередь. Писатель удерживает свой элемент. Когда потребитель вызывает ReadAsync или TryRead и удаляет элемент из очереди:

  1. Освободившийся слот немедленно захватывается самым старым ожидающим писателем.
  2. Элемент этого писателя помещается в буфер.
  3. Ожидающий код писателя возобновляется.

Это означает, что полный ограниченный канал никогда не теряет элементы и никогда не тратит впустую ёмкость буфера — всегда есть взаимно однозначное соответствие между освобождением слота и возобновлением заблокированного писателя. В случаях, когда читатель прибывает пока ожидающие писатели ждут, но буфер пуст, элемент передаётся напрямую, минуя буфер.


Паттерны

Базовый producer/consumer

var channel = ValkarnTask.Channel.CreateUnbounded<WorkItem>();

// Производитель (например, работает в фоновом потоке или из коллбэков)
async ValkarnTask ProduceAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var work = await FetchNextWorkItemAsync(ct);
await channel.Writer.WriteAsync(work);
}
channel.Writer.Complete();
}

// Потребитель (запускается там, где вы выберете)
async ValkarnTask ConsumeAsync()
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
await ProcessAsync(item);
}
}

Несколько производителей, один потребитель

Несколько производителей могут каждый держать ссылку на channel.Writer и параллельно вызывать WriteAsync. Все операции с каналом защищены внутренней блокировкой, поэтому это безопасно.

var channel = ValkarnTask.Channel.CreateBounded<Event>(capacity: 64);

// Несколько производителей пишут параллельно
ValkarnTask ProducerA() => ProduceFrom(sourceA, channel.Writer);
ValkarnTask ProducerB() => ProduceFrom(sourceB, channel.Writer);
ValkarnTask ProducerC() => ProduceFrom(sourceC, channel.Writer);

// Один потребитель (по умолчанию — флаг multiConsumer не нужен)
async ValkarnTask ConsumerAsync()
{
await foreach (var ev in channel.Reader.ReadAllAsync())
HandleEvent(ev);
}

При использовании нескольких производителей с ограниченным каналом аккуратно координируйте Complete() — вызывайте его только после того, как все производители завершили запись, иначе некоторые писатели могут получить ChannelClosedException.

Несколько производителей, несколько потребителей

// multiConsumer: true включает параллельные ReadAsync от нескольких потребителей
var channel = ValkarnTask.Channel.CreateUnbounded<Job>(multiConsumer: true);

async ValkarnTask WorkerAsync(int id, CancellationToken ct)
{
try
{
while (true)
{
var job = await channel.Reader.ReadAsync();
await ExecuteJobAsync(job, ct);
}
}
catch (ChannelClosedException)
{
// Канал завершён — выходим корректно
}
}

Каждый элемент доставляется ровно одному потребителю. Потребители конкурируют за элементы в порядке FIFO (потребитель, ждавший дольше всех, получает следующий доступный элемент).

Корректное завершение работы

Рекомендуемая последовательность завершения:

  1. Сигнализировать всем производителям остановиться (например, отменить их CancellationToken).
  2. Вызвать channel.Writer.Complete() после того, как все производители прекратили запись.
  3. Ожидать channel.Reader.Completion для подтверждения потребления всех элементов.
cts.Cancel();                        // остановить производителей
await allProducersTask; // дождаться их завершения
channel.Writer.Complete(); // запечатать канал
await channel.Reader.Completion; // слить оставшиеся элементы

Если вы используете ReadAllAsync, шаг 3 выполняется автоматически — цикл await foreach завершается, когда канал завершён и пуст.


Сравнение с System.Threading.Channels

ВозможностьКаналы ValkarnSystem.Threading.Channels
Тип возвратаValkarnTask / ValkarnTask<T>ValueTask / ValueTask<T>
Аллокация (горячий путь)Ноль (однопотребительский неограниченный)Почти ноль
WaitToReadAsyncОтсутствует — используйте ReadAsync или ReadAllAsyncПрисутствует
TryComplete(Exception)Отсутствует — используйте Complete()Присутствует
Count / CanCountНе открытПрисутствует на некоторых типах каналов
Политика сброса при заполненииНе поддерживается — WriteAsync блокируетDropWrite, DropNewest, DropOldest, Wait
Асинхронное перечислениеReadAllAsync()ReadAllAsync()
ПотокобезопасностьПолная (на основе блокировки)Полная (на основе блокировки)

Основное отличие в том, что каналы Valkarn нативно интегрированы с ValkarnTask для ожидания без накладных расходов в сборках Unity, и путь единственного потребителя избегает аренды/возврата из пула при каждом вызове ReadAsync, встраивая ядро завершения непосредственно в канал.