Threads channel что это
Потоки, блокировки и условные переменные в C++11 [Часть 1]
В первой части этой статьи основное внимание будет уделено потокам и блокировкам в С++11, условные переменные во всей своей красе будут подробно рассмотрены во второй части…
Потоки
В C++11, работа с потокам осуществляется по средствам класса std::thread (доступного из заголовочного файла
Также следует отметить, что если функция потока кидает исключение, то оно не будет поймано try-catch блоком. Т.е. следующий код не будет работать (точнее работать то будет, но не так как было задумано: без перехвата исключений):
Для передачи исключений между потоками, необходимо ловить их в функции потока и хранить их где-то, чтобы, в дальнейшем, получить к ним доступ.
Блокировки
Программа должна выдавать примерно следующее:
Теперь, результат работы программы будет следующего вида:
Можно поспорить насчет того, что метод dump() должен быть константным, ибо не изменяет состояние контейнера. Попробуйте сделать его таковым и получите ошибку при компиляции:
Предположим, что эта функция вызвана из двух разных потоков, из первого потока: элемент удаляется из 1 контейнера и добавляется во 2, из второго потока, наоборот, элемент удаляется из 2 контейнера и добавляется в 1. Это может вызвать deadlock (если контекст потока переключается от одного потока к другому, сразу после первой блокировки).
System.Threading.Channels — высокопроизводительный производитель-потребитель и асинхронность без аллокаций и стэк дайва
И снова здравствуй. Какое-то время назад я писал о другом малоизвестном инструменте для любителей высокой производительности — System.IO.Pipelines. По своей сути, рассматриваемый System.Threading.Channels (в дальнейшем «каналы») построен по похожим принципам, что и Пайплайны, решает ту же задачу — Производитель-Потребитель. Однако имеет в разы более простое апи, которое изящно вольется в любого рода enterprise-код. При этом использует асинхронность без аллокаций и без stack-dive даже в асинхронном случае! (Не всегда, но часто).
Оглавление
Введение
Задача Производитель/Потребитель встречается на пути программистов довольно часто и уже не первый десяток лет. Сам Эдсгер Дейкстра приложил руку к решению данной задачи — ему принадлежит идея использования семафоров для синхронизации потоков при организации работы по принципу производитель/потребитель. И хотя ее решение в простейшем виде известно и довольно тривиально, в реальном мире данный шаблон (Производитель/Потребитель) может встречаться в гораздо более усложненном виде. Также современные стандарты программирования накладывают свои отпечатки, код пишется более упрощенно и разбивается для дальнейшего переиспользования. Все делается для понижения порога написания качественного кода и упрощения данного процесса. И рассматриваемое пространство имен — System.Threading.Channels — очередной шаг на пути к этой цели.
Какое-то время назад я рассматривал System.IO.Pipelines. Там требовалось более внимательная работа и глубокое осознание дела, в ход шли Span и Memory, а для эффективной работы требовалось не вызывать очевидных методов (чтобы избежать лишних выделений памяти) и постоянно думать в байтах. Из-за этого программный интерфейс Пайплайнов был нетривиален и интуитивно не понятен.
В System.Threading.Channels пользователю представляется гораздо более простое api для работы. Стоит упомянуть, что несмотря на простоту api, данный инструмент является весьма оптимизированным и на протяжении своей работы вполне вероятно не выделит память. Возможно это благодаря тому, что под капотом повсеместно используется ValueTask, а даже в случае реальной асинхронности используется IValueTaskSource, который переиспользуется для дальнейших операций. Именно в этом заключается весь интерес реализации Каналов.
Каналы являются обобщенными, тип обобщения, как несложно догадаться — тип, экземпляры которого будут производиться и потребляться. Интересно то, что реализация класса Channel, которая помещается в 1 строку (источник github):
Таким образом основной класс каналов параметризован 2 типами — отдельно под канал производитель и канал потребитель. Но для реализованых каналов это не используется.
Для тех, кто знаком с Пайплайнами, общий подход для начала работы покажется знакомым. А именно. Мы создаем 1 центральный класс, из которого вытаскиваем отдельно производителей(ChannelWriter) и потребителей(ChannelReader). Несмотря на названия, стоит помнить, что это именно производитель/потребитель, а не читатель/писатель из еще одной классической одноименной задачи на многопоточность. ChannelReader изменяет состояние общего channel (вытаскивает значение), которое более становится недоступно. А значит он скорее не читает, а потребляет. Но с реализацией мы ознакомимся позже.
Начало работы. Channel
Начало работы с каналами начинается с абстрактного класса Channel и статического класса Channel, который создает наиболее подходящую реализацию. Далее из этого общего Channel можно получать ChannelWriter для записи в канал и ChannelReader для потребления из канала. Канал является хранилищем общей информации для ChannelWriter и ChannelReader, так, именно в нем хранятся все данные. А уже логика их записи или потребления рассредоточена в ChannelWriter и ChannelReader, Условно каналы можно разделить на 2 группы — безграничные и ограниченные. Первые более простые по реализации, в них можно писать безгранично (пока память позволяет). Вторые же ограничены неким максимальным значением количества записей.
Здесь вытекает немного разная природа асинхронности. В неограниченных каналах операция записи всегда будет завершаться синхронно, нет ничего, что могло бы остановить от записи в канал. Для ограниченных каналов ситуация иная. При стандартном поведении (которое можно заменить) операция записи будет завершаться синхронно до тех пор пока в канале есть место для новых экземпляров. Как только канал заполнен, операция записи не завершится, пока не освободится место (после того, как потребитель потребил потребляемое). Поэтому здесь операция будет реально асинхронной со сменой потоков и сопутствующими изменениями (или без смены, что будет описано чуть позже).
Поведения читателей по большей части одинаково — если в канале есть что-то, то читатель просто читает это и завершается синхронно. Если ничего нет, то ожидает пока кто-то что-то запишет.
Статический класс Channel содержит 4 метода для создания вышеперечисленных каналов:
При желании можно указать более точные опции для создания канала, которые помогут оптимизировать его под указанные нужды.
UnboundedChannelOptions содержит 3 свойства, значение которых по умолчанию false:
У него есть 2 свойства:
Пример начала работы с каналами:
Итак, приступим к изучению непосредственно ChannelReader и ChannelWriter, а также интересных деталей реализации. Они все сводятся к асинхронности без выделений памяти с помощью IValueTaskSource.
ChannelReader — потребитель
У данного метода в абстрактном классе есть реализация, которая основана на методах TryRead и WaitToReadAsync. Если опустить все инфраструктурные нюансы (исключения и cancelation tokens), то логика примерно такая — попытаться прочитать объект с помощью TryRead. Если не удалось, то в цикле while(true) проверять результат метода WaitToReadAsync. Если true, то есть данные есть, вызвать TryRead. Если TryRead получается прочитать, то вернуть результат, в противном случае цикл по новой. Цикл нужен для неудачных попыток чтения — в результате гонки потоков, сразу много потоков могут получить завершение WaitToReadAsync, но объект будет только один, соответственно только один поток сможет прочитать, а остальные уйдут на повторный круг.
Однако данная реализация, как правило, переопределена на что-то более завязанное на внутреннем устройстве.
ChannelWriter — производитель
Все аналогично потребителю, так что сразу смотрим методы:
А теперь перейдем к самой интересной части.
Асинхронность без алллокаций
В процессе написания и изучения кода, я осознал, что почти ничего интересного в реализации всех этих операций нет. В общем можно описать так — избегание лишних блокировок с помощью конкурентных коллекций и обильное использование ValueTask, который является структурой, что экономит память. Однако спешу напомнить, что не стоит быстрой заменой проходиться по всем файлам на вашей ПЭВМ и заменять все Task на ValueTask. Он имеет смысл только в случаях, когда операция в большинстве случаев завершается синхронно. Ведь, как мы помним, при асинхронности вполне вероятна смена потока, а значит и стек уже будет не тот, что прежде. Да и вообще, истинный профессионал в области производительности знает — не оптимизируй до возникновения проблем.
Радует одно, в профессионалы я себя записывать не буду, а поэтому самое время разобраться, в чем же секрет написания асинхронного кода без выделений памяти, что на первый взгляд звучит слишком хорошо для правды. Но бывает и такое.
Интерфейс IValueTaskSource
Как заверяет производитель, данную структуру следует использовать лишь очевидно — с ключевым словом await. То есть не следует применять await много раз к одному и тому же ValueTask, использовать комбинаторы, добавлять несколько продолжений и тп. Также не следует получать результат из ValueTask более одного раза. А связано это как раз с тем, что мы пытаемся понять — переиспользованием всего этого добра без выделения памяти.
Я уже упомянул интерфейс IValueTaskSource. Именно он помогает сэкономить память. Делается это с помощью переиспользования самого IValueTaskSource несколько раз для множества задач. Но именно из-за этого переиспользования и нет возможности баловаться с ValueTask.
Итак, IValueTaskSource. Данный интерфейс имеет 3 метода, реализовав которые вы будете успешно экономить память и время на выделении тех заветных байт.
CompareExchange
Имеются также перегрузи с int, long, float, double, IntPtr, object.
Сам метод атомарный, то бишь выполняется без прерываний. Сравнивает 2 значения и, если они равны, выполняет присваивание нового значения в переменную. Решают проблему, когда нужно проверить значение переменной и в зависимости от него изменить переменную.
Допустим, вы хотите инкрементировать переменную, если ее значение меньше 10.
Далее идут 2 потока.
Поток 1 | Поток 2 |
---|---|
Проверяет значение переменной на некоторое условие (то есть меньше ли оно 10), которое срабатывает | — |
Между проверкой и изменением значения | Присваивает переменной значение, не удовлетворяющее условию (например, 15) |
Изменяет значение, хотя не должен, ведь условие уже не соблюдается | — |
При использовании данного метода, вы либо изменяете именно то значение, что хотели, либо не изменяете, получив при этом актуальное значение переменной.
location1 — переменная, значение которой мы хотим поменять. Оно сравнивается с comparand, в случае равенства в location1 записывается value. Если операция удалась, то метод вернет прошлое значение переменной location1. Если же нет, то будет возращено актуальное значение location1.
Если говорить чуть глубже, то существует инструкция языка ассемблера cmpxchg, которая выполняет эти действия. Именно она и используется под капотом.
Stack dive
Рассматривая весь этот код я не раз наткнулся на упоминания «Stack Dive». Это очень крутая и интересная штука, которая на самом деле очень нежелательна. Суть в том, что при синхронном выполнении продолжений мы можем исчерпать ресурсы стека.
Допустим, мы имеем 10000 задач, в стиле
Допустим, первая задача завершает выполнение и этим освобождает продолжение второй, которое мы начинаем тут же выполнять синхронно в этом потоке, то есть забирая кусок стека стек фреймом данного продолжения. В свою очередь, данное продолжение разблокирует продолжение третей задачи, которое мы тоже начинаем сразу выполнять. И так далее. Если в продолжении больше нет await’ов или чего-то, что как-то сбросит стек, то мы просто будем потреблять стековое пространство до упора. Что может вызвать StackOverflow и крах приложения. В рассмотрении кода я упомяну, как с этим борется AsyncOperation.
AsyncOperation как реализация IValueTaskSource
Также AsyncOperation реализует IThreadPoolWorkItem с единственным методом — void Execute() => SetCompletionAndInvokeContinuation(). Метод SetCompletionAndInvokeContinuation как раз и занимается выполнением продолжения. И данный метод вызывается либо напрямую в коде AsyncOperation, либо через упомянутый Execute. Ведь типы реализующие IThreadPoolWorkItem можно забрасывать в тред пул как-то вот так ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false).
Метод Execute будет выполнен тред пулом.
Само выполнение продолжения довольно тривиально.
Продолжение _continuation копируется в локальную переменную, на ее место записывается s_completedSentinel — искусственный объект-марионетка (иль часовой, не знаю, как глаголить мне в нашей речи), который указывает, что задача завершена. Ну а далее локальная копия реального продолжения просто выполняется. При наличии ExecutionContext, данные действия постятся в контекст. Никакого секрета тут нет. Этот код может быть вызван как напрямую классом — просто вызвав метод, инкапсулирующий эти действия, так и через интерфейс IThreadPoolWorkItem в тред пуле. Теперь можно догадаться, как работает функция с выполнением продолжений синхронно.
Первый метод интерфейса IValueTaskSource — GetResult (github).
Метод тривиален. — он сохраняет принятый параметр в _result и сигнализирует о завершении, а именно вызывает метод SignalCompleteion, который довольно интересен.
В данном методе используется все, о чем мы говорили в начале.
В самом начале, если _continuation == null, мы записываем марионетку s_completedSentinel.
Далее метод можно разделить на 4 блока. Сразу скажу для простоты понимания схемы, 4 блок — просто синхронное выполнение продолжения. То есть тривиальное выполнение продолжения через метод, как я описано в абзаце про IThreadPoolWorkItem.
Третий и последний, но самый сложный метод интерфейса IValueTaskSource — OnCompleted (github)
Метод добавляет продолжение, которое выполняется по завершению.
При необходимости захватывает ExecutionContext и SynchronizationContext.
Далее используется Interlocked.CompareExchange, описанный выше, чтобы сохранить продолжение в поле, сравнивая его с null. Напоминаю, что CompareExchange возвращает актуальное значение переменной.
Если сохранение продолжения прошло, то возвращается значение, которое было в переменной до обновления, то есть null. Это означает, что операция еще не завершилась на момент записи продолжения. И тот, кто ее завершит сам со всем разберется (как мы смотрели выше). И нам нет смысла выполнять какие-то дополнительные действия. И на этом работа метода завершается.
Если сохранить значение не получилось, то есть из CompareExchange вернулось что-то кроме null. В этом случае кто-то успел положить значение в быстрее нас. То есть произошла одна из 2 ситуаций — или задача завершилась быстрее, чем мы до сюда дошли, или была попытка записать более 1 продолжения, что делать нельзя.
Таким образом проверяем возвращенное значение, равно ли оно s_completedSentinel — именно оно было бы записано в случае завершения.
Main, before await. Thread id: 1
Created thread for writing with delay, before await write. Thread id: 4
Main, after await (will be processed by created thread for writing). Thread id: 4
Created thread for writing with delay, after await write. Thread id: 4
An Introduction to System.Threading.Channels
December 11th, 2019
“Producer/consumer” problems are everywhere, in all facets of our lives. A line cook at a fast food restaurant, slicing tomatoes that are handed off to another cook to assemble a burger, which is handed off to a register worker to fulfill your order, which you happily gobble down. Postal drivers delivering mail all along their routes, and you either seeing a truck arrive and going out to the mailbox to retrieve your deliveries or just checking later in the day when you get home from work. An airline employee offloading suitcases from a cargo hold of a jetliner, placing them onto a conveyer belt, where they’re shuttled down to another employee who transfers bags to a van and drives them to yet another conveyer that will take them to you. And a happy engaged couple preparing to send out invites to their wedding, with one partner addressing an envelope and handing it off to the other who stuffs and licks it.
As software developers, we routinely see happenings from our everyday lives make their way into our software, and “producer/consumer” problems are no exception. Anyone who’s piped together commands at a command-line has utilized producer/consumer, with the stdout from one program being fed as the stdin to another. Anyone who’s launched multiple workers to compute discrete values or to download data from multiple sites has utilized producer/consumer, with a consumer aggregating results for display or further processing. Anyone who’s tried to parallelize a pipeline has very explicitly employed producer/consumer. And so on.
All of these scenarios, whether in our real-world or software lives, have something in common: there is some vehicle for handing off the results from the producer to the consumer. The fast food employee places the completed burgers in a stand that the register worker pulls from to fill the customer’s bag. The postal worker places mail into a mailbox. The engaged couple’s hands meet to transfer the materials from one to the other. In software, such a hand-off requires a data structure of some kind to facilitate the transaction, storage that can used by the producer to transfer a result and potentially buffer more, while also enabling the consumer to be notified that one or more results are available. Enter System.Threading.Channels.
What is a Channel?
I often find it easiest to understand some technology by implementing a simple version myself. In doing so, I learn about various problems implementers of that technology may have had to overcome, trade-offs they may have had to make, and the best way to utilize the functionality. To that end, let’s start learning about System.Threading.Channels by implementing a “channel” from scratch.
A channel is simply a data structure that’s used to store produced data for a consumer to retrieve, and an appropriate synchronization to enable that to happen safely, while also enabling appropriate notifications in both directions. There is a multitude of possible design decisions involved. Should a channel be able to hold an unbounded number of items? If not, what should happen when it fills up? How critical is performance? Do we need to try to minimize synchronization? Can we make any assumptions about how many producers and consumers are allowed concurrently? For the purposes of quickly writing a simple channel, let’s make simplifying assumptions that we don’t need to enforce any particular bound and that we don’t need to be overly concerned about overheads. We’ll also make up a simple API.
To start, we need our type, to which we’ll add a few simple methods:
Now we just need to implement these two methods. To start, we’ll add two fields to our type: one to serve as the storage mechanism, and one to coordinate between the producers and consumers:
We use a ConcurrentQueue to store the data, freeing us from needing to do our own locking to protect the buffering data structure, as ConcurrentQueue is already thread-safe for any number of producers and any number of consumers to access concurrently. And we use a SempahoreSlim to help coordinate between producers and consumers and to notify consumers that might be waiting for additional data to arrive.
Our Write method is simple. It just needs to store the data into the queue and increment the SemaphoreSlim ‘s count by “release”ing it:
And our ReadAsync method is almost just as simple. It needs to wait for data to be available and then take it out.
Note that because no other code could be manipulating the semaphore or the queue, we know that once we’ve successfully waited on the semaphore, the queue will have data to give us, which is why we can just assert that the TryDequeue method successfully returned one. If those assumptions ever changed, this implementation would need to become more complicated.
And that’s it: we have our basic channel. If all you need are the basic features assumed here, such an implementation is perfectly reasonable. Of course, the requirements are often more significant, both on performance and on APIs necessary to enable more scenarios.
Now that we understand the basics of what a channel provides, we can switch to looking at the actual System.Threading.Channel APIs.
Introducing System.Threading.Channels
The core abstractions exposed from the System.Threading.Channels library are a writer:
And the base implementation of the virtual method employs the exact pattern nested-loop pattern shown previously with WaitToReadAsync and TryRead :
Built-In Channel Implementations
Ok, so we know how to write to writers and read from readers… but from where do we get those writers and readers?
The Channel type exposes a Writer property and a Reader property that returns a ChannelWriter and a ChannelReader
This base abstract class is available for the niche uses cases where a channel may itself transform written data into a different type for consumption, but the vast majority use case has TWrite and TRead being the same, which is why the majority use happens via the derived Channel type, which is nothing more than:
The non-generic Channel type then provides factories for several implementations of Channel :
The BoundedChannelOptions passed to CreateBounded layers on additional options specific to bounding. In addition to the maximum capacity supported by the channel, it also exposes a BoundedChannelFullMode enum that indicates the behavior writes should experience when the channel is full:
Performance
Consider this benchmark:
Here we’re just testing the throughput and memory allocation on an unbounded channel when writing an element and then reading out that element 10 million times, which means an element will always be available for the read to consume and thus the read will always complete synchronously, yielding the following results on my machine (the 72 bytes shown in the Allocated column is for the single Task returned from WriteThenRead):
Method | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|
WriteThenRead | 527.8 ms | 2.03 ms | 1.90 ms | – | – | – | 72 B |
But now let’s change it slightly, first issuing the read and only then writing the element that will satisfy it. In this case, reads will always complete asynchronously because the data to complete them will never be available:
which on my machine for 10 million writes and reads yields results like this:
Method | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|
ReadThenWrite | 881.2 ms | 4.60 ms | 4.30 ms | – | – | – | 72 B |
So, there’s some more overhead when every read completes asynchronously, but even here we see zero allocations for the 10 million asynchronously-completing reads (again, the 72 bytes shown in the Allocated column is for the Task returned from ReadThenWrite)!
Combinators
Generally consumption of channels is simple, using one of the approaches shown earlier. But as with IEnumerable s, it’s also possible to implement various kinds of operations over channels to accomplish a specific purpose. For example, let’s say I want to wait for the first element to arrive from either of two supplied readers; I could write something like this: