/*
 * Copyright 2009-2015 Samy Al Bahra.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#ifndef CK_RING_H
#define CK_RING_H

#include <ck_cc.h>
#include <ck_md.h>
#include <ck_pr.h>
#include <ck_stdbool.h>
#include <ck_string.h>

/*
 * Concurrent ring buffer.
 *
 * This structure only holds the counters and geometry of the ring; the slot
 * storage itself is passed separately to every operation.
 */

struct ck_ring {
	/* Consumer counter; masked with mask to find the next slot to dequeue. */
	unsigned int c_head;
	/* Pads c_head out to CK_MD_CACHELINE bytes. */
	char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
	/* Producer counter that consumers compare against c_head. */
	unsigned int p_tail;
	/* Producer reservation counter, advanced by CAS in the *_mp paths. */
	unsigned int p_head;
	/* Pads the two producer counters out to CK_MD_CACHELINE bytes. */
	char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
	/* Value passed to ck_ring_init(). */
	unsigned int size;
	/* size - 1, as set by ck_ring_init(); used to mask counters. */
	unsigned int mask;
};
typedef struct ck_ring ck_ring_t;

/*
 * One slot of a pointer ring.  The ck_ring_*_{spsc,spmc,mpsc,mpmc} functions
 * copy sizeof(void *) bytes into and out of an array of these.
 */
struct ck_ring_buffer {
	void *value;
};
typedef struct ck_ring_buffer ck_ring_buffer_t;

/*
 * Returns a snapshot of the number of entries in the ring, computed from the
 * consumer counter and the published producer counter.
 */
CK_CC_INLINE static unsigned int
ck_ring_size(const struct ck_ring *ring)
{
	const unsigned int consumer = ck_pr_load_uint(&ring->c_head);
	const unsigned int producer = ck_pr_load_uint(&ring->p_tail);

	return (producer - consumer) & ring->mask;
}

/*
 * Returns the size the ring was initialized with.
 */
CK_CC_INLINE static unsigned int
ck_ring_capacity(const struct ck_ring *ring)
{
	const unsigned int capacity = ring->size;

	return capacity;
}

/*
 * Brings p_tail back in line with p_head if the two differ.  This is only
 * safe to call when there are no concurrent operations on the ring, and is
 * primarily meant for persistent ck_ring use-cases.  Returns true if the ring
 * was mutated, false if it was already consistent.
 */
CK_CC_INLINE static bool
ck_ring_repair(struct ck_ring *ring)
{
	if (ring->p_tail == ring->p_head)
		return false;

	ring->p_tail = ring->p_head;
	return true;
}

/*
 * Consistency check for use when no concurrent updates are occurring on the
 * ring structure, primarily for rings kept in persistent storage.  Returns
 * false if the ring is in an inconsistent state.
 */
CK_CC_INLINE static bool
ck_ring_valid(const struct ck_ring *ring)
{
	const unsigned int capacity = ring->size;
	const unsigned int consumer = ring->c_head;
	const unsigned int producer = ring->p_head;

	/* The ring size must be a power of 2. */
	if ((capacity & (capacity - 1)) != 0)
		return false;

	/* The consumer counter must never be ahead of the producer counter. */
	if (consumer > producer)
		return false;

	/* The producer must be fewer than size slots ahead of the consumer. */
	return (producer - consumer) < capacity;
}

/*
 * Resets all counters to zero and records the ring geometry.  The mask is
 * derived as size - 1, so size is expected to be a power of 2 (this is what
 * ck_ring_valid() checks).
 */
CK_CC_INLINE static void
ck_ring_init(struct ck_ring *ring, unsigned int size)
{
	ring->c_head = 0;
	ring->p_head = 0;
	ring->p_tail = 0;
	ring->mask = size - 1;
	ring->size = size;
}

/*
 * The _ck_ring_* namespace is internal only and must not be used externally.
 */

/*
 * Single-producer reservation.  Returns the address of the slot the next
 * value should be written to, or NULL if the ring is full.  ts is the size
 * of one slot in bytes.  If size is non-NULL it receives the number of
 * entries observed in the ring.  No counter is advanced here; the slot is
 * published by _ck_ring_enqueue_commit_sp().
 */
