commit 0b3cfa820e632e6cd1652e261b008df39dddbc8a
parent 0ba5fd00ec4e1e25bb868b2b5a7b98d24b56197e
Author: Laurent Bercot <ska-skaware@skarnet.org>
Date: Sat, 25 Sep 2021 17:56:14 +0000
Full userspace 1-copy s6-ioconnect
Signed-off-by: Laurent Bercot <ska@appnovation.com>
Diffstat:
1 file changed, 69 insertions(+), 54 deletions(-)
diff --git a/src/conn-tools/s6-ioconnect.c b/src/conn-tools/s6-ioconnect.c
@@ -1,7 +1,5 @@
/* ISC license. */
-#include <skalibs/nonposix.h>
-#include <sys/socket.h>
#include <errno.h>
#include <signal.h>
@@ -9,7 +7,7 @@
#include <skalibs/allreadwrite.h>
#include <skalibs/sgetopt.h>
#include <skalibs/error.h>
-#include <skalibs/iobuffer.h>
+#include <skalibs/buffer.h>
#include <skalibs/sig.h>
#include <skalibs/selfpipe.h>
#include <skalibs/strerr2.h>
@@ -20,37 +18,37 @@
#define USAGE "s6-ioconnect [ -t timeout ] [ -r fdr ] [ -w fdw ] [ -0 ] [ -1 ] [ -6 ] [ -7 ]"
#define dieusage() strerr_dieusage(100, USAGE)
-typedef struct ioblah_s ioblah_t, *ioblah_t_ref ;
+#define BSIZE 8192
+
+typedef struct ioblah_s ioblah, *ioblah_ref ;
struct ioblah_s
{
- unsigned int fd ;
+ buffer b ;
unsigned int xindex ;
unsigned int flagsocket : 1 ;
- unsigned int flagopen : 1 ;
} ;
-static ioblah_t a[2][2] = { { { 0, 5, 0, 1 }, { 7, 5, 0, 1 } }, { { 6, 5, 0, 1 }, { 1, 5, 0, 1 } } } ;
-static iobuffer b[2] ;
-static iopause_fd x[5] = { { -1, IOPAUSE_READ, 0 } } ;
-
-static void closeit (unsigned int i, unsigned int j)
+static char buf[2][BSIZE] = { { '\0' }, { '\0' } } ;
+static ioblah a[2][2] =
{
- if (a[i][j].flagsocket)
{
- char fmt[UINT_FMT] ;
- fmt[uint_fmt(fmt, a[i][j].fd)] = 0 ;
- if ((shutdown(a[i][j].fd, j) < 0) && (errno != ENOTSOCK) && (errno != ENOTCONN))
- strerr_warnwu4sys("shutdown fd ", fmt, " for ", j ? "writing" : "reading") ;
+ { .b = BUFFER_INIT(&buffer_read, 0, buf[0], BSIZE), .xindex = 5, .flagsocket = 0 },
+ { .b = BUFFER_INIT(&buffer_write, 7, buf[0], BSIZE), .xindex = 5, .flagsocket = 0 }
+ },
+ {
+ { .b = BUFFER_INIT(&buffer_read, 6, buf[1], BSIZE), .xindex = 5, .flagsocket = 0 },
+ { .b = BUFFER_INIT(&buffer_write, 1, buf[1], BSIZE), .xindex = 5, .flagsocket = 0 }
}
- fd_close(a[i][j].fd) ;
- a[i][j].flagopen = 0 ;
- a[i][j].xindex = 5 ;
-}
+} ;
+static iopause_fd x[5] = { [0] = { .fd = -1, .events = IOPAUSE_READ } } ;
-static inline void finishit (unsigned int i)
+static void closeit (unsigned int i, unsigned int j)
{
- closeit(i, 1) ;
- iobuffer_finish(&b[i]) ;
+ int fd = buffer_fd(&a[i][j].b) ;
+ if (a[i][j].flagsocket) fd_shutdown(fd, j) ;
+ fd_close(fd) ;
+ buffer_fd(&a[i][j].b) = -1 ;
+ a[i][j].xindex = 5 ;
}
static void handle_signals (void)
@@ -73,6 +71,20 @@ static void handle_signals (void)
}
}
+static int flushit (unsigned int i)
+{
+ int r = buffer_flush(&a[i][1].b) ;
+ a[i][0].b.c.p = a[i][1].b.c.p ; /* XXX: abstraction leak */
+ return r ;
+}
+
+static int fillit (unsigned int i)
+{
+ ssize_t r = sanitize_read(buffer_fill(&a[i][0].b)) ;
+ a[i][1].b.c.n = a[i][0].b.c.n ; /* XXX: abstraction leak */
+ return r >= 0 ;
+}
+
int main (int argc, char const *const *argv)
{
tain tto ;
@@ -92,22 +104,24 @@ int main (int argc, char const *const *argv)
case '6' : a[1][0].flagsocket = 1 ; break ;
case '7' : a[0][1].flagsocket = 1 ; break ;
case 't' : if (!uint0_scan(l.arg, &t)) dieusage() ; break ;
- case 'r' : if (!uint0_scan(l.arg, &a[1][0].fd)) dieusage() ; break ;
- case 'w' : if (!uint0_scan(l.arg, &a[0][1].fd)) dieusage() ; break ;
+ case 'r' : if (!int0_scan(l.arg, &buffer_fd(&a[1][0].b))) dieusage() ; break ;
+ case 'w' : if (!int0_scan(l.arg, &buffer_fd(&a[0][1].b))) dieusage() ; break ;
default : dieusage() ;
}
}
if (t) tain_from_millisecs(&tto, t) ; else tto = tain_infinite_relative ;
argc -= l.ind ; argv += l.ind ;
}
- if ((a[0][1].fd < 3) || (a[1][0].fd < 3)) dieusage() ;
+ if ((buffer_fd(&a[0][1].b) < 3) || (buffer_fd(&a[1][0].b) < 3)) dieusage() ;
for (i = 0 ; i < 2 ; i++)
- {
for (j = 0 ; j < 2 ; j++)
- if (ndelay_on(a[i][j].fd) == -1) strerr_diefu1sys(111, "ndelay_on") ;
- if (!iobuffer_init(&b[i], a[i][0].fd, a[i][1].fd)) strerr_diefu1sys(111, "iobuffer_init") ;
- }
- if (!sig_ignore(SIGPIPE)) strerr_diefu1sys(111, "sig_ignore") ;
+ if (ndelay_on(buffer_fd(&a[i][j].b)) == -1)
+ {
+ char fmt[INT_FMT] ;
+ fmt[int_fmt(fmt, buffer_fd(&a[i][j].b))] = 0 ;
+ strerr_diefu3sys(111, "set fd ", fmt, " non-blocking") ;
+ }
+ if (!sig_ignore(SIGPIPE)) strerr_diefu1sys(111, "ignore SIGPIPE") ;
tain_now_set_stopwatch_g() ;
x[0].fd = selfpipe_init() ;
if (x[0].fd < 0) strerr_diefu1sys(111, "selfpipe_init") ;
@@ -118,31 +132,32 @@ int main (int argc, char const *const *argv)
{
tain deadline ;
unsigned int xlen = 1 ;
- int r ;
+ tain_add_g(&deadline, buffer_isempty(&a[0][1].b) && buffer_isempty(&a[1][1].b) ? &tto : &tain_infinite_relative) ;
- tain_add_g(&deadline, iobuffer_isempty(&b[0]) && iobuffer_isempty(&b[1]) ? &tto : &tain_infinite_relative) ;
for (i = 0 ; i < 2 ; i++)
{
- if (a[i][0].flagopen && iobuffer_isreadable(&b[i]))
+ if (buffer_fd(&a[i][0].b) >= 0 && buffer_isreadable(&a[i][0].b))
{
- x[xlen].fd = a[i][0].fd ;
+ x[xlen].fd = buffer_fd(&a[i][0].b) ;
x[xlen].events = IOPAUSE_READ ;
a[i][0].xindex = xlen++ ;
}
else a[i][0].xindex = 5 ;
- if (a[i][1].flagopen)
+ if (buffer_fd(&a[i][1].b) >= 0)
{
- x[xlen].fd = a[i][1].fd ;
- x[xlen].events = IOPAUSE_EXCEPT | (iobuffer_isempty(&b[i]) ? 0 : IOPAUSE_WRITE) ;
+ x[xlen].fd = buffer_fd(&a[i][1].b) ;
+ x[xlen].events = IOPAUSE_EXCEPT | (buffer_iswritable(&a[i][1].b) ? IOPAUSE_WRITE : 0) ;
a[i][1].xindex = xlen++ ;
}
else a[i][1].xindex = 5 ;
}
- if (xlen == 1 || (xlen == 2 && (x[1].fd == a[0][0].fd || x[1].fd == a[1][0].fd))) break ;
+ if (xlen == 1) break ;
- r = iopause_g(x, xlen, &deadline) ;
- if (r < 0) strerr_diefu1sys(111, "iopause") ;
- else if (!r) return 1 ;
+ {
+ int r = iopause_g(x, xlen, &deadline) ;
+ if (r < 0) strerr_diefu1sys(111, "iopause") ;
+ else if (!r) return 1 ;
+ }
if (x[0].revents & IOPAUSE_READ) handle_signals() ;
@@ -151,27 +166,27 @@ int main (int argc, char const *const *argv)
int dead = 0 ;
if (x[a[i][1].xindex].revents & IOPAUSE_EXCEPT)
{
- if (!iobuffer_isempty(&b[i]))
+ if (!buffer_isempty(&a[i][1].b))
{
- char fmt[UINT_FMT] ;
- fmt[uint_fmt(fmt, a[i][1].fd)] = 0 ;
- iobuffer_flush(&b[i]) ; /* sets errno */
+ char fmt[INT_FMT] ;
+ fmt[int_fmt(fmt, buffer_fd(&a[i][1].b))] = 0 ;
+ flushit(i) ; /* sets errno */
strerr_warnwu2sys("write to fd ", fmt) ;
}
dead = 1 ;
}
else if (x[a[i][1].xindex].revents & IOPAUSE_WRITE)
{
- if (!iobuffer_flush(&b[i]))
+ if (!flushit(i))
{
if (!error_isagain(errno)) dead = 1 ;
}
- else if (!a[i][0].flagopen) dead = 1 ;
+ else if (buffer_fd(&a[i][0].b) == -1) dead = 1 ;
}
if (dead)
{
- if (!a[i][0].flagopen) closeit(i, 0) ;
- finishit(i) ;
+ if (buffer_fd(&a[i][0].b) >= 0) closeit(i, 0) ;
+ closeit(i, 1) ;
}
}
@@ -179,16 +194,16 @@ int main (int argc, char const *const *argv)
{
if (x[a[i][0].xindex].revents & (IOPAUSE_READ | IOPAUSE_EXCEPT))
{
- if (sanitize_read(iobuffer_fill(&b[i])) < 0)
+ if (!fillit(i))
{
if (errno != EPIPE)
{
- char fmt[UINT_FMT] ;
- fmt[uint_fmt(fmt, a[i][0].fd)] = 0 ;
+ char fmt[INT_FMT] ;
+ fmt[int_fmt(fmt, buffer_fd(&a[i][0].b))] = 0 ;
strerr_warnwu2sys("read from fd ", fmt) ;
}
closeit(i, 0) ;
- if (iobuffer_isempty(&b[i])) finishit(i) ;
+ if (buffer_isempty(&a[i][1].b)) closeit(i, 1) ;
}
}
}