=== modified file 'ppio.nim' --- ppio.nim 2019-04-04 17:35:39 +0000 +++ ppio.nim 2019-04-01 18:02:44 +0000 @@ -1,8 +1,6 @@ # vim: fileencoding=utf8 ft=nim et sw=2 ts=2 sts=2 import os import posix -import asyncfutures -import asyncdispatch const ERR_PERM = 100 @@ -83,79 +81,6 @@ quit(exitcode) -proc copy_data( - fd_from:cint, - fd_to:cint, - # pp_cb:proc(data:string) -): Future[void] = - var flags: cint - - flags = posix.fcntl(fd_from, posix.F_GETFL) - if flags == -1: libc_fatal("fcntl():") - if posix.fcntl(fd_from, posix.F_SETFL, flags or posix.O_NONBLOCK) == -1: - libc_fatal("fcntl():") - - flags = posix.fcntl(fd_to, posix.F_GETFL) - if flags == -1: libc_fatal("fcntl():") - if posix.fcntl(fd_to, posix.F_SETFL, flags or posix.O_NONBLOCK) == -1: - libc_fatal("fcntl():") - - var - future_result = newFuture[void]("copy_data") - buf_size = 4096 # we could query posix.PC_PIPE_BUF, but this is common - buffer = newString(buf_size) - buf_written = 0 - - # Mutually referential function declaration - proc read_cb(fd: AsyncFD): bool {.gcsafe.} - proc write_cb(fd: AsyncFD): bool {.gcsafe.} - - proc read_cb(fd: AsyncFD): bool {.gcsafe.} = - let res = posix.read(fd_from, addr buffer[0], buf_size) - if res < 0: - let lastError = osLastError() - if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}: - return true # try again - else: - future_result.fail(newException(OSError, osErrorMsg(lastError))) - elif res == 0: - # End of "file" - if posix.close(fd_to) != 0: - let lastError = osLastError() - future_result.fail(newException(OSError, osErrorMsg(lastError))) - else: - future_result.complete() - else: - buffer.setLen(res) - asyncdispatch.addWrite(fd_to.AsyncFD, write_cb) - return false - - - proc write_cb(fd: AsyncFD): bool {.gcsafe.} = - let remaining = buffer.len - buf_written - var d = cast[cstring](buffer) - let res = posix.write(fd_to, addr d[buf_written], remaining.cint) - if res < 0: - let lastError = osLastError() - if lastError.int32 in {EINTR, EWOULDBLOCK, EAGAIN}: - return true # try again - else: - future_result.fail(newException(OSError, osErrorMsg(lastError))) - return false - else: - buf_written.inc(res) - if res < remaining: - return true # more to write - else: - asyncdispatch.addRead(fd_from.AsyncFD, read_cb) - buf_written = 0 - buffer.setLen(buf_size) - return false - - asyncdispatch.addRead(fd_from.AsyncFD, read_cb) - return future_result - - proc main(): void = var args = posixParseArgs() write(stderr, "args: " & repr(args) & "\n") @@ -176,7 +101,7 @@ pipeout: array[0 .. 1, cint] if posix.pipe(pipein) != 0: libc_fatal("pipe():") - if posix.pipe(pipeout) != 0: libc_fatal("pipe():") + if posix.pipe(pipein) != 0: libc_fatal("pipe():") var pid = fork() case pid @@ -196,30 +121,13 @@ quit(ERR_TEMP) else: # Parent process - echo "pipein: " & repr(pipein) if close(pipein[0]) != 0: libc_fatal("close():") - echo "pipeout: " & repr(pipein) if close(pipeout[1]) != 0: libc_fatal("close():") # input: 0 -> pipein[1] # output: pipeout[0] -> 1 # TODO: IO loop here - var - rw_stdin: Future[void] = copy_data(0, pipein[1]) - rw_stdout: Future[void] = copy_data(pipeout[0], 1) - - asyncdispatch.waitFor(rw_stdin or rw_stdout) - if rw_stdin.failed: - rw_stdin.read # re-reaise - if rw_stdout.failed: - rw_stdout.read # re-reaise - - asyncdispatch.waitFor(rw_stdin and rw_stdout) - if rw_stdin.failed: - rw_stdin.read # re-reaise - if rw_stdout.failed: - rw_stdout.read # re-reaise var status: cint