CK_CC_FORCE_INLINE static void *
_ck_ring_enqueue_reserve_sp(struct ck_ring *ring,
    void *CK_CC_RESTRICT buffer,
    unsigned int ts,
    unsigned int *size)
{
	const unsigned int mask = ring->mask;
	const unsigned int head = ck_pr_load_uint(&ring->c_head);
	const unsigned int tail = ring->p_tail;
	const unsigned int next = tail + 1;

	if (size != NULL)
		*size = (tail - head) & mask;

	/* Full when advancing the producer would land on the consumer slot. */
	if (CK_CC_UNLIKELY((next & mask) == (head & mask)))
		return NULL;

	return (char *)buffer + ts * (tail & mask);
}

/*
 * Commits, and makes visible to consumers, a slot previously obtained from
 * _ck_ring_enqueue_reserve_sp().
 */
CK_CC_FORCE_INLINE static void
_ck_ring_enqueue_commit_sp(struct ck_ring *ring)
{
	/* Slot contents must be stored before the producer counter moves. */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->p_tail, ring->p_tail + 1);
}

/*
 * Single-producer enqueue.  Copies ts bytes from entry into the next free
 * slot of buffer and publishes it.  Returns false, without copying, if the
 * ring is full.  If size is non-NULL it receives the number of entries
 * observed in the ring before this enqueue.
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp(struct ck_ring *ring,
    void *CK_CC_RESTRICT buffer,
    const void *CK_CC_RESTRICT entry,
    unsigned int ts,
    unsigned int *size)
{
	const unsigned int mask = ring->mask;
	const unsigned int head = ck_pr_load_uint(&ring->c_head);
	const unsigned int tail = ring->p_tail;
	const unsigned int next = tail + 1;
	char *slot;

	if (size != NULL)
		*size = (tail - head) & mask;

	if (CK_CC_UNLIKELY((next & mask) == (head & mask)))
		return false;

	slot = (char *)buffer + ts * (tail & mask);
	memcpy(slot, entry, ts);

	/*
	 * The slot contents must be written before the slot is
	 * advertised as available for consumption.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->p_tail, next);
	return true;
}

/*
 * Same as _ck_ring_enqueue_sp(), but size is mandatory: it is written
 * unconditionally, so it must not be NULL.
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp_size(struct ck_ring *ring,
    void *CK_CC_RESTRICT buffer,
    const void *CK_CC_RESTRICT entry,
    unsigned int ts,
    unsigned int *size)
{
	unsigned int observed;
	const bool enqueued = _ck_ring_enqueue_sp(ring, buffer, entry, ts,
	    &observed);

	*size = observed;
	return enqueued;
}

/*
 * Single-consumer dequeue.  Copies size bytes (the size of one slot) from
 * the slot at the consumer position of buffer into target and advances the
 * consumer counter.  Returns false, leaving target untouched, if the
 * consumer and producer counters are equal (ring empty).
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_sc(struct ck_ring *ring,
    const void *CK_CC_RESTRICT buffer,
    void *CK_CC_RESTRICT target,
    unsigned int size)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	/* c_head is read with a plain load: this is the single-consumer path. */
	consumer = ring->c_head;
	producer = ck_pr_load_uint(&ring->p_tail);

	if (CK_CC_UNLIKELY(consumer == producer))
		return false;

	/*
	 * Make sure to serialize with respect to our snapshot
	 * of the producer counter.
	 */
	ck_pr_fence_load();

	buffer = (const char *)buffer + size * (consumer & mask);
	memcpy(target, buffer, size);

	/*
	 * Make sure copy is completed with respect to consumer
	 * update.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->c_head, consumer + 1);
	return true;
}

/*
 * Multi-producer reservation.  Claims the next slot by advancing p_head with
 * a compare-and-swap and returns the address of that slot, or NULL if the
 * ring was observed to be full.  ts is the size of one slot in bytes.
 *
 * On success *ticket receives the claimed producer position, which must be
 * handed to _ck_ring_enqueue_commit_mp() once the slot has been written.
 * *ticket is not written on failure.  If size is non-NULL it receives
 * (producer - consumer) & mask for the p_head and c_head snapshots used.
 */
