class Dynflow::ExecutionPlan

TODO extract planning logic to an extra class ExecutionPlanner

Attributes

ended_at[R]
execution_history[R]
execution_time[R]
finalize_flow[R]
id[R]
real_time[R]
root_plan_step[R]
run_flow[R]
started_at[R]
steps[R]
world[R]

Public Class Methods

new(world, id = SecureRandom.uuid, state = :pending, root_plan_step = nil, run_flow = Flows::Concurrence.new([]), finalize_flow = Flows::Sequence.new([]), steps = {}, started_at = nil, ended_at = nil, execution_time = nil, real_time = 0.0, execution_history = ExecutionHistory.new) click to toggle source

all params with default values are part of private api

# File lib/dynflow/execution_plan.rb, line 37
def initialize(world,
    id = SecureRandom.uuid,
    state = :pending,
    root_plan_step = nil,
    run_flow = Flows::Concurrence.new([]),
    finalize_flow = Flows::Sequence.new([]),
    steps = {},
    started_at = nil,
    ended_at = nil,
    execution_time = nil,
    real_time = 0.0,
    execution_history = ExecutionHistory.new)

  @id                = Type! id, String
  @world             = Type! world, World
  self.state         = state
  @run_flow          = Type! run_flow, Flows::Abstract
  @finalize_flow     = Type! finalize_flow, Flows::Abstract
  @root_plan_step    = root_plan_step
  @started_at        = Type! started_at, Time, NilClass
  @ended_at          = Type! ended_at, Time, NilClass
  @execution_time    = Type! execution_time, Numeric, NilClass
  @real_time         = Type! real_time, Numeric
  @execution_history = Type! execution_history, ExecutionHistory

  steps.all? do |k, v|
    Type! k, Integer
    Type! v, Steps::Abstract
  end
  @steps = steps
end
new_from_hash(hash, world) click to toggle source
# File lib/dynflow/execution_plan.rb, line 338
def self.new_from_hash(hash, world)
  check_class_matching hash
  execution_plan_id = hash[:id]
  steps             = steps_from_hash(hash[:step_ids], execution_plan_id, world)
  self.new(world,
           execution_plan_id,
           hash[:state],
           steps[hash[:root_plan_step_id]],
           Flows::Abstract.from_hash(hash[:run_flow]),
           Flows::Abstract.from_hash(hash[:finalize_flow]),
           steps,
           string_to_time(hash[:started_at]),
           string_to_time(hash[:ended_at]),
           hash[:execution_time].to_f,
           hash[:real_time].to_f,
           ExecutionHistory.new_from_hash(hash[:execution_history]))
end
results() click to toggle source
# File lib/dynflow/execution_plan.rb, line 22
def self.results
  @results ||= [:pending, :success, :warning, :error]
end
state_transitions() click to toggle source
# File lib/dynflow/execution_plan.rb, line 26
def self.state_transitions
  @state_transitions ||= { pending:  [:stopped, :scheduled, :planning],
                           scheduled: [:planning, :stopped],
                           planning: [:planned, :stopped],
                           planned:  [:running, :stopped],
                           running:  [:paused, :stopped],
                           paused:   [:running],
                           stopped:  [] }
end
states() click to toggle source
# File lib/dynflow/execution_plan.rb, line 18
def self.states
  @states ||= [:pending, :scheduled, :planning, :planned, :running, :paused, :stopped]
end

Private Class Methods

steps_from_hash(step_ids, execution_plan_id, world) click to toggle source
# File lib/dynflow/execution_plan.rb, line 408
def self.steps_from_hash(step_ids, execution_plan_id, world)
  step_ids.inject({}) do |hash, step_id|
    step = world.persistence.load_step(execution_plan_id, step_id, world)
    hash.update(step_id.to_i => step)
  end
end

Public Instance Methods

actions() click to toggle source

@return [Array<Action>] actions in Present phase

# File lib/dynflow/execution_plan.rb, line 380
def actions
  @actions ||= begin
    [entry_action] + entry_action.all_planned_actions
  end
