[Backgroundrb-devel] A new QueueWorker class

David Lemstra david at lemstra.ca
Thu Sep 14 17:41:53 EDT 2006


Michael,
It all looks good to me (though some error messages would help).
Maybe make :generate_preview_worker a subclass of BackgrounDRb::Rails first
and make sure it works on its own w/o the Q baggage.
You may also need to set :id manually in the arg hash. I'm not sure
I've tried it without.
David Lemstra
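
To illustrate the :id suggestion, here is a minimal sketch of the id
bookkeeping that queue_job() does in the quoted source. TinyQueue and
job_status are hypothetical stand-ins written for this example; they are not
part of BackgrounDRb and spawn no workers. The sketch only shows that a job
queued without an :id still lands in the queue but cannot be looked up
afterwards.

```ruby
require 'monitor'

# Hypothetical stand-in for the id bookkeeping in QueueWorker#queue_job.
# It is not part of BackgrounDRb and does not spawn any workers.
class TinyQueue
  attr_reader :q, :id_hash

  def initialize
    @q = [].extend(MonitorMixin)
    @id_hash = {}
  end

  # Returns nil for a duplicate :id and true otherwise, as in the quoted code.
  def queue_job(args)
    @q.synchronize do
      return nil if args[:id] && @id_hash.key?(args[:id])
      @q.push(args)
      @id_hash[args[:id]] = { :status => :queued } if args[:id]
    end
    true
  end

  # Jobs queued without an :id have no entry here, so this returns nil.
  def job_status(job_id)
    @q.synchronize { @id_hash[job_id] && @id_hash[job_id][:status] }
  end
end

queue = TinyQueue.new
queue.queue_job(:id => 'preview-1', :width => 300, :height => 600)
queue.queue_job(:width => 300, :height => 600) # queued, but not trackable
```

With the real QueueWorker the equivalent would presumably be adding a unique
:id key to the hash passed to MiddleMan[:queue].queue_job.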

Michael Siebert wrote:
> Hey David,
> I'm currently trying out your QueueWorker and I am experiencing a little
> problem:
> It seems the worker (my worker) is never started. Just to make sure I
> got it right:
> 
>       MiddleMan[:queue].queue_job({
>           :in_filename => self.getFullPath,
>           :out_filename => self.getPreviewPath,
>           :width => 300,
>           :height => 600
>       })
> 
> This should put another instance of my worker into the queue and run it,
> if there are enough threads "free". is that how you meant it?
> I modified the configuration a little bit to fit my needs:
> 
> autostart:
>  :queue:
>    class: queue_worker
>    args:
>      :num_child_workers: 1
>      :child_class: :generate_preview_worker
>      :reQ_on_finish: false
>      :singleton: true
> 
> Any ideas what i might be doing wrong?
> 
> regards,
> Micha
> 
> 2006/8/24, David Lemstra <david at lemstra.ca>:
> 
>     Hello all,
>     I've come up w/ a worker class that manages queued jobs using a fixed
>     number of child workers. Well, that's not quite true -- a new worker is
>     spawned for each job, but you set the total number that may exist at
>     once.
> 
>     There are three components:
>     1) queue_worker.rb: The singleton worker that manages the child
>     workers.
>     You probably want to auto start this. Make sure you give it
>     :singleton=>true. Read this file for the methods to interact with your
>     children. (ie. queue_job(), delete_job(), job_progress() )
> 
>     2) backgroundrb_rails_queue.rb: The super class for the "child workers"
>     (and uses backgroundrb_rails.rb in turn). This file needs to be included
>     in background.rb
> 
>     3) Your child worker, which should be a subclass of
>     BackgrounDRb::RailsQueue, is otherwise about the same as a normal
>     worker. If it's a big loop, you probably want to call terminate?() on
>     each iteration and update @progress. Call suicide() at the end to make
>     room for the next child.
> 
>     Options: (probably in your backgroundrb.yml)
>     autostart:
>       :queue_key:
>         class: queue_worker
>         args:
>           :num_child_workers: 2
>           :child_class: :cost_calculator_worker
>           :reQ_on_finish: true
>           :singleton: true
> 
>     :queue_key can be changed to what you want, but it is the permanent key
>     of the QueueWorker
>     :num_child_workers: is up to you!
>     :child_class: your worker class you want as child workers.
>     :reQ_on_finish: do you want results to be stored in the queue until you
>     call job_progress!() ?
>     Note: to be able to access your child jobs w/ the QueueWorker methods,
>     include a unique :id in your {args} when you queue_job({args})
> 
>     I'll attach the files. If they don't go through, I'll resend as text.
> 
>     BTW, this works well enough for me, but I'm learning as I go too, so no
>     guarantees :) I don't use the fancy timing options, so YMMV for
>     :next_start and :interval.
> 
>     Let me know if you find any issues (though I'm off-line for a week
>     after this post). I'm wondering myself if it might be better to reuse
>     child workers instead of spawning new ones. Another day maybe.
> 
>     cheers,
>     David Lemstra
> 
> 
>     # Put your code that runs your task inside the do_work method
>     # it will be run automatically in a thread. You have access to
>     # all of your rails models if you set load_rails to true in the
>     # config file. You also get @logger inside of this class by default.
>     require 'monitor'
> 
>     class QueueWorker < BackgrounDRb::Rails
> 
>       attr_reader :q, :id_hash, :completed
>       def initialize(key, args={})
>         super(key,args)
>         @num_child_workers = args[:num_child_workers] || 1
>         @child_workers = Array.new(@num_child_workers) do |i|
>           { :job_key => nil, :args => nil, :s_thread => nil,
>             :s_mutex => Mutex.new, :child => i }
>         end
> 
>         @q = []
>         @q.extend(MonitorMixin)
>         @q_loaded_cv = @q.new_cond
>         @id_hash = {}
>         @id_hash_mutex = Mutex.new
> 
>         raise ArgumentError unless args.has_key?(:child_class)
>         @child_class = args[:child_class]
>         @reQ_on_finish = args[:reQ_on_finish] || false
>         @completed = 0
>       end
> 
>       def queue_job(args)
>         return nil if @id_hash && args[:id] && @id_hash.has_key?(args[:id])
>         @q.synchronize do
>           @q.push args
>           if args[:id]
>             @id_hash_mutex.synchronize do
>               @id_hash[args[:id]] = { :status => :queued, :job_key => nil, :results => nil }
>             end
>           end
>           @q_loaded_cv.signal
>         end
>         return true
>       end
> 
>       def job_in_progress?(job_id)
>         @id_hash.has_key?(job_id)
>       end
> 
>       def job_status?(job_id)
>         @id_hash_mutex.synchronize do
>           return nil unless @id_hash.has_key?(job_id)
>           return @id_hash[job_id][:status]
>         end
>       end
> 
>       def job_progress(job_id)
>         report_hsh = {}
>         @id_hash_mutex.synchronize do
>           return nil unless @id_hash.has_key?(job_id)
>           report_hsh[:status] = @id_hash[job_id][:status]
>           report_hsh[:progress] = case @id_hash[job_id][:status]
>             when :queued then
>               ahead = 0
>               @q.each_index do |i|
>                 if @q[i][:id] == job_id then ahead = i; break; end
>               end
>               ahead
>             when :running
>               w = self[@id_hash[job_id][:job_key]]
>               w.nil? ? nil : w.progress
>             when :done
>               @id_hash[job_id][:results]
>             else nil
>           end
>         end
>         return report_hsh
>       end
> 
>       def job_progress!(job_id)
>         report_hsh = {}
>         @id_hash_mutex.synchronize do
>           return nil unless @id_hash.has_key?(job_id)
>           report_hsh[:status] = @id_hash[job_id][:status]
>           report_hsh[:progress] = case @id_hash[job_id][:status]
>             when :queued then
>               ahead = 0
>               @q.each_index do |i|
>                 if @q[i][:id] == job_id then ahead = i; break; end
>               end
>               ahead
>             when :running
>               w = self[@id_hash[job_id][:job_key]]
>               if w.nil?
>                 @id_hash.delete(job_id)[:results]
>               else
>                 w.progress
>               end
>             when :done
>               @id_hash.delete(job_id)[:results]
>             else nil
>           end
>         end
>         return report_hsh
>       end
> 
>       def delete_job(job_id)
>         args = nil
>         @q.synchronize do  @id_hash_mutex.synchronize do
>           args = @id_hash[job_id]
>           return true if args.nil?
>           if args[:status] == :queued
>             @q.delete_if {|h| h[:id] == job_id }
>             @id_hash.delete(job_id)
>             return true
>           elsif args[:status] == :done
>             @id_hash.delete(job_id)
>             return true
>           end
>         end
>         end
>         if args[:status] == :running
>           ::BackgrounDRb::MiddleMan.instance.delete_worker(args[:job_key])
>         end
>         return true
>       end
> 
>       def do_work(args)
>         # You probably don't want to mess with this method unless you
>         # know what's what.
>         @child_workers.each do |child_hash|
>           child_hash[:s_thread] = Thread.start do
>             loop do
>               # Wait for a new job in the @q
>               child_hash[:s_mutex].synchronize do
>                 # get the Q mutex and wait for a job
>                 @q.synchronize do
>                   @q_loaded_cv.wait_while { @q.empty? }
>                   child_hash[:args] = @q.shift
>                   if child_hash[:args][:id]
>                     @id_hash_mutex.synchronize do
>                       @id_hash[child_hash[:args][:id]][:status] = :running
>                       child_hash[:job_key] =
>                         spawn_worker({:args=>child_hash[:args],:class=>@child_class})
>                       @id_hash[child_hash[:args][:id]][:job_key] = child_hash[:job_key]
>                     end
>                   else
>                     child_hash[:job_key] =
>                       spawn_worker({:args=>child_hash[:args],:class=>@child_class})
>                   end
>                 end
>                 self[child_hash[:job_key]].thread[:DQ_request].wait(child_hash[:s_mutex])
> 
>                 # grab and store the results
>                 if child_hash[:args][:id]
>                   @id_hash_mutex.synchronize do
>                     if @reQ_on_finish
>                       r = self[child_hash[:job_key]].results
>                       @id_hash[child_hash[:args][:id]][:results] = r if r
>                       @id_hash[child_hash[:args][:id]][:status] = :done
>                       @id_hash[child_hash[:args][:id]][:done_at] = Time.now
>                     else
>                       @id_hash.delete(child_hash[:args][:id])
>                     end
>                   end
>                 end
>                 self[child_hash[:job_key]].thread[:DQed].signal
>                 @completed += 1
>                 [:args,:job_key].each {|k| child_hash[k] = nil }
>               end
>               # Loop back and wait for the job_key to get killed again....
>             end
>           end
>         end
>       end
> 
>       private
> 
>       def [](key)
>         # Use jobs to avoid the access time update w/ []
>         ::BackgrounDRb::MiddleMan.instance.jobs[key]
>       end
> 
>     end
> 
> 
>     module BackgrounDRb
> 
>       class RailsQueue < BackgrounDRb::Rails
>         attr_reader :progress
>         def initialize(key, args={})
>           super(key,args)
>           @job_ctrl = true
>         end
> 
>         def start_process
>           return if schedule_first_run &&
>                     schedule_first_run.to_i > Time.now.to_i
>           @thread = Thread.new do
>             Thread.current[:safe_to_kill] = ConditionVariable.new
>             Thread.current[:kill] = false
>             Thread.current[:DQ_request] = ConditionVariable.new
>             Thread.current[:DQed] = ConditionVariable.new
>             Thread.current[:mutex] = Mutex.new
>             begin
>               Thread.current[:mutex].synchronize do
>                 do_work(@args)
>               end
>             rescue Exception => e
>               @logger.error "#{ e.message } - (#{ e.class })" << "\n" <<
>                 (e.backtrace or []).join("\n")
>             end
>           end
>           @next_start = @interval.from_now if schedule_repeat
>         end
> 
>         def results
>           # Overwrite this method and set reQ_on_finish = true (in the
>           # queue worker args) to have a process put its results back in
>           # the queue for pickup before being killed
>           nil
>         end
> 
>         def before_DQ(args=nil)
>           # stub method that gets called before dequeue is run.
>           # Overwrite in your class instance
>           true
>         end
> 
>         def terminate(args=nil)
>           do_DQ(args)
>           super(args)
>         end
> 
>         def suicide(args=nil)
>           do_DQ(args)
>           kill
>           Thread.exit
>         end
> 
>         private
>         def do_DQ(args=nil)
>           before_DQ(args)
>           Thread.current[:DQ_request].signal
>           Thread.current[:DQed].wait(Thread.current[:mutex])
>         end
>       end
>     end
> 
> 
> 
> -- 
> Michael Siebert <info at siebert-wd.de>
> 
> www.siebert-wd.de - Gedanken lesen
> www.stellar-legends.de - Weltraum-Browsergame im Alpha-Stadium
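
For anyone following the do_work loop in the quoted source, the waiting
pattern it relies on (an Array extended with MonitorMixin, a condition
variable from new_cond, wait_while on the consumer side and signal on the
producer side) can be sketched on its own. This is only a rough sketch under
assumptions: plain Threads stand in for the spawned child workers, and
NUM_WORKERS, the results collector, and the :stop sentinel are made up for
the example and do not appear in QueueWorker.

```ruby
require 'monitor'

NUM_WORKERS = 2                     # plays the role of :num_child_workers
jobs = [].extend(MonitorMixin)      # the queue, guarded by its own monitor
job_ready = jobs.new_cond           # signalled whenever a job is pushed
results = Queue.new                 # thread-safe collector (example only)

workers = Array.new(NUM_WORKERS) do
  Thread.new do
    loop do
      # Sleep until the queue is non-empty, then take the oldest job.
      job = jobs.synchronize do
        job_ready.wait_while { jobs.empty? }
        jobs.shift
      end
      break if job == :stop         # hypothetical shutdown sentinel
      results << job[:id]           # the "work": record which job ran
    end
  end
end

# Producer side: push a job and wake one waiting worker, as queue_job does.
5.times do |i|
  jobs.synchronize do
    jobs.push(:id => i)
    job_ready.signal
  end
end

# One sentinel per worker so every thread leaves its loop.
NUM_WORKERS.times do
  jobs.synchronize do
    jobs.push(:stop)
    job_ready.signal
  end
end
workers.each(&:join)

finished = []
finished << results.pop until results.empty?
```

The number of jobs running at once never exceeds NUM_WORKERS because only
that many threads ever pull from the queue, which is the same cap the
:num_child_workers option is described as providing.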

-- 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| David Lemstra             B.Eng.Mgt., M.A.Sc. |
|~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~|
| david at lemstra.ca | 4339 Harrison Rd. Binbrook |
| (905)-692-3687   | Ontario, Canada. L0R1C0    |
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| "If it ain't broke, it doesn't have enough    |
| features yet" ~Scott Adams                    |
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