CK_CC_FORCE_INLINE static void *
_ck_ring_enqueue_reserve_mp(struct ck_ring *ring,
    void *buffer,
    unsigned int ts,
    unsigned int *ticket,
    unsigned int *size)
{
	const unsigned int mask = ring->mask;
	unsigned int producer, consumer, delta;

	producer = ck_pr_load_uint(&ring->p_head);

	for (;;) {
		/*
		 * The snapshot of producer must be up to date with respect to
		 * consumer.
		 */
		ck_pr_fence_load();
		consumer = ck_pr_load_uint(&ring->c_head);

		delta = producer + 1;

		/*
		 * Only try to CAS if the producer snapshot is not clearly
		 * stale and the buffer is definitely not full.
		 */
		if (CK_CC_LIKELY((producer - consumer) < mask)) {
			if (ck_pr_cas_uint_value(&ring->p_head,
			    producer, delta, &producer) == true) {
				break;
			}
		} else {
			unsigned int new_producer;

			/*
			 * Slow path.  Either the buffer is full or the
			 * snapshot of p_head is stale.  Re-read p_head,
			 * ordered after the snapshot of c_head.
			 */
			ck_pr_fence_load();
			new_producer = ck_pr_load_uint(&ring->p_head);

			/*
			 * No forward progress in production: treat the ring
			 * as full and fail the reservation.
			 */
			if (producer == new_producer) {
				if (size != NULL)
					*size = (producer - consumer) & mask;

				/* This function returns a pointer, not a bool. */
				return NULL;
			}

			/* p_head advanced during this iteration. Try again. */
			producer = new_producer;
		}
	}

	*ticket = producer;
	if (size != NULL)
		*size = (producer - consumer) & mask;

	return (char *)buffer + ts * (producer & mask);
}

/*
 * Publishes a slot previously claimed with _ck_ring_enqueue_reserve_mp().
 * producer is the ticket returned by the reservation.  Spins until p_tail
 * reaches that ticket, then advances p_tail past it.
 */
CK_CC_FORCE_INLINE static void
_ck_ring_enqueue_commit_mp(struct ck_ring *ring, unsigned int producer)
{
	for (;;) {
		if (ck_pr_load_uint(&ring->p_tail) == producer)
			break;

		ck_pr_stall();
	}

	/* Slot contents must be stored before the producer counter moves. */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->p_tail, producer + 1);
}

/*
 * Multi-producer enqueue.  Claims a slot by advancing p_head with a
 * compare-and-swap, copies ts bytes from entry into that slot, waits for
 * p_tail to reach the claimed position and then advances p_tail to publish
 * the slot.  Returns false, without copying, if the ring was observed to be
 * full.  If size is non-NULL it receives (producer - consumer) & mask for
 * the snapshots used, on both the success and the failure path.
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp(struct ck_ring *ring,
    void *buffer,
    const void *entry,
    unsigned int ts,
    unsigned int *size)
{
	const unsigned int mask = ring->mask;
	unsigned int producer, consumer, delta;
	bool r = true;

	producer = ck_pr_load_uint(&ring->p_head);

	for (;;) {
		/*
		 * The snapshot of producer must be up to date with respect to
		 * consumer.
		 */
		ck_pr_fence_load();
		consumer = ck_pr_load_uint(&ring->c_head);

		delta = producer + 1;

		/*
		 * Only try to CAS if the producer is not clearly stale (not
		 * less than consumer) and the buffer is definitely not full.
		 */
		if (CK_CC_LIKELY((producer - consumer) < mask)) {
			if (ck_pr_cas_uint_value(&ring->p_head,
			    producer, delta, &producer) == true) {
				break;
			}
		} else {
			unsigned int new_producer;

			/*
			 * Slow path.  Either the buffer is full or we have a
			 * stale snapshot of p_head.  Execute a second read of
			 * p_head that must be ordered wrt the snapshot of
			 * c_head.
			 */
			ck_pr_fence_load();
			new_producer = ck_pr_load_uint(&ring->p_head);

			/*
			 * Only fail if we haven't made forward progress in
			 * production: the buffer must have been full when we
			 * read new_producer (or we wrapped around UINT_MAX
			 * during this iteration).
			 */
			if (producer == new_producer) {
				r = false;
				goto leave;
			}

			/*
			 * p_head advanced during this iteration. Try again.
			 */
			producer = new_producer;
		}
	}

	buffer = (char *)buffer + ts * (producer & mask);
	memcpy(buffer, entry, ts);

	/*
	 * Wait until all concurrent producers have completed writing
	 * their data into the ring buffer.
	 */
	while (ck_pr_load_uint(&ring->p_tail) != producer)
		ck_pr_stall();

	/*
	 * Ensure that copy is completed before updating shared producer
	 * counter.
	 */
	ck_pr_fence_store();
	ck_pr_store_uint(&ring->p_tail, delta);

