view src/common/vclock.c @ 153:2a8c2cf48674

common: XDR nvclock code needs to allocate an nvclock at times Since we're always dealing with nvclocks as pointers, we need to deal with them as double-pointers here. This allows us to properly allocate a new nvclock during a decode operation. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Sun, 18 Oct 2015 15:53:41 -0400
parents ad398971b27a
children
line wrap: on
line source

/*
 * Copyright (c) 2015 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stdlib.h>
#include <string.h>

#include <nomad/types.h>
#include <nomad/error.h>

/*
 * Vector clocks
 *
 * A vector clock is a set of <node, sequence id> pairs.  Since it is a set,
 * each node may appear at most once.  There are a few implied behaviors:
 *
 * (1) if a node doesn't appear in the vector, it's sequence id is assumed
 *     to be zero.
 * (2) node zero is reserved.  Internally to this code, it can be used to
 *     indicate an unused entry.  However, a sequence id get of node zero
 *     will return zero.
 *
 * As far as this implementation of vector clocks is concerned, the ent[]
 * array is managed as follows:
 *
 * (1) nodes with zero sequence id are *not* stored.  This is important for
 *     the set function.  Whenever the new sequence id ends up equal to
 *     zero, the <node, seq> pair is removed from the vector.
 * (2) entries may not be used contiguously.
 */

struct nvclock *nvclock_alloc(void)
{
	return zalloc(sizeof(struct nvclock));
}

struct nvclock *nvclock_dup(struct nvclock *clock)
{
	struct nvclock *ret;

	ret = nvclock_alloc();
	if (!ret)
		return NULL;

	memcpy(ret, clock, sizeof(*ret));

	return ret;
}

void nvclock_free(struct nvclock *clock)
{
	free(clock);
}

static struct nvclockent *__get_ent(struct nvclock *clock, uint64_t node,
				    bool alloc)
{
	int i;

	if (!clock || !node)
		return ERR_PTR(EINVAL);

	for (i = 0; i < NVCLOCK_NUM_NODES; i++)
		if (clock->ent[i].node == node)
			return &clock->ent[i];

	if (!alloc)
		return ERR_PTR(ENOENT);

	/*
	 * We didn't find it; we're supposed to allocate an entry.
	 */

	for (i = 0; i < NVCLOCK_NUM_NODES; i++) {
		if (!clock->ent[i].node) {
			clock->ent[i].node = node;
			return &clock->ent[i];
		}
	}

	return ERR_PTR(ENOMEM);
}

/*
 * Get @clock's @node sequence id.
 */
uint64_t nvclock_get_node(struct nvclock *clock, uint64_t node)
{
	struct nvclockent *ent;

	ent = __get_ent(clock, node, false);

	return IS_ERR(ent) ? 0 : ent->seq;
}

uint64_t nvclock_get(struct nvclock *clock)
{
	return nvclock_get_node(clock, nomad_local_node_id());
}

/*
 * Remove @node from @clock.
 */
int nvclock_remove_node(struct nvclock *clock, uint64_t node)
{
	struct nvclockent *ent;

	if (!clock || !node)
		return EINVAL;

	ent = __get_ent(clock, node, false);
	if (IS_ERR(ent))
		return 0;

	ent->node = 0;
	ent->seq = 0;

	return 0;
}

int nvclock_remove(struct nvclock *clock)
{
	return nvclock_remove_node(clock, nomad_local_node_id());
}

/*
 * Set @clock's @node to @seq.
 */
int nvclock_set_node(struct nvclock *clock, uint64_t node, uint64_t seq)
{
	struct nvclockent *ent;

	if (!seq)
		return nvclock_remove_node(clock, node);

	ent = __get_ent(clock, node, true);
	if (IS_ERR(ent))
		return PTR_ERR(ent);

	ent->seq = seq;

	return 0;
}

int nvclock_set(struct nvclock *clock, uint64_t seq)
{
	return nvclock_set_node(clock, nomad_local_node_id(), seq);
}

/*
 * Increment @clock's @node by 1.
 */
