aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Poznyakoff <gray@gnu.org>2021-05-09 14:31:51 +0300
committerSergey Poznyakoff <gray@gnu.org>2021-05-09 14:39:41 +0300
commit31072e99328bdc1d9a5b3888e090335ca24e6849 (patch)
tree6cb7d1ec0d83942128bcc45c35e785339f18c473
parent8b7ecc7a66b930ea3f78d10043364022d861faa1 (diff)
downloadrpipe-31072e99328bdc1d9a5b3888e090335ca24e6849.tar.gz
rpipe-31072e99328bdc1d9a5b3888e090335ca24e6849.tar.bz2
client: make input processing independent on input timing
* src/client.c (waitreply): Rewrite the state machine. Split the NET "client" into NETIN and NETOUT parts. * t/chargen.c: New option -t introduces output delay points. * t/delay.at: New test. * t/pipein.at: Additional tests to check input from pipes. * t/pipeout.at: Likewise. * t/status.at: Don't use t_sink if no input is provided. Add more tests. * t/Makefile.am: Add new test. * t/testsuite.at: Include new test.
-rw-r--r--src/client.c171
-rw-r--r--t/Makefile.am1
-rw-r--r--t/chargen.c165
-rw-r--r--t/delay.at27
-rw-r--r--t/pipein.at16
-rw-r--r--t/pipeout.at25
-rw-r--r--t/status.at15
-rw-r--r--t/testsuite.at1
8 files changed, 308 insertions, 113 deletions
diff --git a/src/client.c b/src/client.c
index e95979c..a6fdd9b 100644
--- a/src/client.c
+++ b/src/client.c
@@ -38,11 +38,6 @@ static int client_sigv[] = {
SIGPIPE
};
static int client_sigc = sizeof(client_sigv)/sizeof(client_sigv[0]);
-
-struct rpipe_client {
- struct iobuf ibuf; /* Input buffer */
- struct iobuf obuf; /* Output buffer */
-};
static int
valid_pollfd(struct pollfd *pf, int n)
@@ -55,32 +50,44 @@ valid_pollfd(struct pollfd *pf, int n)
return 0;
}
+enum {
+ NETIN,
+ NETOUT,
+ STDIN,
+ STDOUT,
+ NFD
+};
+
static void
waitreply(int fd)
{
- enum {
- NET,
- IN,
- OUT,
- NFD
- };
struct pollfd fds[NFD];
- struct rpipe_client client[NFD];
+ struct iobuf buf[NFD];
struct crlfdot_decoder_state state = CRLFDOT_DECODER_STATE_INIT;
+
memset(&fds, 0, sizeof(fds));
- fds[NET].fd = fd;
- fds[NET].events = POLLIN;
+ fds[NETIN].fd = fd;
+ fds[NETIN].events = POLLIN;
- fds[IN].fd = 0;
- fds[IN].events = POLLIN;
+ fds[NETOUT].fd = fd;
+ fds[NETOUT].events = 0;
+
+ fds[STDIN].fd = 0;
+ fds[STDIN].events = POLLIN;
- fds[OUT].fd = 1;
- fds[OUT].events = POLLOUT;
+ fds[STDOUT].fd = 1;
+ fds[STDOUT].events = 0;
- memset(&client, 0, sizeof(client));
+ memset(&buf, 0, sizeof(buf));
while (valid_pollfd(fds, NFD)) {
+ /*
+ * Save the currently requested STDIN events. They could
+ * be modified prior to their actual use.
+ */
+ int stdin_evt = fds[STDIN].events;
+
ssize_t n = poll(fds, NFD, -1);
if (n == -1) {
if (errno != EINTR)
@@ -88,59 +95,98 @@ waitreply(int fd)
continue;
}
- if (fds[NET].revents & POLLIN) {
- n = iobuf_fill(&client[NET].ibuf, fds[NET].fd);
+ /* Check if there is input from the net */
+ if (fds[NETIN].revents & POLLIN) {
+ n = iobuf_fill(&buf[NETIN], fds[NETIN].fd);
if (n == -1) {
error(EX_OSERR, errno, "net read");
} else if (n == 0) {
- fds[NET].events = 0;
+ shutdown(fds[NETIN].fd, SHUT_RD);
+ fds[NETIN].fd = -fds[NETIN].fd;
} else {
- fds[OUT].events |= POLLOUT;
- if (iobuf_avail_size(&client[NET].ibuf) == 0)
- fds[NET].events &= ~POLLIN;
+ fds[STDOUT].events |= POLLOUT;
+ if (iobuf_avail_size(&buf[NETIN]) == 0)
+ fds[NETIN].events &= ~POLLIN;
}
}
- if (fds[NET].revents & POLLOUT) {
- iobuf_transfer(&client[NET].obuf, &client[IN].ibuf);
- n = iobuf_flush(&client[NET].obuf, fds[NET].fd);
+ /*
+ * Check whether the network can be written to. This is
+ * where the requested STDIN events can be modified (see
+ * the comment before stdin_evt above.
+ */
+ if (fds[NETOUT].revents & POLLOUT) {
+ iobuf_transfer(&buf[NETOUT], &buf[STDIN]);
+ if (iobuf_avail_size(&buf[STDIN]))
+ fds[STDIN].events |= POLLIN;
+
+ n = iobuf_flush(&buf[NETOUT], fds[NETOUT].fd);
if (n == -1) {
error(EX_OSERR, errno, "net write");
} else if (n == 0) {
error(EX_OSERR, ENOSPC, "net write");
}
- if (iobuf_data_size(&client[NET].obuf) == 0) {
- fds[NET].events &= ~POLLOUT;
- if (fds[IN].events == 0 &&
- iobuf_data_size(&client[IN].ibuf) == 0) {
- shutdown(fds[NET].fd, SHUT_WR);
- }
+ if (iobuf_data_size(&buf[NETOUT]) == 0) {
+ fds[NETOUT].events &= ~POLLOUT;
}
}
- if ((fds[IN].revents & POLLIN) == 0 ||
- (n = iobuf_fill(&client[IN].ibuf, fds[IN].fd)) == 0) {
- if (iobuf_data_size(&client[NET].obuf) == 0 &&
- iobuf_data_size(&client[IN].ibuf) == 0) {
- shutdown(fds[NET].fd, SHUT_WR);
- fds[NET].events &= ~POLLOUT;
+ /*
+ * Check whether there are some data waiting on the input,
+ * or the input is exhausted. The latter is tricky because
+ * both POLLIN and POLLHUP can be set simultaneously. On
+ * the other hand, if POLLHUP alone is set, the input is
+ * actually exhausted only if the POLLIN bit was requested
+ * when polling. Its actual state in the fds[STDIN].events
+ * field can't be relied upon, because it could have been
+ * changed when analyzing NETOUT state above. Hence the need
+ * for the additional variable stdin_evt.
+ */
+ if (fds[STDIN].revents & POLLIN) {
+ n = iobuf_fill(&buf[STDIN], fds[STDIN].fd);
+ if (n == -1) {
+ error(EX_OSERR, errno, "stdin read");
+ } else if (n == 0) {
+ shutdown(fds[NETOUT].fd, SHUT_WR);
+ fds[NETOUT].fd = -fds[NETOUT].fd;
+ fds[STDIN].events = 0;
+ } else {
+ fds[NETOUT].events |= POLLOUT;
+ if (iobuf_avail_size(&buf[STDIN]) == 0)
+ fds[STDIN].events &= ~POLLIN;
}
- fds[IN].events = 0;
- } else if (n == -1) {
- error(EX_OSERR, errno, "stdin read");
- } else {
- fds[NET].events |= POLLOUT;
+ } else if (fds[STDIN].revents == POLLHUP && (stdin_evt & POLLIN)) {
+ /* This indicates EOF on input */
+ fds[STDIN].fd = -1;
+ }
+
+ /*
+ * Shutdown the write end of the net socket, if EOF was seen
+ * on input. This will indicate EOF for the remote party.
+ * However, if at least one of NETOUT or STDIN buffers contains
+ * some data, it would be premature to do so. In this case
+ * wait until the data have been transferred to the remote end.
+ */
+ if (fds[STDIN].fd == -1 && fds[NETOUT].fd >= 0 &&
+ iobuf_data_size(&buf[NETOUT]) == 0 &&
+ iobuf_data_size(&buf[STDIN]) == 0) {
+ shutdown(fds[NETOUT].fd, SHUT_WR);
+ fds[NETOUT].fd = -fds[NETOUT].fd;
}
- if (fds[OUT].revents & POLLOUT) {
- crlfdot_decoder(&state, &client[OUT].obuf,
- &client[NET].ibuf);
- if (iobuf_avail_size(&client[NET].ibuf))
- fds[NET].events |= POLLIN;
- if (iobuf_data_size(&client[OUT].obuf) == 0)
- fds[OUT].events &= ~POLLOUT;
+ /*
+ * Output the data obtained from the network. Shut down
+ * both ends of the network socket if we have received the
+ * CR LF '.' CR LF marker. This indicates end of the loop.
+ */
+ if (fds[STDOUT].revents & POLLOUT) {
+ crlfdot_decoder(&state, &buf[STDOUT], &buf[NETIN]);
+ if (iobuf_avail_size(&buf[NETIN]))
+ fds[NETIN].events |= POLLIN;
+ if (iobuf_data_size(&buf[STDOUT]) == 0)
+ fds[STDOUT].events &= ~POLLOUT;
else {
- n = iobuf_flush(&client[OUT].obuf, fds[OUT].fd);
+ n = iobuf_flush(&buf[STDOUT], fds[STDOUT].fd);
if (n == -1) {
error(EX_OSERR, errno, "stdout write");
} else if (n == 0) {
@@ -149,24 +195,35 @@ waitreply(int fd)
}
if (state.state == -1) {
/* Final state reached. */
- fds[IN].events = 0;
+ fds[STDIN].events = 0;
+ if (fds[NETOUT].fd >= 0) {
+ shutdown(fds[NETOUT].fd, SHUT_WR);
+ fds[NETOUT].fd = -fds[NETOUT].fd;
+ }
+ if (fds[NETIN].fd >= 0) {
+ shutdown(fds[NETIN].fd, SHUT_RD);
+ fds[NETIN].fd = -fds[NETIN].fd;
+ }
}
}
}
- if (state.state == -1 && iobuf_data_size(&client[NET].ibuf)) {
+ close(fd);
+
+ /* Read in and analyze exit status of the remote program. */
+ if (state.state == -1 && iobuf_data_size(&buf[NETIN])) {
char *p;
unsigned long u = 0;
static char dig[] = "0123456789";
int c;
int status;
int ok = 0;
- while ((c = iobuf_getc(&client[NET].ibuf)) != -1
+ while ((c = iobuf_getc(&buf[NETIN])) != -1
&& (p = strchr(dig, c)) != NULL) {
u = u * 10 + p - dig;
ok = 1;
}
- if (!(c == '\r' && iobuf_getc(&client[NET].ibuf) == '\n'))
+ if (!(c == '\r' && iobuf_getc(&buf[NETIN]) == '\n'))
error(0, 0, "remote protocol error: CRLF missing");
if (!ok || u > INT_MAX)
error(EX_UNAVAILABLE, 0,
diff --git a/t/Makefile.am b/t/Makefile.am
index c8e696d..620fa29 100644
--- a/t/Makefile.am
+++ b/t/Makefile.am
@@ -29,6 +29,7 @@ $(srcdir)/package.m4: $(top_srcdir)/configure.ac
## ------------ ##
TESTSUITE_AT = \
+ delay.at\
pipein.at\
pipeout.at\
status.at\
diff --git a/t/chargen.c b/t/chargen.c
index ec97cfd..6f77915 100644
--- a/t/chargen.c
+++ b/t/chargen.c
@@ -3,7 +3,7 @@
chargen - generate a stream of characters
SYNOPSIS
- chargen [-c C] [-l LEN] [-p] [-s N]
+ chargen [-c C] [-l LEN] [-p] [-s N] [-t MAP]
DESCRIPTION
Produces on standard output a stream of characters. The stream consists
@@ -20,6 +20,16 @@
-s N Start from character with ordinal number N
+ -t MAP
+ Introduces delays at arbitrary points of output. MAP is a
+ sequence of N=T pairs, separated by colons. In each pair,
+ N is the offset and T is the delay. The effect of each pair
+ is to sleep for T microseconds before outputting Nth character.
+ T can be optionally folowed by 's', in which case it is measured
+ in seconds.
+
+ At most 128 pairs can be supplied.
+
LICENSE
Copyright (C) 2020-2021 Sergey Poznyakoff
@@ -47,50 +57,117 @@
int
main(int argc, char **argv)
{
- int c;
- int start = 0;
- int mod = UCHAR_MAX;
- size_t i, len = 4096;
- int printable = 0;
- char *p;
-
- while ((c = getopt(argc, argv, "cl:ps:")) != EOF) {
- switch (c) {
- case 'c':
- start = optarg[0];
- break;
- case 'p':
- printable = 1;
- break;
- case 's':
- start = atoi(optarg) % UCHAR_MAX;
- break;
- case 'l':
- errno = 0;
- len = strtoul(optarg, &p, 10);
- if (*p) {
- fprintf(stderr, "bad length (near %s)", p);
- exit(1);
- } else if (errno) {
- perror("bad length");
- exit(1);
- }
- break;
- default:
- exit(2);
+ int c;
+ int start = 0;
+ int mod = UCHAR_MAX;
+ size_t i, len = 4096;
+ int printable = 0;
+ char *p;
+
+ struct timeout_map {
+ size_t off;
+ unsigned long delay;
+ };
+#define TMAX 128
+ static struct timeout_map tmap[TMAX];
+ static int tmax = 0;
+ int tind = 0;
+#define USEC_IN_SEC (1e6)
+
+ while ((c = getopt(argc, argv, "cl:ps:t:")) != EOF) {
+ switch (c) {
+ case 'c':
+ start = optarg[0];
+ break;
+ case 'p':
+ printable = 1;
+ break;
+ case 's':
+ start = atoi(optarg) % UCHAR_MAX;
+ break;
+ case 'l':
+ errno = 0;
+ len = strtoul(optarg, &p, 10);
+ if (*p) {
+ fprintf(stderr, "bad length (near %s)\n", p);
+ exit(1);
+ } else if (errno) {
+ perror("bad length");
+ exit(1);
+ }
+ break;
+ case 't':
+ while (*optarg) {
+ char *p;
+ unsigned long n;
+
+ if (tmax == TMAX) {
+ fprintf(stderr, "timeout map overflow\n");
+ exit(1);
+ }
+
+ n = strtoul(optarg, &p, 10);
+
+ if (n == 0 && errno == ERANGE) {
+ fprintf(stderr, "invalid offset (near %s)\n",
+ optarg);
+ exit(1);
+ }
+ tmap[tmax].off = n;
+
+ optarg = p;
+ if (*optarg == 0)
+ break;
+ if (*optarg != '=') {
+ fprintf(stderr, "expected '=' (near %s)\n",
+ optarg);
+ exit(1);
+ }
+ optarg++;
+
+ n = strtoul(optarg, &p, 10);
+ if (n == 0 && errno == ERANGE) {
+ fprintf(stderr, "invalid delay (near %s)\n",
+ optarg);
+ exit(1);
+ }
+ if (*p == 's') {
+ if (UINT_MAX / USEC_IN_SEC < n) {
+ fprintf(stderr, "delay too big (near %s)\n",
+ optarg);
+ exit(1);
+ }
+ n *= USEC_IN_SEC;
+ p++;
+ }
+ tmap[tmax].delay = n;
+ tmax++;
+
+ optarg = p;
+ if (*optarg == ':')
+ optarg++;
+ }
+ break;
+
+ default:
+ exit(2);
+ }
}
- }
- if (printable && !isprint(start))
- do {
- start = (start + 1) % mod;
- } while (printable && !isprint(start));
+ if (printable && !isprint(start))
+ do {
+ start = (start + 1) % mod;
+ } while (printable && !isprint(start));
- for (i = 0; i < len; i++) {
- putchar(start);
- do {
- start = (start + 1) % mod;
- } while (printable && !isprint(start));
- }
- return 0;
+ for (i = 0; i < len; i++) {
+ if (tind < tmax && tmap[tind].off == i) {
+ usleep(tmap[tind].delay);
+ tind++;
+ }
+ putchar(start);
+ do {
+ start = (start + 1) % mod;
+ } while (printable && !isprint(start));
+ }
+ return 0;
}
diff --git a/t/delay.at b/t/delay.at
new file mode 100644
index 0000000..713459a
--- /dev/null
+++ b/t/delay.at
@@ -0,0 +1,27 @@
+# This file is part of rpipe testsuite -*- autotest -*-
+# Copyright (C) 2019-2021 Sergey Poznyakoff
+# License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
+# This is free software: you are free to change and redistribute it.
+# There is NO WARRANTY, to the extent permitted by law.
+AT_SETUP([delayed input])
+AT_KEYWORDS([data receive delay])
+
+AT_CHECK([chargen -t 0=1s:10=500000:15=1s -l 16 | tee expout | \
+ rpipe -T $TESTDIR/t_store -- 0
+status=$?
+cat store
+exit $status
+],
+[0],
+[expout])
+
+AT_CHECK([chargen -t 0=1s:10=500000:8000=1s:15999=1s -l 16000 | tee expout | \
+ rpipe -T $TESTDIR/t_store -- 0
+status=$?
+cat store
+exit $status
+],
+[0],
+[expout])
+
+AT_CLEANUP
diff --git a/t/pipein.at b/t/pipein.at
index 975b614..62ee0ff 100644
--- a/t/pipein.at
+++ b/t/pipein.at
@@ -5,7 +5,7 @@
# There is NO WARRANTY, to the extent permitted by law.
AT_SETUP([data sent])
AT_KEYWORDS([data send])
-AT_CHECK([chargen -p 16 > input
+AT_CHECK([chargen -p -l 16 > input
cp input expout
echo "" >> expout
rpipe -T $TESTDIR/t_echo -- 0 < input
@@ -13,7 +13,13 @@ rpipe -T $TESTDIR/t_echo -- 0 < input
[0],
[expout])
-AT_CHECK([chargen -p 16000 > input
+AT_CHECK([
+cat input | rpipe -T $TESTDIR/t_echo -- 0
+],
+[0],
+[expout])
+
+AT_CHECK([chargen -p -l 16000 > input
cp input expout
echo "" >> expout
rpipe -T $TESTDIR/t_echo -- 0 < input
@@ -21,10 +27,8 @@ rpipe -T $TESTDIR/t_echo -- 0 < input
[0],
[expout])
-AT_CHECK([chargen -p 16000 > input
-cp input expout
-echo "" >> expout
-rpipe -T $TESTDIR/t_echo -- 10 < input
+AT_CHECK([
+cat input | rpipe -T $TESTDIR/t_echo -- 10
],
[10],
[expout])
diff --git a/t/pipeout.at b/t/pipeout.at
index d34b77a..79add31 100644
--- a/t/pipeout.at
+++ b/t/pipeout.at
@@ -5,7 +5,8 @@
# There is NO WARRANTY, to the extent permitted by law.
AT_SETUP([data received])
AT_KEYWORDS([data receive])
-AT_CHECK([chargen 16 > expout
+
+AT_CHECK([chargen -l 16 > expout
rpipe -T $TESTDIR/t_store -- 0 < expout
status=$?
cat store
@@ -13,7 +14,17 @@ exit $status
],
[0],
[expout])
-AT_CHECK([chargen 16000 > expout
+
+AT_CHECK([
+cat expout | rpipe -T $TESTDIR/t_store -- 0
+status=$?
+cat store
+exit $status
+],
+[0],
+[expout])
+
+AT_CHECK([chargen -l 16000 > expout
rpipe -T $TESTDIR/t_store -- 0 < expout
status=$?
cat store
@@ -21,4 +32,14 @@ exit $status
],
[0],
[expout])
+
+AT_CHECK([
+cat expout | rpipe -T $TESTDIR/t_store -- 0
+status=$?
+cat store
+exit $status
+],
+[0],
+[expout])
+
AT_CLEANUP
diff --git a/t/status.at b/t/status.at
index 6c1ab83..12f27cf 100644
--- a/t/status.at
+++ b/t/status.at
@@ -5,8 +5,15 @@
# There is NO WARRANTY, to the extent permitted by law.
AT_SETUP([status propagation])
AT_KEYWORDS([status])
-AT_CHECK([rpipe -T $TESTDIR/t_sink -- 0],[0])
-AT_CHECK([rpipe -T $TESTDIR/t_sink -- 1],[1])
-AT_CHECK([chargen 2048 | rpipe -T $TESTDIR/t_sink -- 0],[0])
-AT_CHECK([chargen 2048 | rpipe -T $TESTDIR/t_sink -- 1],[1])
+AT_CHECK([rpipe -T /bin/sh -c 'exit 0' --],[0])
+AT_CHECK([rpipe -T /bin/sh -c 'exit 1' --],[1])
+
+AT_CHECK([rpipe -T $TESTDIR/t_sink -- 0 < /dev/null],[0])
+AT_CHECK([rpipe -T $TESTDIR/t_sink -- 1 < /dev/null],[1])
+
+AT_CHECK([cat /dev/null | rpipe -T $TESTDIR/t_sink -- 0],[0])
+AT_CHECK([cat /dev/null | rpipe -T $TESTDIR/t_sink -- 1],[1])
+
+AT_CHECK([chargen -l 2048 | rpipe -T $TESTDIR/t_sink -- 0],[0])
+AT_CHECK([chargen -l 2048 | rpipe -T $TESTDIR/t_sink -- 1],[1])
AT_CLEANUP
diff --git a/t/testsuite.at b/t/testsuite.at
index 630ddb0..cf4124e 100644
--- a/t/testsuite.at
+++ b/t/testsuite.at
@@ -10,4 +10,5 @@ AT_INIT
AT_TESTED([rpipe])
m4_include([status.at])
m4_include([pipeout.at])
+m4_include([delay.at])
m4_include([pipein.at])

Return to:

Send suggestions and report system problems to the System administrator.