[ruby-oci8-commit] [447] trunk/ruby-oci8: run connection-cleanup functions in a native thread to correctly release connections in GC .

nobody at rubyforge.org nobody at rubyforge.org
Wed Aug 31 10:12:45 EDT 2011


Revision: 447
Author:   kubo
Date:     2011-08-31 10:12:45 -0400 (Wed, 31 Aug 2011)

Log Message:
-----------
run connection-cleanup functions in a native thread to correctly release connections in GC.

Modified Paths:
--------------
    trunk/ruby-oci8/ChangeLog
    trunk/ruby-oci8/ext/oci8/env.c
    trunk/ruby-oci8/ext/oci8/extconf.rb
    trunk/ruby-oci8/ext/oci8/oci8.c
    trunk/ruby-oci8/ext/oci8/oci8.h
    trunk/ruby-oci8/ext/oci8/oci8lib.c

Added Paths:
-----------
    trunk/ruby-oci8/ext/oci8/thread_util.c
    trunk/ruby-oci8/ext/oci8/thread_util.h

Modified: trunk/ruby-oci8/ChangeLog
===================================================================
--- trunk/ruby-oci8/ChangeLog	2011-08-27 13:21:15 UTC (rev 446)
+++ trunk/ruby-oci8/ChangeLog	2011-08-31 14:12:45 UTC (rev 447)
@@ -1,3 +1,9 @@
+2011-08-31  KUBO Takehiro  <kubo at jiubao.org>
+	* ext/oci8/env.c, ext/oci8/extconf.rb, ext/oci8/oci8.c, ext/oci8/oci8.h,
+	  ext/oci8/oci8lib.c, ext/oci8/thread_util.c, ext/oci8/thread_util.h:
+	    run connection-cleanup functions in a native thread to correctly
+	    release connections in GC.
+
 2011-08-27  KUBO Takehiro  <kubo at jiubao.org>
 	* ext/oci8/oci8.c, ext/oci8/oci8.h: refactoring for prior arrangement to
 	    properly release garbage sessions.

Modified: trunk/ruby-oci8/ext/oci8/env.c
===================================================================
--- trunk/ruby-oci8/ext/oci8/env.c	2011-08-27 13:21:15 UTC (rev 446)
+++ trunk/ruby-oci8/ext/oci8/env.c	2011-08-31 14:12:45 UTC (rev 447)
@@ -12,11 +12,7 @@
 #include <util.h>
 #endif
 
-#ifdef HAVE_RB_THREAD_BLOCKING_REGION
 ub4 oci8_env_mode = OCI_OBJECT | OCI_THREADED;
-#else
-ub4 oci8_env_mode = OCI_OBJECT;
-#endif
 
 OCIEnv *oci8_global_envhp;
 

Modified: trunk/ruby-oci8/ext/oci8/extconf.rb
===================================================================
--- trunk/ruby-oci8/ext/oci8/extconf.rb	2011-08-27 13:21:15 UTC (rev 446)
+++ trunk/ruby-oci8/ext/oci8/extconf.rb	2011-08-31 14:12:45 UTC (rev 447)
@@ -89,7 +89,7 @@
          "stmt.o", "bind.o", "metadata.o", "attr.o",
          "lob.o", "oradate.o",
          "ocinumber.o", "ocidatetime.o", "object.o", "apiwrap.o",
-         "encoding.o", "oranumber_util.o"]
+         "encoding.o", "oranumber_util.o", "thread_util.o"]
 
 if RUBY_PLATFORM =~ /mswin32|cygwin|mingw32|bccwin32/
   $defs << "-DUSE_WIN32_C"

Modified: trunk/ruby-oci8/ext/oci8/oci8.c
===================================================================
--- trunk/ruby-oci8/ext/oci8/oci8.c	2011-08-27 13:21:15 UTC (rev 446)
+++ trunk/ruby-oci8/ext/oci8/oci8.c	2011-08-31 14:12:45 UTC (rev 447)
@@ -6,6 +6,7 @@
  *
  */
 #include "oci8.h"
+#include <errno.h>
 #ifdef HAVE_UNISTD_H
 #include <unistd.h> /* getpid() */
 #endif
