view iothread.c @ 88:2875fe2d8fd5

ubx: switch from stdio to read/write syscalls Occasionally, we'd run into deadlocks between iothread calling fgetc and the main thread calling fwrite. Instead of trying to diagnose the reason, it is simpler to just switch to xread/xwrite. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Mon, 22 Feb 2021 09:14:24 -0500
parents 0e4ab6c2a99c
children 71cae6a9a299
line wrap: on
line source

/*
 * Copyright (c) 2019-2021 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <jeffpc/thread.h>
#include <jeffpc/types.h>
#include <jeffpc/io.h>
#include <jeffpc/hexdump.h>
#include <jeffpc/time.h>
#include <jeffpc/rand.h>

#include <unistd.h>

#include "iothread.h"
#include "ubx.h"
#include "xstdio.h"
#include "rfc1145.h"
#include "frame.h"

#define VERSION_POLL_INTERVAL (30 * 60 * 1000000000ull) /* 30 minutes */

static pthread_t io_thread;
static int input_file;
static FILE *log_file;
static bool verbose;
static bool poll_version;
static uint32_t seq; /* message sequence number */

/*
 * read until we get the beginning of a NMEA or UBX message
 *
 * Returns a negated errno on error, 0 if we found a UBX message sync byte,
 * and 1 if we found a NMEA sync byte.
 */
static int sync_reader(void)
{
	size_t skipped;

	fprintf(stderr, "Syncing reader...\n");

	for (skipped = 0;; skipped++) {
		uint8_t byte;
		int ret;

		ret = xread(input_file, &byte, sizeof(byte));
		if (ret)
			return ret;

		switch (byte) {
			case '$':
				return 1;
			case UBX_SYNC_BYTE_1:
				return 0;
		}
	}
}

static int verify_cksum(struct ubx_header *hdr, const uint8_t *raw,
			 uint16_t got)
{
	uint16_t exp;

	exp = rfc1145_cksum(hdr->class, hdr->id,
			    hdr->len & 0xff, hdr->len >> 8,
			    raw, hdr->len);

	if (exp != got)
		fprintf(stderr, "UBX message checksum mismatch (got %04x exp %04x)\n",
			got, exp);

	return (exp == got) ? 0 : -ECKSUM;
}

static int log_raw_ubx(struct ubx_header *_hdr, uint8_t *raw,
		       uint64_t start_time, uint64_t start_tick)
{
	static uint32_t session;
	struct ubx_header hdr = *_hdr;
	struct frame frame;
	int ret;

	/* no log == no-op */
	if (!log_file)
		return 0;

	if (!session)
		session = rand32();

	frame.magic   = cpu32_to_be(FRAME_MAGIC);
	frame.session = cpu32_to_be(session);
	frame.tick    = cpu64_to_be(start_tick);
	frame.time    = cpu64_to_be(start_time);
	frame.len     = cpu32_to_be(sizeof(hdr) + _hdr->len + 2);
	frame.seq     = cpu32_to_be(seq++);
	frame._pad    = 0;

	ret = xfwrite(log_file, &frame, sizeof(frame));
	if (ret)
		return ret;

	hdr.len = cpu16_to_le(hdr.len);

	ret = xfwrite(log_file, &hdr, sizeof(hdr));
	if (ret)
		return ret;

	ret = xfwrite(log_file, raw, _hdr->len + 2);
	if (ret)
		return ret;

	fflush(log_file);

	return 0;
}

