/* wydawca - automatic release submission daemon
   Copyright (C) 2009-2013 Sergey Poznyakoff

   This program is free software; you can redistribute it and/or modify it
   under the terms of the GNU General Public License as published by the
   Free Software Foundation; either version 3 of the License, or (at your
   option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License along
   with this program.  If not, see <http://www.gnu.org/licenses/>. */

#include "wydawca.h"

/* Job state bits.  A job may carry several of them at once (for example,
   a job can be both active and queued for another run). */
#define STATE_FINISHED 0x01
#define STATE_QUEUED   0x02
#define STATE_ACTIVE   0x04
#define STATE_ANY (STATE_FINISHED|STATE_QUEUED|STATE_ACTIVE)

/* An element of the job queue: a doubly-linked list headed by `queue'. */
struct job {
	struct job *next, *prev;         /* queue links */
	int state;                       /* bitmask of STATE_* values */
	struct spool *spool;             /* spool this job works on */
	struct wy_triplet *triplet_list; /* triplets linked via jq_next/jq_prev */
	uid_t uid;                       /* user ID the job is run for */
	pid_t pid;                       /* PID of the child, -1 if none yet */
	time_t timestamp;                /* time at which the job may be started */
	int exit_status;                 /* status as returned by waitpid */
};

struct job *queue;  /* head of the job queue */
size_t jobmax;      /* maximum number of concurrent child jobs; 0 = no limit */
size_t jobcnt;      /* number of child jobs currently running */

/* Pseudo-spools used as job keys: fake_spool stands for "all spools",
   inotify_spool marks jobs that carry an explicit triplet list. */
struct spool fake_spool = { "all spools" }, inotify_spool = { "inotify"};

/* Set by the signal handler (and by job_start in single-process mode) to
   tell job_queue_runner that the queue needs attention.  It is written
   from a signal handler, hence volatile sig_atomic_t. */
static volatile sig_atomic_t wakeup;

/* Signal handler for SIGCHLD and SIGALRM (see job_init): raise the wakeup
   flag and re-install itself for the same signal. */
RETSIGTYPE
queue_signal(int sig)
{
	wakeup = 1;
	signal(sig, queue_signal);
}

/* Clear the wakeup flag and, if INTERVAL is non-zero, arm alarm(2) with it. */
void
set_timer(time_t interval)
{
	wakeup = 0;
	if (interval)
		alarm(interval);
}

/* Detach triplet TP from the triplet list of the job it belongs to.
   Does nothing if TP is NULL or is not attached to a job. */
void
triplet_jq_unlink(struct wy_triplet *tp)
{
	if (!tp || !tp->job)
		return;
	/* The job list is linked through jq_prev/jq_next, so the head test
	   must look at jq_prev, which is the pointer dereferenced below. */
	if (tp->jq_prev)
		tp->jq_prev->jq_next = tp->jq_next;
	else
		tp->job->triplet_list = tp->jq_next;
	if (tp->jq_next)
		tp->jq_next->jq_prev = tp->jq_prev;
	tp->jq_next = tp->jq_prev = NULL;
}

/* Call remove_triplet(tp, 1) on every triplet attached to JOB and reset
   the job's triplet list to empty. */
void
jq_clear(struct job *job)
{
	struct wy_triplet *tp;

	if (!job->triplet_list)
		return;
	for (tp = job->triplet_list; tp; ) {
		/* Fetch the successor first: tp is handed to remove_triplet. */
		struct wy_triplet *next = tp->jq_next;
		remove_triplet(tp, 1);
		tp = next;
	}
	job->triplet_list = NULL;
}

/* Return the first job in the queue that matches SPOOL and UID and has at
   least one of the state bits in MASK set, or NULL if there is none. */
struct job *
job_locate(const struct spool *spool, uid_t uid, int mask)
{
	struct job *p;

	for (p = queue; p; p = p->next)
		if ((p->state & mask) && p->spool == spool && p->uid == uid)
			break;
	return p;
}

/* Return the number of queued jobs that have STATE_ACTIVE set. */
size_t
job_active_count(void)
{
	struct job *job;
	size_t count = 0;

	for (job = queue; job; job = job->next)
		if (job->state & STATE_ACTIVE)
			count++;
	return count;
}

/* for_each_spool callback: commit the triplet list that DATA points to
   into SPOOL, timing the operation.  Always returns 0. */
static int
procspool(struct spool *spool, void *data)
{
	struct wy_triplet *trp = *(struct wy_triplet **)data;

	if (!trp)
		return 0;
	timer_start("spool");
	timer_start(spool->tag);
	spool_commit_triplets(spool, trp);
	timer_stop(spool->tag);
	timer_stop("spool");
	return 0;
}

/* Do the actual work of JOB.  Depending on the job's spool this processes
   the job's triplet list over all spools (inotify_spool), scans all spools
   (fake_spool), or scans the single spool of the job.  Returns the result
   of the selected operation. */
