// SPDX-License-Identifier: CC0-1.0
// https://spdx.org/licenses/CC0-1.0.html
#define _POSIX_C_SOURCE
#define _GNU_SOURCE
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <limits.h>
#include <locale.h>
#include <ctype.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <termios.h>
#include <signal.h>
#include <time.h>
#include <errno.h>
/*
 * Compile-time configuration. Every setting below can be overridden on the
 * compiler command line (-DNAME=value); the defaults apply otherwise.
 */

/* Receive size histogram upper edge: the histogram is allocated with
 * READ_HISTOGRAM + 1 counters, the last one collecting all larger reads. */
#ifndef READ_HISTOGRAM
# define READ_HISTOGRAM 65536
#endif

/* Silence unused parameter warnings */
#ifndef UNUSED
# define UNUSED __attribute__ ((unused))
#endif

/* Signal used internally to indicate worker thread is done */
#ifndef WORKER_SIGNAL
# define WORKER_SIGNAL (SIGRTMIN+0)
#endif

/* Clock source used for measuring receive duration */
#ifndef WORKER_CLOCK
# define WORKER_CLOCK CLOCK_MONOTONIC
#endif

/* Access mode used for the memory mapping file */
#ifndef MAPPING_MODE
# define MAPPING_MODE 0640
#endif

/* Flags used when memory mapping a file.
 * The expansion is parenthesized so that the macro behaves as a single
 * operand wherever it is used (e.g. MAPPING_FLAGS & mask). */
#ifndef MAPPING_FLAGS
# ifdef MAP_NORESERVE
#  define MAPPING_FLAGS (MAP_SHARED | MAP_NORESERVE)
# else
#  define MAPPING_FLAGS (MAP_SHARED)
# endif
#endif

/* Xorshift64* seed */
#ifndef SEED
# define SEED 1
#endif
/* State shared between the main thread and the worker thread.
 * The worker thread reads input from a character device into `data`.
*/
/* Thread that receives the WORKER_SIGNAL notifications queued by worker();
 * set by main() with pthread_self() before the worker is created. */
pthread_t main_thread;
/* Handle of the receiving thread created by main(). */
pthread_t worker_thread;
/* WORKER_CLOCK timestamps: worker() stores the first after sending the start
 * byte and the second after the last byte has been received. */
struct timespec receive_started;
struct timespec receive_finished;
/* Payloads queued with WORKER_SIGNAL; main() compares the received
 * sival_ptr against worker_success to tell the two outcomes apart. */
const union sigval worker_failure = { .sival_ptr = (void *)(intptr_t)1 };
const union sigval worker_success = { .sival_ptr = (void *)(intptr_t)0 };
const char *mapping = NULL; /* Name of memory-mapped file, if any. */
size_t mapping_len = 0; /* Size of the memory-map, if any. */
/* File descriptor of the device; -1 when no device is open. */
int src = -1;
unsigned char *data = NULL; /* Pointer to storage */
size_t size = 0; /* Amount to read */
/* Number of bytes stored in `data` so far; only accessed through the
 * atomic helpers below. */
volatile size_t have_ = 0; /* Use data_have() instead! */
/* Counters indexed by read() result: slot 0 counts reads that returned no
 * data, slot read_histogram_max counts reads of that size or larger.
 * NULL disables the statistics in worker(). */
size_t *read_histogram = NULL;
size_t read_histogram_max = 0;
/* Atomically (sequentially consistent) read the received byte count. */
static inline size_t data_have(void) { return __atomic_load_n(&have_, __ATOMIC_SEQ_CST); }
/* Atomically add `bytes` to the received byte count and return the new total. */
static inline size_t data_have_add(const size_t bytes) { return __atomic_add_fetch(&have_, bytes, __ATOMIC_SEQ_CST); }
/*
 * Worker thread entry point.
 *
 * Writes a single start byte (1) to the device `src`, reads until `size`
 * bytes have been stored in `data`, then writes a single end byte (0).
 * The WORKER_CLOCK time is recorded in `receive_started` right after the
 * start byte is sent and in `receive_finished` once all data has arrived.
 * If `read_histogram` is non-NULL, the size of every read() result is
 * tallied in it.
 *
 * On every exit path the main thread is notified with WORKER_SIGNAL,
 * carrying `worker_success` or `worker_failure`, so that it never waits
 * forever for a worker that has already given up.
 *
 * payload: unused.
 * Returns: (void *)0 on success, otherwise an errno value cast to a pointer
 *          (EINVAL if a mark write transferred an unexpected byte count).
 */