int nvclock_inc_node(struct nvclock *clock, uint64_t node)
{
	struct nvclockent *ent;

	ent = __get_ent(clock, node, true);
	if (IS_ERR(ent))
		return PTR_ERR(ent);

	ent->seq++;

	return 0;
}

int nvclock_inc(struct nvclock *clock)
{
	return nvclock_inc_node(clock, nomad_local_node_id());
}

static int vc_cmp(const void *va, const void *vb)
{
	const struct nvclockent *a = va;
	const struct nvclockent *b = vb;

	if (a->node < b->node)
		return -1;
	if (a->node > b->node)
		return 1;
	return 0;
}

/*
 * Are all elements in @u <= than their corresponding elements in @v?
 */
static bool all_le(struct nvclock *u, int i, struct nvclock *v, int j)
{
	while ((i < NVCLOCK_NUM_NODES) && (j < NVCLOCK_NUM_NODES)) {
		struct nvclockent *a = &u->ent[i];
		struct nvclockent *b = &v->ent[j];


		if (a->node < b->node) {
			/*
			 * @v doesn't have this node, therefore it is
			 * assumed to be zero.  Since @u's element won't be
			 * zero (otherwise it'd be omitted), we are
			 * essentially asking this question:
			 *
			 * x <= 0, where x >= 1
			 *
			 * which is obviously false.
			 */
			return false;
		}

		if (a->node == b->node) {
			if (a->seq > b->seq)
				return false;

			/* advance both */
			i++;
			j++;
		} else {
			/* let @u catch up */
			i++;
		}
	}

	return true;
}

static enum nvclockcmp __nvclock_cmp(struct nvclock *u, struct nvclock *v,
				     int i, int j)
{
	/* handle some trivial cases (one or both vectors being empty) */
	if (i == NVCLOCK_NUM_NODES && j == NVCLOCK_NUM_NODES)
		return NVC_EQ;
	if (i == NVCLOCK_NUM_NODES)
		return NVC_LT;
	if (j == NVCLOCK_NUM_NODES)
		return NVC_GT;

	/*
	 * We have two non-empty vectors.
	 */
	if (i == j) {
		bool all_eq;
		int tmp;

		/*
		 * Both vectors have the same number of elements so they
		 * have a chance of being equal
		 */
		all_eq = true;

		for (tmp = i; all_eq && (tmp < NVCLOCK_NUM_NODES); tmp++)
			all_eq = ((u->ent[tmp].node == v->ent[tmp].node) &&
				  (u->ent[tmp].seq == v->ent[tmp].seq));

		if (all_eq)
			return NVC_EQ;
	}

	if (all_le(u, i, v, j))
		return NVC_LT;

	if (all_le(v, j, u, i))
		return NVC_GT;

	return NVC_DIV;
}

/*
 * Turn @a and @b into @u and @v and initialize @_i and @_j to work with @u
 * and @v.
 */
static void __nvclock_cmp_prep(const struct nvclock *a, struct nvclock *u,
			       int *_i, const struct nvclock *b,
			       struct nvclock *v, int *_j)
{
	int i, j;

	*u = *a;
	*v = *b;

	qsort(u->ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
	qsort(v->ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);

	/* find first non-zero node */
	for (i = 0; i < NVCLOCK_NUM_NODES; i++)
		if (u->ent[i].node)
			break;

	for (j = 0; j < NVCLOCK_NUM_NODES; j++)
		if (v->ent[j].node)
			break;

	*_i = i;
	*_j = j;
}

enum nvclockcmp nvclock_cmp(const struct nvclock *c1, const struct nvclock *c2)
{
	struct nvclock u, v;
	int i, j;

	__nvclock_cmp_prep(c1, &u, &i, c2, &v, &j);

