[Archipelago-submits] [51] trunk/archipelago: added Publishable#stop! that stops the Jockey or unpublishes the Publishable.
nobody at rubyforge.org
nobody at rubyforge.org
Sun Nov 26 21:04:49 EST 2006
Revision: 51
Author: zond
Date: 2006-11-26 21:04:48 -0500 (Sun, 26 Nov 2006)
Log Message:
-----------
Added Publishable#stop!, which stops the Jockey or unpublishes the Publishable, and used it in the tests. Added assertions to the pirate test that the pirate has the RIGHT chests. Added BerkeleyHashish#each, Chest#each and Pirate#each, along with tests for them. Simplified Publishable#service_id. Made BerkeleyHashishProvider close its databases before it unlinks.
Modified Paths:
--------------
trunk/archipelago/lib/archipelago/disco.rb
trunk/archipelago/lib/archipelago/hashish.rb
trunk/archipelago/lib/archipelago/pirate.rb
trunk/archipelago/lib/archipelago/treasure.rb
trunk/archipelago/tests/pirate_test.rb
trunk/archipelago/tests/treasure_test.rb
Modified: trunk/archipelago/lib/archipelago/disco.rb
===================================================================
--- trunk/archipelago/lib/archipelago/disco.rb 2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/lib/archipelago/disco.rb 2006-11-27 02:04:48 UTC (rev 51)
@@ -145,6 +145,17 @@
def valid?
true
end
+
+ #
+ # Stops the publishing of this Publishable.
+ #
+ def stop!
+ if defined?(Archipelago::Disco::MC) && @jockey == Archipelago::Disco::MC
+ @jockey.unpublish(self.service_id)
+ else
+ @jockey.stop!
+ end
+ end
#
# Returns our semi-unique id so that we can be found again.
@@ -158,11 +169,7 @@
# Stuff that didnt fit in any of the other databases.
#
@metadata ||= @persistence_provider.get_hashish("metadata")
- service_id = @metadata["service_id"]
- unless service_id
- service_id = @metadata["service_id"] ||= Digest::SHA1.hexdigest("#{HOST}:#{Time.new.to_f}:#{self.object_id}:#{rand(1 << 32)}")
- end
- return service_id
+ return @metadata["service_id"] ||= Digest::SHA1.hexdigest("#{HOST}:#{Time.new.to_f}:#{self.object_id}:#{rand(1 << 32)}")
end
end
@@ -224,6 +231,16 @@
end
#
+ # A class used to define removed services.
+ #
+ class UnPublish
+ attr_reader :service_id
+ def initialize(service_id)
+ @service_id = service_id
+ end
+ end
+
+ #
# A class used to define an existing service.
#
class Record < ServiceDescription
@@ -253,6 +270,7 @@
class ServiceLocker
attr_reader :hash
include Archipelago::Current::Synchronized
+ include Archipelago::Current::ThreadedCollection
def initialize(hash = nil)
super
@hash = hash || {}
@@ -278,7 +296,7 @@
end
end
else
- super(*args)
+ super(meth, *args, &block)
end
end
#
@@ -454,6 +472,17 @@
end
end
+ #
+ # Removes the service with given +service_id+ from the published services.
+ #
+ def unpublish(service_id)
+ @local_services.delete(service_id)
+ @new_service_semaphore.broadcast
+ unless @thrifty_publishing
+ @outgoing << [nil, UnPublish.new(service_id)]
+ end
+ end
+
private
#
@@ -534,8 +563,8 @@
end
#
- # Start the thread picking incoming Records and Queries and
- # handling them properly
+ # Start the thread picking incoming Records, Queries and UnPublishes and
+ # handling them properly.
#
def start_picker
@picker_thread = Thread.new do
@@ -555,6 +584,8 @@
@remote_services[data[:service_id]] = data
@new_service_semaphore.broadcast
end
+ elsif Archipelago::Disco::UnPublish === data
+ @remote_services.delete(data.service_id)
end
rescue Exception => e
puts e
Modified: trunk/archipelago/lib/archipelago/hashish.rb
===================================================================
--- trunk/archipelago/lib/archipelago/hashish.rb 2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/lib/archipelago/hashish.rb 2006-11-27 02:04:48 UTC (rev 51)
@@ -48,6 +48,13 @@
@lock = Archipelago::Current::Lock.new
end
#
+ # Close the @content_db and @timestamps_db behind this BerkeleyHashish
+ #
+ def close!
+ @content_db.close
+ @timestamps_db.close
+ end
+ #
# Returns a deep ( Marshal.load(Marshal.dump(o)) ) clone
# of the object represented by +key+.
#
@@ -125,6 +132,19 @@
end
end
#
+ # Will do +callable+.call(key, value) for each
+ # key-and-value pair in this Hashish.
+ #
+ # NB: This is totally thread-unsafe, only do this
+ # for management or rescue!
+ #
+ def each(callable)
+ @content_db.each do |serialized_key, serialized_value|
+ key = Marshal.load(serialized_key)
+ callable.call(key, self.[](key))
+ end
+ end
+ #
# Delete +key+ and its value and timestamp.
#
def delete(key)
@@ -198,6 +218,8 @@
def initialize(env_path)
env_path.mkpath
@env = BDB::Env.open(env_path, BDB::CREATE | BDB::INIT_MPOOL)
+ @berkeley_hashishes = []
+ @bdb_dbs = []
end
#
# Returns a cleverly cached (but slightly inefficient)
@@ -205,20 +227,31 @@
# using +name+.
#
def get_cached_hashish(name)
- BerkeleyHashish.new(name, @env)
+ hashish = BerkeleyHashish.new(name, @env)
+ @berkeley_hashishes << hashish
+ return hashish
end
#
# Returns a normal hash-like instance using +name+.
#
def get_hashish(name)
- @env.open_db(BDB::HASH, name, nil, BDB::CREATE | BDB::NOMMAP)
+ db = @env.open_db(BDB::HASH, name, nil, BDB::CREATE | BDB::NOMMAP)
+ @bdb_dbs << db
+ return db
end
#
- # Removes the persistent files of this instance.
+ # Closes databases opened by this instance and removes the persistent files.
#
def unlink
- p = Pathname.new(@env.home)
- p.rmtree if p.exist?
+ @berkeley_hashishes.each do |h|
+ h.close!
+ end
+ @bdb_dbs.each do |d|
+ d.close
+ end
+ home = Pathname.new(@env.home)
+ @env.close
+ home.rmtree if home.exist?
end
end
Modified: trunk/archipelago/lib/archipelago/pirate.rb
===================================================================
--- trunk/archipelago/lib/archipelago/pirate.rb 2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/lib/archipelago/pirate.rb 2006-11-27 02:04:48 UTC (rev 51)
@@ -220,6 +220,19 @@
@service_update_thread.kill
end
+ #
+ # Will do +callable+.call(key, value)
+ # for each key-and-value pair in this database network.
+ #
+ # NB: This is totally thread-unsafe, only do this
+ # for management or rescue!
+ #
+ def each(callable)
+ @chests.t_each do |service_id, chest|
+ chest[:service].each(callable)
+ end
+ end
+
private
#
Modified: trunk/archipelago/lib/archipelago/treasure.rb
===================================================================
--- trunk/archipelago/lib/archipelago/treasure.rb 2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/lib/archipelago/treasure.rb 2006-11-27 02:04:48 UTC (rev 51)
@@ -296,6 +296,17 @@
end
#
+ # Will do +callable+.call(key, value)
+ # for each key-and-value pair in this Chest.
+ #
+ # NB: This is totally thread-unsafe, only do this
+ # for management or rescue!
+ #
+ def each(callable)
+ @db.each(callable)
+ end
+
+ #
# Evaluate +data+ if we have not already seen +label+ or if we have an earlier +timestamp+ than the one given.
#
def evaluate!(label, timestamp, data)
Modified: trunk/archipelago/tests/pirate_test.rb
===================================================================
--- trunk/archipelago/tests/pirate_test.rb 2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/tests/pirate_test.rb 2006-11-27 02:04:48 UTC (rev 51)
@@ -11,24 +11,37 @@
@c.publish!
@c2 = TestChest.new(:persistence_provider => Archipelago::Hashish::BerkeleyHashishProvider.new(Pathname.new(__FILE__).parent.join("chest2.db")))
@c2.publish!
- @tm = TestManager.new
+ @tm = TestManager.new(:persistence_provider => Archipelago::Hashish::BerkeleyHashishProvider.new(Pathname.new(__FILE__).parent.join("tranny.db")))
@tm.publish!
- assert_within(10) do
- !@p.chests.empty?
+ assert_within(2) do
+ Set.new(@p.chests.keys) == Set.new([@c.service_id, @c2.service_id])
end
- assert_within(10) do
- !@p.trannies.empty?
+ assert_within(2) do
+ @p.trannies.keys == [@tm.service_id]
end
end
def teardown
@p.stop!
+ @c.stop!
@c.persistence_provider.unlink
+ @c2.stop!
@c2.persistence_provider.unlink
+ @tm.stop!
@tm.persistence_provider.unlink
DRb.stop_service
end
+ def test_each
+ @p["oj"] = "bla"
+ @p["brunt"] = "ja"
+ h = {}
+ @p.each(Proc.new do |k,v|
+ h[k] = v
+ end)
+ assert_equal({"oj" => "bla", "brunt" => "ja"}, h)
+ end
+
def test_evaluate
assert_raise(NameError) do
e = Evaltest.new
Modified: trunk/archipelago/tests/treasure_test.rb
===================================================================
--- trunk/archipelago/tests/treasure_test.rb 2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/tests/treasure_test.rb 2006-11-27 02:04:48 UTC (rev 51)
@@ -27,6 +27,16 @@
DRb.stop_service
end
+ def test_each
+ @c["oj"] = "bla"
+ @c["brunt"] = "ja"
+ h = {}
+ @c.each(Proc.new do |k,v|
+ h[k] = v
+ end)
+ assert_equal({"oj" => "bla", "brunt" => "ja"}, h)
+ end
+
def test_around_save
s = A.new("hehu")
@c["oj"] = s
More information about the Archipelago-submits
mailing list