@@ -79,9 +80,21 @@
 static void oci8_svcctx_free(oci8_base_t *base)
 {
     oci8_svcctx_t *svcctx = (oci8_svcctx_t *)base;
-    if (svcctx->logoff_method != NULL) {
-        /* TODO: not to block GC. */
-        svcctx->logoff_method(svcctx);
+    if (svcctx->logoff_strategy != NULL) {
+        const oci8_logoff_strategy_t *strategy = svcctx->logoff_strategy;
+        void *data = strategy->prepare(svcctx);
+        int rv;
+        svcctx->base.type = 0;
+        svcctx->logoff_strategy = NULL;
+        rv = oci8_run_native_thread(strategy->execute, data);
+        if (rv != 0) {
+            errno = rv;
+#ifdef WIN32
+            rb_sys_fail("_beginthread");
+#else
+            rb_sys_fail("pthread_create");
+#endif
+        }
     }
 }
 
@@ -226,39 +239,101 @@
     return rb_ary_new3(4, user, pass, dbname, mode);
 }
 
-static void call_oci_logoff(oci8_svcctx_t *svcctx)
+/*
+ * Logoff strategy for sessions connected by OCILogon.
+ */
+typedef struct {
+    OCISvcCtx *svchp;
+    OCISession *usrhp;
+    OCIServer *srvhp;
+} simple_logoff_arg_t;
+
+static void *simple_logoff_prepare(oci8_svcctx_t *svcctx)
 {
-    svcctx->logoff_method = NULL;
-    oci_lc(OCILogoff_nb(svcctx, svcctx->base.hp.svc, oci8_errhp));
-    svcctx->base.type = 0;
+    simple_logoff_arg_t *sla = xmalloc(sizeof(simple_logoff_arg_t));
+    sla->svchp = svcctx->base.hp.svc;
+    sla->usrhp = svcctx->usrhp;
+    sla->srvhp = svcctx->srvhp;
+    svcctx->usrhp = NULL;
+    svcctx->srvhp = NULL;
+    return sla;
 }
 