leave:
	if (size != NULL)
		*size = (producer - consumer) & mask;

	return r;
}

/*
 * Same as _ck_ring_enqueue_mp(), but size is mandatory: it is written
 * unconditionally, so it must not be NULL.
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp_size(struct ck_ring *ring,
    void *buffer,
    const void *entry,
    unsigned int ts,
    unsigned int *size)
{
	unsigned int observed;
	const bool enqueued = _ck_ring_enqueue_mp(ring, buffer, entry, ts,
	    &observed);

	*size = observed;
	return enqueued;
}

/*
 * Multi-consumer dequeue that makes a single attempt.  Copies size bytes
 * (the size of one slot) from the slot at the consumer position into data,
 * then tries to advance c_head with a compare-and-swap.
 *
 * Returns false if the ring was observed empty (data untouched) or if the
 * compare-and-swap on c_head fails.  In the latter case data has already
 * been overwritten by the copy and must not be used.
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_trydequeue_mc(struct ck_ring *ring,
    const void *buffer,
    void *data,
    unsigned int size)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	consumer = ck_pr_load_uint(&ring->c_head);
	/* Order the p_tail load after the c_head snapshot. */
	ck_pr_fence_load();
	producer = ck_pr_load_uint(&ring->p_tail);

	if (CK_CC_UNLIKELY(consumer == producer))
		return false;

	/* Order the slot read after the p_tail snapshot. */
	ck_pr_fence_load();

	buffer = (const char *)buffer + size * (consumer & mask);
	memcpy(data, buffer, size);

	/* Order the copy before the atomic update of c_head. */
	ck_pr_fence_store_atomic();
	return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
}

/*
 * Multi-consumer dequeue.  Repeatedly copies ts bytes (the size of one slot)
 * from the slot at the consumer position into data and tries to advance
 * c_head with a compare-and-swap, until the compare-and-swap succeeds or the
 * ring is observed empty.  Returns true on success and false if the ring was
 * empty; data may have been written by an earlier, failed attempt even when
 * false is returned.
 */
CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_mc(struct ck_ring *ring,
    const void *buffer,
    void *data,
    unsigned int ts)
{
	const unsigned int mask = ring->mask;
	unsigned int consumer, producer;

	consumer = ck_pr_load_uint(&ring->c_head);

	do {
		const char *target;

		/*
		 * Producer counter must represent state relative to
		 * our latest consumer snapshot.
		 */
		ck_pr_fence_load();
		producer = ck_pr_load_uint(&ring->p_tail);

		if (CK_CC_UNLIKELY(consumer == producer))
			return false;

		/* Order the slot read after the p_tail snapshot. */
		ck_pr_fence_load();

		target = (const char *)buffer + ts * (consumer & mask);
		memcpy(data, target, ts);

		/* Serialize load with respect to head update. */
		ck_pr_fence_store_atomic();
		/*
		 * NOTE(review): the retry relies on ck_pr_cas_uint_value()
		 * refreshing consumer when the CAS fails - confirm in ck_pr.
		 */
	} while (ck_pr_cas_uint_value(&ring->c_head,
				      consumer,
				      consumer + 1,
				      &consumer) == false);

	return true;
}

/*
 * The ck_ring_*_spsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is only provided if there is up
 * to one concurrent consumer and up to one concurrent producer.
 */
/*
 * Enqueues the pointer entry and stores the observed ring occupancy in
 * *size, which must not be NULL.  Returns false if the ring is full.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{
	return _ck_ring_enqueue_sp_size(ring, buffer, &entry, sizeof(void *),
	    size);
}

/*
 * Enqueues the pointer entry.  Returns false if the ring is full.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{
	return _ck_ring_enqueue_sp(ring, buffer, &entry, sizeof(void *), NULL);
}

/*
 * Returns the slot to write the next pointer into, or NULL if the ring is
 * full.  The observed ring occupancy is stored in *size when size is
 * non-NULL.
 */
CK_CC_INLINE static void *
ck_ring_enqueue_reserve_spsc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    unsigned int *size)
{
	return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), size);
}

/*
 * Returns the slot to write the next pointer into, or NULL if the ring is
 * full.
 */
CK_CC_INLINE static void *
ck_ring_enqueue_reserve_spsc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer)
{
	return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), NULL);
}

/*
 * Publishes the slot obtained from the matching spsc reserve call.
 */
