[PATCH (geoip)] use IO.pread from the io-extra lib if possible

Clifford Heath clifford.heath at gmail.com
Thu Nov 12 15:50:05 EST 2009


Patch applied and pushed as 0.8.6, available on gemcutter.org

Clifford Heath.

On 07/11/2009, at 5:24 AM, Eric Wong wrote:

> I sent this to the author of geoip a few days ago and haven't heard
> back yet.  Since he may be on vacation or busy, and somebody else may
> hit this problem in the meantime, I figured I'd post the patch + git
> repo I sent him here.
>
> Unicorn users:
>
>  The pread(2)/pwrite(2) syscalls are very useful for making libraries
>  like this one (that rely on an on-disk database) compatible with
>  preload_app.  TokyoCabinet is a great example of a library that's
>  already compatible with Unicorn+preload_app out-of-the-box.
>
> ----- Forwarded message from Eric Wong <normalperson at yhbt.net> -----
>
> From: Eric Wong <normalperson at yhbt.net>
> To: Clifford Heath <clifford.heath at gmail.com>
> Subject: [PATCH (geoip)] use IO.pread from the io-extra lib if possible
>
> Hi Clifford,
>
> One user of Unicorn[1] with the "preload_app" feature ran into problems
> with the open file descriptor being shared across multiple processes.
>
> I've only lightly tested this, but it seems to work.  Of course
> I'll send you updates in case I notice anything broken.
>
> I've also pushed the below patch up to git://yhbt.net/geoip.git
> in case you'd rather "git pull" than "git am".
>
>
> Full disclosure: I'm also a contributor to io-extra, but I've kept
> the dependency optional in case folks can't install or use io-extra
> because of an unsupported OS.
>
>
> PS: btw, might want to update the http://geoip.rubyforge.org
>    site to point to the up-to-date repository :)
>
> [1] - http://unicorn.bogomips.org/
>
> From ec75c0fc83e22a4c8c90a896b51291ef01773d79 Mon Sep 17 00:00:00 2001
> From: Eric Wong <normalperson at yhbt.net>
> Date: Tue, 3 Nov 2009 13:06:06 -0800
> Subject: [PATCH] use IO.pread from the io-extra lib if possible
>
> This allows the objects created in one process to be used
> concurrently with forked child processes in addition to multiple
> threads (as the pread(2) system call is designed for
> concurrent operations across the board on POSIX systems).  This
> is useful in cases when a master process forks off several child
> processes to serve queries.  In case io-extra is not available
> or not installable, then we'll fall back on using a Mutex and at
> least still get thread-safety.
> ---
> lib/geoip.rb |   51 +++++++++++++++++++++++++++++++--------------------
> 1 files changed, 31 insertions(+), 20 deletions(-)
>
> diff --git a/lib/geoip.rb b/lib/geoip.rb
> index 4a9b4c2..6fce6dd 100644
> --- a/lib/geoip.rb
> +++ b/lib/geoip.rb
> @@ -43,6 +43,11 @@ $:.unshift File.dirname(__FILE__)
> #=end
> require 'thread'  # Needed for Mutex
> require 'socket'
> +begin
> +    require 'io/extra' # for IO.pread
> +rescue LoadError
> +    # oh well, hope they're not forking after initializing
> +end
>
> class GeoIP
>     VERSION = "0.8.4"
> @@ -429,7 +434,7 @@ class GeoIP
>     # +filename+ is a String holding the path to the GeoIP.dat file
>     # +options+ is an integer holding caching flags (unimplemented)
>     def initialize(filename, flags = 0)
> -        @mutex = Mutex.new
> +        @mutex = IO.respond_to?(:pread) ? false : Mutex.new
>         @flags = flags
>         @databaseType = GEOIP_COUNTRY_EDITION
>         @record_length = STANDARD_RECORD_LENGTH
> @@ -530,11 +535,9 @@ class GeoIP
>     private
>
>     def read_city(pos, hostname = '', ip = '')
> -        record = ""
> -        @mutex.synchronize {
> -            @file.seek(pos + (2*@record_length-1) * @databaseSegments[0])
> -            return nil unless record = @file.read(FULL_RECORD_LENGTH)
> -        }
> +        off = pos + (2*@record_length-1) * @databaseSegments[0]
> +        record = atomic_read(FULL_RECORD_LENGTH, off)
> +        return nil unless record && record.size == FULL_RECORD_LENGTH
>
>         # The country code is the first byte:
>         code = record[0]
> @@ -655,11 +658,8 @@ class GeoIP
>             throw "Invalid GeoIP database type, can't look up Organization/ISP by IP"
>         end
>         pos = seek_record(ipnum);
> -        record = ""
> -        @mutex.synchronize {
> -            @file.seek(pos + (2*@record_length-1) * @databaseSegments[0])
> -            record = @file.read(MAX_ORG_RECORD_LENGTH)
> -        }
> +        off = pos + (2*@record_length-1) * @databaseSegments[0]
> +        record = atomic_read(MAX_ORG_RECORD_LENGTH, off)
>         record = record.sub(/\000.*/n, '')
>         record
>     end
> @@ -686,11 +686,8 @@ class GeoIP
>             throw "Invalid GeoIP database type, can't look up ASN by IP"
>         end
>         pos = seek_record(ipnum);
> -        record = ""
> -        @mutex.synchronize {
> -          @file.seek(pos + (2*@record_length-1) * @databaseSegments[0])
> -          record = @file.read(MAX_ASN_RECORD_LENGTH)
> -        }
> +        off = pos + (2*@record_length-1) * @databaseSegments[0]
> +        record = atomic_read(MAX_ASN_RECORD_LENGTH, off)
>         record = record.sub(/\000.*/n, '')
>
>         if record =~ /^(AS\d+)\s(.*)$/
> @@ -739,10 +736,8 @@ class GeoIP
>         offset = 0
>         mask = 0x80000000
>         31.downto(0) { |depth|
> -            buf = @mutex.synchronize {
> -                @file.seek(@record_length * 2 * offset);
> -                @file.read(@record_length * 2);
> -            }
> +            off = @record_length * 2 * offset
> +            buf = atomic_read(@record_length * 2, off)
>             buf.slice!(0...@record_length) if ((ipnum & mask) != 0)
>             offset = le_to_ui(buf[0...@record_length].unpack("C*"))
>             return offset if (offset >= @databaseSegments[0])
> @@ -761,6 +756,22 @@ class GeoIP
>     def le_to_ui(s)
>         be_to_ui(s.reverse)
>     end
> +
> +    # reads +length+ bytes from +offset+ as atomically as possible
> +    # if IO.pread is available, it'll use that (making it both multithread
> +    # and multiprocess-safe).  Otherwise we'll use a mutex to synchronize
> +    # access (only providing protection against multiple threads, but not
> +    # file descriptors shared across multiple processes).
> +    def atomic_read(length, offset)
> +        if @mutex
> +            @mutex.synchronize {
> +                @file.seek(offset)
> +                @file.read(length)
> +            }
> +        else
> +            IO.pread(@file.fileno, length, offset)
> +        end
> +    end
> end
>
> if $0 == __FILE__
> -- 
> Eric Wong
>
> ----- End forwarded message -----
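
For anyone who wants to see the underlying problem in isolation, here is a
minimal sketch (not part of the patch). It assumes Ruby 2.5 or later, where
core IO#pread(length, offset) exists, and uses that in place of io-extra's
IO.pread(fileno, length, offset) that the patch calls. It also assumes a
POSIX system with fork. The names positional_read and shared_offset_demo
are hypothetical and exist only for this illustration.

```ruby
require 'tempfile'

# Hypothetical stand-in for the patch's atomic_read. Assumption: Ruby 2.5+,
# where core IO#pread(length, offset) exists; the patch itself calls
# io-extra's IO.pread(fileno, length, offset) instead.
def positional_read(file, length, offset, mutex = nil)
  if mutex
    # Fallback path: safe across threads only, and it still moves the
    # file offset that forked processes share.
    mutex.synchronize do
      file.seek(offset)
      file.read(length)
    end
  else
    # pread(2) takes the offset as an argument and leaves the
    # descriptor's own offset untouched.
    file.pread(length, offset)
  end
end

# Shows a forked child moving the offset that the parent also sees.
def shared_offset_demo
  Tempfile.create('geoip-demo') do |tmp|
    tmp.write('AAAAAABBBBBBCCCCCC')
    tmp.flush
    File.open(tmp.path, 'rb') do |file|
      pid = fork do
        file.sysseek(12) # child moves the offset of the shared descriptor
        exit!(0)         # leave at once, without running at_exit handlers
      end
      Process.wait(pid)
      plain = file.sysread(6)                  # parent never seeked itself
      positional = positional_read(file, 6, 0) # independent of that offset
      [plain, positional]
    end
  end
end

plain, positional = shared_offset_demo
puts "plain read after child seek: #{plain}"
puts "positional read at offset 0: #{positional}"
```

The plain read returns "CCCCCC" rather than "AAAAAA" because the child's
seek moved the offset of the descriptor both processes share, while the
positional read still returns "AAAAAA". A per-process Mutex cannot prevent
the first effect, which is why the fallback path is only thread-safe.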


