Skip to content

Commit

Permalink
WIP: add parallel processing for historic, detect_anomalies, and cros…
Browse files Browse the repository at this point in the history
…s_validation methods
  • Loading branch information
MMenchero committed Aug 24, 2024
1 parent 0920b1a commit 60622ef
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 177 deletions.
87 changes: 29 additions & 58 deletions R/nixtla_client_cross_validation.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#' @param finetune_loss Loss function to use for finetuning. Options are: "default", "mae", "mse", "rmse", "mape", and "smape".
#' @param clean_ex_first Clean exogenous signal before making the forecasts using 'TimeGPT'.
#' @param model Model to use, either "timegpt-1" or "timegpt-1-long-horizon". Use "timegpt-1-long-horizon" if you want to forecast more than one seasonal period given the frequency of the data.
#' @param num_partitions A positive integer, "auto", or NULL specifying the number of partitions. When set to "auto", the number of partitions is equal to the number of available cores. When NULL, it defaults to a single partition.
#'
#' @return A tsibble or a data frame with 'TimeGPT''s cross validation result.
#' @export
Expand All @@ -26,7 +27,9 @@
#' fcst <- nixtlar::nixtla_client_cross_validation(df, h = 8, id_col = "unique_id", n_windows = 5)
#' }
#'
nixtla_client_cross_validation <- function(df, h=8, freq=NULL, id_col=NULL, time_col="ds", target_col="y", X_df=NULL, level=NULL, quantiles=NULL, n_windows=1, step_size=NULL, finetune_steps=0, finetune_loss="default", clean_ex_first=TRUE, model="timegpt-1"){
nixtla_client_cross_validation <- function(df, h=8, freq=NULL, id_col=NULL, time_col="ds", target_col="y", X_df=NULL, level=NULL, quantiles=NULL, n_windows=1, step_size=NULL, finetune_steps=0, finetune_loss="default", clean_ex_first=TRUE, model="timegpt-1", num_partitions=NULL){

start <- Sys.time()

# Prepare data ----
names(df)[which(names(df) == time_col)] <- "ds"
Expand Down Expand Up @@ -109,30 +112,24 @@ nixtla_client_cross_validation <- function(df, h=8, freq=NULL, id_col=NULL, time

# Create request ----
url_cv <- "https://api.nixtla.io/cross_validation_multi_series"
req_cv <- httr2::request(url_cv) |>
httr2::req_headers(
"accept" = "application/json",
"content-type" = "application/json",
"authorization" = paste("Bearer", .get_api_key())
) |>
httr2::req_user_agent("nixtlar") |>
httr2::req_body_json(data = timegpt_data) |>
httr2::req_retry(
max_tries = 6,
is_transient = .transient_errors
)

# Send request and fetch response ----
resp_cv <- req_cv |>
httr2::req_perform() |>
httr2::resp_body_json()

payload_list <- .partition_payload(timegpt_data, num_partitions)

future::plan(future::multisession)

responses <- .make_request(url_cv, payload_list)

# Extract cross-validation ----
cv_list <- lapply(resp_cv$data$forecast$data, unlist)
res <- data.frame(do.call(rbind, cv_list))
colnames(res) <- resp_cv$data$forecast$columns
res[,4:ncol(res)] <- lapply(res[,4:ncol(res)], as.numeric)
res$cutoff <- lubridate::ymd_hms(res$cutoff)
cross_val_list <- lapply(responses, function(resp) {
cv_list <- lapply(resp$data$forecast$data, unlist)
cv <- data.frame(do.call(rbind, cv_list))
names(cv) <- resp$data$forecast$columns
return(cv)
})

res <- do.call(rbind, cross_val_list)

res[, 4:ncol(res)] <- future.apply::future_lapply(res[, 4:ncol(res)], as.numeric)

# Rename quantile columns if necessary
if(!is.null(quantiles)){
Expand Down Expand Up @@ -160,41 +157,10 @@ nixtla_client_cross_validation <- function(df, h=8, freq=NULL, id_col=NULL, time
}
}

# Data transformation ----
if(tsibble::is_tsibble(df)){
res$ds <- switch(freq,
"Y" = as.numeric(substr(res$ds, 1, 4)),
"A" = as.numeric(substr(res$ds, 1, 4)),
"Q" = tsibble::yearquarter(res$ds),
"MS" = tsibble::yearmonth(res$ds),
"W" = tsibble::yearweek(res$ds),
"H" = lubridate::ymd_hms(res$ds),
lubridate::ymd(res$ds) # default (daily or other)
)
res$cutoff <- switch(freq,
"Y" = as.numeric(substr(res$cutoff, 1, 4)),
"A" = as.numeric(substr(res$cutoff, 1, 4)),
"Q" = tsibble::yearquarter(res$cutoff),
"MS" = tsibble::yearmonth(res$cutoff),
"W" = tsibble::yearweek(res$cutoff),
"H" = lubridate::ymd_hms(res$cutoff),
lubridate::ymd(res$cutoff) # default (daily or other)
)
if(is.null(id_col)){
res <- tsibble::as_tsibble(res, index="ds")
}else{
res <- tsibble::as_tsibble(res, key="unique_id", index="ds")
}
}else{
# If df is a data frame, convert ds to dates
if(freq == "H"){
res$ds <- lubridate::ymd_hms(res$ds)
res$cutoff <- lubridate::ymd_hms(res$cutoff)
}else{
res$ds <- lubridate::ymd(res$ds)
res$cutoff <- lubridate::ymd(res$cutoff)
}
}
# Date transformation ----
res <- .transform_output_dates(res, "ds", freq, data$flag)
new_cutoff <- future.apply::future_lapply(res$cutoff, lubridate::ymd_hms)
res$cutoff <- do.call(c, new_cutoff)

# Rename columns ----
colnames(res)[which(colnames(res) == "ds")] <- time_col
Expand All @@ -206,5 +172,10 @@ nixtla_client_cross_validation <- function(df, h=8, freq=NULL, id_col=NULL, time
dplyr::select(-c(.data$unique_id))
}

row.names(res) <- NULL

end <- Sys.time()
print(paste0("Total execution time: ", end-start))

return(res)
}
74 changes: 27 additions & 47 deletions R/nixtla_client_detect_anomalies.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#' @param level The confidence level (0-100) for the prediction interval used in anomaly detection. Default is 99.
#' @param clean_ex_first Clean exogenous signal before making the forecasts using 'TimeGPT'.
#' @param model Model to use, either "timegpt-1" or "timegpt-1-long-horizon". Use "timegpt-1-long-horizon" if you want to forecast more than one seasonal period given the frequency of the data.
#' @param num_partitions A positive integer, "auto", or NULL specifying the number of partitions. When set to "auto", the number of partitions is equal to the number of available cores. When NULL, it defaults to a single partition.
#'
#' @return A tsibble or a data frame with the anomalies detected in the historical period.
#' @export
Expand All @@ -19,7 +20,9 @@
#' fcst <- nixtlar::nixtla_client_anomaly_detection(df, id_col="unique_id")
#' }
#'
nixtla_client_detect_anomalies <- function(df, freq=NULL, id_col=NULL, time_col="ds", target_col="y", level=c(99), clean_ex_first=TRUE, model="timegpt-1"){
nixtla_client_detect_anomalies <- function(df, freq=NULL, id_col=NULL, time_col="ds", target_col="y", level=c(99), clean_ex_first=TRUE, model="timegpt-1", num_partitions=NULL){

start <- Sys.time()

# Prepare data ----
names(df)[which(names(df) == time_col)] <- "ds"
Expand Down Expand Up @@ -67,54 +70,26 @@ nixtla_client_detect_anomalies <- function(df, freq=NULL, id_col=NULL, time_col=

# Create request ----
url_anomaly <- "https://api.nixtla.io/anomaly_detection_multi_series"
req_anomaly <- httr2::request(url_anomaly) |>
httr2::req_headers(
"accept" = "application/json",
"content-type" = "application/json",
"authorization" = paste("Bearer", .get_api_key())
) |>
httr2::req_user_agent("nixtlar") |>
httr2::req_body_json(data = timegpt_data) |>
httr2::req_retry(
max_tries = 6,
is_transient = .transient_errors
)

# Send request and fetch response
resp_anomaly <- req_anomaly |>
httr2::req_perform() |>
httr2::resp_body_json()

payload_list <- .partition_payload(timegpt_data, num_partitions)

future::plan(future::multisession)

responses <- .make_request(url_anomaly, payload_list)

# Extract anomalies ----
anomaly_list <- lapply(resp_anomaly$data$forecast$data, unlist)
res <- data.frame(do.call(rbind, anomaly_list))
colnames(res) <- resp_anomaly$data$forecast$columns
res[,3:ncol(res)] <- lapply(res[,3:ncol(res)], as.numeric)

# Data transformation ----
if(tsibble::is_tsibble(df)){
res$ds <- switch(freq,
"Y" = as.numeric(substr(res$ds, 1, 4)),
"A" = as.numeric(substr(res$ds, 1, 4)),
"Q" = tsibble::yearquarter(res$ds),
"MS" = tsibble::yearmonth(res$ds),
"W" = tsibble::yearweek(res$ds),
"H" = lubridate::ymd_hms(res$ds),
lubridate::ymd(res$ds) # default (daily or other)
)
if(is.null(id_col)){
res <- tsibble::as_tsibble(res, index="ds")
}else{
res <- tsibble::as_tsibble(res, key="unique_id", index="ds")
}
}else{
# If df is a data frame, convert ds to dates
if(freq == "H"){
res$ds <- lubridate::ymd_hms(res$ds)
}else{
res$ds <- lubridate::ymd(res$ds)
}
}
anomaly_list <- lapply(responses, function(resp) {
anm_list <- lapply(resp$data$forecast$data, unlist)
anm <- data.frame(do.call(rbind, anm_list))
names(anm) <- resp$data$forecast$columns
return(anm)
})

res <- do.call(rbind, anomaly_list)
res[, 3:ncol(res)] <- future.apply::future_lapply(res[, 3:ncol(res)], as.numeric)

# Date transformation ----
res <- .transform_output_dates(res, "ds", freq, data$flag)

# Rename columns ----
colnames(res)[which(colnames(res) == "ds")] <- time_col
Expand All @@ -126,5 +101,10 @@ nixtla_client_detect_anomalies <- function(df, freq=NULL, id_col=NULL, time_col=
dplyr::select(-c(.data$unique_id))
}

row.names(res) <- NULL

end <- Sys.time()
print(paste0("Total execution time: ", end-start))

return(res)
}
10 changes: 5 additions & 5 deletions R/nixtla_client_forecast.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#'
nixtla_client_forecast <- function(df, h=8, freq=NULL, id_col=NULL, time_col="ds", target_col="y", X_df=NULL, level=NULL, quantiles=NULL, finetune_steps=0, finetune_loss="default", clean_ex_first=TRUE, add_history=FALSE, model="timegpt-1", num_partitions=NULL){

start_time <- Sys.time()
start <- Sys.time()
# Prepare data ----
names(df)[which(names(df) == time_col)] <- "ds"
names(df)[which(names(df) == target_col)] <- "y"
Expand Down Expand Up @@ -120,7 +120,6 @@ nixtla_client_forecast <- function(df, h=8, freq=NULL, id_col=NULL, time_col="ds
})

fcst <- do.call(rbind, fcst_list)

if(!is.null(level)){
fcst[, 3:ncol(fcst)] <- future.apply::future_lapply(fcst[, 3:ncol(fcst)], as.numeric)
}else{
Expand Down Expand Up @@ -154,7 +153,7 @@ nixtla_client_forecast <- function(df, h=8, freq=NULL, id_col=NULL, time_col="ds
}

# Date transformation ----
fcst <- .transform_output_dates(fcst, freq, data$flag)
fcst <- .transform_output_dates(fcst, "ds", freq, data$flag)

# Rename columns ----
names(fcst)[which(names(fcst) == "ds")] <- time_col
Expand All @@ -180,8 +179,9 @@ nixtla_client_forecast <- function(df, h=8, freq=NULL, id_col=NULL, time_col="ds
}

row.names(fcst) <- NULL
end_time <- Sys.time()
print(paste0("Total execution time: ", end_time-start_time))

end <- Sys.time()
print(paste0("Total execution time: ", end-start))

return(fcst)
}
72 changes: 27 additions & 45 deletions R/nixtla_client_historic.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#' @param finetune_steps Number of steps used to finetune 'TimeGPT' in the new data.
#' @param finetune_loss Loss function to use for finetuning. Options are: "default", "mae", "mse", "rmse", "mape", and "smape".
#' @param clean_ex_first Clean exogenous signal before making the forecasts using 'TimeGPT'.
#' @param model Model to use, either "timegpt-1" or "timegpt-1-long-horizon". Use "timegpt-1-long-horizon" if you want to forecast more than one seasonal period given the frequency of the data.
#' @param num_partitions A positive integer, "auto", or NULL specifying the number of partitions. When set to "auto", the number of partitions is equal to the number of available cores. When NULL, it defaults to a single partition.
#'
#' @return 'TimeGPT''s forecast for the in-sample period.
#' @export
Expand All @@ -21,7 +23,9 @@
#' fcst <- nixtlar::nixtla_client_historic(df, id_col="unique_id", level=c(80,95))
#' }
#'
nixtla_client_historic <- function(df, freq=NULL, id_col=NULL, time_col="ds", target_col="y", level=NULL, quantiles=NULL, finetune_steps=0, finetune_loss="default", clean_ex_first=TRUE){
nixtla_client_historic <- function(df, freq=NULL, id_col=NULL, time_col="ds", target_col="y", level=NULL, quantiles=NULL, finetune_steps=0, finetune_loss="default", clean_ex_first=TRUE, model="timegpt-1", num_partitions=NULL){

start <- Sys.time()

# Prepare data ----
names(df)[which(names(df) == time_col)] <- "ds"
Expand Down Expand Up @@ -81,30 +85,24 @@ nixtla_client_historic <- function(df, freq=NULL, id_col=NULL, time_col="ds", ta

# Create request ----
url_historic <- "https://api.nixtla.io/historic_forecast_multi_series"
req_hist <- httr2::request(url_historic) |>
httr2::req_headers(
"accept" = "application/json",
"content-type" = "application/json",
"authorization" = paste("Bearer", .get_api_key())
) |>
httr2::req_user_agent("nixtlar") |>
httr2::req_body_json(data = timegpt_data) |>
httr2::req_retry(
max_tries = 6,
is_transient = .transient_errors
)

# Send request and fetch response
resp_hist <- req_hist |>
httr2::req_perform() |>
httr2::resp_body_json()

payload_list <- .partition_payload(timegpt_data, num_partitions)

future::plan(future::multisession)

responses <- .make_request(url_historic, payload_list)

# Extract fitted values ----
fit_list <- lapply(resp_hist$data$forecast$data, unlist)
fitted <- data.frame(do.call(rbind, fit_list), stringsAsFactors=FALSE)
names(fitted) <- resp_hist$data$forecast$columns
fitted[,3:ncol(fitted)] <- lapply(fitted[,3:ncol(fitted)], as.numeric)
fitted_list <- lapply(responses, function(resp) {
fit_list <- lapply(resp$data$forecast$data, unlist)
fit <- data.frame(do.call(rbind, fit_list))
names(fit) <- resp$data$forecast$columns
return(fit)
})

fitted <- do.call(rbind, fitted_list)
fitted <- fitted[,-which(names(fitted) == "y")]
fitted[, 3:ncol(fitted)] <- future.apply::future_lapply(fitted[, 3:ncol(fitted)], as.numeric)

# Rename quantile columns if necessary
if(!is.null(quantiles)){
Expand Down Expand Up @@ -132,29 +130,8 @@ nixtla_client_historic <- function(df, freq=NULL, id_col=NULL, time_col="ds", ta
}
}

# Data transformation ----
if(tsibble::is_tsibble(df)){
fitted$ds <- switch(freq,
"Y" = as.numeric(substr(fitted$ds, 1, 4)),
"A" = as.numeric(substr(fitted$ds, 1, 4)),
"Q" = tsibble::yearquarter(fitted$ds),
"MS" = tsibble::yearmonth(fitted$ds),
"W" = tsibble::yearweek(fitted$ds),
"H" = lubridate::ymd_hms(fitted$ds),
lubridate::ymd(fitted$ds)) # default (daily "D" or other)
if(is.null(id_col)){
fitted <- tsibble::as_tsibble(fitted, index="ds")
}else{
fitted <- tsibble::as_tsibble(fitted, key="unique_id", index="ds")
}
}else{
# If df is a data frame, convert ds to dates
if(freq == "H"){
fitted$ds <- lubridate::ymd_hms(fitted$ds)
}else{
fitted$ds <- lubridate::ymd(fitted$ds)
}
}
# Date transformation ----
fitted <- .transform_output_dates(fitted, "ds", freq, data$flag)

# Rename columns ----
names(fitted)[which(names(fitted) == "ds")] <- time_col
Expand All @@ -166,5 +143,10 @@ nixtla_client_historic <- function(df, freq=NULL, id_col=NULL, time_col="ds", ta
dplyr::select(-c(.data$unique_id))
}

row.names(fitted) <- NULL

end <- Sys.time()
print(paste0("Total execution time: ", end-start))

return(fitted)
}
Loading

0 comments on commit 60622ef

Please sign in to comment.