static int read_ubx(uint64_t start_time, uint64_t start_tick)
{
	struct ubx_header hdr;
	int ret;

	/* the caller read the first byte */
	hdr.sync[0] = UBX_SYNC_BYTE_1;

	/* read the rest of the header */
	ret = xread(input_file, &hdr.sync[1], sizeof(hdr) - 1);
	if (ret) {
		fprintf(stderr, "Failed to read UBX header: %s\n",
			xstrerror(ret));
		return ret;
	}

	hdr.len = le16_to_cpu(hdr.len);

	if ((hdr.sync[0] != UBX_SYNC_BYTE_1) ||
	    (hdr.sync[1] != UBX_SYNC_BYTE_2)) {
		fprintf(stderr, "UBX message sync byte mismatch %02x %02x\n",
			hdr.sync[0], hdr.sync[1]);
		return -EILSEQ;
	}

#if 0
	fprintf(stderr, "Received UBX message: %02x %02x len %u (%04x)\n",
		hdr.class, hdr.id, hdr.len, hdr.len);
#endif

	uint8_t raw[hdr.len + 2];

	ret = xread(input_file, raw, sizeof(raw));
	if (ret)
		return ret;

	ret = verify_cksum(&hdr, raw, le16_to_cpu_unaligned(&raw[hdr.len]));
	if (ret)
		return ret;

	if (verbose) {
		char out[sizeof(raw)*2+1];

		hexdumpz(out, raw, hdr.len, false);
		fprintf(stderr, "< %02x%02x %02x %02x (%s) %04x %s %02x%02x\n",
			hdr.sync[0], hdr.sync[1],
			hdr.class, hdr.id,
			ubx_msg_name((hdr.class << 8) | hdr.id),
			hdr.len, out,
			raw[hdr.len], raw[hdr.len + 1]);
	}

	if ((mkmsgid(hdr.class, hdr.id) == UBX_ACK_ACK) ||
	    (mkmsgid(hdr.class, hdr.id) == UBX_ACK_NAK))
		notify_ubx_ack(raw, hdr.len, hdr.id);

	ret = log_raw_ubx(&hdr, raw, start_time, start_tick);
	if (ret)
		fprintf(stderr, "Failed to log raw UBX message: %s\n",
			xstrerror(ret));

	return 0;
}

static int read_nmea(void)
{
	size_t skipped;

	for (skipped = 0;; skipped++) {
		uint8_t byte;
		int ret;

		ret = xread(input_file, &byte, sizeof(byte));
		if (ret)
			return ret;

		if (byte == '\n')
			break;
	}

	fprintf(stderr, "Skipped %zu bytes of NMEA\n", skipped);

	return 0;
}

static void *iothread_reader(void *data)
{
	uint64_t last_poll_time;
	uint8_t byte;
	int ret;

	ret = sync_reader();
	if (ret < 0) {
		fprintf(stderr, "Failed to synchronize reader: %s\n",
			xstrerror(ret));
		return NULL;
	}

	/* set up the sync byte for the first iteration */
	byte = (ret == 0) ? UBX_SYNC_BYTE_1 : '$';

	/* setup already got it all */
	last_poll_time = gettime();

	/* read a NMEA or UBX message & process it */
	for (;;) {
		uint64_t start_tick;
		uint64_t start_time;

		start_tick = gettick();
		start_time = gettime();

		if (byte == UBX_SYNC_BYTE_1) {
			ret = read_ubx(start_time, start_tick);
		} else if (byte == '$') {
			ret = read_nmea();
		} else {
			fprintf(stderr, "Received unknown sync byte %02x (%c)\n",
				byte, byte);
			ret = -EILSEQ;
		}

		if (ret) {
			fprintf(stderr, "Failed to read message: %s\n",
				xstrerror(ret));
			return NULL;
		}

		if (poll_version &&
		    ((start_time - last_poll_time) > VERSION_POLL_INTERVAL)) {
			/* request the version and serial number */

			/* request version info */
			ret = send_ubx(input_file, UBX_MON_VER, NULL, 0);
			if (ret) {
				fprintf(stderr, "Failed to request periodic "
					"version info: %s\n", xstrerror(ret));
				return NULL;
			}

			/* request serial number */
			ret = send_ubx(input_file, UBX_SEC_UNIQID, NULL, 0);
			if (ret) {
				fprintf(stderr, "Failed to request periodic "
					"serial number: %s\n", xstrerror(ret));
				return NULL;
			}

			last_poll_time = gettime();
		}

		/* read the sync byte for the next iteration */
		ret = xread(input_file, &byte, sizeof(byte));
		if (ret) {
			fprintf(stderr, "Failed to read message sync byte: %s\n",
				xstrerror(ret));
			return NULL;
		}
	}

	return NULL;
}

int iothread_start(int ifile, FILE *lfile, bool _verbose, bool _poll_version)
{
	input_file = ifile;
	log_file = lfile;
	verbose = _verbose;
	poll_version = _poll_version;

	return xthr_create(&io_thread, iothread_reader, NULL);
}