=== modified file '.merlin' --- .merlin 2019-04-12 23:47:50 +0000 +++ .merlin 2019-04-11 14:01:44 +0000 @@ -1,10 +1,8 @@ S . B _build PKG core +PKG async PKG core_extended -PKG lwt -PKG lwt.unix -PKG lwt_ppx PKG ppx_let PKG ppx_deriving PKG ppx_deriving.show === modified file 'Makefile' --- Makefile 2019-04-12 23:47:50 +0000 +++ Makefile 2019-04-12 00:15:26 +0000 @@ -6,5 +6,5 @@ ppio.bc: subprocess.cmo ppio.cmo compile ./compile subprocess.cmo ppio.cmo -o ppio.bc -%.cmo: %.ml compile +%.cmx: %.ml compile ./compile -c $< === modified file 'compile' --- compile 2019-04-12 23:47:50 +0000 +++ compile 2019-04-12 00:15:26 +0000 @@ -2,13 +2,12 @@ set -x exec ocamlfind ocamlc \ -linkpkg \ + -package async \ + -package async_unix \ -package core \ -package base \ - -package lwt \ - -package lwt.unix \ - -package ppx_let \ -package ppx_deriving \ -package ppx_deriving.show \ - -package lwt_ppx \ + -package ppx_let \ -thread \ "$@" === modified file 'ppio.ml' --- ppio.ml 2019-04-12 23:47:50 +0000 +++ ppio.ml 2019-04-12 00:15:26 +0000 @@ -1,6 +1,7 @@ (* vim: fileencoding=utf8 ft=ocaml et sw=2 ts=4 sts=4 *) open Core +(* open Async *) let do_echo_prompt () = Out_channel.output_string stdout "prompt: "; @@ -58,59 +59,66 @@ (* Copy data from the reader to the writer, using the provided buffer as scratch space *) -let rec copy_blocks bufsize buffer r w cb () = - Lwt.bind (Lwt_unix.read r buffer 0 bufsize) (fun bytes_read -> - if bytes_read > 0 then ( - cb bytes_read buffer; - Lwt.bind - (Lwt_io.write_from_exactly w buffer 0 bytes_read) - (copy_blocks bufsize buffer r w cb) - ) else ( - cb 0 buffer; - Lwt_io.close w - )) +let rec copy_blocks buffer r w cb = + let open Async in + Reader.read r buffer + >>= function + | `Eof -> + cb 0 buffer; + Reader.close r + | `Ok bytes_read -> + cb bytes_read buffer; + Writer.write w (Bytes.to_string buffer) ~len:bytes_read; + Writer.flushed w + >>= fun () -> + copy_blocks buffer r w cb ;; -let copy_cb fd_in fd_out cb = - let bufsize = 16 * 1024 in +let copy_cb info fd_in fd_out cb = (* let buffer = Bytes.to_string (Bytes.create (16 * 1024)) in *) - let buffer = Bytes.create bufsize in - copy_blocks bufsize buffer fd_in (Lwt_io.of_fd Lwt_io.Output fd_out) cb () + let buffer = Bytes.create (16 * 1024) in + let create file_descr kind = + Async.Unix.Fd.create Fifo file_descr (Info.tag info ~tag:kind) + in + copy_blocks + buffer + (Async.Reader.create (create fd_in "reader")) + (Async.Writer.create (create fd_out "writer")) + cb ;; let cb_null (len:int) (buf:Bytes.t) = ();; let main {hideendl; endl; color; exe} = pe ((show_cmd_args {hideendl; endl; color; exe})^"\n"); - Lwt_unix.set_default_async_method Lwt_unix.Async_none; - let fd0 = Lwt_unix.stdin in - let fd1 = Lwt_unix.stdout in - let stdin_r, stdin_w = Lwt_unix.pipe_out () in - let stdout_r, stdout_w = Lwt_unix.pipe_in () in + let fd0 = Unix.File_descr.of_int 0 in + let fd1 = Unix.File_descr.of_int 1 in + let stdin_r, stdin_w = Unix.pipe () in + let stdout_r, stdout_w = Unix.pipe () in printf "pipes: (%d %d) (%d %d)\n%!" - (Unix.File_descr.to_int stdin_r) - (Unix.File_descr.to_int (Lwt_unix.unix_file_descr stdin_w)) - (Unix.File_descr.to_int (Lwt_unix.unix_file_descr stdout_r)) - (Unix.File_descr.to_int stdout_w) + (Core.Unix.File_descr.to_int stdin_r) + (Core.Unix.File_descr.to_int stdin_w) + (Core.Unix.File_descr.to_int stdout_r) + (Core.Unix.File_descr.to_int stdout_w) ; let pid = Subprocess.create_process ~redirects:[ - Lwt_unix.unix_file_descr fd0, Subprocess.Redirect stdin_r; - Lwt_unix.unix_file_descr fd1, Subprocess.Redirect stdout_w; - Lwt_unix.unix_file_descr stdin_w, Subprocess.Close; - Lwt_unix.unix_file_descr stdout_r, Subprocess.Close; + fd0, Subprocess.Redirect stdin_r; + fd1, Subprocess.Redirect stdout_w; + stdin_w, Subprocess.Close; + stdout_r, Subprocess.Close; ] (List.nth_exn exe 0) (List.to_array exe) in printf "pid: %d\n%!" pid; - Lwt_unix.set_blocking ~set_flags:true fd0 false; - Lwt_unix.set_blocking ~set_flags:true fd1 false; - Lwt_main.run (Lwt.join [ - copy_cb fd0 stdin_w cb_null; - copy_cb stdout_r fd1 cb_null; - ]); - let _, status = Subprocess.waitpid [] pid in - exit (Subprocess.wait_estatus status); + let open Async in + Deferred.all_unit [ + copy_cb (Info.of_string "stdin") fd0 stdin_w cb_null; + copy_cb (Info.of_string "stdout") stdout_r fd1 cb_null; + ] >>= (fun () -> + let _, status = Subprocess.waitpid [] pid in + exit (Subprocess.wait_estatus status); + ) ;; let cmd = @@ -120,7 +128,7 @@ | true -> Some value | false -> None) in - Command.basic ~summary:"Pretty print I/O to a spawned command." + Async.Command.async ~summary:"Pretty print I/O to a spawned command." [%map_open let hide = flag "-H" ~aliases:["--hide-newlines"] no_arg ~doc:"Suppress printing codes of the line-terminating sequence"