void *worker(void *payload UNUSED)
{
    const unsigned char start_mark[1] = { 1 };
    const unsigned char end_mark[1] = { 0 };
    size_t have = data_have();
    ssize_t n;

    /* Send start byte. */
    do {
        n = write(src, start_mark, 1);
    } while (n == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK));
    if (n != 1) {
        /* Capture errno before pthread_sigqueue() can overwrite it. */
        void *retval = (void *)(intptr_t)(n == -1 ? errno : EINVAL);
        pthread_sigqueue(main_thread, WORKER_SIGNAL, worker_failure);
        return retval;
    }

    /* Record start time. */
    clock_gettime(WORKER_CLOCK, &receive_started);

    /* Receive loop. A read() returning 0 is not an error here: it is
     * counted as an additional attempt and the loop simply tries again. */
    while (have < size) {
        n = read(src, data + have, size - have);
        if (n == -1) {
            void *retval;

            /* Interrupted before any data was transferred; just retry. */
            if (errno == EINTR)
                continue;

            /* Capture errno before pthread_sigqueue() can overwrite it,
             * and tell the main thread we are giving up. */
            retval = (void *)(intptr_t)errno;
            pthread_sigqueue(main_thread, WORKER_SIGNAL, worker_failure);
            return retval;
        }

        if (read_histogram) {
            if (n < (ssize_t)1) {
                ++read_histogram[0];
            } else if ((size_t)n < read_histogram_max) {
                ++read_histogram[(size_t)n];
            } else {
                /* Everything at or above the upper edge shares one bin. */
                ++read_histogram[read_histogram_max];
            }
        }

        have = data_have_add((size_t)n);
    }

    /* Record stop time. */
    clock_gettime(WORKER_CLOCK, &receive_finished);

    /* Send end mark. */
    do {
        n = write(src, end_mark, 1);
    } while (n == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK));
    if (n != 1) {
        void *retval = (void *)(intptr_t)(n == -1 ? errno : EINVAL);
        pthread_sigqueue(main_thread, WORKER_SIGNAL, worker_failure);
        return retval;
    }

    /* Set completion signal. */
    pthread_sigqueue(main_thread, WORKER_SIGNAL, worker_success);
    return (void *)0;
}
/*
 * Parse a non-negative size with an optional binary unit suffix.
 *
 * The number is parsed by strtoul() with base 0, so decimal, octal (0...)
 * and hexadecimal (0x...) notations are accepted. It may be followed by
 * one of the suffixes k (1024), M (1024^2) or G (1024^3). Whitespace is
 * allowed before the number, between the number and the suffix, and at
 * the end of the string.
 *
 * s:  string to parse.
 * to: where to store the result; may be NULL to only validate.
 *
 * Returns 0 on success. On failure returns a nonzero errno value, which
 * is also left in errno: EINVAL for a NULL, empty, negative or malformed
 * string, the strtoul() error (ERANGE) if the number itself is too large,
 * and ENOMEM if the scaled value does not fit in size_t.
 */
static int parse_size(const char *s, size_t *to)
{
    char *end = NULL;
    unsigned long uval;
    unsigned long scale = 1;

    if (!s || !*s)
        return errno = EINVAL;

    /* strtoul() accepts "-N" and returns the negated value as a huge
     * unsigned number, so a minus sign has to be rejected explicitly. */
    while (isspace((unsigned char)(*s)))
        s++;
    if (*s == '-')
        return errno = EINVAL;

    errno = 0;
    uval = strtoul(s, &end, 0);
    if (errno)
        return errno;
    if (!end || end == s)
        return errno = EINVAL;

    while (isspace((unsigned char)(*end)))
        end++;

    switch (*end) {
    case 'k':
        scale = 1024UL;
        end++;
        break;
    case 'M':
        scale = 1024UL * 1024UL;
        end++;
        break;
    case 'G':
        scale = 1024UL * 1024UL * 1024UL;
        end++;
        break;
    default:
        break;
    }

    while (isspace((unsigned char)(*end)))
        end++;

    /* Anything left over means the string was not a plain size. */
    if (*end)
        return errno = EINVAL;

    /* Check before multiplying, so the product can neither wrap around
     * nor be truncated when stored in a size_t. */
    if (uval > SIZE_MAX / scale)
        return errno = ENOMEM;

    if (to)
        *to = (size_t)uval * (size_t)scale;

    return 0;
}
/*
 * Pseudorandom reference byte stream (Xorshift64*).
 * main() compares the received data against the bytes produced here.
 */

