[Archipelago-submits] [51] trunk/archipelago: added Publishable#stop! that stops the Jockey or unpublishes the Publishable.

nobody at rubyforge.org nobody at rubyforge.org
Sun Nov 26 21:04:49 EST 2006


Revision: 51
Author:   zond
Date:     2006-11-26 21:04:48 -0500 (Sun, 26 Nov 2006)

Log Message:
-----------
added Publishable#stop! that stops the Jockey or unpublishes the Publishable. used it in tests. added assertions that it is the RIGHT chests the pirate has in pirate test. added BerkeleyHashish#each, Chest#each and Pirate#each and tests for them. simplified Publishable#service_id. made BerkeleyHashishProvider close the databases before it unlinks.

Modified Paths:
--------------
    trunk/archipelago/lib/archipelago/disco.rb
    trunk/archipelago/lib/archipelago/hashish.rb
    trunk/archipelago/lib/archipelago/pirate.rb
    trunk/archipelago/lib/archipelago/treasure.rb
    trunk/archipelago/tests/pirate_test.rb
    trunk/archipelago/tests/treasure_test.rb

Modified: trunk/archipelago/lib/archipelago/disco.rb
===================================================================
--- trunk/archipelago/lib/archipelago/disco.rb	2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/lib/archipelago/disco.rb	2006-11-27 02:04:48 UTC (rev 51)
@@ -145,6 +145,17 @@
       def valid?
         true
       end
+
+      #
+      # Stops the publishing of this Publishable.
+      #
+      def stop!
+        if defined?(Archipelago::Disco::MC) && @jockey == Archipelago::Disco::MC
+          @jockey.unpublish(self.service_id)
+        else
+          @jockey.stop!
+        end
+      end
       
       #
       # Returns our semi-unique id so that we can be found again.
@@ -158,11 +169,7 @@
         # Stuff that didnt fit in any of the other databases.
         #
         @metadata ||= @persistence_provider.get_hashish("metadata")
-        service_id = @metadata["service_id"]
-        unless service_id
-          service_id = @metadata["service_id"] ||= Digest::SHA1.hexdigest("#{HOST}:#{Time.new.to_f}:#{self.object_id}:#{rand(1 << 32)}")
-        end
-        return service_id
+        return @metadata["service_id"] ||= Digest::SHA1.hexdigest("#{HOST}:#{Time.new.to_f}:#{self.object_id}:#{rand(1 << 32)}")
       end
 
     end
@@ -224,6 +231,16 @@
     end
 
     #
+    # A class used to define removed services.
+    #
+    class UnPublish
+      attr_reader :service_id
+      def initialize(service_id)
+        @service_id = service_id
+      end
+    end
+
+    #
     # A class used to define an existing service.
     #
     class Record < ServiceDescription
@@ -253,6 +270,7 @@
     class ServiceLocker
       attr_reader :hash
       include Archipelago::Current::Synchronized
+      include Archipelago::Current::ThreadedCollection
       def initialize(hash = nil)
         super
         @hash = hash || {}
@@ -278,7 +296,7 @@
             end
           end
         else
-          super(*args)
+          super(meth, *args, &block)
         end
       end
       #
@@ -454,6 +472,17 @@
         end
       end
 
+      #
+      # Removes the service with given +service_id+ from the published services.
+      #
+      def unpublish(service_id)
+        @local_services.delete(service_id)
+        @new_service_semaphore.broadcast
+        unless @thrifty_publishing
+          @outgoing << [nil, UnPublish.new(service_id)]
+        end
+      end
+
       private
 
       #
@@ -534,8 +563,8 @@
       end
 
       #
-      # Start the thread picking incoming Records and Queries and
-      # handling them properly
+      # Start the thread picking incoming Records, Queries and UnPublishes and
+      # handling them properly.
       #
       def start_picker
         @picker_thread = Thread.new do
@@ -555,6 +584,8 @@
                   @remote_services[data[:service_id]] = data
                   @new_service_semaphore.broadcast
                 end
+              elsif Archipelago::Disco::UnPublish === data
+                @remote_services.delete(data.service_id)
               end
             rescue Exception => e
               puts e

Modified: trunk/archipelago/lib/archipelago/hashish.rb
===================================================================
--- trunk/archipelago/lib/archipelago/hashish.rb	2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/lib/archipelago/hashish.rb	2006-11-27 02:04:48 UTC (rev 51)
@@ -48,6 +48,13 @@
         @lock = Archipelago::Current::Lock.new
       end
       #
+      # Close the @content_db and @timestamps_db behind this BerkeleyHashish
+      #
+      def close!
+        @content_db.close
+        @timestamps_db.close
+      end
+      #
       # Returns a deep ( Marshal.load(Marshal.dump(o)) ) clone
       # of the object represented by +key+.
       #
@@ -125,6 +132,19 @@
         end
       end
       #
+      # Will do +callable+.call(key, value) for each
+      # key-and-value pair in this Hashish.
+      #
+      # NB: This is totally thread-unsafe, only do this
+      # for management or rescue!
+      #
+      def each(callable)
+        @content_db.each do |serialized_key, serialized_value|
+          key = Marshal.load(serialized_key)
+          callable.call(key, self.[](key))
+        end
+      end
+      #
       # Delete +key+ and its value and timestamp.
       #
       def delete(key)
