Go-Like Channels in C
Paul J. Lucas


Publish Date: Jun 27

Introduction

To communicate among threads in a multithreaded program, you typically share variables in memory and guard them with mutexes. While that works, writing such programs is often hard to get right, even for experts. Not holding a lock when you should can lead to subtle and hard-to-reproduce bugs; holding multiple locks simultaneously can lead to deadlocks.

A channel is a data structure used for message passing among threads.

Or processes, but the focus here is on threads. For processes on Unix, there are pipes.

Instead of communicating among threads by sharing memory (requiring explicitly using mutexes and copying), you share memory by communicating (only explicitly copying).

Notes to the reader

This article assumes familiarity with traditional multithreaded programming, specifically the concepts of mutexes and condition variables, and the pthreads API.

Channels in Go

While channels have been around since 1978, they’ve been popularized by having direct support in the Go programming language. For example, in Go:

func sum_array(vals []int, result chan int) {
  sum := 0
  for _, val := range vals {
    sum += val
  }
  result <- sum          // Send partial sum to channel.
}

func main() {
  vals := []int{ 7, 2, 8, -9, 4, 0 }
  c := make(chan int)
  go sum_array(vals[:len(vals)/2], c)
  go sum_array(vals[len(vals)/2:], c)
  sum := <-c + <-c       // Receive sums from channel.
}

That code uses two “goroutines” (light-weight threads, created by the lines starting with go), each calling sum_array to sum half an array of integers. Each partial sum is then sent to the channel c using the <- binary operator. In main, the partial sums are read from the channel using the <- unary operator. Notice that there is no explicit use of mutexes, condition variables, or signaling between threads. The <- operator handles all that behind the curtain for you.

That code uses an unbuffered channel, that is, a sender will block until a receiver is available and vice versa, i.e., the threads have to rendezvous. Go also supports buffered channels of a fixed size, e.g.:

  c := make(chan int, 16) // Buffered channel, 16 capacity.

For buffered channels, senders can send without blocking as long as the channel isn’t full; receivers can receive without blocking as long as the channel isn’t empty.

In addition to the one-at-a-time operations of send and receive, Go also has a select statement that allows you to select from multiple channels simultaneously:

func fibonacci(fibc chan int, quitc chan bool) {
  fib, next_fib := 0, 1
  for {
    select {
      case fibc <- fib:  // If sent number, calc next number.
        fib, next_fib = next_fib, fib + next_fib
      case <-quitc:      // If received quit, return.
        return
    }
  }
}

That code calculates Fibonacci numbers and sends them to the fibc channel. The select waits until either there is a receiver to receive the next result or any value is received on the quitc channel.

The select statement may have an optional default case that’s selected only if no channels are ready. Lastly, channels may be closed to indicate to receivers that no more messages are coming.

Channels in C API: Basics

To create a channel in C, we can do:

struct chan c;
chan_init( &c, 100, sizeof(int) );

where 100 specifies the capacity of the buffer (if 0, the channel will be unbuffered) and sizeof(int) specifies the size of each message.

Since we can’t add an <- operator to C, we’ll have to substitute ordinary function calls:

int chan_send( struct chan *chan, void const *send_buf,
               struct timespec const *duration );

int chan_recv( struct chan *chan, void *recv_buf,
               struct timespec const *duration );

extern struct timespec const *const CHAN_NO_TIMEOUT;

But this allows us to specify an optional time-out, something that can’t be done directly in Go. The value NULL means “don’t wait,” CHAN_NO_TIMEOUT means “wait indefinitely,” and any other value means to wait for that amount of time.
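
As a minimal sketch of building the duration argument, the helper below converts a number of milliseconds into a relative struct timespec. The name dur_from_ms is hypothetical and not part of the API described here.

```c
#include <time.h>

// Hypothetical helper (not part of the channel API): converts a number of
// milliseconds into a relative duration.
struct timespec dur_from_ms( unsigned long ms ) {
  return (struct timespec){
    .tv_sec  = (time_t)(ms / 1000),           // Whole seconds.
    .tv_nsec = (long)(ms % 1000) * 1000000L   // Remainder, in nanoseconds.
  };
}
```

Assuming the declarations above, a send that waits at most 250 milliseconds could then pass the address of a struct timespec initialized from dur_from_ms( 250 ) as the duration argument, whereas NULL or CHAN_NO_TIMEOUT would select the non-blocking or indefinite behavior.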

To implement a time-out in Go, you typically select on the channel(s) of interest, plus an additional channel that receives a value after a time-out has expired.

The functions return a standard int error code:

  • 0: success;
  • EINVAL: invalid argument;
  • EPIPE: channel is closed;
  • EAGAIN: duration is NULL and can’t immediately send or receive;
  • ETIMEDOUT: duration has expired.
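
For logging or debugging, the codes listed above can be mapped to short descriptions. The following is only a sketch: chan_rv_str is a hypothetical name, not a function of the API, and the strings are illustrative.

```c
#include <errno.h>

// Hypothetical helper (not part of the channel API): describes a return
// code of chan_send or chan_recv.
char const* chan_rv_str( int rv ) {
  switch ( rv ) {
    case 0        : return "success";
    case EINVAL   : return "invalid argument";
    case EPIPE    : return "channel is closed";
    case EAGAIN   : return "can't send or receive immediately";
    case ETIMEDOUT: return "duration has expired";
    default       : return "unexpected error";
  }
}
```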

There’s also:

void chan_close( struct chan *chan );

void chan_cleanup( struct chan *chan,
                   void (*msg_cleanup_fn)( void* ) );

where chan_close closes the channel and chan_cleanup cleans up its internal resources. The msg_cleanup_fn parameter is used only for buffered channels to clean up each unreceived message, if any. It may be NULL for unbuffered channels or if no clean-up is necessary.

There’s no equivalent of chan_cleanup in Go because Go has garbage collection, so channels and unreceived messages are cleaned up automatically.

The API for chan_select is more complicated, so will be deferred until later.

Channels in C Implementation: Basics

This part of the implementation covers the data structure for channels, initialization, clean-up, time-outs, and buffered send and receive.

Data Structures

To implement channels, we’ll obviously need a data structure:

struct chan {
  union {
    struct {
      void           *ring_buf;     // Message ring buffer.
      unsigned        ring_len;     // Number of messages.
      unsigned        recv_idx;     // Receive index.
      unsigned        send_idx;     // Send index.
    } buf;
    struct {
      void           *recv_buf;     // Where to put a message.
      pthread_cond_t  cpy_done[2];  // Receive/send copy done.
      pthread_cond_t  not_busy[2];  // Receive/send no longer busy.
      bool            is_busy[2];   // Is recv/send busy?
    } unbuf;
  };

  pthread_cond_t      ready[2];     // Receive/send ready.
  unsigned short      wait_cnt[2];  // Waiting to receive/send.

  pthread_mutex_t     mtx;          // Channel mutex.
  size_t              msg_size;     // Message size.
  unsigned            buf_cap;      // 0 = unbuffered.
  bool                is_closed;    // Is channel closed?
};

Approximately half of the structure’s members are within an anonymous union grouped by buf and unbuf structures. For buffered channels, messages will be stored in a ring buffer. For unbuffered channels, recv_buf is a pointer to where a sender should copy a message to. (The remaining members will be explained in due course.)

The two ready pthread condition variables mean, respectively, “receiver can proceed” (because either a sender is available for an unbuffered channel or the buffer isn’t empty for a buffered channel) and “sender can proceed” (because either a receiver is available for an unbuffered channel or the buffer isn’t full for a buffered channel).

The two wait_cnt members count the number of threads waiting to receive and send, respectively. The rest of the members are straightforward.

Internally, we’ll also define:

enum chan_dir {
  CHAN_RECV,                    // Receive direction.
  CHAN_SEND                     // Send direction.
};
typedef enum chan_dir chan_dir;

to be more mnemonic indices into the is_busy, not_busy, ready, wait_cnt, and cpy_done arrays than 0 and 1.

Initialization, Close, & Clean-Up

The implementation of chan_init is fairly straightforward:

int chan_init( struct chan *chan, unsigned buf_cap,
               size_t msg_size ) {
  assert( chan != NULL );

  if ( buf_cap > 0 ) {
    if ( unlikely( msg_size == 0 ) )
      return EINVAL;
    chan->buf.ring_buf = malloc( buf_cap * msg_size );
    if ( chan->buf.ring_buf == NULL )
      return ENOMEM;
    chan->buf.recv_idx = chan->buf.send_idx = 0;
    chan->buf.ring_len = 0;
  }
  else {
    chan->unbuf.recv_buf = NULL;
    pthread_cond_init( &chan->unbuf.cpy_done[ CHAN_RECV ], NULL );
    pthread_cond_init( &chan->unbuf.cpy_done[ CHAN_SEND ], NULL );
    pthread_cond_init( &chan->unbuf.not_busy[ CHAN_RECV ], NULL );
    pthread_cond_init( &chan->unbuf.not_busy[ CHAN_SEND ], NULL );
    chan->unbuf.is_busy[ CHAN_RECV ] = false;
    chan->unbuf.is_busy[ CHAN_SEND ] = false;
  }

  chan->buf_cap = buf_cap;
  chan->msg_size = msg_size;
  chan->is_closed = false;

  pthread_mutex_init( &chan->mtx, NULL );
  pthread_cond_init( &chan->ready[ CHAN_RECV ], NULL );
  pthread_cond_init( &chan->ready[ CHAN_SEND ], NULL );
  chan->wait_cnt[ CHAN_RECV ] = 0;
  chan->wait_cnt[ CHAN_SEND ] = 0;

  return 0;
}

The likely and unlikely macros are branch-prediction hints, typically thin wrappers around a compiler built-in such as __builtin_expect.

Not mentioned previously is that chan_init returns an int where zero indicates success and non-zero is an error code.

Also not mentioned previously is that, for an unbuffered channel, we allow msg_size to be zero. This is marginally useful when you simply want one thread to signal another and the message contents are irrelevant.

In Go, you can use a chan bool as was done in the earlier example with the quitc channel. The fact that it received any value is what matters; whether it was true or false doesn’t matter.

The implementation of chan_close is:

void chan_close( struct chan *chan ) {
  assert( chan != NULL );
  pthread_mutex_lock( &chan->mtx );
  bool const was_already_closed = chan->is_closed;
  chan->is_closed = true;
  pthread_mutex_unlock( &chan->mtx );
  if ( was_already_closed )
    return;
  chan_signal( chan, CHAN_RECV, &pthread_cond_broadcast );
  chan_signal( chan, CHAN_SEND, &pthread_cond_broadcast );
  if ( chan->buf_cap == 0 ) {
    pthread_cond_broadcast( &chan->unbuf.cpy_done[ CHAN_RECV ] );
    pthread_cond_broadcast( &chan->unbuf.cpy_done[ CHAN_SEND ] );
    pthread_cond_broadcast( &chan->unbuf.not_busy[ CHAN_RECV ] );
    pthread_cond_broadcast( &chan->unbuf.not_busy[ CHAN_SEND ] );
  }
}

If the channel was not already closed, use pthread_cond_broadcast to wake up all threads that may be waiting on condition variables.

The chan_signal function signals the relevant condition variable, but only if there are actually other threads waiting:

static void chan_signal( struct chan *chan, chan_dir dir,
                         int (*pthread_cond_fn)( pthread_cond_t* ) ) {
  if ( chan->wait_cnt[ dir ] > 0 )
    (*pthread_cond_fn)( &chan->ready[ dir ] );
}

The implementation of chan_cleanup is also fairly straightforward:

void chan_cleanup( struct chan *chan,
                   void (*msg_cleanup_fn)( void* ) ) {
  if ( chan == NULL )
    return;

  if ( chan->buf_cap > 0 ) {
    if ( chan->buf.ring_len > 0 && msg_cleanup_fn != NULL ) {
      unsigned idx = chan->buf.recv_idx;
      for ( unsigned i = 0; i < chan->buf.ring_len; ++i ) {
        (*msg_cleanup_fn)( chan_buf_at( chan, idx ) );
        idx = (idx + 1) % chan->buf_cap;
      }
    }
    free( chan->buf.ring_buf );
  }
  else {
    pthread_cond_destroy( &chan->unbuf.cpy_done[ CHAN_RECV ] );
    pthread_cond_destroy( &chan->unbuf.cpy_done[ CHAN_SEND ] );
    pthread_cond_destroy( &chan->unbuf.not_busy[ CHAN_RECV ] );
    pthread_cond_destroy( &chan->unbuf.not_busy[ CHAN_SEND ] );
  }

  pthread_cond_destroy( &chan->ready[ CHAN_RECV ] );
  pthread_cond_destroy( &chan->ready[ CHAN_SEND ] );
  pthread_mutex_destroy( &chan->mtx );
}

where chan_buf_at is a utility function to get the address of the ith message in the ring buffer:

static inline void* chan_buf_at( struct chan *chan, unsigned abs_idx ) {
  return (char*)chan->buf.ring_buf + abs_idx * chan->msg_size;
}

Send, Receive, & Time

The implementation of chan_send is:

int chan_send( struct chan *chan, void const *send_buf,
               struct timespec const *duration ) {
  assert( chan != NULL );

  struct timespec abs_ts;
  struct timespec const *const abs_time =
    ts_dur_to_abs( duration, &abs_ts );

  if ( chan->buf_cap > 0 ) {
    if ( unlikely( send_buf == NULL ) )
      return EINVAL;
    return chan_buf_send( chan, send_buf, abs_time );
  }

  if ( unlikely( chan->msg_size > 0 && send_buf == NULL ) )
    return EINVAL;
  return chan_unbuf_send( chan, send_buf, abs_time );
}

In the channel API, we allow the user to specify a time-out as a duration, i.e., a time relative to now, because it’s simpler. However, in the pthreads API, a time-out must be specified as an absolute time in the future. Hence, we need to convert a relative to an absolute time:

static struct timespec const*
ts_dur_to_abs( struct timespec const *duration,
               struct timespec *abs_time ) {
  assert( abs_time != NULL );

  if ( duration == NULL || duration == CHAN_NO_TIMEOUT )
    return duration;

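  struct timeval now;
  (void)gettimeofday( &now, NULL );

  *abs_time = (struct timespec){
    .tv_sec  = now.tv_sec         + duration->tv_sec,
    .tv_nsec = now.tv_usec * 1000 + duration->tv_nsec
  };
  if ( abs_time->tv_nsec >= 1000000000L ) {   // Normalize: tv_nsec must be
    ++abs_time->tv_sec;                       // less than one second or
    abs_time->tv_nsec -= 1000000000L;         // the timed wait fails.
  }

  return abs_time;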
}

Once that’s done, chan_send calls either chan_buf_send or chan_unbuf_send.
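
One detail of the relative-to-absolute conversion worth spelling out: adding two nanosecond fields can reach or exceed one second, and pthread_cond_timedwait is specified to fail with EINVAL for a tv_nsec of 1,000,000,000 or more. A minimal sketch of the addition with a carry follows; the names ts_add and NS_PER_SEC are hypothetical and used only for illustration.

```c
#include <time.h>

#define NS_PER_SEC 1000000000L

// Hypothetical helper: returns a + b, assuming each operand already has
// 0 <= tv_nsec < NS_PER_SEC.
struct timespec ts_add( struct timespec a, struct timespec b ) {
  struct timespec sum = {
    .tv_sec  = a.tv_sec  + b.tv_sec,
    .tv_nsec = a.tv_nsec + b.tv_nsec
  };
  if ( sum.tv_nsec >= NS_PER_SEC ) {  // At most one carry is possible.
    ++sum.tv_sec;
    sum.tv_nsec -= NS_PER_SEC;
  }
  return sum;
}
```

For example, 1.9 seconds plus 2.3 seconds yields tv_sec of 4 and tv_nsec of 200,000,000 rather than an out-of-range nanosecond value.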

The implementation of chan_recv is similar:

int chan_recv( struct chan *chan, void *recv_buf,
               struct timespec const *duration ) {
  assert( chan != NULL );

  struct timespec abs_ts;
  struct timespec const *const abs_time =
    ts_dur_to_abs( duration, &abs_ts );

  if ( chan->buf_cap > 0 ) {
    if ( unlikely( recv_buf == NULL ) )
      return EINVAL;
    return chan_buf_recv( chan, recv_buf, abs_time );
  }

  if ( unlikely( chan->msg_size > 0 && recv_buf == NULL ) )
    return EINVAL;
  return chan_unbuf_recv( chan, recv_buf, abs_time );
}

Channels in C Implementation: Buffered Send & Receive

This part of the implementation covers buffered send and receive. They’re a fairly straightforward implementation of the classic producer-consumer problem.

Buffered Send

The implementation of chan_buf_send is:

static int chan_buf_send( struct chan *chan, void const *send_buf,
                          struct timespec const *abs_time ) {
  int rv = 0;
  pthread_mutex_lock( &chan->mtx );

  do {
    if ( chan->is_closed ) {
      rv = EPIPE;
    }
    else if ( chan->buf.ring_len < chan->buf_cap ) {
      memcpy( chan_buf_at( chan, chan->buf.send_idx ), send_buf,
              chan->msg_size );
      chan->buf.send_idx = (chan->buf.send_idx + 1) % chan->buf_cap;
      if ( ++chan->buf.ring_len == 1 )
        chan_signal( chan, CHAN_BUF_NOT_EMPTY, &pthread_cond_signal );
      break;
    }
    else {
      rv = chan_wait( chan, CHAN_BUF_NOT_FULL, abs_time );
    }
  } while ( rv == 0 );

  pthread_mutex_unlock( &chan->mtx );
  return rv;
}

There are three cases:

  1. The channel is closed: return EPIPE.
  2. The channel isn’t full: copy the message into the buffer and increment its length. If the new length is 1, it means the channel was empty but isn’t now, so signal any receivers that may be waiting.
  3. Otherwise, wait until abs_time for the channel to become not full.

For mnemonic convenience, we define these macros to specify which condition we either want to notify about or wait for:

#define CHAN_BUF_NOT_EMPTY    CHAN_RECV
#define CHAN_BUF_NOT_FULL     CHAN_SEND

The chan_wait function is used to wait until a channel is “ready” (can proceed):

static int chan_wait( struct chan *chan, chan_dir dir,
                      struct timespec const *abs_time ) {
  assert( chan != NULL );

  if ( chan->is_closed )
    return EPIPE;

  ++chan->wait_cnt[ dir ];
  int const rv =
    pthread_cond_wait_wrapper( &chan->ready[ dir ],
                               &chan->mtx, abs_time );
  --chan->wait_cnt[ dir ];

  return chan->is_closed ? EPIPE : rv;
}

If the channel is closed, return EPIPE; otherwise increment the relevant wait_cnt and wait. When the wait returns, decrement the relevant wait_cnt. If the channel was closed while waiting, return EPIPE; otherwise return whatever the wrapper returned.

The pthread_cond_wait_wrapper function handles the special cases for abs_time, specifically if NULL, returns EAGAIN; if CHAN_NO_TIMEOUT, calls pthread_cond_wait; otherwise calls pthread_cond_timedwait:

static int pthread_cond_wait_wrapper( pthread_cond_t *cond,
                                      pthread_mutex_t *mtx,
                                      struct timespec const *abs_time ) {
  assert( cond != NULL );
  assert( mtx != NULL );

  if ( abs_time == NULL )
    return EAGAIN;

  int const pcw_rv = abs_time == CHAN_NO_TIMEOUT ?
    pthread_cond_wait( cond, mtx ) :
    pthread_cond_timedwait( cond, mtx, abs_time );

  switch ( pcw_rv ) {
    case 0:
    case ETIMEDOUT:
      return pcw_rv;
    default:
      errno = pcw_rv;
      perror( PACKAGE );  // defined by Autotools
      exit( 1 );
  }
}
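
To see a timed wait in isolation, here is a small standalone sketch, independent of the channel code. The function wait_for_flag is a hypothetical name. Unlike the wrapper above, which waits once and leaves the retry loop to its caller, this sketch loops on its own predicate so that a spurious wake-up doesn’t end the wait early.

```c
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <sys/time.h>
#include <time.h>

// Hypothetical helper: waits up to ms milliseconds for *flag to become
// true.  Returns 0 if it did or ETIMEDOUT if the deadline passed first.
int wait_for_flag( pthread_mutex_t *mtx, pthread_cond_t *cond,
                   bool const *flag, unsigned ms ) {
  struct timeval now;
  (void)gettimeofday( &now, NULL );

  struct timespec abs_time = {      // Absolute deadline: now + ms.
    .tv_sec  = now.tv_sec + (time_t)(ms / 1000),
    .tv_nsec = now.tv_usec * 1000L + (long)(ms % 1000) * 1000000L
  };
  if ( abs_time.tv_nsec >= 1000000000L ) {
    ++abs_time.tv_sec;
    abs_time.tv_nsec -= 1000000000L;
  }

  int rv = 0;
  pthread_mutex_lock( mtx );
  while ( !*flag && rv == 0 )       // Re-check after every wake-up.
    rv = pthread_cond_timedwait( cond, mtx, &abs_time );
  pthread_mutex_unlock( mtx );
  return rv;
}
```

Because the deadline is absolute, re-waiting after a spurious wake-up doesn’t extend the total time waited, which is one reason the pthreads API takes an absolute time.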

Buffered Receive

The implementation of chan_buf_recv is:

static int chan_buf_recv( struct chan *chan, void *recv_buf,
                          struct timespec const *abs_time ) {
  int rv = 0;
  pthread_mutex_lock( &chan->mtx );

  do {
    // Since we can still read from a closed, non-empty, buffered
    // channel, there's no check for is_closed first.
    if ( chan->buf.ring_len > 0 ) {
      memcpy( recv_buf, chan_buf_at( chan, chan->buf.recv_idx ),
              chan->msg_size );
      chan->buf.recv_idx = (chan->buf.recv_idx + 1) % chan->buf_cap;
      if ( chan->buf.ring_len-- == chan->buf_cap )
        chan_signal( chan, CHAN_BUF_NOT_FULL, &pthread_cond_signal );
      break;
    }
    rv = chan_wait( chan, CHAN_BUF_NOT_EMPTY, abs_time );
  } while ( rv == 0 );

  pthread_mutex_unlock( &chan->mtx );
  return rv;
}

There are two cases:

  1. The channel isn’t empty: copy the message from the buffer and decrement its length. If the old length was its capacity, it means the channel was full but isn’t now, so notify any senders that may be waiting.
  2. Otherwise, wait until abs_time for the channel to become not empty.
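
The ring-buffer bookkeeping shared by buffered send and receive can be seen more easily without any locking. The following single-threaded sketch uses hypothetical names (struct ring, ring_send, ring_recv) and a fixed capacity of 4 ints chosen arbitrarily; where the real functions would wait, it simply returns false.

```c
#include <stdbool.h>

enum { RING_CAP = 4 };            // Arbitrary capacity for this sketch.

struct ring {
  int      buf[ RING_CAP ];       // Message storage.
  unsigned len;                   // Number of messages stored.
  unsigned recv_idx;              // Index of next message to receive.
  unsigned send_idx;              // Index of next free slot.
};

// Appends msg; returns false if the ring is full.
bool ring_send( struct ring *r, int msg ) {
  if ( r->len == RING_CAP )
    return false;
  r->buf[ r->send_idx ] = msg;
  r->send_idx = (r->send_idx + 1) % RING_CAP;  // Wrap around at the end.
  ++r->len;
  return true;
}

// Removes the oldest message into *msg; returns false if the ring is empty.
bool ring_recv( struct ring *r, int *msg ) {
  if ( r->len == 0 )
    return false;
  *msg = r->buf[ r->recv_idx ];
  r->recv_idx = (r->recv_idx + 1) % RING_CAP;  // Wrap around at the end.
  --r->len;
  return true;
}
```

Keeping an explicit length, rather than comparing the two indices, is what lets a full ring be told apart from an empty one when both indices are equal.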

Channels in C Implementation: Unbuffered Send & Receive

This part of the implementation covers unbuffered send and receive. They’re a bit trickier to implement because:

  1. The sender and receiver have to rendezvous, i.e., meet at the same place (in memory) and time to copy a message.
  2. However, this does have the advantage of enabling the sender to copy a message directly to where the receiver wants it with only a single memcpy rather than the two required for buffered channels.
  3. While either a sender (or receiver) is waiting for a receiver (or sender), additional senders (or receivers) must be excluded since, for a single channel, only a single sender and receiver can rendezvous.
  4. After a message is copied, the sender has to block to allow the receiver to do something with the message, before attempting to send another message (more later).

Unbuffered Send

The implementation of chan_unbuf_send is:

static int chan_unbuf_send( struct chan *chan, void const *send_buf,
                            struct timespec const *abs_time ) {
  pthread_mutex_lock( &chan->mtx );

  int rv = chan_unbuf_acquire( chan, CHAN_SEND, abs_time );
  if ( rv == 0 ) {
    chan_signal( chan, CHAN_RECV, &pthread_cond_signal );

    do {
      if ( chan->unbuf.is_busy[ CHAN_RECV ] ) {
        if ( chan->msg_size > 0 )
          memcpy( chan->unbuf.recv_buf, send_buf, chan->msg_size );
        pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_RECV ] );
        pthread_cond_wait( &chan->unbuf.cpy_done[ CHAN_SEND ], &chan->mtx );
        pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_RECV ] );
        break;
      }
      rv = chan_wait( chan, CHAN_SEND, abs_time );
    } while ( rv == 0 );

    chan_unbuf_release( chan, CHAN_SEND );
  }

  pthread_mutex_unlock( &chan->mtx );
  return rv;
}

The chan_unbuf_acquire function is used to acquire exclusive access to an unbuffered channel for either sending or receiving:

static int chan_unbuf_acquire( struct chan *chan, chan_dir dir,
                               struct timespec const *abs_time ) {
  int rv = 0;
  while ( rv == 0 && chan->unbuf.is_busy[ dir ] ) {
    rv = chan->is_closed ? EPIPE :
      pthread_cond_wait_wrapper( &chan->unbuf.not_busy[ dir ],
                                 &chan->mtx, abs_time );
  } // while
  if ( rv == 0 )
    chan->unbuf.is_busy[ dir ] = true;
  return rv;
}

This is what the is_busy[2] is for: to know whether either the send end, receive end, or both are “busy” with a thread. If busy, the thread waits until either the relevant not_busy condition has been signaled or the time-out expires. There’s also a corresponding release function:

static void chan_unbuf_release( struct chan *chan, chan_dir dir ) {
  chan->unbuf.is_busy[ dir ] = false;
  pthread_cond_signal( &chan->unbuf.not_busy[ dir ] );
}

In chan_unbuf_send, the line:

if ( chan->unbuf.is_busy[ CHAN_RECV ] ) {

checks whether the channel is busy for a receiver: if so, it means there is a receiver already waiting to rendezvous with the sender, so the sender can proceed to copy the message immediately:

if ( chan->msg_size > 0 )
  memcpy( chan->unbuf.recv_buf, send_buf, chan->msg_size );

As a reminder, for an unbuffered channel, we allow msg_size to be zero for only signaling (no message copying) between threads.

The lines:

pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_RECV ] );
pthread_cond_wait( &chan->unbuf.cpy_done[ CHAN_SEND ], &chan->mtx );
pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_RECV ] );

address point 4 above, that is, the sender has to block to allow the receiver to do something with the message, before attempting to send another message. To understand the problem, consider the following sequence of events between two threads where each thread is in a loop, one sending and the other receiving:

  1. On thread 1, chan_unbuf_recv is called on a channel, but no sender is present, so it waits.
  2. On thread 2, chan_unbuf_send is called, sees a receiver is waiting, copies the message immediately, and returns.
  3. The kernel’s task scheduler arbitrarily decides to schedule thread 2 again immediately. As stated, it’s in a loop, so chan_unbuf_send is called again, sees a receiver is still “waiting” (even though the message was already copied), and immediately copies a new message overwriting the previous message!

Step 3 can happen any number of times overwriting messages before the scheduler could decide to run thread 1. Note that the same thing can happen with the sender and receiver roles reversed, i.e., the receiver could receive the same message multiple times thinking it’s a new message since the sender isn’t given a chance to run. (For simplicity, we’ll stick to the event sequence as originally presented, i.e., the sender is called multiple times.)

What’s needed is a way to force thread 2 to block after sending a message and wait on a condition variable. Since it’s blocked, the scheduler won’t schedule it again and it therefore will schedule thread 1 to allow its chan_unbuf_recv to return and allow its loop to do whatever with the message.

This is what the cpy_done condition variable is for. Additionally, both calls to pthread_cond_signal are necessary to implement a handshake between the two threads.
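
To experiment with the idea of blocking the sender until the receiver has taken the message, here is a deliberately simplified, standalone sketch. It is not the implementation above: it assumes exactly one sender thread and one receiver thread, uses a single condition variable, and replaces the cpy_done handshake with two counters. All names (struct rendezvous, rendezvous_send, and so on) are hypothetical.

```c
#include <pthread.h>
#include <stddef.h>

struct rendezvous {
  pthread_mutex_t mtx;
  pthread_cond_t  changed;    // Broadcast on every state change.
  int            *recv_buf;   // Where the receiver wants the message.
  unsigned        sent;       // Messages copied by the sender.
  unsigned        acked;      // Messages acknowledged by the receiver.
};

void rendezvous_init( struct rendezvous *r ) {
  pthread_mutex_init( &r->mtx, NULL );
  pthread_cond_init( &r->changed, NULL );
  r->recv_buf = NULL;
  r->sent = r->acked = 0;
}

void rendezvous_send( struct rendezvous *r, int msg ) {
  pthread_mutex_lock( &r->mtx );
  while ( r->recv_buf == NULL )             // Wait for the receiver.
    pthread_cond_wait( &r->changed, &r->mtx );
  *r->recv_buf = msg;                       // Copy directly to the receiver.
  r->recv_buf = NULL;                       // The slot can't be reused.
  unsigned const my_seq = ++r->sent;
  pthread_cond_broadcast( &r->changed );    // Copy done.
  while ( r->acked != my_seq )              // Block until acknowledged.
    pthread_cond_wait( &r->changed, &r->mtx );
  pthread_mutex_unlock( &r->mtx );
}

int rendezvous_recv( struct rendezvous *r ) {
  int msg;
  pthread_mutex_lock( &r->mtx );
  unsigned const want_seq = r->sent + 1;
  r->recv_buf = &msg;
  pthread_cond_broadcast( &r->changed );    // Receiver is present.
  while ( r->sent != want_seq )             // Wait for the copy.
    pthread_cond_wait( &r->changed, &r->mtx );
  r->acked = want_seq;
  pthread_cond_broadcast( &r->changed );    // Acknowledge: sender may go.
  pthread_mutex_unlock( &r->mtx );
  return msg;
}

struct demo_arg { struct rendezvous *r; int n; };

static void* demo_sender( void *p ) {
  struct demo_arg const *const arg = p;
  for ( int i = 1; i <= arg->n; ++i )
    rendezvous_send( arg->r, i );
  return NULL;
}

// Sends 1..n from a second thread; returns how many arrived in order,
// or -1 if the thread couldn't be created.
int rendezvous_demo( int n ) {
  struct rendezvous r;
  rendezvous_init( &r );
  struct demo_arg arg = { &r, n };
  pthread_t sender;
  if ( pthread_create( &sender, NULL, &demo_sender, &arg ) != 0 )
    return -1;
  int in_order = 0;
  for ( int i = 1; i <= n; ++i ) {
    if ( rendezvous_recv( &r ) == i )
      ++in_order;
  }
  pthread_join( sender, NULL );
  pthread_cond_destroy( &r.changed );
  pthread_mutex_destroy( &r.mtx );
  return in_order;
}
```

In this sketch, the sender can’t return from rendezvous_send until the receiver has acknowledged the copy, so even if the scheduler runs the sender repeatedly, it can’t overwrite a message the receiver hasn’t yet picked up.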

Unbuffered Receive

The implementation of chan_unbuf_recv is pretty much the same as chan_unbuf_send except CHAN_RECV and CHAN_SEND are swapped:

static int chan_unbuf_recv( struct chan *chan, void *recv_buf,
                            struct timespec const *abs_time ) {
  pthread_mutex_lock( &chan->mtx );

  int rv = chan_unbuf_acquire( chan, CHAN_RECV, abs_time );
  if ( rv == 0 ) {
    chan->unbuf.recv_buf = recv_buf;
    chan_signal( chan, CHAN_SEND, &pthread_cond_signal );

    do {
      if ( chan->unbuf.is_busy[ CHAN_SEND ] ) {
        pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_SEND ] );
        pthread_cond_wait( &chan->unbuf.cpy_done[ CHAN_RECV ], &chan->mtx );
        pthread_cond_signal( &chan->unbuf.cpy_done[ CHAN_SEND ] );
        break;
      }
      rv = chan_wait( chan, CHAN_RECV, abs_time );
    } while ( rv == 0 );

    chan->unbuf.recv_buf = NULL;
    chan_unbuf_release( chan, CHAN_RECV );
  }

  pthread_mutex_unlock( &chan->mtx );
  return rv;
}

The only other difference is the addition of the lines:

chan->unbuf.recv_buf = recv_buf;
// ...
chan->unbuf.recv_buf = NULL;

that tell the sender where to copy the message directly, and that reset the pointer when done.

Channels in C API: Select

Just as we can’t add an <- operator to C, we can't add a select statement either, so, again, we’ll have to substitute an ordinary function call:

int chan_select( unsigned recv_len, struct chan *recv_chan[recv_len],
                 void *recv_buf[recv_len],
                 unsigned send_len, struct chan *send_chan[send_len],
                 void const *send_buf[send_len],
                 struct timespec const *duration );

That is, you pass an optional array of channels to receive from (and the buffers to receive into), another optional array of channels to send to (and the buffers to send from), and a time-out. When a channel is selected, the function returns the index into either recv_chan or send_chan so you know which channel was selected. If no channel was selected, it returns -1 and sets errno if there was an error.

You can use a chan_select in a switch statement. For example, the fibonacci function in Go can be rewritten in C as:

void fibonacci( struct chan *fibc, struct chan *quitc ) {
  unsigned fib = 0, prev_fib, next_fib = 1;
  for (;;) {
    switch ( chan_select(
               1, (struct chan*[]){ quitc }, (void      *[]){ NULL },
               1, (struct chan*[]){ fibc  }, (void const*[]){ &fib },
               CHAN_NO_TIMEOUT ) ) {
      case CHAN_SEND(0): // If sent number ...
        prev_fib = fib;  // ... calculate next number.
        fib = next_fib;
        next_fib += prev_fib;
        break;
      case CHAN_RECV(0): // If received quit ...
        return;          // ... quit.
    }
  }
}

CHAN_RECV and CHAN_SEND are macros used to discriminate between the returned index in the recv_chan array from the send_chan array:

#define CHAN_RECV(IDX)  ((int)(IDX))
#define CHAN_SEND(IDX)  (1024 + (int)(IDX))

If you’re wondering how there can be both an enumeration with constants CHAN_RECV and CHAN_SEND and macros having the same names, the preprocessor doesn’t expand a function-like macro unless the macro’s name is followed by (.
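
A tiny standalone check of that behavior, reusing the enumeration and macro definitions shown in this article (the two wrapper functions, enum_value and macro_value, are hypothetical and exist only for the demonstration):

```c
enum chan_dir {
  CHAN_RECV,                    // Enumeration constant 0.
  CHAN_SEND                     // Enumeration constant 1.
};

#define CHAN_RECV(IDX)  ((int)(IDX))
#define CHAN_SEND(IDX)  (1024 + (int)(IDX))

// No "(" follows the name, so this is the enumeration constant.
int enum_value( void ) {
  return CHAN_SEND;
}

// A "(" follows the name, so this is the macro.
int macro_value( int idx ) {
  return CHAN_SEND( idx );
}
```

Here enum_value returns 1 while macro_value( 0 ) returns 1024, so the two uses of the same name never collide.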

Admittedly, chan_select is nowhere near as elegant as the select statement in Go, but that’s the best you can do in C without direct language support.

Channels in C Implementation: Select

The code for send and receive is relatively straightforward (for multithreaded code). The code to implement select, specifically a blocking select, is quite a bit more complicated. Why? A blocking select waits for any one of a number of channels to become ready and, unfortunately, the pthreads API has no way to pthread_cond_wait for multiple condition variables simultaneously, one for each channel being selected from. But the signaler can signal any number of condition variables. Hence one approach is to have a linked list of “observers” for every channel where an observer is either the channel itself or an active select in some thread.

Data Structures

To implement an observer, we’ll need another data structure (where the non-obvious members will be explained in due course):

typedef struct chan_impl_obs chan_impl_obs;
struct chan_impl_obs {
  struct chan      *chan;           // The channel being observed.
  chan_impl_obs    *next;           // The next observer, if any.
  pthread_mutex_t  *pmtx;           // The mutex to use.
  pthread_cond_t    chan_ready;     // Is chan ready?
  unsigned          key;            // A fairly unique key.
};

In the chan structure, the ready[2] array will be replaced by:

  chan_impl_obs     observer[2];    // Receiver/sender.

When an observer is embedded inside a chan structure, pmtx will point to the channel’s own mtx; when an observer belongs to a chan_select, pmtx will point to a mutex in chan_select’s stack frame (more later).

Given chan_impl_obs, we’ll rename chan_signal to chan_signal_all_obs and expand its implementation to signal all observers:

static void chan_signal_all_obs( struct chan *chan, chan_dir dir,
                                 int (*pthread_cond_fn)( pthread_cond_t* ) ) {
  if ( chan->wait_cnt[ dir ] == 0 )     // Nobody is waiting.
    return;

#ifndef NDEBUG
  int debug_locked_cnt = 0;
#endif /* NDEBUG */
  pthread_mutex_t *pmtx = NULL, *next_pmtx = NULL;

  for ( chan_impl_obs *obs = &chan->observer[ dir ], *next_obs;
        obs != NULL; obs = next_obs, pmtx = next_pmtx ) {
    obs->chan = chan;
    (*pthread_cond_fn)( &obs->chan_ready );
    next_obs = obs->next;
    if ( next_obs != NULL ) {
      next_pmtx = next_obs->pmtx;       // Do hand-over-hand locking:
      pthread_mutex_lock( next_pmtx );  // lock next mutex ...
      DEBUG_BLOCK( ++debug_locked_cnt; );
    }
    if ( pmtx != NULL ) {
      pthread_mutex_unlock( pmtx );     // ... before unlocking previous.
      DEBUG_BLOCK( --debug_locked_cnt; );
    }
  } // for

  assert( debug_locked_cnt == 0 );
}

The function starts at the channel’s own observer, then traverses the linked list of chan_select observers following the next pointers, signaling each in turn. During the traversal, it performs hand-over-hand locking, that is, it locks the next observer’s mutex before unlocking the previous observer’s mutex to ensure thread safety.

As a safety check for debugging, a debug_locked_cnt counts the current number of mutexes locked. At the end of the function is an assert to ensure that every mutex that was locked was also unlocked. DEBUG_BLOCK is a macro that expands its arguments only if NDEBUG is not defined.
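
Hand-over-hand locking is easier to see on a plain linked list. The following standalone sketch uses a hypothetical struct node and list_sum function. Unlike the function above, which assumes the channel’s own mutex is already held by the caller, this sketch locks the head node itself.

```c
#include <pthread.h>
#include <stddef.h>

struct node {
  pthread_mutex_t mtx;          // Guards value and next.
  int             value;
  struct node    *next;
};

// Sums the values in the list, holding at most two mutexes at a time.
int list_sum( struct node *head ) {
  if ( head == NULL )
    return 0;

  int sum = 0;
  pthread_mutex_lock( &head->mtx );
  for ( struct node *cur = head, *next; cur != NULL; cur = next ) {
    sum += cur->value;
    next = cur->next;
    if ( next != NULL )
      pthread_mutex_lock( &next->mtx );   // Lock the next node ...
    pthread_mutex_unlock( &cur->mtx );    // ... before unlocking this one.
  }
  return sum;
}
```

Because the next node is locked before the current one is released, no other thread following the same locking order can slip in between the two and unlink the node about to be visited.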

We’ll also need another data structure to refer to a channel:

struct chan_select_ref {
  struct chan    *chan;         // The channel referred to.
  unsigned short  param_idx;    // Index into recv_chan/send_chan.
  chan_dir        dir;          // Selected channel direction.
  bool            maybe_ready;  // Is channel maybe ready?
};
typedef struct chan_select_ref chan_select_ref;

Channels in C Implementation: Select

The implementation for chan_select starts off as:

int chan_select( unsigned recv_len, struct chan *recv_chan[recv_len],
                 void *recv_buf[recv_len],
                 unsigned send_len, struct chan *send_chan[send_len],
                 void const *send_buf[send_len],
                 struct timespec const *duration ) {
  if ( unlikely( recv_len > 0 && (recv_chan == NULL || recv_buf == NULL) ) ||
       unlikely( send_len > 0 && (send_chan == NULL || send_buf == NULL) ) ) {
    errno = EINVAL;
    return -1;
  }

  unsigned const total_channels = recv_len + send_len;

  chan_select_ref stack_ref[16];
  chan_select_ref *const ref = total_channels <= ARRAY_SIZE( stack_ref ) ?
    stack_ref : malloc( total_channels * sizeof( chan_select_ref ) );

  struct timespec abs_ts;
  struct timespec const *const abs_time =
    ts_dur_to_abs( duration, &abs_ts );

  unsigned                chans_open;   // Number of open channels.
  bool const              is_blocking = duration != NULL;
  chan_impl_obs           select_obs;   // Observer for this select.
  pthread_mutex_t         select_mtx;   // Mutex for select_obs.
  chan_select_ref const  *selected_ref; // Reference to selected channel.
  int                     rv;

  // ...

The first few lines simply check for invalid arguments. Next, we attempt a small-size optimization: if the total number of channels is ≤ 16, we use a stack-based array of chan_select_ref; otherwise we malloc it. Then abs_time is calculated just as in other functions, and the remaining local variables are declared.
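As a stand-alone illustration of the small-size optimization, here is a minimal sketch. The function sum_1_to_n and the variable demo_used_heap are hypothetical, and the ARRAY_SIZE definition is an assumption about how such a macro is typically written. The sketch also checks malloc’s result, which the excerpt above doesn’t show.

```c
#include <stdlib.h>

// Assumed definition: number of elements in a true array (not a pointer).
#define ARRAY_SIZE(A)  (sizeof( (A) ) / sizeof( (A)[0] ))

// Sums 1..n through a scratch array that lives on the stack when n is
// small and on the heap otherwise.  Sets *used_heap accordingly.
// Returns -1 if the heap allocation fails.
static long sum_1_to_n( size_t n, int *used_heap ) {
  long stack_buf[16];
  long *const buf = n <= ARRAY_SIZE( stack_buf ) ?
    stack_buf : malloc( n * sizeof( long ) );
  if ( buf == NULL )
    return -1;
  *used_heap = buf != stack_buf;

  for ( size_t i = 0; i < n; ++i )
    buf[i] = (long)(i + 1);
  long sum = 0;
  for ( size_t i = 0; i < n; ++i )
    sum += buf[i];

  if ( buf != stack_buf )               // Free only what was malloc'd.
    free( buf );
  return sum;
}

static int demo_used_heap;
```

With n = 4 the scratch array stays on the stack; with n = 100 it comes from malloc and is freed before returning. The common case (few channels) therefore pays no allocation cost.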

The next part of the implementation is:

  if ( is_blocking ) {
    pthread_mutex_init( &select_mtx, NULL );
    chan_obs_init( &select_obs, &select_mtx );
    chan_obs_init_key( &select_obs );
  }

  // ...

If this is a blocking select, then we need to initialize the mutex we’ll use as well as our observer:

static void chan_obs_init( chan_impl_obs *obs, pthread_mutex_t *pmtx ) {
  assert( obs != NULL );

  obs->chan = NULL;
  pthread_cond_init( &obs->chan_ready, NULL );
  obs->key  = 0;
  obs->next = NULL;
  obs->pmtx = pmtx;
}

static void chan_obs_init_key( chan_impl_obs *obs ) {
  assert( obs != NULL );

  static unsigned        next_key     = 1;
  static pthread_mutex_t next_key_mtx = PTHREAD_MUTEX_INITIALIZER;

  pthread_mutex_lock( &next_key_mtx );
  obs->key = next_key++;
  if ( next_key == 0 )    // Reserved for channel itself
    ++next_key;
  pthread_mutex_unlock( &next_key_mtx );
}

The chan_obs_init_key function initializes an observer’s key to a (fairly) unique integer. Why does an observer need a key? So that when an observer is inserted into a linked list of observers, it’s inserted in ascending key order. Why does the list need to be in key order? As mentioned, hand-over-hand locking is used when traversing the list. Keeping the list in key order means the mutexes for adjacent observers are always locked in the same order, which eliminates the possibility of a deadlock.
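The reason a consistent order prevents deadlock can be shown with just two mutexes. The following is a minimal sketch, not C Chan code: struct keyed_mtx, first_locked_key, km1, and km2 are hypothetical names. It assumes the two keys differ, just as the assert in chan_add_obs does.

```c
#include <pthread.h>

// Hypothetical pairing of a key with the mutex it orders.
struct keyed_mtx {
  unsigned         key;
  pthread_mutex_t  mtx;
};

// Locks both mutexes in ascending key order regardless of the order the
// arguments were given in, then unlocks them.  Returns the key of the
// mutex that was locked first.  Precondition: a->key != b->key.
static unsigned first_locked_key( struct keyed_mtx *a, struct keyed_mtx *b ) {
  struct keyed_mtx *const first  = a->key < b->key ? a : b;
  struct keyed_mtx *const second = first == a ? b : a;

  pthread_mutex_lock( &first->mtx );    // Always the lower key first ...
  pthread_mutex_lock( &second->mtx );   // ... then the higher key.
  unsigned const rv = first->key;
  pthread_mutex_unlock( &second->mtx );
  pthread_mutex_unlock( &first->mtx );
  return rv;
}

static struct keyed_mtx km1 = { 1, PTHREAD_MUTEX_INITIALIZER };
static struct keyed_mtx km2 = { 2, PTHREAD_MUTEX_INITIALIZER };
```

Whether called as first_locked_key( &km1, &km2 ) or first_locked_key( &km2, &km1 ), the mutex with key 1 is locked first. Two threads calling it with the arguments swapped can therefore never each hold the mutex the other is waiting for.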

The next part of the chan_select implementation is:

  do {
    chans_open = 0;
    rv = 0;
    selected_ref = NULL;

    unsigned const maybe_ready_len =
      chan_select_init(
        ref, &chans_open, recv_len, recv_chan, CHAN_RECV,
        is_blocking ? &select_obs : NULL
      ) +
      chan_select_init(
        ref, &chans_open, send_len, send_chan, CHAN_SEND,
        is_blocking ? &select_obs : NULL
      );

    if ( chans_open == 0 )
      break;

    // ...

It starts a big do ... while loop and determines which of the given channels might be ready by calling chan_select_init once each for the receive and send channels, also updating chans_open. If all the channels are closed, we just break and return.

The implementation of chan_select_init is:

static unsigned chan_select_init( chan_select_ref ref[], unsigned *pref_len,
                                  unsigned chan_len,
                                  struct chan *chan[chan_len], chan_dir dir,
                                  chan_impl_obs *add_obs ) {
  assert( ref != NULL );
  assert( pref_len != NULL );
  assert( chan_len == 0 || chan != NULL );

  unsigned maybe_ready_len = 0;

  if ( chan != NULL ) {
    for ( unsigned i = 0; i < chan_len; ++i ) {
      bool is_ready = false;
      pthread_mutex_lock( &chan[i]->mtx );

      bool const is_closed = chan[i]->is_closed;
      if ( !is_closed ) {
        is_ready = chan[i]->buf_cap > 0 ?
          (dir == CHAN_RECV ?
            chan[i]->buf.ring_len > 0 :
            chan[i]->buf.ring_len < chan[i]->buf_cap) :
          chan[i]->wait_cnt[ !dir ] > 0;

        if ( add_obs != NULL )
          chan_add_obs( chan[i], dir, add_obs );
      }

      pthread_mutex_unlock( &chan[i]->mtx );

      if ( is_closed )
        continue;
      if ( is_ready || add_obs != NULL ) {
        ref[ (*pref_len)++ ] = (chan_select_ref){
          .chan = chan[i],
          .dir = dir,
          .param_idx = (unsigned short)i,
          .maybe_ready = is_ready
        };
      }
      if ( is_ready )
        ++maybe_ready_len;
    } // for
  }

  return maybe_ready_len;
}

It iterates through the array of channels looking for those that are not closed and determining whether each is ready.
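The readiness test inside the loop is easier to read when pulled out as a pure function. This is a minimal sketch: the names sketch_dir, DIR_RECV, DIR_SEND, and maybe_ready are hypothetical stand-ins, and the parameters mirror buf_cap, buf.ring_len, and wait_cnt[ !dir ] from the code above.

```c
#include <stdbool.h>

typedef enum { DIR_RECV, DIR_SEND } sketch_dir;  // Hypothetical names.

// Returns true only if an operation in direction dir might succeed now.
//  + Buffered channel (buf_cap > 0): a receive needs at least one message
//    in the ring buffer; a send needs at least one free slot.
//  + Unbuffered channel: there must be a thread waiting in the opposite
//    direction to rendezvous with.
static bool maybe_ready( unsigned buf_cap, unsigned ring_len,
                         unsigned opposite_waiters, sketch_dir dir ) {
  if ( buf_cap > 0 )
    return dir == DIR_RECV ? ring_len > 0 : ring_len < buf_cap;
  return opposite_waiters > 0;
}
```

For example, a buffered channel of capacity 4 holding 4 messages is ready for a receive but not for a send, while an unbuffered channel is ready only if the other side is already waiting.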

The add_obs parameter is not NULL only for blocking selects. For a blocking select, we need to add its observer via chan_add_obs:

static void chan_add_obs( struct chan *chan, chan_dir dir,
                          chan_impl_obs *add_obs ) {
  assert( chan != NULL );
  assert( add_obs != NULL );

#ifndef NDEBUG
  int debug_locked_cnt = 0;
#endif /* NDEBUG */
  pthread_mutex_t *pmtx = NULL, *next_pmtx = NULL;

  for ( chan_impl_obs *obs = &chan->observer[ dir ], *next_obs; obs != NULL;
        obs = next_obs, pmtx = next_pmtx ) {
    next_obs = obs->next;
    if ( next_obs == NULL ) {           // At end of list.
      obs->next = add_obs;
    }
    else {
      next_pmtx = next_obs->pmtx;       // Do hand-over-hand locking:
      pthread_mutex_lock( next_pmtx );  // lock next mutex ... (1)
      DEBUG_BLOCK( ++debug_locked_cnt; );
      if ( add_obs->key < next_obs->key ) {
        obs->next = add_obs;
        add_obs->next = next_obs;
        next_obs = NULL;                // Will cause loop to exit ... (2)
        pmtx = next_pmtx;               // Will unlock next_pmtx.
      }
      else {
        assert( add_obs->key != next_obs->key );
      }
    }
    if ( pmtx != NULL ) {               // (2) ... yet still runs this code.
      pthread_mutex_unlock( pmtx );     // (1) ... before unlocking previous.
      DEBUG_BLOCK( --debug_locked_cnt; );
    }
  } // for

  assert( debug_locked_cnt == 0 );
  ++chan->wait_cnt[ dir ];
}

This function is similar to chan_signal_all_obs in its list traversal. The difference is that it inserts the current select’s observer into the list just before the first observer whose key is greater than its own, or at the end if there is none. It also increments wait_cnt since the select will be waiting.
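With the locking stripped away, the insertion itself is an ordinary sorted insert into a singly linked list. The following single-threaded sketch uses hypothetical names (struct obs, obs_insert, demo_insert_order); as with the channel’s own observer, the head node has key 0 and always stays first.

```c
#include <stddef.h>

// Hypothetical observer reduced to what ordering needs.
struct obs {
  unsigned     key;
  struct obs  *next;
};

// Inserts add into the list after head, keeping keys in ascending order.
// No locking is done here: this shows only the pointer manipulation.
static void obs_insert( struct obs *head, struct obs *add ) {
  struct obs *cur = head;
  while ( cur->next != NULL && cur->next->key < add->key )
    cur = cur->next;
  add->next = cur->next;                // NULL when appended at the end.
  cur->next = add;
}

// Inserts keys 5, 2, 9 (in that order) and returns the resulting key
// order, excluding the head, packed as decimal digits.
static int demo_insert_order( void ) {
  struct obs head = { 0, NULL };
  struct obs a = { 5, NULL }, b = { 2, NULL }, c = { 9, NULL };
  obs_insert( &head, &a );
  obs_insert( &head, &b );
  obs_insert( &head, &c );

  int digits = 0;
  for ( struct obs const *o = head.next; o != NULL; o = o->next )
    digits = digits * 10 + (int)o->key;
  return digits;
}
```

demo_insert_order() returns 259: regardless of insertion order, the list ends up as 2, 5, 9.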

In chan_select_init, the lines:

      if ( is_ready || add_obs != NULL ) {
        ref[ (*pref_len)++ ] = (chan_select_ref){
          .chan = chan[i],
          .dir = dir,
          .param_idx = (unsigned short)i,
          .maybe_ready = is_ready
        };
      }

initialize the chan_select_ref for the current channel.

If chan_select_init checks whether channels are ready, why does it return a count of the channels that may be ready? Because even if a channel is ready at the time it’s checked, as soon as its mutex is unlocked, another thread might operate on the channel and make it no longer ready — “maybe” is the best we can do.

The next part of the chan_select implementation is:

    unsigned select_len = chans_open;

    if ( maybe_ready_len > 0 && maybe_ready_len < chans_open ) {
      qsort(                        // Sort maybe ready channels first ...
        ref, chans_open, sizeof( chan_select_ref ),
        (qsort_cmp_fn)&chan_select_ref_cmp
      );
      select_len = maybe_ready_len; // ... and select only from those.
    }
    else {
      // Otherwise, either no or all channels are ready, so there's
      // no need to sort them.
    }

    // ...

The chan_select_ref array is now initialized. If maybe_ready_len > 0, but < chans_open, it means that only some of the channels may be ready, so we sort the array to put those first, which increases the odds we’ll select a channel that’s ready. The sort uses the chan_select_ref_cmp function:

static int chan_select_ref_cmp( chan_select_ref const *i_csr,
                                chan_select_ref const *j_csr ) {
  // sort maybe_ready (true, aka, 1) before !maybe_ready (false, aka, 0)
  return (int)j_csr->maybe_ready - (int)i_csr->maybe_ready;
}

As stated in the comment, if either no or all channels are ready, there’s no need to sort them.
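Here is a minimal, stand-alone sketch of the same “ready first” sort. The names struct ref, ref_cmp, ready_first, and demo_ready_first are hypothetical. Unlike the code above, the comparator takes void const* parameters directly, so it can be passed to qsort without a function-pointer cast.

```c
#include <stdbool.h>
#include <stdlib.h>

// Hypothetical, cut-down stand-in for chan_select_ref.
struct ref {
  int   id;
  bool  maybe_ready;
};

// Sorts maybe_ready (true, aka, 1) before !maybe_ready (false, aka, 0).
static int ref_cmp( void const *i, void const *j ) {
  struct ref const *const ri = i, *const rj = j;
  return (int)rj->maybe_ready - (int)ri->maybe_ready;
}

// Sorts refs so maybe-ready entries come first; returns how many of the
// leading entries are maybe ready.
static unsigned ready_first( struct ref refs[], unsigned n ) {
  qsort( refs, n, sizeof refs[0], ref_cmp );
  unsigned ready = 0;
  while ( ready < n && refs[ ready ].maybe_ready )
    ++ready;
  return ready;
}

// Sorts a mixed array of 4 and checks that nothing after the leading
// run is maybe ready.  Returns the length of that run, or 0 on failure.
static unsigned demo_ready_first( void ) {
  struct ref refs[] = {
    { 0, false }, { 1, true }, { 2, false }, { 3, true }
  };
  unsigned const ready = ready_first( refs, 4 );
  for ( unsigned i = ready; i < 4; ++i ) {
    if ( refs[i].maybe_ready )
      return 0;
  }
  return ready;
}
```

demo_ready_first() returns 2. Since qsort isn’t guaranteed to be stable, the relative order within the ready group may change, which doesn’t matter here because one of them is picked at random anyway.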

The next part of the chan_select implementation is:

    struct timespec const *might_as_well_wait_time;
    if ( maybe_ready_len == 0 && is_blocking ) {
      // None of the channels may be ready and we should wait -- so wait.
      pthread_mutex_lock( &select_mtx );
      if ( pthread_cond_wait_wrapper( &select_obs.chan_ready, &select_mtx,
                                      abs_time ) == ETIMEDOUT ) {
        rv = ETIMEDOUT;
      }
      pthread_mutex_unlock( &select_mtx );
      if ( rv == 0 ) {                  // A channel became ready: find it.
        for ( unsigned i = 0; i < select_len; ++i ) {
          if ( select_obs.chan == ref[i].chan ) {
            selected_ref = &ref[i];
            break;
          }
        } // for
        assert( selected_ref != NULL );
      }
      might_as_well_wait_time = NULL;
    }
    else {
      // Some or all channels may be ready: pick one at random.
      static pthread_once_t once = PTHREAD_ONCE_INIT;
      pthread_once( &once, &srand_init );
      selected_ref = &ref[ rand() % (int)select_len ];
      might_as_well_wait_time = chans_open == 1 ? abs_time : NULL;
    }

    // ...

If none of the channels are ready and we’re a blocking select, then wait for one to become ready; otherwise just pick a maybe ready channel at random after calling srand once:

static void srand_init( void ) {
  struct timeval now;
  (void)gettimeofday( &now, /*tzp=*/NULL );
  srand( (unsigned)now.tv_usec );
}

In the degenerate case of only one channel being open, set might_as_well_wait_time to abs_time since we might as well just wait for it.

The next part of the chan_select implementation is:

    if ( selected_ref != NULL ) {
      struct chan *const selected_chan = selected_ref->chan;
      rv = selected_ref->dir == CHAN_RECV ?
        selected_chan->buf_cap > 0 ?
          chan_buf_recv(
            selected_chan, recv_buf[ selected_ref->param_idx ],
            might_as_well_wait_time
          ) :
          chan_unbuf_recv(
            selected_chan, recv_buf[ selected_ref->param_idx ],
            might_as_well_wait_time
          )
      :
        selected_chan->buf_cap > 0 ?
          chan_buf_send(
            selected_chan, send_buf[ selected_ref->param_idx ],
            might_as_well_wait_time
          ) :
          chan_unbuf_send(
            selected_chan, send_buf[ selected_ref->param_idx ],
            might_as_well_wait_time
          );
    }

    // ...

If we’ve selected a channel, then attempt either to receive from or send to the channel using might_as_well_wait_time:

  1. If the channel is still ready, we won’t wait.
  2. If there’s only a single channel open, wait for it to become ready.
  3. Otherwise don’t wait: we’ll try another channel.

At this point, either we successfully sent or received on the selected channel, it became no longer ready, or the time-out expired. We need to do a bit of clean-up. The next part of the chan_select implementation is:

    if ( is_blocking ) {
      for ( unsigned i = 0; i < recv_len; ++i )
        chan_remove_obs( recv_chan[i], CHAN_RECV, &select_obs );
      for ( unsigned i = 0; i < send_len; ++i )
        chan_remove_obs( send_chan[i], CHAN_SEND, &select_obs );
    }

    // ...

If we’re a blocking select, we need to remove ourselves from our channels’ list of observers:

static void chan_remove_obs( struct chan *chan, chan_dir dir,
                             chan_impl_obs *remove_obs ) {
  assert( chan != NULL );
  assert( remove_obs != NULL );

#ifndef NDEBUG
  int   debug_locked_cnt = 0;
  bool  debug_removed = false;
#endif /* NDEBUG */
  pthread_mutex_t *pmtx = NULL, *next_pmtx = NULL;

  pthread_mutex_lock( &chan->mtx );

  for ( chan_impl_obs *obs = &chan->observer[ dir ], *next_obs; obs != NULL;
        obs = next_obs, pmtx = next_pmtx ) {
    next_obs = obs->next;
    if ( next_obs == remove_obs ) {
      // remove_obs is an observer in our caller's stack frame, i.e.,
      // this thread, so there's no need to lock next_obs->pmtx.
      obs->next = next_obs->next;
      next_obs = NULL;                  // Will cause loop to exit ... (1)
      DEBUG_BLOCK( debug_removed = true; );
    }
    else if ( next_obs != NULL ) {
      next_pmtx = next_obs->pmtx;       // Do hand-over-hand locking:
      pthread_mutex_lock( next_pmtx );  // lock next mutex ... (2)
      DEBUG_BLOCK( ++debug_locked_cnt; );
    }
    if ( pmtx != NULL ) {               // (1) ... yet still runs this code.
      pthread_mutex_unlock( pmtx );     // (2) ... before unlocking previous.
      DEBUG_BLOCK( --debug_locked_cnt; );
    }
  } // for

  --chan->wait_cnt[ dir ];
  pthread_mutex_unlock( &chan->mtx );
  assert( debug_locked_cnt == 0 );
  assert( debug_removed );
}

The function does the same hand-over-hand locking looking for the observer to remove.

The next part of the chan_select implementation ends the big do ... while loop:

  } while ( rv == EAGAIN || (rv == EPIPE && chans_open > 1) );

  // ...

If rv is 0, we succeeded; if ETIMEDOUT, we timed out. In either of those cases we’re done and shouldn’t try again. However, if rv is:

  • EAGAIN, we selected a channel that isn’t ready (even if it had been ready shortly before);
  • EPIPE, we selected a channel that was open when we originally checked, but closed now.

In either of the latter two cases, we should try again, hence the loop.
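The loop condition can be sketched on its own as a small predicate plus a scripted replay. The names should_retry and demo_attempts are hypothetical, and the scripted results stand in for what real send/receive attempts would return.

```c
#include <errno.h>
#include <stdbool.h>

// Mirrors the loop condition: retry if the selected channel wasn't ready
// (EAGAIN), or if it was closed (EPIPE) and other channels remain open.
static bool should_retry( int rv, unsigned chans_open ) {
  return rv == EAGAIN || (rv == EPIPE && chans_open > 1);
}

// Replays a scripted series of results (not ready, closed, success) with
// 3 channels open and returns how many attempts were made.
static unsigned demo_attempts( void ) {
  int const scripted[] = { EAGAIN, EPIPE, 0 };
  unsigned attempts = 0;
  int rv;
  do {
    rv = scripted[ attempts++ ];
  } while ( should_retry( rv, /*chans_open=*/3 ) );
  return attempts;
}
```

demo_attempts() returns 3: the first two results cause a retry and the third (0) ends the loop. Note that EPIPE with only one channel open doesn’t retry since there’s no other channel left to try.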

The last part of the chan_select implementation is:

  if ( selected_ref == NULL ) {
    if ( rv != 0 )
      errno = rv;
    rv = -1;
  }
  else {
    rv = selected_ref->dir == CHAN_RECV ?
      CHAN_RECV( selected_ref->param_idx ) :
      CHAN_SEND( selected_ref->param_idx );
  }

  if ( is_blocking ) {
    chan_obs_cleanup( &select_obs );
    pthread_mutex_destroy( &select_mtx );
  }
  if ( ref != stack_ref )
    free( ref );

  return rv;
}

If we didn’t select a channel, we return -1, first setting errno if rv is non-zero; otherwise, we set rv to the index into either the recv_chan or send_chan array. Lastly, a bit more clean-up.

Conclusion

The full source code for this implementation, C Chan, is available. If you ignore the syntactic sugar Go provides for dealing with channels, C Chan is perfectly usable in real-world programs.

Epilogue

Weren’t there already other Go-like channel libraries written in C? Yes, but the serious ones I could find are either incomplete (e.g., don’t support blocking select) or buggy due to race conditions.

What about a C++ version? Many C libraries can have trivial C++ wrappers written for them to get full type support (to deal with T* rather than void*) and RAII. Unfortunately, a trivial C++ wrapper can’t be written for C Chan because, in a C++ version, you want messages to be copied as objects of type T using T’s = operator, not memcpy; or you want the option to std::move messages instead. C Chan itself would have to be modified to support alternatives to memcpy.