end
add_finalize_step(action) click to toggle source
# File lib/dynflow/execution_plan.rb, line 311
def add_finalize_step(action)
  add_step(Steps::FinalizeStep, action.class, action.id).tap do |step|
    step.progress_weight = action.finalize_progress_weight
    finalize_flow << Flows::Atom.new(step.id)
  end
end
add_plan_step(action_class, caller_action = nil) click to toggle source
# File lib/dynflow/execution_plan.rb, line 293
def add_plan_step(action_class, caller_action = nil)
  add_step(Steps::PlanStep, action_class, generate_action_id).tap do |step|
    # TODO: to be removed and preferred by the caller_action
    if caller_action && caller_action.execution_plan_id == self.id
      @steps[caller_action.plan_step_id].children << step.id
    end
    step.initialize_action(caller_action)
  end
end
add_run_step(action) click to toggle source
# File lib/dynflow/execution_plan.rb, line 303
def add_run_step(action)
  add_step(Steps::RunStep, action.class, action.id).tap do |step|
    step.progress_weight = action.run_progress_weight
    @dependency_graph.add_dependencies(step, action)
    current_run_flow.add_and_resolve(@dependency_graph, Flows::Atom.new(step.id))
  end
end
add_scheduling_step(action_class, caller_action = nil) click to toggle source
# File lib/dynflow/execution_plan.rb, line 287
def add_scheduling_step(action_class, caller_action = nil)
  add_step(Steps::PlanStep, action_class, generate_action_id, :scheduling).tap do |step|
    step.initialize_action(caller_action)
  end
end
caller_execution_plan_id() click to toggle source
# File lib/dynflow/execution_plan.rb, line 386
def caller_execution_plan_id
  entry_action.caller_execution_plan_id
end
cancel() click to toggle source

sends the cancel event to all currently running and cancellable steps. if the plan is just scheduled, it cancels it (and returns an one-item array with the future value of the cancel result)

# File lib/dynflow/execution_plan.rb, line 209
def cancel
  if state == :scheduled
    [Concurrent.future.tap { |f| f.success delay_record.cancel }]
  else
    steps_to_cancel.map do |step|
      world.event(id, step.id, ::Dynflow::Action::Cancellable::Cancel)
    end
  end
end
cancellable?() click to toggle source
# File lib/dynflow/execution_plan.rb, line 219
def cancellable?
  return true if state == :scheduled
  return false unless state == :running
  steps_to_cancel.any?
end
compute_execution_time() click to toggle source
# File lib/dynflow/execution_plan.rb, line 356
def compute_execution_time
  self.steps.values.reduce(0) do |execution_time, step|
    execution_time + (step.execution_time || 0)
  end
end
current_run_flow() click to toggle source

@api private

# File lib/dynflow/execution_plan.rb, line 263
def current_run_flow
  @run_flow_stack.last
end
delay(caller_action, action_class, delay_options, *args) click to toggle source
# File lib/dynflow/execution_plan.rb, line 159
def delay(caller_action, action_class, delay_options, *args)
  save
  @root_plan_step = add_scheduling_step(action_class, caller_action)
  execution_history.add("delay", @world.id)
  serializer = root_plan_step.delay(delay_options, args)
  delayed_plan = DelayedPlan.new(@world,
                                     id,
                                     delay_options[:start_at],
                                     delay_options.fetch(:start_before, nil),
                                     serializer)
  persistence.save_delayed_plan(delayed_plan)
ensure
  update_state(error? ? :stopped : :scheduled)
end
delay_record() click to toggle source
# File lib/dynflow/execution_plan.rb, line 174
def delay_record
  @delay_record ||= persistence.load_delayed_plan(id)
end
entry_action() click to toggle source
# File lib/dynflow/execution_plan.rb, line 375
def entry_action
  @entry_action ||= root_plan_step.action(self)
end
error?() click to toggle source
# File lib/dynflow/execution_plan.rb, line 103
def error?
  result == :error
end
error_in_plan?() click to toggle source
# File lib/dynflow/execution_plan.rb, line 107
def error_in_plan?
  steps_in_state(:error).any? { |step| step.is_a? Steps::PlanStep }
