def each(foreach=nil, after=nil, &blk)
raise ArgumentError, 'proc or block required for iteration' unless foreach ||= blk
raise RuntimeError, 'cannot iterate over an iterator more than once' if @started or @ended
@started = true
@pending = 0
@workers = 0
all_done = proc{
after.call if after and @ended and @pending == 0
}
@process_next = proc{
unless @ended or @workers > @concurrency
if @list.empty?
@ended = true
@workers -= 1
all_done.call
else
item = @list.shift
@pending += 1
is_done = false
on_done = proc{
raise RuntimeError, 'already completed this iteration' if is_done
is_done = true
@pending -= 1
if @ended
all_done.call
else
EM.next_tick(@process_next)
end
}
class << on_done
alias :next :call
end
foreach.call(item, on_done)
end
else
@workers -= 1
end
}
spawn_workers
self
end