int
wydawca_scanner(struct job *job)
{
	int rc;

	initstats();
	timer_start("wydawca");
	spool_create_timers();
	if (job->spool == &inotify_spool)
		rc = for_each_spool(procspool, &job->triplet_list);
	else if (job->spool == &fake_spool)
		rc = scan_all_spools(1, &job->uid);
	else {
		rc = scan_spool(job->spool, 1, &job->uid);
	}
	timer_stop("wydawca");
	logstats();
	notify_flush(job->spool);
	return rc;
}

/* Start JOB.

   In single-process mode the scanner is run in place: the job becomes
   QUEUED if the scanner returned non-zero and FINISHED otherwise, and the
   wakeup flag is raised.  Otherwise a child process is forked to run the
   scanner; it exits with WYDAWCA_EX_AGAIN if the scanner returned non-zero
   and with 0 otherwise.

   Returns 0 on success, 1 if the limit on the number of running jobs has
   been reached, and -1 if fork failed. */
int
job_start(struct job *job)
{
	pid_t pid;

	if (jobmax && jobcnt == jobmax) {
		wy_log(LOG_NOTICE, "maximum number of processes active");
		return 1;
	}

	wy_debug(1, (_("starting job: %s, %lu"),
		     job->spool->tag, (unsigned long)job->uid));

	if (single_process) {
		if (wydawca_scanner(job))
			job->state = STATE_QUEUED;
		else
			job->state = STATE_FINISHED;
		wakeup = 1;
		return 0;
	}

	pid = fork();
	if (pid == 0) {
		int i;

		/* Child: restore default signal handling, cancel any pending
		   alarm and close every descriptor above stderr. */
		signal(SIGHUP, SIG_DFL);
		signal(SIGTERM, SIG_DFL);
		signal(SIGQUIT, SIG_DFL);
		signal(SIGINT, SIG_DFL);
		signal(SIGCHLD, SIG_DFL);
		signal(SIGALRM, SIG_DFL);
		alarm(0);
		for (i = getdtablesize(); i > 2; i--)
			close(i);
		exit(wydawca_scanner(job) ? WYDAWCA_EX_AGAIN : 0);
	} else if (pid == -1) {
		wy_log(LOG_CRIT, "fork: %s", strerror(errno));
		return -1;
	} else {
		job->state = STATE_ACTIVE;
		job->pid = pid;
		jobcnt++;
	}
	return 0;
}

/* Unlink JOB from the queue.  The job itself is not freed. */
void
job_remove(struct job *job)
{
	struct job *p;

	wy_debug(1, (_("removing job: %s, %lu"),
		     job->spool->tag, (unsigned long)job->uid));
	p = job->prev;
	if (p)
		p->next = job->next;
	else
		queue = job->next;

	p = job->next;
	if (p)
		p->prev = job->prev;
}

/* Insert JOB into the queue right after ELT.  If ELT is NULL, JOB becomes
   the new head of the queue. */
void
job_insert(struct job *job, struct job *elt)
{
	struct job *p;

	job->prev = elt;
	if (!elt) {
		if (queue)
			queue->prev = job;
		job->next = queue;
		queue = job;
		return;
	}

	p = elt->next;
	elt->next = job;
	/* Link the new element to the old successor of ELT; without this
	   the remainder of the queue would become unreachable. */
	job->next = p;
	if (p)
		p->prev = job;
}

/* Schedule a job for SPOOL (all spools if NULL) on behalf of UID and try
   to start it right away.

   An existing job with the same spool and UID is reused: only a QUEUED
   one if TP is given, one in any state otherwise.  If no such job exists,
   a new one is created at the head of the queue, TP becomes its triplet
   list and every triplet on that list is marked as belonging to it. */
void
schedule_job(struct spool *spool, uid_t uid, struct wy_triplet *tp)
{
	struct job *job;

	if (!spool)
		spool = &fake_spool;

	wy_debug(1, (_("scheduling job: %s, %lu"),
		     spool->tag, (unsigned long)uid));

	job = job_locate(spool, uid, tp ? STATE_QUEUED : STATE_ANY);
	if (!job) {
		job = grecs_zalloc(sizeof(*job));
		job->spool = spool;
		job->uid = uid;
		job->pid = -1;
		job->triplet_list = tp;
		for (; tp; tp = tp->jq_next)
			tp->job = job;
		time(&job->timestamp);
		job_insert(job, NULL);
	}

	job->state |= STATE_QUEUED;
	job_start(job);
}

/* Log how JOB terminated, based on its exit_status.  If EXPECT_TERM is
   non-zero, termination on SIGTERM is reported at debug priority only
   (and not at all when debugging is off). */
