Browse Source

pass handles via pipes

Aurel Bílý 6 years ago
parent
commit
211e6ee224
4 changed files with 158 additions and 52 deletions
  1. 10 1
      libs/uv/uv.ml
  2. 66 6
      libs/uv/uv_stubs.c
  3. 1 0
      src/macro/eval/evalHash.ml
  4. 81 45
      src/macro/eval/evalStdLib.ml

+ 10 - 1
libs/uv/uv.ml

@@ -246,9 +246,10 @@ external timer_stop : t_timer -> unit_cb -> unit uv_result = "w_timer_stop"
 type process_cb = (int * int) uv_result -> unit
 
 type process_io =
-	| UvIoPipe of bool * bool * t_stream
 	| UvIoIgnore
 	| UvIoInherit
+	| UvIoPipe of bool * bool * t_stream
+	| UvIoIpc of t_stream
 
 external spawn : t_loop -> process_cb -> string -> string array -> string array -> string -> int -> process_io array -> int -> int -> t_process uv_result = "w_spawn_bytecode" "w_spawn"
 external process_kill : t_process -> int -> unit uv_result = "w_process_kill"
@@ -256,9 +257,17 @@ external process_get_pid : t_process -> int = "w_process_get_pid"
 
 (* ------------- PIPES ---------------------------------------------- *)
 
+type pipe_accepted =
+	| UvPipe of t_pipe
+	| UvTcp of t_tcp
+
 external pipe_init : t_loop -> bool -> t_pipe uv_result = "w_pipe_init"
+external pipe_open : t_pipe -> int -> unit uv_result = "w_pipe_open"
 external pipe_accept : t_loop -> t_pipe -> t_pipe uv_result = "w_pipe_accept"
 external pipe_bind_ipc : t_pipe -> string -> unit uv_result = "w_pipe_bind_ipc"
 external pipe_connect_ipc : t_pipe -> string -> unit_cb -> unit uv_result = "w_pipe_connect_ipc"
+external pipe_write_handle : t_pipe -> bytes -> t_stream -> unit_cb -> unit uv_result = "w_pipe_write_handle"
+external pipe_pending_count : t_pipe -> int = "w_pipe_pending_count"
+external pipe_accept_pending : t_loop -> t_pipe -> pipe_accepted uv_result = "w_pipe_accept_pending"
 external pipe_getsockname : t_pipe -> string uv_result = "w_pipe_getsockname"
 external pipe_getpeername : t_pipe -> string uv_result = "w_pipe_getpeername"

+ 66 - 6
libs/uv/uv_stubs.c

