/* This file is part of Smap. Copyright (C) 2006-2007, 2010, 2014 Sergey Poznyakoff Smap is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. Smap is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Smap. If not, see . */ #ifdef HAVE_CONFIG_H # include #endif #include #include #include #include #include #include #include #include #include #include "smap/diag.h" #include "smap/stream.h" #include "smap/streamdef.h" /* Bound on length of the string representing a size_t. log10 (2.0) < 146/485; */ #define SIZE_T_STRLEN_BOUND ((sizeof(size_t) * CHAR_BIT) * 146 / 485 + 1) struct sockmap_output_stream { struct _smap_stream base; int fd; size_t bufsize; char *buf; int debug_idx; char *debug_pfx; }; static char * format_len(char *buf, size_t arg) { char *p = buf + SIZE_T_STRLEN_BOUND; *--p = 0; while (arg) { unsigned n = arg % 10; *--p = '0' + n; arg /= 10; } return p; } static int _sockmap_output_stream_write(struct _smap_stream *stream, const char *buf, size_t len, size_t *pret) { struct sockmap_output_stream *sp = (struct sockmap_output_stream *)stream; char nbuf[SIZE_T_STRLEN_BOUND+1], *p; size_t size, n; if (smap_trace_str) { smap_stream_write(smap_trace_str, buf, len, NULL); } len--; p = format_len(nbuf, len); n = strlen(p); size = n + 3 + len; if (size > sp->bufsize) { char *newbuf = realloc(sp->buf, size); if (!newbuf) return ENOMEM; sp->buf = newbuf; sp->bufsize = size; } memcpy(sp->buf, p, n); sp->buf[n++] = ':'; memcpy(sp->buf + n, buf, len); n += len; sp->buf[n++] = ','; sp->buf[n] = 0; if (smap_debug_np(sp->debug_idx, 10)) smap_stream_printf(smap_debug_str, "%s: %s\n", sp->debug_pfx ? sp->debug_pfx : "send", sp->buf); if (write(sp->fd, sp->buf, n) != n) return errno; *pret = len + 1; return 0; } static int _sockmap_output_stream_close(struct _smap_stream *stream) { struct sockmap_output_stream *sp = (struct sockmap_output_stream *) stream; if (stream->flags & SMAP_STREAM_NO_CLOSE) return 0; if (close(sp->fd)) return errno; return 0; } static int _sockmap_output_stream_ioctl(struct _smap_stream *stream, int code, void *ptr) { struct sockmap_output_stream *sp = (struct sockmap_output_stream *) stream; switch (code) { case SMAP_IOCTL_SET_DEBUG_IDX: if (!ptr) return EINVAL; sp->debug_idx = *(int*)ptr; break; case SMAP_IOCTL_SET_DEBUG_PFX: if (!ptr) return EINVAL; sp->debug_pfx = strdup((char*)ptr); break; default: return EINVAL; } return 0; } static void _sockmap_output_stream_destroy(struct _smap_stream *stream) { struct sockmap_output_stream *sp = (struct sockmap_output_stream *)stream; free(sp->buf); free(sp->debug_pfx); } int smap_sockmap_output_stream_create(smap_stream_t *pstream, int fd, int flags) { struct sockmap_output_stream *str = (struct sockmap_output_stream *) _smap_stream_create(sizeof(*str), SMAP_STREAM_WRITE | (flags & SMAP_STREAM_NO_CLOSE)); if (!str) return ENOMEM; str->fd = fd; str->base.write = _sockmap_output_stream_write; str->base.close = _sockmap_output_stream_close; str->base.ctl = _sockmap_output_stream_ioctl; str->base.done = _sockmap_output_stream_destroy; *pstream = (smap_stream_t) str; return 0; } struct sockmap_input_stream { struct _smap_stream base; char nbuf[SIZE_T_STRLEN_BOUND+1]; /* Buffer for reading initial segment of data */ size_t nlen; /* Length of data stored in nbuf */ size_t cp; /* Offset of colon in nbuf */ size_t reqsz; /* Payload size */ int fd; int debug_idx; char *debug_pfx; }; static void report_invalid_prefix(struct sockmap_input_stream *sp, const char *diag) { static char buf[1024]; size_t n; struct sockaddr_in saddr; socklen_t slen; slen = sizeof(saddr); if (getpeername(sp->fd, (struct sockaddr*) &saddr, &slen) != -1) { if (saddr.sin_family == AF_INET) smap_stream_printf(smap_debug_str, "%s:%d: ", inet_ntoa(saddr.sin_addr), ntohs(saddr.sin_port)); else if (saddr.sin_family == AF_UNIX) smap_stream_printf(smap_debug_str, "[local socket]: "); } smap_stream_printf(smap_debug_str, "sockmap protocol error " "(%s): %s", diag, sp->nbuf); n = recv(sp->fd, buf, sizeof(buf), 0); if (n > 0) smap_stream_write(smap_debug_str, buf, n, NULL); smap_stream_write(smap_debug_str, "\n", 1, NULL); } int read_payload_length(struct sockmap_input_stream *sp) { int len = 0; sp->cp = 0; while (len < SIZE_T_STRLEN_BOUND) { ssize_t n; char *p; n = recv(sp->fd, sp->nbuf + len, SIZE_T_STRLEN_BOUND - len, 0); if (n < 0) { smap_debug(sp->debug_idx, 1, ("error reading from fd #%d: %s", sp->fd, strerror(errno))); return errno; } if (n == 0) return EOF; len += n; if ((p = memchr(sp->nbuf, ':', len))) { sp->cp = p - sp->nbuf; sp->nbuf[len] = 0; sp->nlen = len; return 0; } } sp->nbuf[len] = 0; sp->nlen = len; if (smap_debug_np(sp->debug_idx, 1)) report_invalid_prefix(sp, "prefix too long"); return EPROTO; } #define ISDIGIT(c) ('0' <= (c) && (c) <= '9') static int _sockmap_input_stream_read(struct _smap_stream *stream, char *buf, size_t size, size_t *pret) { struct sockmap_input_stream *sp = (struct sockmap_input_stream *)stream; int rc; size_t reqsz; char *pb = buf; if (sp->nlen == 0) { size_t i; rc = read_payload_length(sp); if (rc == EOF) { *pret = 0; return 0; } if (smap_debug_np(sp->debug_idx, 10)) smap_stream_printf(smap_debug_str, "%s: %s", sp->debug_pfx ? sp->debug_pfx : "recv", sp->nbuf); if (rc) return rc; reqsz = 0; for (i = 0; i < sp->cp; i++) { reqsz *= 10; if (!ISDIGIT(sp->nbuf[i])) { if (smap_debug_np(sp->debug_idx, 1)) report_invalid_prefix(sp, "invalid prefix"); sp->nlen = sp->cp = sp->reqsz = 0; return EPROTO; } reqsz += sp->nbuf[i] - '0'; } sp->reqsz = reqsz; } else reqsz = sp->reqsz; reqsz++; if (reqsz > size) { *pret = reqsz; stream->flags |= _SMAP_STR_MORESPC; return ERANGE; } if (sp->nlen > sp->cp) { size_t rds = sp->nlen - sp->cp - 1; memcpy(buf, sp->nbuf + sp->cp + 1, rds); buf += rds; size -= rds; reqsz -= rds; } while (reqsz > 0) { size_t n = recv(sp->fd, buf, reqsz, 0); if (smap_debug_np(sp->debug_idx, 10)) smap_stream_write(smap_debug_str, buf, n, NULL); if (n < 0) return errno; if (n == 0) return EIO; buf += n; size -= n; reqsz -= n; } if (smap_debug_np(sp->debug_idx, 10)) { smap_stream_write(smap_debug_str, "\n", 1, NULL); smap_stream_flush(smap_debug_str); } if (smap_trace_str) { smap_stream_write(smap_trace_str, pb, sp->reqsz, NULL); smap_stream_printf(smap_trace_str, " => "); } *pret = sp->reqsz + 1; sp->nlen = sp->cp = sp->reqsz = 0; if (buf[-1] == ',') buf[-1] = '\n'; else { smap_debug(sp->debug_idx, 1, ("sockmap protocol error " "(mising terminating comma)")); return EPROTO; } return 0; } static int _sockmap_input_stream_close(struct _smap_stream *stream) { struct sockmap_input_stream *sp = (struct sockmap_input_stream *) stream; if (stream->flags & SMAP_STREAM_NO_CLOSE) return 0; if (close(sp->fd)) return errno; return 0; } static void _sockmap_input_stream_destroy(struct _smap_stream *stream) { struct sockmap_input_stream *sp = (struct sockmap_input_stream *)stream; free(sp->debug_pfx); } static int _sockmap_input_stream_ioctl(struct _smap_stream *stream, int code, void *ptr) { struct sockmap_input_stream *sp = (struct sockmap_input_stream *) stream; switch (code) { case SMAP_IOCTL_SET_DEBUG_IDX: if (!ptr) return EINVAL; sp->debug_idx = *(int*)ptr; break; case SMAP_IOCTL_SET_DEBUG_PFX: if (!ptr) return EINVAL; sp->debug_pfx = strdup((char*)ptr); break; default: return EINVAL; } return 0; } int smap_sockmap_input_stream_create(smap_stream_t *pstream, int fd, int flags) { struct sockmap_input_stream *str = (struct sockmap_input_stream *) _smap_stream_create(sizeof(*str), SMAP_STREAM_READ | (flags & SMAP_STREAM_NO_CLOSE)); if (!str) return ENOMEM; str->fd = fd; str->base.read = _sockmap_input_stream_read; /* FIXME: Implement readdelim */ str->base.close = _sockmap_input_stream_close; str->base.ctl = _sockmap_input_stream_ioctl; str->base.done = _sockmap_input_stream_destroy; *pstream = (smap_stream_t) str; return 0; } struct sockmap_stream { struct _smap_stream base; int fd; smap_stream_t in; smap_stream_t out; }; static int _sockmap_stream_write(struct _smap_stream *stream, const char *buf, size_t size, size_t *pret) { struct sockmap_stream *sp = (struct sockmap_stream *) stream; return smap_stream_write(sp->out, buf, size, pret); } static int _sockmap_stream_read(struct _smap_stream *stream, char *buf, size_t size, size_t *pret) { struct sockmap_stream *sp = (struct sockmap_stream *) stream; return smap_stream_read(sp->in, buf, size, pret); } static int _sockmap_stream_close(struct _smap_stream *stream) { struct sockmap_stream *sp = (struct sockmap_stream *) stream; if (stream->flags & SMAP_STREAM_NO_CLOSE) return 0; smap_stream_close(sp->in); smap_stream_close(sp->out); if (sp->fd != -1 && close(sp->fd)) return errno; return 0; } static void _sockmap_stream_done(struct _smap_stream *stream) { struct sockmap_stream *sp = (struct sockmap_stream *) stream; if (stream->flags & SMAP_STREAM_NO_CLOSE) return; smap_stream_destroy(&sp->in); smap_stream_destroy(&sp->out); } static int _sockmap_stream_ioctl(struct _smap_stream *stream, int code, void *ptr) { struct sockmap_stream *sp = (struct sockmap_stream *) stream; char **pfx; switch (code) { case SMAP_IOCTL_SET_DEBUG_IDX: if (!ptr) return EINVAL; smap_stream_ioctl(sp->in, code, ptr); smap_stream_ioctl(sp->out, code, ptr); break; case SMAP_IOCTL_SET_DEBUG_PFX: if (!ptr) return EINVAL; pfx = ptr; smap_stream_ioctl(sp->in, code, pfx[0]); smap_stream_ioctl(sp->out, code, pfx[1]); break; default: return EINVAL; } return 0; } int smap_sockmap_stream_create2(smap_stream_t *pstream, int fd[], int flags) { int rc; int sflags = 0; struct sockmap_stream *str = (struct sockmap_stream *) _smap_stream_create(sizeof(*str), SMAP_STREAM_READ|SMAP_STREAM_WRITE| flags); if (!str) return ENOMEM; if (fd[0] == fd[1]) sflags = SMAP_STREAM_NO_CLOSE; rc = smap_sockmap_output_stream_create(&str->out, fd[1], sflags); if (rc) { free(str); return rc; } rc = smap_sockmap_input_stream_create(&str->in, fd[0], sflags); if (rc) { free(str); smap_stream_destroy(&str->in); return rc; } smap_stream_set_buffer(str->out, smap_buffer_line, 1024); smap_stream_set_flags(str->out, SMAP_STREAM_EXPBUF); smap_stream_set_buffer(str->in, smap_buffer_full, 1024); smap_stream_set_flags(str->in, SMAP_STREAM_EXPBUF); str->base.read = _sockmap_stream_read; str->base.write = _sockmap_stream_write; str->base.close = _sockmap_stream_close; str->base.ctl = _sockmap_stream_ioctl; str->base.done = _sockmap_stream_done; str->fd = fd[0] == fd[1] ? fd[0] : -1; *pstream = (smap_stream_t)str; return 0; } int smap_sockmap_stream_create(smap_stream_t *pstream, int fd, int flags) { int pfd[2] = { fd, fd }; return smap_sockmap_stream_create2(pstream, pfd, flags); }