Fwd: [PATCH] preliminary implementation of "smart_nopush"

Eric Wong normalperson at yhbt.net
Thu Jan 27 22:51:42 EST 2011


This kgio change is mainly targeted at Rainbows! users using
keepalive, so I might as well forward it here.

----- Forwarded message from Eric Wong <normalperson at yhbt.net> -----

From: Eric Wong <normalperson at yhbt.net>
To: kgio at librelist.org
Subject: [PATCH] preliminary implementation of "smart_nopush"
Message-ID: <20110128034856.GA10919 at dcvr.yhbt.net>

I just pushed this out for Linux users.  It's intended for use
with Rainbows! and sites that serve small response bodies
(e.g. http://bogomips.org/ and http://yhbt.net/ :)

I think I'll actually try it on my server later or tomorrow and stop
using nginx entirely :>

>From 910f6f3df099c04fcd55bd6b20785cce69cb36ae Mon Sep 17 00:00:00 2001
From: Eric Wong <normalperson at yhbt.net>
Date: Thu, 27 Jan 2011 19:43:39 -0800
Subject: [PATCH] preliminary implementation of "smart_nopush"

It only supports TCP_CORK under Linux right now.

We use a very basic strategy to apply TCP_CORK semantics optimally
in most TCP servers:  On corked sockets, we uncork (and immediately
recork) on recv() if there was a previous send().  Otherwise we do
not fiddle with TCP_CORK at all.

Under Linux, we can rely on TCP_CORK being inherited by an
accept()-ed client socket, so we can avoid syscalls for each
accept()-ed client once we know the listening socket corks.

This module does NOTHING for connect()-ed (client-side) TCP sockets;
we only deal with accept()-ed sockets right now.
---
 ext/kgio/accept.c         |   15 +++-
 ext/kgio/kgio.h           |    5 ++
 ext/kgio/kgio_ext.c       |    1 +
 ext/kgio/nopush.c         |  167 +++++++++++++++++++++++++++++++++++++++++++++
 ext/kgio/read_write.c     |    3 +
 kgio.gemspec              |    1 +
 test/test_nopush_smart.rb |  110 +++++++++++++++++++++++++++++
 7 files changed, 299 insertions(+), 3 deletions(-)
 create mode 100644 ext/kgio/nopush.c
 create mode 100644 test/test_nopush_smart.rb

diff --git a/ext/kgio/accept.c b/ext/kgio/accept.c
index 66c2712..a147fec 100644
--- a/ext/kgio/accept.c
+++ b/ext/kgio/accept.c
@@ -133,14 +133,21 @@ static VALUE acceptor(int argc, const VALUE *argv)
 	rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
 }
 
+#if defined(__linux__)
+#  define post_accept kgio_nopush_accept /* smart_nopush hook, see nopush.c */
+#else /* Linux-only for now: expand to a no-op that still demands the ';' */
+#  define post_accept(a,b,c,d) do { } while (0)
+#endif
+
 static VALUE
-my_accept(VALUE io, VALUE klass,
+my_accept(VALUE accept_io, VALUE klass,
           struct sockaddr *addr, socklen_t *addrlen, int nonblock)
 {
 	int client;
+	VALUE client_io;
 	struct accept_args a;
 
-	a.fd = my_fileno(io);
+	a.fd = my_fileno(accept_io);
 	a.addr = addr;
 	a.addrlen = addrlen;
 retry:
@@ -175,7 +182,9 @@ retry:
 			rb_sys_fail("accept");
 		}
 	}
-	return sock_for_fd(klass, client);
+	client_io = sock_for_fd(klass, client);
+	post_accept(accept_io, client_io, a.fd, client);
+	return client_io;
 }
 
 static void in_addr_set(VALUE io, struct sockaddr_in *addr)
diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h
index dc270e6..cf117b6 100644
--- a/ext/kgio/kgio.h
+++ b/ext/kgio/kgio.h
@@ -33,6 +33,11 @@ void init_kgio_wait(void);
 void init_kgio_read_write(void);
 void init_kgio_accept(void);
 void init_kgio_connect(void);
+void init_kgio_nopush(void);
+
+void kgio_nopush_accept(VALUE accept_io, VALUE io, int accept_fd, int fd);
+void kgio_nopush_recv(VALUE io, int fd);
+void kgio_nopush_send(VALUE io, int fd);
 
 VALUE kgio_call_wait_writable(VALUE io);
 VALUE kgio_call_wait_readable(VALUE io);
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c
index 0a457ff..1ebdaae 100644
--- a/ext/kgio/kgio_ext.c
+++ b/ext/kgio/kgio_ext.c
@@ -6,4 +6,5 @@ void Init_kgio_ext(void)
 	init_kgio_read_write();
 	init_kgio_connect();
 	init_kgio_accept();
