changeset 49:5bfd1f433513

common: implement nvclock_cmp() This function compares two vector clocks. The result indicates whether the first clock is less than, equal to, greater than the second, or if the two clocks are diverged. Partially resolves #5. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Wed, 01 Jul 2015 20:22:14 -0400
parents 76a1a48ed358
children 89b2b8ae97c3
files src/common/vclock.c
diffstat 1 files changed, 106 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/src/common/vclock.c	Wed Jul 01 20:18:39 2015 -0400
+++ b/src/common/vclock.c	Wed Jul 01 20:22:14 2015 -0400
@@ -186,10 +186,114 @@
 	return nvclock_inc_node(clock, nomad_local_node_id());
 }
 
+static int vc_cmp(const void *va, const void *vb)
+{
+	const struct nvclockent *a = va;
+	const struct nvclockent *b = vb;
+
+	if (a->node < b->node)
+		return -1;
+	if (a->node > b->node)
+		return 1;
+	return 0;
+}
+
+/*
+ * Are all elements in @u <= than their corresponding elements in @v?
+ */
+static bool all_le(struct nvclock *u, int i, struct nvclock *v, int j)
+{
+	while ((i < NVCLOCK_NUM_NODES) && (j < NVCLOCK_NUM_NODES)) {
+		struct nvclockent *a = &u->ent[i];
+		struct nvclockent *b = &v->ent[j];
+
+
+		if (a->node < b->node) {
+			/*
+			 * @v doesn't have this node, therefore it is
+			 * assumed to be zero.  Since @u's element won't be
+			 * zero (otherwise it'd be omitted), we are
+			 * essentially asking this question:
+			 *
+			 * x <= 0, where x >= 1
+			 *
+			 * which is obviously false.
+			 */
+			return false;
+		}
+
+		if (a->node == b->node) {
+			if (a->seq > b->seq)
+				return false;
+
+			/* advance both */
+			i++;
+			j++;
+		} else {
+			/* let @u catch up */
+			i++;
+		}
+	}
+
+	return true;
+}
+
 enum nvclockcmp nvclock_cmp(const struct nvclock *c1, const struct nvclock *c2)
 {
-#warning "not yet implemented"
-	// XXX: actually compare
+	struct nvclock u, v;
+	int i, j;
+
+	u = *c1;
+	v = *c2;
+
+	qsort(u.ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
+	qsort(v.ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
+
+	/* find first non-zero node */
+	for (i = 0; i < NVCLOCK_NUM_NODES; i++)
+		if (u.ent[i].node)
+			break;
+
+	for (j = 0; j < NVCLOCK_NUM_NODES; j++)
+		if (v.ent[j].node)
+			break;
+
+	/* handle some trivial cases (one or both vectors being empty */
+	if (i == NVCLOCK_NUM_NODES && j == NVCLOCK_NUM_NODES)
+		return NVC_EQ;
+	if (i == NVCLOCK_NUM_NODES)
+		return NVC_LT;
+	if (j == NVCLOCK_NUM_NODES)
+		return NVC_GT;
+
+	/*
+	 * We have two non-empty vectors.
+	 */
+	if (i == j) {
+		bool all_eq;
+		int tmp;
+
+		/*
+		 * Both vectors have the same number of elements so they
+		 * have a chance of being equal
+		 */
+		all_eq = true;
+
+		for (tmp = i; all_eq && (tmp < NVCLOCK_NUM_NODES); tmp++)
+			all_eq = ((u.ent[tmp].node == v.ent[tmp].node) &&
+				  (u.ent[tmp].seq == v.ent[tmp].seq));
+
+		if (all_eq)
+			return NVC_EQ;
+	}
+
+	if (all_le(&u, i, &v, j))
+		return NVC_LT;
+
+	if (all_le(&v, j, &u, i))
+		return NVC_GT;
+
+	return NVC_DIV;
 }
 
 int nvclock_cmp_total(const struct nvclock *c1, const struct nvclock *c2)