[Archipelago-submits] [210] trunk/archipelago: Made Dump#insert! check the given timestamp to decide whether new chunks should be added.
nobody at rubyforge.org
nobody at rubyforge.org
Fri Feb 9 12:53:28 EST 2007
Revision: 210
Author: zond
Date: 2007-02-09 12:53:27 -0500 (Fri, 09 Feb 2007)
Log Message:
-----------
Made Dump#insert! check the given timestamp to decide whether new chunks should be added. Made Sanitation#[]= add timestamps. Added Sanitation#redistribute, which reuses the existing timestamp of a key instead of creating a new one.
Modified Paths:
--------------
trunk/archipelago/doc/dump maintenance.dia
trunk/archipelago/lib/archipelago/dump.rb
trunk/archipelago/lib/archipelago/sanitation.rb
Modified: trunk/archipelago/doc/dump maintenance.dia
===================================================================
(Binary files differ)
Modified: trunk/archipelago/lib/archipelago/dump.rb
===================================================================
--- trunk/archipelago/lib/archipelago/dump.rb 2007-02-09 11:06:40 UTC (rev 209)
+++ trunk/archipelago/lib/archipelago/dump.rb 2007-02-09 17:53:27 UTC (rev 210)
@@ -79,12 +79,25 @@
yield
end
- def insert!(key, values)
- @db.env.begin(BDB::TXN_COMMIT, @db) do |txn, db|
- db.delete(key)
+ def insert!(key, values, timestamp = "")
+ if (duplicates = @db.duplicates(key)).empty?
values.each do |value|
- db[key] = value
+ @db[key] = timestamp + value
end
+ else
+ my_timestamp = duplicates.first[0...4]
+ if timestamp != my_timestamp || duplicates.size > values.size
+ @db.env.begin(BDB::TXN_COMMIT, @db) do |txn, db|
+ db.delete(key)
+ values.each do |value|
+ db[key] = timestamp + value
+ end
+ end
+ else
+ values[0...(values.size - duplicates.size)].each do |value|
+ @db[key] = timestamp + value
+ end
+ end
end
end
@@ -125,17 +138,6 @@
def lost_peer(record)
end
- #
- # Ensures that all the dumps responsible for +key+
- # has chunks for that key.
- #
- def redistribute(key)
- # Since fetching a key will try to reconstruct the value
- # if bits are missing, and inserting a key makes sure it
- # is present enough, this is as simple as get+set.
- @officer[key] = @officer[key]
- end
-
end
end
Modified: trunk/archipelago/lib/archipelago/sanitation.rb
===================================================================
--- trunk/archipelago/lib/archipelago/sanitation.rb 2007-02-09 11:06:40 UTC (rev 209)
+++ trunk/archipelago/lib/archipelago/sanitation.rb 2007-02-09 17:53:27 UTC (rev 210)
@@ -101,8 +101,9 @@
#
# Write +key+ and +value+ into the site network with a good level of redundancy etc.
#
- def []=(key, value)
- t = [Time.now.to_i].pack("I")
+ # Optionally the timestamp +t+ can be provided, but it defaults to now.
+ #
+ def []=(key, value, t = [Time.now.to_i].pack("I"))
super_string = Oneliner::SuperString.new(value)
nr_of_needed_chunks = @minimum_nr_of_chunks / @minimum_redundancy_ratio
chunk_size = (super_string.size / nr_of_needed_chunks) + @metadata_overhead
@@ -112,8 +113,9 @@
dump_hash.t_each do |dump_id, nr_of_chunks_needed|
@sites[dump_id][:service].insert!(key,
(0...nr_of_chunks_needed).collect do |nr_of_chunks_needed|
- t + super_string.encode(chunk_size)
- end)
+ super_string.encode(chunk_size)
+ end,
+ t)
end
end
@@ -128,40 +130,7 @@
# Get the data for +key+ in the site network.
#
def [](key)
- dump_hash = responsible_sites(key, @minimum_nr_of_chunks)
- dump_ids = dump_hash.keys
- newest_timestamp = "\000\000\000\000"
- threads = []
- rval = Oneliner::SuperString.new
- rval.extend(MonitorMixin)
-
- dump_hash.t_each do |dump_id, nr_of_chunks_available|
- site = @sites[dump_id][:service]
- chunks = site.fetch(key)
- rval.mon_synchronize do
- while !rval.decode_done? && chunks.size > 0
- chunk = chunks.shift
- t = chunk[0...4]
- data = chunk[4..-1]
- if t > newest_timestamp
- rval = Oneliner::SuperString.new
- newest_timestamp = t
- end
-
- if t == newest_timestamp
- rval.decode!(data)
- end
- end
- end
- end
-
- if rval.decode_done?
- return rval.to_s
- else
- raise NotEnoughDataException.new(self, key) if newest_timestamp != "\000\000\000\000"
- return nil
- end
-
+ fetch(key).first
end
#
@@ -211,9 +180,59 @@
@sites[get_least_greater_than(@sites, dump_id, 1).last][:service]
end
+ #
+ # Ensures that all the dumps responsible for +key+
+ # has chunks for that key without changing the timestamp
+ # for +key+.
+ #
+ def redistribute(key)
+ value, timestamp = fetch(key)
+ self.[]=(key, value, timestamp)
+ end
+
private
#
+ # Returns [the value for +key+, the timestamp for the value].
+ #
+ def fetch(key)
+ dump_hash = responsible_sites(key, @minimum_nr_of_chunks)
+ dump_ids = dump_hash.keys
+ newest_timestamp = "\000\000\000\000"
+ threads = []
+ rval = Oneliner::SuperString.new
+ rval.extend(MonitorMixin)
+
+ dump_hash.t_each do |dump_id, nr_of_chunks_available|
+ site = @sites[dump_id][:service]
+ chunks = site.fetch(key)
+ rval.mon_synchronize do
+ while !rval.decode_done? && chunks.size > 0
+ chunk = chunks.shift
+ t = chunk[0...4]
+ data = chunk[4..-1]
+ if t > newest_timestamp
+ rval = Oneliner::SuperString.new
+ newest_timestamp = t
+ end
+
+ if t == newest_timestamp
+ rval.decode!(data)
+ end
+ end
+ end
+ end
+
+ if rval.decode_done?
+ return [rval.to_s, newest_timestamp]
+ else
+ raise NotEnoughDataException.new(self, key) if newest_timestamp != "\000\000\000\000"
+ return [nil, nil]
+ end
+
+ end
+
+ #
# Gets the +n+ smallest keys from +hash+ that
# are greater than +o+.
#
More information about the Archipelago-submits mailing list