+	init_kgio_nopush();
 }
diff --git a/ext/kgio/nopush.c b/ext/kgio/nopush.c
new file mode 100644
index 0000000..c8a7619
--- /dev/null
+++ b/ext/kgio/nopush.c
@@ -0,0 +1,167 @@
+/*
+ * We use a very basic strategy to apply TCP_CORK semantics optimally
+ * in most TCP servers:  On corked sockets, we uncork (and immediately
+ * recork) on recv() if there was a previous send().  Otherwise we do
+ * not fiddle with TCP_CORK at all.
+ *
+ * Under Linux, we can rely on TCP_CORK being inherited by an
+ * accept()-ed client socket so we can avoid syscalls for each
+ * accept()-ed client once we know the listening socket corks.
+ *
+ * This module does NOTHING for connect()-ed (client-side) TCP sockets;
+ * we only deal with accept()-ed sockets right now.
+ */
+
+#include "kgio.h"
+
+enum nopush_state {
+	NOPUSH_STATE_IGNORE = -1,	/* never touch TCP_CORK for this fd */
+	NOPUSH_STATE_WRITER = 0,	/* corked client, nothing sent since recv */
+	NOPUSH_STATE_WRITTEN = 1,	/* corked client with send() data pending */
+	NOPUSH_STATE_ACCEPTOR = 2	/* listener whose clients come out corked */
+};
+
+struct nopush_socket {
+	VALUE io;	/* IO that claimed this slot; compared before every use */
+	enum nopush_state state;
+};
+
+static int enabled;	/* toggled via Kgio.nopush_smart= */
+static long capa;	/* number of slots allocated in active[] */
+static struct nopush_socket *active;	/* per-fd state, indexed by fd */
+/* platform-specific helpers, defined in the __linux__ section below */
+static void set_acceptor_state(struct nopush_socket *nps, int fd);
+static void flush_pending_data(int fd);
+
+static void grow(int fd)	/* make active[fd] valid, plus 64 slots of headroom */
+{
+	long new_capa = fd + 64;
+	size_t size;
+
+	assert(new_capa > capa && "grow()-ing for low fd");
+	size = new_capa * sizeof(struct nopush_socket);
+	active = xrealloc(active, size);
+	/* new slots are unclaimed (Qnil) and ignored until an accept claims them */
+	while (capa < new_capa) {
+		struct nopush_socket *nps = &active[capa++];
+
+		nps->io = Qnil;
+		nps->state = NOPUSH_STATE_IGNORE;
+	}
+}
+
+static VALUE s_get_nopush_smart(VALUE self)	/* Kgio.nopush_smart? */
+{
+	return enabled ? Qtrue : Qfalse;
+}
+
+static VALUE s_set_nopush_smart(VALUE self, VALUE val)	/* Kgio.nopush_smart= */
+{
+	enabled = RTEST(val);	/* any truthy value enables; nil/false disable */
+
+	return val;
+}
+
+void init_kgio_nopush(void)	/* registers the Kgio.nopush_smart accessors */
+{
+	VALUE m = rb_define_module("Kgio");
+	/* off by default: "enabled" is a zero-initialized file-scope static */
+	rb_define_singleton_method(m, "nopush_smart?", s_get_nopush_smart, 0);
+	rb_define_singleton_method(m, "nopush_smart=", s_set_nopush_smart, 1);
+}
+
+/*
+ * called after a successful send(), just mark that we've put something
+ * in the skb and will need to uncork on the next recv().
+ */
+void kgio_nopush_send(VALUE io, int fd)
+{
+	struct nopush_socket *nps;
+
+	if (fd >= capa) return;	/* no slot: fd was never registered by accept */
+	nps = &active[fd];
+	if (nps->io == io && nps->state == NOPUSH_STATE_WRITER)
+		nps->state = NOPUSH_STATE_WRITTEN;
+}
+
+/* called on successful accept(), with both the listener and the new client */
+void kgio_nopush_accept(VALUE accept_io, VALUE io, int accept_fd, int fd)
+{
+	struct nopush_socket *accept_nps, *client_nps;
+	/* no slot bookkeeping at all unless Kgio.nopush_smart is on */
+	if (!enabled)
+		return;
+	assert(fd >= 0 && "client_fd negative");
+	assert(accept_fd >= 0 && "accept_fd negative");
+	if (fd >= capa || accept_fd >= capa)
+		grow(fd > accept_fd ? fd : accept_fd);
+	/* a listener is probed once per listener IO; its state is then cached */
+	accept_nps = &active[accept_fd];
+	/* probing the client fd suffices: under Linux it inherits TCP_CORK */
+	if (accept_nps->io != accept_io) {
+		accept_nps->io = accept_io;
+		set_acceptor_state(accept_nps, fd);
+	}
+	client_nps = &active[fd];
+	client_nps->io = io;
+	if (accept_nps->state == NOPUSH_STATE_ACCEPTOR)
+		client_nps->state = NOPUSH_STATE_WRITER;
+	else
+		client_nps->state = NOPUSH_STATE_IGNORE;
+}
+
+void kgio_nopush_recv(VALUE io, int fd)	/* called by the recv path before reading */
+{
+	struct nopush_socket *nps;
+
+	if (fd >= capa)	/* no slot: fd was never registered by accept */
+		return;
+
+	nps = &active[fd];
+	if (nps->io != io || nps->state != NOPUSH_STATE_WRITTEN)
+		return;	/* not ours, not corked, or nothing sent since last recv */
+	/* if nopush_smart was turned off meanwhile, state resets without a flush */
+	/* reset internal state and flush corked buffers */
+	nps->state = NOPUSH_STATE_WRITER;
+	if (enabled)
+		flush_pending_data(fd);
+}
+
+#ifdef __linux__
+#include <netinet/tcp.h>
+static void set_acceptor_state(struct nopush_socket *nps, int fd)
+{
+	int corked = 0;
+	socklen_t optlen = sizeof(int);
+	/* ACCEPTOR only if fd reports TCP_CORK enabled; IGNORE otherwise */
+	if (getsockopt(fd, SOL_TCP, TCP_CORK, &corked, &optlen) != 0) {
+		if (errno != EOPNOTSUPP)	/* presumably a non-TCP socket; confirm */
+			rb_sys_fail("getsockopt(SOL_TCP, TCP_CORK)");
+		errno = 0;	/* the tolerated failure must not leak out */
+		nps->state = NOPUSH_STATE_IGNORE;
+	} else if (corked) {
+		nps->state = NOPUSH_STATE_ACCEPTOR;
+	} else {
+		nps->state = NOPUSH_STATE_IGNORE;
+	}
+}
+
+/*
+ * uncork fd to push out data queued by send() since the last recv(),
+ * then immediately recork it (the caller decides whether this is needed)
+ */
+static void flush_pending_data(int fd)
+{
+	int optval = 0;
+	socklen_t optlen = sizeof(int);
+	if (setsockopt(fd, SOL_TCP, TCP_CORK, &optval, optlen) != 0)
+		rb_sys_fail("setsockopt(SOL_TCP, TCP_CORK, 0)");
+	optval = 1; /* immediately recork */
+	if (setsockopt(fd, SOL_TCP, TCP_CORK, &optval, optlen) != 0)
+		rb_sys_fail("setsockopt(SOL_TCP, TCP_CORK, 1)");
+}
+#else /* TODO: add FreeBSD support; these stubs keep other builds linking */
+static void set_acceptor_state(struct nopush_socket *nps, int fd)
+{ (void)fd; nps->state = NOPUSH_STATE_IGNORE; }
+static void flush_pending_data(int fd) { (void)fd; }
+#endif /* linux */
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 7ba2925..a954865 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -164,6 +164,7 @@ static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io)
 	long n;
 
 	prepare_read(&a, argc, argv, io);
