diff options
author | Sergey Poznyakoff <gray@gnu.org> | 2020-04-16 19:54:23 +0300 |
---|---|---|
committer | Sergey Poznyakoff <gray@gnu.org> | 2020-04-16 20:03:29 +0300 |
commit | a6cc8f5fcbcbf4c707149940de2ee9a33220c34d (patch) | |
tree | 3b0da6ae669a58779c9f40503191e6808a60e1b9 /src/job.c | |
parent | 2cc5b63b5f90d2dd9b05f50ac28e379d417f1a8c (diff) | |
download | wydawca-a6cc8f5fcbcbf4c707149940de2ee9a33220c34d.tar.gz wydawca-a6cc8f5fcbcbf4c707149940de2ee9a33220c34d.tar.bz2 |
Rewrite as a multi-threaded program.
* NEWS: Update.
* configure.ac: Version 3.1.95
* src/Makefile.am (wydawca_SOURCES): Remove job.c
(LDADD): Link with libpthread.
* src/job.c: Remove.
* src/config.c: Remove the wakeup-interval statement.
* src/net.c (wydawca_listener): Rewrite as a thread function
wy_thr_listen.
* src/timer.c (timer_get): Use thread-specific storage.
(timer_get_count): Remove.
* src/triplet.c: Rewrite using multi-thread model.
* src/wydawca.c: Likewise.
* src/wydawca.h: Update.
* tests/check-fail.at: Update expected output.
* tests/check-notify.at: Likewise.
* tests/inotify-ok.at: Likewise.
* tests/inotify-rmsymlink.at: Likewise.
* tests/inotify-symlink.at: Likewise.
* tests/inotify-unatt00.at: Likewise.
* tests/inotify-unatt01.at: Likewise.
* tests/upl12f.at: Likewise.
* tests/upload-dry.at: Likewise.
`
Diffstat (limited to 'src/job.c')
-rw-r--r-- | src/job.c | 441 |
1 files changed, 0 insertions, 441 deletions
diff --git a/src/job.c b/src/job.c deleted file mode 100644 index 84d3bb5..0000000 --- a/src/job.c +++ /dev/null @@ -1,441 +0,0 @@ -/* wydawca - automatic release submission daemon - Copyright (C) 2009-2013, 2017, 2019-2020 Sergey Poznyakoff - - This program is free software; you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation; either version 3 of the License, or (at your - option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along - with this program. If not, see <http://www.gnu.org/licenses/>. */ - -#include "wydawca.h" - -enum { - STATE_FINISHED, - STATE_QUEUED, - STATE_ACTIVE -}; - -#define STATE_MASK(s) (1<<(s)) - -#define STATE_ANY \ - (STATE_MASK(STATE_FINISHED) |\ - STATE_MASK(STATE_QUEUED) | \ - STATE_MASK(STATE_ACTIVE)) - -struct job { - enum job_type type; - struct job *next, *prev; - int state; - pid_t pid; - time_t timestamp; - int exit_status; - char *printable; - union { - struct wy_triplet *triplet; /* JOB_TRIPLET */ - struct spool *spool; /* JOB_SPOOL */ - } v; -}; - -struct job *queue; -size_t jobmax; -size_t jobcnt; - -static int wakeup; - -RETSIGTYPE -queue_signal(int sig) -{ - wakeup = 1; - signal(sig, queue_signal); -} - -void -set_timer(time_t interval) -{ - wakeup = 0; - if (interval) - alarm(interval); -} - - -char const * -job_printable(struct job *job) -{ - if (!job->printable) { - size_t s = 0; - - switch (job->type) { - case JOB_TRIPLET: - if (grecs_asprintf(&job->printable, &s, - "triplet(%s,%s)", - job->v.triplet->spool->tag, - job->v.triplet->name)) - grecs_alloc_die(); - break; - - case JOB_SPOOL: - if (grecs_asprintf(&job->printable, &s, - "spool(%s)", - job->v.spool->tag)) - grecs_alloc_die(); - break; - - case JOB_ALL_SPOOLS: - job->printable = grecs_strdup("all spools"); - } - } - return job->printable; -} - -struct job * -job_locate(int type, int mask, void *data) -{ - struct job *p; - for (p = queue; p; p = p->next) - if (p->type == type && (STATE_MASK(p->state) & mask)) { - switch (p->type) { - case JOB_TRIPLET: - if (p->v.triplet == data) - return p; - break; - - case JOB_SPOOL: - if (p->v.spool == data) - return p; - break; - - case JOB_ALL_SPOOLS: - return p; - } - } - return p; -} - -size_t -job_active_count() -{ - struct job *job; - size_t count = 0; - for (job = queue; job; job = job->next) - if (job->state == STATE_ACTIVE) - count++; - return count; -} - -int -wydawca_scanner(struct job *job) -{ - int rc; - initstats(); - timer_start("wydawca"); - spool_create_timers(); - switch (job->type) { - case JOB_TRIPLET: - triplet_commit(job->v.triplet); - rc = 0; - break; - - case JOB_ALL_SPOOLS: - rc = scan_all_spools(); - break; - - case JOB_SPOOL: - rc = scan_spool(job->v.spool); - notify_flush(job->v.spool); - } - timer_stop("wydawca"); - if (!single_process) - logstats(); - return rc; -} - -int -job_start(struct job *job) -{ - pid_t pid; - - if (jobmax && jobcnt == jobmax) { - wy_log(LOG_NOTICE, "maximum number of processes active"); - return 1; - } - - wy_debug(1, (_("starting job: %s"), job_printable(job))); - - if (single_process) { - if (wydawca_scanner(job)) - job->state = STATE_QUEUED; - else - job->state = STATE_FINISHED; - wakeup = 1; - return 0; - } - - pid = fork(); - if (pid == 0) { - signal(SIGHUP, SIG_DFL); - signal(SIGTERM, SIG_DFL); - signal(SIGQUIT, SIG_DFL); - signal(SIGINT, SIG_DFL); - signal(SIGCHLD, SIG_DFL); - signal(SIGALRM, SIG_DFL); - alarm(0); - exit(wydawca_scanner(job) ? WYDAWCA_EX_AGAIN : 0); - } else if (pid == -1) { - wy_log(LOG_CRIT, "fork: %s", strerror(errno)); - return -1; - } else { - job->state = STATE_ACTIVE; - job->pid = pid; - jobcnt++; - } - return 0; -} - -void -job_remove(struct job *job) -{ - struct job *p; - - wy_debug(1, (_("removing job: %s"), job_printable(job))); - p = job->prev; - if (p) - p->next = job->next; - else - queue = job->next; - - p = job->next; - if (p) - p->prev = job->prev; -} - -void -job_destroy(struct job *job) -{ - free(job->printable); - free(job); -} - -void -job_insert(struct job *job, struct job *elt) -{ - struct job *p; - - job->prev = elt; - if (!elt) { - if (queue) - queue->prev = job; - job->next = queue; - queue = job; - return; - } - - p = elt->next; - elt->next = job; - - if (p) - p->prev = job; -} - -void -job_schedule(int type, void *data) -{ - struct job *job; - - job = job_locate(type, - type == JOB_TRIPLET - ? STATE_MASK(STATE_QUEUED) : STATE_ANY, - data); - if (!job) { - job = grecs_zalloc(sizeof(*job)); - job->type = type; - job->pid = -1; - time(&job->timestamp); - - switch (type) { - case JOB_TRIPLET:{ - struct wy_triplet *tp = data; - job->v.triplet = tp; - tp->job = job; - break; - } - - case JOB_SPOOL: - job->v.spool = data; - break; - - case JOB_ALL_SPOOLS: - break; - } - wy_debug(1, (_("scheduling job %s"), job_printable(job))); - job_insert(job, NULL); - } - - job->state = STATE_QUEUED; - job_start(job); -} - -static void -print_status(struct job *job, int expect_term) -{ - int status = job->exit_status; - - if (WIFEXITED(status)) { - int exit_code = WEXITSTATUS(status); - if (exit_code == 0) { - wy_debug(1, (_("%lu (job %s) exited successfully"), - (unsigned long) job->pid, job_printable(job))); - } else if (exit_code == WYDAWCA_EX_AGAIN) { - wy_debug(1, (_("%lu (job %s) reported tempfail"), - (unsigned long) job->pid, job_printable(job))); - } else - wy_log(LOG_ERR, - _("%lu (job %s) failed with status %d"), - (unsigned long) job->pid, - job_printable(job), exit_code); - } else if (WIFSIGNALED(status)) { - int prio; - if (expect_term && WTERMSIG(status) == SIGTERM) { - if (!wy_debug_level) - return; - prio = LOG_DEBUG; - } else - prio = LOG_ERR; - - wy_log(prio, - _("%lu (job %s) terminated on signal %d"), - (unsigned long) job->pid, job_printable(job), - WTERMSIG(status)); - } else if (WIFSTOPPED(status)) - wy_log(LOG_NOTICE, - _("%lu (job %s) stopped on signal %d"), - (unsigned long) job->pid, job_printable(job), - WSTOPSIG(status)); -#ifdef WCOREDUMP - else if (WCOREDUMP(status)) - wy_log(LOG_NOTICE, - _("%lu (job %s) dumped core"), - (unsigned long) job->pid, job_printable(job)); -#endif - else - wy_log(LOG_ERR, - _("%lu (job %s) terminated with unrecognized status"), - (unsigned long) job->pid, job_printable(job)); -} - -void -job_queue_runner(time_t min_interval) -{ - struct job *job; - time_t now = time(NULL); - - if (wakeup) { - wakeup = 0; - - for (;;) { - int status; - - pid_t pid = waitpid((pid_t) - 1, &status, WNOHANG); - if (pid <= 0) - break; - for (job = queue; job; job = job->next) { - if (job->state == STATE_ACTIVE && job->pid == pid) { - remove_triplet(job->v.triplet, 1); - job->v.triplet = NULL; - job->exit_status = status; - job->state = STATE_FINISHED; - jobcnt--; - } - } - } - - for (job = queue; job;) { - struct job *next = job->next; - if (job->state == STATE_FINISHED) { - print_status(job, 0); - if (WIFEXITED(job->exit_status) && - WEXITSTATUS(job->exit_status) == WYDAWCA_EX_AGAIN) { - time_t interval = lock_timeout; - if (interval == 0) - interval = lock_expire_time; - /* Re-queue the job */ - job->state = STATE_QUEUED; - job->timestamp = now + interval; - } else { - job_remove(job); - job_destroy(job); - job = next; - continue; - } - } - - if (job->state == STATE_QUEUED) { - if (job->timestamp >= now) { - if (job_start(job)) - pause(); - now = time(NULL); - } else { - time_t interval = job->timestamp - now; - if (min_interval == 0 || interval < min_interval) - min_interval = interval; - } - } - - job = next; - } - } - - if (min_interval) { - wy_debug(2, (_("computed interval: %lu"), min_interval)); - set_timer(min_interval); - } -} - -void -job_queue_wait(void) -{ - while (queue) { - int status; - struct job *job; - - pid_t pid = waitpid((pid_t) - 1, &status, 0); - if (pid <= 0) - break; - for (job = queue; job; job = job->next) { - if (job->state == STATE_ACTIVE && job->pid == pid) { - remove_triplet(job->v.triplet, 1); - job->v.triplet = NULL; - job->exit_status = status; - job->state = STATE_FINISHED; - jobcnt--; - } - } - - for (job = queue; job;) { - struct job *next = job->next; - if (job->state == STATE_FINISHED) { - print_status(job, 0); - job_remove(job); - job_destroy(job); - } else if (job->state == STATE_QUEUED) { - job_remove(job); - job_destroy(job); - } - job = next; - } - } - logstats(); -} - -void -job_init(void) -{ - signal(SIGCHLD, queue_signal); - signal(SIGALRM, queue_signal); -} |