Skip to content

Instantly share code, notes, and snippets.

@inardini
Last active August 31, 2022 09:56
Show Gist options
  • Select an option

  • Save inardini/f814cc5963edb69f3599abdd77c405c8 to your computer and use it in GitHub Desktop.

Select an option

Save inardini/f814cc5963edb69f3599abdd77c405c8 to your computer and use it in GitHub Desktop.
# Vertex AI BQML pipeline pseudo code
@dsl.pipeline(
    name=PIPELINE_NAME,
    description="A batch pipeline to train ARIMA PLUS and generate predictions using BQML",
)
def pipeline():
    """Train a BQML ARIMA PLUS model and forecast with it if it is accurate enough.

    This is pseudo code: every ``...`` stands for component arguments that
    are intentionally elided.

    Steps, in order:
      1. Build the training dataset with a BigQuery query job.
      2. Create an experimental ARIMA PLUS model.
      3. Run the ARIMA-specific evaluation and the generic model evaluation
         on that experiment.
      4. Pass the ``evaluation_metrics`` output of the model evaluation to
         ``get_model_evaluation_metrics`` (a component defined outside this
         function).
      5. Only when its ``avg_mean_absolute_error`` output is below
         ``PERF_THRESHOLD``: train the final model, generate the forecasts,
         then generate the forecast explanations.

    All steps run inside a ``dsl.ExitHandler`` whose exit task is the
    notification email task.
    """
    # Component imports are kept function-local, as in the original snippet.
    from google_cloud_pipeline_components.experimental.vertex_notification_email import (
        VertexNotificationEmailOp,
    )
    from google_cloud_pipeline_components.v1.bigquery import (
        BigqueryCreateModelJobOp,
        BigqueryEvaluateModelJobOp,
        BigqueryExplainForecastModelJobOp,
        BigqueryForecastModelJobOp,
        BigqueryMLArimaEvaluateJobOp,
        BigqueryQueryJobOp,
    )

    # Exit task handed to the ExitHandler below.
    notify_email_task = VertexNotificationEmailOp(
        recipients=["email1@domain.com", "email2@domain.com"]
    )

    with dsl.ExitHandler(notify_email_task, name="demand forecasting pipeline"):
        # Create the training dataset
        create_training_dataset_op = BigqueryQueryJobOp(
            query=...
        ).set_display_name("get train data")

        # Run an ARIMA PLUS model experiment
        bq_arima_model_op = (
            BigqueryCreateModelJobOp(query=...)
            .set_display_name("run arima+ model experiment")
            .after(create_training_dataset_op)
        )

        # Evaluate ARIMA PLUS time series
        bq_arima_evaluate_time_series_op = (
            BigqueryMLArimaEvaluateJobOp(...)
            .set_display_name("evaluate arima plus time series")
            .after(bq_arima_model_op)
        )

        # Evaluate ARIMA PLUS model
        bq_arima_evaluate_model_op = (
            BigqueryEvaluateModelJobOp(...)
            .set_display_name("evaluate arima plus model")
            .after(bq_arima_model_op)
        )

        # Plot model metrics
        get_evaluation_model_metrics_op = (
            get_model_evaluation_metrics(
                bq_arima_evaluate_model_op.outputs["evaluation_metrics"]
            )
            .after(bq_arima_evaluate_model_op)
            .set_display_name("plot evaluation metrics")
        )

        # Check the model performance: continue only if the ARIMA PLUS
        # average MAE is below the threshold.
        # `dsl.Condition` is qualified because `dsl` is the only KFP name
        # this snippet has in scope (same as `dsl.ExitHandler` above).
        with dsl.Condition(
            get_evaluation_model_metrics_op.outputs["avg_mean_absolute_error"]
            < PERF_THRESHOLD,
            name="avg. mae good",
        ):
            # Train the final ARIMA PLUS model. It gets its own name so the
            # experiment task above is not shadowed.
            bq_arima_final_model_op = (
                BigqueryCreateModelJobOp(query=...)
                .set_display_name("train arima+ model")
                .after(create_training_dataset_op)
            )

            # Generate the ARIMA PLUS forecasts. The explicit ordering on the
            # training task keeps forecasting from starting before the final
            # model exists; it is redundant if the elided arguments already
            # consume that task's model output.
            bq_arima_forecast_op = (
                BigqueryForecastModelJobOp(...)
                .set_display_name("generate hourly forecasts")
                .after(get_evaluation_model_metrics_op, bq_arima_final_model_op)
            )

            # Generate the ARIMA PLUS forecast explanations
            bq_arima_explain_forecast_op = (
                BigqueryExplainForecastModelJobOp(...)
                .set_display_name("explain hourly forecasts")
                .after(bq_arima_forecast_op)
            )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment