/* This file is part of Grot. Copyright (C) 2009, 2010 Sergey Poznyakoff Grot is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. Grot is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with grot. If not, see . */ #include "grot.h" #include #include char *progname; #define DEBUG_QUERY(host, query) \ do \ if (verbose > 1) \ printf("%s: query: %s\n", \ host, query); \ while (0) void verror(MYSQL *mysql, char *fmt, va_list ap) { fprintf(stderr, "%s: ", progname); vfprintf(stderr, fmt, ap); if (mysql) fprintf(stderr, ": %s", mysql_error(mysql)); fprintf(stderr, "\n"); } void terror(int code, MYSQL *mysql, char *fmt, ...) { va_list ap; va_start(ap, fmt); verror(mysql, fmt, ap); va_end(ap); if (code) exit(code); } void * xmalloc(size_t size) { void *p = malloc(size); if (!p) terror(1, NULL, "not enough memory"); return p; } void * xzalloc(size_t size) { void *p = xmalloc(size); memset(p, 0, size); return p; } char * xstrdup(char *str) { char *p; if (!str) return NULL; p = xmalloc(strlen(str) + 1); return strcpy(p, str); } MYSQL mysql; struct config_file { char *file; char *group; }; struct config_file slave_files[] = { { DEFAULT_USER_CONFIG, NULL }, /* group will be replaced by the host name */ { DEFAULT_USER_CONFIG, "slave" }, { DEFAULT_SYSTEM_CONFIG, NULL }, /* likewise */ { DEFAULT_SYSTEM_CONFIG, "slave" }, }; struct config_file master_files[] = { { DEFAULT_USER_CONFIG, "grot" }, { DEFAULT_SYSTEM_CONFIG, "grot" }, { NULL, "grot" }, }; #define NITEMS(a) sizeof(a)/sizeof((a)[0]) MYSQL * try_connect(MYSQL *mp, struct config_file *cfg, int ncfg, char *host, char *user, char *pass, unsigned port, char *socket) { int i; MYSQL *ret; for (i = 0; i < ncfg; i++, cfg++) { mysql_init(mp); if (cfg->file) mysql_options(mp, MYSQL_READ_DEFAULT_FILE, cfg->file); if (cfg->group) mysql_options(mp, MYSQL_READ_DEFAULT_GROUP, cfg->group); ret = mysql_real_connect(mp, host, user, pass, NULL, port, socket, 0); if (ret) break; mysql_close(mp); } return ret; } static void do_connect() { if (!try_connect(&mysql, master_files, NITEMS(master_files), grot_host, grot_user, grot_password, grot_port, grot_socket)) terror(1, &mysql, "cannot connect to master SQL server"); } struct slave { struct slave *next; char *host; char *logfile; }; unsigned long slave_count; struct slave *slave_head, *slave_tail; void add_slave(char *name, size_t len) { struct slave *sp = xzalloc(sizeof(*sp)); sp->host = xmalloc(len+1); memcpy(sp->host, name, len); sp->next = NULL; sp->host[len] = 0; if (slave_tail) slave_tail->next = sp; else slave_head = sp; slave_tail = sp; slave_count++; } /* 0 1 2 3 4 401230 bind ns1.farlep.net:60446 (null) Binlog Dump 3837405 Has sent all binlog to slave; waiting for binlog to be updated (null) */ void collect_slaves() { MYSQL_RES *res; size_t nrows, ncols, i; DEBUG_QUERY("master", "SHOW PROCESS LIST"); res = mysql_list_processes(&mysql); if (!res) terror(1, &mysql, "cannot list processes"); nrows = mysql_num_rows (res); ncols = mysql_num_fields (res); for (i = 0; i < nrows; i++) { MYSQL_ROW row; mysql_data_seek (res, i); row = mysql_fetch_row (res); if (strcmp(row[4], "Binlog Dump") == 0) { size_t len = strcspn(row[2], ":"); add_slave(row[2], len); } } mysql_free_result (res); } void list_slaves() { unsigned long i; struct slave *sp; printf("No. of slaves: %lu\n", slave_count); printf("Slave servers:\n"); for (sp = slave_head, i = 0; sp; sp = sp->next, i++) printf("%4ld: %s\n", i, sp->host); } void process_slave(struct slave *sp) { MYSQL sm; MYSQL_RES *res; MYSQL_ROW row; if (verbose) printf("Processing host %s\n", sp->host); slave_files[0].group = slave_files[2].group = sp->host; if (!try_connect(&sm, slave_files, NITEMS(slave_files), sp->host, NULL, NULL, 0, NULL)) terror(1, &sm, "cannot connect to slave server"); DEBUG_QUERY(sp->host, "SHOW SLAVE STATUS"); if (mysql_query(&sm, "SHOW SLAVE STATUS")) terror(1, &sm, "%s: cannot get slave status", sp->host); res = mysql_store_result(&sm); if (mysql_num_rows(res) < 1) terror(1, NULL, "%s: empty slave status", sp->host); row = mysql_fetch_row (res); if (verbose) printf("%s: %s\n", sp->host, row[5]); sp->logfile = xstrdup(row[5]); mysql_free_result(res); mysql_close(&sm); } void collect_slave_logs() { struct slave *sp; for (sp = slave_head; sp; sp = sp->next) process_slave(sp); } static char *log_prefix; static size_t log_prefix_len; static char *earliest_log; static char **server_log_table; static size_t server_log_count; int comp_log(const void *a, const void *b) { return strcmp(*(char**)a, *(char**)b); } char ** find_server_log(char *log) { return bsearch(&log, server_log_table, server_log_count, sizeof(server_log_table[0]), comp_log); } void set_log_prefix() { MYSQL_RES *res; MYSQL_ROW row; char *p; size_t count, i; DEBUG_QUERY("master", "SHOW BINARY LOGS"); if (mysql_query(&mysql, "SHOW BINARY LOGS")) terror(1, &mysql, "cannot list binary logs on master"); res = mysql_store_result(&mysql); if (!res) terror(1, &mysql, "cannot get result"); count = mysql_num_rows(res); row = mysql_fetch_row(res); p = strchr(row[0], '.'); if (!p) terror(1, NULL, "unexpected binary log name: %s", row[0]); log_prefix_len = p - row[0] + 1; log_prefix = xmalloc(log_prefix_len + 1); memcpy(log_prefix, row[0], log_prefix_len); log_prefix[log_prefix_len] = 0; if (verbose) printf("Log prefix: %s\n", log_prefix); server_log_table = xmalloc(count*sizeof(server_log_table[0])); server_log_count = count; for (i = 0; i < count; i++) { server_log_table[i] = xstrdup(row[0]); row = mysql_fetch_row(res); } qsort(server_log_table, server_log_count, sizeof(server_log_table[0]), comp_log); mysql_free_result(res); } static int logcmp(char *a, char *b) { unsigned long na, nb; na = strtoul(a + log_prefix_len, NULL, 10); nb = strtoul(b + log_prefix_len, NULL, 10); if (na < nb) return -1; else if (na > nb) return 1; return 0; } void find_earliest_log() { struct slave *sp; for (sp = slave_head; sp; sp = sp->next) { if (strlen(sp->logfile) < log_prefix_len || memcmp(sp->logfile, log_prefix, log_prefix_len)) { terror(0, NULL, "%s: logfile does not match base prefix: %s", sp->host, sp->logfile); continue; } if (!find_server_log(sp->logfile)) { terror(0, NULL, "Log %s is used by %s, but is absent on master", sp->logfile, sp->host); continue; } if (!earliest_log || logcmp(sp->logfile, earliest_log) < 0) earliest_log = sp->logfile; } } #define PURGE_TO_COMMAND "PURGE MASTER LOGS TO " void purge_logs_to(char *logfile) { char *buf = xmalloc(sizeof(PURGE_TO_COMMAND) + 2 + strlen(logfile)); strcpy(buf, PURGE_TO_COMMAND); strcat(buf, "'"); strcat(buf, logfile); strcat(buf, "'"); DEBUG_QUERY("master", buf); if (!dry_run_option && mysql_query(&mysql, buf)) terror(1, &mysql, "failed to purge logs"); free(buf); } void flush_logs() { DEBUG_QUERY("master", "FLUSH LOGS"); if (!dry_run_option && mysql_query(&mysql, "FLUSH LOGS")) terror(1, &mysql, "failed to purge logs"); } int main(int argc, char **argv) { progname = argv[0]; read_options(argc, argv); do_connect(); set_log_prefix(); collect_slaves(); if (!slave_head) terror(1, NULL, "No slaves"); if (verbose) list_slaves(); collect_slave_logs(); find_earliest_log(); if (earliest_log) { if (verbose) printf("earliest common log: %s\n", earliest_log); if (keep_count) { char **p = find_server_log(earliest_log); if (!p) abort(); /* should not happen */ p -= keep_count; if (p < server_log_table) earliest_log = NULL; else earliest_log = p[0]; } } if (earliest_log) { purge_logs_to(earliest_log); if (flush_logs_option) flush_logs(); } else { if (verbose) printf("Nothing to do\n"); } mysql_close(&mysql); exit(0); }