[Win32utils-devel] rb_w32_select function patch
Heesob Park
phasis at gmail.com
Fri May 2 01:21:24 EDT 2008
Hi,
2008/4/30 Berger, Daniel <Daniel.Berger at qwest.com>:
>
> Excellent, thanks!
>
> With this in place I'm wondering how open3 could be implemented to work
> on Windows without fork and without resorting to a C extension. Or if
> IO.popen needs to be reworked. Any thoughts?
>
Before considering open3, there is a remain issue of writing on pipe
blocking problem.
Blocking on write pipe will block whole application.
Here is a test code:
readPipe, writePipe = IO.pipe
t = Thread.new { sleep 5; while true; sleep 0.1; puts "got
#{readPipe.readline.length} bytes"; end }
i = 1
while true
i += 1
sleep 1
puts "hello from main"
if i>3
writePipe.puts "a"*2048
end
end
t.join
I have modified rb_w32_select using
NtQueryInformationFile,FilePipeLocalInformation to detect or prevent
blocking of pipe write.
Here is the latest patch:
--- win32.c.org 2008-04-29 14:44:41.000000000 +0900
+++ win32.c 2008-05-02 14:08:10.000000000 +0900
@@ -2046,6 +2050,114 @@
return fileset->fd_count;
}
+static HANDLE main_select_event=NULL;
+static HANDLE *select_thread_list=NULL;
+
+static DWORD WINAPI
+select_read_thread(PVOID argp)
+{
+ HANDLE fd_handle = (HANDLE)argp;
+ DWORD ret = 1;
+ DWORD mode = 0;
+
+ GetConsoleMode(fd_handle,&mode);
+ if(mode) {
+ /* Console Input */
+ DWORD num_events,num_events_read,pre_num=0;
+ INPUT_RECORD *input_record;
+ int i;
+
+ while(1) {
+ if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT) break;
+ GetNumberOfConsoleInputEvents(fd_handle, &num_events);
+ if(pre_num != num_events) {
+ input_record = (INPUT_RECORD
*)malloc(sizeof(INPUT_RECORD)*num_events);
+ PeekConsoleInput(fd_handle, input_record, num_events,
&num_events_read);
+ for(i=0;i<num_events;i++) {
+
if(input_record[i].Event.KeyEvent.uChar.AsciiChar) return ret;
+ if ((input_record[i].EventType == KEY_EVENT) &&
input_record[i].Event.KeyEvent.bKeyDown) {
+
if(input_record[i].Event.KeyEvent.uChar.AsciiChar==13) { /* carriage
return */
+ free(input_record);
+ return ret;
+ }
+ else
if(input_record[i].Event.KeyEvent.uChar.AsciiChar==26) { /* ^Z - end
of file */
+ free(input_record);
+ return ret;
+ }
+ }
+ }
+ free(input_record);
+ pre_num = num_events;
+ }
+ }
+ return ret;
+ } else {
+ DWORD state;
+ ret = GetNamedPipeHandleState(fd_handle,&state,NULL,NULL,NULL,NULL,0);
+ if(ret) {
+ /* Pipe Input */
+ int bytes_avail=0;
+ while(1) {
+
if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT) break;
+ ret = PeekNamedPipe(fd_handle,NULL,0,NULL,&bytes_avail,NULL);
+ if(bytes_avail>0) {
+ return bytes_avail;
+ }
+ }
+ }
+ return ret;
+ }
+ return ret;
+}
+
+typedef DWORD (WINAPI *PNtQueryInformationFile)(HANDLE, PVOID, PVOID,
DWORD, DWORD );
+#define FilePipeLocalInformation 24
+typedef struct _FILE_PIPE_LOCAL_INFORMATION {
+ ULONG NamedPipeType;
+ ULONG NamedPipeConfiguration;
+ ULONG MaximumInstances;
+ ULONG CurrentInstances;
+ ULONG InboundQuota;
+ ULONG ReadDataAvailable;
+ ULONG OutboundQuota;
+ ULONG WriteQuotaAvailable;
+ ULONG NamedPipeState;
+ ULONG NamedPipeEnd;
+} FILE_PIPE_LOCAL_INFORMATION, *PFILE_PIPE_LOCAL_INFORMATION;
+static PNtQueryInformationFile NtQueryInformationFile=NULL;
+
+static DWORD WINAPI
+select_write_thread(PVOID argp)
+{
+ HANDLE fd_handle = (HANDLE)argp;
+ DWORD ret = 1;
+ DWORD mode = 0;
+
+ GetConsoleMode(fd_handle,&mode);
+ if(mode) {
+ /* Console Output */
+ return ret;
+ } else {
+ DWORD state;
+ ret = GetNamedPipeHandleState(fd_handle,&state,NULL,NULL,NULL,NULL,0);
+ if(ret) {
+ /* Pipe output */
+ int bytes_written=0;
+ int bytes_avail=0;
+ DWORD iob[2];
+ FILE_PIPE_LOCAL_INFORMATION fpli = {0};
+ if(NtQueryInformationFile) {
+ while(1) {
+ if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT) break;
+ ret = NtQueryInformationFile(fd_handle,iob,&fpli,sizeof(fpli),FilePipeLocalInformation);
+ if(fpli.OutboundQuota==fpli.WriteQuotaAvailable) return ret;
+ }
+ }
+ }
+ }
+ return ret;
+}
+
long
rb_w32_select (int nfds, fd_set *rd, fd_set *wr, fd_set *ex,
struct timeval *timeout)
@@ -2074,6 +2187,86 @@
file_nfds += extract_file_fd(wr, &file_wr);
if (file_nfds)
{
+ DWORD val,exit_code;
+ int i;
+ if(!NtQueryInformationFile)
+ NtQueryInformationFile = (PNtQueryInformationFile)
+
GetProcAddress(GetModuleHandle("ntdll.dll"),"NtQueryInformationFile"
);
+ main_select_event = CreateEvent(NULL,TRUE,FALSE,NULL);
+ if(main_select_event == NULL)
+ {
+ printf("CreateEvent failed (%d)\n", GetLastError());
+ return -1;
+ }
+ select_thread_list = malloc(sizeof(HANDLE)*(file_nfds+1));
+ for(i=0; i<file_nfds; i++)
+ {
+ if(i<file_rd.fd_count) {
+ select_thread_list[i] = CreateThread(NULL,0,select_read_thread,
+ (PVOID)file_rd.fd_array[i],0,&val);
+ }
+ else {
+ select_thread_list[i] = CreateThread(NULL,0,select_write_thread,
+ (PVOID)file_wr.fd_array[i-file_rd.fd_count],0,&val);
+ }
+ if (select_thread_list[i] == NULL)
+ {
+ printf("CreateThread failed (%d)\n", GetLastError());
+ return -1;
+ }
+ }
+
+ select_thread_list[file_nfds] = interrupted_event;
+ if(timeout)
+ r = WaitForMultipleObjects(file_nfds+1,select_thread_list,FALSE,
+ timeout->tv_sec * 1000 + timeout->tv_usec / 1000);
+ else
+ r = WaitForMultipleObjects(file_nfds+1,select_thread_list,FALSE,
+ INFINITE);
+
+ if (!SetEvent(main_select_event) )
+ {
+ printf("SetEvent failed (%d)\n", GetLastError());
+ return -1;
+ }
+ /* thread cleanup */
+ for(i=0;i<file_nfds;i++) {
+ while(1) {
+ exit_code = 0;
+ GetExitCodeThread(select_thread_list[i],&exit_code);
+ if(exit_code!=STILL_ACTIVE) {
+ CloseHandle(select_thread_list[i]);
+ break;
+ }
+ }
+ }
+ free(select_thread_list);
+ CloseHandle(main_select_event);
+
+ if(r==WAIT_TIMEOUT) { /* timeout */
+ FD_ZERO(&file_rd);
+ FD_ZERO(&file_wr);
+ if(rd) *rd = file_rd;
+ if(wr) *wr = file_wr;
+ return 0;
+ } else if((r -= WAIT_OBJECT_0) == file_nfds) { /* interrupt */
+ FD_ZERO(&file_rd);
+ FD_ZERO(&file_wr);
+ if(rd) *rd = file_rd;
+ if(wr) *wr = file_wr;
+ return -1;
+ } else if(r<file_rd.fd_count) { /* read ready */
+ FD_ZERO(&file_wr);
+ if(rd) *rd = file_rd;
+ if(wr) *wr = file_wr;
+ return 1;
+ } else { /* write ready */
+ FD_ZERO(&file_rd);
+ if(rd) *rd = file_rd;
+ if(wr) *wr = file_wr;
+ return 1;
+ }
+
// assume normal files are always readable/writable
// fake read/write fd_set and return value
if (rd) *rd = file_rd;
Regards,
Park Heesob
More information about the win32utils-devel
mailing list