CK_CC_INLINE static void
ck_ring_enqueue_commit_spsc(struct ck_ring *ring)
{
	_ck_ring_enqueue_commit_sp(ring);
}

/*
 * Dequeues one pointer.  data must point to storage for a void *; the
 * dequeued pointer is copied there.  Returns false if the ring is empty.
 */
CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{
	return _ck_ring_dequeue_sc(ring, buffer, data, sizeof(void *));
}

/*
 * The ck_ring_*_mpmc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers and consumers.
 */
/*
 * Enqueues the pointer entry.  Returns false if the ring is full.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_mpmc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{
	return _ck_ring_enqueue_mp(ring, buffer, &entry, sizeof(void *), NULL);
}

/*
 * Enqueues the pointer entry and stores the observed ring occupancy in
 * *size, which must not be NULL.  Returns false if the ring is full.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{
	return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(void *),
	    size);
}

/*
 * Claims the next slot and returns its address.  *ticket receives the value
 * to pass to ck_ring_enqueue_commit_mpmc() after the slot has been written.
 */
CK_CC_INLINE static void *
ck_ring_enqueue_reserve_mpmc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    unsigned int *ticket)
{
	return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), ticket,
	    NULL);
}

/*
 * Claims the next slot and returns its address.  *ticket receives the value
 * to pass to ck_ring_enqueue_commit_mpmc() after the slot has been written,
 * and the observed ring occupancy is stored in *size when size is non-NULL.
 */
CK_CC_INLINE static void *
ck_ring_enqueue_reserve_mpmc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    unsigned int *ticket,
    unsigned int *size)
{
	return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), ticket,
	    size);
}

/*
 * Publishes the slot identified by ticket, as returned by the matching mpmc
 * reserve call.
 */
CK_CC_INLINE static void
ck_ring_enqueue_commit_mpmc(struct ck_ring *ring, unsigned int ticket)
{
	_ck_ring_enqueue_commit_mp(ring, ticket);
}

/*
 * Makes a single attempt to dequeue one pointer into the void * storage that
 * data points to.  Returns false if the ring is empty or the attempt did not
 * succeed; the contents of *data are only meaningful when true is returned.
 */
CK_CC_INLINE static bool
ck_ring_trydequeue_mpmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{
	return _ck_ring_trydequeue_mc(ring, buffer, data, sizeof(void *));
}

/*
 * Dequeues one pointer into the void * storage that data points to.
 * Returns false if the ring is empty.
 */
CK_CC_INLINE static bool
ck_ring_dequeue_mpmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{
	return _ck_ring_dequeue_mc(ring, buffer, data, sizeof(void *));
}

/*
 * The ck_ring_*_spmc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * consumers with up to one concurrent producer.
 */
/*
 * Returns the slot to write the next pointer into, or NULL if the ring is
 * full.  The observed ring occupancy is stored in *size when size is
 * non-NULL.
 */
CK_CC_INLINE static void *
ck_ring_enqueue_reserve_spmc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    unsigned int *size)
{
	return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
	    size);
}

/*
 * Returns the slot to write the next pointer into, or NULL if the ring is
 * full.
 */
CK_CC_INLINE static void *
ck_ring_enqueue_reserve_spmc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer)
{
	return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
	    NULL);
}

/*
 * Publishes the slot obtained from the matching spmc reserve call.
 */
CK_CC_INLINE static void
ck_ring_enqueue_commit_spmc(struct ck_ring *ring)
{
	_ck_ring_enqueue_commit_sp(ring);
}

/*
 * Enqueues the pointer entry and stores the observed ring occupancy in
 * *size, which must not be NULL.  Returns false if the ring is full.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_spmc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{
	return _ck_ring_enqueue_sp_size(ring, buffer, &entry, sizeof(void *),
	    size);
}

/*
 * Enqueues the pointer entry.  Returns false if the ring is full.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_spmc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{
	return _ck_ring_enqueue_sp(ring, buffer, &entry, sizeof(void *), NULL);
}

/*
 * Makes a single attempt to dequeue one pointer into the void * storage that
 * data points to.  Returns false if the ring is empty or the attempt did not
 * succeed; the contents of *data are only meaningful when true is returned.
 */
CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{
	return _ck_ring_trydequeue_mc(ring, buffer, data,
	    sizeof(void *));
}

/*
 * Dequeues one pointer into the void * storage that data points to.
 * Returns false if the ring is empty.
 */
CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{
	return _ck_ring_dequeue_mc(ring, buffer, data,
	    sizeof(void *));
}