@@ -198,6 +218,8 @@
       def initialize(env_path)
         env_path.mkpath
         @env = BDB::Env.open(env_path, BDB::CREATE | BDB::INIT_MPOOL)
+        @berkeley_hashishes = []
+        @bdb_dbs = []
       end
       #
       # Returns a cleverly cached (but slightly inefficient)
@@ -205,20 +227,31 @@
       # using +name+.
       #
       def get_cached_hashish(name)
-        BerkeleyHashish.new(name, @env)
+        hashish = BerkeleyHashish.new(name, @env)
+        @berkeley_hashishes << hashish
+        return hashish
       end
       #
       # Returns a normal hash-like instance using +name+.
       #
       def get_hashish(name)
-        @env.open_db(BDB::HASH, name, nil, BDB::CREATE | BDB::NOMMAP)
+        db = @env.open_db(BDB::HASH, name, nil, BDB::CREATE | BDB::NOMMAP)
+        @bdb_dbs << db
+        return db
       end
       #
-      # Removes the persistent files of this instance.
+      # Closes databases opened by this instance and removes the persistent files.
       #
       def unlink
-        p = Pathname.new(@env.home)
-        p.rmtree if p.exist?
+        @berkeley_hashishes.each do |h|
+          h.close!
+        end
+        @bdb_dbs.each do |d|
+          d.close
+        end
+        home = Pathname.new(@env.home)
+        @env.close
+        home.rmtree if home.exist?
       end
     end
     

Modified: trunk/archipelago/lib/archipelago/pirate.rb
===================================================================
--- trunk/archipelago/lib/archipelago/pirate.rb	2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/lib/archipelago/pirate.rb	2006-11-27 02:04:48 UTC (rev 51)
@@ -220,6 +220,19 @@
         @service_update_thread.kill
       end
 
+      #
+      # Will do +callable+.call(key, value)
+      # for each key-and-value pair in this database network.
+      #
+      # NB: This is totally thread-unsafe, only do this
+      # for management or rescue!
+      #
+      def each(callable)
+        @chests.t_each do |service_id, chest|
+          chest[:service].each(callable)
+        end
+      end
+
       private
 
       #

Modified: trunk/archipelago/lib/archipelago/treasure.rb
===================================================================
--- trunk/archipelago/lib/archipelago/treasure.rb	2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/lib/archipelago/treasure.rb	2006-11-27 02:04:48 UTC (rev 51)
@@ -296,6 +296,17 @@
       end
 
       #
+      # Will do +callable+.call(key, value)
+      # for each key-and-value pair in this Chest.
+      #
+      # NB: This is totally thread-unsafe, only do this
+      # for management or rescue!
+      #
+      def each(callable)
+        @db.each(callable)
+      end
+
+      #
       # Evaluate +data+ if we have not already seen +label+ or if we have an earlier +timestamp+ than the one given.
       #
       def evaluate!(label, timestamp, data)

Modified: trunk/archipelago/tests/pirate_test.rb
===================================================================
--- trunk/archipelago/tests/pirate_test.rb	2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/tests/pirate_test.rb	2006-11-27 02:04:48 UTC (rev 51)
@@ -11,24 +11,37 @@
     @c.publish!
     @c2 = TestChest.new(:persistence_provider => Archipelago::Hashish::BerkeleyHashishProvider.new(Pathname.new(__FILE__).parent.join("chest2.db")))
     @c2.publish!
-    @tm = TestManager.new
+    @tm = TestManager.new(:persistence_provider => Archipelago::Hashish::BerkeleyHashishProvider.new(Pathname.new(__FILE__).parent.join("tranny.db")))
     @tm.publish!
-    assert_within(10) do
-      !@p.chests.empty?
+    assert_within(2) do
+      Set.new(@p.chests.keys) == Set.new([@c.service_id, @c2.service_id])
     end
-    assert_within(10) do
-      !@p.trannies.empty?
+    assert_within(2) do
+      @p.trannies.keys == [@tm.service_id]
     end
   end
 
   def teardown
     @p.stop!
+    @c.stop!
     @c.persistence_provider.unlink
+    @c2.stop!
     @c2.persistence_provider.unlink
+    @tm.stop!
     @tm.persistence_provider.unlink
     DRb.stop_service
   end
 
+  def test_each
+    @p["oj"] = "bla"
+    @p["brunt"] = "ja"
+    h = {}
+    @p.each(Proc.new do |k,v|
+              h[k] = v
+            end)
+    assert_equal({"oj" => "bla", "brunt" => "ja"}, h)
+  end
+
   def test_evaluate
     assert_raise(NameError) do
       e = Evaltest.new

Modified: trunk/archipelago/tests/treasure_test.rb
===================================================================
--- trunk/archipelago/tests/treasure_test.rb	2006-11-27 01:14:19 UTC (rev 50)
+++ trunk/archipelago/tests/treasure_test.rb	2006-11-27 02:04:48 UTC (rev 51)
@@ -27,6 +27,16 @@
     DRb.stop_service
   end
 
+  def test_each
+    @c["oj"] = "bla"
+    @c["brunt"] = "ja"
+    h = {}
+    @c.each(Proc.new do |k,v|
+              h[k] = v
+            end)
+    assert_equal({"oj" => "bla", "brunt" => "ja"}, h)
+  end
+
   def test_around_save
     s = A.new("hehu")
     @c["oj"] = s




More information about the Archipelago-submits mailing list