+	kgio_nopush_recv(io, a.fd);
 
 	if (a.len > 0) {
 retry:
@@ -320,6 +321,8 @@ retry:
 	n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
 	if (write_check(&a, n, "send", io_wait) != 0)
 		goto retry;
+	if (TYPE(a.buf) != T_SYMBOL)
+		kgio_nopush_send(io, a.fd);
 	return a.buf;
 }
 
diff --git a/kgio.gemspec b/kgio.gemspec
index ef523b5..96b9e02 100644
--- a/kgio.gemspec
+++ b/kgio.gemspec
@@ -22,6 +22,7 @@ Gem::Specification.new do |s|
   s.extensions = %w(ext/kgio/extconf.rb)
 
   s.add_development_dependency('wrongdoc', '~> 1.4')
+  s.add_development_dependency('strace_me', '~> 1.0')
 
   # s.license = %w(LGPL) # disabled for compatibility with older RubyGems
 end
diff --git a/test/test_nopush_smart.rb b/test/test_nopush_smart.rb
new file mode 100644
index 0000000..6d4a698
--- /dev/null
+++ b/test/test_nopush_smart.rb
@@ -0,0 +1,110 @@
+require 'tempfile'
+require 'test/unit'
+RUBY_PLATFORM =~ /linux/ and require 'strace'
+$-w = true
+require 'kgio'
+
+class TestNoPushSmart < Test::Unit::TestCase
+  TCP_CORK = 3 # Linux's value from <netinet/tcp.h>
+
+  def setup
+    Kgio.nopush_smart = false
+    assert_equal false, Kgio.nopush_smart?
+
+    @host = ENV["TEST_HOST"] || '127.0.0.1'
+    @srv = Kgio::TCPServer.new(@host, 0)
+    assert_nothing_raised {
+      @srv.setsockopt(Socket::SOL_TCP, TCP_CORK, 1)
+    } if RUBY_PLATFORM =~ /linux/
+    @port = @srv.addr[1]
+  end
+
+  def test_nopush_smart_true_unix
+    Kgio.nopush_smart = true
+    tmp = Tempfile.new('kgio_unix')
+    @path = tmp.path
+    File.unlink(@path)
+    tmp.close rescue nil
+    @srv = Kgio::UNIXServer.new(@path)
+    @rd = Kgio::UNIXSocket.new(@path)
+    io, err = Strace.me { @wr = @srv.kgio_accept }
+    assert_nil err
+    rc = nil
+    io, err = Strace.me {
+      @wr.kgio_write "HI\n"
+      rc = @wr.kgio_tryread 666
+    }
+    assert_nil err
+    lines = io.readlines
+    assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    assert_equal :wait_readable, rc
+  ensure
+    File.unlink(@path) rescue nil
+  end if RUBY_PLATFORM =~ /linux/ # Strace is only required on Linux
+
+  def test_nopush_smart_false
+    Kgio.nopush_smart = nil
+    assert_equal false, Kgio.nopush_smart?
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    io, err = Strace.me { @rd = @srv.kgio_accept }
+    assert_nil err
+    lines = io.readlines
+    assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+
+    rbuf = "..."
+    t0 = Time.now
+    @rd.kgio_write "HI\n"
+    @wr.kgio_read(3, rbuf)
+    diff = Time.now - t0 # corked small write should wait ~200ms
+    assert(diff >= 0.200, "TCP_CORK broken? diff=#{diff} < 200ms")
+    assert_equal "HI\n", rbuf
+  end if RUBY_PLATFORM =~ /linux/
+
+  def test_nopush_smart_true
+    Kgio.nopush_smart = true
+    assert_equal true, Kgio.nopush_smart?
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    io, err = Strace.me { @rd = @srv.kgio_accept }
+    assert_nil err
+    lines = io.readlines
+    assert_equal 1, lines.grep(/TCP_CORK/).size, lines.inspect
+    assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+
+    @wr.write "HI\n"
+    rbuf = ""
+    io, err = Strace.me { @rd.kgio_read(3, rbuf) }
+    assert_nil err
+    lines = io.readlines
+    assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    assert_equal "HI\n", rbuf
+
+    t0 = Time.now
+    @rd.kgio_write "HI2U2\n"
+    @rd.kgio_write "HOW\n"
+    rc = false
+    io, err = Strace.me { rc = @rd.kgio_tryread(666) } # uncork + recork here
+    @wr.readpartial(666, rbuf)
+    rbuf == "HI2U2\nHOW\n" or warn "rbuf=#{rbuf.inspect} looking bad?"
+    diff = Time.now - t0
+    assert(diff < 0.200, "time diff=#{diff} >= 200ms")
+    assert_equal :wait_readable, rc
+    assert_nil err
+    lines = io.readlines
+    assert_equal 2, lines.grep(/TCP_CORK/).size, lines.inspect
+    assert_nothing_raised { @wr.close }
+    assert_nothing_raised { @rd.close }
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    io, err = Strace.me { @rd = @srv.kgio_accept }
+    assert_nil err
+    lines = io.readlines
+    assert lines.grep(/TCP_CORK/).empty?, "optimization fail: #{lines.inspect}"
+    assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+  end if RUBY_PLATFORM =~ /linux/
+
+  def teardown
+    Kgio.nopush_smart = false
+  end
+end
-- 
Eric Wong

----- End forwarded message -----


More information about the rainbows-talk mailing list