1. Introduction
Machine Learning (ML) model development does not end with training and validation. As the saying goes, “you cannot improve what you don’t measure” – to continuously deliver business value, ML models must be deployed to production, monitored, and maintained. This process is known as MLOps (Machine Learning Operations) – a set of principles for the streamlined design, implementation, deployment, and maintenance of ML models.
MLOps stages definition
In this hands-on tutorial, we’ll demonstrate MLOps concepts using Databricks and the California Housing Prices dataset. This dataset contains 9 features (latitude, longitude, housing_median_age, total_rooms, total_bedrooms, population, households, median_income, ocean_proximity) and a target variable (median_house_value). Our goal is to predict house values based on some (or all) of the given features.
End-to-end Machine Learning Workflow
2. Data Preparation
Incomplete, inaccurate, and inconsistent data leads to false conclusions and predictions. So, cleaning and understanding the data has a high impact on the quality of the results.
2.1 Exploratory Data Analysis
The first step is to perform exploratory data analysis (EDA) on the dataset to check for null values, duplicated rows, and multicollinearity. The dataset consists of 20,640 entries (Figure 1). The feature total_bedrooms has 207 null values; the corresponding rows can be safely dropped. Duplicated rows were also dropped.
Figure 1: Information on the dataset
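As a minimal EDA sketch (assuming the raw data has already been loaded into a pandas DataFrame named data), these checks might look as follows:

import pandas as pd

data.info()                                    # 20,640 entries, dtypes, non-null counts
print(data.isnull().sum())                     # total_bedrooms: 207 null values

data = data.dropna(subset=['total_bedrooms'])  # drop rows with missing values
data = data.drop_duplicates()                  # drop duplicated rows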
2.2 Feature Engineering and registering to the Feature Store
All the features are numerical except ocean_proximity, which is categorical with 5 categories. One-Hot Encoding will be applied to this feature. Then we’ll compute the Pearson correlation among all the features to check for multicollinearity (Figure 2).
Figure 2: Heatmap correlation plot
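As a minimal sketch (the DataFrame name data is an assumption carried over from the EDA step), the encoding and the correlation check might look like this:

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# One-hot encode the categorical feature (5 categories -> 5 binary columns)
data = pd.get_dummies(data, columns=['ocean_proximity'])

# Pearson correlation among all features, visualized as a heatmap
corr = data.corr(method='pearson')
sns.heatmap(corr, annot=True, fmt='.2f', cmap='coolwarm')
plt.show()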
Pearson’s correlation revealed a few features with high multicollinearity. To address this, the rooms_per_bedroom (total_rooms/total_bedrooms) and population_per_household (population/households) features were created (Figure 3).
data['rooms_per_bedroom'] = data['total_rooms'] / data['total_bedrooms']
data['population_per_household'] = data['population'] / data['households']
Figure 3: Creating new features to tackle multicollinearity.
The clean dataset was split, keeping 20% separate for inference after model development. The remaining data was registered in the Feature Store in Databricks (Figure 4a). The Feature Store is a centralized repository to store features that can be used for model training and inference ensuring reproducibility (Figure 4b).
table_name = 'housing_feature_table'

from databricks import feature_store

fs = feature_store.FeatureStoreClient()

# Drop the table if it already exists
fs.drop_table(table_name)

# Register the training features (without the target) in the Feature Store
features_df = spark.createDataFrame(X_train).drop('median_house_value')
fs.create_table(
    name=table_name,
    primary_keys=['id'],
    df=features_df,
    schema=features_df.schema,
    description="Input for California house pricing model"
)
Figure 4a: Register features to the Feature Store in Databricks.
Figure 4b: Registered features. The target variable ‘median_house_value’ is not registered.
For machine learning lifecycle management, we will use MLflow, an open-source platform developed by Databricks. This platform provides the necessary tools for tracking experiments, model versioning, packaging code, and deploying models. All these enable us to implement MLOps strategies more effectively.
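For illustration, a minimal tracking sketch might look as follows (the variables model, X_test, and r2_adj are assumptions taken from the modeling step described below):

import mlflow
import mlflow.lightgbm
from mlflow.models.signature import infer_signature

with mlflow.start_run(run_name='baseline_lgbm'):
    # Log the trained model together with its inferred input signature
    signature = infer_signature(X_test, model.predict(X_test))
    mlflow.lightgbm.log_model(model, artifact_path='hp_lgb_model', signature=signature)
    mlflow.log_metric('r2_adjusted', r2_adj)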
3. Modeling: Training a machine learning model and hyperparameter tuning
For model development, we will implement a LightGBM regression model, since we have to predict a continuous variable. To measure the performance of the model, the adjusted R-squared was used. This is a statistical measure representing the proportion of the variance of the dependent variable that is explained by the independent variables. Compared to the traditional R-squared, which tends to grow as more features are added to the model, the adjusted R-squared accounts for the number of features, hence providing a more reliable result. It typically takes values between 0 and 1 (and can even become negative for very poor fits), with 0 indicating that the response variable cannot be explained by the model’s feature variables and 1 indicating that it can be perfectly explained. Alongside the adjusted R-squared, the Root Mean Squared Error (RMSE) is reported. RMSE is a common metric for measuring the error of a regression model and has the advantage of being expressed in the same units as the target variable.
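As a quick illustration, both metrics can be computed in a few lines (a sketch; y_true, y_pred, and n_features are placeholder names):

import numpy as np
from sklearn.metrics import r2_score, mean_squared_error

def adjusted_r2(y_true, y_pred, n_features):
    # Penalize R2 for the number of features used by the model
    r2 = r2_score(y_true, y_pred)
    n = len(y_true)
    return 1 - (1 - r2) * (n - 1) / (n - n_features - 1)

rmse = np.sqrt(mean_squared_error(y_true, y_pred))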
3.1 Model Training and Hyperparameter Tuning
For model training, we will use the features registered in the Feature Store to build the training dataset, which will be further split into training (70%) and test (30%) sets for modeling. Initially, we developed a baseline LightGBM regression model. Following this, we performed hyperparameter tuning with the goal of improving the selected performance metrics (via Bayesian Optimization using the hyperopt library).
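A minimal sketch of such a tuning loop is shown below (the search space, the splits X_tr, X_te, y_tr, y_te, and the number of evaluations are illustrative assumptions, not the exact setup used here):

import mlflow
import numpy as np
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_squared_error

search_space = {
    'num_leaves': hp.quniform('num_leaves', 20, 150, 1),
    'learning_rate': hp.loguniform('learning_rate', np.log(0.01), np.log(0.3)),
}

def objective(params):
    # Each trial is tracked as a nested MLflow run
    with mlflow.start_run(nested=True):
        model = LGBMRegressor(num_leaves=int(params['num_leaves']),
                              learning_rate=params['learning_rate'])
        model.fit(X_tr, y_tr)
        rmse = np.sqrt(mean_squared_error(y_te, model.predict(X_te)))
        mlflow.log_params(params)
        mlflow.log_metric('rmse', rmse)
        return {'loss': rmse, 'status': STATUS_OK}

best_params = fmin(fn=objective, space=search_space, algo=tpe.suggest,
                   max_evals=50, trials=Trials())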
A great feature of MLflow is experiment and run tracking. We use this to compare parameters and results under the ‘Experiments‘ section in Databricks (Figure 6). As can be seen there, the different runs can be compared side by side. In this example, the improvement in the adjusted R2 metric is very small, from 0.83 to 0.84 after tuning. Since the tuned model is nevertheless better than the baseline, we will promote it to the Model Registry and proceed with further checks.
from databricks.feature_store import FeatureLookup

source_file_input = 'housing_training'
table_name = 'housing_feature_table'
target = 'median_house_value'

data = spark.read.table(source_file_input)

# Look up the registered features that match the primary key 'id'
model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key='id')]

# fs.create_training_set joins the looked-up features to the IDs and the target
training_set = fs.create_training_set(data[['id', 'median_house_value']],
                                      model_feature_lookups,
                                      label=target,
                                      exclude_columns=['id'])
training_df = training_set.load_df().toPandas()
Figure 5: Creating training set from Feature Store in Databricks
Figure 6: Comparing runs from LightGBM Regressor with default parameters and LightGBM Regressor after hyperparameter tuning
4. MLflow Model Registry: From development to production
Model Registry provides a centralized model store for managing model lifecycle stage transitions from development to production. The best-performing model is pushed to the Model Registry, where the stage ‘None’ is assigned to it (Figure 7). After that, the model is requested to transition to the ‘Staging’ stage. A webhook for the event "TRANSITION_REQUEST_TO_STAGING_CREATED" is set up, and a job is triggered that performs certain model tests to ensure the robustness of the model. These tests include:
- has_artifacts: model has registered artifacts
- has_description: contains appropriate description
- has_signature: contains input signature (infer_signature)
- predicts: is able to make predictions
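A minimal sketch of a few of these checks (the helper names are illustrative; model_name, model_version, and sample_df are assumed to be defined):

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

def has_description(model_name, model_version):
    mv = client.get_model_version(model_name, model_version)
    assert mv.description, "Model version has no description"

def has_signature(model_name, model_version):
    info = mlflow.models.get_model_info(f"models:/{model_name}/{model_version}")
    assert info.signature is not None, "Model has no input signature"

def predicts(model_name, model_version, sample_df):
    model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_version}")
    assert model.predict(sample_df) is not None, "Model failed to make predictions"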
When all these tests are successful, the model is transitioned to the ‘Staging’ stage. In order to use the model for future predictions, it has to be moved to the ‘Production’ stage of the Model Registry.
To make this transition, a webhook for the event “TRANSITION_REQUEST_TO_PRODUCTION_CREATED” is set up. This triggers a job that makes the following checks:
- Whether the performance metric of the model is above a certain threshold (here is set to 0.8)
- Whether there is already a model in production
In case the second check comes out positive, the newer model is required to have a higher performance than the one already productionized. If these checks fail, the transition request is rejected and a job that checks for data drift is triggered (Figure 8).
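For reference, registering such a job webhook could look roughly like this (a sketch using the mlflow_call_endpoint helper shown in Figure 8; the job ID is a placeholder):

import json

webhook_body = {
    'model_name': 'housing_california_model',
    'events': ['TRANSITION_REQUEST_TO_PRODUCTION_CREATED'],
    'description': 'Trigger the production checks when a transition to Production is requested',
    'status': 'ACTIVE',
    'job_spec': {
        'job_id': '<production-checks-job-id>',  # placeholder job ID
        'workspace_url': 'https://eastus-c3.azuredatabricks.net',
        'access_token': token
    }
}
mlflow_call_endpoint('registry-webhooks/create', 'POST', json.dumps(webhook_body))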
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Push the model to the Model Registry (stage 'None')
model_uri = f"runs:/{run_lgb_hyperopt.info.run_id}/hp_lgb_model"
model_name = "housing_california_model"
model_details = mlflow.register_model(model_uri, model_name)

# Add descriptions for the registered model and its version
client.update_registered_model(
    name=model_details.name,
    description="This model predicts the price for houses in California."
)
client.update_model_version(
    name=model_details.name,
    version=model_details.version,
    description="This model version was built using LightGBM with Hyperopt hyperparameter optimization."
)
Figure 7: Push model to Model Registry and add description for the model and its respective version.
# Approve the transition if the new model beats the threshold (and the current
# Production model, if one exists); otherwise reject and check for drift
if (metric_r2 > 0.8 and not in_production) or (in_production and metric_r2 > metric_r2_prod):
    print(f"The model {model_name} is robust and can be used for making inferences.")
    approve_request_body = {'name': model_name,
                            'version': model_version_stage,
                            'stage': 'Production',
                            'archive_existing_versions': 'true',
                            'comment': 'The model shows good performance with Adjusted R2 above 0.8. Move to Production!'}
    mlflow_call_endpoint('transition-requests/approve', 'POST', json.dumps(approve_request_body))
else:
    print(f'The model version {model_version_stage} cannot be moved to Production. Check for potential Data Drift!')
    # Trigger the data-drift job
    job_json = {"job_id": '676578267809590'}
    auth_json = {"Authorization": f"Bearer {token}"}
    response = requests.post("https://eastus-c3.azuredatabricks.net/api/2.1/jobs/run-now", json=job_json, headers=auth_json).json()
    reject_request_body = {'name': model_name,
                           'version': model_version_stage,
                           'stage': 'Production',
                           'comment': 'The adjusted R2 metric for the model is below 0.8 or the current model in Production has better performance. Check for potential drift and retrain.'}
    mlflow_call_endpoint('transition-requests/reject', 'POST', json.dumps(reject_request_body))
Figure 8: Part of the job that is triggered by the event “TRANSITION_REQUEST_TO_PRODUCTION_CREATED“.
Transition of Model Stages in the Model Registry
5. Inference and model monitoring
The last step after having a robust model in the ‘Production’ stage is to make new predictions on unseen data. During this step, we set up a job called ‘inference’, which reads new data and makes predictions on it. If the performance of the model on these new predictions falls below the set threshold (0.8 in this case), we check for potential data drift (Figure 9). Depending on the nature of the drift, appropriate actions then have to take place, ranging from retraining the model with more data to running a root cause analysis to understand why the underlying data distribution changed.
It is worth noting that there are cases in which, though a drift exists, the performance of the model is not affected. In other words, the drift has no impact on the business.
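As a sketch of the batch-inference step (the new_data DataFrame containing the primary key ‘id’ is an assumption), the Feature Store client can score a registered model directly:

from databricks import feature_store

fs = feature_store.FeatureStoreClient()

# score_batch looks up the registered features by primary key and predicts
model_uri = "models:/housing_california_model/Production"
predictions_df = fs.score_batch(model_uri, new_data)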
if r2_adj < 0.8:
    print(f"Adjusted R2 is {r2_adj}; Check for potential drift!")
    job_json = {"job_id": '676578267809590'}
    auth_json = {"Authorization": f"Bearer {token}"}
    response = requests.post("https://eastus-c3.azuredatabricks.net/api/2.1/jobs/run-now", json=job_json, headers=auth_json).json()
    print(response)
else:
    print('R2 is above 0.8, do not check for data drift.')
Figure 9: If adjusted R2 is below 0.8 then trigger the job to check for data drift.
To demonstrate drift detection in this tutorial, artificial drift was introduced in the feature variable median_income (Figure 10). This caused the adjusted R2 to fall below the 0.8 threshold, triggering the job to check for drift (Figure 11).
Figure 10: Distribution of the feature variable median_income before and after introduction of drift.
Figure 11: When the performance metric adjusted R2 is below a predefined threshold (0.8 here) a job that checks for drift is triggered.
For drift detection, the Kolmogorov-Smirnov non-parametric statistical test is performed. The null hypothesis (H0) is that the two variable distributions come from the same reference distribution. If the p-value is below 0.05, the null hypothesis is rejected and drift is detected, since the two variables come from different distributions (Figure 12).
Figure 12: Drift detected in the median_income variable. The p-values are adjusted for multiple hypothesis testing.
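A minimal sketch of this drift check (reference_df and current_df are assumed pandas DataFrames holding the training data and the new data):

from scipy.stats import ks_2samp
from statsmodels.stats.multitest import multipletests

features = ['median_income', 'housing_median_age']  # illustrative subset
p_values = [ks_2samp(reference_df[f], current_df[f]).pvalue for f in features]

# Adjust the p-values for multiple hypothesis testing (Bonferroni here)
reject, p_adjusted, _, _ = multipletests(p_values, alpha=0.05, method='bonferroni')
for feature, drifted, p in zip(features, reject, p_adjusted):
    if drifted:
        print(f"Drift detected in {feature} (adjusted p-value = {p:.4f})")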
6. Databricks CI/CD using Github Actions
For a proper MLOps project, CI/CD has to exist. In a real business scenario, we assume that the proper infrastructure in Databricks (created with IaC!) already exists and contains at least two environments (development and production).
For CI/CD, we used GitHub Actions workflows that are triggered upon a pull request to the main branch and go through three stages:
- DEV: Runs a workflow called ‘eda_modeling’ that does the exploratory data analysis, modeling and promoting of the best model to the Model Registry.
- TEST: Runs a workflow called ‘job-model-tests’ that includes the model tests for the transitions in the ‘Staging’ and ‘Production’ stages in the Model Registry.
- PROD: Runs a workflow ‘inference’ for batch inference against new data.
The .yml file for the GitHub Actions workflow is summarized below (it is advised not to copy-paste it blindly but to adapt it accordingly):
name: databricks-cicd

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  main:
    runs-on: ubuntu-latest
    # Name of the environment you created
    environment: DEV
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
      - name: Install Databricks CLI
        run: |
          curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh
      # Run a databricks job given its name
      - name: Run databricks job "eda_modeling"
        env:
          DATABRICKS_HOST: ${{ vars.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
          JOB_NAME: 'eda_modeling'
          WAIT_TIMEOUT: '20m0s'
        run: |
          JOB_ID=$(databricks jobs list | grep -w $JOB_NAME | awk -F ' ' '{print $1}')
          databricks jobs run-now $JOB_ID --timeout $WAIT_TIMEOUT

  test:
    needs: main
    runs-on: ubuntu-latest
    # Name of the environment you created
    environment: TEST
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
      - name: Install Databricks CLI
        run: |
          curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh
      # Run a databricks job given its name
      - name: Run databricks job "job-model-tests"
        env:
          DATABRICKS_HOST: ${{ vars.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
          JOB_NAME: 'job-model-tests'
          WAIT_TIMEOUT: '20m0s'
        run: |
          JOB_ID=$(databricks jobs list | grep -w $JOB_NAME | awk -F ' ' '{print $1}')
          databricks jobs run-now $JOB_ID --timeout $WAIT_TIMEOUT

  prod:
    needs: test
    runs-on: ubuntu-latest
    # Name of the environment you created
    environment: PROD
    steps:
      - name: Check out repository
        uses: actions/checkout@v3
      - name: Install Databricks CLI
        run: |
          curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh
      # Run a databricks job given its name
      - name: Run databricks job "inference"
        env:
          DATABRICKS_HOST: ${{ vars.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
          JOB_NAME: 'inference'
          WAIT_TIMEOUT: '20m0s'
        run: |
          JOB_ID=$(databricks jobs list | grep -w $JOB_NAME | awk -F ' ' '{print $1}')
          databricks jobs run-now $JOB_ID --timeout $WAIT_TIMEOUT
The DATABRICKS_HOST and DATABRICKS_TOKEN variables should reflect the host and token of the development Databricks workspace for the DEV and TEST environments, and of the production workspace for the PROD environment.