aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Poznyakoff <gray@gnu.org.ua>2020-02-18 11:08:12 +0200
committerSergey Poznyakoff <gray@gnu.org.ua>2020-02-18 11:13:11 +0200
commit2fd25b47e87977e9ec60ea1fc3d5d82a10572f55 (patch)
tree38b6bbc932068cad3a4da5a2b32d07442920098c
parentf3f2375e0d8636377994da045ca6ab6aaa51e3f0 (diff)
downloadping903-2fd25b47e87977e9ec60ea1fc3d5d82a10572f55.tar.gz
ping903-2fd25b47e87977e9ec60ea1fc3d5d82a10572f55.tar.bz2
Wait for pending probes to terminate before initializing new probes
* src/pinger.c (sendq_cond): Rename to sendq_update_cond. (sendq_empty_cond): New variable. (sendq_wait_empty): New function. (sendq_enqueue): Take second argument: delay in seconds. Broadcast sendq_empty_cond when the queue becomes empty. (p903_scheduler): Wait for pending probes to terminate. Correct the probe interval as necessary.
-rw-r--r--src/pinger.c84
1 files changed, 67 insertions, 17 deletions
diff --git a/src/pinger.c b/src/pinger.c
index 6ebf608..aef88d0 100644
--- a/src/pinger.c
+++ b/src/pinger.c
@@ -49,7 +49,8 @@ static int ping_fd;
static HOSTADDR *sendq_head, *sendq_tail;
static pthread_mutex_t sendq_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t sendq_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t sendq_update_cond = PTHREAD_COND_INITIALIZER;
+static pthread_cond_t sendq_empty_cond = PTHREAD_COND_INITIALIZER;
#define ICMP_HEADER_LEN (offsetof(struct icmp, icmp_data))
#define PING_HEADER_LEN (ICMP_HEADER_LEN+sizeof(struct timeval))
@@ -132,15 +133,33 @@ p903_init(void)
emalloc_die();
}
-void
-sendq_enqueue(HOSTADDR *addr)
+static int
+sendq_wait_empty(struct timeval *tv)
+{
+ int ret = 0;
+ struct timeval t1, t2;
+ pthread_mutex_lock(&sendq_mutex);
+ gettimeofday(&t1, NULL);
+ while (sendq_head) {
+ ret = 1;
+ pthread_cond_wait(&sendq_empty_cond, &sendq_mutex);
+ }
+ gettimeofday(&t2, NULL);
+ pthread_mutex_unlock(&sendq_mutex);
+ if (ret)
+ timersub(&t2, &t1, tv);
+ return ret;
+}
+
+static void
+sendq_enqueue(HOSTADDR *addr, int delay)
{
struct timeval tv;
pthread_mutex_lock(&sendq_mutex);
gettimeofday(&tv, NULL);
- tv.tv_sec += ping_interval;
+ tv.tv_sec += delay;
addr->xmit_tv = tv;
addr->next = NULL;
if (sendq_tail)
@@ -148,11 +167,11 @@ sendq_enqueue(HOSTADDR *addr)
else
sendq_head = addr;
sendq_tail = addr;
- pthread_cond_broadcast(&sendq_cond);
+ pthread_cond_broadcast(&sendq_update_cond);
pthread_mutex_unlock(&sendq_mutex);
}
-HOSTADDR *
+static HOSTADDR *
sendq_dequeue(void)
{
HOSTADDR *host;
@@ -160,13 +179,15 @@ sendq_dequeue(void)
pthread_mutex_lock(&sendq_mutex);
while (!sendq_head) {
- pthread_cond_wait(&sendq_cond, &sendq_mutex);
+ pthread_cond_wait(&sendq_update_cond, &sendq_mutex);
}
host = sendq_head;
sendq_head = host->next;
if (sendq_head == NULL)
sendq_tail = NULL;
host->next = NULL;
+ if (!sendq_head)
+ pthread_cond_broadcast(&sendq_empty_cond);
pthread_mutex_unlock(&sendq_mutex);
ts.tv_sec = host->xmit_tv.tv_sec;
@@ -175,7 +196,7 @@ sendq_dequeue(void)
return host;
}
-unsigned short
+static unsigned short
icmp_cksum(unsigned char * addr, int len)
{
register int sum = 0;
@@ -197,7 +218,7 @@ icmp_cksum(unsigned char * addr, int len)
return answer;
}
-int
+static int
icmp_generic_encode(unsigned char * buffer, size_t bufsize, int type,
int ident, int seqno)
{
@@ -216,7 +237,7 @@ icmp_generic_encode(unsigned char * buffer, size_t bufsize, int type,
return 0;
}
-int
+static int
icmp_generic_decode(unsigned char * buffer, size_t bufsize,
struct ip **ipp, struct icmp ** icmpp)
{
@@ -270,7 +291,7 @@ send_echo(HOSTADDR *host, unsigned char *ping_buffer)
seqno = seqno_alloc(host, &host->xmit_tv);
if (seqno == -1) {
- sendq_enqueue(host);
+ sendq_enqueue(host, 0);
return;
}
@@ -281,7 +302,7 @@ send_echo(HOSTADDR *host, unsigned char *ping_buffer)
if (n < 0) {
error("%s: sendto: %s", host->name, strerror(errno));
if (errno == EINTR || errno == ENOBUFS) {
- sendq_enqueue(host);
+ sendq_enqueue(host, 0);
}
} else {
if (host->xmit_count == 0)
@@ -318,13 +339,13 @@ p903_sender(void *p)
return NULL;
}
-double
+static double
nabs(double a)
{
return (a < 0) ? -a : a;
}
-double
+static double
nsqrt(double a, double prec)
{
double x0, x1;
@@ -689,7 +710,7 @@ p903_receiver(void *p)
log_echo((struct sockaddr *)&addr, addrlen, icmp, ip, n, rtt);
if (host->recv_count < ping_count) {
- sendq_enqueue(host);
+ sendq_enqueue(host, ping_interval);
} else {
host_stat(host, &tv_now);
}
@@ -697,15 +718,44 @@ p903_receiver(void *p)
}
}
+#define UPDATE_MAX 5
+
void *
p903_scheduler(void *p)
{
+ int update_round = 0;
+ double d;
+
while (1) {
size_t i;
- for (i = 0; i < hostaddr_count; i++) {
+ struct timeval tv;
+
+ /* Reset all statistics */
+ for (i = 0; i < hostaddr_count; i++)
host_reset(hostaddr + i);
- sendq_enqueue(hostaddr + i);
+
+ /* Wait for pending probes to finish */
+ if (sendq_wait_empty(&tv)) {
+ if (update_round == 0) {
+ info("scheduler: "
+ "starting probe interval correction");
+ d = 0;
+ }
+ d += timeval_to_double(&tv);
+ update_round++;
+ info("scheduler: average: %.3fs", d / update_round);
+ if (update_round == UPDATE_MAX) {
+ d /= update_round;
+ probe_interval += 3*d/2;
+ info("scheduler: "
+ "finished probe interval correction: %lu",
+ probe_interval);
+ update_round = 0;
+ }
}
+
+ for (i = 0; i < hostaddr_count; i++)
+ sendq_enqueue(hostaddr + i, 0);
sleep(probe_interval);
}
}

Return to:

Send suggestions and report system problems to the System administrator.