diff options
author | Sergey Poznyakoff <gray@gnu.org> | 2021-05-09 14:31:51 +0300 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org> | 2021-05-09 14:39:41 +0300 |
commit | 31072e99328bdc1d9a5b3888e090335ca24e6849 (patch) | |
tree | 6cb7d1ec0d83942128bcc45c35e785339f18c473 | |
parent | 8b7ecc7a66b930ea3f78d10043364022d861faa1 (diff) | |
download | rpipe-31072e99328bdc1d9a5b3888e090335ca24e6849.tar.gz rpipe-31072e99328bdc1d9a5b3888e090335ca24e6849.tar.bz2 |
client: make input processing independent on input timing
* src/client.c (waitreply): Rewrite the state machine. Split the NET
"client" into NETIN and NETOUT parts.
* t/chargen.c: New option -t introduces output delay points.
* t/delay.at: New test.
* t/pipein.at: Additional tests to check input from pipes.
* t/pipeout.at: Likewise.
* t/status.at: Don't use t_sink if no input is provided. Add more tests.
* t/Makefile.am: Add new test.
* t/testsuite.at: Include new test.
-rw-r--r-- | src/client.c | 171 | ||||
-rw-r--r-- | t/Makefile.am | 1 | ||||
-rw-r--r-- | t/chargen.c | 165 | ||||
-rw-r--r-- | t/delay.at | 27 | ||||
-rw-r--r-- | t/pipein.at | 16 | ||||
-rw-r--r-- | t/pipeout.at | 25 | ||||
-rw-r--r-- | t/status.at | 15 | ||||
-rw-r--r-- | t/testsuite.at | 1 |
8 files changed, 308 insertions, 113 deletions
diff --git a/src/client.c b/src/client.c index e95979c..a6fdd9b 100644 --- a/src/client.c +++ b/src/client.c @@ -38,11 +38,6 @@ static int client_sigv[] = { SIGPIPE }; static int client_sigc = sizeof(client_sigv)/sizeof(client_sigv[0]); - -struct rpipe_client { - struct iobuf ibuf; /* Input buffer */ - struct iobuf obuf; /* Output buffer */ -}; static int valid_pollfd(struct pollfd *pf, int n) @@ -55,32 +50,44 @@ valid_pollfd(struct pollfd *pf, int n) return 0; } +enum { + NETIN, + NETOUT, + STDIN, + STDOUT, + NFD +}; + static void waitreply(int fd) { - enum { - NET, - IN, - OUT, - NFD - }; struct pollfd fds[NFD]; - struct rpipe_client client[NFD]; + struct iobuf buf[NFD]; struct crlfdot_decoder_state state = CRLFDOT_DECODER_STATE_INIT; + memset(&fds, 0, sizeof(fds)); - fds[NET].fd = fd; - fds[NET].events = POLLIN; + fds[NETIN].fd = fd; + fds[NETIN].events = POLLIN; - fds[IN].fd = 0; - fds[IN].events = POLLIN; + fds[NETOUT].fd = fd; + fds[NETOUT].events = 0; + + fds[STDIN].fd = 0; + fds[STDIN].events = POLLIN; - fds[OUT].fd = 1; - fds[OUT].events = POLLOUT; + fds[STDOUT].fd = 1; + fds[STDOUT].events = 0; - memset(&client, 0, sizeof(client)); + memset(&buf, 0, sizeof(buf)); while (valid_pollfd(fds, NFD)) { + /* + * Save the currently requested STDIN events. They could + * be modified prior to their actual use. + */ + int stdin_evt = fds[STDIN].events; + ssize_t n = poll(fds, NFD, -1); if (n == -1) { if (errno != EINTR) @@ -88,59 +95,98 @@ waitreply(int fd) continue; } - if (fds[NET].revents & POLLIN) { - n = iobuf_fill(&client[NET].ibuf, fds[NET].fd); + /* Check if there is input from the net */ + if (fds[NETIN].revents & POLLIN) { + n = iobuf_fill(&buf[NETIN], fds[NETIN].fd); if (n == -1) { error(EX_OSERR, errno, "net read"); } else if (n == 0) { - fds[NET].events = 0; + shutdown(fds[NETIN].fd, SHUT_RD); + fds[NETIN].fd = -fds[NETIN].fd; } else { - fds[OUT].events |= POLLOUT; - if (iobuf_avail_size(&client[NET].ibuf) == 0) - fds[NET].events &= ~POLLIN; + fds[STDOUT].events |= POLLOUT; + if (iobuf_avail_size(&buf[NETIN]) == 0) + fds[NETIN].events &= ~POLLIN; } } - if (fds[NET].revents & POLLOUT) { - iobuf_transfer(&client[NET].obuf, &client[IN].ibuf); - n = iobuf_flush(&client[NET].obuf, fds[NET].fd); + /* + * Check whether the network can be written to. This is + * where the requested STDIN events can be modified (see + * the comment before stdin_evt above. + */ + if (fds[NETOUT].revents & POLLOUT) { + iobuf_transfer(&buf[NETOUT], &buf[STDIN]); + if (iobuf_avail_size(&buf[STDIN])) + fds[STDIN].events |= POLLIN; + + n = iobuf_flush(&buf[NETOUT], fds[NETOUT].fd); if (n == -1) { error(EX_OSERR, errno, "net write"); } else if (n == 0) { error(EX_OSERR, ENOSPC, "net write"); } - if (iobuf_data_size(&client[NET].obuf) == 0) { - fds[NET].events &= ~POLLOUT; - if (fds[IN].events == 0 && - iobuf_data_size(&client[IN].ibuf) == 0) { - shutdown(fds[NET].fd, SHUT_WR); - } + if (iobuf_data_size(&buf[NETOUT]) == 0) { + fds[NETOUT].events &= ~POLLOUT; } } - if ((fds[IN].revents & POLLIN) == 0 || - (n = iobuf_fill(&client[IN].ibuf, fds[IN].fd)) == 0) { - if (iobuf_data_size(&client[NET].obuf) == 0 && - iobuf_data_size(&client[IN].ibuf) == 0) { - shutdown(fds[NET].fd, SHUT_WR); - fds[NET].events &= ~POLLOUT; + /* + * Check whether there are some data waiting on the input, + * or the input is exhausted. The latter is tricky because + * both POLLIN and POLLHUP can be set simultaneously. On + * the other hand, if POLLHUP alone is set, the input is + * actually exhausted only if the POLLIN bit was requested + * when polling. Its actual state in the fds[STDIN].events + * field can't be relied upon, because it could have been + * changed when analyzing NETOUT state above. Hence the need + * for the additional variable stdin_evt. + */ + if (fds[STDIN].revents & POLLIN) { + n = iobuf_fill(&buf[STDIN], fds[STDIN].fd); + if (n == -1) { + error(EX_OSERR, errno, "stdin read"); + } else if (n == 0) { + shutdown(fds[NETOUT].fd, SHUT_WR); + fds[NETOUT].fd = -fds[NETOUT].fd; + fds[STDIN].events = 0; + } else { + fds[NETOUT].events |= POLLOUT; + if (iobuf_avail_size(&buf[STDIN]) == 0) + fds[STDIN].events &= ~POLLIN; } - fds[IN].events = 0; - } else if (n == -1) { - error(EX_OSERR, errno, "stdin read"); - } else { - fds[NET].events |= POLLOUT; + } else if (fds[STDIN].revents == POLLHUP && (stdin_evt & POLLIN)) { + /* This indicates EOF on input */ + fds[STDIN].fd = -1; + } + + /* + * Shutdown the write end of the net socket, if EOF was seen + * on input. This will indicate EOF for the remote party. + * However, if at least one of NETOUT or STDIN buffers contains + * some data, it would be premature to do so. In this case + * wait until the data have been transferred to the remote end. + */ + if (fds[STDIN].fd == -1 && fds[NETOUT].fd >= 0 && + iobuf_data_size(&buf[NETOUT]) == 0 && + iobuf_data_size(&buf[STDIN]) == 0) { + shutdown(fds[NETOUT].fd, SHUT_WR); + fds[NETOUT].fd = -fds[NETOUT].fd; } - if (fds[OUT].revents & POLLOUT) { - crlfdot_decoder(&state, &client[OUT].obuf, - &client[NET].ibuf); - if (iobuf_avail_size(&client[NET].ibuf)) - fds[NET].events |= POLLIN; - if (iobuf_data_size(&client[OUT].obuf) == 0) - fds[OUT].events &= ~POLLOUT; + /* + * Output the data obtained from the network. Shut down + * both ends of the network socket if we have received the + * CR LF '.' CR LF marker. This indicates end of the loop. + */ + if (fds[STDOUT].revents & POLLOUT) { + crlfdot_decoder(&state, &buf[STDOUT], &buf[NETIN]); + if (iobuf_avail_size(&buf[NETIN])) + fds[NETIN].events |= POLLIN; + if (iobuf_data_size(&buf[STDOUT]) == 0) + fds[STDOUT].events &= ~POLLOUT; else { - n = iobuf_flush(&client[OUT].obuf, fds[OUT].fd); + n = iobuf_flush(&buf[STDOUT], fds[STDOUT].fd); if (n == -1) { error(EX_OSERR, errno, "stdout write"); } else if (n == 0) { @@ -149,24 +195,35 @@ waitreply(int fd) } if (state.state == -1) { /* Final state reached. */ - fds[IN].events = 0; + fds[STDIN].events = 0; + if (fds[NETOUT].fd >= 0) { + shutdown(fds[NETOUT].fd, SHUT_WR); + fds[NETOUT].fd = -fds[NETOUT].fd; + } + if (fds[NETIN].fd >= 0) { + shutdown(fds[NETIN].fd, SHUT_RD); + fds[NETIN].fd = -fds[NETIN].fd; + } } } } - if (state.state == -1 && iobuf_data_size(&client[NET].ibuf)) { + close(fd); + + /* Read in and analyze exit status of the remote program. */ + if (state.state == -1 && iobuf_data_size(&buf[NETIN])) { char *p; unsigned long u = 0; static char dig[] = "0123456789"; int c; int status; int ok = 0; - while ((c = iobuf_getc(&client[NET].ibuf)) != -1 + while ((c = iobuf_getc(&buf[NETIN])) != -1 && (p = strchr(dig, c)) != NULL) { u = u * 10 + p - dig; ok = 1; } - if (!(c == '\r' && iobuf_getc(&client[NET].ibuf) == '\n')) + if (!(c == '\r' && iobuf_getc(&buf[NETIN]) == '\n')) error(0, 0, "remote protocol error: CRLF missing"); if (!ok || u > INT_MAX) error(EX_UNAVAILABLE, 0, diff --git a/t/Makefile.am b/t/Makefile.am index c8e696d..620fa29 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -29,6 +29,7 @@ $(srcdir)/package.m4: $(top_srcdir)/configure.ac ## ------------ ## TESTSUITE_AT = \ + delay.at\ pipein.at\ pipeout.at\ status.at\ diff --git a/t/chargen.c b/t/chargen.c index ec97cfd..6f77915 100644 --- a/t/chargen.c +++ b/t/chargen.c @@ -3,7 +3,7 @@ chargen - generate a stream of characters SYNOPSIS - chargen [-c C] [-l LEN] [-p] [-s N] + chargen [-c C] [-l LEN] [-p] [-s N] [-t MAP] DESCRIPTION Produces on standard output a stream of characters. The stream consists @@ -20,6 +20,16 @@ -s N Start from character with ordinal number N + -t MAP + Introduces delays at arbitrary points of output. MAP is a + sequence of N=T pairs, separated by colons. In each pair, + N is the offset and T is the delay. The effect of each pair + is to sleep for T microseconds before outputting Nth character. + T can be optionally folowed by 's', in which case it is measured + in seconds. + + At most 128 pairs can be supplied. + LICENSE Copyright (C) 2020-2021 Sergey Poznyakoff @@ -47,50 +57,117 @@ int main(int argc, char **argv) { - int c; - int start = 0; - int mod = UCHAR_MAX; - size_t i, len = 4096; - int printable = 0; - char *p; - - while ((c = getopt(argc, argv, "cl:ps:")) != EOF) { - switch (c) { - case 'c': - start = optarg[0]; - break; - case 'p': - printable = 1; - break; - case 's': - start = atoi(optarg) % UCHAR_MAX; - break; - case 'l': - errno = 0; - len = strtoul(optarg, &p, 10); - if (*p) { - fprintf(stderr, "bad length (near %s)", p); - exit(1); - } else if (errno) { - perror("bad length"); - exit(1); - } - break; - default: - exit(2); + int c; + int start = 0; + int mod = UCHAR_MAX; + size_t i, len = 4096; + int printable = 0; + char *p; + + struct timeout_map { + size_t off; + unsigned long delay; + }; +#define TMAX 128 + static struct timeout_map tmap[TMAX]; + static int tmax = 0; + int tind = 0; +#define USEC_IN_SEC (1e6) + + while ((c = getopt(argc, argv, "cl:ps:t:")) != EOF) { + switch (c) { + case 'c': + start = optarg[0]; + break; + case 'p': + printable = 1; + break; + case 's': + start = atoi(optarg) % UCHAR_MAX; + break; + case 'l': + errno = 0; + len = strtoul(optarg, &p, 10); + if (*p) { + fprintf(stderr, "bad length (near %s)\n", p); + exit(1); + } else if (errno) { + perror("bad length"); + exit(1); + } + break; + case 't': + while (*optarg) { + char *p; + unsigned long n; + + if (tmax == TMAX) { + fprintf(stderr, "timeout map overflow\n"); + exit(1); + } + + n = strtoul(optarg, &p, 10); + + if (n == 0 && errno == ERANGE) { + fprintf(stderr, "invalid offset (near %s)\n", + optarg); + exit(1); + } + tmap[tmax].off = n; + + optarg = p; + if (*optarg == 0) + break; + if (*optarg != '=') { + fprintf(stderr, "expected '=' (near %s)\n", + optarg); + exit(1); + } + optarg++; + + n = strtoul(optarg, &p, 10); + if (n == 0 && errno == ERANGE) { + fprintf(stderr, "invalid delay (near %s)\n", + optarg); + exit(1); + } + if (*p == 's') { + if (UINT_MAX / USEC_IN_SEC < n) { + fprintf(stderr, "delay too big (near %s)\n", + optarg); + exit(1); + } + n *= USEC_IN_SEC; + p++; + } + tmap[tmax].delay = n; + tmax++; + + optarg = p; + if (*optarg == ':') + optarg++; + } + break; + + default: + exit(2); + } } - } - if (printable && !isprint(start)) - do { - start = (start + 1) % mod; - } while (printable && !isprint(start)); + if (printable && !isprint(start)) + do { + start = (start + 1) % mod; + } while (printable && !isprint(start)); - for (i = 0; i < len; i++) { - putchar(start); - do { - start = (start + 1) % mod; - } while (printable && !isprint(start)); - } - return 0; + for (i = 0; i < len; i++) { + if (tind < tmax && tmap[tind].off == i) { + usleep(tmap[tind].delay); + tind++; + } + putchar(start); + do { + start = (start + 1) % mod; + } while (printable && !isprint(start)); + } + return 0; } diff --git a/t/delay.at b/t/delay.at new file mode 100644 index 0000000..713459a --- /dev/null +++ b/t/delay.at @@ -0,0 +1,27 @@ +# This file is part of rpipe testsuite -*- autotest -*- +# Copyright (C) 2019-2021 Sergey Poznyakoff +# License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html> +# This is free software: you are free to change and redistribute it. +# There is NO WARRANTY, to the extent permitted by law. +AT_SETUP([delayed input]) +AT_KEYWORDS([data receive delay]) + +AT_CHECK([chargen -t 0=1s:10=500000:15=1s -l 16 | tee expout | \ + rpipe -T $TESTDIR/t_store -- 0 +status=$? +cat store +exit $status +], +[0], +[expout]) + +AT_CHECK([chargen -t 0=1s:10=500000:8000=1s:15999=1s -l 16000 | tee expout | \ + rpipe -T $TESTDIR/t_store -- 0 +status=$? +cat store +exit $status +], +[0], +[expout]) + +AT_CLEANUP diff --git a/t/pipein.at b/t/pipein.at index 975b614..62ee0ff 100644 --- a/t/pipein.at +++ b/t/pipein.at @@ -5,7 +5,7 @@ # There is NO WARRANTY, to the extent permitted by law. AT_SETUP([data sent]) AT_KEYWORDS([data send]) -AT_CHECK([chargen -p 16 > input +AT_CHECK([chargen -p -l 16 > input cp input expout echo "" >> expout rpipe -T $TESTDIR/t_echo -- 0 < input @@ -13,7 +13,13 @@ rpipe -T $TESTDIR/t_echo -- 0 < input [0], [expout]) -AT_CHECK([chargen -p 16000 > input +AT_CHECK([ +cat input | rpipe -T $TESTDIR/t_echo -- 0 +], +[0], +[expout]) + +AT_CHECK([chargen -p -l 16000 > input cp input expout echo "" >> expout rpipe -T $TESTDIR/t_echo -- 0 < input @@ -21,10 +27,8 @@ rpipe -T $TESTDIR/t_echo -- 0 < input [0], [expout]) -AT_CHECK([chargen -p 16000 > input -cp input expout -echo "" >> expout -rpipe -T $TESTDIR/t_echo -- 10 < input +AT_CHECK([ +cat input | rpipe -T $TESTDIR/t_echo -- 10 ], [10], [expout]) diff --git a/t/pipeout.at b/t/pipeout.at index d34b77a..79add31 100644 --- a/t/pipeout.at +++ b/t/pipeout.at @@ -5,7 +5,8 @@ # There is NO WARRANTY, to the extent permitted by law. AT_SETUP([data received]) AT_KEYWORDS([data receive]) -AT_CHECK([chargen 16 > expout + +AT_CHECK([chargen -l 16 > expout rpipe -T $TESTDIR/t_store -- 0 < expout status=$? cat store @@ -13,7 +14,17 @@ exit $status ], [0], [expout]) -AT_CHECK([chargen 16000 > expout + +AT_CHECK([ +cat expout | rpipe -T $TESTDIR/t_store -- 0 +status=$? +cat store +exit $status +], +[0], +[expout]) + +AT_CHECK([chargen -l 16000 > expout rpipe -T $TESTDIR/t_store -- 0 < expout status=$? cat store @@ -21,4 +32,14 @@ exit $status ], [0], [expout]) + +AT_CHECK([ +cat expout | rpipe -T $TESTDIR/t_store -- 0 +status=$? +cat store +exit $status +], +[0], +[expout]) + AT_CLEANUP diff --git a/t/status.at b/t/status.at index 6c1ab83..12f27cf 100644 --- a/t/status.at +++ b/t/status.at @@ -5,8 +5,15 @@ # There is NO WARRANTY, to the extent permitted by law. AT_SETUP([status propagation]) AT_KEYWORDS([status]) -AT_CHECK([rpipe -T $TESTDIR/t_sink -- 0],[0]) -AT_CHECK([rpipe -T $TESTDIR/t_sink -- 1],[1]) -AT_CHECK([chargen 2048 | rpipe -T $TESTDIR/t_sink -- 0],[0]) -AT_CHECK([chargen 2048 | rpipe -T $TESTDIR/t_sink -- 1],[1]) +AT_CHECK([rpipe -T /bin/sh -c 'exit 0' --],[0]) +AT_CHECK([rpipe -T /bin/sh -c 'exit 1' --],[1]) + +AT_CHECK([rpipe -T $TESTDIR/t_sink -- 0 < /dev/null],[0]) +AT_CHECK([rpipe -T $TESTDIR/t_sink -- 1 < /dev/null],[1]) + +AT_CHECK([cat /dev/null | rpipe -T $TESTDIR/t_sink -- 0],[0]) +AT_CHECK([cat /dev/null | rpipe -T $TESTDIR/t_sink -- 1],[1]) + +AT_CHECK([chargen -l 2048 | rpipe -T $TESTDIR/t_sink -- 0],[0]) +AT_CHECK([chargen -l 2048 | rpipe -T $TESTDIR/t_sink -- 1],[1]) AT_CLEANUP diff --git a/t/testsuite.at b/t/testsuite.at index 630ddb0..cf4124e 100644 --- a/t/testsuite.at +++ b/t/testsuite.at @@ -10,4 +10,5 @@ AT_INIT AT_TESTED([rpipe]) m4_include([status.at]) m4_include([pipeout.at]) +m4_include([delay.at]) m4_include([pipein.at]) |