boolean_fill.R

              
            

A stagerunner describes a linear sequence of execution: import data, perform this munging step, then that munging, then do some modeling, etc. However, it is structured hierarchically as a nested list for easier usability. This function will create a nested list with the exact same structure as the stagerunner except that each terminal node is either TRUE or FALSE.

Specifically, given a tree structure with exactly one TRUE value in the terminal nodes, all successors of that node will be marked as TRUE as well. Conversely, if forward = FALSE, then all predecessors of that node will be marked as TRUE.

For example, imagine we have a stagerunner with the following stages:

  • import data
  • clean data
    • impute variable 1
    • discretize variable 2
  • train model

If el, the first argument to the boolean_fill function depicted on the right, refers to the “impute variable 1” stage, it will be represented as el = list(F, list(T, F), F).

  • import data
  • clean data
    • impute variable 1 (the stage el refers to)
    • discretize variable 2
  • train model

Then calling the boolean_fill function depicted on the right with forward = TRUE signifies that we want to select the stages occurring afterwards:

  • import data
  • clean data
    • impute variable 1 (selected)
    • discretize variable 2 (selected)
  • train model (selected)

The return value will be list(F, list(T, T), T). If instead we want the stages before “impute variable 1”, the return value will be list(T, list(T, F), F).

  • import data (selected)
  • clean data
    • impute variable 1 (selected)
    • discretize variable 2
  • train model
              #' Fill a nested logical list with TRUEs before or after the first TRUE
#' 
#' This is a helper function to implement the \code{to} parameter
#' in the \code{run} method on a stageRunner object.
#'
#' @seealso \code{\link{run}}
#' @name boolean_fill
#' @param el list. A nested list of logicals with exactly one entry \code{TRUE}.
#' @param forward logical. \code{FALSE} for backwards, and \code{TRUE} for forwards.
#'   The default is \code{TRUE}.
#' @return the filled list
boolean_fill <- function(el, forward = TRUE) {
            

We now dig into the actual code. The !is.finite condition on the right works because of how which behaves. If no element of the (possibly nested) list el contains a TRUE, which returns a zero-length vector. Subsetting that vector with [1] gives NA, which fails is.finite.
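The NA behavior can be checked in isolation. This is a minimal sketch using a made-up flat logical vector in place of the result of vapply(el, contains_true, logical(1)); the variable names are illustrative only.

```r
# Stand-in for the per-element flags when el holds no TRUE at all.
flags <- c(FALSE, FALSE, FALSE)

hits <- which(flags)   # zero-length integer vector
ix   <- hits[1]        # indexing past the end yields NA

length(hits)           # 0
is.na(ix)              # TRUE
is.finite(ix)          # FALSE, so the stop() branch would fire

# With a TRUE present, the first matching position is returned instead.
ix_found <- which(c(FALSE, TRUE, TRUE))[1]   # 2
```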

                ix <- which(vapply(el, contains_true, logical(1)))[1]
  if (!is.finite(ix)) stop("boolean_fill called but no TRUEs found")

            

We could have checked that there is a TRUE somewhere in el more elegantly, but we need the precise location, ix.

                if (isTRUE(forward)) {
    fills <- seq_len(length(el) - ix) + ix
  } else {
    fills <- seq_len(ix - 1)
  }
  el[fills] <- TRUE
  
            

Once the slots before or after the index that contains a TRUE (depending on the value of forward) have been set to TRUE, the only remaining filling has to occur recursively inside the element at that index, if it is itself a list.

                if (!is.atomic(el[[ix]])) {
    el[[ix]] <- boolean_fill(el[[ix]], forward = forward)
  }
  el
}
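The example from the start of this section can be run as a quick check. This is a sketch, not the package's test suite: contains_true is defined elsewhere in the package and is not shown here, so the version below is an assumed stand-in, and boolean_fill is repeated in condensed form only so the block runs on its own.

```r
# Assumed helper: the package's real contains_true is not shown in this section.
contains_true <- function(x) {
  if (is.list(x)) any(vapply(x, contains_true, logical(1))) else any(x)
}

# Same logic as the function above, condensed.
boolean_fill <- function(el, forward = TRUE) {
  ix <- which(vapply(el, contains_true, logical(1)))[1]
  if (!is.finite(ix)) stop("boolean_fill called but no TRUEs found")
  fills <- if (isTRUE(forward)) seq_len(length(el) - ix) + ix else seq_len(ix - 1)
  el[fills] <- TRUE
  if (!is.atomic(el[[ix]])) {
    el[[ix]] <- boolean_fill(el[[ix]], forward = forward)
  }
  el
}

# "impute variable 1" in the import / clean / train example.
el <- list(FALSE, list(TRUE, FALSE), FALSE)

after  <- boolean_fill(el, forward = TRUE)    # list(F, list(T, T), T)
before <- boolean_fill(el, forward = FALSE)   # list(T, list(T, F), F)
str(after)
str(before)
```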

            

compare_stage_keys.R

              
            

A stagerunner is simply a linear sequence (usually of functions) that is packaged as a tree structure to make it easier to reference related groups of operations.

Since stagerunners are intended to be run sequentially, that is, only from earlier stages to later ones and never the other way around, it is important to be able to identify when a runner is accidentally called in the reverse order. For example, if we have three stages and a runner is called in the wrong order with runner$run(2, 1), we still expect stage 1 to execute before stage 2.

The point of compare_stage_keys is to determine whether a later stage has been called prior to an earlier stage. If that is the case, this function will return FALSE, and later in the internals of the stagerunner object we will be able to flip the keys. If the first key already runs first, it returns TRUE.

              #' Compare two stage keys to see which one has a stage run first
#' 
#' This is a helper function to implement the \code{to} parameter
#' in the \code{run} method on a stageRunner object.
#'
#' @seealso \code{\link{run}}
#' @name compare_stage_keys
#' @param key1 list
#' @param key2 list
#' @return logical. Whether or not key1 runs a stage before key2.
compare_stage_keys <- function(key1, key2) {
  if (length(key1) == 0 || isTRUE(key1)) return(TRUE)
  index_of_true <- function(el) which(vapply(el, contains_true, logical(1)))[1]
            

ix will be a vector holding, for key1 and key2 respectively, the location in the list structure where the first (and, if they are correctly formed, the only) TRUE occurs.

                ix <- vapply(list(key1, key2), index_of_true, numeric(1))

            

If no element in the first structure has a TRUE, the first key definitely cannot run a stage first (since no stages have been marked for running!).

                if (!is.finite(ix[1])) {
    return(FALSE)
            

Otherwise, if the latter has no TRUEs, the first key definitely runs first.

                } else if (!is.finite(ix[2])) { return(TRUE) }

  if (ix[1] == ix[2]) {
            

If the keys specify the exact same stage in the stagerunner, there is a tie and we may as well return TRUE.

                  if (is.atomic(key1) && is.atomic(key2)) {
      TRUE
    } else {
            

Otherwise, one of the two keys must be another list, so we can recursively determine which key runs first.

                    compare_stage_keys(key1[[ix[1]]], key2[[ix[2]]])
    }
            

We now come across a special case. Imagine we have a stagerunner as before.

  • import data
  • clean data
    • impute variable 1
    • remove outliers from variable 1
    • discretize variable 2
  • train model

If we call runner$run("clean data/2", "clean data"), this will signify to “run from 'remove outliers from variable 1' to the end of the 'clean data' stage” (i.e., until “discretize variable 2”).

However, note in this case that the two keys would be represented by list(F, list(F, T, F), F) and list(F, T, F). If we are not careful, the compare_stage_keys function will indicate that the latter occurs before the former, the keys will be flipped, and we will end up executing “impute variable 1” instead!

  • import data
  • clean data
    • impute variable 1 (the unintended effect)
    • remove outliers from variable 1
    • discretize variable 2
  • train model
              
            

We solve this problem by returning TRUE if key2 consists purely of TRUEs (i.e., if it signifies “run until the end of this stage”).

                } else if (all(vapply(key2, isTRUE, logical(1)))) {
    TRUE 
  } else { 
            

Finally, if the keys are directly comparable, the first key should be run earlier if and only if the first key contains a TRUE earlier than the second key.

                  ix[1] < ix[2]
  }
}
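The three situations discussed above (keys in order, keys out of order, and the “run until the end of this stage” special case) can be exercised together. This is a sketch under assumptions: contains_true is a stand-in for the package helper that is not shown here, and compare_stage_keys is repeated in condensed form so the block is self-contained.

```r
# Assumed helper: the package's real contains_true is not shown in this section.
contains_true <- function(x) {
  if (is.list(x)) any(vapply(x, contains_true, logical(1))) else any(x)
}

# Same logic as the function above, condensed.
compare_stage_keys <- function(key1, key2) {
  if (length(key1) == 0 || isTRUE(key1)) return(TRUE)
  index_of_true <- function(el) which(vapply(el, contains_true, logical(1)))[1]
  ix <- vapply(list(key1, key2), index_of_true, numeric(1))
  if (!is.finite(ix[1])) return(FALSE)
  if (!is.finite(ix[2])) return(TRUE)
  if (ix[1] == ix[2]) {
    if (is.atomic(key1) && is.atomic(key2)) {
      TRUE
    } else {
      compare_stage_keys(key1[[ix[1]]], key2[[ix[2]]])
    }
  } else if (all(vapply(key2, isTRUE, logical(1)))) {
    TRUE
  } else {
    ix[1] < ix[2]
  }
}

# Stage 1 before stage 2: already in order.
in_order <- compare_stage_keys(list(TRUE, FALSE, FALSE), list(FALSE, TRUE, FALSE))

# Stage 2 before stage 1: FALSE tells the caller to swap the keys.
out_of_order <- compare_stage_keys(list(FALSE, TRUE, FALSE), list(TRUE, FALSE, FALSE))

# "clean data/2" versus "clean data": the special case keeps the order.
special <- compare_stage_keys(list(FALSE, list(FALSE, TRUE, FALSE), FALSE),
                              list(FALSE, TRUE, FALSE))

c(in_order, out_of_order, special)   # TRUE FALSE TRUE
```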


            

copy_env.R

              
            
              #' Copy one environment into another recursively.
#' 
#' @param to environment. The new environment.
#' @param from environment. The old environment.
#' @note Both \code{to} and \code{from} must be pre-existing environments
#'   or this function will error.
copy_env <- function(to, from) {
  stopifnot(is.environment(to) && is.environment(from))
  rm(list = ls(to, all.names = TRUE), envir = to)
  for (name in ls(from, all.names = TRUE)) {
    if (is.environment(from[[name]])) {
      # Copy a sub-environment in full.
      assign(name, new.env(parent = parent.env(from[[name]])), envir = to) 
      copy_env(to[[name]], from[[name]])
    } else assign(name, from[[name]], envir = to)
  }
}
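A short usage sketch may help show what “recursively” means here: nested environments are copied rather than shared. The function is repeated so the block runs on its own, and the variable names (from, to, stale, sub) are made up for illustration.

```r
# Same definition as above, repeated for self-containment.
copy_env <- function(to, from) {
  stopifnot(is.environment(to) && is.environment(from))
  rm(list = ls(to, all.names = TRUE), envir = to)
  for (name in ls(from, all.names = TRUE)) {
    if (is.environment(from[[name]])) {
      assign(name, new.env(parent = parent.env(from[[name]])), envir = to)
      copy_env(to[[name]], from[[name]])
    } else assign(name, from[[name]], envir = to)
  }
}

from       <- new.env()
from$x     <- 1
from$sub   <- new.env()
from$sub$y <- 2

to       <- new.env()
to$stale <- "wiped by copy_env"

copy_env(to, from)
ls(to)            # "sub" "x": the stale entry is gone

# The sub-environment is a fresh copy, so changing it leaves `from` untouched.
to$sub$y <- 99
from$sub$y        # still 2
```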

            

is_pre_stagerunner.R

              
            

An object can be turned into a stagerunner only if it can be interpreted as a hierarchical sequence of execution: run some stuff in group A, then run some stuff in group B, and so on, with each group potentially containing more subgroups.

In other words, the things which can be turned into stagerunners are:

  • functions
  • other stagerunners (sub-stagerunners)
  • lists composed of the above

The purpose of the is_pre_stagerunner function is to determine whether an object satisfies these restrictions.

              #' Whether or not an object can be transformed into a stageRunner.
#'
#' @param x ANY. An R object for which it will be determined whether or not
#'    it admits a conversion to a stageRunner.
#' @return TRUE or FALSE according to whether the object can be transformed
#'    to a stageRunner. In general, only a function or list of functions
#'    can be turned into a stageRunner.
#' @export
#' @examples
#' stopifnot(is_pre_stagerunner(function(e) { e$x <- 1 }))
#' stopifnot(is_pre_stagerunner(list(function(e) { e$x <- 1 }, function(e) { e$y <- 2 })))
#' stopifnot(is_pre_stagerunner(
#'   list(a = function(e) { e$x <- 1 },
#'     list(b = function(e) { e$y <- 2 }, c = function(e) { e$z <- 3 }))))
#' 
#' stopifnot(!is_pre_stagerunner(NULL))
#' stopifnot(!is_pre_stagerunner(5))
#' stopifnot(!is_pre_stagerunner(iris))
is_pre_stagerunner <- function(x) {
  if (is.function(x) || is.stagerunner(x)) { return(TRUE) }
  if (!is.recursive(x) || is.environment(x)) { return(FALSE) }

            

Using a for loop is a tiny bit faster than an apply-family operation because we can exit the function early.

                for (i in seq_along(x)) {
    if (!(is.function(x[[i]]) || is.stagerunner(x[[i]]) || is.null(x[[i]]) ||
            

We use the base function Recall for its recursive effect.

                        (is.recursive(x[[i]]) && Recall(x[[i]])))) {
      return(FALSE)
    }
  }

  TRUE
}
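The early exit and the recursion through Recall can be tried out with a few inputs. Note that is.stagerunner is defined elsewhere in the package; the one-line version below is an assumed stand-in (including the class name it checks) so that the sketch runs alone.

```r
# Assumed stand-in: the real is.stagerunner is not shown in this section.
is.stagerunner <- function(x) inherits(x, "stageRunner")

# Same logic as the function above.
is_pre_stagerunner <- function(x) {
  if (is.function(x) || is.stagerunner(x)) { return(TRUE) }
  if (!is.recursive(x) || is.environment(x)) { return(FALSE) }
  for (i in seq_along(x)) {
    if (!(is.function(x[[i]]) || is.stagerunner(x[[i]]) || is.null(x[[i]]) ||
          (is.recursive(x[[i]]) && Recall(x[[i]])))) {
      return(FALSE)   # exit as soon as one element fails
    }
  }
  TRUE
}

# Nested lists of functions pass; NULL elements inside a list are tolerated.
nested <- list(a = function(e) NULL, list(b = function(e) NULL, c = NULL))
# A single non-function element makes the whole list fail.
mixed  <- list(a = function(e) NULL, b = 5)

is_pre_stagerunner(nested)   # TRUE
is_pre_stagerunner(mixed)    # FALSE
```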

            

normalize_stage_keys.R

              
            

To determine what stages of a stagerunner to execute, we will use a nested list format that is equivalent in structure to the runner. For example, imagine we have a stagerunner with the following stages:

  • import data
  • clean data
    • impute variable 1
    • discretize variable 2
  • train model

We would like to be able to execute swaths of this runner at will: runner$run("clean"), runner$run("clean/1", "clean/2") and runner$run(2) should all execute the data cleaning sub-stages.

The normalize_stage_keys function will convert human-readable descriptions of what to execute, like "clean" or 2, to a nested list format that will be easier to use later during stage execution.

For example, "clean/1" will be converted to list(F, list(T, F), F) and mimic the structure of the stagerunner.

              #' Normalize a reference to stage keys 
#'
#' For example, \code{list('data/one', 2)} would be converted to
#' \code{list('data', list('one')), 2)}.
#'
#' @name normalize_stage_keys
#' @param keys a list. The keys to normalize.
#' @param stages a list. The stages we're normalizing with respect to.
#' @param parent_key character. A helper for sane recursive error handling.
#'    For example, if we try to reference key \code{foo/bar}, but a recursive
#'    call to \code{normalize_stage_keys} errors when \code{bar} isn't found,
#'    we would still like the error to display the full name (\code{foo/bar}).
#' @param to an indexing parameter. If \code{keys} refers to a single stage,
#'   attempt to find all stages from that stage to this stage (or, if this one
#'   comes first, this stage to that stage). For example, if we have
#'      \code{stages = list(a = list(b = 1, c = 2), d = 3, e = list(f = 4, g = 5))}
#'   where the numbers are some functions, and we call \code{normalize_stage_keys}
#'   with \code{keys = 'a/c'} and \code{to = 'e/f'}, then we would obtain a nested
#'   list of logicals referencing \code{"a/c", "d", "e/f"}.
#' @return a list. The format is nested logicals. For example, if \code{stages} is
#'   \code{list(one = stageRunner$new(new.env(), list(subone = function(cx) 1)),
#'              two = function(cx) 1)}
#' then
#'   \code{normalize_stage_keys('one/subone')}
#' would return
#'   \code{list(one = list(subone = TRUE), two = FALSE)}.
#' @seealso stageRunner__run
#' @examples
#' \dontrun{
#'   stopifnot(identical(normalize_stage_keys("foo/bar",
#'     list(foo = list(bar = NULL, baz = NULL))),
#'     list(list(TRUE, FALSE))))
#' }
normalize_stage_keys <- function(keys, stages, to = NULL, parent_key = "") {
  if (is.null(to)) {
    normalize_stage_keys_unidirectional(keys, stages, parent_key)
  } else {
    normalize_stage_keys_bidirectional(keys, to, stages)
  }
}

normalize_stage_keys_unidirectional <- function(keys, stages, parent_key) {
  if (is.null(keys) || length(keys) == 0 || identical(keys, "")) {
            

By default, providing no key means executing everything in this stage. For a single stage, that means TRUE. For multiple stages, it means a list of TRUEs, one per stage.

                  return(if (!is.list(stages) || length(stages) == 1) TRUE
           else rep(list(TRUE), length(stages)))
  }
            

Stagerunners are set up recursively, so we need to extract the list of stages out of the stagerunner object. For example,

  • import data
  • clean data
    • impute variable 1
    • discretize variable 2

actually consists of two stagerunners, one for the whole list and one for the “clean data” stage.

                if (is.stagerunner(stages)) stages <- stages$stages

            

The output of normalize_stage_keys is a (possibly nested) list whose terminal nodes are all logical. If keys is already of this format, we are done.

                if (all_logical(keys)) return(keys) # Already normalized

            

We performed checks for special cases, so now we call a function that assumes all those cases have been taken care of.

                normalize_stage_keys_unidirectional_(keys, stages, parent_key)
}

            

Our strategy to determine which stages to run, and thus translate keys from a form like “munge/impute variable 5” to a nested list, will be to start with a list consisting entirely of FALSE values and fill in the sub-stages the user requested in the keys with TRUEs.

              normalize_stage_keys_unidirectional_ <- function(keys, stages, parent_key) {
  if (is.numeric(keys) && any(keys < 0)) { 
    as.list(!is.element(seq_len(stage_length(stages)), -keys))
            

Negative indexing, like -c(2:3), is easy: set everything except those keys to TRUE.
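The negative-indexing expression can be evaluated by hand. In this sketch, n_stages is a hypothetical stand-in for stage_length(stages) on a runner with four top-level stages.

```r
n_stages <- 4          # hypothetical: a runner with four top-level stages
keys     <- -c(2:3)    # "everything except stages 2 and 3"

# Mark every position that is NOT among the excluded indices.
normalized <- as.list(!is.element(seq_len(n_stages), -keys))
str(normalized)        # list(TRUE, FALSE, FALSE, TRUE)
```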

                } else {
    key_length      <- if (is.list(stages)) length(stages) else 1
    normalized_keys <- rep(list(FALSE), key_length)

            

Each element of the provided keys has a chance to modify normalized_keys. We achieve this using Reduce.
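As a rough illustration of the pattern (not the package's actual per-key logic), Reduce can thread an accumulator list through a sequence of indices, letting each step switch on one entry. The keys below are hypothetical plain numeric indices.

```r
keys            <- c(1, 3)               # hypothetical: stages 1 and 3 requested
normalized_keys <- rep(list(FALSE), 3)   # start with everything switched off

# Each step receives the accumulator built so far and returns an updated copy.
result <- Reduce(function(acc, index) {
  acc[[keys[index]]] <- TRUE
  acc
}, seq_along(keys), normalized_keys)

str(result)   # list(TRUE, FALSE, TRUE)
```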

                  Reduce(function(normalized_keys, index) {
      normalize_stage_keys_by_index(keys, stages, parent_key, index, normalized_keys)
    }, seq_along(keys), normalized_keys)
  }
}

normalize_stage_keys_by_index <- function(keys, stages, parent_key,
                                          key_index, normalized_keys) {
  key <- keys[[key_index]]

  rest_keys <- key[-1]
  key       <- key[[1]]
  
  normalize_stage_key(key = key, keys = rest_keys, stages = stages,
                      parent_key = parent_key, key_index = key_index,
                      normalized_keys = normalized_keys)
}

normalize_stage_key <- function(...) {
  UseMethod("normalize_stage_key")
}

normalize_stage_key.logical <- function(key, key_index, normalized_keys, ...) {
  #stop("!")
  normalized_keys[[key_index]] <- key
  normalized_keys
}

normalize_stage_key.numeric <- function(key, keys, stages, parent_key, 
                                        normalized_keys, ...) {
  stopifnot(length(key) == 1)
  if (key > stage_length(stages)) {
    stop(sprintf(
      "Cannot reference sub-stage %s of stage %s because it only has %d stages",
      sQuote(key), dQuote(parent_key), stage_length(stages)
    ))
  }

  normalized_keys[[as.integer(key)]] <-
    if (length(keys) == 0) TRUE
    else normalize_stage_keys(keys, stages[[as.integer(key)]],
                              parent_key = paste0(parent_key, key, '/'))
  normalized_keys
}

normalize_stage_key.character <- function(key, keys, stages, parent_key,
                                          normalized_keys, ...) {

  # The hard part! Allow things like one/subone/subsubone/etc
  # to reference arbitrarily nested stages.
  if (length(key) == 0) stop("Stage key of length zero")
  key <- strsplit(key, '/')[[1]]

  if (is.stageRunnerNode(stages)) {
    stop("No stage with key '", paste0(parent_key, key[[1]]), "' found")
  }

  key_index <- tolower(key[[1]]) == tolower(names(stages))
  if (!any(key_index)) {
    key_index <- grepl(tolower(key[[1]]), tolower(names(stages)))
  }

  if (is.finite(suppressWarnings(tmp <- as.numeric(key[[1]]))) &&
      tmp > 0 && tmp <= length(stages)) {
    key_index <- tmp
  } else if (length(key_index) == 0 || sum(key_index) == 0) {
    stop("No stage with key '", paste0(parent_key, key[[1]]), "' found")
  } else if (sum(key_index) > 1) {
    stop("Multiple stages with key '", paste0(parent_key, key[[1]]),
           "', found: ", paste0(parent_key, names(stages)[key_index], collapse = ', '))
  } else key_index <- which(key_index) # now an integer of length 1

  normalized_keys[[key_index]] <- special_or_lists(
    normalized_keys[[key_index]],
    normalize_stage_keys(append(paste0(key[-1], collapse = '/'), keys), 
      stages[[key_index]], parent_key = paste0(parent_key, key[[1]], '/'))
  )

  normalized_keys
}

normalize_stage_key.default <- function(...) {
  stop("Invalid stage key")
}

normalize_stage_keys_bidirectional <- function(from, to, stages) {
            

First, we turn our human-readable keys like “clean/impute variable 1” into a more convenient list structure like list(F, list(T, F), F).

                from <- normalize_stage_keys(from, stages)
  to   <- normalize_stage_keys(to, stages)
            

Recall our helper compare_stage_keys, which returns TRUE if the first argument occurs before the second. When it returns FALSE, the keys were given in the wrong order and we need to swap them.

                if (!compare_stage_keys(from, to)) {
            

A convenient swapping mechanism without introducing temporary variables. In R, the list2env utility can funnel named values in a list directly into an environment. Try it yourself:

x <- 1
y <- 2
list2env(list(x = y, y = x), environment())
cat(x, ",", y)
                  list2env(list(from = to, to = from), environment())
  }
            

And finally the magic trick that pulls it all together. See the more thorough explanation below beside the special_and_lists helper.

                special_and_lists(
    boolean_fill(from, forward = TRUE),
    boolean_fill(to,   forward = FALSE)
  )
}

            

Terminal stages in a stagerunner are stageRunnerNode objects, so we treat those as stages of length 1.

              stage_length <- function(obj) {
  if (is.list(obj)) length(obj)
  else 1
}

            

show_message.R

              
            

Consider our example stagerunner from before:

  • import data
  • clean data
    • impute variable 1
    • discretize variable 2
  • train model

Our goal is to display progress when executing the stagerunner:

runner

              #' Show a progress message when executing a stagerunner.
#'
#' @name show_message
#' @param stage_names character.
#' @param stage_index integer.
#' @param begin logical. Whether we are showing the begin or end message.
#' @param nested logical. Whether or not this is a nested stage (i.e.
#'    contains another stageRunner).
#' @param depth integer. How many tabs to space by (for nested stages).
#' @return Nothing, but print the message to standard output.
#' @examples 
#' \dontrun{
#'   show_message(c('one', 'two'), 2) # Will print "Running 2. two..."
#' }
show_message <- function(stage_names, stage_index, begin = TRUE,
                         nested = FALSE, depth = 1) {
  stage_name <- stage_names[stage_index]

            

If the stage was not named (i.e., only a function was given), we “impute” the name with an ordinal: “fifth”, “twelfth”, “21st”, etc. (depending on the index of the stage).

                if (is.null(stage_name) || identical(stage_name, "") || identical(stage_name, NA_character_)) {
    stage_name <- as.ordinal(stage_index)
  }

  if (begin) {
    stage_name <- crayon::green(stage_name)
  } else {
    stage_name <- crayon::blue(stage_name)
  }

            

We indent by depth - 1 double-spaces to show nested stages clearly.

                prefix     <- paste(rep("  ", depth - 1), collapse = '')
            

We turn “import data” into “1. import data”.

                stage_name <- paste0(stage_index, ". ", stage_name)

            

Non-terminal stages (i.e., those with more sub-stages) have a beginning and an ending, so we show “Beginning 2. clean data stage” and “Ending 2. clean data stage”.

                if (nested) {
    cat(paste0(prefix, if (begin) "Beginn" else "End", "ing ",
               stage_name, " stage...\n"))
  } else if (begin) {
            

Whereas terminal stages (i.e., those without sub-stages) just run, so we show “Running 1. import data...”.

                  cat(paste0(prefix, "Running ", stage_name, "...\n"))
  }
}
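To see the message formats side by side, here is a simplified sketch. format_stage_message is a hypothetical variant, not part of the package: it returns the message instead of printing it, skips the crayon coloring, and assumes every stage is named (so no ordinal is needed).

```r
# Hypothetical simplified variant of show_message (see assumptions above).
format_stage_message <- function(stage_names, stage_index, begin = TRUE,
                                 nested = FALSE, depth = 1) {
  prefix     <- paste(rep("  ", depth - 1), collapse = "")
  stage_name <- paste0(stage_index, ". ", stage_names[stage_index])
  if (nested) {
    paste0(prefix, if (begin) "Beginn" else "End", "ing ", stage_name, " stage...")
  } else if (begin) {
    paste0(prefix, "Running ", stage_name, "...")
  } else {
    NULL   # the original prints nothing when a terminal stage ends
  }
}

top <- c("import data", "clean data", "train model")
sub <- c("impute variable 1", "discretize variable 2")

msgs <- c(
  format_stage_message(top, 1),
  format_stage_message(top, 2, nested = TRUE),
  format_stage_message(sub, 1, depth = 2),
  format_stage_message(top, 2, begin = FALSE, nested = TRUE)
)
cat(msgs, sep = "\n")
```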

            

special_and_lists.R

              
            

Imagine we want to run from “create validation set” to “create derived variable.”

  • import data
  • create validation set
  • munge data
    • impute variable 1
    • create derived variable
    • drop some variables
  • train model

The syntax for this is runner$run("val", "munge/derived") (amongst other ways – substrings are matched to stage names by the normalize_stage_keys helper).

To translate this into code, stagerunner builds the following two trees:

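  • FALSE (import data)
  • TRUE (create validation set)
  • munge data
    • TRUE (impute variable 1)
    • TRUE (create derived variable)
    • TRUE (drop some variables)
  • TRUE (train model)

and

  • TRUE (import data)
  • TRUE (create validation set)
  • munge data
    • TRUE (impute variable 1)
    • TRUE (create derived variable)
    • FALSE (drop some variables)
  • FALSE (train model)

and then intersects them:

  • FALSE (import data)
  • TRUE (create validation set)
  • munge data
    • TRUE (impute variable 1)
    • TRUE (create derived variable)
    • FALSE (drop some variables)
  • FALSE (train model)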

The point of special_and_lists is to perform this intersection operation.

              #' AND two lists together with some regards for nesting
#'
#' The structure of the lists should be the same. That is,
#' as a tree, the two lists should be isomorphic. For example,
#' \code{special_and_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#'                         list(a = FALSE, b = list(b = FALSE, c = TRUE)))}
#' yields
#' \code{list(a = FALSE, b = list(b = FALSE, c = FALSE))}
#' and
#' \code{special_and_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#'                         list(a = list(b = FALSE, c = TRUE), b = FALSE))}
#' yields
#' \code{list(a = FALSE, b = FALSE)}
#'
#' Note that lists get ANDed based on *order*, not on key names (as this could
#' be ambiguous), so make sure the two lists have the same key order.
#' For example, \code{special_and_lists(list(a = TRUE, b = FALSE), list(b = FALSE, a = TRUE))}
#' would mistakenly return \code{list(a = FALSE, b = FALSE)}.
#'
#' @name special_and_lists
#' @param list1 a list.
#' @param list2 a list.
#' @seealso \code{\link{special_or_lists}}
#' @return the and'ed list.
#' @examples \dontrun{
#'   stopifnot(identical(
#'     special_and_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#'                       list(a = FALSE, b = list(b = FALSE, c = TRUE))),
#'     list(a = FALSE, b = list(b = FALSE, c = FALSE))
#'  ))
#' 
#'   stopifnot(identical(
#'     special_and_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#'                       list(a = list(b = FALSE, c = TRUE), b = FALSE)),
#'     list(a = FALSE, b = FALSE)
#'  ))
#' }
special_and_lists <- function(list1, list2) {
  if (identical(list1, FALSE) || identical(list2, FALSE)) {
    FALSE
            

If one of the two lists is TRUE, an “AND” operation is simply equivalent to choosing the other list.

                } else if (identical(list1, TRUE)) {
    list2
  } else if (identical(list2, TRUE)) {
    list1
  } else if (!(is.list(list1) && is.list(list2))) {
    stop("special_and_lists only accepts lists or atomic logicals of length 1")
  } else if (length(list1) != length(list2)) {
    stop("special_and_lists only accepts lists of the same length")
  } else {
            

This function should only ever be used on lists coming from the same hierarchy of stages, so give a warning if this is not the case.

                  if (!identical(names(list1), names(list2))) {
      warning("special_and_lists matches lists by order, not name, ",
              "but the names of the two lists do not match!")
    }

            

We use Map to recursively apply the operation to the remaining elements.

                  Map(special_and_lists, list1, list2)
  }
}
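The intersection of the two trees from the start of this section can be reproduced directly. In this sketch the function is repeated in condensed form, with the error checks and the names warning left out for brevity; the variable names are illustrative.

```r
# Condensed copy of the function above (error handling omitted).
special_and_lists <- function(list1, list2) {
  if (identical(list1, FALSE) || identical(list2, FALSE)) {
    FALSE
  } else if (identical(list1, TRUE)) {
    list2
  } else if (identical(list2, TRUE)) {
    list1
  } else {
    Map(special_and_lists, list1, list2)
  }
}

# Everything from "create validation set" onwards ...
from_filled <- list(FALSE, TRUE, list(TRUE, TRUE, TRUE), TRUE)
# ... and everything up to "create derived variable".
to_filled   <- list(TRUE, TRUE, list(TRUE, TRUE, FALSE), FALSE)

selected <- special_and_lists(from_filled, to_filled)
str(selected)   # list(FALSE, TRUE, list(TRUE, TRUE, FALSE), FALSE)

# A bare TRUE stands for a whole sub-tree of TRUEs and gives the same answer.
selected_short <- special_and_lists(list(FALSE, TRUE, TRUE, TRUE), to_filled)
```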


            

special_or_lists.R

              
            

This function is equivalent to special_and_lists but instead we apply “OR” to each pair of logical values:

  • FALSE
  • TRUE
  • (nested list)
    • TRUE
    • TRUE
    • FALSE
  • FALSE

and

  • FALSE
  • FALSE
  • (nested list)
    • FALSE
    • TRUE
    • TRUE
  • FALSE

would become

  • FALSE
  • TRUE
  • (nested list)
    • TRUE
    • TRUE
    • TRUE
  • FALSE
              #' OR two lists together with some regards for nesting
#'
#' The structure of the lists should be the same. That is,
#' as a tree, the two lists should be isomorphic. For example,
#' \code{special_or_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#'                         list(a = FALSE, b = list(b = FALSE, c = TRUE)))}
#' yields
#' \code{list(a = FALSE, b = list(b = TRUE, c = TRUE))}
#' and
#' \code{special_or_lists(list(a = FALSE, b = list(b = TRUE, c = FALSE)),
#'                         list(a = list(b = FALSE, c = TRUE), b = FALSE))}
#' yields
#' \code{list(a = list(b = FALSE, c = TRUE), b = list(b = TRUE, c = FALSE))}
#'
#' Note that lists get ORed based on *order*, not on key names (as this could
#' be ambiguous), so make sure the two lists have the same comparable key orders.
#' For example, \code{special_or_lists(list(a = TRUE, b = FALSE), list(b = FALSE, a = TRUE))}
#' would mistakenly return \code{list(a = TRUE, b = TRUE)}.
#'
#' @name special_or_lists
#' @param list1 a list.
#' @param list2 a list.
#' @seealso \code{\link{special_and_lists}}
#' @return the or'ed list.
special_or_lists <- function(list1, list2) {
  if (identical(list1, TRUE) || identical(list2, TRUE)) {
    TRUE
  } else if (identical(list1, FALSE)) {
    list2
  } else if (identical(list2, FALSE)) {
    list1
  } else if (!(is.list(list1) && is.list(list2))) {
    stop("special_or_lists only accepts lists or atomic logicals of length 1")
  } else if (length(list1) != length(list2)) {
    stop("special_or_lists only accepts lists of the same length")
  } else {
    if (!identical(names(list1), names(list2))) {
      warning("special_or_lists matches lists by order, not name, ",
              "but the names of the two lists do not match!")
    }
     
    Map(special_or_lists, list1, list2)
  }
}
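The union of the two trees shown at the start of this section can be checked the same way. As before, this is a condensed copy of the function with the error checks and the names warning omitted, and the variable names are illustrative.

```r
# Condensed copy of the function above (error handling omitted).
special_or_lists <- function(list1, list2) {
  if (identical(list1, TRUE) || identical(list2, TRUE)) {
    TRUE
  } else if (identical(list1, FALSE)) {
    list2
  } else if (identical(list2, FALSE)) {
    list1
  } else {
    Map(special_or_lists, list1, list2)
  }
}

tree1 <- list(FALSE, TRUE,  list(TRUE,  TRUE, FALSE), FALSE)
tree2 <- list(FALSE, FALSE, list(FALSE, TRUE, TRUE),  FALSE)

merged <- special_or_lists(tree1, tree2)
str(merged)   # list(FALSE, TRUE, list(TRUE, TRUE, TRUE), FALSE)
```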


            

stagerunner-append.R

              
            

Imagine we have two stagerunners:

  • Import data
  • Clean data

and

  • Train model
  • Export model

It is a natural operation to concatenate or append these stagerunners into a single runner. We can do this using the $append method.

However, append will create one final stage at the end instead of juxtaposing the stages, to make it clear which runner was appended.
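The difference between nesting and juxtaposing can be seen with plain lists. This sketch uses named lists of placeholder functions in place of real stageRunner objects, and the label "modeling" is a made-up example.

```r
# Plain named lists stand in for the stages of the two runners (an assumption
# for illustration; real runners are stageRunner objects).
first_stages <- list("Import data" = function(e) NULL,
                     "Clean data"  = function(e) NULL)
other_stages <- list("Train model"  = function(e) NULL,
                     "Export model" = function(e) NULL)

# Wrap the appended runner in a one-element list so it becomes ONE new stage.
new_stage <- structure(list(other_stages), names = "modeling")
combined  <- c(first_stages, new_stage)

names(combined)        # "Import data" "Clean data" "modeling"
names(combined[[3]])   # "Train model" "Export model"
```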

              #' Append one stageRunner to the end of another.
#'
#' @name stageRunner_append
#' @param other_runner stageRunner. Another stageRunner to append to the current one.
#' @param label character. The label for the new stages (this will be the name of the
#'   newly appended list element).
stageRunner_append <- function(other_runner, label = NULL) {
  stopifnot(is.stagerunner(other_runner))
  new_stage <- structure(list(other_runner), names = label)
            

Appending a stagerunner simply adds it as a single new stage at the end of the current stages.

                self$stages <- c(self$stages, new_stage)
  self
}


            

stagerunner-around.R

              
            

The around method on a stagerunner acts as a kind of setup and teardown hook on arbitrary stages.

For example, imagine we have a stagerunner that looks like the following.

  • import data
  • clean data
    • impute variable 1
    • discretize variable 2

Imagine we want to write test functions that ensure the correct behavior is happening during the data cleaning. We can write a stagerunner with an identical tree structure that performs additional testing to ensure our work is correct:

new_runner <- stageRunner$new(new.env(), list(
  "import data" = function(e) { yield(); stopifnot(!is.null(e$data)) },
  "clean data"  = list(
    "impute variable 1" = function(e) {
      yield()
      stopifnot(!any(is.na(e$data$variable1)))
    }, "discretize variable 2" = function(e) {
      yield()
      stopifnot(is.factor(e$data$variable2))
   })
))

The keyword yield is injected into a stagerunner that is used with the around method, and means “execute the stage of the stagerunner that is being wrapped that would normally occur at this point.” Code before and after the yield keyword can be used to perform additional assertions about what happened during the execution of the stage.

runner$around(new_runner)
runner$run()

If any of the above assertions fail, we will now get an error.

              #' Wrap a function around a stageRunner's terminal nodes
#'
#' If we want to execute some behavior just before and just after executing
#' terminal nodes in a stageRunner, a solution without this method would be
#' to overlay two runners -- one before and one after. However, this is messy,
#' so this function is intended to replace this approach with just one function.
#'
#' Consider the runner
#'   \code{sr1 <- stageRunner$new(some_env, list(a = function(e) print('2')))}
#' If we run 
#'   \code{sr2 <- stageRunner$new(some_env, list(a = function(e) {
#'     print('1'); yield(); print('3') }))
#'    sr1$around(sr2)
#'    sr1$run()
#'  }
#' then we will see 1, 2, and 3 printed in succession. The \code{yield()}
#' keyword is used to specify when to execute the terminal node that
#' is sandwiched in the "around" runner.
#'
#' @name stageRunner_around
#' @param other_runner stageRunner. Another stageRunner from which to create
#'   an around procedure. Alternatively, we could give a function or a list
#'   of functions.
stageRunner_around <- function(other_runner) {
  if (is.null(other_runner)) return(self)
  if (!is.stagerunner(other_runner)) {
    # Create a new stagerunner if `other_runner` is not already a runner.
    other_runner <- stageRunner$new(self$.context, other_runner)
  }

            

If no names are given, names(other_runner$stages) may be NULL. We would like a character vector of empty strings instead, so that we obtain the correct names in the loop below.
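The fallback can be illustrated on plain lists. The %||% operator is defined elsewhere in the package and is not shown in this section, so the definition below is an assumption (a common "use the right-hand side if the left is NULL" helper).

```r
# Assumed definition; the package's own %||% is not shown here.
`%||%` <- function(x, y) if (is.null(x)) y else x

unnamed <- list(function(e) NULL, function(e) NULL)
named   <- list(a = function(e) NULL, function(e) NULL)

# names() is NULL for a fully unnamed list, so the fallback kicks in.
unnamed_names <- names(unnamed) %||% rep("", length(unnamed))   # "" ""

# A partially named list already yields "" for the unnamed entries.
named_names <- names(named) %||% rep("", length(named))         # "a" ""
```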

                stagenames <- names(other_runner$stages) %||% rep("", length(other_runner$stages))
  lapply(seq_along(other_runner$stages), function(stage_index) {
    name <- stagenames[stage_index]
            

We assume each named stage has a unique name.

                  # TODO: (RK) It may be possible to avoid this assumption by counting  
    # duplicately named stages.
    this_index <- 
      if (identical(name, "")) stage_index
      else if (is.element(name, names(self$stages))) name
      else return()

            

If both this stage and the corresponding stage on the other runner are stagerunners, we recursively use the around method. Otherwise, we use the stageRunnerNode$around method.

                  if (is.stagerunner(self$stages[[this_index]]) &&
        is.stagerunner(other_runner$stages[[stage_index]])) {
      self$stages[[this_index]]$around(other_runner$stages[[stage_index]])
    } else if (is.stageRunnerNode(self$stages[[this_index]]) &&
               is.stageRunnerNode(other_runner$stages[[stage_index]])) {
      self$stages[[this_index]]$around(other_runner$stages[[stage_index]])
    } else {
      warning("Cannot apply stageRunner$around because ",
              this_index, " is not a terminal node.")
    }
  })
  self
}
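The way yield() sandwiches a terminal node inside an "around" function can be illustrated without stagerunner itself. The following is a minimal base R sketch, not the package's implementation: wrap_around, stage_fn and around_fn are hypothetical names, and we assume yield() can be provided by placing it in the wrapping function's enclosing environment.

```r
# Hypothetical helper, not part of stagerunner: returns a function that runs
# `around_fn`, with yield() bound to a call of `stage_fn` on the same context.
wrap_around <- function(stage_fn, around_fn) {
  function(e) {
    env <- new.env(parent = environment(around_fn))
    env$yield <- function() stage_fn(e)
    environment(around_fn) <- env  # modifies a local copy of `around_fn` only
    around_fn(e)
  }
}

# Record what would have been printed, so the order can be inspected.
printed <- character(0)
stage_fn  <- function(e) printed <<- c(printed, "2")
around_fn <- function(e) {
  printed <<- c(printed, "1")
  yield()
  printed <<- c(printed, "3")
}

wrapped <- wrap_around(stage_fn, around_fn)
wrapped(new.env())
printed
```

The recorded order is "1", "2", "3", matching the behavior described in the documentation above.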

            

stagerunner-coalesce.R

              
            

Coalescing two stagerunners is a critical operation for modifying a runner that is “in flight”, that is, partway through its execution.

Imagine we have our usual example runner:

  • import data
  • clean data
    • impute variable 1
    • discretize variable 2
  • train model

It has remember = TRUE, which means it is keeping a copy of the current context in each stage. As in the example, we have executed it to the imputation substage. We now realize there is a mistake in the imputation code. We can create a fresh stagerunner with the same structure, but it will not have the history of context changes!

Instead, we must coalesce the old runner onto the new runner, so that it carries over the environment changes. That way, when we continue execution from the imputation substage in our fixed runner, it will resume as before without having to re-import the data.

This can be inefficient for large datasets, but using the objectdiff package we can avoid the memory problems that may arise. For even larger datasets, we may need database-backed storage, but this is beyond the scope of stagerunners for now.

#' Coalescing a stageRunner object is taking another stageRunner object
#' with similar stage names and replacing the former's cached environments
#' with the latter's.
#'
#' @name stageRunner_coalesce
#' @param other_runner stageRunner. Another stageRunner from which to coalesce.
#' @note coalescing is ill-defined for stageRunner with unnamed stages,
#'    since it is impossible to tell when a stage has changed.
stageRunner_coalesce <- function(other_runner) {
  # TODO: Should we care about insertion of new stages causing cache wipes?
  # For now it seems like this would just be an annoyance.
  # stopifnot(remember)
  if (!isTRUE(self$remember)) return()

            

We must handle two cases: (1) integration with objectdiff, and (2) vanilla R environment objects. Both are tricky.

  if (self$with_tracked_environment()) {
    if (!other_runner$with_tracked_environment()) {
      stop("Cannot coalesce stageRunners using tracked_environments with ",
           "those using vanilla environments", call. = FALSE)
    }

    compare_head <- function(x, y) {
      m <- seq_len(min(length(x), length(y)))
      x[m] != y[m]
    }

    common <- sum(cumsum(compare_head(self$stage_names(), other_runner$stage_names())) == 0)
    # Warning: Coalescing stageRunners with tracked_environments does not
    # duplicate the tracked_environment, so the other_runner becomes invalidated,
    # and this is a destructive action.
    # TODO: (RK) What if the tracked_environment given initially to the stageRunner
    # already has some commits?
    commits     <- package_function("objectdiff", "commits")
    `.context<-` <- function(obj, value) {
      if (is.stagerunner(obj)) {
        obj$.context <- value
        for (stage in obj$stages) { Recall(stage, value) }
      } else if (is.stageRunnerNode(obj)) {
        obj$.context <- value
        if (is.stagerunner(obj$callable)) { Recall(obj$callable, value) }
      }
    }
    self$.context  <- other_runner$.context
    for (stage in self$stages) { .context(stage) <- other_runner$.context }
            

Mark common executed stages.

    self_iterator  <- treeSkeleton(self)$root()$first_leaf()
    other_iterator <- treeSkeleton(other_runner)$root()$first_leaf()
    # Walk the first `common` leaves of both runners in lockstep.
    for (i in seq_len(common)) {
      self_iterator$object$executed <- other_iterator$object$executed
      self_iterator  <- self_iterator$successor()
      other_iterator <- other_iterator$successor()
    }

            

Coalescing is a destructive action since the other runner will no longer be able to perform its function after the environments are moved.

    other_runner$.context <- new.env(parent = emptyenv())
    commit_count   <- length(commits(self$.context)) 
    mismatch_count <- commit_count - (common + 1)
    if (mismatch_count > 0) {
      package_function("objectdiff", "force_push")(self$.context, commit_count)
      package_function("objectdiff", "rollback")  (self$.context, mismatch_count)
    }
  } else {
    if (other_runner$with_tracked_environment()) {
      stop("Cannot coalesce stageRunners using vanilla environments with ",
           "those using tracked_environments", call. = FALSE)
    }

    stagenames <- names(other_runner$stages) %||% character(length(other_runner$stages))
    lapply(seq_along(other_runner$stages), function(stage_index) {
      # TODO: Match by name *OR* index
      if (stagenames[[stage_index]] %in% names(self$stages)) {
        # If both are stageRunners, try to coalesce our sub-stages.
        if (is.stagerunner(self$stages[[names(self$stages)[stage_index]]]) &&
            is.stagerunner(other_runner$stages[[stage_index]])) {
            self$stages[[names(self$stages)[stage_index]]]$coalesce(
              other_runner$stages[[stage_index]])
        # If both are not stageRunners, copy the cached_env if and only if
        # the stored function and its environment are identical
        } else if (!is.stagerunner(self$stages[[names(self$stages)[stage_index]]]) &&
            !is.stagerunner(other_runner$stages[[stage_index]]) &&
            !is.null(other_runner$stages[[stage_index]]$.cached_env) #&&
            #identical(deparse(stages[[names(stages)[stage_index]]]$fn),
            #          deparse(other_runner$stages[[stage_index]]$fn)) # &&
            # This is way too tricky and far beyond my abilities..
            #identical(stagerunner:::as.list.environment(environment(stages[[names(stages)[stage_index]]]$fn)),
            #          stagerunner:::as.list.environment(environment(other_runner$stages[[stage_index]]$fn)))
            ) {
          self$stages[[names(self$stages)[stage_index]]]$.cached_env <-
            new.env(parent = parent.env(self$.context))
          if (is.environment(other_runner$stages[[stage_index]]$.cached_env) &&
              is.environment(self$stages[[names(self$stages)[stage_index]]]$.cached_env)) {
            copy_env(self$stages[[names(self$stages)[stage_index]]]$.cached_env,
                     other_runner$stages[[stage_index]]$.cached_env)
            self$stages[[names(self$stages)[stage_index]]]$executed <- 
              other_runner$stages[[stage_index]]$executed
          }
        }
      }
    })
    self$.set_parents()
  }
  self
}
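The tracked-environment branch above counts how many leading stages the two runners share by combining compare_head with cumsum. The idiom can be tried in isolation; in this sketch count_common is a hypothetical wrapper, and the flattened stage-name vectors are illustrative assumptions rather than the actual output of stage_names().

```r
# Same helper as in the coalesce method: elementwise mismatch over the
# shared leading portion of two vectors.
compare_head <- function(x, y) {
  m <- seq_len(min(length(x), length(y)))
  x[m] != y[m]
}

# Hypothetical wrapper: number of leading elements that agree.
# cumsum() becomes non-zero at the first mismatch and stays non-zero after.
count_common <- function(x, y) sum(cumsum(compare_head(x, y)) == 0)

old_names <- c("import data", "impute variable 1", "discretize variable 2", "train model")
new_names <- c("import data", "impute variable 1 (fixed)", "discretize variable 2", "train model")

common <- count_common(old_names, new_names)
common
```

Here only the first stage matches, so common is 1, even though later names agree again. Everything after the first mismatch is treated as changed.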


            

stagerunner-current_stage.R

              
            
#' This allows us to get the furthest executed stage.
#'
#' @name stageRunner_current_stage
#' @return a character stage key giving the latest executed stage.
#'   If the stageRunner does not have caching enabled, this will
#'   always return the first stage key (`'1'`).
stageRunner_current_stage <- function() {
  for (stage_index in rev(seq_along(self$stages))) {
            

We use the stageRunnerNode$was_executed helper to determine if this stage has been executed yet.

    is_executed_terminal_node <- is.stageRunnerNode(self$stages[[stage_index]]) &&
      self$stages[[stage_index]]$was_executed()

    if (is_executed_terminal_node) return(as.character(stage_index))

            

We can recursively use current_stage if the current stage is another stagerunner rather than a terminal node.

    has_executed_terminal_node <- is.stagerunner(self$stages[[stage_index]]) &&
      is.character(tmp <- self$stages[[stage_index]]$current_stage())

    if (has_executed_terminal_node) return(paste(c(stage_index, tmp), collapse = '/'))
  }
  FALSE
}
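The reverse scan with recursion can be mirrored on a plain nested list. This is only a sketch under a simplifying assumption: instead of stageRunnerNode objects, a nested list of logicals (a hypothetical stand-in) records whether each terminal stage has been executed.

```r
# Hypothetical stand-in for a runner: TRUE marks an executed terminal stage.
furthest_executed <- function(executed) {
  for (i in rev(seq_along(executed))) {
    if (is.list(executed[[i]])) {
      # A nested list plays the role of a sub-stagerunner.
      sub <- furthest_executed(executed[[i]])
      if (is.character(sub)) return(paste(c(i, sub), collapse = "/"))
    } else if (isTRUE(executed[[i]])) {
      return(as.character(i))
    }
  }
  FALSE
}

# import data executed, impute variable 1 executed, the rest not yet.
executed <- list(TRUE, list(TRUE, FALSE), FALSE)
key <- furthest_executed(executed)
key
```

For this example the result is "2/1", and a runner with nothing executed yields FALSE, just as the method above falls through to FALSE.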

            

stagerunner-has_key.R

              
            
#' Whether or not the stageRunner has a key matching this input.
#'
#' @param key ANY. The potential key.
#' @return \code{TRUE} or \code{FALSE} accordingly.
stageRunner_has_key <- function(key) {
            

We turn the key, like “data/foo” or c(1,2), into a nested list of logicals in the usual format, or FALSE if the key cannot be parsed.

  has <- tryCatch(normalize_stage_keys(key, self$stages),
                  error = function(.) FALSE)
            

If any substage is TRUE, the stagerunner contains this key. Note that keys may refer to several different substages! This method will tell us whether it is possible to execute anything using the provided key.

  any(c(has, recursive = TRUE))
}
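The final line relies on base R's c(..., recursive = TRUE), which flattens a nested list into a single vector. A small sketch with hand-written nested logicals (standing in for the output of normalize_stage_keys, which we do not call here) shows the three situations; has_any is a hypothetical name.

```r
# Hypothetical wrapper around the expression used in has_key.
has_any <- function(has) any(c(has, recursive = TRUE))

matched   <- has_any(list(FALSE, list(TRUE, FALSE), FALSE))   # one substage selected
unmatched <- has_any(list(FALSE, list(FALSE, FALSE), FALSE))  # nothing selected
unparsed  <- has_any(FALSE)                                   # key could not be parsed
c(matched, unmatched, unparsed)
```

Only the first call returns TRUE. The scalar FALSE produced by the tryCatch fallback passes through the same expression without special handling.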

            

stagerunner-initialize.R

              
            
#' Initialize a stageRunner object.
#'
#' stageRunner objects are used for executing a linear sequence of
#' actions on a context (an environment). For example, if we have an
#' environment \code{e} containing \code{x = 1, y = 2}, then using
#' \code{stages = list(function(e) e$x <- e$x + 1, function(e) e$y <- e$y - e$x)}
#' will cause \code{x = 2, y = 0} after running the stages.
#'
#' @name stageRunner_initialize
#' @param context environment. The initial environment that is getting
#'    modified during the execution of the stages. 
#' @param stages list. The functions to execute on the \code{context}.
#' @param remember logical. Whether to keep a copy of the context and its
#'    contents throughout each stage for debugging purposes--this makes it
#'    easy to go back and investigate a stage.
#'    
#'    The default is \code{FALSE}. When set to \code{TRUE}, the return value
#'    of the \code{run} method will be a list of two environments: one of what
#'    the context looked like before the \code{run} call, and another
#'    of the aftermath.
            

When a stagerunner object is initialized, it needs to convert a pre-stagerunner, like

list(first = some_function, second = list(
  sub1 = another_function, sub2 = a_third_function
))

into a stagerunner object. This class constructor will turn the above into a hierarchy of stagerunners to make it easier to recursively re-use functionality.

#' @param mode character. Controls the default behavior of calling the
#'    \code{run} method for this stageRunner. The two supported options are
#'    "head" and "next". The former gives a stageRunner which always begins
#'    from the first stage if the \code{from} parameter to the \code{run}
#'    method is blank. The latter instead begins from the next unexecuted
#'    stage. The default is "head". This argument has no effect if
#'    \code{remember = FALSE}.
stagerunner_initialize <- function(context, stages, remember = FALSE,
                                   mode = getOption("stagerunner.mode") %||% "head") {
  
            

As a convenient shortcut, if a stagerunner is initialized without a second argument but with a first argument that can be turned into stages, we create a new environment for the context.

  if (missing(stages) && !missing(context) && is_pre_stagerunner(context)) {
    stages  <- context
            

The only parent environment that makes sense is the calling environment.

    context <- new.env(parent = parent.frame())
  }

  if (identical(remember, FALSE) && is(context, "tracked_environment")) {
    stop("Can not use tracked environments with stagerunners that have caching ",
         "disabled (remember = FALSE)")
  }

            

The enforce_type helper in utils.R will print a nice and colorful error message if we have initialized our stagerunner with the wrong argument types.

  enforce_type(context,  "environment", "stagerunner", "context")
  enforce_type(remember, "logical",     "stagerunner", "remember")
  enforce_type(mode,     "character",   "stagerunner", "mode")

            

match.arg is a convenient base R helper that will error unless one of a given set of options is chosen.

  match.arg(mode, c("head", "next"))

  stopifnot(length(remember) == 1)

  self$.parent   <- NULL
  # The .finished flag is used for certain features when printing a stagerunner.
  self$.finished <- FALSE 
  self$.context  <- context
  self$.mode     <- tolower(mode)
  self$remember  <- remember

            

A stagerunner will recursively be represented using more stagerunners. This way, we can re-use methods defined on a stagerunner on local subsections.

  self$stages    <- initialize_stages(stages, context, remember)
  
            

We wrap up with some messy initialization in case our stagerunner intends to remember progress.

  if (isTRUE(self$remember)) {
    initialize_remembrance(self)
  }
}

initialize_stages <- function(stages, context, remember) {
  if (length(stages) == 0) {
    warning("stagerunners with zero stages may cause problems.")
  }

  if (!is_pre_stagerunner(stages)) {
    stop("Can only turn a function or list of functions into a stagerunner.")
  }

  if (is.function(stages)) {
    stages <- list(stages)
  }

            

A loop is slightly faster than an lapply here.

  for (i in seq_along(stages)) {
    if (is.list(stages[[i]])) {
      stages[[i]] <- stagerunner(context, stages[[i]], remember = remember)
    } else if (is.function(stages[[i]]) || is.null(stages[[i]])) {
      stages[[i]] <- stageRunnerNode(stages[[i]], context)
    }
  }

            

We will be using the / character in a special way for running stages. For example, if we had a runner such as

  • import data
  • clean data
    • impute variable 1
    • discretize variable 2

we would run the first substage using runner$run("clean data/impute variable 1"). To avoid complications, we prevent the use of slashes in the stage names.

  prevent_stage_name_violators(stages)

  stages
}

prevent_stage_name_violators <- function(stages) {
  if (any(violators <- grepl("/", names(stages), fixed = TRUE))) {
    stop(paste0("Stage names may not have a '/' character. The following do not ",
      "satisfy this constraint: '",
      paste0(names(stages)[violators], collapse = "', '"), "'"))
  }
}
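To see why a slash inside a stage name would be troublesome, we can split a key on "/" with base R. This sketch only illustrates the ambiguity and the grepl check used above; how stagerunner parses keys internally is not shown here, and the stage lists are made up for the example.

```r
# A well-formed key splits into one piece per nesting level.
key    <- "clean data/impute variable 1"
pieces <- strsplit(key, "/", fixed = TRUE)[[1]]
pieces

# If a stage were named "train/test split", the same split could not tell
# a single stage name apart from a stage plus a substage.
ambiguous <- strsplit("train/test split", "/", fixed = TRUE)[[1]]
length(ambiguous)

# The check from prevent_stage_name_violators, applied to a toy stage list.
stages    <- list("import data" = identity, "train/test split" = identity)
violators <- grepl("/", names(stages), fixed = TRUE)
bad_names <- names(stages)[violators]
bad_names

# Unnamed stages have NULL names, which yields no violators at all.
unnamed_ok <- !any(grepl("/", names(list(identity, identity)), fixed = TRUE))
```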

initialize_remembrance <- function(stagerunner) {
  stagerunner$.clear_cache()
            

We set up some meta-data that will be used to track the changes occurring in the stagerunner. See the treeSkeleton class later for more details.

  stagerunner$.set_parents()
  if (stagerunner$with_tracked_environment()) {
    stagerunner$.set_prefixes()
  } else if (length(stagerunner$stages) > 0) {
            

The very first stage should remember what the context looked like upon initialization. After all, if a user messes with the context and later re-runs the stagerunner from scratch, it should remember what the context looked like at the time of initialization.

    first_env <- treeSkeleton$new(stagerunner$stages[[1]])$first_leaf()$object
    first_env$.cached_env <- new.env(parent = parent.env(stagerunner$.context))
    copy_env(first_env$.cached_env, stagerunner$.context)
  }
}

            

stagerunner-internal.R

              
            

This file contains some messy internal methods that are necessary for correct interoperation with the treeSkeleton class and the objectdiff package.

#' Clear all caches in this stageRunner, and recursively.
#' @name stageRunner_.clear_cache
stageRunner_.clear_cache <- function() {
  for (i in seq_along(self$stages)) {
            

The stagerunner context just prior to stage execution is stored in an environment cache. We clear this cache recursively.

    if (is.stagerunner(self$stages[[i]])) self$stages[[i]]$.clear_cache()
    else self$stages[[i]]$.cached_env <- NULL
  }
  TRUE
}

            

The treeSkeleton requires a recursive structure to be annotated with “parent metadata” so it can be traversed like a tree structure. This is what allows us to go from stage “2/2” to stage “3”, for example: we are finding the successor node in the tree structure and “running” it.

#' Set all parents for this stageRunner, and recursively
#' @name stageRunner_.set_parents
stageRunner_.set_parents <- function() {
  for (i in seq_along(self$stages)) {
    # Set convenience helper attribute "child_index" to ensure that treeSkeleton
    # can find this stage.
            

The metadata required by the treeSkeleton class.

    attr(self$stages[[i]], 'child_index') <<- i
    attr(self$stages[[i]], 'parent') <<- self
  }
  self$.parent <- NULL
}

#' Get an environment representing the context directly before executing a given stage.
#'
#' @note If there is a lot of data in the remembered environment, this function
#'   may be computationally expensive as it has to create a new environment
#'   with a copy of all the relevant data.
#' @param stage_index integer. The substage for which to grab the before
#'   environment.
#' @return a fresh new environment representing what would have been in
#'   the context as of right before the execution of that substage.
stageRunner_.before_env <- function(stage_index) {
  cannot_run_error <- function() {
    stop("Cannot run this stage yet because some previous stages have ",
         "not been executed.")
  }

  if (self$with_tracked_environment()) {
    # We are using the objectdiff package and its tracked_environment,
    # so we have to "roll back" to a previous commit.
    current_commit <- paste0(self$.prefix, stage_index)

    if (!current_commit %in% names(package_function("objectdiff", "commits")(self$.context))) {
      if (`first_commit?`(current_commit)) {
        # TODO: (RK) Do this more robustly. This will fail if there is a 
        # first sub-stageRunner with an empty list as its stages.
        package_function("objectdiff", "commit")(self$.context, current_commit)
      } else {
        cannot_run_error()
      }
    } else {
      package_function("objectdiff", "force_push")(self$.context, current_commit)
    }

    env <- new.env(parent = package_function("objectdiff", "parent.env.tracked_environment")(self$.context))
    copy_env(env, package_function("objectdiff", "environment")(self$.context))
    env
  } else {
    env <- self$stages[[stage_index]]$.cached_env
    if (is.null(env)) { cannot_run_error() }

    # Restart execution from cache, so set context to the cached environment.
    copy_env(self$.context, env)
    env
  }
}

#' Mark a given stage as being finished.
#' 
#' @param stage_index integer. The index of the substage in this stageRunner.
stageRunner_.mark_finished <- function(stage_index) {
  node <- treeSkeleton$new(self$stages[[stage_index]])$successor()

  if (!is.null(node)) { # Prepare a cache for the future!
    if (self$with_tracked_environment()) {
      # We assume the head for the tracked_environment is set correctly.
      package_function("objectdiff", "commit")(self$.context, node$object$index())
    } else {
      node$object$.cached_env <- new.env(parent = parent.env(self$.context))
      copy_env(node$object$.cached_env, self$.context)
    }
  } else {
    # TODO: Remove this hack used for printing
    root <- self$.root()
    root$.finished <- TRUE
  }
}

#' Determine the root of the stageRunner.
#'
#' @name stageRunner_.root
#' @return the root of the stageRunner
stageRunner_.root <- function() {
  treeSkeleton$new(self)$root()$object
}

            

stagerunner-next_stage.R

              
            

Stagerunners allow for the option to use a special mode called "next". In this mode, instead of executing by default from the beginning of the stagerunner, execution will commence from the first stage that has not yet been executed.

If an error interrupts execution, we can fix the offending stage and call runner$run() again to resume where we left off, repeating until the runner has finished.

#' For stageRunners with caching, find the next unexecuted stage.
#'
#' @name stageRunner_next_stage
#' @return a character stage key giving the next unexecuted stage.
#'   If all stages have been executed, this returns \code{FALSE}.
#'   If the stageRunner does not have caching enabled, this will
#'   always return the first stage key (`'1'`).
stageRunner_next_stage <- function() {
  for (stage_index in seq_along(self$stages)) {
            

We use the stageRunnerNode$was_executed helper to determine if this stage has been executed yet.

    is_unexecuted_terminal_node <- is.stageRunnerNode(self$stages[[stage_index]]) &&
      !self$stages[[stage_index]]$was_executed()
    if (is_unexecuted_terminal_node) return(as.character(stage_index))

            

We can recursively use next_stage if the current stage is another stagerunner rather than a terminal node.

    has_unexecuted_terminal_node <- is.stagerunner(self$stages[[stage_index]]) &&
      is.character(tmp <- self$stages[[stage_index]]$next_stage())

    if (has_unexecuted_terminal_node) return(paste(c(stage_index, tmp), collapse = '/'))
  }
  FALSE
}
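The fix-and-resume workflow that "next" mode enables can be imitated in a few lines of base R. This is a sketch under simplifying assumptions, not the package's logic: the stages are flat, a logical vector named executed stands in for the per-node executed flags, and run_next is a hypothetical helper.

```r
stages <- list(
  import = function(e) e$x <- 1,
  clean  = function(e) {
    # Fails until the "bug" is fixed; e$fixed is a stand-in for editing the code.
    if (!isTRUE(e$fixed)) stop("bug in cleaning code")
    e$x <- e$x + 1
  },
  train  = function(e) e$model <- e$x * 10
)
executed <- c(import = FALSE, clean = FALSE, train = FALSE)
context  <- new.env()

# Hypothetical helper: run every stage that has not been executed yet.
run_next <- function() {
  for (name in names(stages)[!executed]) {
    stages[[name]](context)
    executed[[name]] <<- TRUE
  }
  invisible(TRUE)
}

# The first attempt stops at the broken stage, but "import" stays marked as done.
first_try <- tryCatch(run_next(), error = function(err) conditionMessage(err))
after_first_try <- executed

context$fixed <- TRUE  # pretend we repaired the cleaning code
run_next()             # resumes from "clean" without re-running "import"
```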

            

stagerunner-objectdiff.R

              
            

Stagerunners remember the full history of their execution. If you have fifty data preparation steps recorded in a stagerunner and the remember flag is set to TRUE, a full copy of the dataset will be made after each step. This is highly inefficient.

We attempt to solve this problem with a space-time tradeoff: the objectdiff package computes the difference between the environment before and after executing a given stage. By incorporating this package into a stagerunner, we can take slightly more time (by computing differences between environments during the execution of each stage) but save a lot of memory (by only storing patches that record what has changed during each step, rather than a full copy of the data set).

This advanced feature allows stagerunners to remain in-memory, retaining the fast, interactive, iterative model-building process. The downside is slightly more space or time usage, depending on the configuration of the objectdiff package.

We can avoid the problem entirely by doing all of our processing in batches or in-database, but this is outside of the scope of this package. For interactive model development on data sets with fewer than 1M rows, performance is usually not prohibitive.

# Some features of stageRunner, specifically its interaction with the
# objectdiff package, require some additional setup. We attempt to record
# all of these dependencies in this file.

#' Set all prefixes for child stageRunners.
#'
#' When a stageRunner is used in conjunction with an
#' \code{objectdiff::tracked_environment}, we need to remember
#' the full nested tree structure. This function sets up the
#' \code{prefix} member of each sub-stageRunner recursively to enable
#' correct remembering functionality.
#'
#' @param prefix character. The prefix to assign to this stageRunner.
#' @name stageRunner_.set_prefixes
stageRunner_.set_prefixes <- function(prefix = '') {
  self$.prefix <- prefix
  for (i in seq_along(self$stages)) {
    if (is.stageRunner(self$stages[[i]])) {
      self$stages[[i]]$.set_prefixes(paste0(prefix, i, '/'))
    }
  }
}

`first_commit?` <- function(commit) {
  all(strsplit(commit, "/", fixed = TRUE)[[1]] == '1')
}
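The prefixes set above combine with a stage index to form commit names such as "2/1" (see the paste0(self$.prefix, stage_index) call in the .before_env method). The following sketch reproduces that naming on a plain nested list; commit_names and is_first_commit are hypothetical names, and the nested list of identity functions is only a stand-in for a real runner.

```r
# Hypothetical helper: list the commit name of every terminal stage, using the
# same paste0(prefix, i, "/") scheme as .set_prefixes.
commit_names <- function(stages, prefix = "") {
  unlist(lapply(seq_along(stages), function(i) {
    if (is.list(stages[[i]])) commit_names(stages[[i]], paste0(prefix, i, "/"))
    else paste0(prefix, i)
  }))
}

# Same test as `first_commit?`: every path component must be "1".
is_first_commit <- function(commit) {
  all(strsplit(commit, "/", fixed = TRUE)[[1]] == "1")
}

# import data, clean data (two substages), train model
stages <- list(identity, list(identity, identity), identity)
keys   <- commit_names(stages)
keys
first  <- vapply(keys, is_first_commit, logical(1), USE.NAMES = FALSE)
first
```

Only "1" qualifies as a first commit here. A key like "1/1" would also qualify, which is what lets the very first leaf of a nested runner be committed before anything has run.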


            

stagerunner-overlay.R

              
            

Overlaying a stagerunner means replacing its terminal nodes with terminal nodes that perform some extra behavior and can themselves be full stagerunners. This is subtly different from the around method, which transforms the terminal node function itself rather than turning it from a function into a stagerunner.

For example, if we have a stagerunner like

  • import data
  • clean data
    • impute variable 1
    • discretize variable 2
  • train model

we may wish to replace each function with a “hidden” mini-runner that runs some tests after each stage.

  • import data
    • vanilla function
    • some testing function that checks data got imported
  • clean data
    • impute variable 1
      • vanilla function
      • some testing function that checks variables were imputed
    • discretize variable 2
      • vanilla function
      • some testing function that checks variables were discretized
  • train model
    • vanilla function
    • some testing function that checks the model got trained successfully

We can achieve this by passing a stagerunner with the same tree structure, but containing tests in its terminal nodes, as the argument to the main stagerunner's overlay method.

#' Overlaying a stageRunner object is taking another stageRunner object
#' with similar stage names and adding the latter's stages as terminal stages
#' to the former (for example, to support tests).
#'
#' @name stageRunner_overlay
#' @param other_runner stageRunner. Another stageRunner from which to overlay.
#' @param label character. The label for the overlaid stageRunner. This refers
#'    to the name the former will get wrapped with when appended to the
#'    stages of the current stageRunner. For example, if \code{label = 'test'},
#'    and a current terminal node is unnamed, it will become
#'    \code{list(current_node, test = other_runner_node)}.
#' @param flat logical. Whether to use the \code{stageRunner$append} method to
#'    overlay, or simply overwrite the given \code{label}. If \code{flat = TRUE},
#'    you must supply a \code{label}. The default is \code{flat = FALSE}.
stageRunner_overlay <- function(other_runner, label = NULL, flat = FALSE) {
  stopifnot(is.stagerunner(other_runner))
  for (stage_index in seq_along(other_runner$stages)) {
    name <- names(other_runner$stages)[[stage_index]]
    index <-
      if (identical(name, '') || identical(name, NULL)) stage_index
      else if (name %in% names(self$stages)) name
      else stop('Cannot overlay because keys do not match')
    self$stages[[index]]$overlay(other_runner$stages[[stage_index]], label, flat)
  }
  TRUE
}
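The shape that overlaying produces can be sketched on plain nested lists of functions. This is an illustration only: overlay_lists is a hypothetical helper that matches stages by name and mimics the list(current_node, test = other_runner_node) form from the documentation above. It does not reproduce the stageRunnerNode$overlay method or the flat option.

```r
# Hypothetical helper: pair every terminal function with its test function.
overlay_lists <- function(stages, tests, label = "test") {
  for (name in names(tests)) {
    if (!name %in% names(stages)) stop("Cannot overlay because keys do not match")
    if (is.list(stages[[name]]) && is.list(tests[[name]])) {
      # Both sides are nested: recurse into the substages.
      stages[[name]] <- overlay_lists(stages[[name]], tests[[name]], label)
    } else {
      # Terminal node: wrap the vanilla function together with its test.
      stages[[name]] <- setNames(list(stages[[name]], tests[[name]]), c("", label))
    }
  }
  stages
}

stages <- list(
  "import data" = function(e) e$data <- iris,
  "clean data"  = list(
    "impute variable 1" = function(e) e$data[[1]][is.na(e$data[[1]])] <- 0
  )
)
tests <- list(
  "import data" = function(e) stopifnot(is.data.frame(e$data)),
  "clean data"  = list(
    "impute variable 1" = function(e) stopifnot(!anyNA(e$data[[1]]))
  )
)

overlaid <- overlay_lists(stages, tests)
names(overlaid[["import data"]])
```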

            

stagerunner-package.R

              
            
#' stagerunner: in-memory reproducible data preparation and modeling
#'
#' stagerunner is an attempt to define a notion of data munging that includes
#' \emph{history}. By writing your code as a stagerunner instead of a
#' collection of functions, three key advantages should become clear:
#'
#' \itemize{
#' \item Clarity will emerge in code that is intended to execute a sequence
#'   of operations that aims to produce a final result.
#' \item Reproducibility of interactive munging steps is possible without
#'   re-executing your analysis from scratch.
#' \item Modularity and extensibility become free of charge: methods like
#'   \code{around} and \code{transform} allow you to apply the same operation
#'   to your entire modeling procedure, simplifying progress monitoring and
#'   debugging.
#' }
#'
#' Although originally intended for clarifying the modeling process,
#' stagerunners have much more general applicability. To learn more,
#' begin with the vignettes: \code{browseVignettes(package = "stagerunner")}.
#'
#' @docType package
#' @name stagerunner
#' @import crayon R6
#' @author Robert Krzyzanowski <\url{http://syberia.io}>
#' @seealso The core function in this package: \code{\link{stagerunner}}. It
#'   defines the constructor creating stagerunner objects that allow you to
#'   wrap a complicated modeling procedure into an organized hierarchy.
#' @references Full documentation and demos: \url{http://robertzk.github.io/stagerunner/};
#'   FAQs: \url{http://robertzk.github.io/stagerunner/faq/}
NULL

            

Since self is used all over the place in R6 method definitions, R CMD check will yell at us if we do not include the line below.

globalVariables('self')
            

stagerunner-run.R

              
            

The heart of a stagerunner object is its run method, depicted on the right. A stagerunner consists of two things:

  • a context: This is an environment object that allows the user to persistently store information between stages. The usual way to build a data pipeline is to provide functions with various inputs and hook them up to functions with various outputs.

    This is nice because it is clear what the inputs and outputs will be. However, the disadvantage is that hooking up all the functions can become pretty messy.

    In this approach, we let the user set their own conventions for what to place in the context. The advantage is that all stages have the same form, a function taking one argument (the context), and so they become easy to manipulate.

  • stages: A list of functions or, recursively, other stagerunners. Each function should take precisely one argument: the context described above. If you have some familiarity with pure mathematics, you will know the original inspiration for stagerunners: a stagerunner is a sequence of actions on an environment.

Running a portion of a stagerunner means executing some of its stages on its context. For example, suppose we start with an empty environment context = new.env() and the following stages:

context <- new.env()
runner  <- stagerunner(context, list(
 "Set x"    = function(e) { e$x <- 1 },
 "Double x" = function(e) { e$x <- 2 * e$x }
))

If we write runner$run("Set x"), then context$x will become 1. If we write runner$run(2) (a syntactical shortcut), then context$x becomes 2. If we write runner$run(2) again, it will become 4.
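The same sequence of effects can be reproduced in base R, which makes the "sequence of actions on an environment" idea concrete. This is a sketch that does not use the stagerunner package: run_stage is a hypothetical helper that applies one stage, selected by name or index, to the context.

```r
context <- new.env()
stages  <- list(
  "Set x"    = function(e) { e$x <- 1 },
  "Double x" = function(e) { e$x <- 2 * e$x }
)

# Hypothetical helper: apply a single stage, chosen by name or index.
run_stage <- function(key) invisible(stages[[key]](context))

run_stage("Set x")
after_set <- context$x     # 1

run_stage(2)
after_double <- context$x  # 2

run_stage(2)
after_again <- context$x   # 4
```

Because environments are modified in place, each stage sees whatever the previous stages left behind.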

The real advantage of this approach becomes clear when we enable the remember flag:

context <- new.env()
runner  <- stagerunner(remember = TRUE, context, list(
  "Import data"               = function(e) e$data <- iris,
  "Create dependent variable" = function(e) e$dep_var <- e$data[[1]] > 5,
  "Create derived variable"   = function(e) e$diff <- e$data[[1]] - e$data[[2]]
))

Now, the stagerunner holds a copy of the full environment in each stage: this means we can re-run previous stages at will.

runner$run()        # Run all stages
context$data <- NULL # Clear the data
runner$run(2)       # Re-run just the second stage.

In this scenario, the data gets restored from a cached environment (what the context looked like after the first stage finished), and we have a dep_var variable (although no diff variable, since the third stage has now been “rolled back”).

This kind of approach also allows us to debug what happens during execution:

envs <- runner$run(2)
ls(envs$before) # The context before stage 2: just data
ls(envs$after)  # The context after stage 2: data *and* dep_var

When a stagerunner is set to remember its progress, the output of the run function consists of a list with keys before and after representing two environments: what the stagerunner's context looked like before and after executing that stage.
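The caching behavior described above can be imitated with plain environments. This is a rough sketch under stated assumptions, not the package's implementation: copy_into and run_stage are hypothetical helpers, the stages are flat, and a full copy of the context is stored before each stage (the vanilla, non-objectdiff strategy).

```r
# Hypothetical helper: make `to` contain exactly the bindings of `from`.
copy_into <- function(to, from) {
  rm(list = ls(to, all.names = TRUE), envir = to)
  for (name in ls(from, all.names = TRUE)) {
    assign(name, get(name, envir = from), envir = to)
  }
  invisible(to)
}

stages <- list(
  function(e) e$data <- iris,
  function(e) e$dep_var <- e$data[[1]] > 5,
  function(e) e$diff <- e$data[[1]] - e$data[[2]]
)
context <- new.env()
cache   <- vector("list", length(stages))
cache[[1]] <- copy_into(new.env(), context)  # the context upon initialization

# Hypothetical helper: restore the cached context, run stage i, cache the result.
run_stage <- function(i) {
  if (is.null(cache[[i]])) stop("Cannot run this stage yet.")
  copy_into(context, cache[[i]])
  before <- copy_into(new.env(), context)
  stages[[i]](context)
  if (i < length(stages)) cache[[i + 1]] <<- copy_into(new.env(), context)
  list(before = before, after = copy_into(new.env(), context))
}

for (i in seq_along(stages)) run_stage(i)  # run all stages
context$data <- NULL                       # clobber the data
envs <- run_stage(2)                       # re-run just the second stage
ls(envs$before)
ls(envs$after)
```

Re-running the second stage restores the data from the cache taken after the first stage, so envs$before holds only data, envs$after holds data and dep_var, and the context no longer contains diff.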

#' Run the stages in a stageRunner object.
#'
#' @param from an indexing parameter. Many forms are accepted, but the
#'   easiest is the name of the stage. For example, if we have
#'   \code{stageRunner$new(context, list(stage_one = some_fn, stage_two = some_other_fn))}
#'   then using \code{run('stage_one')} will execute \code{some_fn}.
#'   Additional indexing forms are logical (which stages to execute),
#'   numeric (which stages to execute by indices), negative (all but the
#'   given stages), character (as above), and nested forms of these.
#'   The latter refers to instances of the following:
#'   \code{stageRunner$new(context, list(stage_one =
#'     stageRunner$new(context, list(substage_one = some_fn, substage_two = other_fn)),
#'     stage_two = another_fn))}.
#'   Here, the following all execute only substage_two:
#'   \code{run(list(list(FALSE, TRUE), FALSE))},
#'   \code{run(list(list(1, 2)))},
#'   \code{run('stage_one/substage_two')},
#'   \code{run('one/two')}.
#'   Notice that substrings are allowed for characters.
#'   The default is \code{NULL}, which runs the whole sequence of stages.
#' @param to an indexing parameter. If \code{from} refers to a single stage,
#'   attempt to run from that stage to this stage (or, if this one comes first,
#'   this stage to that stage). For example, if we have
#'      \code{stages = list(a = list(b = 1, c = 2), d = 3, e = list(f = 4, g = 5))}
#'   where the numbers are some functions, and we call \code{run} with
#'   \code{from = 'a/c'} and \code{to = 'e/f'}, then we would execute
#'   stages \code{"a/c", "d", "e/f"}.
#' @param verbose logical. Whether or not to display pretty colored text
#'   informing about stage progress.
#' @param remember_flag logical. An internal argument used by \code{run}
#'   recursively if the \code{stageRunner} object has the \code{remember}
#'   field set to \code{TRUE}. If \code{remember_flag} is FALSE, \code{run}
#'   will not attempt to restore the context from cache (e.g., if we are
#'   executing five stages simultaneously with \code{remember = TRUE},
#'   the first stage's context should be restored from cache but none
#'   of the remaining stages should).
#' @param mode character. If \code{mode = 'head'}, then by default the
#'   \code{from} parameter will be used to execute that stage and that
#'   stage only. If \code{mode = 'next'}, then the \code{from} parameter
#'   will be used to run (by default, if \code{to} is left missing)
#'   from the last successfully executed stage to the stage given by
#'   \code{from}. If \code{from} occurs before the last successfully
#'   executed stage (say S), the stages will be run from \code{from} to S.
#' @param normalized logical. A convenience recursion performance helper. If
#'   \code{TRUE}, stageRunner will assume the \code{from} argument is a
#'   nested list of logicals.
            

Do not worry about this parameter, .depth. It is used internally to keep track of how “deep” the current stage execution is.

              #' @param .depth integer. Internal parameter for keeping track of nested
#'   execution depth.
#' @param ... Any additional arguments to delegate to the \code{stageRunnerNode}
#'   object that will execute its own \code{run} method.
#'   (See \code{stageRunnerNode$run})
#' @return TRUE or FALSE according as running the stages specified by the
#'   \code{from} and \code{to} keys succeeded or failed. If
#'   \code{remember = TRUE}, this will instead be a list of the environment
#'   before and after executing the aforementioned stages. (This allows
#'   comparing what changes were made to the \code{context} during the
#'   execution of the stageRunner.)
#' @examples
#' env <- new.env()
#' some_fn    <- function(e) e$x <- 1
#' other_fn   <- function(e) e$y <- 1
#' another_fn <- function(e) e$z <- 1
#' sr <- stagerunner(env, list(stage_one =
#'  stagerunner(env, list(substage_one = some_fn, substage_two = other_fn)),
#'  stage_two = another_fn))
#' 
#' # Here, the following all execute only substage_two:
#'
#' sr$run(list(list(FALSE, TRUE), FALSE))
#' sr$run(list(list(1, 2)))
#' sr$run('stage_one/substage_two')
#' sr$run('one/two')
#' stopifnot(is.null(env$z), is.null(env$x), identical(env$y, 1))
#'
#' # This will execute all but "stage_one" (i.e., only "stage_two")
#' sr$run(-1)
#' stopifnot(identical(env$z, 1))
run <- function(from = NULL, to = NULL, verbose = FALSE,
                remember_flag = getOption("stagerunner.remember", TRUE),
                mode = self$.mode, normalized = FALSE, .depth = 1, ...) {
            

The parameter normalized refers to whether the input (that is, the from and to parameters) is in the canonical nested list format. For example, if we have a runner with stages “Import”, “Data/impute”, and “Data/discretize”, the canonical representation for the first substage of the second stage would be list(FALSE, list(TRUE, FALSE)). This allows the stagerunner package to easily tell what is being executed.

If the from and to parameters are not in normal form, or the from parameter is missing and the to parameter is present (so that we are asking to run from the beginning to the stage denoted by to), we must first normalize the keys to use this nested list format.
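To make the canonical format concrete, here is a minimal sketch of turning a character key such as "Data/impute" into a nested list of logicals. It assumes plain nested lists of functions and exact name matching; mark_path and all_false are hypothetical helpers, not the package's normalize_stage_keys, which also handles substrings, numeric and negative indices, and the to parameter.

```r
# Hypothetical helper: a tree of FALSE values with the same shape as `stage`.
all_false <- function(stage) {
  if (is.list(stage)) unname(lapply(stage, all_false)) else FALSE
}

# Hypothetical helper: mark the stage reached by following `parts` as TRUE.
mark_path <- function(stages, parts) {
  unname(lapply(names(stages), function(name) {
    stage <- stages[[name]]
    if (!identical(name, parts[1])) {
      all_false(stage)                 # not on the path
    } else if (is.list(stage) && length(parts) > 1) {
      mark_path(stage, parts[-1])      # descend into the substages
    } else {
      TRUE                             # the stage we were asked for
    }
  }))
}

stages <- list(
  Import = identity,
  Data   = list(impute = identity, discretize = identity)
)
key <- mark_path(stages, strsplit("Data/impute", "/", fixed = TRUE)[[1]])
str(key)
```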

We will use the stage_key local variable to track what substages to execute during this run call.

                if (identical(normalized, FALSE)) {
    if (missing(from) && identical(self$remember, TRUE) && identical(mode, 'next')) {
      from <- self$next_stage()
      if (missing(to)) to <- TRUE
    }
    stage_key <- normalize_stage_keys(from, self$stages, to = to)
  } else {
    stage_key <- from
  }

            

Now that we have determined which stages to run, we cycle through them all. It is up to the user to ensure that the resulting context changes make sense. We also implicitly sort the stages so that linearity is preserved: stagerunner enforces the linearity and directionality set in the stage definitions.

                
            

If we are remembering changes, we must recall what the environment looked like before we ran anything.

                before_env <- NULL

  for (stage_index in seq_along(stage_key)) {
    nested_run <- TRUE
    
            

In a stagerunner, recursively nested stages (i.e., stages with substages) are themselves represented as stagerunners, while final stages (i.e., the functions to execute) are represented as R6 objects called stageRunnerNodes. In each scenario, a different recursive call to $run will be necessary, so we compute a closure that gives the correct call for later use.

                  run_stage <- determine_run_stage(stage_key, stage_index,
                                     self$stages, verbose, .depth)

            

We keep track of whether this is a nested run so that the verbose display knows whether to say “Beginning stage X” or “Running stage X”.

                  if (isTRUE(stage_key[[stage_index]]) &&
        !is.stagerunner(self$stages[[stage_index]])) {
      nested_run <- FALSE
    }

            

The above helper run_stage will return an object of class next_stage if we should skip this stage (i.e., because stage_key[[stage_index]] is FALSE).

                  if (is(run_stage, "next_stage")) next
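The pattern used here, returning either a closure to call later or a classed sentinel meaning “skip”, can be sketched in isolation. This is a simplified illustration with hypothetical names (pick_action, skip_stage) and a flat list of stages; it is not the package's determine_run_stage.

```r
# Return a closure that runs the stage, or a sentinel object if the key
# says the stage should be skipped.
pick_action <- function(key, stage) {
  if (isTRUE(key)) {
    function(...) stage(...)
  } else {
    structure(list(), class = "skip_stage")
  }
}

stages <- list(a = function() "ran a", b = function() "ran b")
key    <- list(TRUE, FALSE)

results <- character(0)
for (i in seq_along(stages)) {
  action <- pick_action(key[[i]], stages[[i]])
  if (inherits(action, "skip_stage")) next  # nothing to do for this stage
  results <- c(results, action())
}
results
```

Deferring the call behind a closure lets the loop decide when (and with which extra arguments) the stage actually runs.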

            

Display a nice message telling us which stage we are currently executing.

                  display_message <- isTRUE(verbose) && contains_true(stage_key[[stage_index]])
    if (display_message) {
      show_message(names(self$stages), stage_index, begin = TRUE,
                   nested = nested_run, depth = .depth)
    }

            

If remember = TRUE, we have to cache the progress along each stage.

                  if (self$remember && isTRUE(remember_flag) && is.null(before_env)) {
            

If we have not determined what the environment on the stagerunner was like prior to running any stages, we do so now. This will eventually be returned by this function, so that the user can inspect what happened before and after all the desired stages were executed.

                    if (nested_run) {
            

If this is a nested stage, we grab the “initial environment” recursively.

                      before_env <- run_stage(..., remember_flag = TRUE)$before
      } else { 
            

Otherwise, if it is a terminal node, we just make a copy of the current context.

                      before_env <- self$.before_env(stage_index)
      }
      
            

If the current stage is a terminal node, execute the stage (if it was nested, it's already been executed in order to recursively fetch the initial environment, before_env).

                    if (!nested_run) { run_stage(...) }
    }
    else if (self$remember) { run_stage(..., remember_flag = remember_flag) }
    else { run_stage(...) }

            

When we're done running a stage (i.e., processing a terminal node), set the cache on the successor node to be the current context, since that node will execute starting with what's in the context now. This also ensures that running that node with a separate call to $run will not bump into a “you haven't executed this stage yet” error.

                  if (self$remember && isTRUE(remember_flag) && !nested_run) {
      self$.mark_finished(stage_index)
    }

            

Finally, display our progress by indicating we are ending this stage.

                  if (display_message) {
      show_message(names(self$stages), stage_index, begin = FALSE,
                   nested = nested_run, depth = .depth)
    }
  }

            

If the stagerunner is a remembering stagerunner, i.e., the field remember = TRUE, we will return a list with keys before and after indicating what the stagerunner's context looked like before and after executing the stages indicated by the from and to parameters. This allows the user to perform their own analysis about what happened.

Otherwise, we simply return TRUE (invisibly).

                if (self$remember && isTRUE(remember_flag)) {
    list(before = before_env, after = self$.context)
  } else {
    invisible(TRUE)
  }
}

            

This is a helper function to call $run correctly if we are recursively executing substages:

  • If the substage is a stagerunner, pass along information about how deep we currently are in the stagerunner for verbose printing.
  • Otherwise, simply call the stageRunnerNode$run method directly.
              determine_run_stage <- function(stage_key, stage_index, stages, verbose, .depth) {
  if (isTRUE(stage_key[[stage_index]])) {
    stage <- stages[[stage_index]]
    if (is.stagerunner(stage)) { 
      function(...) { stage$run(verbose = verbose, .depth = .depth + 1, ...) }
    } else {
     nested_run <- FALSE
            

Intercept the remember_flag argument to calls to the stageRunnerNode (since it doesn't know how to use it).

                   function(..., remember_flag = TRUE) { stage$run(...) }
    }
  } else if (is.list(stage_key[[stage_index]])) {
    function(...) {
      stages[[stage_index]]$run(stage_key[[stage_index]], normalized = TRUE,
                                verbose = verbose, .depth = .depth + 1, ...)
    }
  } else {
    structure(list(), class = "next_stage")
  }
}

            

stagerunner-show.R

              
            

Printing a stagerunner should show information about:

  1. The hierarchical structure of its stages.
  2. A summary of what stages have been already executed.

We choose the following notation:

A caching stageRunner with 4 stages:
 + import
 * data
   + impute variable
   * discretize variable
 - train model
Context <environment: 0x101726640>

The + indicates the stage has been executed successfully; * indicates it is currently being executed; and - means the stage has not yet been executed. If remember = FALSE, this information is not available, so we only use this prefix notation for “caching stagerunners” (those with remember = TRUE).
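The prefix for each printed line can be built by padding to the current nesting depth and then overwriting the last character with the marker. The sketch below mirrors that idea in a standalone helper; stage_prefix is a hypothetical name used only for illustration.

```r
# Two spaces per nesting level, with the final character replaced by the
# marker ("+", "*" or "-").
stage_prefix <- function(marker, indent = 0) {
  padding <- paste0(rep("  ", indent + 1), collapse = "")
  sub(".$", marker, padding)
}

cat(stage_prefix("+"), "import", "\n")
cat(stage_prefix("*", indent = 1), "discretize variable", "\n")
```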

              #' Generic for printing stageRunner objects.
#' 
#' @name stageRunner_show
#' @param indent integer. Internal parameter for keeping track of nested
#'   indentation level.
stageRunner_show <- function(indent = 0) {
  if (missing(indent)) {
    sum_stages <- function(x) sum(vapply(x,
      function(x) if (is.stagerunner(x)) sum_stages(x$stages) else 1L, integer(1)))
    caching <- if (self$remember) " caching"
    cat("A", caching, " stageRunner with ",
        sum_stages(self$stages), " stages:\n", sep = '')
  }

  stage_names <- names(self$stages) %||% rep("", length(self$stages))

  for (index in seq_along(stage_names)) {
    prefix <- paste0(rep('  ', (if (is.numeric(indent)) indent else 0) + 1), collapse = '')
    currently_executing_this_stage <- self$remember && began_stage(self$stages[[index]])

    if (currently_executing_this_stage) {
      next_stage <- treeSkeleton$new(self$stages[[index]])$last_leaf()$successor()$object
      if (( is.null(next_stage) && !self$.root()$.finished) ||
          (!is.null(next_stage) && !began_stage(next_stage)))
        marker <- '*' # Use a * if this is the next stage to be executed
        # TODO: Fix the bug where we are unable to tell if the last stage
        # finished without a .finished internal field.
        # We need to look at and set predecessors, not successors.
      else {
       marker <- '+' # Otherwise use a + for a completely executed stage
      }
    } else {
      marker <- '-'
    }

    prefix <- gsub('.$', marker, prefix)
    if (is.na(stage_names[[index]]) || stage_names[[index]] == "") {
      stage_name <- paste0("< Unnamed (stage ", index, ") >")
    } else {
      stage_name <- stage_names[[index]]
    }

    cat(prefix, stage_name, "\n")

    if (is.stagerunner(self$stages[[index]])) {
      self$stages[[index]]$show(indent = indent + 1)
    }
  }

  if (missing(indent)) {
    cat('Context ')
    print(self$.context)
  }

  NULL
}

# A helper function for determining if a stage has been run yet.
began_stage <- function(stage) {
  if (is.stagerunner(stage)) {
    any(vapply(stage$stages, began_stage, logical(1)))
  } else if (is.stageRunnerNode(stage)) {
    node <- treeSkeleton(stage)$predecessor()$object
    is.null(node) || node$executed
  }
}

#' @export
print.stageRunner <- function(x, ...) {
  x$show(...)
}

#' @export
print.stageRunnerNode <- function(x, ...) {
  x$show(...)
}

            

stagerunner-stage_names.R

              
            
              #' Retrieve a flattened list of canonical stage names for a stageRunner object
#'
#' For example, if we have stages
#'   \code{stages = list(a = list(b = 1, c = 2), d = 3, e = list(f = 4, g = 5))}
#' then this method would return
#'   \code{list('a/b', 'a/c', 'd', 'e/f', 'e/g')}
#'
#' @name stageRunner_stage_names
#' @return a list of canonical stage names.
# # @examples
# # f <- function() {}
# # sr <- stageRunner$new(new.env(),
# #   list(a = stageRunner$new(new.env(), list(b = f, c = f)), d = f,
# #   e = stageRunner$new(new.env(), list(f = f, g = f))))
# # sr$stage_names()
stageRunner_stage_names <- function() {
  nested_stages <- function(x) {
    if (is.stagerunner(x)) {
      lapply(x$stages, nested_stages)
    } else {
      x
    }
  }

  nested_names(lapply(self$stages, nested_stages))
}

#' Delimited names of a nested list.
#'
#' Unnamed values will use index number instead.
#'
#' @name nested_names
#' @param el list.
#' @param delim character. The delimiter with which to separate nested names.
#' @param prefix character. A prefix to every name.
#' @return a list of nested names
#' @examples
#' stagerunner:::nested_names(list(a = list(b = 1, c = list(d = 2, e = 3)), f = 4, 5))
#' # c('a/b', 'a/c/d', 'a/c/e', 'f', '3')
#' stagerunner:::nested_names(list(a = list(b = 1, c = 2), d = 2), delim = ' ', prefix = '#')
#' # c('#a b', '#a c', '#d')
nested_names <- function(el, delim = '/', prefix = '') {
  list_names <- names(el) %||% rep("", length(el))
  Reduce(c, lapply(seq_along(el), function(index) {
    name <- if (list_names[[index]] == "") as.character(index)
            else list_names[[index]]
    paste0(prefix,
      if (is.list(el[[index]])) {
        paste0(name, delim, nested_names(el[[index]], delim = delim, prefix = ''))
      } else name)
  }))
}
            

stagerunner-transform.R

              
            

Straightforwardly, apply some function (transformation) to every terminal node in the stagerunner. This is useful for simple debugging and monitoring. For example, if we wish to print the variables currently in the context of the stagerunner prior to executing each stage, we can call

runner$transform(function(fn) {
  function(context, ...) {
    print(ls(context))
    fn(context, ...)
  }
})
              #' Transform the callables of the terminal nodes of a stageRunner.
#'
#' Every terminal node in a stageRunner is of type stageRunnerNode.
#' These each have a callable, and this method transforms those
#' callables in the way given by the first argument.
#'
#' @name stageRunner_transform
#' @param transformation function. The function which transforms one callable
#'   into another.
stageRunner_transform <- function(transformation) {
  for (stage_index in seq_along(self$stages)) {
    self$stages[[stage_index]]$transform(transformation)
  }
  self
}


            

stagerunner.R

              
            
              #' @include stagerunner-initialize.R stagerunner-run.R stagerunner-around.R
#'   stagerunner-coalesce.R stagerunner-overlay.R stagerunner-transform.R
#'   stagerunner-append.R stagerunner-stage_names.R stagerunner-current_stage.R
#'   stagerunner-next_stage.R stagerunner-show.R stagerunner-has_key.R
#'   stagerunner-internal.R
#'   stageRunnerNode.R
NULL

            

We use R6 instead of the built-in reference classes for several reasons.

  1. Their definition is much more compact.
  2. It is possible to extend R6 definitions across packages.
  3. They support the notion of public and private membership.

A stagerunner is clearly represented as a reference object, rather than an S3 or S4 class, as it is by nature highly mutable: every stage execution triggers updates of the corresponding stage caches.

A stagerunner is primarily defined by its context and its stages. The former is an environment (or, when used in conjunction with objectdiff, a tracked_environment) that holds the current state of the stagerunner.

A stagerunner's stages are a nested list of either stageRunnerNodes (wrappers for functions) or more stagerunners, the latter if we wish to group together logically bound collections of functions (like a data preparation procedure or a sequence of modeling steps).

              #' Stagerunners are parametrized sequences of linear execution.
#' 
#' @name stageRunner
#' @format NULL
#' @docType class
stageRunner_ <- R6::R6Class('stageRunner',
  active = list(context = function() self$.context),                            
  public = list(
    .context = NULL,
    stages = list(),
    remember = FALSE,
    .mode = "head",
    .parent = NULL,
    .finished = FALSE,
    .prefix = "",
    initialize     = stagerunner_initialize,
    run            = run,
    around         = stageRunner_around,
    coalesce       = stageRunner_coalesce,
    overlay        = stageRunner_overlay,
    transform      = stageRunner_transform,
    append         = stageRunner_append,
    stage_names    = stageRunner_stage_names,
    parent         = function() { self$.parent },
    children       = function() { self$stages },
    current_stage  = stageRunner_current_stage,
    next_stage     = stageRunner_next_stage,
    show           = stageRunner_show,
    has_key        = stageRunner_has_key,
    mode           = function() { self$.mode },
    .set_parents   = stageRunner_.set_parents,
    .clear_cache   = stageRunner_.clear_cache,
    .root          = stageRunner_.root,

    # objectdiff intertwined functionality
    .set_prefixes  = stageRunner_.set_prefixes,
    .before_env    = stageRunner_.before_env,
    .mark_finished = stageRunner_.mark_finished,
    with_tracked_environment = function() {
      out <- is(self$context, 'tracked_environment')
      if (out) { requireNamespace("objectdiff", quietly = TRUE) }
      out
    }
  )
)

            

A little trick to ensure that a stagerunner can be constructed both as stagerunner(...) and stagerunner$new(...).

              #' @rdname stageRunner
#' @param ... Arguments to pass to stagerunner initialization.
#' @export
stageRunner <- structure(
  function(...) { stageRunner_$new(...) },
  class = "stageRunner_"
)

#' @export
#' @rdname stageRunner
stagerunner <- stageRunner

            

To make the above trick work, we need to prevent access to everything except new.
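The same trick can be reproduced without R6. In this sketch a plain list with a new element stands in for the R6 generator; counter_, counter and the "counter_" class are hypothetical names chosen for illustration.

```r
# Stand-in for a generator object: anything exposing a `new` function.
counter_ <- list(
  new = function(start = 0) structure(list(value = start), class = "counter")
)

# A function carrying a class attribute, so that `$` dispatches on it.
counter <- structure(
  function(...) counter_$new(...),
  class = "counter_"
)

# Only `$new` is allowed; it hands back the function itself, so that
# counter$new(...) ends up calling counter(...).
`$.counter_` <- function(x, name) {
  stopifnot(identical(name, "new"))
  x
}

a <- counter(start = 1)
b <- counter$new(start = 1)
identical(a, b)
```

Any other access, such as counter$foo, fails the stopifnot check and raises an error.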

              #' @export
`$.stageRunner_` <- function(...) {
  stopifnot(identical(..2, "new"))
  ..1
}

#' Check whether an R object is a stageRunner object
#'
#' @export
#' @param obj any object.
#' @return \code{TRUE} if the object is of class
#'    \code{stageRunner}, \code{FALSE} otherwise.
is.stagerunner <- function(obj) inherits(obj, 'stageRunner')
#' @rdname is.stagerunner
#' @export
is.stageRunner <- is.stagerunner

            

stageRunnerNode-around.R

              
            
              #' Wrap a stageRunnerNode callable with another callable.
#'
#' @param other_node stagerunner or stageRunnerNode.
#' @return \code{TRUE} or \code{FALSE} according as the wrapping was
#'    successful.
#' @examples \dontrun{
#' node1 <- stageRunnerNode(function(e) print(2))
#' node2 <- stageRunnerNode(function(e) { print(1); yield(); print(3); })
#' node1$around(node2)
#' node1$run() # Will print 1 2 3
#' # Notice the provided "yield" keyword, which allows calling the
#' # node that is being wrapped.
#' }
stageRunnerNode_around <- function(other_node) {
  if (is.stageRunnerNode(other_node)) {
    other_node <- other_node$callable
  }

  if (is.null(other_node)) {
    return(FALSE)
  }

  if (!is.function(other_node)) {
    warning("Cannot apply stageRunner$around in a terminal ",
            "node except with a function. Instead, I got a ",
            class(other_node)[1])
    return(FALSE)
  }

  new_callable <- other_node

            

We inject the yield keyword.
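Stripped of argument forwarding, the injection works by giving the wrapping function a new enclosing environment that defines yield and falls back to the original enclosure for everything else. The sketch below is a simplified illustration with hypothetical names (wrap_around, events); unlike the package code, its yield passes no arguments along.

```r
# Give `outer` access to a `yield()` that calls `inner`.
wrap_around <- function(inner, outer) {
  environment(outer) <- list2env(
    list(yield = function() inner()),
    parent = environment(outer)  # keep the original enclosure reachable
  )
  outer
}

events <- character(0)
inner <- function() events <<- c(events, "inner")
outer <- function() {
  events <<- c(events, "before")
  yield()
  events <<- c(events, "after")
}

wrapped <- wrap_around(inner, outer)
wrapped()
events
```

Because the new environment's parent is the old enclosure, variables the wrapper already relied on remain visible.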

                environment(new_callable) <- list2env(parent = environment(new_callable), list(
    .parent_context = self,
    yield           = around_yield(self$callable)
  ))
  self$callable <- new_callable
  TRUE
}

around_yield <- function(callable) {
            

Constructing the yield keyword is a little bit messy. We want to pass the exact same parameters as the call to the original callable, so we can grab ... from two frames up. However, since we must also provide the function we are invoking with yield (i.e., the callable), we have in effect two different kinds of injections.

                yield <- function() {
    # ... lives up two frames, but the run function lives up 1,
    # so we have to do something ugly
    run <- eval.parent(quote(.parent_context$run))
    args <- append(eval.parent(quote(list(...)), n = 2),
      list(.callable = callable))
    do.call(run, args, envir = parent.frame())
  }
            

We don't need anything except the base environment for the body of the yield keyword itself.

                environment(yield) <- list2env(list(callable = callable), parent = baseenv())
  yield
}

            

stageRunnerNode-overlay.R

              
            

Consider the following runner.

  1. Import data.
  2. Munge data.
  3. Create model.
  4. Export model.

Imagine we wish to add some assertions at the end of each stage, like ensuring that data was in fact imported and that munging performed some necessary operations.

We can replace each function in the above four stages with a stageRunner consisting of the original function and a new “assertion” function. This is precisely the job of the overlay method on stageRunnerNodes.

              #' Append one stageRunnerNode around another.
#'
#' @param other_node stagerunner or stageRunnerNode.
#' @param label character. Under the hood, this will be the "stage name"
#'    for the stage represented by the \code{other_node} in the
#'    automatically generated new stageRunner used as this node's
#'    callable (assuming \code{flat} is \code{FALSE}).
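#' @param flat logical. If \code{TRUE}, \code{other_node} is stored directly
#'    under \code{label} in this node's stages instead of being appended; a
#'    character \code{label} is then required.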
#' @return \code{TRUE} or \code{FALSE} according as the wrapping was
#'    successful.
#' @examples \dontrun{
#' node1 <- stageRunnerNode(function(e) print(1))
#' node2 <- stageRunnerNode(function(e) print(2))
#' node1$overlay(node2)
#' node1$run() # Will print 1 2
#' }
stageRunnerNode_overlay <- function(other_node, label = NULL, flat = FALSE) {
  if (is.stageRunnerNode(other_node)) {
    other_node <- other_node$callable
  }

  if (is.null(other_node)) { 
    return(FALSE)
  }

  if (!is.stagerunner(other_node)) {
    other_node <- stageRunner$new(self$.context, other_node)
  }

  # Coerce the current callable object to a stageRunner so that
  # we can append the other_node's stageRunner.
  if (!is.stagerunner(self$callable)) {
    self$callable <- stageRunner$new(self$.context, self$callable)
  }

  # TODO: Fancier merging here
  if (isTRUE(flat)) {
    if (!is.character(label)) stop("flat coalescing needs a label")
    self$callable$stages[[label]] <- other_node
  } else {
    self$callable$append(other_node, label)
  }
}

            

stageRunnerNode-run.R

              
            
              #' Execute the callable of a stageRunnerNode.
#'
#' @param ... additional arguments to the \code{callable}. This allows
#'    stagerunner stages to be uniformly parametrized (for example,
#'    if all stages should have a \code{verbose} parameter).
#' @param .cached_env An internal helper that passes the cached environment
#'    to be used for storing the results of this execution.
#' @param .callable Another internal helper used for some recursive
#'    metaprogramming.
#' @return \code{TRUE} if the execution was successful, or an error otherwise.
stageRunnerNode_run <- function(..., .cached_env = NULL, .callable = self$callable) {
  # TODO: Clean this up by using environment injection utility fn
  correct_cache <- .cached_env %||% self$.cached_env
  if (is.null(.callable)) {
    FALSE
  } else if (is.stagerunner(.callable)) {
    .callable$run(..., .cached_env = correct_cache)
  } else {
            

If we are executing a function, we inject the cached_env into the environment for use by, e.g., testing functions. Ideally, the callable should be able to determine what the state of the runner looked like before execution.

                  environment(.callable) <- list2env(
      list(cached_env = correct_cache), parent = environment(.callable)
    )
            

But once this function finishes executing, restore the environment of the callable to its former glory (i.e., remove the cached_env).
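As a rough illustration of why the injection is useful, the sketch below lets a testing-style function refer to cached_env as a free variable. The names run_with_cache and new_variables are hypothetical, and because only a local copy of the callable is modified here, no explicit restoration step is needed in this simplified version.

```r
run_with_cache <- function(callable, context, cached_env) {
  # Wrap the callable's enclosure in a child environment holding cached_env.
  environment(callable) <- list2env(
    list(cached_env = cached_env), parent = environment(callable)
  )
  callable(context)
}

# A testing-style stage: which variables exist now that were not cached?
new_variables <- function(context) setdiff(ls(context), ls(cached_env))

before  <- list2env(list(data = 1:3))
context <- list2env(list(data = 1:3, model = "fit"))
added   <- run_with_cache(new_variables, context, before)
added
```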

                  on.exit(environment(.callable) <- parent.env(environment(.callable)))
    .callable(self$.context, ...)
  }
  self$executed <- TRUE
}


            

stageRunnerNode-transform.R

              
            

This helper method is useful when we want to apply some transformation to all terminal nodes of a stageRunner. You can think of it as rapply for stageRunners.
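The recursion can be pictured on a plain nested list of functions. This is only a sketch of the idea, assuming ordinary lists in place of stageRunner and stageRunnerNode objects; transform_leaves and calls are hypothetical names.

```r
# Apply `transformation` to every function at the leaves of a nested list.
transform_leaves <- function(stages, transformation) {
  lapply(stages, function(stage) {
    if (is.list(stage)) {
      transform_leaves(stage, transformation)
    } else {
      transformation(stage)
    }
  })
}

calls <- character(0)
stages <- list(
  import = function(e) "imported",
  clean  = list(impute = function(e) "imputed")
)

# Wrap each stage so that every invocation is recorded before it runs.
traced <- transform_leaves(stages, function(fn) {
  function(...) {
    calls <<- c(calls, "called")
    fn(...)
  }
})

traced$clean$impute(NULL)
```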

              #' Transform a stageRunnerNode according to a functional.
#'
#' @param transformation function. An arity-1 function which takes the
#'    \code{callable} of a \code{stageRunnerNode} and transforms it
#'    into another callable (i.e. a function or a stagerunner). If the
#'    original \code{callable} is a stagerunner, its terminal nodes in
#'    turn will be transformed recursively.
#' @return The transformed callable.
#' @examples \dontrun{
#' increment <- 1
#' adder     <- function(x) x + increment
#' node      <- stageRunnerNode$new(function(e) print(adder(1)))
#' node$transform(function(fn) {
#'   environment(fn)$increment <- environment(fn)$increment + 1; fn
#' })
#' node$run() # Prints 3, rather than 2
#' }
stageRunnerNode_transform <- function(transformation) {
  if (is.stagerunner(self$callable)) {
    self$callable$transform(transformation)
  } else {
    self$callable <- transformation(self$callable)
  }
}

            

stageRunnerNode.R

              
            

In order to give us more flexibility on the terminal nodes of a stagerunner (the actual functions that will be executed on the stagerunner's context), we wrap them in an R6 class called a stageRunnerNode. This will be extremely useful if we wish to dynamically allow our stagerunners to be extended or wrapped with functionality.

For example, if we have a runner such as

  1. Import data.
  2. Munge data.
  3. Create model.
  4. Export model.

we might want to run tests for each stage. To do so, we can replace each terminal node, a function, with a stagerunner consisting of two functions. We can do this with the overlay helper method:

runner$overlay(test_runner)

Here, test_runner is another stagerunner with the exact same structure as our main runner, but with testing functions in its terminal nodes.
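On plain nested lists, the effect of overlaying can be sketched as pairing each function with its counterpart in a second list of the same shape. This is an illustration only: overlay_lists is a hypothetical helper, stages are matched by position, and the real method operates on stageRunnerNode objects.

```r
# Replace every leaf function with a pair: the original, then its test.
overlay_lists <- function(stages, tests) {
  Map(function(stage, test) {
    if (is.list(stage)) overlay_lists(stage, test) else list(stage, test)
  }, stages, tests)
}

stages <- list(
  import = function(e) e$data <- 1:3,
  munge  = function(e) e$data <- e$data * 2
)
tests <- list(
  import = function(e) stopifnot(!is.null(e$data)),
  munge  = function(e) stopifnot(all(e$data %% 2 == 0))
)

combined <- overlay_lists(stages, tests)

# Run each stage followed immediately by its assertion.
env <- new.env()
for (pair in combined) for (fn in pair) fn(env)
env$data
```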

              #' Stagerunner nodes are environment wrappers around individual stages
#' (i.e. functions) in order to track meta-data (e.g., for caching).
#' 
#' @name stageRunnerNode
#' @format NULL
#' @docType class
stageRunnerNode_ <- R6::R6Class('stageRunnerNode',
  public = list(
    callable = NULL,
    .cached_env = NULL,
    .context = NULL,
    .parent = NULL,
    executed = FALSE,
    initialize = function(.callable, .context = NULL) {
      stopifnot(is_any(.callable, c('stageRunner', 'function', 'NULL')))
      self$callable <- .callable
      self$.context <- .context
      self$executed <- FALSE
    },

    run          = stageRunnerNode_run,
    around       = stageRunnerNode_around,
    overlay      = stageRunnerNode_overlay,
    transform    = stageRunnerNode_transform,
    was_executed = function() { self$executed },
    parent       = function() { attr(self, "parent") },
    children     = function() list(),
    show         = function() { cat("A stageRunner node containing: \n"); print(self$callable) },

    # Functions which intertwine with the objectdiff package
    index        = function() {
      ix <- which(vapply(attr(self, "parent")$stages,
        function(x) identical(self, x), logical(1)))
      paste0(attr(self, "parent")$.prefix, ix)
    }
  )
)

#' @export
stageRunnerNode <- structure(
  function(...) { stageRunnerNode_$new(...) },
  class = "stageRunnerNode_"
)

#' @export
`$.stageRunnerNode_` <- function(...) {
  stopifnot(identical(..2, "new"))
  ..1
}

#' @param obj ANY. An object to test for class \code{stageRunnerNode}.
#' @export
#' @rdname stageRunnerNode
is.stageRunnerNode <- function(obj) inherits(obj, 'stageRunnerNode')

            

treeSkeleton-initialize.R

              
            
              #' Initialize a treeSkeleton object.
#'
#' treeSkeleton objects allow you to traverse a reference class object
#' as if it had a tree structure, merely by knowing how to call parent
#' or child nodes.
#'
#' @name treeSkeleton__initialize
#' @param object ANY. If a reference class object, then \code{parent_caller}
#'    and \code{children_caller} will refer to reference class methods.
#'    If an attribute on the object with names of \code{children_caller} and
#'    \code{parent_caller} exists, those will be used. Otherwise, the
#'    generic methods will be used.
#' @param parent_caller character. The name of the reference class method
#'    that returns the parent object, if the object was a node in a tree
#'    structure.
#' @param children_caller character. The name of the reference class method
#'    that returns the child objects, if the object was a node in a tree
#'    structure.
#' @return a treeSkeleton object.
treeSkeleton__initialize <- function(object, parent_caller = 'parent',
                                     children_caller = 'children') {
  stopifnot(!is.null(object))
  self$object  <- object
  self$.parent   <- uninitialized_field()
  self$.children <- uninitialized_field()

  # Make sure parent_caller and children_caller are methods of object
  if (inherits(object, "R6")) {
    stopifnot(all(c(parent_caller, children_caller) %in% ls(object)))
  }

  self$parent_caller   <- parent_caller
  self$children_caller <- children_caller
  NULL
}

            

treeSkeleton-parent_index.R

              
            
              #' Find the index of the current object in the children of its parent.
#' @name treeSkeleton__.parent_index
treeSkeleton__.parent_index <- function() {
  if (!is.null(ci <- attr(self$object, 'child_index'))) ci
  # Hack for accessing attribute modifications on a reference class object
  # See: http://stackoverflow.com/questions/22752021/why-is-r-capricious-in-its-use-of-attributes-on-reference-class-objects
  else if (inherits(self$object, 'refClass') && !inherits(self$object, 'R6') &&
           !is.null(ci <- attr(attr(self$object, '.xData')$.self, 'child_index'))) ci
  else # look through the parent's children and compare to .self
    # Danger Will Robinson! This will lead to strange bugs if our tree
    # has several nodes with duplicate objects
    which(vapply(
      self$parent()$children(),
      function(node) identical(node$object, self$object), logical(1)))[1]
}
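The fallback branch matches siblings by value, so the duplicate-object warning above is easy to reproduce. Here is a minimal sketch, using plain character vectors as stand-ins for the wrapped objects; `children` and `find_index` are hypothetical names for illustration, not part of stagerunner.

```r
# Hypothetical stand-ins for the objects wrapped by sibling nodes.
children <- list("clean", "train", "clean")

# Same lookup strategy as the fallback branch: return the position of the
# first sibling that is identical to the object we are looking for.
find_index <- function(object, siblings) {
  which(vapply(siblings, function(s) identical(s, object), logical(1)))[1]
}

third <- children[[3]]
find_index(third, children)  # 1, although the object sits at position 3

# An explicit child_index attribute removes the ambiguity.
attr(third, "child_index") <- 3L
attr(third, "child_index")   # 3
```

This is why the method prefers a `child_index` attribute whenever one is present.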

            

treeSkeleton-predecessor.R

              
            
              #' Attempt to find the predecessor of the current node.
#'
#' @name treeSkeleton__predecessor
#' @param index integer. If specified, this is the index of the current node
#'   in the children of its parent. (Sometimes, this cannot be computed
#'   automatically, and should usually be provided.)
#' @return predecessor for the wrapped object.
treeSkeleton__predecessor <- function(index = NULL) {
            

We define the predecessor of the root node as NULL.

                if (is.null(p <- self$parent())) return(NULL) 

  parent_index <- if (is.null(index)) self$.parent_index() else index
  stopifnot(is.finite(parent_index))

            

If we are the first child in the list of our parent's children, our predecessor is our parent's predecessor.

                if (parent_index == 1) {
    p$predecessor()
  } else {
            

Otherwise, the predecessor is the last leaf of the previous child.

                  p$children()[[parent_index - 1]]$last_leaf()
  }
}

            

treeSkeleton-successor.R

              
            
              #' Attempt to find the successor of the current node.
#'
#' @name treeSkeleton__successor
#' @param index integer. If specified, this is the index of the current node
#'   in the children of its parent. (Sometimes, this cannot be computed
#'   automatically, and should usually be provided.)
#' @return successor for the wrapped object.
treeSkeleton__successor <- function(index = NULL) {
            

We define the successor of the root node as NULL.

                if (is.null(p <- self$parent())) return(NULL) # no successor of root node

  parent_index <- if (is.null(index)) self$.parent_index() else index
  stopifnot(is.finite(parent_index))

            

If we are the last child in the list of our parent's children, our successor is our parent's successor.

                if (parent_index == length(p$children())) {
    p$successor()
  } else {
            

Otherwise, the successor is the first leaf of the next child node.

                  p$children()[[parent_index + 1]]$first_leaf()
  }
}
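The successor and predecessor rules can be tried out without stagerunner or R6. The sketch below assumes a hypothetical node type (an environment with `label`, `parent`, and `children`) and rebuilds the example tree from the top of this page; none of these helper names come from the package.

```r
# Hypothetical minimal node: an environment with a label, a parent
# (NULL for the root), and a list of children.
make_node <- function(label, parent = NULL) {
  node <- new.env()
  node$label <- label
  assign("parent", parent, envir = node)
  node$children <- list()
  if (!is.null(parent)) parent$children <- c(parent$children, list(node))
  node
}

first_leaf <- function(node) {
  if (length(node$children) == 0) node else first_leaf(node$children[[1]])
}

last_leaf <- function(node) {
  n <- length(node$children)
  if (n == 0) node else last_leaf(node$children[[n]])
}

# Position of a node among its siblings (environments compare by reference).
index_in_parent <- function(node) {
  which(vapply(node$parent$children, identical, logical(1), node))[1]
}

successor <- function(node) {
  p <- node$parent
  if (is.null(p)) return(NULL)               # the root has no successor
  i <- index_in_parent(node)
  if (i == length(p$children)) successor(p)  # last child: ask the parent
  else first_leaf(p$children[[i + 1]])       # otherwise: next sibling's first leaf
}

predecessor <- function(node) {
  p <- node$parent
  if (is.null(p)) return(NULL)               # the root has no predecessor
  i <- index_in_parent(node)
  if (i == 1) predecessor(p)                 # first child: ask the parent
  else last_leaf(p$children[[i - 1]])        # otherwise: previous sibling's last leaf
}

root       <- make_node("root")
import     <- make_node("import data", root)
clean      <- make_node("clean data", root)
impute     <- make_node("impute variable 1", clean)
discretize <- make_node("discretize variable 2", clean)
train      <- make_node("train model", root)

successor(import)$label      # "impute variable 1"
successor(discretize)$label  # "train model"
predecessor(impute)$label    # "import data"
successor(train)             # NULL
```

Walking `successor` from the first leaf visits the terminal stages in execution order, which is the linear sequence a stagerunner describes.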

            

treeSkeleton.R

              
            

Stagerunners are tree structures and come with a natural set of operations, like taking the predecessor, successor, and root of a node. However, these are not entirely simple to implement in a manner that is implementation-independent.

Specifically, we recognize that the notion of a node successor and predecessor is implementation agnostic as long as we have access to class methods that provide access to a node's parent and children. In this case, we can write an implementation-agnostic version that works regardless of whether the object is an S3, S4, or R6 object.

              #' @include treeSkeleton-initialize.R treeSkeleton-predecessor.R
#'    treeSkeleton-successor.R treeSkeleton-parent_index.R
NULL

#' Find the root node of the tree (the only one with no parent).
#'
#' @name treeSkeleton__root
#' @return The root node of the tree or NULL if empty tree.
treeSkeleton__root <- function() {
  if (is.null(self$parent())) self
  else self$parent()$root()
}

#' Find the first leaf in a tree.
#'
#' @name treeSkeleton__first_leaf
#' @return The first leaf, that is, the first terminal child node.
treeSkeleton__first_leaf <- function() {
  if (length(self$children()) == 0) self
  else self$children()[[1]]$first_leaf()
}

#' Find the last leaf in a tree.
#'
#' @name treeSkeleton__last_leaf
#' @return The last leaf, that is, the last terminal child node.
treeSkeleton__last_leaf <- function() {
  if (length(childs <- self$children()) == 0) self
  else childs[[length(childs)]]$last_leaf()
}

#' Find the parent of the current object wrapped in a treeSkeleton.
#' @name treeSkeleton__parent
treeSkeleton__parent <- function() {
  if (!is.unitialized_field(self$.parent)) return(self$.parent)
  self$.parent <-
    if (is.null(obj <- OOP_type_independent_method(self$object, self$parent_caller))) NULL
    else treeSkeleton$new(obj, parent_caller = self$parent_caller,
                          children_caller = self$children_caller)
}

#' Find the children of the current object wrapped in treeSkeletons.
#' @name treeSkeleton__children
treeSkeleton__children <- function() {
  if (!is.unitialized_field(self$.children)) return(self$.children)
  prechildren <- OOP_type_independent_method(self$object, self$children_caller)
  self$.children <- lapply(prechildren, treeSkeleton$new,
                       parent_caller = self$parent_caller,
                       children_caller = self$children_caller)
}

#' Find the key with the given index using the names of the lists
#' that parametrize each node's children.
#'
#' For example, if our tree structure is given by
#'   \code{list(a = list(b = 1, c = 2))}
#' then calling \code{find('a/b')} on the root node will return \code{1}.
#'
#' @name treeSkeleton__find
#' @param key character. The key to find in the given tree structure,
#'    where nodes are named by their name in the \code{children()}
#'    list. Numeric indices can be used to refer to unnamed nodes.
#'    For example, if key is \code{a/2/b}, this method would try to find
#'    the current node's child \code{a}'s second child's \code{b} child.
#'    (Just look at the examples).
#' @return the subtree or terminal node with the given key.
#' @examples 
#' \dontrun{
#' sr <- stageRunner$new(new.env(), list(a = list(force, list(b = function(x) x + 1))))
#' stagerunner:::treeSkeleton$new(sr)$find('a/2/b') # function(x) x + 1
#' }
treeSkeleton__find <- function(key) {
            

Currently out of service! Will be back shortly.

              #  stopifnot(is.character(key))
#  if (length(key) == 0 || identical(key, '')) return(self$object)
#  # Extract "foo" from "foo/bar/baz"
#  subkey <- regmatches(key, regexec('^[^/]+', key))[[1]]
#  key_remainder <- substr(key, nchar(subkey) + 2, nchar(key))
#  if (grepl('^[0-9]+', subkey)) {
#    subkey <- as.integer(subkey)
#    key_falls_within_children <- length(self$children()) >= subkey
#    stopifnot(key_falls_within_children)
#  } else {
#    matches <- grepl(subkey, names(self$children()))
#    stopifnot(length(matches) == 1)
#    key <- which(matches)
#  }
#  self$children()[[key]]$find(key_remainder)
}

#' This class implements iterators for a tree-based structure
#' without an actual underlying tree.
#'
#' In other dynamic languages, this kind of behavior would be called
#' duck typing. Imagine we have an object \code{x} that is of some
#' reference class. This object has a tree structure, and each node
#' in the tree has a parent and children. However, the methods to
#' fetch a node's parent or its children may have arbitrary names.
#' These names are stored in \code{treeSkeleton}'s \code{parent_caller}
#' and \code{children_caller} fields. Thus, if \code{x$methods()}
#' refers to \code{x}'s children and \code{x$parent_method()} refers
#' to \code{x}'s parent, we could define a \code{treeSkeleton} for
#' \code{x} by writing \code{treeSkeleton$new(x, 'parent_method', 'methods')}.
#'
#' The iterators on a \code{treeSkeleton} use the standard definition of
#' successor, predecessor, ancestor, etc.
#'
#' @name treeSkeleton
#' @docType class
#' @format NULL
treeSkeleton_ <- R6::R6Class('treeSkeleton',
  public = list(
    object = 'ANY',
            

As long as we know how to get an object's parent and children, we will be able to determine all the nice derived methods below.

                  parent_caller = 'character',
    children_caller = 'character',
    .children = 'ANY',
    .parent = 'ANY',

    initialize    = treeSkeleton__initialize,
    successor     = treeSkeleton__successor,
    predecessor   = treeSkeleton__predecessor,
    parent        = treeSkeleton__parent,
    children      = treeSkeleton__children,
    root          = treeSkeleton__root,
    first_leaf    = treeSkeleton__first_leaf,
    last_leaf     = treeSkeleton__last_leaf,
    find          = treeSkeleton__find,
    .parent_index = treeSkeleton__.parent_index,
    show          = function() { cat("treeSkeleton wrapping:\n"); print(self$object) }
  )
)

            

Some fancy tricks to make treeSkeleton(...) and treeSkeleton$new(...) have the same effect, just like in traditional reference classes.

              #' @export
treeSkeleton <- structure(
  function(...) { treeSkeleton_$new(...) },
  class = "treeSkeleton_"
)

#' @export
`$.treeSkeleton_` <- function(...) {
  stopifnot(identical(..2, "new"))
  ..1
}
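The same trick can be seen in isolation. In this sketch, `counter_` is a hypothetical plain list standing in for an R6 generator, so the example runs in base R; only the `$` method mirrors the code above.

```r
# Hypothetical generator standing in for an R6 class generator.
counter_ <- list(new = function(start = 0) list(value = start))

# A function with a class attribute, so that `$` dispatches on it.
counter <- structure(
  function(...) counter_$new(...),
  class = "counter_"
)

# `counter$new` dispatches here: ..1 is the function itself and ..2 is the
# string "new", so `counter$new(...)` ends up calling `counter(...)`.
`$.counter_` <- function(...) {
  stopifnot(identical(..2, "new"))
  ..1
}

identical(counter(5), counter$new(5))  # TRUE
```

Any name other than `new` after the `$` fails the `stopifnot` check, so the wrapper exposes nothing else.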

uninitialized_field <- function() {
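  # structure(NULL, ...) is deprecated in recent versions of R because NULL
  # cannot carry attributes; an empty list works as the sentinel instead.
  structure(list(), class = "uninitialized_field")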
}

is.unitialized_field <- function(x) {
  is(x, "uninitialized_field")
}
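A dedicated sentinel is needed because NULL is itself a legitimate cached value: the root node's parent is NULL. The sketch below assumes a hypothetical `cache` environment in place of the R6 fields and shows that the lookup runs only once even when the answer is NULL.

```r
# Sentinel helpers (hypothetical names mirroring the ones above).
uninitialized    <- function() structure(list(), class = "uninitialized_field")
is_uninitialized <- function(x) inherits(x, "uninitialized_field")

# Hypothetical cache standing in for the .parent field of a treeSkeleton.
cache <- new.env()
cache$parent  <- uninitialized()
cache$lookups <- 0

cached_parent <- function() {
  if (!is_uninitialized(cache$parent)) return(cache$parent)
  cache$lookups <- cache$lookups + 1     # the real lookup would happen here
  assign("parent", NULL, envir = cache)  # pretend this node is the root
  cache$parent
}

cached_parent()  # NULL, computed
cached_parent()  # NULL, served from the cache
cache$lookups    # 1
```

Had the cache been initialized to NULL instead, every call on the root would repeat the lookup.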

            

utils.R

              
            

A handy little trick from Hadley: this will return the second argument if the first is NULL.

              `%||%` <- function(x, y) if (is.null(x)) y else x

contains_true <- function(x) {
  if (is.list(x)) any(vapply(x, contains_true, logical(1)))
  else any(x)
}

all_logical <- function(x) {
  is.logical(x) || all(vapply(x,
    function(y) if (is.atomic(y)) is.logical(y) else all_logical(y),
  logical(1)))
}
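A quick illustration of these three helpers on the kind of nested logical list used by boolean_fill. The definitions are repeated so the snippet runs on its own; the inputs are made up for the example.

```r
`%||%` <- function(x, y) if (is.null(x)) y else x

contains_true <- function(x) {
  if (is.list(x)) any(vapply(x, contains_true, logical(1)))
  else any(x)
}

all_logical <- function(x) {
  is.logical(x) || all(vapply(x,
    function(y) if (is.atomic(y)) is.logical(y) else all_logical(y),
  logical(1)))
}

NULL %||% "default"     # "default"
"value" %||% "default"  # "value"

# The "impute variable 1" selection from the example at the top of the page.
el <- list(FALSE, list(TRUE, FALSE), FALSE)
contains_true(el)                          # TRUE
contains_true(list(FALSE, list(FALSE)))    # FALSE
all_logical(el)                            # TRUE
all_logical(list(TRUE, list("a")))         # FALSE
```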

            

A helper function for printing stagerunner execution progress.

              as.ordinal <- function(number) {
  ordinals <- list('first', 'second', 'third', 'fourth', 'fifth',
    'sixth', 'seventh', 'eighth', 'ninth', 'tenth', 'eleventh',
    'twelfth', 'thirteenth', 'fourteenth', 'fifteenth',
    'sixteenth', 'seventeenth', 'eighteenth', 'nineteenth',
    'twentieth')
  ext <- c("th", "st", "nd", "rd", rep("th", 6))
  ordinals[number][[1]] %||%
  paste0(number, ext[[(number %% 10) + 1]])
}
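A few sample calls show both paths through this helper: the lookup table for numbers up to twenty and the suffix fallback beyond that. The definitions are repeated so the snippet is self-contained.

```r
`%||%` <- function(x, y) if (is.null(x)) y else x

as.ordinal <- function(number) {
  ordinals <- list('first', 'second', 'third', 'fourth', 'fifth',
    'sixth', 'seventh', 'eighth', 'ninth', 'tenth', 'eleventh',
    'twelfth', 'thirteenth', 'fourteenth', 'fifteenth',
    'sixteenth', 'seventeenth', 'eighteenth', 'nineteenth',
    'twentieth')
  ext <- c("th", "st", "nd", "rd", rep("th", 6))
  ordinals[number][[1]] %||%
  paste0(number, ext[[(number %% 10) + 1]])
}

as.ordinal(3)    # "third"   (found in the lookup table)
as.ordinal(21)   # "21st"    (falls back to the last-digit suffix)
as.ordinal(30)   # "30th"
as.ordinal(111)  # "111st"   (the suffix rule only looks at the last digit)
```

The last call shows a limitation of the last-digit rule: numbers ending in 11, 12, or 13 above twenty get an "st", "nd", or "rd" suffix.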

            

Print some nice messages that tell you what type the stagerunner constructor expects.

              enforce_type <- function(value, expected, klass, name = deparse(substitute(value))) {
  if (missing(value)) {
    stop(sprintf(
      "Please provide %s%s.",
      articleize(sQuote(crayon::red(name))),
      if (missing(klass)) "" else paste(" to a", klass)
    ))
  }

  check <- utils::getFromNamespace(paste0("is.", expected), "base")
  if (!check(value)) {
    stop(sprintf(
      "Please pass %s as the %s%s; instead I got a %s.",
      articleize(sQuote(crayon::yellow(expected))), dQuote(name),
      if (missing(klass)) "" else paste(" for a", klass),
      crayon::red(sclass(value))
    ))
  }
}

sclass <- function(obj) { class(obj)[1L] }

articleize <- function(word) {
  sprintf("a%s %s", if (is_vowel(first_letter(word))) "n" else "", word)
}

is_vowel <- function(char) {
  is.element(char, c("a", "e", "i", "o", "u", "A", "E", "I", "O", "U"))
}

first_letter <- function(word) {
  substring(gsub("[^a-zA-Z]|\\[3[0-9]m", "", word), 1, 1)
}

# Whether obj is of any of the given types.
is_any <- function(obj, klasses) {
  any(vapply(klasses, inherits, logical(1), x = obj))
}

package_function <- function(pkg, fn) { # for when using :: breaks R CMD check
  get(fn, envir = getNamespace(pkg))
}

            

Used in conjunction with treeSkeleton so that it works for S3, S4, RC, and R6 classes.

              #' Call a method on an object regardless of its OOP type.
#'
#' @name OOP_type_independent_method 
#' @param object any. An R object of variable OOP type (S3, S4, RC, R6).
#' @param method character. The method to call on the \code{object}. If the
#'    latter is a reference class, it uses the \code{$} operator to access the method.
#'    (For example, \code{object$some_method}). If it has an attribute with the name
#'    \code{method}, it will use that attribute as the method to call. Otherwise,
#'    it will try to fetch a generic with the name \code{method} using \code{get}.
OOP_type_independent_method <- function(object, method) {
  if (method %in% names(attributes(object))) {
    attr(object, method)
  } else if (is.environment(object) && method %in% ls(object)) {
    object[[method]]()
  } else {
    get(method)(object)
  }
}
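Each of the three branches can be exercised with a small stand-in object. In this sketch, `call_method` is a hypothetical copy of the function above, and `kids` is a made-up ordinary function playing the role of a generic.

```r
# Hypothetical copy of the dispatcher above, renamed for the example.
call_method <- function(object, method) {
  if (method %in% names(attributes(object))) {
    attr(object, method)
  } else if (is.environment(object) && method %in% ls(object)) {
    object[[method]]()
  } else {
    get(method)(object)
  }
}

# 1. The value is stored as an attribute on the object.
with_attr <- structure(list(), children = list("a", "b"))
call_method(with_attr, "children")   # list("a", "b")

# 2. The object is an environment (as RC and R6 objects are) with a method.
env_obj <- new.env()
env_obj$children <- function() list("c")
call_method(env_obj, "children")     # list("c")

# 3. Otherwise, look up a function by name and call it on the object.
kids <- function(obj) obj$k
call_method(list(k = "d"), "kids")   # "d"
```

Note that the attribute branch returns the attribute's value directly rather than calling it.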

            
              as.list.environment <- function(env) {
  out <- base::as.list.environment(env)
  lapply(out, function(x) if (is.environment(x)) as.list(x) else x)
}
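The intent here is a recursive conversion: nested environments become nested lists. The sketch below spells out the recursion explicitly under a hypothetical name, `env_to_list`, so it does not depend on how S3 dispatch resolves `as.list`.

```r
# Hypothetical explicit version of the recursive conversion.
env_to_list <- function(env) {
  out <- base::as.list.environment(env)
  lapply(out, function(x) if (is.environment(x)) env_to_list(x) else x)
}

inner <- new.env()
inner$b <- 2
outer <- new.env()
outer$a <- 1
outer$inner <- inner

res <- env_to_list(outer)
is.list(res$inner)  # TRUE
res$inner$b         # 2
res$a               # 1
```

Elements are accessed by name because the order in which an environment's bindings are listed is not guaranteed.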