[Backgroundrb-devel] A new QueueWorker class
Ezra Zygmuntowicz
ezmobius at gmail.com
Thu Aug 24 02:41:13 EDT 2006
Wow! That's pretty thick ;) I'll work my way through it this weekend
and see how it works.
Thanks
-Ezra
On Aug 23, 2006, at 9:43 PM, David Lemstra wrote:
> Hello all,
> I've come up w/ a worker class that manages queued jobs using a fixed
> number of child workers. Well, that's not quite true: a new worker is
> spawned for each job, but you set the total number that may exist at once.
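A minimal sketch of the "new worker per job, but at most N at once" idea, in plain Ruby threads with no BackgrounDRb involved. BoundedSpawner and run_job are hypothetical names; a SizedQueue of tokens stands in for the QueueWorker's fixed set of child slots.

```ruby
require 'thread'

# Hypothetical helper: runs each job in a brand-new thread, but blocks the
# caller while max_children job threads are already alive.
class BoundedSpawner
  attr_reader :max_seen

  def initialize(max_children)
    @slots = SizedQueue.new(max_children) # one token per live job thread
    @lock = Mutex.new
    @running = 0
    @max_seen = 0
  end

  # Waits for a free slot, then starts a fresh thread for the job.
  def run_job(&job)
    @slots.push(true)
    @lock.synchronize do
      @running += 1
      @max_seen = @running if @running > @max_seen
    end
    Thread.new do
      begin
        job.call
      ensure
        @lock.synchronize { @running -= 1 }
        @slots.pop # hand the slot to the next job
      end
    end
  end
end

spawner = BoundedSpawner.new(2)
results = Queue.new
threads = (1..6).map { |i| spawner.run_job { sleep 0.01; results.push(i * i) } }
threads.each(&:join)
squares = Array.new(results.size) { results.pop }.sort
```

The per-job thread is thrown away after each job, which mirrors the respawn design; the token queue is what enforces the limit.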
>
> There are three components:
> 1) queue_worker.rb: The singleton worker that manages the child workers.
> You probably want to autostart this. Make sure you give it
> :singleton=>true. Read this file for the methods to interact with your
> children (e.g. queue_job(), delete_job(), job_progress()).
>
> 2) backgroundrb_rails_queue.rb: The superclass for the "child workers"
> (it uses backgroundrb_rails.rb in turn). This file needs to be included
> in background.rb.
>
> 3) Your child worker, which should be a subclass of
> BackgrounDRb::RailsQueue, is otherwise about the same as a normal worker.
> If its work is a big loop, you probably want to call terminate?() on each
> iteration and update @progress. Call suicide() at the end to make room
> for the next child.
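To illustrate item 3, here is a hedged sketch of the loop shape it describes, in plain Ruby with no BackgrounDRb dependency. LoopingChild and terminate! are hypothetical stand-ins; in a real RailsQueue subclass you would use the terminate?() the message refers to and end with suicide().

```ruby
require 'thread'

# Hypothetical stand-in for a RailsQueue child: loops over its items,
# updates @progress after each one and stops early once asked to.
class LoopingChild
  attr_reader :progress

  def initialize(items)
    @items = items
    @progress = 0
    @stop = false
    @lock = Mutex.new
  end

  # Ask the loop to stop before its next iteration.
  def terminate!
    @lock.synchronize { @stop = true }
  end

  def terminate?
    @lock.synchronize { @stop }
  end

  # Returns the results produced before finishing or being stopped.
  def do_work
    done = []
    @items.each_with_index do |item, i|
      break if terminate?
      done << item * 2                          # the actual work
      @progress = ((i + 1) * 100) / @items.size # percent complete
    end
    done
  end
end

child = LoopingChild.new([1, 2, 3, 4])
out = child.do_work

stopped = LoopingChild.new([1, 2, 3])
stopped.terminate!
early = stopped.do_work
```

Checking the flag once per iteration keeps a stop request from interrupting an item halfway through.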
>
> Options (probably in your backgroundrb.yml):
> autostart:
>   :queue_key:
>     class: queue_worker
>     args:
>       :num_child_workers: 2
>       :child_class: :cost_calculator_worker
>       :reQ_on_finish: true
>     :singleton: true
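As a sanity check of the options above, this sketch loads that autostart block with Ruby's YAML (Psych) and applies the same defaults QueueWorker#initialize uses. It assumes a Ruby recent enough for YAML.safe_load to accept permitted_classes: (needed because the keys are symbols), and it assumes :singleton sits beside class and args; the variable names are illustrative only.

```ruby
require 'yaml'

# The autostart block from the message, as a string for this example.
CONFIG_TEXT = <<~YML
  autostart:
    :queue_key:
      class: queue_worker
      args:
        :num_child_workers: 2
        :child_class: :cost_calculator_worker
        :reQ_on_finish: true
      :singleton: true
YML

# Symbol must be permitted because the keys (and :child_class) are symbols.
config = YAML.safe_load(CONFIG_TEXT, permitted_classes: [Symbol])
worker = config['autostart'][:queue_key]
args = worker['args']

# Same defaults as QueueWorker#initialize.
num_children = args[:num_child_workers] || 1
child_class = args.fetch(:child_class) { raise ArgumentError, ':child_class is required' }
requeue = args[:reQ_on_finish] || false
```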
>
> :queue_key: can be changed to whatever you want, but it is the permanent
> key of the QueueWorker.
> :num_child_workers: is up to you! (defaults to 1)
> :child_class: the worker class you want to use as child workers (required).
> :reQ_on_finish: do you want results to be stored in the queue until you
> call job_progress!()?
> Note: to be able to access your child jobs with the QueueWorker methods,
> include a unique :id in your {args} when you call queue_job({args}).
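A hedged, self-contained sketch of why the :id matters and what :reQ_on_finish changes: a FIFO of pending args plus an id => entry table, both behind one Mutex. JobTable, next_job and finish are hypothetical; job_progress and job_progress! borrow the names from the message but this is not the attached implementation.

```ruby
require 'thread'

# Hypothetical job table: pending args in FIFO order plus an id => entry map.
class JobTable
  def initialize(requeue_on_finish)
    @requeue = requeue_on_finish
    @lock = Mutex.new
    @queue = [] # pending job args, oldest first
    @jobs = {}  # id => { :status => ..., :results => ... }
  end

  # Returns nil for a duplicate :id, true otherwise.
  def queue_job(args)
    @lock.synchronize do
      return nil if args[:id] && @jobs.key?(args[:id])
      @queue.push(args)
      @jobs[args[:id]] = { :status => :queued, :results => nil } if args[:id]
      true
    end
  end

  # Takes the oldest job off the queue and marks it :running.
  def next_job
    @lock.synchronize do
      args = @queue.shift
      @jobs[args[:id]][:status] = :running if args && args[:id]
      args
    end
  end

  # With requeue on, keep the results until read; otherwise forget the job.
  def finish(id, results)
    return if id.nil? # jobs without an :id are not tracked
    @lock.synchronize do
      if @requeue
        @jobs[id] = { :status => :done, :results => results }
      else
        @jobs.delete(id)
      end
    end
  end

  # Non-destructive report.
  def job_progress(id)
    @lock.synchronize { report_for(id) }
  end

  # Destructive report: a :done entry is removed once it has been read.
  def job_progress!(id)
    @lock.synchronize do
      report = report_for(id)
      @jobs.delete(id) if report && report[:status] == :done
      report
    end
  end

  private

  # Caller must hold @lock. :queued reports the number of jobs ahead.
  def report_for(id)
    job = @jobs[id]
    return nil unless job
    progress = case job[:status]
               when :queued then @queue.index { |a| a[:id] == id }
               when :done then job[:results]
               end
    { :status => job[:status], :progress => progress }
  end
end

table = JobTable.new(true)
table.queue_job(:id => 1)
table.queue_job(:id => 2)
table.next_job     # job 1 is now :running
table.finish(1, 42)
```

Without an :id there is no key to look a job up by, which is why untracked jobs can only be run, not queried.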
>
> I'll attach the files. If they don't go through, I'll resend as text.
>
> BTW, this works well enough for me, but I'm learning as I go too, so no
> guarantees :) I don't use the fancy timing options, so YMMV for
> :next_start and :interval.
>
> Let me know if you find any issues (though I'm offline for a week after
> this post). I'm wondering myself if it might be better to reuse child
> workers instead of re-spawning new ones. Another day maybe.
>
> cheers,
> David Lemstra
> # Put your code that runs your task inside the do_work method
> # it will be run automatically in a thread. You have access to
> # all of your rails models if you set load_rails to true in the
> # config file. You also get @logger inside of this class by default.
> require 'monitor'
> require 'thread' # Mutex and ConditionVariable on older Rubies
>
> class QueueWorker < BackgrounDRb::Rails
>
> attr_reader :q, :id_hash, :completed
> def initialize(key, args={})
> super(key,args)
> @num_child_workers = args[:num_child_workers] || 1
> @child_workers = Array.new(@num_child_workers) do |i|
> { :job_key => nil, :args => nil, :s_thread => nil,
> :s_mutex => Mutex.new, :child => i }
> end
>
> @q = []
> @q.extend(MonitorMixin)
> @q_loaded_cv = @q.new_cond
> @id_hash = {}
> @id_hash_mutex = Mutex.new
>
> raise ArgumentError unless args.has_key?(:child_class)
> @child_class = args[:child_class]
> @reQ_on_finish = args[:reQ_on_finish] || false
> @completed = 0
> end
>
> def queue_job(args)
> @q.synchronize do
> @id_hash_mutex.synchronize do
> # Refuse a job whose :id is already known
> return nil if args[:id] && @id_hash.has_key?(args[:id])
> @id_hash[args[:id]] = {:status=>:queued, :job_key=>nil, :results=>nil } if args[:id]
> end
> @q.push args
> @q_loaded_cv.signal
> end
> return true
> end
>
> def job_in_progress?(job_id)
> @id_hash.has_key?(job_id)
> end
>
> def job_status?(job_id)
> @id_hash_mutex.synchronize do
> return nil unless @id_hash.has_key?(job_id)
> return @id_hash[job_id][:status]
> end
> end
>
> def job_progress(job_id)
> report_hsh = {}
> @id_hash_mutex.synchronize do
> return nil unless @id_hash.has_key?(job_id)
> report_hsh[:status] = @id_hash[job_id][:status]
> report_hsh[:progress] = case @id_hash[job_id][:status]
> when :queued then
> ahead = 0
> @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
> ahead
> when :running
> w = self[@id_hash[job_id][:job_key]]
> w.nil? ? nil : w.progress
> when :done
> @id_hash[job_id][:results]
> else nil
> end
> end
> return report_hsh
> end
>
> def job_progress!(job_id)
> report_hsh = {}
> @id_hash_mutex.synchronize do
> return nil unless @id_hash.has_key?(job_id)
> report_hsh[:status] = @id_hash[job_id][:status]
> report_hsh[:progress] = case @id_hash[job_id][:status]
> when :queued then
> ahead = 0
> @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
> ahead
> when :running
> w = self[@id_hash[job_id][:job_key]]
> if w.nil?
> @id_hash.delete(job_id)[:results]
> else
> w.progress
> end
> when :done
> @id_hash.delete(job_id)[:results]
> else nil
> end
> end
> return report_hsh
> end
>
> def delete_job(job_id)
> args = nil
> @q.synchronize do @id_hash_mutex.synchronize do
> args = @id_hash[job_id]
> return true if args.nil?
> if args[:status] == :queued
> @q.delete_if {|h| h[:id] == job_id }
> @id_hash.delete(job_id)
> return true
> elsif args[:status] == :done
> @id_hash.delete(job_id)
> return true
> end
> end
> end
> ::BackgrounDRb::MiddleMan.instance.delete_worker(args[:job_key]) if args[:status] == :running
> return true
> end
>
> def do_work(args)
> # You probably don't want to mess with this method unless you know what's what.
> @child_workers.each do |child_hash|
> child_hash[:s_thread] = Thread.start do
> loop do
> # Wait for a new job in the @q
> child_hash[:s_mutex].synchronize do
> # get the Q mutex and wait for a job
> @q.synchronize do
> @q_loaded_cv.wait_while { @q.empty? }
> child_hash[:args] = @q.shift
> if child_hash[:args][:id]
> @id_hash_mutex.synchronize do
> @id_hash[child_hash[:args][:id]][:status] = :running
> child_hash[:job_key] = spawn_worker(:args=>child_hash[:args], :class=>@child_class)
> @id_hash[child_hash[:args][:id]][:job_key] = child_hash[:job_key]
> end
> else
> child_hash[:job_key] = spawn_worker(:args=>child_hash[:args], :class=>@child_class)
> end
> end
> self[child_hash[:job_key]].thread[:DQ_request].wait(child_hash[:s_mutex])
> # grab and store the results
> if child_hash[:args][:id]
> @id_hash_mutex.synchronize do
> if @reQ_on_finish
> r = self[child_hash[:job_key]].results
> @id_hash[child_hash[:args][:id]][:results] = r if r
> @id_hash[child_hash[:args][:id]][:status] = :done
> @id_hash[child_hash[:args][:id]][:done_at] = Time.now
> else
> @id_hash.delete(child_hash[:args][:id])
> end
> end
> end
> self[child_hash[:job_key]].thread[:DQed].signal
> @completed += 1
> [:args,:job_key].each {|k| child_hash[k] = nil }
> end
> # Loop back and wait for the job_key to get killed again....
> end
> end
> end
> end
>
> protected # not private: the self[key] calls above use an explicit receiver
>
> def [](key)
> # Use jobs to avoid the access time update w/ []
> ::BackgrounDRb::MiddleMan.instance.jobs[key]
> end
>
> end
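QueueWorker#do_work waits on the child's :DQ_request condition variable, collects its results, and then signals :DQed, while RailsQueue#do_DQ does the mirror image. The sketch below shows that handshake in isolation, assuming both sides share one Mutex; Handshake, request_dequeue and await_and_release are hypothetical names. Each wait is guarded by a flag, because a ConditionVariable signal sent while nobody is waiting is simply dropped.

```ruby
require 'thread'

# Hypothetical two-step dequeue handshake between a child and its parent.
class Handshake
  def initialize
    @lock = Mutex.new
    @dq_request = ConditionVariable.new
    @dqed = ConditionVariable.new
    @requested = false
    @released = false
    @results = nil
  end

  # Child side: publish results, ask to be dequeued, wait for the parent.
  def request_dequeue(results)
    @lock.synchronize do
      @results = results
      @requested = true
      @dq_request.signal
      @dqed.wait(@lock) until @released
    end
  end

  # Parent side: wait for the request, take the results, release the child.
  def await_and_release
    @lock.synchronize do
      @dq_request.wait(@lock) until @requested
      results = @results
      @released = true
      @dqed.signal
      results
    end
  end
end

hs = Handshake.new
child = Thread.new { hs.request_dequeue(:answer) }
collected = hs.await_and_release
child.join
```

Because each side re-checks its flag before waiting, the exchange completes whichever thread reaches the handshake first.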
> module BackgrounDRb
>
> class RailsQueue < BackgrounDRb::Rails
> attr_reader :progress
> def initialize(key, args={})
> super(key,args)
> @job_ctrl = true
> end
>
> def start_process
> return if schedule_first_run && schedule_first_run.to_i > Time.now.to_i
> @thread = Thread.new do
> Thread.current[:safe_to_kill] = ConditionVariable.new
> Thread.current[:kill] = false
> Thread.current[:DQ_request] = ConditionVariable.new
> Thread.current[:DQed] = ConditionVariable.new
> Thread.current[:mutex] = Mutex.new
> begin
> Thread.current[:mutex].synchronize do
> do_work(@args)
> end
> rescue Exception => e
> @logger.error "#{ e.message } - (#{ e.class })" << "\n" << (e.backtrace or []).join("\n")
> end
> end
> @next_start = @interval.from_now if schedule_repeat
> end
>
> def results
> # Override this method and set :reQ_on_finish => true (in the queue worker args)
> # to have the process put its results back in the queue
> # for pickup before being killed
> nil
> end
>
> def before_DQ(args=nil)
> # Stub method that gets called before the dequeue runs.
> # Override it in your subclass.
> true
> end
>
> def terminate(args=nil)
> do_DQ(args)
> super(args)
> end
>
> def suicide(args=nil)
> do_DQ(args)
> kill
> Thread.exit
> end
>
> private
> def do_DQ(args=nil)
> before_DQ(args)
> Thread.current[:DQ_request].signal
> Thread.current[:DQed].wait(Thread.current[:mutex])
> end
> end
> end
> _______________________________________________
> Backgroundrb-devel mailing list
> Backgroundrb-devel at rubyforge.org
> http://rubyforge.org/mailman/listinfo/backgroundrb-devel