aboutsummaryrefslogtreecommitdiff
path: root/src/job.c
diff options
context:
space:
mode:
authorSergey Poznyakoff <gray@gnu.org>2020-04-16 19:54:23 +0300
committerSergey Poznyakoff <gray@gnu.org>2020-04-16 20:03:29 +0300
commita6cc8f5fcbcbf4c707149940de2ee9a33220c34d (patch)
tree3b0da6ae669a58779c9f40503191e6808a60e1b9 /src/job.c
parent2cc5b63b5f90d2dd9b05f50ac28e379d417f1a8c (diff)
downloadwydawca-a6cc8f5fcbcbf4c707149940de2ee9a33220c34d.tar.gz
wydawca-a6cc8f5fcbcbf4c707149940de2ee9a33220c34d.tar.bz2
Rewrite as a multi-threaded program.
* NEWS: Update. * configure.ac: Version 3.1.95 * src/Makefile.am (wydawca_SOURCES): Remove job.c (LDADD): Link with libpthread. * src/job.c: Remove. * src/config.c: Remove the wakeup-interval statement. * src/net.c (wydawca_listener): Rewrite as a thread function wy_thr_listen. * src/timer.c (timer_get): Use thread-specific storage. (timer_get_count): Remove. * src/triplet.c: Rewrite using multi-thread model. * src/wydawca.c: Likewise. * src/wydawca.h: Update. * tests/check-fail.at: Update expected output. * tests/check-notify.at: Likewise. * tests/inotify-ok.at: Likewise. * tests/inotify-rmsymlink.at: Likewise. * tests/inotify-symlink.at: Likewise. * tests/inotify-unatt00.at: Likewise. * tests/inotify-unatt01.at: Likewise. * tests/upl12f.at: Likewise. * tests/upload-dry.at: Likewise. `
Diffstat (limited to 'src/job.c')
-rw-r--r--src/job.c441
1 files changed, 0 insertions, 441 deletions
diff --git a/src/job.c b/src/job.c
deleted file mode 100644
index 84d3bb5..0000000
--- a/src/job.c
+++ /dev/null
@@ -1,441 +0,0 @@
-/* wydawca - automatic release submission daemon
- Copyright (C) 2009-2013, 2017, 2019-2020 Sergey Poznyakoff
-
- This program is free software; you can redistribute it and/or modify it
- under the terms of the GNU General Public License as published by the
- Free Software Foundation; either version 3 of the License, or (at your
- option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along
- with this program. If not, see <http://www.gnu.org/licenses/>. */
-
-#include "wydawca.h"
-
-enum {
- STATE_FINISHED,
- STATE_QUEUED,
- STATE_ACTIVE
-};
-
-#define STATE_MASK(s) (1<<(s))
-
-#define STATE_ANY \
- (STATE_MASK(STATE_FINISHED) |\
- STATE_MASK(STATE_QUEUED) | \
- STATE_MASK(STATE_ACTIVE))
-
-struct job {
- enum job_type type;
- struct job *next, *prev;
- int state;
- pid_t pid;
- time_t timestamp;
- int exit_status;
- char *printable;
- union {
- struct wy_triplet *triplet; /* JOB_TRIPLET */
- struct spool *spool; /* JOB_SPOOL */
- } v;
-};
-
-struct job *queue;
-size_t jobmax;
-size_t jobcnt;
-
-static int wakeup;
-
-RETSIGTYPE
-queue_signal(int sig)
-{
- wakeup = 1;
- signal(sig, queue_signal);
-}
-
-void
-set_timer(time_t interval)
-{
- wakeup = 0;
- if (interval)
- alarm(interval);
-}
-
-
-char const *
-job_printable(struct job *job)
-{
- if (!job->printable) {
- size_t s = 0;
-
- switch (job->type) {
- case JOB_TRIPLET:
- if (grecs_asprintf(&job->printable, &s,
- "triplet(%s,%s)",
- job->v.triplet->spool->tag,
- job->v.triplet->name))
- grecs_alloc_die();
- break;
-
- case JOB_SPOOL:
- if (grecs_asprintf(&job->printable, &s,
- "spool(%s)",
- job->v.spool->tag))
- grecs_alloc_die();
- break;
-
- case JOB_ALL_SPOOLS:
- job->printable = grecs_strdup("all spools");
- }
- }
- return job->printable;
-}
-
-struct job *
-job_locate(int type, int mask, void *data)
-{
- struct job *p;
- for (p = queue; p; p = p->next)
- if (p->type == type && (STATE_MASK(p->state) & mask)) {
- switch (p->type) {
- case JOB_TRIPLET:
- if (p->v.triplet == data)
- return p;
- break;
-
- case JOB_SPOOL:
- if (p->v.spool == data)
- return p;
- break;
-
- case JOB_ALL_SPOOLS:
- return p;
- }
- }
- return p;
-}
-
-size_t
-job_active_count()
-{
- struct job *job;
- size_t count = 0;
- for (job = queue; job; job = job->next)
- if (job->state == STATE_ACTIVE)
- count++;
- return count;
-}
-
-int
-wydawca_scanner(struct job *job)
-{
- int rc;
- initstats();
- timer_start("wydawca");
- spool_create_timers();
- switch (job->type) {
- case JOB_TRIPLET:
- triplet_commit(job->v.triplet);
- rc = 0;
- break;
-
- case JOB_ALL_SPOOLS:
- rc = scan_all_spools();
- break;
-
- case JOB_SPOOL:
- rc = scan_spool(job->v.spool);
- notify_flush(job->v.spool);
- }
- timer_stop("wydawca");
- if (!single_process)
- logstats();
- return rc;
-}
-
-int
-job_start(struct job *job)
-{
- pid_t pid;
-
- if (jobmax && jobcnt == jobmax) {
- wy_log(LOG_NOTICE, "maximum number of processes active");
- return 1;
- }
-
- wy_debug(1, (_("starting job: %s"), job_printable(job)));
-
- if (single_process) {
- if (wydawca_scanner(job))
- job->state = STATE_QUEUED;
- else
- job->state = STATE_FINISHED;
- wakeup = 1;
- return 0;
- }
-
- pid = fork();
- if (pid == 0) {
- signal(SIGHUP, SIG_DFL);
- signal(SIGTERM, SIG_DFL);
- signal(SIGQUIT, SIG_DFL);
- signal(SIGINT, SIG_DFL);
- signal(SIGCHLD, SIG_DFL);
- signal(SIGALRM, SIG_DFL);
- alarm(0);
- exit(wydawca_scanner(job) ? WYDAWCA_EX_AGAIN : 0);
- } else if (pid == -1) {
- wy_log(LOG_CRIT, "fork: %s", strerror(errno));
- return -1;
- } else {
- job->state = STATE_ACTIVE;
- job->pid = pid;
- jobcnt++;
- }
- return 0;
-}
-
-void
-job_remove(struct job *job)
-{
- struct job *p;
-
- wy_debug(1, (_("removing job: %s"), job_printable(job)));
- p = job->prev;
- if (p)
- p->next = job->next;
- else
- queue = job->next;
-
- p = job->next;
- if (p)
- p->prev = job->prev;
-}
-
-void
-job_destroy(struct job *job)
-{
- free(job->printable);
- free(job);
-}
-
-void
-job_insert(struct job *job, struct job *elt)
-{
- struct job *p;
-
- job->prev = elt;
- if (!elt) {
- if (queue)
- queue->prev = job;
- job->next = queue;
- queue = job;
- return;
- }
-
- p = elt->next;
- elt->next = job;
-
- if (p)
- p->prev = job;
-}
-
-void
-job_schedule(int type, void *data)
-{
- struct job *job;
-
- job = job_locate(type,
- type == JOB_TRIPLET
- ? STATE_MASK(STATE_QUEUED) : STATE_ANY,
- data);
- if (!job) {
- job = grecs_zalloc(sizeof(*job));
- job->type = type;
- job->pid = -1;
- time(&job->timestamp);
-
- switch (type) {
- case JOB_TRIPLET:{
- struct wy_triplet *tp = data;
- job->v.triplet = tp;
- tp->job = job;
- break;
- }
-
- case JOB_SPOOL:
- job->v.spool = data;
- break;
-
- case JOB_ALL_SPOOLS:
- break;
- }
- wy_debug(1, (_("scheduling job %s"), job_printable(job)));
- job_insert(job, NULL);
- }
-
- job->state = STATE_QUEUED;
- job_start(job);
-}
-
-static void
-print_status(struct job *job, int expect_term)
-{
- int status = job->exit_status;
-
- if (WIFEXITED(status)) {
- int exit_code = WEXITSTATUS(status);
- if (exit_code == 0) {
- wy_debug(1, (_("%lu (job %s) exited successfully"),
- (unsigned long) job->pid, job_printable(job)));
- } else if (exit_code == WYDAWCA_EX_AGAIN) {
- wy_debug(1, (_("%lu (job %s) reported tempfail"),
- (unsigned long) job->pid, job_printable(job)));
- } else
- wy_log(LOG_ERR,
- _("%lu (job %s) failed with status %d"),
- (unsigned long) job->pid,
- job_printable(job), exit_code);
- } else if (WIFSIGNALED(status)) {
- int prio;
- if (expect_term && WTERMSIG(status) == SIGTERM) {
- if (!wy_debug_level)
- return;
- prio = LOG_DEBUG;
- } else
- prio = LOG_ERR;
-
- wy_log(prio,
- _("%lu (job %s) terminated on signal %d"),
- (unsigned long) job->pid, job_printable(job),
- WTERMSIG(status));
- } else if (WIFSTOPPED(status))
- wy_log(LOG_NOTICE,
- _("%lu (job %s) stopped on signal %d"),
- (unsigned long) job->pid, job_printable(job),
- WSTOPSIG(status));
-#ifdef WCOREDUMP
- else if (WCOREDUMP(status))
- wy_log(LOG_NOTICE,
- _("%lu (job %s) dumped core"),
- (unsigned long) job->pid, job_printable(job));
-#endif
- else
- wy_log(LOG_ERR,
- _("%lu (job %s) terminated with unrecognized status"),
- (unsigned long) job->pid, job_printable(job));
-}
-
-void
-job_queue_runner(time_t min_interval)
-{
- struct job *job;
- time_t now = time(NULL);
-
- if (wakeup) {
- wakeup = 0;
-
- for (;;) {
- int status;
-
- pid_t pid = waitpid((pid_t) - 1, &status, WNOHANG);
- if (pid <= 0)
- break;
- for (job = queue; job; job = job->next) {
- if (job->state == STATE_ACTIVE && job->pid == pid) {
- remove_triplet(job->v.triplet, 1);
- job->v.triplet = NULL;
- job->exit_status = status;
- job->state = STATE_FINISHED;
- jobcnt--;
- }
- }
- }
-
- for (job = queue; job;) {
- struct job *next = job->next;
- if (job->state == STATE_FINISHED) {
- print_status(job, 0);
- if (WIFEXITED(job->exit_status) &&
- WEXITSTATUS(job->exit_status) == WYDAWCA_EX_AGAIN) {
- time_t interval = lock_timeout;
- if (interval == 0)
- interval = lock_expire_time;
- /* Re-queue the job */
- job->state = STATE_QUEUED;
- job->timestamp = now + interval;
- } else {
- job_remove(job);
- job_destroy(job);
- job = next;
- continue;
- }
- }
-
- if (job->state == STATE_QUEUED) {
- if (job->timestamp >= now) {
- if (job_start(job))
- pause();
- now = time(NULL);
- } else {
- time_t interval = job->timestamp - now;
- if (min_interval == 0 || interval < min_interval)
- min_interval = interval;
- }
- }
-
- job = next;
- }
- }
-
- if (min_interval) {
- wy_debug(2, (_("computed interval: %lu"), min_interval));
- set_timer(min_interval);
- }
-}
-
-void
-job_queue_wait(void)
-{
- while (queue) {
- int status;
- struct job *job;
-
- pid_t pid = waitpid((pid_t) - 1, &status, 0);
- if (pid <= 0)
- break;
- for (job = queue; job; job = job->next) {
- if (job->state == STATE_ACTIVE && job->pid == pid) {
- remove_triplet(job->v.triplet, 1);
- job->v.triplet = NULL;
- job->exit_status = status;
- job->state = STATE_FINISHED;
- jobcnt--;
- }
- }
-
- for (job = queue; job;) {
- struct job *next = job->next;
- if (job->state == STATE_FINISHED) {
- print_status(job, 0);
- job_remove(job);
- job_destroy(job);
- } else if (job->state == STATE_QUEUED) {
- job_remove(job);
- job_destroy(job);
- }
- job = next;
- }
- }
- logstats();
-}
-
-void
-job_init(void)
-{
- signal(SIGCHLD, queue_signal);
- signal(SIGALRM, queue_signal);
-}

Return to:

Send suggestions and report system problems to the System administrator.