/*
 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers with up to one concurrent consumer.
 */
/*
 * Claims the next slot and returns its address.  *ticket receives the value
 * to pass to ck_ring_enqueue_commit_mpsc() after the slot has been written.
 */
CK_CC_INLINE static void *
ck_ring_enqueue_reserve_mpsc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    unsigned int *ticket)
{
	return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), ticket,
	    NULL);
}

/*
 * Claims the next slot and returns its address.  *ticket receives the value
 * to pass to ck_ring_enqueue_commit_mpsc() after the slot has been written,
 * and the observed ring occupancy is stored in *size when size is non-NULL.
 */
CK_CC_INLINE static void *
ck_ring_enqueue_reserve_mpsc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    unsigned int *ticket,
    unsigned int *size)
{
	return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), ticket,
	    size);
}

/*
 * Publishes the slot identified by ticket, as returned by the matching mpsc
 * reserve call.
 */
CK_CC_INLINE static void
ck_ring_enqueue_commit_mpsc(struct ck_ring *ring, unsigned int ticket)
{
	_ck_ring_enqueue_commit_mp(ring, ticket);
}

/*
 * Enqueues the pointer entry.  Returns false if the ring is full.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_mpsc(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry)
{
	return _ck_ring_enqueue_mp(ring, buffer, &entry, sizeof(void *), NULL);
}

/*
 * Enqueues the pointer entry and stores the observed ring occupancy in
 * *size, which must not be NULL.  Returns false if the ring is full.
 */
CK_CC_INLINE static bool
ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
    struct ck_ring_buffer *buffer,
    const void *entry,
    unsigned int *size)
{
	return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(void *),
	    size);
}

/*
 * Dequeues one pointer into the void * storage that data points to.
 * Returns false if the ring is empty.
 */
CK_CC_INLINE static bool
ck_ring_dequeue_mpsc(struct ck_ring *ring,
    const struct ck_ring_buffer *buffer,
    void *data)
{
	return _ck_ring_dequeue_sc(ring, buffer, data, sizeof(void *));
}

/*
 * CK_RING_PROTOTYPE(name, type) defines a type-safe interface for rings whose
 * slots hold values of struct type inline (copied by value) rather than
 * pointers.  For each of the spsc, spmc, mpsc and mpmc modes it generates
 * ck_ring_<operation>_<mode>[_size]_<name> functions that forward to the
 * internal _ck_ring_* functions with sizeof(struct type) as the slot size.
 *
 * In the generated functions, a is the ring and b is the slot array.  The
 * remaining arguments are, depending on the operation, the entry to copy in
 * or out (enqueue/dequeue), the reservation ticket (multi-producer reserve)
 * and the size out-parameter (_size variants).  Note that the single-producer
 * reserve functions take no ticket.
 */
#define CK_RING_PROTOTYPE(name, type)				\
CK_CC_INLINE static struct type *				\
ck_ring_enqueue_reserve_spsc_##name(struct ck_ring *a,		\
    struct type *b)						\
{								\
								\
	return _ck_ring_enqueue_reserve_sp(a, b, 		\
	    sizeof(struct type), NULL);				\
}								\
								\
CK_CC_INLINE static struct type *				\
ck_ring_enqueue_reserve_spsc_size_##name(struct ck_ring *a,	\
    struct type *b,						\
    unsigned int *c)						\
{								\
								\
	return _ck_ring_enqueue_reserve_sp(a, b, 		\
	    sizeof(struct type), c);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,		\
    struct type *b,						\
    struct type *c,						\
    unsigned int *d)						\
{								\
								\
	return _ck_ring_enqueue_sp_size(a, b, c,		\
	    sizeof(struct type), d);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_enqueue_spsc_##name(struct ck_ring *a,			\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_enqueue_sp(a, b, c,			\
	    sizeof(struct type), NULL);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_dequeue_spsc_##name(struct ck_ring *a,			\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_dequeue_sc(a, b, c,			\
	    sizeof(struct type));				\
}								\
								\
CK_CC_INLINE static struct type *				\
ck_ring_enqueue_reserve_spmc_##name(struct ck_ring *a,		\
    struct type *b)						\
{								\
								\
	return _ck_ring_enqueue_reserve_sp(a, b, 		\
	    sizeof(struct type), NULL);				\
}								\
								\
