changeset 70:d0275feca971

common: finish implementing nvclock_cmp_total Resolves #5. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Mon, 17 Aug 2015 18:14:42 -0400
parents 74d61c2c754b
children 93c2347efced
files src/common/vclock.c
diffstat 1 files changed, 114 insertions(+), 28 deletions(-) [+]
line wrap: on
line diff
--- a/src/common/vclock.c	Tue Jul 07 21:47:30 2015 -0400
+++ b/src/common/vclock.c	Mon Aug 17 18:14:42 2015 -0400
@@ -238,27 +238,10 @@
 	return true;
 }
 
-enum nvclockcmp nvclock_cmp(const struct nvclock *c1, const struct nvclock *c2)
+static enum nvclockcmp __nvclock_cmp(struct nvclock *u, struct nvclock *v,
+				     int i, int j)
 {
-	struct nvclock u, v;
-	int i, j;
-
-	u = *c1;
-	v = *c2;
-
-	qsort(u.ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
-	qsort(v.ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
-
-	/* find first non-zero node */
-	for (i = 0; i < NVCLOCK_NUM_NODES; i++)
-		if (u.ent[i].node)
-			break;
-
-	for (j = 0; j < NVCLOCK_NUM_NODES; j++)
-		if (v.ent[j].node)
-			break;
-
-	/* handle some trivial cases (one or both vectors being empty */
+	/* handle some trivial cases (one or both vectors being empty) */
 	if (i == NVCLOCK_NUM_NODES && j == NVCLOCK_NUM_NODES)
 		return NVC_EQ;
 	if (i == NVCLOCK_NUM_NODES)
@@ -280,25 +263,69 @@
 		all_eq = true;
 
 		for (tmp = i; all_eq && (tmp < NVCLOCK_NUM_NODES); tmp++)
-			all_eq = ((u.ent[tmp].node == v.ent[tmp].node) &&
-				  (u.ent[tmp].seq == v.ent[tmp].seq));
+			all_eq = ((u->ent[tmp].node == v->ent[tmp].node) &&
+				  (u->ent[tmp].seq == v->ent[tmp].seq));
 
 		if (all_eq)
 			return NVC_EQ;
 	}
 
-	if (all_le(&u, i, &v, j))
+	if (all_le(u, i, v, j))
 		return NVC_LT;
 
-	if (all_le(&v, j, &u, i))
+	if (all_le(v, j, u, i))
 		return NVC_GT;
 
 	return NVC_DIV;
 }
 
+/*
+ * Turn @a and @b into @u and @v and initialize @_i and @_j to work with @u
+ * and @v.
+ */
+static void __nvclock_cmp_prep(const struct nvclock *a, struct nvclock *u,
+			       int *_i, const struct nvclock *b,
+			       struct nvclock *v, int *_j)
+{
+	int i, j;
+
+	*u = *a;
+	*v = *b;
+
+	qsort(u->ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
+	qsort(v->ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
+
+	/* find first non-zero node */
+	for (i = 0; i < NVCLOCK_NUM_NODES; i++)
+		if (u->ent[i].node)
+			break;
+
+	for (j = 0; j < NVCLOCK_NUM_NODES; j++)
+		if (v->ent[j].node)
+			break;
+
+	*_i = i;
+	*_j = j;
+}
+
+enum nvclockcmp nvclock_cmp(const struct nvclock *c1, const struct nvclock *c2)
+{
+	struct nvclock u, v;
+	int i, j;
+
+	__nvclock_cmp_prep(c1, &u, &i, c2, &v, &j);
+
+	return __nvclock_cmp(&u, &v, i, j);
+}
+
 int nvclock_cmp_total(const struct nvclock *c1, const struct nvclock *c2)
 {
-	switch (nvclock_cmp(c1, c2)) {
+	struct nvclock u, v;
+	int i, j;
+
+	__nvclock_cmp_prep(c1, &u, &i, c2, &v, &j);
+
+	switch (__nvclock_cmp(&u, &v, i, j)) {
 		case NVC_LT:
 			return -1;
 		case NVC_EQ:
@@ -319,9 +346,68 @@
 	 * Note that (c1, c2) must return the opposite value of (c2, c1)
 	 * otherwise this comparator can't be used to sort deterministically.
 	 *
-	 * XXX: describe algo
+	 * We sort both vector clocks and then we compare them element-wise.
+	 * The first element that isn't equal determines the direction of
+	 * the result we return.  This has the nice property of producing
+	 * human friendly sorting as well.
+	 *
+	 * For example, suppose we are comparing the following vector
+	 * clocks:
+	 *
+	 *  c1 = { <1, 1>, <3, 1> }
+	 *  c2 = { <1, 1>, <2, 1> }
+	 *
+	 * Since the first element is the same, we skip it.  The second
+	 * element, being the first unequal element, determines our output.
+	 * In this case, since node 3 is numerically greater than node 2, we
+	 * return +1 to indicate "greater than".
 	 */
 
-#warning "not yet implemented"
-	// XXX
+	u = *c1;
+	v = *c2;
+
+	qsort(u.ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
+	qsort(v.ent, NVCLOCK_NUM_NODES, sizeof(struct nvclockent), vc_cmp);
+
+	/* find first non-zero node */
+	for (i = 0; i < NVCLOCK_NUM_NODES; i++)
+		if (u.ent[i].node)
+			break;
+
+	for (j = 0; j < NVCLOCK_NUM_NODES; j++)
+		if (v.ent[j].node)
+			break;
+
+	for (; i < NVCLOCK_NUM_NODES && j < NVCLOCK_NUM_NODES; i++, j++) {
+		/* compare the nodes */
+		if (u.ent[i].node > v.ent[j].node)
+			return 1;
+		if (u.ent[i].node < v.ent[j].node)
+			return -1;
+
+		/* nodes are the same, compare the sequence numbers */
+		if (u.ent[i].seq > v.ent[i].seq)
+			return 1;
+		if (u.ent[i].seq < v.ent[i].seq)
+			return -1;
+
+		/* sequence numbers are the same, move onto the next entry */
+	}
+
+	/*
+	 * One or both of the vectors reached an end.  If both reached an
+	 * end, that means that everything we compared was identical.  But
+	 * that can't be because __nvclock_cmp() would have returned NVC_EQ.
+	 * So, in reality we should have reached only one of the vector's
+	 * end.  Whichever vector still has elements remaining is considered
+	 * greater of the two.
+	 */
+
+	if (i == NVCLOCK_NUM_NODES)
+		return -1;
+	if (j == NVCLOCK_NUM_NODES)
+		return 1;
+
+	ASSERT(0);
+	return 0xbad; /* pacify gcc */
 }