Каналы
Каналы предоставляют потокобезопасный, асинхронный конвейер для передачи данных между производителями и потребителями. Они особенно подходят для игровых систем, где работа генерируется с одной стороны (фоновые потоки, коллбэки событий, завершения заданий) и должна потребляться с другой (главный поток, пулы рабочих потоков).
Создание канала
Каналы создаются через статический фабричный класс ValkarnTask.Channel. Публичного конструктора нет.
Неограниченный канал
Channel<T> channel = ValkarnTask.Channel.CreateUnbounded<T>(bool multiConsumer = false);
Неограниченный канал не имеет ограничения ёмкости. WriteAsync и TryWrite всегда немедленно успешно выполняются, пока канал не завершён. Элементы накапливаются во внутренней Queue<T> до тех пор, пока потребитель их не прочитает.
Параметры
| Параметр | По умолчанию | Описание |
|---|---|---|
multiConsumer | false | Если true, поддерживаются несколько параллельных вызовов ReadAsync (конкурирующие потребители). Если false, только один читатель может одновременно ожидать ReadAsync. |
Ограниченный канал
Channel<T> channel = ValkarnTask.Channel.CreateBounded<T>(int capacity, bool multiConsumer = false);
Ограниченный канал хранит не более capacity элементов в кольцевом буфере фиксированного размера. Когда буфер полон, WriteAsync асинхронно приостанавливает вызывающего, пока не освободится место (обратное давление). TryWrite немедленно возвращает false, когда канал полон, вместо ожидания.
Параметры
| Параметр | По умолчанию | Описание |
|---|---|---|
capacity | обязательный | Максимальное количество элементов в буфере. Должно быть больше нуля. |
multiConsumer | false | То же, что и для неограниченного — включает нескольких параллельных читателей. |
Выбор между ограниченным и неограниченным
Используйте CreateBounded, когда нужно применить обратное давление — то есть когда хотите, чтобы производители автоматически замедлялись, если потребители отстают. Используйте CreateUnbounded, когда скорость производителя естественно ограничена (например, события ввода) и неограниченная буферизация приемлема, или когда вы уже учли рост памяти.
Channel<T>
Channel<T> — это контейнер, открывающий две стороны конвейера как отдельные объекты.
public sealed class Channel<T>
{
public ChannelReader<T> Reader { get; }
public ChannelWriter<T> Writer { get; }
}
Держите ссылку на Writer на стороне производителя и ссылку на Reader на стороне потребителя. Они не обязаны находиться в одном потоке.
ChannelWriter<T>
ChannelWriter<T> — это сторона записи канала. Получите его через channel.Writer.
TryWrite
public abstract bool TryWrite(T item);
Пытается записать элемент без приостановки. Возвращает true, если элемент принят; возвращает false, если канал полон (ограниченный) или завершён.
Используйте TryWrite на горячих путях, где вы можете позволить себе отбросить элементы, или когда опрашиваете в цикле и хотите избежать аллокаций от асинхронных конечных автоматов.
if (!channel.Writer.TryWrite(item))
{
// канал полон или закрыт — обработайте соответственно
}
WriteAsync
public abstract ValkarnTask WriteAsync(T item);
Записывает элемент в канал, при необходимости асинхронно приостанавливая вызывающего.
- Неограниченные каналы: всегда завершается синхронно (быстрый путь без аллокаций), пока канал открыт.
- Ограниченные каналы: завершается синхронно при наличии места в буфере; приостанавливает вызывающего и ставит в очередь запись в ожидании, когда буфер полон. Вызывающий возобновляется, как только потребитель прочитает элемент и освободит слот.
Бросает ChannelClosedException, если канал был завершён через Complete() до или во время записи.
await channel.Writer.WriteAsync(item);
Несколько производителей могут параллельно вызывать WriteAsync на ограниченном канале. Каждый приостановленный писатель ставится в очередь и разблокируется в порядке FIFO по мере освобождения места.
Complete
public abstract void Complete();
Сигнализирует, что больше элементов записываться не будет. После вызова Complete():
- Все уже записанные элементы остаются в буфере и могут быть потреблены.
- Новые вызовы
WriteAsyncилиTryWriteзавершатся сChannelClosedException. - После полного опустошения буфера
ChannelReader<T>.Completionзавершается, и все ожидающие или будущие вызовыReadAsyncбросаютChannelClosedException.
Complete() идемпотентен при однократном вызове — многократный вызов безопасен (последующие вызовы игнорируются).
// Сигнализировать об окончании работы
channel.Writer.Complete();
ChannelReader<T>
ChannelReader<T> — это сторона чтения канала. Получите его через channel.Reader.
ReadAsync
public abstract ValkarnTask<T> ReadAsync();
Читает следующий элемент из канала. Если элемент недоступен, вызывающий асинхронно приостанавливается до его появления. Когда канал завершён и полностью опустошён, бросает ChannelClosedException.
T item = await channel.Reader.ReadAsync();
Режим одного потребителя (по умолчанию): одновременно может выполняться только один ReadAsync. Попытка запустить второй параллельный ReadAsync немедленно бросает исключение. Это ограничение позволяет внутреннюю оптимизацию без аллокаций — ядро читателя встраивается непосредственно в реализацию канала, а не выделяется из пула при каждом вызове.
Режим нескольких потребителей (multiConsumer: true): любое количество вызовов ReadAsync может быть в ожидании одновременно. Каждый ожидающий вызывающий ставится в очередь и разрешается в порядке FIFO по мере поступления элементов.
TryRead
public abstract bool TryRead(out T item);
Пытается прочитать элемент без приостановки. Возвращает true и заполняет item, если элемент доступен; возвращает false, если канал пуст (устанавливает item в default).
TryRead не различает пустой-но-открытый канал и пустой-и-закрытый канал. Используйте Completion для определения закрытого состояния при использовании TryRead в цикле опроса.
ReadAllAsync
public IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default);
Возвращает IAsyncEnumerable<T>, итерирующий все элементы до завершения и опустошения канала. Перечисление завершается аккуратно без распространения ChannelClosedException — оно просто останавливается.
await foreach (var item in channel.Reader.ReadAllAsync(ct))
{
Process(item);
}
// Сюда попадаем, когда канал завершён и пуст
Токен отмены, переданный в ReadAllAsync, используется как запасной вариант. Если GetAsyncEnumerator также передан токен (что делает await foreach через WithCancellation), токен со стороны foreach имеет приоритет.
Completion
public abstract ValkarnTask Completion { get; }
ValkarnTask, завершающийся когда канал полностью опустошён после вызова Complete(). В частности:
- Если
Complete()вызывается на уже пустом канале,Completionразрешается немедленно. - Если
Complete()вызывается пока в буфере ещё есть элементы,Completionразрешается только после потребления последнего элемента.
Ожидание Completion — канонический способ дождаться завершения конвейера.
channel.Writer.Complete();
await channel.Reader.Completion;
// Все элементы потреблены
ChannelClosedException
public sealed class ChannelClosedException : InvalidOperationException
Бросается в двух случаях:
- Чтение из завершённого, опустошённого канала —
ReadAsync()бросает, когда канал помечен завершённым и элементов не осталось. - Запись в завершённый канал —
WriteAsync()бросает, когдаComplete()был вызван до записи.
ChannelClosedException наследует от InvalidOperationException. Он не бросается TryRead или TryWrite, которые вместо этого возвращают false.
Конструкторы:
new ChannelClosedException()
new ChannelClosedException(string message)
new ChannelClosedException(Exception innerException)
Ограниченный канал: обратное давление подробно
Когда буфер ограниченного канала полон и вызывается WriteAsync, писатель приостанавливается и запись в ожидании внутренне ставится в очередь. Писатель удерживает свой элемент. Когда потребитель вызывает ReadAsync или TryRead и удаляет элемент из очереди:
- Освободившийся слот немедленно захватывается самым старым ожидающим писателем.
- Элемент этого писателя помещается в буфер.
- Ожидающий код писателя возобновляется.
Это означает, что полный ограниченный канал никогда не теряет элементы и никогда не тратит впустую ёмкость буфера — всегда есть взаимно однозначное соответствие между освобождением слота и возобновлением заблокированного писателя. В случаях, когда читатель прибывает пока ожидающие писатели ждут, но буфер пуст, элемент передаётся напрямую, минуя буфер.
Паттерны
Базовый producer/consumer
var channel = ValkarnTask.Channel.CreateUnbounded<WorkItem>();
// Производитель (например, работает в фоновом потоке или из коллбэков)
async ValkarnTask ProduceAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var work = await FetchNextWorkItemAsync(ct);
await channel.Writer.WriteAsync(work);
}
channel.Writer.Complete();
}
// Потребитель (запускается там, где вы выберете)
async ValkarnTask ConsumeAsync()
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
await ProcessAsync(item);
}
}
Несколько производителей, один потребитель
Несколько производителей могут каждый держать ссылку на channel.Writer и параллельно вызывать WriteAsync. Все операции с каналом защищены внутренней блокировкой, поэтому это безопасно.
var channel = ValkarnTask.Channel.CreateBounded<Event>(capacity: 64);
// Несколько производителей пишут параллельно
ValkarnTask ProducerA() => ProduceFrom(sourceA, channel.Writer);
ValkarnTask ProducerB() => ProduceFrom(sourceB, channel.Writer);
ValkarnTask ProducerC() => ProduceFrom(sourceC, channel.Writer);
// Один потребитель (по умолчанию — флаг multiConsumer не нужен)
async ValkarnTask ConsumerAsync()
{
await foreach (var ev in channel.Reader.ReadAllAsync())
HandleEvent(ev);
}
При использовании нескольких производителей с ограниченным каналом аккуратно координируйте Complete() — вызывайте его только после того, как все производители завершили запись, иначе некоторые писатели могут получить ChannelClosedException.
Несколько производителей, несколько потребителей
// multiConsumer: true включает параллельные ReadAsync от нескольких потребителей
var channel = ValkarnTask.Channel.CreateUnbounded<Job>(multiConsumer: true);
async ValkarnTask WorkerAsync(int id, CancellationToken ct)
{
try
{
while (true)
{
var job = await channel.Reader.ReadAsync();
await ExecuteJobAsync(job, ct);
}
}
catch (ChannelClosedException)
{
// Канал завершён — выходим корректно
}
}
Каждый элемент доставляется ровно одному потребителю. Потребители конкурируют за элементы в порядке FIFO (потребитель, ждавший дольше всех, получает следующий доступный элемент).
Корректное завершение работы
Рекомендуемая последовательность завершения:
- Сигнализировать всем производителям остановиться (например, отменить их
CancellationToken). - Вызвать
channel.Writer.Complete()после того, как все производители прекратили запись. - Ожидать
channel.Reader.Completionдля подтверждения потребления всех элементов.
cts.Cancel(); // остановить производителей
await allProducersTask; // дождаться их завершения
channel.Writer.Complete(); // запечатать канал
await channel.Reader.Completion; // слить оставшиеся элементы
Если вы используете ReadAllAsync, шаг 3 выполняется автоматически — цикл await foreach завершается, когда канал завершён и пуст.
Сравнение с System.Threading.Channels
| Возможность | Каналы Valkarn | System.Threading.Channels |
|---|---|---|
| Тип возврата | ValkarnTask / ValkarnTask<T> | ValueTask / ValueTask<T> |
| Аллокация (горячий путь) | Ноль (однопотребительский неограниченный) | Почти ноль |
WaitToReadAsync | Отсутствует — используйте ReadAsync или ReadAllAsync | Присутствует |
TryComplete(Exception) | Отсутствует — используйте Complete() | Присутствует |
Count / CanCount | Не открыт | Присутствует на некоторых типах каналов |
| Политика сброса при заполнении | Не поддерживается — WriteAsync блокирует | DropWrite, DropNewest, DropOldest, Wait |
| Асинхронное перечисление | ReadAllAsync() | ReadAllAsync() |
| Потокобезопасность | Полная (на основе блокировки) | Полная (на основе блокировки) |
Основное отличие в том, что каналы Valkarn нативно интегрированы с ValkarnTask для ожидания без накладных расходов в сборках Unity, и путь единственного потребителя избегает аренды/возврата из пула при каждом вызове ReadAsync, встраивая ядро завершения непосредственно в канал.