CK_CC_INLINE static struct type *				\
ck_ring_enqueue_reserve_spmc_size_##name(struct ck_ring *a,	\
    struct type *b,						\
    unsigned int *c)						\
{								\
								\
	return _ck_ring_enqueue_reserve_sp(a, b, 		\
	    sizeof(struct type), c);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,		\
    struct type *b,						\
    struct type *c,						\
    unsigned int *d)						\
{								\
								\
	return _ck_ring_enqueue_sp_size(a, b, c,		\
	    sizeof(struct type), d);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_enqueue_spmc_##name(struct ck_ring *a,			\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_enqueue_sp(a, b, c,			\
	    sizeof(struct type), NULL);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_trydequeue_spmc_##name(struct ck_ring *a,		\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_trydequeue_mc(a,			\
	    b, c, sizeof(struct type));				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_dequeue_spmc_##name(struct ck_ring *a,			\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_dequeue_mc(a, b, c,			\
	    sizeof(struct type));				\
}								\
								\
CK_CC_INLINE static struct type *				\
ck_ring_enqueue_reserve_mpsc_##name(struct ck_ring *a,		\
    struct type *b,						\
    unsigned int *c)						\
{								\
								\
	return _ck_ring_enqueue_reserve_mp(a, b, 		\
	    sizeof(struct type), c, NULL);			\
}								\
								\
CK_CC_INLINE static struct type *				\
ck_ring_enqueue_reserve_mpsc_size_##name(struct ck_ring *a,	\
    struct type *b,						\
    unsigned int *c,						\
    unsigned int *d)						\
{								\
								\
	return _ck_ring_enqueue_reserve_mp(a, b, 		\
	    sizeof(struct type), c, d);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_enqueue_mpsc_##name(struct ck_ring *a,			\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_enqueue_mp(a, b, c,			\
	    sizeof(struct type), NULL);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a,		\
    struct type *b,						\
    struct type *c,						\
    unsigned int *d)						\
{								\
								\
	return _ck_ring_enqueue_mp_size(a, b, c,		\
	    sizeof(struct type), d);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_dequeue_mpsc_##name(struct ck_ring *a,			\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_dequeue_sc(a, b, c,			\
	    sizeof(struct type));				\
}								\
								\
CK_CC_INLINE static struct type *				\
ck_ring_enqueue_reserve_mpmc_##name(struct ck_ring *a,		\
    struct type *b,						\
    unsigned int *c)						\
{								\
								\
	return _ck_ring_enqueue_reserve_mp(a, b, 		\
	    sizeof(struct type), c, NULL);			\
}								\
								\
CK_CC_INLINE static struct type *				\
ck_ring_enqueue_reserve_mpmc_size_##name(struct ck_ring *a,	\
    struct type *b,						\
    unsigned int *c,						\
    unsigned int *d)						\
{								\
								\
	return _ck_ring_enqueue_reserve_mp(a, b, 		\
	    sizeof(struct type), c, d);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a,		\
    struct type *b,						\
    struct type *c,						\
    unsigned int *d)						\
{								\
								\
	return _ck_ring_enqueue_mp_size(a, b, c,		\
	    sizeof(struct type), d);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_enqueue_mpmc_##name(struct ck_ring *a,			\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_enqueue_mp(a, b, c,			\
	    sizeof(struct type), NULL);				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_trydequeue_mpmc_##name(struct ck_ring *a,		\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_trydequeue_mc(a,			\
	    b, c, sizeof(struct type));				\
}								\
								\
CK_CC_INLINE static bool					\
ck_ring_dequeue_mpmc_##name(struct ck_ring *a,			\
    struct type *b,						\
    struct type *c)						\
{								\
								\
	return _ck_ring_dequeue_mc(a, b, c,			\
	    sizeof(struct type));				\
}

/*
 * A single producer with one concurrent consumer.
 *
 * Each macro forwards to the function that CK_RING_PROTOTYPE(name, ...)
 * generates for this mode.  The single-producer reserve functions take only
 * the ring and the buffer (plus the size out-parameter for the _SIZE
 * variant), so the reserve macros take exactly those arguments.
 */
#define CK_RING_ENQUEUE_SPSC(name, ring, buffer, entry)		\
	ck_ring_enqueue_spsc_##name(ring, buffer, entry)
#define CK_RING_ENQUEUE_SPSC_SIZE(name, ring, buffer, entry, size) \
	ck_ring_enqueue_spsc_size_##name(ring, buffer, entry, size)