@@ -1122,12 +1122,20 @@ CAMLprim value w_spawn(value loop, value cb, value file, value args, value env,
 					break;
 			}
 		} else {
-			stdio_u[i].flags = UV_CREATE_PIPE;
-			if (Bool_val(Field(stdio_entry, 0)))
-				stdio_u[i].flags |= UV_READABLE_PIPE;
-			if (Bool_val(Field(stdio_entry, 1)))
-				stdio_u[i].flags |= UV_WRITABLE_PIPE;
-			stdio_u[i].data.stream = Stream_val(Field(stdio_entry, 2));
+			switch (Tag_val(stdio_entry)) {
+				case 0: // Pipe
+					stdio_u[i].flags = UV_CREATE_PIPE;
+					if (Bool_val(Field(stdio_entry, 0)))
+						stdio_u[i].flags |= UV_READABLE_PIPE;
+					if (Bool_val(Field(stdio_entry, 1)))
+						stdio_u[i].flags |= UV_WRITABLE_PIPE;
+					stdio_u[i].data.stream = Stream_val(Field(stdio_entry, 2));
+					break;
+				default: // 1, Ipc
+					stdio_u[i].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
+					stdio_u[i].data.stream = Stream_val(Field(stdio_entry, 0));
+					break;
+			}
 		}
 	}
 	uv_process_options_t options = {
@@ -1174,6 +1182,12 @@ CAMLprim value w_pipe_init(value loop, value ipc) {
 	UV_SUCCESS(handle);
 }
 
+CAMLprim value w_pipe_open(value pipe, value fd) {
+	CAMLparam2(pipe, fd);
+	UV_ERROR_CHECK(uv_pipe_open(Pipe_val(pipe), Int_val(fd)));
+	UV_SUCCESS_UNIT;
+}
+
 CAMLprim value w_pipe_accept(value loop, value server) {
 	CAMLparam2(loop, server);
 	UV_ALLOC_CHECK(client, uv_pipe_t);
@@ -1200,6 +1214,42 @@ CAMLprim value w_pipe_connect_ipc(value handle, value path, value cb) {
 	UV_SUCCESS_UNIT;
 }
 
+CAMLprim value w_pipe_pending_count(value handle) {
+	CAMLparam1(handle);
+	CAMLreturn(Val_int(uv_pipe_pending_count(Pipe_val(handle))));
+}
+
+CAMLprim value w_pipe_accept_pending(value loop, value handle) {
+	CAMLparam2(loop, handle);
+	CAMLlocal1(ret);
+	switch (uv_pipe_pending_type(Pipe_val(handle))) {
+		case UV_NAMED_PIPE: {
+			ret = caml_alloc(1, 0);
+			UV_ALLOC_CHECK(client, uv_pipe_t);
+			UV_ERROR_CHECK_C(uv_pipe_init(Loop_val(loop), Pipe_val(client), 0), free(Pipe_val(client)));
+			UV_HANDLE_DATA(Pipe_val(client)) = alloc_data_pipe();
+			if (UV_HANDLE_DATA(Pipe_val(client)) == NULL)
+				UV_ERROR(0);
+			UV_ERROR_CHECK_C(uv_accept(Stream_val(handle), Stream_val(client)), free(Pipe_val(client)));
+			Store_field(ret, 0, client);
+		}; break;
+		case UV_TCP: {
+			ret = caml_alloc(1, 1);
+			UV_ALLOC_CHECK(client, uv_pipe_t);
+			UV_ERROR_CHECK_C(uv_tcp_init(Loop_val(loop), Tcp_val(client)), free(Tcp_val(client)));
+			UV_HANDLE_DATA(Tcp_val(client)) = alloc_data_tcp(Val_unit, Val_unit);
+			if (UV_HANDLE_DATA(Tcp_val(client)) == NULL)
+				UV_ERROR(0);
+			UV_ERROR_CHECK_C(uv_accept(Stream_val(handle), Stream_val(client)), free(Tcp_val(client)));
+			Store_field(ret, 0, client);
+		}; break;
+		default:
+			UV_ERROR(0);
+			break;
+	}
+	UV_SUCCESS(ret);
+}
+
 CAMLprim value w_pipe_getsockname(value handle) {
 	CAMLparam1(handle);
 	char path[256];
@@ -1217,3 +1267,13 @@ CAMLprim value w_pipe_getpeername(value handle) {
 	path[path_size] = 0;
 	UV_SUCCESS(caml_copy_string(path));
 }
+
+CAMLprim value w_pipe_write_handle(value handle, value data, value send_handle, value cb) {
+	CAMLparam4(handle, data, send_handle, cb);
+	UV_ALLOC_CHECK(req, uv_write_t);
+	UV_REQ_DATA(Write_val(req)) = (void *)cb;
+	caml_register_global_root(UV_REQ_DATA_A(Write_val(req)));
+	uv_buf_t buf = uv_buf_init(&Byte(data, 0), caml_string_length(data));
+	UV_ERROR_CHECK_C(uv_write2(Write_val(req), Stream_val(handle), &buf, 1, Stream_val(send_handle), (void (*)(uv_write_t *, int))handle_stream_cb), free(Write_val(req)));
+	UV_SUCCESS_UNIT;
+}

+ 1 - 0
src/macro/eval/evalHash.ml

@@ -157,3 +157,4 @@ let key_eval_uv_Process = hash "eval.uv.Process"
 let key_sys_FileWatcherEvent = hash "sys.FileWatcherEvent"
 let key_eval_uv_Pipe = hash "eval.uv.Pipe"
 let key_eval_uv_Stream = hash "eval.uv.Stream"
+let key_eval_uv_PipeAccept = hash "eval.uv.PipeAccept"

+ 81 - 45
src/macro/eval/evalStdLib.ml

@@ -3741,48 +3741,6 @@ module StdUv = struct
 		let unref = wrap_unref this
 	end
 
-	module Pipe = struct
-		let this vthis = match vthis with
-			| VInstance {ikind = IUv (UvPipe t)} -> t
-			| v -> unexpected_value v "UvPipe"
-		let new_ = (fun vl ->
-			match vl with
-				| [] ->
-					let pipe = wrap_sync (Uv.pipe_init (loop ()) false) in
-					encode_instance key_eval_uv_Pipe ~kind:(IUv (UvPipe pipe))
-				| _ -> assert false
-		)
-		let accept = vifun0 (fun vthis ->
-			let this = this vthis in
-			let res = wrap_sync (Uv.pipe_accept (loop ()) this) in
-			encode_instance key_eval_uv_Pipe ~kind:(IUv (UvPipe res))
-		)
-		let bindIpc = vifun1 (fun vthis path ->
-			let this = this vthis in
-			let path = decode_string path in
-			Uv.pipe_bind_ipc this path;
-			vnull
-		)
-		let connectIpc = vifun2 (fun vthis path cb ->
-			let this = this vthis in
-			let path = decode_string path in
-			wrap_sync (Uv.pipe_connect_ipc this path (wrap_cb_unit cb));
-			vnull
-		)
-		let getName fn = vifun0 (fun vthis ->
-			let this = this vthis in
-			let path = wrap_sync (fn this) in
-			encode_enum_value key_nusys_net_SocketAddress 1 [|encode_string path|] None
-		)
-		let getSockName = getName Uv.pipe_getsockname
-		let getPeerName = getName Uv.pipe_getpeername
-		let asStream = vifun0 (fun vthis ->
-			let this = this vthis in
-			let stream = Uv.stream_of_handle this in
-			encode_instance key_eval_uv_Stream ~kind:(IUv (UvStream stream))
-		)
-	end
-
 	module Stream = struct
 		let this vthis = match vthis with
 			| VInstance {ikind = IUv (UvStream t)} -> t
@@ -3827,6 +3785,77 @@ module StdUv = struct
 		let unref = wrap_unref this
 	end
 
+	module Pipe = struct
+		let this vthis = match vthis with
+			| VInstance {ikind = IUv (UvPipe t)} -> t
+			| v -> unexpected_value v "UvPipe"
+		let new_ = (fun vl ->
+			match vl with
+				| [ipc] ->
+					let ipc = decode_bool ipc in
+					let pipe = wrap_sync (Uv.pipe_init (loop ()) ipc) in
+					encode_instance key_eval_uv_Pipe ~kind:(IUv (UvPipe pipe))
+				| _ -> assert false
+		)
+		let open_ = vifun1 (fun vthis fd ->
+			let this = this vthis in
+			let fd = decode_int fd in
+			wrap_sync (Uv.pipe_open this fd);
+			vnull
+		)
+		let accept = vifun0 (fun vthis ->
+			let this = this vthis in
+			let res = wrap_sync (Uv.pipe_accept (loop ()) this) in
+			encode_instance key_eval_uv_Pipe ~kind:(IUv (UvPipe res))
+		)
+		let bindIpc = vifun1 (fun vthis path ->
+			let this = this vthis in
+			let path = decode_string path in
+			wrap_sync (Uv.pipe_bind_ipc this path);
+			vnull
+		)
+		let connectIpc = vifun2 (fun vthis path cb ->
+			let this = this vthis in
+			let path = decode_string path in
+			wrap_sync (Uv.pipe_connect_ipc this path (wrap_cb_unit cb));
+			vnull
+		)
+		let writeHandle = vifun3 (fun vthis data handle cb ->
+			let this = this vthis in
+			let data = decode_bytes data in
+			let handle = Stream.this handle in
+			wrap_sync (Uv.pipe_write_handle this data handle (wrap_cb_unit cb));
+			vnull
+		)
+		let pendingCount = vifun0 (fun vthis ->
+			let this = this vthis in
+			vint (Uv.pipe_pending_count this)
+		)
+		let acceptPending = vifun0 (fun vthis ->
+			let this = this vthis in
+			let accepted = wrap_sync (Uv.pipe_accept_pending (loop ()) this) in
+			match accepted with
+				| UvTcp v ->
+					let handle = encode_instance key_eval_uv_Socket ~kind:(IUv (UvTcp v)) in
+					encode_enum_value key_eval_uv_PipeAccept 0 [|handle|] None
+				| UvPipe v ->
+					let handle = encode_instance key_eval_uv_Pipe ~kind:(IUv (UvPipe v)) in
+					encode_enum_value key_eval_uv_PipeAccept 1 [|handle|] None
+		)
+		let getName fn = vifun0 (fun vthis ->
+			let this = this vthis in
+			let path = wrap_sync (fn this) in
+			encode_enum_value key_nusys_net_SocketAddress 1 [|encode_string path|] None
+		)
+		let getSockName = getName Uv.pipe_getsockname
+		let getPeerName = getName Uv.pipe_getpeername
+		let asStream = vifun0 (fun vthis ->
+			let this = this vthis in
+			let stream = Uv.stream_of_handle this in
+			encode_instance key_eval_uv_Stream ~kind:(IUv (UvStream stream))
+		)
+	end
+
 	module Process = struct
 		let this vthis = match vthis with
 			| VInstance {ikind = IUv (UvProcess t)} -> t
@@ -3840,13 +3869,16 @@ module StdUv = struct
 					let cwd = decode_string cwd in
 					let flags = decode_int flags in
 					let stdio = Array.of_list (List.map (fun stdio -> match (decode_enum stdio) with
-							| 0, [readable; writable; pipe] ->
+							| 0, [] -> Uv.UvIoIgnore
+							| 1, [] -> Uv.UvIoInherit
+							| 2, [readable; writable; pipe] ->
 								let readable = decode_bool readable in
 								let writable = decode_bool writable in
 								let pipe = Stream.this pipe in
 								Uv.UvIoPipe (readable, writable, pipe)
-							| 1, [] -> Uv.UvIoIgnore
-							| 2, [] -> Uv.UvIoInherit
+							| 3, [pipe] ->
+								let pipe = Stream.this pipe in
+								Uv.UvIoIpc pipe
 							| _ -> assert false
 						) (decode_array stdio)) in
 					let uid = decode_int uid in
@@ -4627,9 +4659,13 @@ let init_standard_library builtins =
 		"unref",StdUv.Process.unref;
 	];
 	init_fields builtins (["eval";"uv"],"Pipe") [] [
+		"open",StdUv.Pipe.open_;
 		"accept",StdUv.Pipe.accept;
 		"bindIpc",StdUv.Pipe.bindIpc;
 		"connectIpc",StdUv.Pipe.connectIpc;
+		"writeHandle",StdUv.Pipe.writeHandle;
+		"pendingCount",StdUv.Pipe.pendingCount;
+		"acceptPending",StdUv.Pipe.acceptPending;
 		"getSockName",StdUv.Pipe.getSockName;
 		"getPeerName",StdUv.Pipe.getPeerName;
 		"asStream",StdUv.Pipe.asStream;