static void
print_status(struct job *job, int expect_term)
{
	struct passwd *pw = getpwuid(job->uid);
	/* getpwuid returns NULL when the UID has no passwd entry (or on
	   error); don't crash while composing a diagnostic message. */
	const char *user = pw ? pw->pw_name : "unknown";
	int status = job->exit_status;

	if (WIFEXITED(status)) {
		int exit_code = WEXITSTATUS(status);
		if (exit_code == 0) {
			wy_debug(1, (_("%lu (%s, %s) exited successfully"),
				     (unsigned long)job->pid,
				     job->spool->tag, user));
		} else if (exit_code == WYDAWCA_EX_AGAIN) {
			wy_debug(1, (_("%lu (%s, %s) reported tempfail"),
				     (unsigned long)job->pid,
				     job->spool->tag, user));
		} else
			wy_log(LOG_ERR,
			       _("%lu (%s, %s) failed with status %d"),
			       (unsigned long)job->pid, job->spool->tag,
			       user, exit_code);
	} else if (WIFSIGNALED(status)) {
		int prio;

		if (expect_term && WTERMSIG(status) == SIGTERM) {
			if (!wy_debug_level)
				return;
			prio = LOG_DEBUG;
		} else
			prio = LOG_ERR;

		wy_log(prio,
		       _("%lu (%s, %s) terminated on signal %d"),
		       (unsigned long)job->pid, job->spool->tag,
		       user, WTERMSIG(status));
	} else if (WIFSTOPPED(status))
		wy_log(LOG_NOTICE,
		       _("%lu (%s, %s) stopped on signal %d"),
		       (unsigned long)job->pid, job->spool->tag,
		       user, WSTOPSIG(status));
#ifdef WCOREDUMP
	else if (WCOREDUMP(status))
		wy_log(LOG_NOTICE,
		       _("%lu (%s, %s) dumped core"),
		       (unsigned long)job->pid, job->spool->tag, user);
#endif
	else
		wy_log(LOG_ERR,
		       _("%lu (%s, %s) terminated with unrecognized status"),
		       (unsigned long)job->pid, job->spool->tag, user);
}

/* Process the job queue.

   If the wakeup flag is set:
     1. Reap all terminated children; mark the corresponding jobs
        FINISHED, release their triplets and record the exit status.
     2. Walk the queue.  Report finished jobs.  A finished job with no
        other state bits left is re-queued for a later time if it exited
        with WYDAWCA_EX_AGAIN, and is removed and freed otherwise.  A job
        that is QUEUED (and nothing else) is started if its start time has
        come; otherwise the time remaining until its start is taken into
        account when computing the timer interval.

   MIN_INTERVAL is the initial timer interval in seconds (0 means none).
   If the resulting interval is non-zero, the alarm timer is armed with
   it. */
void
job_queue_runner(time_t min_interval)
{
	int status;
	struct job *job;
	time_t now = time(NULL);

	if (wakeup) {
		wakeup = 0;

		for (;;) {
			pid_t pid = waitpid((pid_t) - 1, &status, WNOHANG);
			if (pid <= 0)
				break;
			for (job = queue; job; job = job->next) {
				if ((job->state & STATE_ACTIVE)
				    && job->pid == pid) {
					job->state &= ~STATE_ACTIVE;
					job->state |= STATE_FINISHED;
					jq_clear(job);
					job->exit_status = status;
					jobcnt--;
				}
			}
		}

		for (job = queue; job;) {
			/* The job may be freed below; remember its successor. */
			struct job *next = job->next;

			if (job->state & STATE_FINISHED) {
				print_status(job, 0);
				if ((job->state &= ~STATE_FINISHED) == 0) {
					if (WIFEXITED(job->exit_status)
					    && WEXITSTATUS(job->exit_status)
					       == WYDAWCA_EX_AGAIN) {
						time_t interval = lock_timeout;
						if (interval == 0)
							interval = lock_expire_time;
						/* Re-queue the job */
						job->state = STATE_QUEUED;
						job->timestamp = now + interval;
					} else {
						job_remove(job);
						free(job);
						job = next;
						continue;
					}
				}
			}

			if (job->state == STATE_QUEUED) {
				/* Start the job only once its start time has
				   come.  Otherwise the (positive) time left
				   until then bounds the timer interval. */
				if (job->timestamp <= now) {
					if (job_start(job))
						pause();
					now = time(NULL);
				} else {
					time_t interval = job->timestamp - now;
					if (min_interval == 0
					    || interval < min_interval)
						min_interval = interval;
				}
			}

			job = next;
		}
	}

	if (min_interval) {
		wy_debug(2, (_("computed interval: %lu"),
			     (unsigned long)min_interval));
		set_timer(min_interval);
	}
}

/* Install queue_signal as the handler for SIGCHLD and SIGALRM. */
void
job_init(void)
{
	signal(SIGCHLD, queue_signal);
	signal(SIGALRM, queue_signal);
}