#define CK_RING_ENQUEUE_RESERVE_SPSC(name, ring, buffer)	\
	ck_ring_enqueue_reserve_spsc_##name(ring, buffer)
#define CK_RING_ENQUEUE_RESERVE_SPSC_SIZE(name, ring, buffer, size) \
	ck_ring_enqueue_reserve_spsc_size_##name(ring, buffer, size)
#define CK_RING_DEQUEUE_SPSC(name, ring, buffer, entry)		\
	ck_ring_dequeue_spsc_##name(ring, buffer, entry)

/*
 * A single producer with any number of concurrent consumers.
 *
 * Each macro forwards to the function that CK_RING_PROTOTYPE(name, ...)
 * generates for this mode.  The single-producer reserve functions take only
 * the ring and the buffer (plus the size out-parameter for the _SIZE
 * variant), so the reserve macros take exactly those arguments.
 */
#define CK_RING_ENQUEUE_SPMC(name, ring, buffer, entry)		\
	ck_ring_enqueue_spmc_##name(ring, buffer, entry)
#define CK_RING_ENQUEUE_SPMC_SIZE(name, ring, buffer, entry, size) \
	ck_ring_enqueue_spmc_size_##name(ring, buffer, entry, size)
#define CK_RING_ENQUEUE_RESERVE_SPMC(name, ring, buffer)	\
	ck_ring_enqueue_reserve_spmc_##name(ring, buffer)
#define CK_RING_ENQUEUE_RESERVE_SPMC_SIZE(name, ring, buffer, size) \
	ck_ring_enqueue_reserve_spmc_size_##name(ring, buffer, size)
#define CK_RING_TRYDEQUEUE_SPMC(name, ring, buffer, entry)	\
	ck_ring_trydequeue_spmc_##name(ring, buffer, entry)
#define CK_RING_DEQUEUE_SPMC(name, ring, buffer, entry)		\
	ck_ring_dequeue_spmc_##name(ring, buffer, entry)

/*
 * Any number of concurrent producers with up to one
 * concurrent consumer.
 *
 * Each macro forwards to the function that CK_RING_PROTOTYPE(name, ...)
 * generates for this mode.
 */
#define CK_RING_ENQUEUE_MPSC(name, ring, buffer, entry)		\
	ck_ring_enqueue_mpsc_##name(ring, buffer, entry)
#define CK_RING_ENQUEUE_MPSC_SIZE(name, ring, buffer, entry, size) \
	ck_ring_enqueue_mpsc_size_##name(ring, buffer, entry, size)
#define CK_RING_ENQUEUE_RESERVE_MPSC(name, ring, buffer, ticket) \
	ck_ring_enqueue_reserve_mpsc_##name(ring, buffer, ticket)
#define CK_RING_ENQUEUE_RESERVE_MPSC_SIZE(name, ring, buffer, ticket, size) \
	ck_ring_enqueue_reserve_mpsc_size_##name(ring, buffer, ticket, size)
#define CK_RING_DEQUEUE_MPSC(name, ring, buffer, entry)		\
	ck_ring_dequeue_mpsc_##name(ring, buffer, entry)

/*
 * Any number of concurrent producers and consumers.
 *
 * Each macro forwards to the function that CK_RING_PROTOTYPE(name, ...)
 * generates for this mode.
 */
#define CK_RING_ENQUEUE_MPMC(name, ring, buffer, entry)		\
	ck_ring_enqueue_mpmc_##name(ring, buffer, entry)
#define CK_RING_ENQUEUE_MPMC_SIZE(name, ring, buffer, entry, size) \
	ck_ring_enqueue_mpmc_size_##name(ring, buffer, entry, size)
#define CK_RING_ENQUEUE_RESERVE_MPMC(name, ring, buffer, ticket) \
	ck_ring_enqueue_reserve_mpmc_##name(ring, buffer, ticket)
#define CK_RING_ENQUEUE_RESERVE_MPMC_SIZE(name, ring, buffer, ticket, size) \
	ck_ring_enqueue_reserve_mpmc_size_##name(ring, buffer, ticket, size)
#define CK_RING_TRYDEQUEUE_MPMC(name, ring, buffer, entry)	\
	ck_ring_trydequeue_mpmc_##name(ring, buffer, entry)
#define CK_RING_DEQUEUE_MPMC(name, ring, buffer, entry)		\
	ck_ring_dequeue_mpmc_##name(ring, buffer, entry)

#endif /* CK_RING_H */
