[Archipelago-submits] [210] trunk/archipelago: made Dump#insert! check given timestamp to see if new chunks should be added.

nobody at rubyforge.org nobody at rubyforge.org
Fri Feb 9 12:53:28 EST 2007


Revision: 210
Author:   zond
Date:     2007-02-09 12:53:27 -0500 (Fri, 09 Feb 2007)

Log Message:
-----------
Made Dump#insert! check the given timestamp to decide whether new chunks should be added. Made Sanitation#[]= add timestamps. Added Sanitation#redistribute, which reuses existing timestamps.

Modified Paths:
--------------
    trunk/archipelago/doc/dump maintenance.dia
    trunk/archipelago/lib/archipelago/dump.rb
    trunk/archipelago/lib/archipelago/sanitation.rb

Modified: trunk/archipelago/doc/dump maintenance.dia
===================================================================
(Binary files differ)

Modified: trunk/archipelago/lib/archipelago/dump.rb
===================================================================
--- trunk/archipelago/lib/archipelago/dump.rb	2007-02-09 11:06:40 UTC (rev 209)
+++ trunk/archipelago/lib/archipelago/dump.rb	2007-02-09 17:53:27 UTC (rev 210)
@@ -79,12 +79,25 @@
         yield
       end
 
-      def insert!(key, values)
-        @db.env.begin(BDB::TXN_COMMIT, @db) do |txn, db|
-          db.delete(key)
+      def insert!(key, values, timestamp = "")
+        if (duplicates = @db.duplicates(key)).empty?
           values.each do |value|
-            db[key] = value
+            @db[key] = timestamp + value
           end
+        else
+          my_timestamp = duplicates.first[0...4]
+          if timestamp != my_timestamp || duplicates.size > values.size
+            @db.env.begin(BDB::TXN_COMMIT, @db) do |txn, db|
+              db.delete(key)
+              values.each do |value|
+                db[key] = timestamp + value
+              end
+            end
+          else
+            values[0...(values.size - duplicates.size)].each do |value|
+              @db[key] = timestamp + value
+            end
+          end
         end
       end
 
@@ -125,17 +138,6 @@
       def lost_peer(record)
       end
 
-      #
-      # Ensures that all the dumps responsible for +key+
-      # has chunks for that key.
-      #
-      def redistribute(key)
-        # Since fetching a key will try to reconstruct the value
-        # if bits are missing, and inserting a key makes sure it
-        # is present enough, this is as simple as get+set.
-        @officer[key] = @officer[key]
-      end
-
     end
 
   end

Modified: trunk/archipelago/lib/archipelago/sanitation.rb
===================================================================
--- trunk/archipelago/lib/archipelago/sanitation.rb	2007-02-09 11:06:40 UTC (rev 209)
+++ trunk/archipelago/lib/archipelago/sanitation.rb	2007-02-09 17:53:27 UTC (rev 210)
@@ -101,8 +101,9 @@
       #
       # Write +key+ and +value+ into the site network with a good level of redundancy etc.
       #
-      def []=(key, value)
-        t = [Time.now.to_i].pack("I")
+      # Optionally the timestamp +t+ can be provided, but it defaults to now.
+      #
+      def []=(key, value, t = [Time.now.to_i].pack("I"))
         super_string = Oneliner::SuperString.new(value)
         nr_of_needed_chunks = @minimum_nr_of_chunks / @minimum_redundancy_ratio
         chunk_size = (super_string.size / nr_of_needed_chunks) + @metadata_overhead
@@ -112,8 +113,9 @@
         dump_hash.t_each do |dump_id, nr_of_chunks_needed|
           @sites[dump_id][:service].insert!(key, 
                                             (0...nr_of_chunks_needed).collect do |nr_of_chunks_needed|
-                                              t + super_string.encode(chunk_size)
-                                            end)
+                                              super_string.encode(chunk_size)
+                                            end,
+                                            t)
         end
       end
 
@@ -128,40 +130,7 @@
       # Get the data for +key+ in the site network.
       #
       def [](key)
-        dump_hash = responsible_sites(key, @minimum_nr_of_chunks)
-        dump_ids = dump_hash.keys
-        newest_timestamp = "\000\000\000\000"
-        threads = []
-        rval = Oneliner::SuperString.new
-        rval.extend(MonitorMixin)
-
-        dump_hash.t_each do |dump_id, nr_of_chunks_available|
-          site = @sites[dump_id][:service]
-          chunks = site.fetch(key)
-          rval.mon_synchronize do
-            while !rval.decode_done? && chunks.size > 0
-              chunk = chunks.shift
-              t = chunk[0...4]
-              data = chunk[4..-1]
-              if t > newest_timestamp
-                rval = Oneliner::SuperString.new
-                newest_timestamp = t
-              end
-              
-              if t == newest_timestamp
-                rval.decode!(data)
-              end
-            end
-          end
-        end
-
-        if rval.decode_done?
-          return rval.to_s
-        else
-          raise NotEnoughDataException.new(self, key) if newest_timestamp != "\000\000\000\000"
-          return nil
-        end
-
+        fetch(key).first
       end
 
       #
@@ -211,9 +180,59 @@
         @sites[get_least_greater_than(@sites, dump_id, 1).last][:service]
       end
 
+      #
+      # Ensures that all the dumps responsible for +key+
+      # has chunks for that key without changing the timestamp
+      # for +key+.
+      #
+      def redistribute(key)
+        value, timestamp = fetch(key)
+        self.[]=(key, value, timestamp)
+      end
+
       private
 
       #
+      # Returns [the value for +key+, the timestamp for the value].
+      #
+      def fetch(key)
+        dump_hash = responsible_sites(key, @minimum_nr_of_chunks)
+        dump_ids = dump_hash.keys
+        newest_timestamp = "\000\000\000\000"
+        threads = []
+        rval = Oneliner::SuperString.new
+        rval.extend(MonitorMixin)
+
+        dump_hash.t_each do |dump_id, nr_of_chunks_available|
+          site = @sites[dump_id][:service]
+          chunks = site.fetch(key)
+          rval.mon_synchronize do
+            while !rval.decode_done? && chunks.size > 0
+              chunk = chunks.shift
+              t = chunk[0...4]
+              data = chunk[4..-1]
+              if t > newest_timestamp
+                rval = Oneliner::SuperString.new
+                newest_timestamp = t
+              end
+              
+              if t == newest_timestamp
+                rval.decode!(data)
+              end
+            end
+          end
+        end
+
+        if rval.decode_done?
+          return [rval.to_s, newest_timestamp]
+        else
+          raise NotEnoughDataException.new(self, key) if newest_timestamp != "\000\000\000\000"
+          return [nil, nil]
+        end
+
+      end
+
+      #
       # Gets the +n+ smallest keys from +hash+ that
       # are greater than +o+. 
       #




More information about the Archipelago-submits mailing list