Tries to balance several guarantees, in order of priority:
don’t get deadlocked
provide results in desired order
provide results as soon as they are available
process input as soon as possible
# File lib/chef/chef_fs/parallelizer.rb, line 32 def initialize(num_threads) @tasks = Queue.new @threads = [] @stop_thread = {} resize(num_threads) end
# File lib/chef/chef_fs/parallelizer.rb, line 28 def self.parallel_do(enumerable, options = {}, &block) parallelizer.parallel_do(enumerable, options, &block) end
# File lib/chef/chef_fs/parallelizer.rb, line 24 def self.parallelize(enumerable, options = {}, &block) parallelizer.parallelize(enumerable, options, &block) end
# File lib/chef/chef_fs/parallelizer.rb, line 78 def kill @threads.each do |thread| Thread.kill(thread) @stop_thread.delete(thread) end @threads = [] end
# File lib/chef/chef_fs/parallelizer.rb, line 39 def num_threads @threads.size end
# File lib/chef/chef_fs/parallelizer.rb, line 47 def parallel_do(enumerable, options = {}, &block) ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait end
# File lib/chef/chef_fs/parallelizer.rb, line 43 def parallelize(enumerable, options = {}, &block) ParallelEnumerable.new(@tasks, enumerable, options, &block) end
# File lib/chef/chef_fs/parallelizer.rb, line 55 def resize(to_threads, wait = true, timeout = nil) if to_threads < num_threads threads_to_stop = @threads[to_threads..num_threads-1] @threads = @threads.slice(0, to_threads) threads_to_stop.each do |thread| @stop_thread[thread] = true end if wait start_time = Time.now threads_to_stop.each do |thread| thread_timeout = timeout ? timeout - (Time.now - start_time) : nil thread.join(thread_timeout) end end else num_threads.upto(to_threads - 1) do |i| @threads[i] = Thread.new(&method(:worker_loop)) end end end
Generated with the Darkfish Rdoc Generator 2.