Introduction
To communicate among threads in a multithreaded program, you typically share variables in memory and use mutexes to protect them. While that works, such programs are often hard to get right, even for experts. Not holding a lock when you should can lead to subtle, hard-to-reproduce bugs; holding multiple locks simultaneously can lead to deadlocks.
A channel is a data structure used for message passing among threads (or processes, but the focus here is on threads; for processes on Unix, there are pipes).
Instead of communicating among threads by sharing memory (requiring explicitly using mutexes and copying), you share memory by communicating (only explicitly copying).
Notes to the reader
This article assumes familiarity with traditional multithreaded programming, specifically mutexes, condition variables, and the pthreads API.
Channels in Go
While channels have been around since 1978, they’ve been popularized by having direct support in the Go programming language. For example, in Go:
package main

import "fmt"

func sum_array(vals []int, result chan int) {
	sum := 0
	for _, val := range vals {
		sum += val
	}
	result <- sum // Send partial sum to channel.
}

func main() {
	vals := []int{7, 2, 8, -9, 4, 0}
	c := make(chan int)
	go sum_array(vals[:len(vals)/2], c)
	go sum_array(vals[len(vals)/2:], c)
	sum := <-c + <-c // Receive sums from channel.
	fmt.Println(sum)
}
That code uses two “goroutines” (light-weight threads, created by the lines starting with go), each calling sum_array to sum half an array of integers. Each partial sum is then sent to the channel c using the <- binary operator. In main, the partial sums are read from the channel using the <- unary operator. Notice that there is no explicit use of mutexes, condition variables, or signaling between threads. The <- operator handles all that behind the curtain for you.
That code uses an unbuffered channel; that is, a sender blocks until a receiver is available and vice versa, i.e., the threads have to rendezvous. Go also supports buffered channels of a fixed size, e.g.:
c := make(chan int, 16) // Buffered channel, 16 capacity.
For buffered channels, senders can send without blocking as long as the channel isn’t full; receivers can receive without blocking as long as the channel isn’t empty.
In addition to the one-at-a-time operations of send and receive, Go also has a select statement that allows you to select from multiple channels simultaneously:
func fibonacci(fibc chan int, quitc chan bool) {
	fib, next_fib := 0, 1
	for {
		select {
		case fibc <- fib: // If sent number, calc next number.
			fib, next_fib = next_fib, fib+next_fib
		case <-quitc: // If received quit, return.
			return
		}
	}
}
That code calculates Fibonacci numbers and sends them to the fibc channel. The select waits until either there is a receiver to receive the next result or any value is received on the quitc channel.
The select statement may have an optional default case that’s selected only if no channels are ready. Lastly, channels may be closed to indicate to receivers that no more messages are coming.
Channels in C API: Basics
To create a channel in C, we can do:
struct chan c;
chan_init( &c, 100, sizeof(int) );
where 100 specifies the capacity of the buffer (if 0, the channel will be unbuffered) and sizeof(int) specifies the size of each message.
Since we can’t add an <- operator to C, we’ll have to substitute ordinary function calls:
int chan_send( struct chan *chan, void const *send_buf,
               struct timespec const *duration );
int chan_recv( struct chan *chan, void *recv_buf,
               struct timespec const *duration );

extern struct timespec const *const CHAN_NO_TIMEOUT;
However, function calls do allow us to specify an optional time-out, something that can’t be done directly in Go. The value NULL means “don’t wait,” CHAN_NO_TIMEOUT means “wait indefinitely,” and any other value means to wait for that amount of time.
To implement a time-out in Go, you typically select on the channel(s) of interest plus an additional channel that receives a value after a time-out has expired.
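Because chan_send and chan_recv take a relative duration, a caller has to build a struct timespec for, say, a 250 ms time-out. Here is a minimal sketch. It uses a hypothetical helper named chan_ms, which is not part of the API above, and keeps tv_nsec in the valid range [0, 1e9):

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <time.h>

// Hypothetical helper (not part of the channel API): converts milliseconds
// into a relative duration for chan_send(), chan_recv(), etc.
static struct timespec chan_ms( long ms ) {
  assert( ms >= 0 );
  return (struct timespec){
    .tv_sec  = ms / 1000,               // whole seconds
    .tv_nsec = (ms % 1000) * 1000000L   // remainder, always < 1e9 ns
  };
}
```

A caller could then write struct timespec const d = chan_ms( 250 ); and pass &d as the duration argument of chan_recv. This assumes a channel initialized as shown earlier and a variable to receive into.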
The functions return a standard int error code:
- 0: success;
- EINVAL: invalid argument;
- EPIPE: channel is closed;
- EAGAIN: duration is NULL and the function can’t immediately send or receive;
- ETIMEDOUT: duration has expired.
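When logging or debugging, it can help to turn these codes into text that describes what they mean for a channel. The sketch below uses a hypothetical chan_rv_name function, which is not part of the API described here. The standard strerror function would also work, but it reports generic errno meanings (for example, EPIPE as a broken pipe) rather than channel semantics:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <string.h>

// Hypothetical helper (not part of the channel API): describes a return
// value from chan_send() or chan_recv() in channel terms.
static char const* chan_rv_name( int rv ) {
  switch ( rv ) {
    case 0:         return "success";
    case EINVAL:    return "invalid argument";
    case EPIPE:     return "channel is closed";
    case EAGAIN:    return "would have to wait";  // duration was NULL
    case ETIMEDOUT: return "timed out";
    default:        return "unknown error";
  }
}
```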
There’s also:
void chan_close( struct chan *chan );
void chan_cleanup( struct chan *chan,
                   void (*msg_cleanup_fn)( void* ) );
where chan_close closes the channel and chan_cleanup cleans up its internal resources. The msg_cleanup_fn parameter is used only for buffered channels, to clean up each unreceived message, if any. It may be NULL for unbuffered channels or if no clean-up is necessary.
There’s no equivalent of chan_cleanup in Go because Go has garbage collection, so channels and unreceived messages are cleaned up automatically.
The API for chan_select is more complicated, so it will be deferred until later.
Channels in C Implementation: Basics
This part of the implementation covers the data structure for channels, initialization, clean-up, time-outs, and buffered send and receive.
Data Structures
To implement channels, we’ll obviously need a data structure:
struct chan {
  union {
    struct {
      void           *ring_buf;     // Message ring buffer.
      unsigned        ring_len;     // Number of messages.
      unsigned        recv_idx;     // Receive index.
      unsigned        send_idx;     // Send index.
    } buf;
    struct {
      void           *recv_buf;     // Where to put a message.
      pthread_cond_t  cpy_done[2];  // Receive/send copy done.
      pthread_cond_t  not_busy[2];  // Receive/send no longer busy.
      bool            is_busy[2];   // Is recv/send busy?
    } unbuf;
  };

  pthread_cond_t      ready[2];     // Receive/send ready.
  unsigned short      wait_cnt[2];  // Waiting to receive/send.
  pthread_mutex_t     mtx;          // Channel mutex.
  size_t              msg_size;     // Message size.
  unsigned            buf_cap;      // 0 = unbuffered.
  bool                is_closed;    // Is channel closed?
};
Approximately half of the structure’s members are within an anonymous union, grouped into the buf and unbuf structures. For buffered channels, messages are stored in a ring buffer. For unbuffered channels, recv_buf points to where a sender should copy a message. (The remaining members will be explained in due course.)
The two ready condition variables mean, respectively, “receiver can proceed” (a sender is available, for an unbuffered channel, or the buffer isn’t empty, for a buffered channel) and “sender can proceed” (a receiver is available, for an unbuffered channel, or the buffer isn’t full, for a buffered channel).
The two wait_cnt elements count the number of threads waiting to receive and send, respectively. The rest of the members are straightforward.
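The ring-buffer bookkeeping behind the buf members (ring_len, recv_idx, and send_idx) can be seen in isolation. The following single-threaded sketch uses a fixed capacity RING_CAP and the hypothetical names struct ring, ring_try_send, and ring_try_recv. It omits the mutex and condition variables, so where a real channel would wait, the sketch simply returns false:

```c
#include <assert.h>
#include <stdbool.h>

#define RING_CAP 4                        // assumed capacity for the sketch

struct ring {
  int      msgs[ RING_CAP ];
  unsigned ring_len;                      // number of messages stored
  unsigned recv_idx;                      // next slot to receive from
  unsigned send_idx;                      // next slot to send into
};

static bool ring_try_send( struct ring *r, int msg ) {
  if ( r->ring_len == RING_CAP )          // full: a channel would wait
    return false;
  r->msgs[ r->send_idx ] = msg;
  r->send_idx = (r->send_idx + 1) % RING_CAP;   // wrap around
  ++r->ring_len;
  return true;
}

static bool ring_try_recv( struct ring *r, int *msg ) {
  if ( r->ring_len == 0 )                 // empty: a channel would wait
    return false;
  *msg = r->msgs[ r->recv_idx ];
  r->recv_idx = (r->recv_idx + 1) % RING_CAP;   // wrap around
  --r->ring_len;
  return true;
}
```

Because ring_len is tracked separately, recv_idx == send_idx is unambiguous. It means either empty or full, depending on ring_len.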
Internally, we’ll also define:
enum chan_dir {
  CHAN_RECV,  // Receive direction.
  CHAN_SEND   // Send direction.
};
typedef enum chan_dir chan_dir;
to be more mnemonic indices into the is_busy, not_busy, ready, wait_cnt, and cpy_done arrays than 0 and 1.
Initialization, Close, & Clean-Up
The implementation of chan_init is fairly straightforward:
int chan_init( struct chan *chan, unsigned buf_cap,
               size_t msg_size ) {
  assert( chan != NULL );

  if ( buf_cap > 0 ) {
    if ( unlikely( msg_size == 0 ) )
      return EINVAL;
    chan->buf.ring_buf = malloc( buf_cap * msg_size );
    if ( chan->buf.ring_buf == NULL )
      return ENOMEM;
    chan->buf.recv_idx = chan->buf.send_idx = 0;
    chan->buf.ring_len = 0;
  }
  else {
    chan->unbuf.recv_buf = NULL;
    pthread_cond_init( &chan->unbuf.cpy_done[ CHAN_RECV ], NULL );
    pthread_cond_init( &chan->unbuf.cpy_done[ CHAN_SEND ], NULL );
    pthread_cond_init( &chan->unbuf.not_busy[ CHAN_RECV ], NULL );
    pthread_cond_init( &chan->unbuf.not_busy[ CHAN_SEND ], NULL );
    chan->unbuf.is_busy[ CHAN_RECV ] = false;
    chan->unbuf.is_busy[ CHAN_SEND ] = false;
  }

  chan->buf_cap = buf_cap;
  chan->msg_size = msg_size;
  chan->is_closed = false;
  pthread_mutex_init( &chan->mtx, NULL );
  pthread_cond_init( &chan->ready[ CHAN_RECV ], NULL );
  pthread_cond_init( &chan->ready[ CHAN_SEND ], NULL );
  chan->wait_cnt[ CHAN_RECV ] = 0;
  chan->wait_cnt[ CHAN_SEND ] = 0;

  return 0;
}
The likely and unlikely macros are branch-prediction hints, typically defined in terms of the __builtin_expect extension supported by GCC and Clang.
Not mentioned previously is that chan_init returns an int, where zero indicates success and non-zero is an error code.
Also not mentioned previously is that, for an unbuffered channel, we allow msg_size to be zero. This is marginally useful when you simply want one thread to signal another and the message contents are irrelevant.
In Go, you can use a chan bool, as was done in the earlier example with the quitc channel. The fact that any value was received is what matters; whether it was true or false is irrelevant.
The implementation of chan_close is:
void chan_close( struct chan *chan ) {
  assert( chan != NULL );
  pthread_mutex_lock( &chan->mtx );
  bool const was_already_closed = chan->is_closed;
  chan->is_closed = true;
  pthread_mutex_unlock( &chan->mtx );
  if ( was_already_closed )
    return;
  chan_signal( chan, CHAN_RECV, &pthread_cond_broadcast );
  chan_signal( chan, CHAN_SEND, &pthread_cond_broadcast );
  if ( chan->buf_cap == 0 ) {
    pthread_cond_broadcast( &chan->unbuf.cpy_done[ CHAN_RECV ] );
    pthread_cond_broadcast( &chan->unbuf.cpy_done[ CHAN_SEND ] );
    pthread_cond_broadcast( &chan->unbuf.not_busy[ CHAN_RECV ] );
    pthread_cond_broadcast( &chan->unbuf.not_busy[ CHAN_SEND ] );
  }
}
If the channel wasn’t already closed, chan_close uses pthread_cond_broadcast to wake up all threads that may be waiting on any of its condition variables.
The chan_signal function signals the relevant condition variable, but only if there are actually other threads waiting:
static void chan_signal( struct chan *chan, chan_dir dir,
                         int (*pthread_cond_fn)( pthread_cond_t* ) ) {
  if ( chan->wait_cnt[ dir ] > 0 )
    (*pthread_cond_fn)( &chan->ready[ dir ] );
}
The implementation of chan_cleanup is also fairly straightforward:
void chan_cleanup( struct chan *chan,
                   void (*msg_cleanup_fn)( void* ) ) {
  if ( chan == NULL )
    return;

  if ( chan->buf_cap > 0 ) {
    if ( chan->buf.ring_len > 0 && msg_cleanup_fn != NULL ) {
      unsigned idx = chan->buf.recv_idx;
      for ( unsigned i = 0; i < chan->buf.ring_len; ++i ) {
        (*msg_cleanup_fn)( chan_buf_at( chan, idx ) );
        idx = (idx + 1) % chan->buf_cap;
      }
    }
    free( chan->buf.ring_buf );
  }
  else {
    pthread_cond_destroy( &chan->unbuf.cpy_done[ CHAN_RECV ] );
    pthread_cond_destroy( &chan->unbuf.cpy_done[ CHAN_SEND ] );
    pthread_cond_destroy( &chan->unbuf.not_busy[ CHAN_RECV ] );
    pthread_cond_destroy( &chan->unbuf.not_busy[ CHAN_SEND ] );
  }

  pthread_cond_destroy( &chan->ready[ CHAN_RECV ] );
  pthread_cond_destroy( &chan->ready[ CHAN_SEND ] );
  pthread_mutex_destroy( &chan->mtx );
}
where chan_buf_at is a utility function that returns the address of the message at index abs_idx in the ring buffer:
static inline void* chan_buf_at( struct chan *chan, unsigned abs_idx ) {
  return (char*)chan->buf.ring_buf + abs_idx * chan->msg_size;
}
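Both chan_buf_at and the loop in chan_cleanup depend on two pieces of arithmetic. The first is the byte offset idx * msg_size. The second is visiting ring_len slots, starting at recv_idx and wrapping around at buf_cap. Here is a standalone sketch of both, using the hypothetical names buf_at and visit_unreceived:

```c
#include <assert.h>
#include <stddef.h>

// Same address arithmetic as chan_buf_at, on a plain buffer holding
// fixed-size messages of msg_size bytes each.
static void* buf_at( void *ring_buf, size_t msg_size, unsigned idx ) {
  return (char*)ring_buf + idx * msg_size;
}

// Visits the ring_len unreceived messages in receive order, starting at
// recv_idx and wrapping at buf_cap, as chan_cleanup's loop does. Records
// each visited slot index in out[] and returns how many were visited.
static unsigned visit_unreceived( unsigned buf_cap, unsigned recv_idx,
                                  unsigned ring_len, unsigned out[] ) {
  unsigned idx = recv_idx;
  for ( unsigned i = 0; i < ring_len; ++i ) {
    out[i] = idx;
    idx = (idx + 1) % buf_cap;
  }
  return ring_len;
}
```

For example, with a capacity of 4, a recv_idx of 3, and 3 unreceived messages, the slots visited are 3, 0, and 1.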
Send, Receive, & Time
The implementation of chan_send is:
int chan_send( struct chan *chan, void const *send_buf,
               struct timespec const *duration ) {
  assert( chan != NULL );
  struct timespec abs_ts;
  struct timespec const *const abs_time =
    ts_dur_to_abs( duration, &abs_ts );

  if ( chan->buf_cap > 0 ) {
    if ( unlikely( send_buf == NULL ) )
      return EINVAL;
    return chan_buf_send( chan, send_buf, abs_time );
  }
  if ( unlikely( chan->msg_size > 0 && send_buf == NULL ) )
    return EINVAL;
  return chan_unbuf_send( chan, send_buf, abs_time );
}
In the channel API, we allow the user to specify a time-out as a duration, i.e., a time relative to now, because that’s simpler. However, in the pthreads API, a time-out must be specified as an absolute time in the future. Hence, we need to convert a relative time to an absolute one:
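static struct timespec const*
ts_dur_to_abs( struct timespec const *duration,
               struct timespec *abs_time ) {
  assert( abs_time != NULL );
  if ( duration == NULL || duration == CHAN_NO_TIMEOUT )
    return duration;

  struct timeval now;
  (void)gettimeofday( &now, NULL );     // from <sys/time.h>
  *abs_time = (struct timespec){
    .tv_sec  = now.tv_sec + duration->tv_sec,
    .tv_nsec = now.tv_usec * 1000 + duration->tv_nsec
  };
  // Assuming duration->tv_nsec < 1e9, the sum is < 2e9, so at most one
  // carry is needed. pthread_cond_timedwait may fail with EINVAL if
  // tv_nsec >= 1e9.
  if ( abs_time->tv_nsec >= 1000000000L ) {
    ++abs_time->tv_sec;
    abs_time->tv_nsec -= 1000000000L;
  }
  return abs_time;
}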
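POSIX marks gettimeofday as obsolescent. The usual replacement is clock_gettime with CLOCK_REALTIME, which is also the default clock for pthread condition variables. Here is a sketch of that variant. It uses the hypothetical helpers ts_add and ts_abs_from_now and isolates the nanosecond carry so it can be checked on its own:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <time.h>

#define NSEC_PER_SEC 1000000000L

// Adds two normalized timespecs (tv_nsec < 1e9 each), carrying any
// nanosecond overflow into tv_sec so the result is normalized too.
static struct timespec ts_add( struct timespec a, struct timespec b ) {
  struct timespec sum = {
    .tv_sec  = a.tv_sec + b.tv_sec,
    .tv_nsec = a.tv_nsec + b.tv_nsec
  };
  if ( sum.tv_nsec >= NSEC_PER_SEC ) {  // at most one carry is possible
    ++sum.tv_sec;
    sum.tv_nsec -= NSEC_PER_SEC;
  }
  return sum;
}

// Sketch of a clock_gettime()-based alternative to the gettimeofday()
// call in ts_dur_to_abs: returns now + duration as an absolute time.
static struct timespec ts_abs_from_now( struct timespec duration ) {
  struct timespec now;
  (void)clock_gettime( CLOCK_REALTIME, &now );
  return ts_add( now, duration );
}
```

If a condition variable were configured with pthread_condattr_setclock to use a different clock, the same clock would have to be passed to clock_gettime here.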
Once that’s done, chan_send calls either chan_buf_send or chan_unbuf_send.
The implementation of chan_recv is similar:
int chan_recv( struct chan *chan, void *recv_buf,
               struct timespec const *duration ) {
  assert( chan != NULL );
  struct timespec abs_ts;
  struct timespec const *const abs_time =
    ts_dur_to_abs( duration, &abs_ts );

  if ( chan->buf_cap > 0 ) {
    if ( unlikely( recv_buf == NULL ) )
      return EINVAL;
    return chan_buf_recv( chan, recv_buf, abs_time );
  }
  if ( unlikely( chan->msg_size > 0 && recv_buf == NULL ) )
    return EINVAL;
  return chan_unbuf_recv( chan, recv_buf, abs_time );
}
Channels in C Implementation: Buffered Send & Receive
This part of the implementation covers buffered send and receive. They’re a fairly straightforward implementation of the classic producer-consumer problem.
Buffered Send
The implementation of chan_buf_send is:
static int chan_buf_send( struct chan *chan, void const *send_buf,
                          struct timespec const *abs_time ) {
  int rv = 0;
  pthread_mutex_lock( &chan->mtx );
  do {
    if ( chan->is_closed ) {
      rv = EPIPE;
    }
    else if ( chan->buf.ring_len < chan->buf_cap ) {
      memcpy( chan_buf_at( chan, chan->buf.send_idx ), send_buf,
              chan->msg_size );
      chan->buf.send_idx = (chan->buf.send_idx + 1) % chan->buf_cap;
      if ( ++chan->buf.ring_len == 1 )
        chan_signal( chan, CHAN_BUF_NOT_EMPTY, &pthread_cond_signal );
      break;
    }
    else {
      rv = chan_wait( chan, CHAN_BUF_NOT_FULL, abs_time );
    }
  } while ( rv == 0 );
  pthread_mutex_unlock( &chan->mtx );
  return rv;
}
There are three cases:
- The channel is closed: return EPIPE.
- The channel isn’t full: copy the message into the buffer and increment its length. If the new length is 1, the channel was empty but isn’t now, so signal any receivers that may be waiting.
- Otherwise, wait until abs_time for the channel to become not full.
For mnemonic convenience, we define these macros to specify which condition we either want to notify about or wait for:
#define CHAN_BUF_NOT_EMPTY CHAN_RECV
#define CHAN_BUF_NOT_FULL CHAN_SEND
The chan_wait function is used to wait until a channel is “ready” (can proceed):
static int chan_wait( struct chan *chan, chan_dir dir,
                      struct timespec const *abs_time ) {
  assert( chan != NULL );
  if ( chan->is_closed )
    return EPIPE;
  ++chan->wait_cnt[ dir ];
  // Uses the ready[2] condition variables as defined so far; once ready[2]
  // is replaced by observers (for select), this becomes
  // &chan->observer[ dir ].chan_ready.
  int const rv =
    pthread_cond_wait_wrapper( &chan->ready[ dir ], &chan->mtx, abs_time );
  --chan->wait_cnt[ dir ];
  return chan->is_closed ? EPIPE : rv;
}
If the channel is closed, return EPIPE; otherwise, increment the relevant wait_cnt and wait. When the wait returns, decrement the relevant wait_cnt. If the channel was closed while waiting, return EPIPE; otherwise, return whatever the wrapper returned.
The pthread_cond_wait_wrapper function handles the special cases for abs_time: if it’s NULL, it returns EAGAIN; if it’s CHAN_NO_TIMEOUT, it calls pthread_cond_wait; otherwise, it calls pthread_cond_timedwait:
static int pthread_cond_wait_wrapper( pthread_cond_t *cond,
                                      pthread_mutex_t *mtx,
                                      struct timespec const *abs_time ) {
  assert( cond != NULL );
  assert( mtx != NULL );
  if ( abs_time == NULL )
    return EAGAIN;
  int const pcw_rv = abs_time == CHAN_NO_TIMEOUT ?
    pthread_cond_wait( cond, mtx ) :
    pthread_cond_timedwait( cond, mtx, abs_time );
  switch ( pcw_rv ) {
    case 0:
    case ETIMEDOUT:
      return pcw_rv;
    default:
      errno = pcw_rv;
      perror( PACKAGE );                // defined by Autotools
      exit( 1 );
  }
}
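The ETIMEDOUT path can be observed with a condition variable that is never signaled. The sketch below (wait_for_flag_ms is a hypothetical name) also shows why callers such as chan_buf_send and chan_buf_recv re-check their condition in a loop. pthread_cond_timedwait may return 0 on a spurious wake-up, so the wait repeats until either the predicate holds or the deadline passes:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

// Waits up to ms milliseconds for a flag that nothing ever sets, so the
// only way out is the deadline: normally returns ETIMEDOUT.
static int wait_for_flag_ms( long ms ) {
  pthread_mutex_t mtx;
  pthread_cond_t  cond;
  pthread_mutex_init( &mtx, NULL );
  pthread_cond_init( &cond, NULL );     // default clock: CLOCK_REALTIME
  bool const flag = false;              // nobody will ever set this

  // Absolute deadline = now + ms, with tv_nsec kept below 1e9.
  struct timespec deadline;
  clock_gettime( CLOCK_REALTIME, &deadline );
  deadline.tv_sec  += ms / 1000;
  deadline.tv_nsec += (ms % 1000) * 1000000L;
  if ( deadline.tv_nsec >= 1000000000L ) {
    ++deadline.tv_sec;
    deadline.tv_nsec -= 1000000000L;
  }

  int rv = 0;
  pthread_mutex_lock( &mtx );
  while ( !flag && rv == 0 )            // 0 may be a spurious wake-up: retry
    rv = pthread_cond_timedwait( &cond, &mtx, &deadline );
  pthread_mutex_unlock( &mtx );

  pthread_cond_destroy( &cond );
  pthread_mutex_destroy( &mtx );
  return rv;
}
```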
Buffered Receive
The implementation of chan_buf_recv is:
static int chan_buf_recv( struct chan *chan, void *recv_buf,
                          struct timespec const *abs_time ) {
  int rv = 0;
  pthread_mutex_lock( &chan->mtx );
  do {
    // Since we can still read from a closed, non-empty, buffered
    // channel, there's no check for is_closed first.
    if ( chan->buf.ring_len > 0 ) {
      memcpy( recv_buf, chan_buf_at( chan, chan->buf.recv_idx ),
              chan->msg_size );
      chan->buf.recv_idx = (chan->buf.recv_idx + 1) % chan->buf_cap;
      if ( chan->buf.ring_len-- == chan->buf_cap )
        chan_signal( chan, CHAN_BUF_NOT_FULL, &pthread_cond_signal );
      break;
    }
    rv = chan_wait( chan, CHAN_BUF_NOT_EMPTY, abs_time );
  } while ( rv == 0 );
  pthread_mutex_unlock( &chan->mtx );
  return rv;
}
There are two cases:
- The channel isn’t empty: copy the message from the buffer and decrement its length. If the old length was the capacity, the channel was full but isn’t now, so signal any senders that may be waiting.
- Otherwise, wait until abs_time for the channel to become not empty.
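Without time-outs, closing, and the wait_cnt bookkeeping, chan_buf_send and chan_buf_recv reduce to the textbook bounded buffer. Below is a self-contained sketch for int messages. The names struct bq, bq_send, bq_recv, and bq_demo are hypothetical, and BQ_CAP is an assumed capacity. A producer thread sends 1 through n to the calling thread:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

#define BQ_CAP 8                        // assumed capacity for the sketch

struct bq {
  int             msgs[ BQ_CAP ];
  unsigned        len, recv_idx, send_idx;
  pthread_mutex_t mtx;
  pthread_cond_t  not_empty, not_full;
};

static void bq_init( struct bq *q ) {
  q->len = q->recv_idx = q->send_idx = 0;
  pthread_mutex_init( &q->mtx, NULL );
  pthread_cond_init( &q->not_empty, NULL );
  pthread_cond_init( &q->not_full, NULL );
}

static void bq_destroy( struct bq *q ) {
  pthread_cond_destroy( &q->not_full );
  pthread_cond_destroy( &q->not_empty );
  pthread_mutex_destroy( &q->mtx );
}

static void bq_send( struct bq *q, int msg ) {
  pthread_mutex_lock( &q->mtx );
  while ( q->len == BQ_CAP )            // full: wait for a receiver
    pthread_cond_wait( &q->not_full, &q->mtx );
  q->msgs[ q->send_idx ] = msg;
  q->send_idx = (q->send_idx + 1) % BQ_CAP;
  ++q->len;
  pthread_cond_signal( &q->not_empty ); // a message is now available
  pthread_mutex_unlock( &q->mtx );
}

static int bq_recv( struct bq *q ) {
  pthread_mutex_lock( &q->mtx );
  while ( q->len == 0 )                 // empty: wait for a sender
    pthread_cond_wait( &q->not_empty, &q->mtx );
  int const msg = q->msgs[ q->recv_idx ];
  q->recv_idx = (q->recv_idx + 1) % BQ_CAP;
  --q->len;
  pthread_cond_signal( &q->not_full );  // a slot is now free
  pthread_mutex_unlock( &q->mtx );
  return msg;
}

struct bq_demo_arg { struct bq *q; int n; };

static void* bq_producer( void *p ) {
  struct bq_demo_arg const *const arg = p;
  for ( int i = 1; i <= arg->n; ++i )
    bq_send( arg->q, i );
  return NULL;
}

// Sends 1..n from a producer thread; returns the sum received (or -1).
static long bq_demo( int n ) {
  struct bq q;
  bq_init( &q );
  struct bq_demo_arg arg = { &q, n };
  pthread_t producer;
  if ( pthread_create( &producer, NULL, &bq_producer, &arg ) != 0 ) {
    bq_destroy( &q );
    return -1;
  }
  long sum = 0;
  for ( int i = 0; i < n; ++i )
    sum += bq_recv( &q );
  pthread_join( producer, NULL );
  bq_destroy( &q );
  return sum;
}
```

One deliberate difference: this sketch signals on every send and receive, not only on the empty-to-non-empty and full-to-not-full transitions. Signaling once per message is the simplest way to ensure no thread is left asleep when several threads wait on the same condition variable. The cost is some redundant signals.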
Channels in C Implementation: Unbuffered Send & Receive
This part of the implementation covers unbuffered send and receive. They’re a bit trickier to implement because:
1. The sender and receiver have to rendezvous, i.e., meet at the same place (in memory) and time to copy a message.
2. However, this does have the advantage of enabling the sender to copy a message directly to where the receiver wants it with only a single memcpy, rather than the two required for buffered channels.
3. While a sender (or receiver) is waiting for a receiver (or sender), additional senders (or receivers) must be excluded since, for a single channel, only a single sender and receiver can rendezvous.
4. After a message is copied, the sender has to block to allow the receiver to do something with the message before attempting to send another message (more later).
Unbuffered Send
The implementation of chan_unbuf_send is:
static int chan_unbuf_send( struct chan *chan, void const *send_buf,
                            struct timespec const *abs_time ) {
  pthread_mutex_lock( &chan->mtx );
  int rv = chan_unbuf_acquire( chan, CHAN_SEND, abs_time );
  if ( rv == 0 ) {
    chan_signal( chan, CHAN_RECV, &pthread_cond_signal );
    do {
      if ( chan->unbuf.is_busy[ CHAN_RECV ] ) {
        if ( chan->msg_size > 0 )
          memcpy( chan->unbuf.recv_buf, send_buf, chan->msg_size );
        pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_RECV ] );
        pthread_cond_wait( &chan->unbuf.cpy_done[ CHAN_SEND ], &chan->mtx );
        pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_RECV ] );
        break;
      }
      rv = chan_wait( chan, CHAN_SEND, abs_time );
    } while ( rv == 0 );
    chan_unbuf_release( chan, CHAN_SEND );
  }
  pthread_mutex_unlock( &chan->mtx );
  return rv;
}
The chan_unbuf_acquire function is used to acquire exclusive access to an unbuffered channel for either sending or receiving:
static int chan_unbuf_acquire( struct chan *chan, chan_dir dir,
                               struct timespec const *abs_time ) {
  int rv = 0;
  while ( rv == 0 && chan->unbuf.is_busy[ dir ] ) {
    rv = chan->is_closed ? EPIPE :
      pthread_cond_wait_wrapper( &chan->unbuf.not_busy[ dir ],
                                 &chan->mtx, abs_time );
  } // while
  if ( rv == 0 )
    chan->unbuf.is_busy[ dir ] = true;
  return rv;
}
This is what is_busy[2] is for: to know whether the send end, the receive end, or both are “busy” with a thread. If the relevant end is busy, the thread waits until either the relevant not_busy condition has been signaled or the time-out expires. There’s also a corresponding release function:
static void chan_unbuf_release( struct chan *chan, chan_dir dir ) {
  chan->unbuf.is_busy[ dir ] = false;
  pthread_cond_signal( &chan->unbuf.not_busy[ dir ] );
}
In chan_unbuf_send, the line:
if ( chan->unbuf.is_busy[ CHAN_RECV ] ) {
checks whether the channel is busy for a receiver: if so, it means there is a receiver already waiting to rendezvous with the sender, so the sender can proceed to copy the message immediately:
if ( chan->msg_size > 0 )
  memcpy( chan->unbuf.recv_buf, send_buf, chan->msg_size );
As a reminder, for an unbuffered channel, we allow msg_size to be zero for signaling only (no message copying) between threads.
The lines:
pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_RECV ] );
pthread_cond_wait( &chan->unbuf.cpy_done[ CHAN_SEND ], &chan->mtx );
pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_RECV ] );
address point 4 above; that is, the sender has to block to allow the receiver to do something with the message before attempting to send another message. To understand the problem, consider the following sequence of events between two threads, each in a loop, one sending and the other receiving:
1. On thread 1, chan_unbuf_recv is called on a channel, but no sender is present, so it waits.
2. On thread 2, chan_unbuf_send is called, sees a receiver is waiting, copies the message immediately, and returns.
3. The kernel’s task scheduler arbitrarily decides to schedule thread 2 again immediately. As stated, it’s in a loop, so chan_unbuf_send is called again, sees a receiver is still “waiting” (even though the message was already copied), and immediately copies a new message, overwriting the previous message!
Step 3 can happen any number of times, overwriting messages, before the scheduler decides to run thread 1. Note that the same thing can happen with the sender and receiver roles reversed, i.e., the receiver could receive the same message multiple times, thinking it’s a new message, since the sender isn’t given a chance to run. (For simplicity, we’ll stick to the event sequence as originally presented, i.e., the sender is called multiple times.)
What’s needed is a way to force thread 2 to block after sending a message by waiting on a condition variable. Since thread 2 is then blocked, the scheduler can’t run it again and will instead run thread 1. This allows thread 1’s chan_unbuf_recv to return and its loop to do whatever it wants with the message.
This is what the cpy_done condition variables are for. Additionally, both calls to pthread_cond_signal are necessary to implement a handshake between the two threads.
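The overwrite problem, and one way to prevent it, can be shown with a much smaller one-sender/one-receiver rendezvous. This sketch deliberately swaps in a simpler technique than the cpy_done handshake: a single condition variable plus a copied flag. The sender may not copy again until the receiver clears that flag by posting a fresh recv_buf. The names struct rdv, rdv_send, rdv_recv, and rdv_demo are hypothetical. The sketch does not handle multiple senders or receivers, time-outs, or closing:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct rdv {
  pthread_mutex_t mtx;
  pthread_cond_t  cond;
  int            *recv_buf;   // non-NULL while a receiver is waiting
  bool            copied;     // has the sender filled *recv_buf yet?
};

static void rdv_send( struct rdv *r, int msg ) {
  pthread_mutex_lock( &r->mtx );
  // Wait for a receiver whose buffer hasn't been filled yet. Without the
  // copied check, a sender scheduled again immediately could overwrite the
  // previous message before the receiver ran.
  while ( r->recv_buf == NULL || r->copied )
    pthread_cond_wait( &r->cond, &r->mtx );
  *r->recv_buf = msg;         // the single, direct copy
  r->copied = true;
  pthread_cond_broadcast( &r->cond );
  pthread_mutex_unlock( &r->mtx );
}

static int rdv_recv( struct rdv *r ) {
  int msg;
  pthread_mutex_lock( &r->mtx );
  r->recv_buf = &msg;         // tell the sender where to copy
  r->copied = false;
  pthread_cond_broadcast( &r->cond );
  while ( !r->copied )
    pthread_cond_wait( &r->cond, &r->mtx );
  r->recv_buf = NULL;         // done: stop exposing our buffer
  pthread_mutex_unlock( &r->mtx );
  return msg;
}

struct rdv_arg { struct rdv *r; int n; };

static void* rdv_sender( void *p ) {
  struct rdv_arg const *const arg = p;
  for ( int i = 1; i <= arg->n; ++i )
    rdv_send( arg->r, i );
  return NULL;
}

// Sends 1..n through the rendezvous; true if every message arrived exactly
// once and in order (nothing overwritten or duplicated).
static bool rdv_demo( int n ) {
  struct rdv r = { .recv_buf = NULL, .copied = false };
  pthread_mutex_init( &r.mtx, NULL );
  pthread_cond_init( &r.cond, NULL );
  struct rdv_arg arg = { &r, n };
  pthread_t sender;
  bool ok = pthread_create( &sender, NULL, &rdv_sender, &arg ) == 0;
  if ( ok ) {
    for ( int want = 1; want <= n; ++want )   // always drain all n
      if ( rdv_recv( &r ) != want )
        ok = false;
    pthread_join( sender, NULL );
  }
  pthread_cond_destroy( &r.cond );
  pthread_mutex_destroy( &r.mtx );
  return ok;
}
```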
Unbuffered Receive
The implementation of chan_unbuf_recv is pretty much the same as chan_unbuf_send except that CHAN_RECV and CHAN_SEND are swapped:
static int chan_unbuf_recv( struct chan *chan, void *recv_buf,
                            struct timespec const *abs_time ) {
  pthread_mutex_lock( &chan->mtx );
  int rv = chan_unbuf_acquire( chan, CHAN_RECV, abs_time );
  if ( rv == 0 ) {
    chan->unbuf.recv_buf = recv_buf;
    chan_signal( chan, CHAN_SEND, &pthread_cond_signal );
    do {
      if ( chan->unbuf.is_busy[ CHAN_SEND ] ) {
        pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_SEND ] );
        pthread_cond_wait( &chan->unbuf.cpy_done[ CHAN_RECV ], &chan->mtx );
        pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_SEND ] );
        break;
      }
      rv = chan_wait( chan, CHAN_RECV, abs_time );
    } while ( rv == 0 );
    chan->unbuf.recv_buf = NULL;
    chan_unbuf_release( chan, CHAN_RECV );
  }
  pthread_mutex_unlock( &chan->mtx );
  return rv;
}
The only other difference is the addition of the lines:
chan->unbuf.recv_buf = recv_buf;
// ...
chan->unbuf.recv_buf = NULL;
that tell the sender where to copy the message directly and then reset that pointer when done.
Channels in C API: Select
Just as we can’t add an <- operator to C, we can’t add a select statement either, so, again, we’ll have to substitute an ordinary function call:
int chan_select( unsigned recv_len, struct chan *recv_chan[recv_len],
                 void *recv_buf[recv_len],
                 unsigned send_len, struct chan *send_chan[send_len],
                 void const *send_buf[send_len],
                 struct timespec const *duration );
That is, you pass an optional array of channels to receive from (and the buffers to receive into), another optional array of channels to send to (and the buffers to send from), and a time-out. When a channel is selected, the function returns its index into either recv_chan or send_chan so you know which channel was selected. If no channel was selected, it returns -1 and sets errno if there was an error.
You can use chan_select in a switch statement. For example, the fibonacci function in Go can be rewritten in C as:
void fibonacci( struct chan *fibc, struct chan *quitc ) {
  unsigned fib = 0, prev_fib, next_fib = 1;
  for (;;) {
    switch ( chan_select(
               1, (struct chan*[]){ quitc }, (void*[]){ NULL },
               1, (struct chan*[]){ fibc }, (void const*[]){ &fib },
               CHAN_NO_TIMEOUT ) ) {
      case CHAN_SEND(0):          // If sent number ...
        prev_fib = fib;           // ... calculate next number.
        fib = next_fib;
        next_fib += prev_fib;
        break;
      case CHAN_RECV(0):          // If received quit ...
        return;                   // ... quit.
    }
  }
}
CHAN_RECV and CHAN_SEND are macros used to distinguish a returned index into the recv_chan array from one into the send_chan array:
#define CHAN_RECV(IDX) ((int)(IDX))
#define CHAN_SEND(IDX) (1024 + (int)(IDX))
If you’re wondering how there can be both an enumeration with constants CHAN_RECV and CHAN_SEND and macros with the same names: the preprocessor doesn’t expand a function-like macro unless the macro’s name is followed by a ( (optionally after whitespace).
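Going the other way, a caller that doesn’t use a switch can split a chan_select return value back into a direction and an index. The sketch below assumes a hypothetical select_decode helper and uses the same encoding as the macros above. It also makes explicit the implied limit that receive indices must stay below 1024:

```c
#include <assert.h>
#include <stdbool.h>

// Same encoding as the macros above.
#define CHAN_RECV(IDX)  ((int)(IDX))
#define CHAN_SEND(IDX)  (1024 + (int)(IDX))

// Hypothetical helper (not part of the API): decodes a chan_select()
// return value. Returns false for -1 (nothing selected). Assumes
// recv_len < 1024; larger receive indices would collide with CHAN_SEND(0).
static bool select_decode( int rv, bool *is_send, unsigned *idx ) {
  if ( rv < 0 )
    return false;
  *is_send = rv >= CHAN_SEND(0);
  *idx = (unsigned)(*is_send ? rv - CHAN_SEND(0) : rv);
  return true;
}
```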
Admittedly, chan_select is nowhere near as elegant as the select statement in Go, but that’s the best you can do in C without direct language support.
Channels in C Implementation: Select
The code for send and receive is relatively straightforward (for multithreaded code). The code to implement select, specifically a blocking select, is quite a bit more complicated. Why? A blocking select waits for any one of a number of channels to become ready. Unfortunately, the pthreads API has no way to pthread_cond_wait for multiple condition variables simultaneously, one for each channel being selected from. But the signaler can signal any number of condition variables. Hence one approach is to have a linked list of “observers” for every channel, where an observer is either the channel itself or an active select in some thread.
Data Structures
To implement an observer, we’ll need another data structure (where the non-obvious members will be explained in due course):
typedef struct chan_impl_obs chan_impl_obs;

struct chan_impl_obs {
  struct chan     *chan;        // The channel being observed.
  chan_impl_obs   *next;        // The next observer, if any.
  pthread_mutex_t *pmtx;        // The mutex to use.
  pthread_cond_t   chan_ready;  // Is chan ready?
  unsigned         key;         // A fairly unique key.
};
In the chan structure, the ready[2] array will be replaced by:
chan_impl_obs observer[2];      // Receiver/sender.
When an observer is embedded inside a chan structure, pmtx points to the channel’s own mtx; when an observer belongs to a chan_select, pmtx points to a mutex in chan_select’s stack frame (more later).
Given chan_impl_obs, we’ll rename chan_signal to chan_signal_all_obs and expand its implementation to signal all observers:
static void chan_signal_all_obs( struct chan *chan, chan_dir dir,
                                 int (*pthread_cond_fn)( pthread_cond_t* ) ) {
  if ( chan->wait_cnt[ dir ] == 0 )     // Nobody is waiting.
    return;

#ifndef NDEBUG
  int debug_locked_cnt = 0;
#endif /* NDEBUG */
  pthread_mutex_t *pmtx = NULL, *next_pmtx = NULL;

  for ( chan_impl_obs *obs = &chan->observer[ dir ], *next_obs;
        obs != NULL; obs = next_obs, pmtx = next_pmtx ) {
    obs->chan = chan;
    (*pthread_cond_fn)( &obs->chan_ready );
    next_obs = obs->next;
    if ( next_obs != NULL ) {
      next_pmtx = next_obs->pmtx;       // Do hand-over-hand locking:
      pthread_mutex_lock( next_pmtx );  // lock next mutex ...
      DEBUG_BLOCK( ++debug_locked_cnt; );
    }
    if ( pmtx != NULL ) {
      pthread_mutex_unlock( pmtx );     // ... before unlocking previous.
      DEBUG_BLOCK( --debug_locked_cnt; );
    }
  } // for

  assert( debug_locked_cnt == 0 );
}
The function starts at the channel’s own observer, then traverses the linked list of chan_select observers by following the next pointers, signaling each in turn. During the traversal, it performs hand-over-hand locking; that is, it locks the next observer’s mutex before unlocking the previous observer’s mutex to ensure thread safety.
As a safety check for debugging, debug_locked_cnt counts the number of mutexes currently locked. The assert at the end of the function ensures that every mutex that was locked was also unlocked. DEBUG_BLOCK is a macro that expands its arguments only if NDEBUG is not defined.
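Hand-over-hand locking (also called lock coupling) is easier to see on a plain linked list. In this sketch, struct node and sum_hand_over_hand are hypothetical names. Each node has its own mutex, and at most two adjacent nodes are locked at any moment. chan_signal_all_obs leaves the channel’s own mutex for its caller to unlock. This version differs: it unlocks every node, including the head that the caller locked:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

struct node {
  struct node    *next;
  pthread_mutex_t mtx;
  int             value;
};

// Sums the list starting at head using hand-over-hand locking. The caller
// must hold head->mtx; on return, no node's mutex is held.
static int sum_hand_over_hand( struct node *head ) {
  int sum = 0;
  for ( struct node *cur = head, *next; cur != NULL; cur = next ) {
    sum += cur->value;
    next = cur->next;                     // read while cur is still locked
    if ( next != NULL )
      pthread_mutex_lock( &next->mtx );   // lock the next node ...
    pthread_mutex_unlock( &cur->mtx );    // ... before unlocking this one
  }
  return sum;
}
```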
We’ll also need another data structure to refer to a channel:
struct chan_select_ref {
  struct chan    *chan;         // The channel referred to.
  unsigned short  param_idx;    // Index into recv_chan/send_chan.
  chan_dir        dir;          // Selected channel direction.
  bool            maybe_ready;  // Is channel maybe ready?
};
typedef struct chan_select_ref chan_select_ref;
The implementation for chan_select starts off as:
int chan_select( unsigned recv_len, struct chan *recv_chan[recv_len],
                 void *recv_buf[recv_len],
                 unsigned send_len, struct chan *send_chan[send_len],
                 void const *send_buf[send_len],
                 struct timespec const *duration ) {
  if ( unlikely( recv_len > 0 && (recv_chan == NULL || recv_buf == NULL) ) ||
       unlikely( send_len > 0 && (send_chan == NULL || send_buf == NULL) ) ) {
    errno = EINVAL;
    return -1;
  }

  unsigned const total_channels = recv_len + send_len;
  chan_select_ref stack_ref[16];
  chan_select_ref *const ref = total_channels <= ARRAY_SIZE( stack_ref ) ?
    stack_ref : malloc( total_channels * sizeof( chan_select_ref ) );
  if ( unlikely( ref == NULL ) ) {      // malloc failed
    errno = ENOMEM;
    return -1;
  }

  struct timespec abs_ts;
  struct timespec const *const abs_time =
    ts_dur_to_abs( duration, &abs_ts );

  unsigned chans_open;                  // Number of open channels.
  bool const is_blocking = duration != NULL;
  chan_impl_obs select_obs;             // Observer for this select.
  pthread_mutex_t select_mtx;           // Mutex for select_obs.
  chan_select_ref const *selected_ref;  // Reference to selected channel.
  int rv;
  // ...
The first few lines simply check for invalid arguments. Next, we perform a small-size optimization: if the total number of channels is ≤ 16, we use a stack-based array of chan_select_ref; otherwise, we malloc it. Next, abs_time is calculated just as in the other functions. Finally, the remaining local variables are declared.
The next part of the implementation is:
  if ( is_blocking ) {
    pthread_mutex_init( &select_mtx, NULL );
    chan_obs_init( &select_obs, &select_mtx );
    chan_obs_init_key( &select_obs );
  }
  // ...
If this is a blocking select, then we need to initialize the mutex we’ll use as well as our observer:
static void chan_obs_init( chan_impl_obs *obs, pthread_mutex_t *pmtx ) {
  assert( obs != NULL );
  obs->chan = NULL;
  pthread_cond_init( &obs->chan_ready, NULL );
  obs->key = 0;
  obs->next = NULL;
  obs->pmtx = pmtx;
}

static void chan_obs_init_key( chan_impl_obs *obs ) {
  assert( obs != NULL );
  static unsigned next_key = 1;
  static pthread_mutex_t next_key_mtx = PTHREAD_MUTEX_INITIALIZER;
  pthread_mutex_lock( &next_key_mtx );
  obs->key = next_key++;
  if ( next_key == 0 )                  // Reserved for channel itself
    ++next_key;
  pthread_mutex_unlock( &next_key_mtx );
}
The chan_obs_init_key function initializes an observer’s key to a (fairly) unique integer. Why does an observer need a key? So that, when it’s inserted into a linked list of observers, it can be inserted in ascending key order. Why does the list need to be in key order? As mentioned, hand-over-hand locking is used when traversing the list. Keeping the list in key order means that mutexes for adjacent observers are always locked in the same order, which eliminates the possibility of deadlock.
The next part of the chan_select implementation is:
  do {
    chans_open = 0;
    rv = 0;
    selected_ref = NULL;
    unsigned const maybe_ready_len =
      chan_select_init(
        ref, &chans_open, recv_len, recv_chan, CHAN_RECV,
        is_blocking ? &select_obs : NULL
      ) +
      chan_select_init(
        ref, &chans_open, send_len, send_chan, CHAN_SEND,
        is_blocking ? &select_obs : NULL
      );
    if ( chans_open == 0 )
      break;
    // ...
It starts a big do ... while loop and determines which of the given channels might be ready by calling chan_select_init once each for the receive and send channels, which also updates chans_open. If all the channels are closed, we simply break and return.
The implementation of chan_select_init is:
static unsigned chan_select_init( chan_select_ref ref[], unsigned *pref_len,
                                  unsigned chan_len,
                                  struct chan *chan[chan_len], chan_dir dir,
                                  chan_impl_obs *add_obs ) {
  assert( ref != NULL );
  assert( pref_len != NULL );
  assert( chan_len == 0 || chan != NULL );

  unsigned maybe_ready_len = 0;
  if ( chan != NULL ) {
    for ( unsigned i = 0; i < chan_len; ++i ) {
      bool is_ready = false;
      pthread_mutex_lock( &chan[i]->mtx );
      bool const is_closed = chan[i]->is_closed;
      if ( !is_closed ) {
        is_ready = chan[i]->buf_cap > 0 ?
          (dir == CHAN_RECV ?
            chan[i]->buf.ring_len > 0 :
            chan[i]->buf.ring_len < chan[i]->buf_cap) :
          chan[i]->wait_cnt[ !dir ] > 0;
        if ( add_obs != NULL )
          chan_add_obs( chan[i], dir, add_obs );
      }
      pthread_mutex_unlock( &chan[i]->mtx );
      if ( is_closed )
        continue;
      if ( is_ready || add_obs != NULL ) {
        ref[ (*pref_len)++ ] = (chan_select_ref){
          .chan = chan[i],
          .dir = dir,
          .param_idx = (unsigned short)i,
          .maybe_ready = is_ready
        };
      }
      if ( is_ready )
        ++maybe_ready_len;
    } // for
  }
  return maybe_ready_len;
}
It iterates through the array of channels, skipping any that are closed and determining whether each open one is ready.
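The nested conditional that computes is_ready is easier to follow as a standalone predicate. Here is a sketch with a hypothetical name, chan_is_ready, that takes the relevant channel fields as plain parameters. It assumes chan_dir is an enumeration whose two values are 0 and 1, so that !dir yields the opposite direction, as indexing wait_cnt[ !dir ] requires:

```c
#include <assert.h>
#include <stdbool.h>

// Assumed to mirror C Chan's enumeration: !dir only flips the direction if
// the two values are 0 and 1.
typedef enum { CHAN_RECV, CHAN_SEND } chan_dir;

// Hypothetical helper: the readiness test from chan_select_init() with the
// channel's fields passed in as parameters.
static bool chan_is_ready( chan_dir dir, unsigned buf_cap, unsigned ring_len,
                           unsigned const wait_cnt[2] ) {
  if ( buf_cap > 0 )                    // buffered channel
    return dir == CHAN_RECV ?
      ring_len > 0 :                    // something to receive
      ring_len < buf_cap;               // room to send
  return wait_cnt[ !dir ] > 0;          // unbuffered: a peer is waiting
}
```

For an unbuffered channel, a receive is ready only when a sender is already waiting, and vice versa, which is the rendezvous requirement.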
The add_obs parameter is non-NULL only for blocking selects. For a blocking select, we need to add its observer via chan_add_obs:
static void chan_add_obs( struct chan *chan, chan_dir dir,
                          chan_impl_obs *add_obs ) {
  assert( chan != NULL );
  assert( add_obs != NULL );
#ifndef NDEBUG
  int debug_locked_cnt = 0;
#endif /* NDEBUG */
  pthread_mutex_t *pmtx = NULL, *next_pmtx = NULL;
  for ( chan_impl_obs *obs = &chan->observer[ dir ], *next_obs; obs != NULL;
        obs = next_obs, pmtx = next_pmtx ) {
    next_obs = obs->next;
    if ( next_obs == NULL ) {             // At end of list.
      obs->next = add_obs;
    }
    else {
      next_pmtx = next_obs->pmtx;         // Do hand-over-hand locking:
      pthread_mutex_lock( next_pmtx );    // lock next mutex ... (1)
      DEBUG_BLOCK( ++debug_locked_cnt; );
      if ( add_obs->key < next_obs->key ) {
        obs->next = add_obs;
        add_obs->next = next_obs;
        pthread_mutex_unlock( next_pmtx );  // Won't advance to next_obs.
        DEBUG_BLOCK( --debug_locked_cnt; );
        next_obs = NULL;                  // Will cause loop to exit ... (2)
      }
      else {
        assert( add_obs->key != next_obs->key );
      }
    }
    if ( pmtx != NULL ) {                 // (2) ... yet still runs this code.
      pthread_mutex_unlock( pmtx );       // (1) ... before unlocking previous.
      DEBUG_BLOCK( --debug_locked_cnt; );
    }
  } // for
  assert( debug_locked_cnt == 0 );
  ++chan->wait_cnt[ dir ];
}
This function is similar to chan_signal_all_obs in its list traversal. The difference is that it inserts the current select’s observer into the list just before the first observer whose key is greater than its own (or at the end, if there is none). It also increments wait_cnt, since the select will be waiting.
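The insertion can be sketched on a simplified list. Here, node and list_insert are hypothetical stand-ins for C Chan’s types; each node’s mutex is assumed to guard its next link, the caller is assumed to hold the head’s mutex (as chan_select_init holds chan->mtx), and keys are assumed immutable once a node is in a list, so a key can be read while holding only the predecessor’s mutex. As in chan_add_obs, the next node’s mutex is always acquired before the previous one is released, and every mutex that gets locked is unlocked before returning:

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

// Hypothetical, simplified stand-in for chan_impl_obs.
typedef struct node node;
struct node {
  unsigned         key;
  pthread_mutex_t *pmtx;                // guards this node's next link
  node            *next;
};

// Inserts add into the ascending-key list after head; caller holds head->pmtx.
static void list_insert( node *head, node *add ) {
  node *prev = head;
  pthread_mutex_t *prev_held = NULL;    // NULL: head's mutex (the caller's)
  for (;;) {
    node *const next = prev->next;
    if ( next == NULL || add->key < next->key ) {
      add->next = next;                 // splice in after prev
      prev->next = add;
      break;
    }
    pthread_mutex_lock( next->pmtx );   // lock next ...
    if ( prev_held != NULL )
      pthread_mutex_unlock( prev_held );// ... before unlocking previous
    prev = next;
    prev_held = next->pmtx;
  }
  if ( prev_held != NULL )
    pthread_mutex_unlock( prev_held );
}
```

Inserting 5, 2, 9, and 7 in that order, for example, leaves the list as 2, 5, 7, 9 with no node mutex left locked.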
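In chan_select_init, the lines:
      if ( is_ready || add_obs != NULL ) {
        ref[ (*pref_len)++ ] = (chan_select_ref){
          .chan = chan[i],
          .dir = dir,
          .param_idx = (unsigned short)i,
          .maybe_ready = is_ready
        };
      }
initialize the chan_select_ref for the current channel. A reference is recorded only if the channel may be ready or, for a blocking select, if it’s open at all, since a blocking select must later be able to find whichever of its open channels becomes ready.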
If chan_select_init checks whether channels are ready, why does it return a count of the channels that may be ready? Because even if a channel is ready at the time it’s checked, as soon as its mutex is unlocked, another thread might operate on the channel and make it no longer ready, so “maybe” is the best we can do.
The next part of the chan_select implementation is:
    unsigned select_len = chans_open;
    if ( maybe_ready_len > 0 && maybe_ready_len < chans_open ) {
      qsort(                            // Sort maybe ready channels first ...
        ref, chans_open, sizeof( chan_select_ref ),
        (qsort_cmp_fn)&chan_select_ref_cmp
      );
      select_len = maybe_ready_len;     // ... and select only from those.
    }
    else {
      // Otherwise, either no or all channels are ready, so there's
      // no need to sort them.
    }
    // ...
The chan_select_ref array is now initialized. If maybe_ready_len is > 0 but < chans_open, only some of the channels may be ready, so sort the array to put those first and then select only from among them, which increases the odds of selecting a channel that’s actually ready. The sort uses the chan_select_ref_cmp function:
static int chan_select_ref_cmp( chan_select_ref const *i_csr,
                                chan_select_ref const *j_csr ) {
  // sort maybe_ready (true, aka, 1) before !maybe_ready (false, aka, 0)
  return (int)j_csr->maybe_ready - (int)i_csr->maybe_ready;
}
As stated in the comment, if either no or all channels are ready, there’s no need to sort them.
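The partitioning step can be tried out on its own. This sketch uses a hypothetical, trimmed-down select_ref type and a hypothetical helper, select_ready_first, that mirrors the logic above: it sorts maybe-ready entries first only when some but not all are maybe ready, and returns how many entries to select from. One deliberate difference: the comparator takes void const* parameters, as qsort expects, so no function-pointer cast is needed (strictly speaking, calling a function through a pointer of an incompatible type is undefined behavior in ISO C):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

// Hypothetical, trimmed-down stand-in for chan_select_ref.
typedef struct {
  unsigned short param_idx;
  bool           maybe_ready;
} select_ref;

// Orders maybe_ready (1) before !maybe_ready (0).
static int select_ref_cmp( void const *i, void const *j ) {
  select_ref const *const i_ref = i;
  select_ref const *const j_ref = j;
  return (int)j_ref->maybe_ready - (int)i_ref->maybe_ready;
}

// Sorts only when some but not all refs may be ready; returns the length of
// the prefix to select from (all of them if none or all may be ready).
static unsigned select_ready_first( select_ref ref[], unsigned len ) {
  unsigned ready = 0;
  for ( unsigned i = 0; i < len; ++i )
    ready += ref[i].maybe_ready;
  if ( ready > 0 && ready < len )
    qsort( ref, len, sizeof *ref, &select_ref_cmp );
  return ready > 0 ? ready : len;
}
```

Since qsort isn’t stable, the relative order of entries within each group isn’t preserved, but that doesn’t matter here because one of them is then picked at random.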
The next part of the chan_select implementation is:
    struct timespec const *might_as_well_wait_time;
    if ( maybe_ready_len == 0 && is_blocking ) {
      // None of the channels may be ready and we should wait -- so wait.
      pthread_mutex_lock( &select_mtx );
      if ( pthread_cond_wait_wrapper( &select_obs.chan_ready, &select_mtx,
                                      abs_time ) == ETIMEDOUT ) {
        rv = ETIMEDOUT;
      }
      pthread_mutex_unlock( &select_mtx );
      if ( rv == 0 ) {                  // A channel became ready: find it.
        for ( unsigned i = 0; i < select_len; ++i ) {
          if ( select_obs.chan == ref[i].chan ) {
            selected_ref = &ref[i];
            break;
          }
        } // for
        assert( selected_ref != NULL );
      }
      might_as_well_wait_time = NULL;
    }
    else {
      // Some or all channels may be ready: pick one at random.
      static pthread_once_t once = PTHREAD_ONCE_INIT;
      pthread_once( &once, &srand_init );
      selected_ref = &ref[ rand() % (int)select_len ];
      might_as_well_wait_time = chans_open == 1 ? abs_time : NULL;
    }
    // ...
If none of the channels may be ready and this is a blocking select, wait for one to become ready; otherwise, just pick a maybe-ready channel at random. The first time through, pthread_once calls srand_init to seed the random number generator:
static void srand_init( void ) {
  struct timeval now;
  (void)gettimeofday( &now, /*tzp=*/NULL );
  srand( (unsigned)now.tv_usec );
}
In the degenerate case of only one channel being open, set might_as_well_wait_time to abs_time since we might as well just wait for that channel.
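The seed-once-then-pick pattern can be sketched on its own. srand_init below is the same as the function above; pick_random_idx is a hypothetical helper that returns an index in [0, select_len), relying on pthread_once to guarantee that srand_init runs exactly once even if several threads select concurrently:

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/time.h>

// Seeds rand() from the microseconds of the current time.
static void srand_init( void ) {
  struct timeval now;
  (void)gettimeofday( &now, /*tzp=*/NULL );
  srand( (unsigned)now.tv_usec );
}

// Hypothetical helper: returns a random index in [0, select_len).
static unsigned pick_random_idx( unsigned select_len ) {
  static pthread_once_t once = PTHREAD_ONCE_INIT;
  assert( select_len > 0 );             // caller must have something to pick
  pthread_once( &once, &srand_init );   // seed exactly once, across threads
  return (unsigned)rand() % select_len;
}
```

One caveat worth knowing: POSIX does not require rand() itself to be thread-safe.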
The next part of the chan_select implementation is:
    if ( selected_ref != NULL ) {
      struct chan *const selected_chan = selected_ref->chan;
      rv = selected_ref->dir == CHAN_RECV ?
        selected_chan->buf_cap > 0 ?
          chan_buf_recv(
            selected_chan, recv_buf[ selected_ref->param_idx ],
            might_as_well_wait_time
          ) :
          chan_unbuf_recv(
            selected_chan, recv_buf[ selected_ref->param_idx ],
            might_as_well_wait_time
          )
        :
        selected_chan->buf_cap > 0 ?
          chan_buf_send(
            selected_chan, send_buf[ selected_ref->param_idx ],
            might_as_well_wait_time
          ) :
          chan_unbuf_send(
            selected_chan, send_buf[ selected_ref->param_idx ],
            might_as_well_wait_time
          );
    }
    // ...
If we’ve selected a channel, then attempt either to receive from or send to it using might_as_well_wait_time:
- If the channel is still ready, we won’t wait.
- If there’s only a single channel open, wait for it to become ready.
- Otherwise, don’t wait: we’ll try another channel.
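These cases come down to the difference between trying an operation and waiting for it. Here is a minimal sketch of that distinction on a hypothetical one-slot mailbox; mailbox, mailbox_send, and mailbox_recv are illustrative only, not C Chan’s API, and C Chan’s internal functions may interpret their time argument differently. In the sketch, a NULL abs_time means don’t wait (return EAGAIN if nothing is available), while a non-NULL abs_time means wait until that absolute CLOCK_REALTIME deadline (return ETIMEDOUT if it passes):

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

// Hypothetical one-slot mailbox, used only to illustrate the policy.
typedef struct {
  pthread_mutex_t mtx;
  pthread_cond_t  not_empty;
  bool            has_msg;
  int             msg;
} mailbox;

static void mailbox_send( mailbox *mb, int msg ) {
  pthread_mutex_lock( &mb->mtx );
  assert( !mb->has_msg );               // one message at a time
  mb->msg = msg;
  mb->has_msg = true;
  pthread_cond_signal( &mb->not_empty );
  pthread_mutex_unlock( &mb->mtx );
}

// NULL abs_time: try once (EAGAIN if empty); else wait until abs_time.
static int mailbox_recv( mailbox *mb, int *out,
                         struct timespec const *abs_time ) {
  int rv = 0;
  pthread_mutex_lock( &mb->mtx );
  while ( !mb->has_msg && rv == 0 )     // loop handles spurious wake-ups
    rv = abs_time == NULL ? EAGAIN :
      pthread_cond_timedwait( &mb->not_empty, &mb->mtx, abs_time );
  if ( rv == 0 ) {
    *out = mb->msg;
    mb->has_msg = false;
  }
  pthread_mutex_unlock( &mb->mtx );
  return rv;
}
```

In chan_select’s terms, an EAGAIN result means “pick again,” while waiting with a deadline only makes sense when there’s no other channel worth trying.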
At this point, either we successfully sent to or received from the selected channel, the channel was no longer ready or had been closed, or the time-out expired. We need to do a bit of clean-up. The next part of the chan_select implementation is:
    if ( is_blocking ) {
      // Remove the observer only from the channels chan_add_obs() added it
      // to: those open during chan_select_init(), i.e., the first
      // chans_open entries of ref.
      for ( unsigned i = 0; i < chans_open; ++i )
        chan_remove_obs( ref[i].chan, ref[i].dir, &select_obs );
    }
    // ...
If this is a blocking select, we need to remove its observer from each channel’s list of observers:
static void chan_remove_obs( struct chan *chan, chan_dir dir,
                             chan_impl_obs *remove_obs ) {
  assert( chan != NULL );
  assert( remove_obs != NULL );
#ifndef NDEBUG
  int debug_locked_cnt = 0;
  bool debug_removed = false;
#endif /* NDEBUG */
  pthread_mutex_t *pmtx = NULL, *next_pmtx = NULL;
  pthread_mutex_lock( &chan->mtx );
  for ( chan_impl_obs *obs = &chan->observer[ dir ], *next_obs; obs != NULL;
        obs = next_obs, pmtx = next_pmtx ) {
    next_obs = obs->next;
    if ( next_obs == remove_obs ) {
      // remove_obs is an observer in our caller's stack frame, i.e.,
      // this thread, so there's no need to lock next_obs->pmtx.
      obs->next = next_obs->next;
      next_obs = NULL;                    // Will cause loop to exit ... (1)
      DEBUG_BLOCK( debug_removed = true; );
    }
    else if ( next_obs != NULL ) {
      next_pmtx = next_obs->pmtx;         // Do hand-over-hand locking:
      pthread_mutex_lock( next_pmtx );    // lock next mutex ... (2)
      DEBUG_BLOCK( ++debug_locked_cnt; );
    }
    if ( pmtx != NULL ) {                 // (1) ... yet still runs this code.
      pthread_mutex_unlock( pmtx );       // (2) ... before unlocking previous.
      DEBUG_BLOCK( --debug_locked_cnt; );
    }
  } // for
  --chan->wait_cnt[ dir ];
  pthread_mutex_unlock( &chan->mtx );
  assert( debug_locked_cnt == 0 );
  assert( debug_removed );
}
This function uses the same hand-over-hand locking to find the observer to remove, and it decrements wait_cnt, undoing the increment made by chan_add_obs.
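The removal can likewise be sketched on a simplified list. Again, node and list_remove are hypothetical stand-ins for C Chan’s types. The caller is assumed to hold the head’s mutex for the entire traversal (as chan_remove_obs holds chan->mtx), and, as in chan_remove_obs, the node being removed belongs to the calling thread, so its own mutex isn’t locked:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

// Hypothetical, simplified stand-in for chan_impl_obs.
typedef struct node node;
struct node {
  unsigned         key;
  pthread_mutex_t *pmtx;                // guards this node's next link
  node            *next;
};

// Unlinks rm from the list after head; caller holds head->pmtx.
// Returns whether rm was found.
static bool list_remove( node *head, node *rm ) {
  node *prev = head;
  pthread_mutex_t *prev_held = NULL;    // NULL: head's mutex (the caller's)
  bool removed = false;
  for (;;) {
    node *const next = prev->next;
    if ( next == NULL )
      break;                            // not found
    if ( next == rm ) {
      prev->next = rm->next;            // unlink while holding prev's mutex
      rm->next = NULL;
      removed = true;
      break;
    }
    pthread_mutex_lock( next->pmtx );   // lock next ...
    if ( prev_held != NULL )
      pthread_mutex_unlock( prev_held );// ... before unlocking previous
    prev = next;
    prev_held = next->pmtx;
  }
  if ( prev_held != NULL )
    pthread_mutex_unlock( prev_held );
  return removed;
}
```

Unlike the sketch, chan_remove_obs asserts that the observer was found, because a blocking select only removes its observer from lists it was added to.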
The next part of the chan_select implementation ends the big do ... while loop:
  } while ( rv == EAGAIN || (rv == EPIPE && chans_open > 1) );
  // ...
If rv is 0, we succeeded; if it’s ETIMEDOUT, we timed out. In either case, we’re done and shouldn’t try again. However, if rv is:
- EAGAIN, we selected a channel that isn’t ready (even if it had been ready shortly before);
- EPIPE, we selected a channel that was open when we originally checked, but is closed now.
In either of the latter two cases, we should try again, hence the loop; after EPIPE, though, only if chans_open > 1, since otherwise no other channel could still be open.
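The loop condition can be read as a small predicate. This sketch gives it a hypothetical name, select_should_retry, purely for illustration:

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

// Hypothetical helper expressing chan_select()'s loop condition: retry if the
// selected channel wasn't ready after all (EAGAIN), or if it was closed
// (EPIPE) while another channel may still be open.
static bool select_should_retry( int rv, unsigned chans_open ) {
  return rv == EAGAIN || (rv == EPIPE && chans_open > 1);
}
```

Success (0) and ETIMEDOUT always end the loop, as does EPIPE when the closed channel was the only one open.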
The last part of the chan_select implementation is:
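  if ( selected_ref == NULL || rv != 0 ) {
    if ( rv != 0 )
      errno = rv;
    rv = -1;
  }
  else {
    rv = selected_ref->dir == CHAN_RECV ?
      CHAN_RECV( selected_ref->param_idx ) :
      CHAN_SEND( selected_ref->param_idx );
  }
  if ( is_blocking ) {
    chan_obs_cleanup( &select_obs );
    pthread_mutex_destroy( &select_mtx );
  }
  if ( ref != stack_ref )
    free( ref );
  return rv;
}
If we didn’t select a channel, or the operation on the selected channel failed (e.g., it timed out), return -1, first setting errno to rv if it’s non-zero; otherwise, set rv to the index into either the recv_chan or send_chan array, encoded via CHAN_RECV() or CHAN_SEND() so the caller can tell which. Lastly, a bit more clean-up: for a blocking select, clean up its observer and destroy its mutex, and free ref if it was dynamically allocated rather than being stack_ref.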
Conclusion
The full source code for this implementation, C Chan, is available. Other than lacking the syntactic sugar Go provides for channels, C Chan is perfectly usable in real-world programs.
Epilogue
Weren’t there already other Go-like channel libraries written in C? Yes, but the serious ones I could find are either incomplete (e.g., they don’t support blocking select) or buggy due to race conditions.
What about a C++ version? Many C libraries can have trivial C++ wrappers written for them to get full type support (dealing with T* rather than void*) and RAII. Unfortunately, a trivial C++ wrapper can’t be written for C Chan because, in a C++ version, you want messages to be copied as objects of type T using T’s assignment operator, not memcpy, or you want the option to std::move messages instead. C Chan itself would have to be modified to support alternatives to memcpy.