	return __nvclock_cmp(&u, &v, i, j);
}

int nvclock_cmp_total(const struct nvclock *c1, const struct nvclock *c2)
{
	struct nvclock u, v;
	int i, j;

	__nvclock_cmp_prep(c1, &u, &i, c2, &v, &j);

	switch (__nvclock_cmp(&u, &v, i, j)) {
		case NVC_LT:
			return -1;
		case NVC_EQ:
			return 0;
		case NVC_GT:
			return 1;
		case NVC_DIV:
			/* this is the hard part...see below */
			break;
	}

	/*
	 * We have two clocks and neither is an ancestor of the other.
	 * Since we are after a total ordering, we need to decide whether we
	 * should return -1 or +1.  (We cannot return 0 since that'd imply
	 * that the two clocks are equivalent - and they definitely aren't.)
	 *
	 * Note that (c1, c2) must return the opposite value of (c2, c1)
	 * otherwise this comparator can't be used to sort deterministically.
	 *
	 * We sort both vector clocks and then we compare them element-wise.
	 * The first element that isn't equal determines the direction of
	 * the result we return.  This has the nice property of producing
	 * human friendly sorting as well.
	 *
	 * For example, suppose we are comparing the following vector
	 * clocks:
	 *
	 *  c1 = { <1, 1>, <3, 1> }
	 *  c2 = { <1, 1>, <2, 1> }
	 *
	 * Since the first element is the same, we skip it.  The second
	 * element, being the first unequal element, determines our output.
	 * In this case, since node 3 is numerically greater than node 2, we
	 * return +1 to indicate "greater than".
	 */

	u = *c1;
	v = *c2;

	qsort(u.ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
	qsort(v.ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);

	/* find first non-zero node */
	for (i = 0; i < NVCLOCK_NUM_NODES; i++)
		if (u.ent[i].node)
			break;

	for (j = 0; j < NVCLOCK_NUM_NODES; j++)
		if (v.ent[j].node)
			break;

	for (; i < NVCLOCK_NUM_NODES && j < NVCLOCK_NUM_NODES; i++, j++) {
		/* compare the nodes */
		if (u.ent[i].node > v.ent[j].node)
			return 1;
		if (u.ent[i].node < v.ent[j].node)
			return -1;

		/* nodes are the same, compare the sequence numbers */
		if (u.ent[i].seq > v.ent[i].seq)
			return 1;
		if (u.ent[i].seq < v.ent[i].seq)
			return -1;

		/* sequence numbers are the same, move onto the next entry */
	}

	/*
	 * One or both of the vectors reached an end.  If both reached an
	 * end, that means that everything we compared was identical.  But
	 * that can't be because __nvclock_cmp() would have returned NVC_EQ.
	 * So, in reality we should have reached only one of the vector's
	 * end.  Whichever vector still has elements remaining is considered
	 * greater of the two.
	 */

	if (i == NVCLOCK_NUM_NODES)
		return -1;
	if (j == NVCLOCK_NUM_NODES)
		return 1;

	ASSERT(0);
	return 0xbad; /* pacify gcc */
}

bool_t xdr_nvclock(XDR *xdrs, struct nvclock **arg)
{
	bool shouldfree = false;
	struct nvclock dummy;
	struct nvclock *clock;
	int i;

	clock = *arg;

	if (xdrs->x_op == XDR_FREE) {
		nvclock_free(clock);
		return TRUE;
	}

	/*
	 * If we don't have clock...
	 */
	if (!clock && (xdrs->x_op == XDR_ENCODE)) {
		/*
		 * ...send one filled with zeros.
		 */
		memset(&dummy, 0, sizeof(struct nvclock));
		clock = &dummy;
	} else if (!clock && (xdrs->x_op == XDR_DECODE)) {
		/*
		 * ...recieve into a newly allocated one.
		 */
		clock = nvclock_alloc();
		if (!clock)
			return FALSE;
		*arg = clock;
		shouldfree = true;
	}

	for (i = 0; i < NVCLOCK_NUM_NODES; i++) {
		if (!xdr_uint64_t(xdrs, &clock->ent[i].node))
			goto err;
		if (!xdr_uint64_t(xdrs, &clock->ent[i].seq))
			goto err;
	}

	return TRUE;

err:
	if (shouldfree) {
		nvclock_free(clock);
		*arg = NULL;
	}

	return FALSE;
}