/* Generator state; starts from SEED. */
static uint64_t state = SEED;
/* Bytes of the most recent generator output that are not yet consumed. */
static uint64_t left_state = 0;
/* How many bytes are still available in left_state. */
static unsigned int left = 0;

/* Advance the generator by one step and return the next 64-bit output. */
static inline uint64_t next_state(void)
{
    state ^= state >> 12;
    state ^= state << 25;
    state ^= state >> 27;
    return state * UINT64_C(2685821657736338717);
}

/*
 * Return the next byte of the stream. Each 64-bit generator output is
 * handed out as eight bytes, least significant byte first.
 */
static inline unsigned char next_byte(void)
{
    unsigned char low;

    /* Refill once all eight bytes of the previous output are used up. */
    if (left == 0) {
        left_state = next_state();
        left = 8;
    }

    low = (unsigned char)(left_state & UINT64_C(0xFF));
    left_state = left_state >> 8;
    left -= 1;

    return low;
}
/*
 * Release everything acquired by main(): the receive buffer (memory map
 * or heap allocation), the read histogram, the device descriptor, and the
 * mapping file. Each global is reset afterwards. Failures to close the
 * device or to remove the mapping file are reported on standard error.
 */
void cleanup(void)
{
    /* The buffer is a memory map exactly when mapping_len is nonzero;
     * otherwise it came from the heap (or is NULL). */
    if (mapping_len == 0)
        free(data);
    else
        munmap(data, mapping_len);
    data = NULL;
    mapping_len = 0;

    /* Discard read histogram, if any; free(NULL) is a no-op. */
    free(read_histogram);
    read_histogram = NULL;

    /* Close device. */
    if (src != -1) {
        int rc;

        /* But first, discard any data in kernel buffers. */
        tcflush(src, TCIOFLUSH);

        rc = close(src);
        if (rc == -1)
            fprintf(stderr, "Error closing device: %s.\n", strerror(errno));

        src = -1;
    }

    /* Unlink mapping, if any. */
    if (mapping != NULL) {
        const int rc = unlink(mapping);

        if (rc == -1)
            fprintf(stderr, "Error removing mapping: %s.\n", strerror(errno));

        mapping = NULL;
    }
}
/*
 * Program entry point: DEVICE SIZE [ MAPPING ].
 *
 * 1. Parses SIZE and opens DEVICE; if it is a terminal, switches it to a
 *    raw 8-bit mode with a read timeout.
 * 2. Blocks the signals handled below, so that they can be collected
 *    synchronously with sigwaitinfo() and are also blocked in the worker.
 * 3. Obtains the receive buffer: a shared mapping of the MAPPING file if
 *    one was named, otherwise heap memory. The buffer is cleared up front.
 * 4. Starts worker() in its own thread and waits for signals:
 *    USR1/USR2 print progress, INT/HUP/TERM cancel the worker, and
 *    WORKER_SIGNAL (queued by the worker) ends the wait.
 * 5. Joins the worker, prints the read-size statistics, and if all data
 *    arrived, verifies it against the next_byte() reference stream.
 *
 * Returns EXIT_SUCCESS if the worker completed successfully, EXIT_FAILURE
 * otherwise. Printing the usage because fewer than two arguments were
 * given also counts as success.
 */
