[Backgroundrb-devel] A new QueueWorker class

Ezra Zygmuntowicz ezmobius at gmail.com
Thu Aug 24 02:41:13 EDT 2006


	Wow! That's pretty thick ;) I'll work my way through it this weekend
and see how it works.

Thanks
-Ezra


On Aug 23, 2006, at 9:43 PM, David Lemstra wrote:

> Hello all,
> I've come up w/ a worker class that manages queued jobs using a fixed
> number of child workers. Well, that's not quite true -- a new worker
> is spawned for each job, but you set the total number that may exist
> at once.
>
> There are three components:
> 1) queue_worker.rb: The singleton worker that manages the child
> workers. You probably want to auto-start this. Make sure you give it
> :singleton=>true. Read this file for the methods to interact with your
> children (i.e. queue_job(), delete_job(), job_progress()).
>
> 2) backgroundrb_rails_queue.rb: The superclass for the "child
> workers" (it uses backgroundrb_rails.rb in turn). This file needs to
> be included in background.rb.
>
> 3) Your child worker, which should be a subclass of
> BackgrounDRb::RailsQueue, is otherwise about the same as a normal
> worker. If it runs a big loop, you probably want to check terminate?()
> on each iteration and update @progress. Call suicide() at the end to
> make room for the next child.
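> To see the shape of what QueueWorker does, here's a standalone sketch
> of the same pattern using only Ruby's standard library (plain threads
> and a Queue -- not BackgrounDRb itself, and all names here are made
> up): a fixed pool of consumers drains a shared job queue, so no more
> than the pool size ever runs at once.

```ruby
# Standalone sketch of the fixed-pool queue pattern (not BackgrounDRb
# itself): POOL_SIZE consumer threads pull jobs off a shared queue, so
# at most POOL_SIZE jobs run concurrently no matter how many are queued.
POOL_SIZE = 2
jobs    = Queue.new   # pending jobs
results = Queue.new   # finished job ids

workers = Array.new(POOL_SIZE) do
  Thread.new do
    # Queue#pop blocks until a job is available; a nil job is the
    # "poison pill" that tells this worker to shut down.
    while (job = jobs.pop)
      results << job[:id]   # stand-in for the real work
    end
  end
end

10.times { |i| jobs << { :id => i } }   # enqueue ten jobs
POOL_SIZE.times { jobs << nil }         # one poison pill per worker
workers.each(&:join)

completed = []
completed << results.pop until results.empty?
```

> QueueWorker layers bookkeeping on top of this (the :id hash, progress
> reporting, spawning a fresh child worker per job), but the concurrency
> core is the same.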
>
> Options: (probably in your backgroundrb.yml)
> autostart:
>   :queue_key:
>     class: queue_worker
>     args:
>       :num_child_workers: 2
>       :child_class: :cost_calculator_worker
>       :reQ_on_finish: true
>       :singleton: true
>
> :queue_key can be changed to whatever you want, but it is the
> permanent key of the QueueWorker.
> :num_child_workers: is up to you!
> :child_class: the worker class you want as child workers.
> :reQ_on_finish: do you want results to be stored in the queue until
> you call job_progress!()?
> Note: to be able to access your child jobs w/ the QueueWorker methods,
> include a unique :id in your {args} when you queue_job({args})
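> For example, from a Rails controller the interaction might look like
> this (a hypothetical, untested sketch: the :queue_key and argument
> names are whatever you configured, and I'm assuming MiddleMan's usual
> [] accessor for fetching a worker by key):

```ruby
# Hypothetical: queue a job with a unique :id so it can be tracked later.
job_id = "cost_calc_#{params[:order_id]}"
MiddleMan[:queue_key].queue_job(:id => job_id, :order_id => params[:order_id])

# Later, e.g. from a polling action:
MiddleMan[:queue_key].job_progress(job_id)   # => {:status=>..., :progress=>...}
```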
>
> I'll attach the files. If they don't go through, I'll resend as text.
>
> BTW, this works well enough for me, but I'm learning as I go too, so
> no guarantees :) I don't use the fancy timing options, so YMMV for
> :next_start and :interval.
>
> Let me know if you find any issues (though I'm off-line for a week
> after this post). I'm wondering myself if it might be better to reuse
> child workers instead of re-spawning new ones. Another day maybe.
>
> cheers,
> David Lemstra
> # Put your code that runs your task inside the do_work method
> # it will be run automatically in a thread. You have access to
> # all of your rails models if you set load_rails to true in the
> # config file. You also get @logger inside of this class by default.
> require 'monitor.rb'
>
> class QueueWorker < BackgrounDRb::Rails
>
>   attr_reader :q, :id_hash, :completed
>   def initialize(key, args={})
>     super(key,args)
>     @num_child_workers = args[:num_child_workers] || 1
>     @child_workers = Array.new(@num_child_workers) do |i|
>       { :job_key => nil, :args => nil, :s_thread => nil,
>         :s_mutex => Mutex.new, :child => i }
>     end
>
>     @q = []
>     @q.extend(MonitorMixin)
>     @q_loaded_cv = @q.new_cond
>     @id_hash = {}
>     @id_hash_mutex = Mutex.new
>
>     raise ArgumentError unless args.has_key?(:child_class)
>     @child_class = args[:child_class]
>     @reQ_on_finish = args[:reQ_on_finish] || false
>     @completed = 0
>   end
>
>   def queue_job(args)
>     return nil if @id_hash && args[:id] && @id_hash.has_key?(args[:id])
>     @q.synchronize do
>       @q.push args
>       @id_hash_mutex.synchronize { @id_hash[args[:id]] = {:status=>:queued, :job_key=>nil, :results=>nil } } if args[:id]
>       @q_loaded_cv.signal
>     end
>     return true
>   end
>
>   def job_in_progress?(job_id)
>     @id_hash.has_key?(job_id)
>   end
>
>   def job_status?(job_id)
>     @id_hash_mutex.synchronize do
>       return nil unless @id_hash.has_key?(job_id)
>       return @id_hash[job_id][:status]
>     end
>   end
>
>   def job_progress(job_id)
>     report_hsh = {}
>     @id_hash_mutex.synchronize do
>       return nil unless @id_hash.has_key?(job_id)
>       report_hsh[:status] = @id_hash[job_id][:status]
>       report_hsh[:progress] = case @id_hash[job_id][:status]
>         when :queued then
>           ahead = 0
>           @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
>           ahead
>         when :running
>           w = self[@id_hash[job_id][:job_key]]
>           w.nil? ? nil : w.progress
>         when :done
>           @id_hash[job_id][:results]
>         else nil
>       end
>     end
>     return report_hsh
>   end
>
>   def job_progress!(job_id)
>     report_hsh = {}
>     @id_hash_mutex.synchronize do
>       return nil unless @id_hash.has_key?(job_id)
>       report_hsh[:status] = @id_hash[job_id][:status]
>       report_hsh[:progress] = case @id_hash[job_id][:status]
>         when :queued then
>           ahead = 0
>           @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
>           ahead
>         when :running
>           w = self[@id_hash[job_id][:job_key]]
>           if w.nil?
>             @id_hash.delete(job_id)[:results]
>           else
>             w.progress
>           end
>         when :done
>           @id_hash.delete(job_id)[:results]
>         else nil
>       end
>     end
>     return report_hsh
>   end
>
>   def delete_job(job_id)
>     args = nil
>     @q.synchronize do
>       @id_hash_mutex.synchronize do
>         args = @id_hash[job_id]
>         return true if args.nil?
>         if args[:status] == :queued
>           @q.delete_if {|h| h[:id] == job_id }
>           @id_hash.delete(job_id)
>           return true
>         elsif args[:status] == :done
>           @id_hash.delete(job_id)
>           return true
>         end
>       end
>     end
>     ::BackgrounDRb::MiddleMan.instance.delete_worker(args[:job_key]) if args[:status] == :running
>     return true
>   end
>
>   def do_work(args)
>     # You probably don't want to mess with this method unless you
>     # know what's what.
>     @child_workers.each do |child_hash|
>       child_hash[:s_thread] = Thread.start do
>         loop do
>           # Wait for a new job in the @q
>           child_hash[:s_mutex].synchronize do
>             # get the Q mutex and wait for a job
>             @q.synchronize do
>               @q_loaded_cv.wait_while { @q.empty? }
>               child_hash[:args] = @q.shift
>               if child_hash[:args][:id]
>                 @id_hash_mutex.synchronize do
>                   @id_hash[child_hash[:args][:id]][:status] = :running
>                   child_hash[:job_key] = spawn_worker({:args=>child_hash[:args], :class=>@child_class})
>                   @id_hash[child_hash[:args][:id]][:job_key] = child_hash[:job_key]
>                 end
>               else
>                 child_hash[:job_key] = spawn_worker(child_hash[:args].merge(:class=>@child_class))
>               end
>             end
>             self[child_hash[:job_key]].thread[:DQ_request].wait(child_hash[:s_mutex])
>             # grab and store the results
>             if child_hash[:args][:id]
>               @id_hash_mutex.synchronize do
>                 if @reQ_on_finish
>                   r = self[child_hash[:job_key]].results
>                   @id_hash[child_hash[:args][:id]][:results] = r if r
>                   @id_hash[child_hash[:args][:id]][:status] = :done
>                   @id_hash[child_hash[:args][:id]][:done_at] = Time.now
>                 else
>                   @id_hash.delete(child_hash[:args][:id])
>                 end
>               end
>             end
>             self[child_hash[:job_key]].thread[:DQed].signal
>             @completed += 1
>             [:args,:job_key].each {|k| child_hash[k] = nil }
>           end
>           # Loop back and wait for the job_key to get killed again....
>         end
>       end
>     end
>   end
>
>   private
>
>   def [](key)
>     # Use jobs to avoid the access time update w/ []
>     ::BackgrounDRb::MiddleMan.instance.jobs[key]
>   end
>
> end
> module BackgrounDRb
>
>   class RailsQueue < BackgrounDRb::Rails
>     attr_reader :progress
>     def initialize(key, args={})
>       super(key,args)
>       @job_ctrl = true
>     end
>
>     def start_process
>       return if schedule_first_run && schedule_first_run.to_i > Time.now.to_i
>       @thread = Thread.new do
>         Thread.current[:safe_to_kill] = ConditionVariable.new
>         Thread.current[:kill] = false
>         Thread.current[:DQ_request] = ConditionVariable.new
>         Thread.current[:DQed] = ConditionVariable.new
>         Thread.current[:mutex] = Mutex.new
>         begin
>           Thread.current[:mutex].synchronize do
>             do_work(@args)
>           end
>         rescue Exception => e
>           @logger.error "#{ e.message } - (#{ e.class })" << "\n" << (e.backtrace or []).join("\n")
>         end
>       end
>       @next_start = @interval.from_now if schedule_repeat
>     end
>
>     def results
>       # Override this method and set :reQ_on_finish => true (in the
>       # queue worker args) to have a worker put its results back in
>       # the queue for pickup before being killed.
>       nil
>     end
>
>     def before_DQ(args=nil)
>       # Stub method that gets called before dequeue is run.
>       # Override in your subclass.
>       true
>     end
>
>     def terminate(args=nil)
>       do_DQ(args)
>       super(args)
>     end
>
>     def suicide(args=nil)
>       do_DQ(args)
>       kill
>       Thread.exit
>     end
>
>     private
>     def do_DQ(args=nil)
>       before_DQ(args)
>       Thread.current[:DQ_request].signal
>       Thread.current[:DQed].wait(Thread.current[:mutex])
>     end
>   end
> end
> _______________________________________________
> Backgroundrb-devel mailing list
> Backgroundrb-devel at rubyforge.org
> http://rubyforge.org/mailman/listinfo/backgroundrb-devel


