options: :ordered [true|false] - whether the output should stay in the same order
as the input (even though it may not actually be processed in that order). Default: true
:stop_on_exception [true|false] - if true, when an exception occurs in either
input or output, we wait for any outstanding processing to complete, but will not process any new inputs. Default: false
:main_thread_processing [true|false] - whether the main thread pulling
on each() is allowed to process inputs. Default: true. NOTE: If you set this to false, parallelizer.kill will stop each() in its tracks, so only do so if you know for sure that kill will not be called while each() is running.
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 20
#
# Captures the collaborators handed in by the caller and creates fresh,
# empty bookkeeping containers.
#
# parent_task_queue - stored as-is in @parent_task_queue.
# input_enumerable  - the inputs to be processed; stored as-is.
# options           - Hash of options (:ordered, :stop_on_exception,
#                     :main_thread_processing); defaults to an empty Hash.
# block             - optional block, stored as a Proc (nil when omitted).
def initialize(parent_task_queue, input_enumerable, options = {}, &block)
  # Caller-supplied state, kept exactly as received.
  @parent_task_queue, @input_enumerable = parent_task_queue, input_enumerable
  @options, @block = options, block

  # Internal state; every container starts out empty.
  # NOTE(review): judging by the names, these track inputs not yet picked up,
  # work in flight, and results not yet handed out - confirm against the
  # methods that use them.
  @unconsumed_input = Queue.new
  @in_process = {}
  @unconsumed_output = Queue.new
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 87
#
# Counts elements.
#
# Called with no arguments and no block, the answer is taken straight from
# the input enumerable. Any other call shape is forwarded unchanged to
# original_count.
# NOTE(review): original_count is presumably an alias of the stock
# Enumerable#count - confirm where it is defined.
def count(*args, &block)
  return @input_enumerable.count if args.empty? && block.nil?

  original_count(*args, &block)
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 103
#
# Drops the first n elements of the *input* enumerable, wraps what is left
# via restricted_copy, and returns that wrapper's to_a.
def drop(n)
  remaining_inputs = @input_enumerable.drop(n)
  restricted_copy(remaining_inputs).to_a
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 36
#
# Yields each output value to the caller's block, discarding the extra
# values (index, input) that each_with_input supplies alongside it.
def each
  each_with_input { |output, _index, _input, _type| yield output }
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 64
#
# Forwards the given block to the unordered or the ordered iteration
# helper, depending on the :ordered option.
def each_with_exceptions(&block)
  # Compare against literal false on purpose: a missing or nil :ordered
  # option must take the ordered path (the documented default is true).
  return each_with_exceptions_unordered(&block) if @options[:ordered] == false

  each_with_exceptions_ordered(&block)
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 42
#
# Yields each output value together with the index reported by
# each_with_input; the input value itself is not passed on.
def each_with_index
  each_with_input { |output, index, _input| yield output, index }
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 48
#
# Yields (output, index, input) for every result that is not flagged as an
# exception by each_with_exceptions.
#
# For results whose type is :exception, `output` is the object to raise:
# * when the :ordered option is literally false, only the first such
#   object is remembered and it is raised after iteration has finished;
# * otherwise it is raised on the spot.
def each_with_input
  first_failure = nil

  each_with_exceptions do |output, index, input, type|
    # Normal result: hand it to the caller and move on.
    next yield(output, index, input) unless type == :exception

    # Anything but an explicit `ordered: false` fails immediately.
    raise output unless @options[:ordered] == false

    # Unordered mode: keep going, but remember the earliest failure.
    first_failure ||= output
  end

  raise first_failure if first_failure
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 95
#
# With a truthy n: takes the first n elements of the input enumerable,
# wraps them via restricted_copy, and returns that wrapper's to_a.
#
# Without n (or with a falsy n): returns element 0 of first(1).
def first(n = nil)
  return first(1)[0] unless n

  leading_inputs = @input_enumerable.first(n)
  restricted_copy(leading_inputs).to_a
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 107
#
# Wraps this enumerable in a FlattenEnumerable.
#
# levels - passed straight through to FlattenEnumerable; nil by default.
#          NOTE(review): nil presumably means "flatten fully" - confirm in
#          FlattenEnumerable.
def flatten(levels = nil)
  flattened = FlattenEnumerable.new(self, levels)
  flattened
end
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 139
#
# Returns a RestrictedLazy built from this enumerable and the result of
# original_lazy.
# NOTE(review): original_lazy is presumably an alias of the stock
# Enumerable#lazy - confirm where it is defined.
def lazy
  unrestricted = original_lazy
  RestrictedLazy.new(self, unrestricted)
end
Enumerable methods
# File lib/chef/chef_fs/parallelizer/parallel_enumerable.rb, line 81
#
# Builds a new ParallelEnumerable over a different input enumerable,
# reusing this instance's task queue, options and block.
def restricted_copy(enumerable)
  ParallelEnumerable.new(
    @parent_task_queue,
    enumerable,
    @options,
    &@block
  )
end
Generated with the Darkfish Rdoc Generator 2.