int main(int argc, char *argv[])
{
    pthread_attr_t attrs;
    siginfo_t info;
    sigset_t signals;
    void *retval;
    int result, exit_status = EXIT_SUCCESS;

    setlocale(LC_ALL, "");

    if (argc < 3 || argc > 4) {
        const char *argv0 = (argc > 0) ? argv[0] : "(this)";
        printf("\n");
        printf("Usage: %s [ -h | --help ]\n", argv0);
        printf(" %s DEVICE SIZE [ MAPPING ]\n", argv0);
        printf("\n");
        printf("Where:\n");
        printf(" DEVICE is the character device to talk to,\n");
        printf(" SIZE is the number of bytes to receive,\n");
        printf(" MAPPING is the name of the memory-mapped file\n");
        printf(" to use, if storing data to a file.\n");
        printf("\n");
        printf("You can use suffix k for units of 1024 bytes,\n");
        printf(" M for units of 1048576 bytes, and\n");
        printf(" G for units of 1073741824 bytes.\n");
        printf("\n");
        printf("When running, you can send an USR1 or USR2 signal\n");
        printf("to request progress updates.\n");
        printf("\n");
        /* Zero or one argument is treated as a request for help. */
        if (argc == 1 || argc == 2)
            return EXIT_SUCCESS;
        else
            return EXIT_FAILURE;
    }

    /* Parse size. */
    if (parse_size(argv[2], &size) || size < 1) {
        fprintf(stderr, "%s: Invalid size.\n", argv[2]);
        return EXIT_FAILURE;
    }

    /* Open the device. */
    src = open(argv[1], O_RDWR | O_CLOEXEC);
    if (src == -1) {
        fprintf(stderr, "Cannot open device %s: %s.\n", argv[1], strerror(errno));
        return EXIT_FAILURE;
    }

    /* Set termios properties, if a termios-like device. */
    if (isatty(src)) {
        struct termios settings;
        memset(&settings, 0, sizeof settings);
        if (tcgetattr(src, &settings) == 0) {
            settings.c_iflag &= ~(BRKINT | PARMRK | INPCK | ISTRIP | INLCR | IGNCR | ICRNL | IXON | IXOFF);
            settings.c_iflag |= IGNBRK | IGNPAR;
            settings.c_oflag &= ~(OPOST | ONLCR | ONOCR | ONLRET);
            settings.c_cflag &= ~(CSIZE | PARENB | CLOCAL);
            settings.c_cflag |= CS8 | CREAD | HUPCL;
            settings.c_lflag &= ~(ISIG | ICANON | ECHO | ECHOE | ECHOK | ECHONL | TOSTOP | IEXTEN);
            /* VMIN = 0, VTIME = 10: read() returns whatever is available,
             * or nothing after one second (VTIME is in tenths of a second). */
            settings.c_cc[VMIN] = 0;
            settings.c_cc[VTIME] = 10;
            tcflush(src, TCIOFLUSH);
            if (tcsetattr(src, TCSANOW, &settings) == -1) {
                /* Best effort: carry on with whatever settings are in force. */
                fprintf(stderr, "Warning: cannot configure device %s: %s.\n", argv[1], strerror(errno));
                tcflush(src, TCIOFLUSH);
            } else {
                tcflush(src, TCIOFLUSH);
                fprintf(stderr, "Device %s prepared for communications.\n", argv[1]);
            }
            fflush(stderr);
        }
    }

    /* Block signals we respond to. This will be inherited by the worker thread. */
    sigemptyset(&signals);
    sigaddset(&signals, SIGINT);
    sigaddset(&signals, SIGHUP);
    sigaddset(&signals, SIGTERM);
    sigaddset(&signals, SIGUSR1);
    sigaddset(&signals, SIGUSR2);
    sigaddset(&signals, WORKER_SIGNAL);
    if (sigprocmask(SIG_BLOCK, &signals, NULL) == -1) {
        fprintf(stderr, "Cannot block signals: %s.\n", strerror(errno));
        close(src);
        return EXIT_FAILURE;
    }

    if (argc == 4) {
        size_t page, tail;
        long page_size;
        int mapfd;

        /* Round the mapping length up to a whole number of pages. */
        page_size = sysconf(_SC_PAGESIZE);
        if (page_size < 1) {
            fprintf(stderr, "Cannot determine the page size.\n");
            close(src);
            return EXIT_FAILURE;
        }
        page = (size_t)page_size;
        tail = size % page;
        if (tail) {
            if (size > SIZE_MAX - (page - tail)) {
                fprintf(stderr, "%s: Size is too large for a memory map.\n", argv[2]);
                close(src);
                return EXIT_FAILURE;
            }
            mapping_len = size + (page - tail);
        } else {
            mapping_len = size;
        }

        mapping = argv[3];

        /* Create mapping file. O_EXCL: never reuse an existing file. */
        mapfd = open(mapping, O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC, MAPPING_MODE);
        if (mapfd == -1) {
            fprintf(stderr, "Cannot create mapping file %s: %s.\n", mapping, strerror(errno));
            close(src);
            return EXIT_FAILURE;
        }
        if (ftruncate(mapfd, (off_t)mapping_len) == -1) {
            fprintf(stderr, "Cannot resize mapping file %s: %s.\n", mapping, strerror(errno));
            close(src);
            close(mapfd);
            unlink(mapping);
            return EXIT_FAILURE;
        }
        /* posix_fallocate() returns the error number; it does not set errno. */
        result = posix_fallocate(mapfd, (off_t)0, (off_t)mapping_len);
        if (result) {
            fprintf(stderr, "Cannot allocate enough storage space for mapping file %s: %s.\n", mapping, strerror(result));
            close(src);
            close(mapfd);
            unlink(mapping);
            return EXIT_FAILURE;
        }
        data = mmap((void *)0, mapping_len, PROT_READ | PROT_WRITE, MAPPING_FLAGS, mapfd, (off_t)0);
        if ((void *)data == MAP_FAILED) {
            fprintf(stderr, "Cannot memory-map %s: %s.\n", mapping, strerror(errno));
            close(src);
            close(mapfd);
            unlink(mapping);
            return EXIT_FAILURE;
        }
        /* The mapping stays valid after the descriptor is closed. */
        if (close(mapfd) == -1) {
            fprintf(stderr, "Error closing %s: %s.\n", mapping, strerror(errno));
            /* Do not close mapfd again: its state is unspecified after a
             * failed close(), and the number may already have been reused. */
            close(src);
            munmap(data, mapping_len);
            unlink(mapping);
            return EXIT_FAILURE;
        }

        /* Clear the mapping to zeroes. */
        fprintf(stderr, "Clearing %s .. ", mapping);
        fflush(stderr);
        memset(data, 0, mapping_len);
        fprintf(stderr, "Done; mapping is okay.\n");
        fflush(stderr);
    } else {
        data = malloc(size);
        if (!data) {
            fprintf(stderr, "Cannot allocate %zu bytes of memory.\n", size);
            close(src);
            return EXIT_FAILURE;
        }
        fprintf(stderr, "Clearing receive buffer .. ");
        fflush(stderr);
        memset(data, 0, size);
        fprintf(stderr, "Done.\n");
        fflush(stderr);
    }

    /* Allocate receive size histogram. */
    read_histogram_max = READ_HISTOGRAM;
    read_histogram = calloc(read_histogram_max + 1, sizeof read_histogram[0]);
    if (!read_histogram) {
        fprintf(stderr, "Cannot allocate memory for receive statistics.\n");
        cleanup();
        return EXIT_FAILURE;
    }

    /* Initialize shared variables. */
    main_thread = pthread_self();
    clock_gettime(WORKER_CLOCK, &receive_started);

    /* Create the worker thread, using a smallish stack. */
    pthread_attr_init(&attrs);
    pthread_attr_setstacksize(&attrs, 2*PTHREAD_STACK_MIN);
    result = pthread_create(&worker_thread, &attrs, worker, NULL);
    /* The attributes are only needed for the creation itself. */
    pthread_attr_destroy(&attrs);
    if (result) {
        fprintf(stderr, "Cannot create receiver thread: %s.\n", strerror(result));
        cleanup();
        return EXIT_FAILURE;
    }

    /* Respond to signals. */
    while (1) {
        if (sigwaitinfo(&signals, &info) == -1) {
            /* Interrupted by a handler for some other signal; wait again. */
            if (errno == EINTR)
                continue;
            fprintf(stderr, "Error catching signals: %s.\n", strerror(errno));
            pthread_cancel(worker_thread);
            exit_status = EXIT_FAILURE;
            break;
        }

        if (info.si_signo == SIGUSR1 || info.si_signo == SIGUSR2) {
            struct timespec now;
            double elapsed;
            size_t bytes;
            bytes = data_have();
            clock_gettime(WORKER_CLOCK, &now);
            elapsed = (double)(now.tv_sec - receive_started.tv_sec)
                    + (double)(now.tv_nsec - receive_started.tv_nsec) / 1000000000.0;
            printf("Received %zu bytes in %.3f seconds (%.0f bytes/second)\n",
                   bytes, elapsed, (elapsed > 0.0) ? (double)bytes / elapsed : 0.0);
            fflush(stdout);
            continue;
        }

        if (info.si_signo == SIGINT || info.si_signo == SIGHUP || info.si_signo == SIGTERM) {
            const char *name = (info.si_signo == SIGINT) ? "INT" :
                               (info.si_signo == SIGHUP) ? "HUP" :
                               (info.si_signo == SIGTERM) ? "TERM" : "unknown";
            fprintf(stderr, "Aborted by %s signal.\n", name);
            fflush(stderr);
            pthread_cancel(worker_thread);
            break;
        }

        if (info.si_signo == WORKER_SIGNAL) {
            /* Ignore signals not sent by our worker thread. */
            if (info.si_pid != getpid() || info.si_code != SI_QUEUE)
                continue;

            /* Success? */
            if (info.si_value.sival_ptr == worker_success.sival_ptr) {
                double elapsed = (double)(receive_finished.tv_sec - receive_started.tv_sec)
                               + (double)(receive_finished.tv_nsec - receive_started.tv_nsec) / 1000000000.0;
                size_t bytes = data_have();
                /* Avoid dividing by a zero duration. */
                double rate = (elapsed > 0.0) ? (double)bytes / elapsed : 0.0;

                printf("Received %zu bytes in %.3f second (", bytes, elapsed);
                if (rate >= 1000000000.0)
                    printf("%.1f Gbytes/sec, ", rate/1073741824.0);
                else if (rate >= 1000000.0)
                    printf("%.1f Mbytes/sec, ", rate/1048576.0);
                else if (rate >= 1000.0)
                    printf("%.1f kbytes/sec, ", rate/1024.0);
                else
                    printf("%.0f bytes/sec, ", rate);

                /* Same rate again, in bits; exactly one unit is printed. */
                rate *= 8.0;
                if (rate >= 1000000000.0)
                    printf("%.1f Gbits/sec)\n", rate/1073741824.0);
                else if (rate >= 1000000.0)
                    printf("%.1f Mbits/sec)\n", rate/1048576.0);
                else if (rate >= 1000.0)
                    printf("%.1f kbits/sec)\n", rate/1024.0);
                else
                    printf("%.0f bits/sec)\n", rate);
                fflush(stdout);
            } else {
                fprintf(stderr, "Receiver thread failed.\n");
                fflush(stderr);
                exit_status = EXIT_FAILURE;
            }
            break;
        }

        fprintf(stderr, "(Ignored signal %d)\n", info.si_signo);
        fflush(stderr);
    }

    /* Reap worker thread. */
    retval = NULL;
    result = pthread_join(worker_thread, &retval);
    if (result) {
        /* pthread_join() returns the error number; it does not set errno. */
        fprintf(stderr, "Lost receiver thread: %s.\n", strerror(result));
        exit_status = EXIT_FAILURE;
    } else if (retval == PTHREAD_CANCELED) {
        /* A cancelled thread has no errno value to report. */
        fprintf(stderr, "Receiver thread was cancelled.\n");
        exit_status = EXIT_FAILURE;
    } else if ((intptr_t)retval == 0) {
        printf("Received thread completed successfully.\n");
    } else {
        fprintf(stderr, "Receiver thread failed: %s.\n", strerror((int)(intptr_t)retval));
        exit_status = EXIT_FAILURE;
    }

    if (read_histogram) {
        size_t i, n = 0;

        /* Slot 0 holds the reads that returned no data; all others succeeded. */
        for (i = 1; i <= read_histogram_max; i++)
            n += read_histogram[i];

        printf("# Receive statistics (%zu successful reads, %zu additional attempts):\n",
               n, read_histogram[0]);
        printf("# Size Number of read()s\n");
        for (i = 1; i < read_histogram_max; i++) {
            if (read_histogram[i] > 0) {
                printf("%8zu %zu\n", i, read_histogram[i]);
            }
        }
        if (read_histogram[read_histogram_max] > 0) {
            printf("%8zu+ %zu\n", read_histogram_max, read_histogram[read_histogram_max]);
        }
    }

    /* Only a complete transfer can be compared against the reference stream. */
    if (data_have() == size) {
        size_t i;

        printf("Verifying data .. ");
        fflush(stdout);

        for (i = 0; i < size; i++)
            if (data[i] != next_byte())
                break;

        if (i >= size) {
            printf("No errors.\n");
        } else if (i > 0) {
            printf("Only first %zu bytes match.\n", i);
        } else {
            printf("All of the data is wrong.\n");
        }
    }

    cleanup();
    return exit_status;
}