end
errors() click to toggle source
# File lib/dynflow/execution_plan.rb, line 111
def errors
  steps.values.map(&:error).compact
end
failed_steps() click to toggle source
# File lib/dynflow/execution_plan.rb, line 133
def failed_steps
  steps_in_state(:error)
end
generate_action_id() click to toggle source
# File lib/dynflow/execution_plan.rb, line 149
def generate_action_id
  @last_action_id ||= 0
  @last_action_id += 1
end
generate_step_id() click to toggle source
# File lib/dynflow/execution_plan.rb, line 154
def generate_step_id
  @last_step_id ||= 0
  @last_step_id += 1
end
logger() click to toggle source
# File lib/dynflow/execution_plan.rb, line 69
def logger
  @world.logger
end
plan(*args) click to toggle source
# File lib/dynflow/execution_plan.rb, line 187
def plan(*args)
  update_state(:planning)
  world.middleware.execute(:plan_phase, root_plan_step.action_class, self) do
    with_planning_scope do
      root_plan_step.execute(self, nil, false, *args)

      if @dependency_graph.unresolved?
        raise "Some dependencies were not resolved: #{@dependency_graph.inspect}"
      end
    end
  end

  if @run_flow.size == 1
    @run_flow = @run_flow.sub_flows.first
  end
  steps.values.each(&:save)
  update_state(error? ? :stopped : :planned)
end
prepare(action_class, options = {}) click to toggle source
# File lib/dynflow/execution_plan.rb, line 178
def prepare(action_class, options = {})
  options = options.dup
  caller_action = Type! options.delete(:caller_action), Dynflow::Action, NilClass
  raise "Unexpected options #{options.keys.inspect}" unless options.empty?
  save
  @root_plan_step = add_plan_step(action_class, caller_action)
  @root_plan_step.save
end
progress() click to toggle source

@return [0..1] the percentage of the progress. See Action::Progress for more info

# File lib/dynflow/execution_plan.rb, line 364
def progress
  return 0 if [:pending, :planning, :scheduled].include?(state)
  flow_step_ids         = run_flow.all_step_ids + finalize_flow.all_step_ids
  plan_done, plan_total = flow_step_ids.reduce([0.0, 0]) do |(done, total), step_id|
    step = self.steps[step_id]
    [done + (step.progress_done * step.progress_weight),
     total + step.progress_weight]
  end
  plan_total > 0 ? (plan_done / plan_total) : 1
end
rescue_from_error() click to toggle source
# File lib/dynflow/execution_plan.rb, line 141
def rescue_from_error
  if rescue_plan_id = self.rescue_plan_id
    @world.execute(rescue_plan_id)
  else
    raise Errors::RescueError, 'Unable to rescue from the error'
  end
end
rescue_plan_id() click to toggle source
# File lib/dynflow/execution_plan.rb, line 123
def rescue_plan_id
  case rescue_strategy
  when Action::Rescue::Pause
    nil
  when Action::Rescue::Skip
    failed_steps.each { |step| self.skip(step) }
    self.id
  end
end
rescue_strategy() click to toggle source
# File lib/dynflow/execution_plan.rb, line 115
def rescue_strategy
  Type! entry_action.rescue_strategy, Action::Rescue::Strategy
end
result() click to toggle source
# File lib/dynflow/execution_plan.rb, line 90
def result
  all_steps = steps.values
  if all_steps.any? { |step| step.state == :error }
    return :error
  elsif all_steps.any? { |step| [:skipping, :skipped].include?(step.state) }
    return :warning
  elsif all_steps.all? { |step| step.state == :success }
    return :success
  else
    return :pending
  end
end
save() click to toggle source
# File lib/dynflow/execution_plan.rb, line 334
def save
  persistence.save_execution_plan(self)
end
skip(step) click to toggle source
# File lib/dynflow/execution_plan.rb, line 231
def skip(step)
  steps_to_skip = steps_to_skip(step).each(&:mark_to_skip)
  self.save
  return steps_to_skip
