changeset 88:2875fe2d8fd5

ubx: switch from stdio to read/write syscalls Occasionally, we'd run into deadlocks between iothread calling fgetc and the main thread calling fwrite. Instead of trying to diagnose the reason, it is simpler to just switch to xread/xwrite. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Mon, 22 Feb 2021 09:14:24 -0500
parents 0e4ab6c2a99c
children 2d760d9d3435
files capture.c iothread.c iothread.h ubx.c ubx.h
diffstat 5 files changed, 77 insertions(+), 78 deletions(-) [+]
line wrap: on
line diff
--- a/capture.c	Mon Feb 22 08:46:16 2021 -0500
+++ b/capture.c	Mon Feb 22 09:14:24 2021 -0500
@@ -31,8 +31,6 @@
 
 #define DEFAULT_UBXPORT	3
 
-static char file_buffer[65536 + 8]; /* worst case message */
-
 static void usage(const char *prog)
 {
 	fprintf(stderr, "Usage: %s [-cegrs] -d <device> [-u <ubxport>] "
@@ -51,7 +49,7 @@
 	exit(1);
 }
 
-static int cfg_port(FILE *file, bool ro, uint8_t ubxport, bool beidou,
+static int cfg_port(int fd, bool ro, uint8_t ubxport, bool beidou,
 		    bool galileo, bool gps, bool glonass, bool sbas)
 {
 	struct ubx_cfg_prt_uart prt_uart = {
@@ -143,29 +141,29 @@
 	}
 
 	/* request version info */
-	ret = send_ubx(file, UBX_MON_VER, NULL, 0);
+	ret = send_ubx(fd, UBX_MON_VER, NULL, 0);
 	if (ret)
 		return ret;
 
 	/* request serial number */
-	ret = send_ubx(file, UBX_SEC_UNIQID, NULL, 0);
+	ret = send_ubx(fd, UBX_SEC_UNIQID, NULL, 0);
 	if (ret)
 		return ret;
 
 	/* disable NMEA, enable UBX */
-	ret = send_ubx_with_ack(file, UBX_CFG_PRT, prt,
+	ret = send_ubx_with_ack(fd, UBX_CFG_PRT, prt,
 				sizeof(struct ubx_cfg_prt_uart));
 	if (ret)
 		return ret;
 
 	/* set dynamics mode */
-	ret = send_ubx_with_ack(file, UBX_CFG_NAV5, &nav5,
+	ret = send_ubx_with_ack(fd, UBX_CFG_NAV5, &nav5,
 				sizeof(struct ubx_cfg_nav5));
 	if (ret)
 		return ret;
 
 	/* enable gnss systems */
-	ret = send_ubx_with_ack(file, UBX_CFG_GNSS, &gnss,
+	ret = send_ubx_with_ack(fd, UBX_CFG_GNSS, &gnss,
 				sizeof(struct ubx_cfg_gnss));
 	if (ret)
 		return ret;
@@ -190,7 +188,7 @@
 
 		fprintf(stderr, "Enabling %s (%04x)...\n", ubx_msg_name(id), id);
 
-		ret = enable_ubx_msg(file, id, ubxport, rate);
+		ret = enable_ubx_msg(fd, id, ubxport, rate);
 		if (ret)
 			return ret;
 	}
@@ -212,11 +210,10 @@
 	bool enable_sbas;
 	uint8_t ubxport;
 	bool verbose;
-	FILE *ifile;
+	int ifile;
 	FILE *rfile;
 	int opt;
 	int ret;
-	int fd;
 
 	device = NULL;
 	log = NULL;
@@ -307,15 +304,15 @@
 		input_readonly = !S_ISCHR(stat.st_mode);
 	}
 
-	fd = xopen(device, input_readonly ? O_RDONLY : O_RDWR, 0);
-	if (fd < 0) {
+	ifile = xopen(device, input_readonly ? O_RDONLY : O_RDWR, 0);
+	if (ifile < 0) {
 		fprintf(stderr, "Error: Could not open device %s: %s\n",
-			device, xstrerror(fd));
+			device, xstrerror(ifile));
 		usage(argv[0]);
 	}
 
 	if (!input_readonly) {
-		if (tcgetattr(fd, &termios) < 0) {
+		if (tcgetattr(ifile, &termios) < 0) {
 			fprintf(stderr, "Error: Failed to get tty attrs: %s\n",
 				xstrerror(-errno));
 			return 6;
@@ -335,20 +332,13 @@
 			return 7;
 		}
 
-		if (tcsetattr(fd, TCSAFLUSH, &termios)) {
+		if (tcsetattr(ifile, TCSAFLUSH, &termios)) {
 			fprintf(stderr, "Error: Failed to set attrs on device: %s\n",
 				xstrerror(-errno));
 			return 8;
 		}
 	}
 
-	ifile = fdopen(fd, input_readonly ? "rb" : "wb+");
-	if (ifile == NULL) {
-		fprintf(stderr, "Error: Failed to fdopen: %s\n",
-			xstrerror(-errno));
-		return 9;
-	}
-
 	rfile = fopen(log, "wb");
 	if (rfile == NULL) {
 		fprintf(stderr, "Error: Failed to fopen log: %s\n",
@@ -356,9 +346,6 @@
 		return 9;
 	}
 
-	if (setvbuf(ifile, file_buffer, _IOFBF, sizeof(file_buffer)) == EOF)
-		fprintf(stderr, "Warn: Failed to mark stream as buffered\n");
-
 	/* make sure ack queuing is set up */
 	ubx_init_queue();
 
@@ -382,7 +369,7 @@
 	for (;;)
 		sleep(3600);
 
-	fclose(ifile);
+	xclose(ifile);
 	fclose(rfile);
 
 	return 0;
--- a/iothread.c	Mon Feb 22 08:46:16 2021 -0500
+++ b/iothread.c	Mon Feb 22 09:14:24 2021 -0500
@@ -38,13 +38,18 @@
 #define VERSION_POLL_INTERVAL (30 * 60 * 1000000000ull) /* 30 minutes */
 
 static pthread_t io_thread;
-static FILE *input_file;
+static int input_file;
 static FILE *log_file;
 static bool verbose;
 static bool poll_version;
 static uint32_t seq; /* message sequence number */
 
-/* read until we get the beginning of a NMEA or UBX message */
+/*
+ * read until we get the beginning of a NMEA or UBX message
+ *
+ * Returns a negated errno on error, 0 if we found a UBX message sync byte,
+ * and 1 if we found a NMEA sync byte.
+ */
 static int sync_reader(void)
 {
 	size_t skipped;
@@ -52,15 +57,18 @@
 	fprintf(stderr, "Syncing reader...\n");
 
 	for (skipped = 0;; skipped++) {
-		int byte;
+		uint8_t byte;
+		int ret;
 
-		byte = fgetc(input_file);
-		if (byte == EOF)
-			return -EPIPE;
+		ret = xread(input_file, &byte, sizeof(byte));
+		if (ret)
+			return ret;
 
-		if ((byte == '$') || (byte == UBX_SYNC_BYTE_1)) {
-			fprintf(stderr, "Sync discarded %zu bytes\n", skipped);
-			return (ungetc(byte, input_file) == EOF) ? -ENOMEM : 0;
+		switch (byte) {
+			case '$':
+				return 1;
+			case UBX_SYNC_BYTE_1:
+				return 0;
 		}
 	}
 }
@@ -128,9 +136,16 @@
 	struct ubx_header hdr;
 	int ret;
 
-	ret = xfread(input_file, &hdr, sizeof(hdr));
-	if (ret)
+	/* the caller read the first byte */
+	hdr.sync[0] = UBX_SYNC_BYTE_1;
+
+	/* read the rest of the header */
+	ret = xread(input_file, &hdr.sync[1], sizeof(hdr) - 1);
+	if (ret) {
+		fprintf(stderr, "Failed to read UBX header: %s\n",
+			xstrerror(ret));
 		return ret;
+	}
 
 	hdr.len = le16_to_cpu(hdr.len);
 
@@ -148,7 +163,7 @@
 
 	uint8_t raw[hdr.len + 2];
 
-	ret = xfread(input_file, raw, sizeof(raw));
+	ret = xread(input_file, raw, sizeof(raw));
 	if (ret)
 		return ret;
 
@@ -185,11 +200,12 @@
 	size_t skipped;
 
 	for (skipped = 0;; skipped++) {
-		int byte;
+		uint8_t byte;
+		int ret;
 
-		byte = fgetc(input_file);
-		if (byte == EOF)
-			return -EPIPE;
+		ret = xread(input_file, &byte, sizeof(byte));
+		if (ret)
+			return ret;
 
 		if (byte == '\n')
 			break;
@@ -203,15 +219,19 @@
 static void *iothread_reader(void *data)
 {
 	uint64_t last_poll_time;
+	uint8_t byte;
 	int ret;
 
 	ret = sync_reader();
-	if (ret) {
+	if (ret < 0) {
 		fprintf(stderr, "Failed to synchronize reader: %s\n",
 			xstrerror(ret));
 		return NULL;
 	}
 
+	/* set up the sync byte for the first iteration */
+	byte = (ret == 0) ? UBX_SYNC_BYTE_1 : '$';
+
 	/* setup already got it all */
 	last_poll_time = gettime();
 
@@ -219,23 +239,10 @@
 	for (;;) {
 		uint64_t start_tick;
 		uint64_t start_time;
-		int byte;
-
-		byte = fgetc(input_file);
-		if (byte == EOF) {
-			fprintf(stderr, "Failed to read message sync byte\n");
-			return NULL;
-		}
 
 		start_tick = gettick();
 		start_time = gettime();
 
-		/* easier to unget than to shift data around */
-		if (ungetc(byte, input_file) == EOF) {
-			fprintf(stderr, "Failed to unget sync byte\n");
-			return NULL;
-		}
-
 		if (byte == UBX_SYNC_BYTE_1) {
 			ret = read_ubx(start_time, start_tick);
 		} else if (byte == '$') {
@@ -274,12 +281,20 @@
 
 			last_poll_time = gettime();
 		}
+
+		/* read the sync byte for the next iteration */
+		ret = xread(input_file, &byte, sizeof(byte));
+		if (ret) {
+			fprintf(stderr, "Failed to read message sync byte: %s\n",
+				xstrerror(ret));
+			return NULL;
+		}
 	}
 
 	return NULL;
 }
 
-int iothread_start(FILE *ifile, FILE *lfile, bool _verbose, bool _poll_version)
+int iothread_start(int ifile, FILE *lfile, bool _verbose, bool _poll_version)
 {
 	input_file = ifile;
 	log_file = lfile;
--- a/iothread.h	Mon Feb 22 08:46:16 2021 -0500
+++ b/iothread.h	Mon Feb 22 09:14:24 2021 -0500
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2020 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ * Copyright (c) 2019-2021 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -27,7 +27,7 @@
 
 #include <jeffpc/types.h>
 
-extern int iothread_start(FILE *ifile, FILE *lfile, bool verbose,
+extern int iothread_start(int ifile, FILE *lfile, bool verbose,
 		          bool poll_version);
 
 #endif
--- a/ubx.c	Mon Feb 22 08:46:16 2021 -0500
+++ b/ubx.c	Mon Feb 22 09:14:24 2021 -0500
@@ -26,7 +26,6 @@
 #include <jeffpc/list.h>
 
 #include "ubx.h"
-#include "xstdio.h"
 #include "rfc1145.h"
 
 static LOCK_CLASS(ack_queue_lc);
@@ -77,7 +76,7 @@
 	return names[id];
 }
 
-static int __send_ubx(FILE *file, enum ubx_msg_id id, const void *data,
+static int __send_ubx(int fd, enum ubx_msg_id id, const void *data,
 		      size_t len)
 {
 	struct ubx_header hdr = {
@@ -99,11 +98,11 @@
 			le16_to_cpu(hdr.len), out);
 	}
 
-	ret = xfwrite(file, &hdr, sizeof(hdr));
+	ret = xwrite(fd, &hdr, sizeof(hdr));
 	if (ret)
 		return ret;
 
-	ret = xfwrite(file, data, len);
+	ret = xwrite(fd, data, len);
 	if (ret)
 		return ret;
 
@@ -111,14 +110,14 @@
 					  hdr.len & 0xff, hdr.len >> 8,
 					  data, len));
 
-	ret = xfwrite(file, &cksum, sizeof(cksum));
+	ret = xwrite(fd, &cksum, sizeof(cksum));
 	if (ret)
 		return ret;
 
 	return 0;
 }
 
-static bool __read_ubx_ack(FILE *file, enum ubx_msg_id id)
+static bool __read_ubx_ack(int fd, enum ubx_msg_id id)
 {
 	struct ack_waiter waiter;
 
@@ -165,33 +164,33 @@
 	MXUNLOCK(&ack_lock);
 }
 
-int send_ubx(FILE *file, enum ubx_msg_id id, const void *data, size_t len)
+int send_ubx(int fd, enum ubx_msg_id id, const void *data, size_t len)
 {
-	return __send_ubx(file, id, data, len);
+	return __send_ubx(fd, id, data, len);
 }
 
-int send_ubx_with_ack(FILE *file, enum ubx_msg_id id, const void *data,
+int send_ubx_with_ack(int fd, enum ubx_msg_id id, const void *data,
 		      size_t len)
 {
 	bool ack;
 	int ret;
 
-	ret = __send_ubx(file, id, data, len);
+	ret = __send_ubx(fd, id, data, len);
 	if (ret)
 		return ret;
 
-	ack = __read_ubx_ack(file, id);
+	ack = __read_ubx_ack(fd, id);
 
 	return ack ? 0 : -ENOTSUP;
 }
 
-int enable_ubx_msg(FILE *file, enum ubx_msg_id id, int port, int rate)
+int enable_ubx_msg(int fd, enum ubx_msg_id id, int port, int rate)
 {
 	uint8_t raw[8] = { id >> 8, id & 0xff, 0, 0, 0, 0, 0, 0 };
 
 	raw[2 + port] = rate;
 
-	return send_ubx_with_ack(file, UBX_CFG_MSG, raw, sizeof(raw));
+	return send_ubx_with_ack(fd, UBX_CFG_MSG, raw, sizeof(raw));
 }
 
 bool parse_and_process_ubx_message(const uint8_t *buf, size_t len,
--- a/ubx.h	Mon Feb 22 08:46:16 2021 -0500
+++ b/ubx.h	Mon Feb 22 09:14:24 2021 -0500
@@ -23,8 +23,6 @@
 #ifndef __UBX_H
 #define __UBX_H
 
-#include <stdio.h>
-
 #include <jeffpc/error.h>
 #include <jeffpc/types.h>
 
@@ -282,11 +280,11 @@
 
 extern void ubx_init_queue(void);
 extern const char *ubx_msg_name(enum ubx_msg_id id);
-extern int send_ubx(FILE *file, enum ubx_msg_id id, const void *data,
+extern int send_ubx(int fd, enum ubx_msg_id id, const void *data,
 		    size_t len);
-extern int send_ubx_with_ack(FILE *file, enum ubx_msg_id id, const void *data,
+extern int send_ubx_with_ack(int fd, enum ubx_msg_id id, const void *data,
 			     size_t len);
-extern int enable_ubx_msg(FILE *file, enum ubx_msg_id id, int port, int rate);
+extern int enable_ubx_msg(int fd, enum ubx_msg_id id, int port, int rate);
 extern void notify_ubx_ack(const uint8_t *data, size_t len, bool success);
 
 extern bool parse_and_process_ubx_message(const uint8_t *buf, size_t len,