-static void call_session_end(oci8_svcctx_t *svcctx)
+static VALUE simple_logoff_execute(void *arg)
 {
+    simple_logoff_arg_t *sla = (simple_logoff_arg_t *)arg;
+    OCIError *errhp = oci8_errhp;
+    sword rv;
+
+    OCITransRollback(sla->svchp, errhp, OCI_DEFAULT);
+    rv = OCILogoff(sla->svchp, errhp);
+    free(sla);
+    return (VALUE)rv;
+}
+
+static const oci8_logoff_strategy_t simple_logoff = {
+    simple_logoff_prepare,
+    simple_logoff_execute,
+};
+
+/*
+ * Logoff strategy for sessions connected by OCIServerAttach and OCISessionBegin.
+ */
+
+typedef struct {
+    OCISvcCtx *svchp;
+    OCISession *usrhp;
+    OCIServer *srvhp;
+    unsigned char state;
+} complex_logoff_arg_t;
+
+static void *complex_logoff_prepare(oci8_svcctx_t *svcctx)
+{
+    complex_logoff_arg_t *cla = xmalloc(sizeof(complex_logoff_arg_t));
+    cla->svchp = svcctx->base.hp.svc;
+    cla->usrhp = svcctx->usrhp;
+    cla->srvhp = svcctx->srvhp;
+    cla->state = svcctx->state;
+    svcctx->usrhp = NULL;
+    svcctx->srvhp = NULL;
+    svcctx->state = 0;
+    return cla;
+}
+
+static VALUE complex_logoff_execute(void *arg)
+{
+    complex_logoff_arg_t *cla = (complex_logoff_arg_t *)arg;
+    OCIError *errhp = oci8_errhp;
     sword rv = OCI_SUCCESS;
 
-    if (svcctx->state & OCI8_STATE_SESSION_BEGIN_WAS_CALLED) {
-        rv = OCISessionEnd_nb(svcctx, svcctx->base.hp.svc, oci8_errhp, svcctx->usrhp, OCI_DEFAULT);
-        svcctx->state &= ~OCI8_STATE_SESSION_BEGIN_WAS_CALLED;
+    OCITransRollback(cla->svchp, errhp, OCI_DEFAULT);
+
+    if (cla->state & OCI8_STATE_SESSION_BEGIN_WAS_CALLED) {
+        rv = OCISessionEnd(cla->svchp, oci8_errhp, cla->usrhp, OCI_DEFAULT);
+        cla->state &= ~OCI8_STATE_SESSION_BEGIN_WAS_CALLED;
     }
-    if (svcctx->state & OCI8_STATE_SERVER_ATTACH_WAS_CALLED) {
-        rv = OCIServerDetach_nb(svcctx, svcctx->srvhp, oci8_errhp, OCI_DEFAULT);
-        svcctx->state &= ~OCI8_STATE_SERVER_ATTACH_WAS_CALLED;
+    if (cla->state & OCI8_STATE_SERVER_ATTACH_WAS_CALLED) {
+        rv = OCIServerDetach(cla->srvhp, oci8_errhp, OCI_DEFAULT);
+        cla->state &= ~OCI8_STATE_SERVER_ATTACH_WAS_CALLED;
     }
-    if (svcctx->usrhp != NULL) {
-        OCIHandleFree(svcctx->usrhp, OCI_HTYPE_SESSION);
-        svcctx->usrhp = NULL;
+    if (cla->usrhp != NULL) {
+        OCIHandleFree(cla->usrhp, OCI_HTYPE_SESSION);
     }
-    if (svcctx->srvhp != NULL) {
-        OCIHandleFree(svcctx->srvhp, OCI_HTYPE_SERVER);
-        svcctx->srvhp = NULL;
+    if (cla->srvhp != NULL) {
+        OCIHandleFree(cla->srvhp, OCI_HTYPE_SERVER);
     }
-    svcctx->logoff_method = NULL;
-    if (rv != OCI_SUCCESS) {
-        oci8_raise(oci8_errhp, rv, NULL);
+    if (cla->svchp != NULL) {
+        OCIHandleFree(cla->svchp, OCI_HTYPE_SVCCTX);
     }
+    free(cla);
+    return (VALUE)rv;
 }
 
+static const oci8_logoff_strategy_t complex_logoff = {
+    complex_logoff_prepare,
+    complex_logoff_execute,
+};
+
 /*
  * call-seq:
  *   logon(username, password, dbname) -> connection
@@ -271,7 +346,7 @@
 {
     oci8_svcctx_t *svcctx = DATA_PTR(self);
 
-    if (svcctx->logoff_method != NULL) {
+    if (svcctx->logoff_strategy != NULL) {
         rb_raise(rb_eRuntimeError, "Could not reuse the session.");
     }
 
@@ -289,7 +364,7 @@
                        NIL_P(dbname) ? NULL : RSTRING_ORATEXT(dbname),
                        NIL_P(dbname) ? 0 : RSTRING_LEN(dbname)));
     svcctx->base.type = OCI_HTYPE_SVCCTX;
-    svcctx->logoff_method = call_oci_logoff;
+    svcctx->logoff_strategy = &simple_logoff;
 
     /* setup the session handle */
     oci_lc(OCIAttrGet(svcctx->base.hp.ptr, OCI_HTYPE_SVCCTX, &svcctx->usrhp, 0, OCI_ATTR_SESSION, oci8_errhp));
@@ -316,10 +391,10 @@
     oci8_svcctx_t *svcctx = DATA_PTR(self);
     sword rv;
 
-    if (svcctx->logoff_method != NULL) {
+    if (svcctx->logoff_strategy != NULL) {
         rb_raise(rb_eRuntimeError, "Could not reuse the session.");
     }
-    svcctx->logoff_method = call_session_end;
+    svcctx->logoff_strategy = &complex_logoff;
     svcctx->state = 0;
 
     /* allocate a service context handle */
@@ -380,7 +455,7 @@
 {
     oci8_svcctx_t *svcctx = oci8_get_svcctx(self);
 
-    if (svcctx->logoff_method != call_session_end) {
+    if (svcctx->logoff_strategy != &complex_logoff) {
         rb_raise(rb_eRuntimeError, "Use this method only for the service context handle created by OCI8#server_handle().");
     }
     if (svcctx->state & OCI8_STATE_SERVER_ATTACH_WAS_CALLED) {
@@ -417,7 +492,7 @@
 {
     oci8_svcctx_t *svcctx = DATA_PTR(self);
 
-    if (svcctx->logoff_method != call_session_end) {
+    if (svcctx->logoff_strategy != &complex_logoff) {
         rb_raise(rb_eRuntimeError, "Use this method only for the service context handle created by OCI8#server_handle().");
     }
     if (svcctx->state & OCI8_STATE_SESSION_BEGIN_WAS_CALLED) {
@@ -453,9 +528,12 @@
     while (svcctx->base.children != NULL) {
         oci8_base_free(svcctx->base.children);
     }
-    if (svcctx->logoff_method != NULL) {
-        oci_lc(OCITransRollback_nb(svcctx, svcctx->base.hp.svc, oci8_errhp, OCI_DEFAULT));
-        svcctx->logoff_method(svcctx);
+    if (svcctx->logoff_strategy != NULL) {
+        const oci8_logoff_strategy_t *strategy = svcctx->logoff_strategy;
+        void *data = strategy->prepare(svcctx);
+        svcctx->base.type = 0;
+        svcctx->logoff_strategy = NULL;
+        oci_lc(oci8_blocking_region(svcctx, strategy->execute, data));
     }
     return Qtrue;
 }
@@ -519,10 +597,7 @@
 #else
     sb1 non_blocking;
 
-    if (svcctx->server->hp.srvhp == NULL) {
-        oci_lc(OCIAttrGet(svcctx->base.hp.ptr, OCI_HTYPE_SVCCTX, &svcctx->server->hp.srvhp, 0, OCI_ATTR_SERVER, oci8_errhp));
-    }
-    oci_lc(OCIAttrGet(svcctx->server->hp.srvhp, OCI_HTYPE_SERVER, &non_blocking, 0, OCI_ATTR_NONBLOCKING_MODE, oci8_errhp));
+    oci_lc(OCIAttrGet(svcctx->srvhp, OCI_HTYPE_SERVER, &non_blocking, 0, OCI_ATTR_NONBLOCKING_MODE, oci8_errhp));
     return non_blocking ? Qtrue : Qfalse;
 #endif
 }
@@ -572,13 +647,10 @@
 #else
     sb1 non_blocking;
 
-    if (svcctx->server->hp.srvhp == NULL) {
-        oci_lc(OCIAttrGet(svcctx->base.hp.ptr, OCI_HTYPE_SVCCTX, &svcctx->server->hp.srvhp, 0, OCI_ATTR_SERVER, oci8_errhp));
-    }
-    oci_lc(OCIAttrGet(svcctx->server->hp.srvhp, OCI_HTYPE_SERVER, &non_blocking, 0, OCI_ATTR_NONBLOCKING_MODE, oci8_errhp));
+    oci_lc(OCIAttrGet(svcctx->srvhp, OCI_HTYPE_SERVER, &non_blocking, 0, OCI_ATTR_NONBLOCKING_MODE, oci8_errhp));
     if ((RTEST(val) && !non_blocking) || (!RTEST(val) && non_blocking)) {
         /* toggle blocking / non-blocking. */
-        oci_lc(OCIAttrSet(svcctx->server->hp.srvhp, OCI_HTYPE_SERVER, 0, 0, OCI_ATTR_NONBLOCKING_MODE, oci8_errhp));
+        oci_lc(OCIAttrSet(svcctx->srvhp, OCI_HTYPE_SERVER, 0, 0, OCI_ATTR_NONBLOCKING_MODE, oci8_errhp));
     }
 #endif
     return val;

Modified: trunk/ruby-oci8/ext/oci8/oci8.h
===================================================================
--- trunk/ruby-oci8/ext/oci8/oci8.h	2011-08-27 13:21:15 UTC (rev 446)
+++ trunk/ruby-oci8/ext/oci8/oci8.h	2011-08-31 14:12:45 UTC (rev 447)
@@ -300,11 +300,12 @@
     } u;
 };
 
+typedef struct oci8_logoff_strategy oci8_logoff_strategy_t;
 
 typedef struct oci8_svcctx {
     oci8_base_t base;
     volatile VALUE executing_thread;
-    void (*logoff_method)(struct oci8_svcctx *svcctx);
+    const oci8_logoff_strategy_t *logoff_strategy;
     OCISession *usrhp;
     OCIServer *srvhp;
     rb_pid_t pid;
@@ -316,6 +317,11 @@
     VALUE long_read_len;
 } oci8_svcctx_t;
 
+struct oci8_logoff_strategy {
+    void *(*prepare)(oci8_svcctx_t *svcctx);
+    rb_blocking_function_t *execute;
+};
+
 typedef struct {
     dvoid *hp; /* OCIBind* or OCIDefine* */
     dvoid *valuep;
@@ -540,6 +546,7 @@
 #define OCI8SafeStringValue(v) SafeStringValue(v)
 #endif
 
+#include "thread_util.h"
 #include "apiwrap.h"
 
 #endif

Modified: trunk/ruby-oci8/ext/oci8/oci8lib.c
===================================================================
--- trunk/ruby-oci8/ext/oci8/oci8lib.c	2011-08-27 13:21:15 UTC (rev 446)
+++ trunk/ruby-oci8/ext/oci8/oci8lib.c	2011-08-31 14:12:45 UTC (rev 447)
@@ -1,6 +1,6 @@
 /* -*- c-file-style: "ruby"; indent-tabs-mode: nil -*- */
 /*
- * Copyright (C) 2002-2009 KUBO Takehiro <kubo at jiubao.org>
+ * Copyright (C) 2002-2011 KUBO Takehiro <kubo at jiubao.org>
  */
 
 #include "oci8.h"
@@ -90,6 +90,7 @@
     rb_set_end_proc(at_exit_func, Qnil);
 #endif
 
+    Init_oci8_thread_util();
     Init_oci8_error();
     Init_oci8_env();
 

Added: trunk/ruby-oci8/ext/oci8/thread_util.c
===================================================================
--- trunk/ruby-oci8/ext/oci8/thread_util.c	                        (rev 0)
+++ trunk/ruby-oci8/ext/oci8/thread_util.c	2011-08-31 14:12:45 UTC (rev 447)
@@ -0,0 +1,81 @@
+/* -*- c-file-style: "ruby"; indent-tabs-mode: nil -*- */
+/*
+ * thread_util.c - part of ruby-oci8
+ *
+ * Copyright (C) 2011 KUBO Takehiro <kubo at jiubao.org>
+ */
+#include "oci8.h"
+#include <errno.h>
+
+#ifndef WIN32
+#include <pthread.h>
+static pthread_attr_t detached_thread_attr;
+#endif
+
+typedef struct {
+    rb_blocking_function_t *func;
+    void *arg;
+} adapter_arg_t;
+
+void Init_oci8_thread_util(void)
+{
+#ifndef WIN32
+    pthread_attr_init(&detached_thread_attr);
+    pthread_attr_setdetachstate(&detached_thread_attr, PTHREAD_CREATE_DETACHED);
+#endif
+}
+
+#ifdef WIN32
+
+static void __cdecl adapter(void *arg)
+{
+    adapter_arg_t *aa = (adapter_arg_t *)arg;
+    aa->func(aa->arg);
+    free(aa);
+}
+
+int oci8_run_native_thread(rb_blocking_function_t func, void *arg)
+{
+    adapter_arg_t *aa = malloc(sizeof(adapter_arg_t));
+    if (aa == NULL) {
+        return ENOMEM;
+    }
+
+    aa->func = func;
+    aa->arg = arg;
+    if (_beginthread(adapter, 0, aa) == (uintptr_t)-1L) {
+        int err = errno;
+        free(aa);
+        return err;
+    }
+    return 0;
+}
+
+#else
+
+static void *adapter(void *arg)
+{
+    adapter_arg_t *aa = (adapter_arg_t *)arg;
+    aa->func(aa->arg);
+    free(aa);
+    return NULL;
+}
+
+int oci8_run_native_thread(rb_blocking_function_t func, void *arg)
+{
+    pthread_t thread;
+    adapter_arg_t *aa = malloc(sizeof(adapter_arg_t));
+    int rv;
+    if (aa == NULL) {
+        return ENOMEM;
+    }
+
+    aa->func = func;
+    aa->arg = arg;
+    rv = pthread_create(&thread, &detached_thread_attr, adapter, aa);
+    if (rv != 0) {
+        free(aa);
+    }
+    return rv;
+}
+#endif

Added: trunk/ruby-oci8/ext/oci8/thread_util.h
===================================================================
--- trunk/ruby-oci8/ext/oci8/thread_util.h	                        (rev 0)
+++ trunk/ruby-oci8/ext/oci8/thread_util.h	2011-08-31 14:12:45 UTC (rev 447)
@@ -0,0 +1,21 @@
+/* -*- c-file-style: "ruby"; indent-tabs-mode: nil -*- */
+/*
+ * thread_util.h - part of ruby-oci8
+ *
+ * Copyright (C) 2011 KUBO Takehiro <kubo at jiubao.org>
+ */
+#ifndef NATIVE_THREAD_H
+
+/*
+ * Prepare to execute thread-related functions.
+ */
+void Init_oci8_thread_util(void);
+
+/*
+ * Run the func in a new native thread.
+ * Don't call any ruby functions in the func.
+ * The return value is errno.
+ */
+int oci8_run_native_thread(rb_blocking_function_t func, void *arg);
+
+#endif




More information about the ruby-oci8-commit mailing list