end
steps_in_state(*states) click to toggle source
# File lib/dynflow/execution_plan.rb, line 137
def steps_in_state(*states)
  self.steps.values.find_all {|step| states.include?(step.state) }
end
steps_to_cancel() click to toggle source
# File lib/dynflow/execution_plan.rb, line 225
def steps_to_cancel
  steps_in_state(:running, :suspended).find_all do |step|
    step.action(self).is_a?(::Dynflow::Action::Cancellable)
  end
end
steps_to_skip(step) click to toggle source

All the steps that need to get skipped when wanting to skip the step includes the step itself, all steps dependent on it (even transitively) FIND maybe move to persistence to let adapter to do it effectively? @return [Array<Steps::Abstract>]

# File lib/dynflow/execution_plan.rb, line 241
def steps_to_skip(step)
  dependent_steps = steps.values.find_all do |s|
    next if s.is_a? Steps::PlanStep
    action = persistence.load_action(s)
    action.required_step_ids.include?(step.id)
  end

  steps_to_skip = dependent_steps.map do |dependent_step|
    steps_to_skip(dependent_step)
  end.flatten

  steps_to_skip << step

  if step.is_a? Steps::RunStep
    finalize_step_id = persistence.load_action(step).finalize_step_id
    steps_to_skip << steps[finalize_step_id] if finalize_step_id
  end

  return steps_to_skip.uniq
end
sub_plans() click to toggle source
# File lib/dynflow/execution_plan.rb, line 119
def sub_plans
  persistence.find_execution_plans(filters: { 'caller_execution_plan_id' => self.id })
end
switch_flow(new_flow, &block) click to toggle source

@api private Switches the flow type (Sequence, Concurrence) to be used within the block.

# File lib/dynflow/execution_plan.rb, line 279
def switch_flow(new_flow, &block)
  @run_flow_stack << new_flow
  return block.call
ensure
  @run_flow_stack.pop
  current_run_flow.add_and_resolve(@dependency_graph, new_flow) if current_run_flow
end
to_hash() click to toggle source
# File lib/dynflow/execution_plan.rb, line 318
def to_hash
  recursive_to_hash id:                self.id,
                    class:             self.class.to_s,
                    state:             self.state,
                    result:            result,
                    root_plan_step_id: root_plan_step && root_plan_step.id,
                    run_flow:          run_flow,
                    finalize_flow:     finalize_flow,
                    step_ids:          steps.map { |id, _| id },
                    started_at:        time_to_str(started_at),
                    ended_at:          time_to_str(ended_at),
                    execution_time:    execution_time,
                    real_time:         real_time,
                    execution_history: execution_history.to_hash
end
update_state(state) click to toggle source
# File lib/dynflow/execution_plan.rb, line 73
def update_state(state)
  original = self.state
  case self.state = state
  when :planning
    @started_at = Time.now
  when :stopped
    @ended_at       = Time.now
    @real_time      = @ended_at - @started_at unless @started_at.nil?
    @execution_time = compute_execution_time
  else
    # ignore
  end
  logger.debug format('%13s %s    %9s >> %9s',
                      'ExecutionPlan', id, original, state)
  self.save
end
with_planning_scope(&block) click to toggle source

@api private

# File lib/dynflow/execution_plan.rb, line 268
def with_planning_scope(&block)
  @run_flow_stack   = []
  @dependency_graph = DependencyGraph.new
  switch_flow(run_flow, &block)
ensure
  @run_flow_stack   = nil
  @dependency_graph = nil
end

Private Instance Methods

add_step(step_class, action_class, action_id, state = :pending) click to toggle source
# File lib/dynflow/execution_plan.rb, line 396
def add_step(step_class, action_class, action_id, state = :pending)
  step_class.new(self.id,
                 self.generate_step_id,
                 state,
                 action_class,
                 action_id,
                 nil,
                 world).tap do |new_step|
    @steps[new_step.id] = new_step
  end
end
persistence() click to toggle source
# File lib/dynflow/execution_plan.rb, line 392
def persistence
  world.persistence
end