
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming. MLlib is Apache Spark's scalable machine learning library.

Install PySpark on Jupyter Notebook

After a standard installation, PySpark usually runs from the native Python shell (IDLE). For a more interactive development environment, we can use Jupyter Notebook for Spark development.

In Linux

Note: when PySpark is configured to launch Jupyter Notebook, spark-submit is likely to throw an error while executing the Jupyter command. Because the pyspark command is set up to start Jupyter as the driver, spark-submit tries to do the same and fails. The solution is to set the variable to use Python, not Jupyter: PYSPARK_DRIVER_PYTHON=python
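As a rough sketch of the two modes described above, the hypothetical helper `pyspark_env` below builds an environment for each case. It assumes the common convention of `PYSPARK_DRIVER_PYTHON=jupyter` plus `PYSPARK_DRIVER_PYTHON_OPTS=notebook` for notebook launches; your installation may use different values.

```python
import os


def pyspark_env(notebook, base=None):
    """Return a copy of the environment set up for notebook use or for spark-submit.

    Hypothetical helper: PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS are the
    variables PySpark reads to decide which program runs as the Python driver.
    """
    env = dict(os.environ if base is None else base)  # never mutate the caller's mapping
    if notebook:
        # `pyspark` will start a Jupyter Notebook server as the driver
        env["PYSPARK_DRIVER_PYTHON"] = "jupyter"
        env["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
    else:
        # spark-submit needs a plain Python interpreter as the driver
        env["PYSPARK_DRIVER_PYTHON"] = "python"
        env.pop("PYSPARK_DRIVER_PYTHON_OPTS", None)
    return env
```

For example, `subprocess.run(["spark-submit", "log_reg.py"], env=pyspark_env(notebook=False))` would submit a (hypothetically named) script without touching the notebook settings in your shell profile.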

Locally in Jupyter notebook

You can interpret and execute the program cell by cell. Instead of passing arguments on the command line, as the scripts do with

from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--experiment_name", dest="experiment_name", help="experiment_name", default="log_reg_pyspark", required=False)
parser.add_argument("--data_path", dest="data_path", help="data_path", required=True)
parser.add_argument("--parameter", dest="parameter", help="parameter", required=True)
parser.add_argument("--describe", dest="describe", help="Describe data", default=False, action='store_true')
args = parser.parse_args()

you can pass them as variables in the notebook.
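A minimal sketch of two ways to do this in a notebook cell. The path `data/train.csv` and the value `regParam` are placeholders, not values from the original project. The first option builds an object with the same attribute names the scripts read from `args`; the second keeps the parser but hands `parse_args` an explicit list, because inside a Jupyter kernel `sys.argv` holds the kernel's own arguments rather than yours.

```python
from argparse import ArgumentParser
from types import SimpleNamespace

# Option 1: plain variables with the same attribute names as the parsed arguments,
# so later cells can keep using args.data_path, args.parameter, ...
args = SimpleNamespace(
    experiment_name="log_reg_pyspark",
    data_path="data/train.csv",  # placeholder path
    parameter="regParam",
    describe=False,
)

# Option 2: reuse the parser (and its validation) with an explicit argument list
parser = ArgumentParser()
parser.add_argument("--experiment_name", dest="experiment_name", default="log_reg_pyspark")
parser.add_argument("--data_path", dest="data_path", required=True)
parser.add_argument("--parameter", dest="parameter", required=True)
parser.add_argument("--describe", dest="describe", default=False, action="store_true")
parsed = parser.parse_args(["--data_path", "data/train.csv", "--parameter", "regParam"])
```

Option 2 has the advantage that a missing required argument still raises an error, just as it would on the command line.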

Locally in a stand-alone cluster

Alternatively, you can copy the programs and their necessary dependencies onto the cluster (into your assigned directory) and run them locally from there.

Multinomial logistic regression

Multiclass classification is supported via multinomial logistic (softmax) regression. In multinomial logistic regression, the algorithm produces K sets of coefficients, or a matrix of dimension K×J where K is the number of outcome classes and J is the number of features. If the algorithm is fit with an intercept term then a length K vector of intercepts is available.
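To make the K×J shape concrete, here is a plain-Python sketch (no Spark needed) of how such a coefficient matrix and intercept vector turn one feature vector into class probabilities via softmax. The numbers are toy values, not a trained model. In spark.ml, a `LogisticRegressionModel` fitted with `family="multinomial"` exposes the corresponding values as `coefficientMatrix` and `interceptVector`.

```python
import math


def softmax_probabilities(coefficients, intercepts, x):
    """Class probabilities for one sample under multinomial logistic regression.

    coefficients: K x J matrix (one row of J feature weights per class)
    intercepts:   length-K vector
    x:            length-J feature vector
    """
    # Linear score (margin) for each of the K classes
    margins = [sum(w * xi for w, xi in zip(row, x)) + b
               for row, b in zip(coefficients, intercepts)]
    # Subtract the largest margin before exponentiating, for numerical stability
    top = max(margins)
    exps = [math.exp(z - top) for z in margins]
    total = sum(exps)
    return [e / total for e in exps]


# K = 3 classes, J = 2 features (toy numbers)
coef = [[1.0, -0.5], [0.2, 0.3], [-1.0, 0.8]]
intercept = [0.1, 0.0, -0.1]
probs = softmax_probabilities(coef, intercept, [2.0, 1.0])
predicted_class = max(range(len(probs)), key=probs.__getitem__)
```

Here the margins are 1.6, 0.7 and -1.3, so class 0 gets the highest probability.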

Random Forests

Random forests are ensembles of decision trees. Random forests combine many decision trees in order to reduce the risk of overfitting. The spark.ml implementation supports random forests for binary and multiclass classification and for regression, using both continuous and categorical features.
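The sketch below illustrates, in plain Python, the classic way an ensemble combines its trees' outputs: a majority vote for classification and an average for regression. It is only an illustration of the idea; Spark's own random forest implementation may combine per-tree class probabilities rather than hard votes.

```python
from collections import Counter
from statistics import mean


def aggregate_classification(tree_predictions):
    """Majority vote over the class labels predicted by the individual trees."""
    return Counter(tree_predictions).most_common(1)[0][0]


def aggregate_regression(tree_predictions):
    """Average of the values predicted by the individual trees."""
    return mean(tree_predictions)


votes = [1, 0, 1, 2, 1]  # labels predicted by five hypothetical trees
label = aggregate_classification(votes)
value = aggregate_regression([2.0, 3.0, 4.0])
```

Averaging over many trees trained on different bootstrap samples is what reduces the variance (and hence the overfitting risk) of a single deep tree.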

For this tutorial we are going to focus on the MLlib library and use MLflow for tracking the Spark models. For a quick start on MLflow, see the MLflow quickstart guide.

The mlflow.spark module provides an API for logging and loading Spark MLlib models. This module exports Spark MLlib models with the following flavors:

Spark MLlib (native) format

Allows models to be loaded as Spark Transformers for scoring in a Spark session. Models with this flavor can be loaded as PySpark PipelineModel objects in Python. This is the main flavor and is always produced. The standard functions for logging and loading are:

mlflow.spark.load_model(model_uri, dfs_tmpdir=None): Load the Spark MLlib model from the path.

mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, dfs_tmpdir=None, sample_input=None, registered_model_name=None)

Log a Spark MLlib model as an MLflow artifact for the current run. This uses the MLlib persistence format and produces an MLflow Model with the Spark flavor.
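A minimal sketch of the log/load round trip, assuming MLflow and PySpark are installed and `spark_model` is an already fitted model such as a `PipelineModel`. The helper names `log_spark_model` and `load_spark_model` are hypothetical; the imports are placed inside the functions only so the definitions can be loaded without MLflow present.

```python
def log_spark_model(spark_model, artifact_path="model", registered_model_name=None):
    """Log a fitted Spark ML model in a new MLflow run and return that run's ID."""
    import mlflow
    import mlflow.spark

    with mlflow.start_run() as run:
        # Saves the model in MLlib persistence format with the Spark flavor
        mlflow.spark.log_model(spark_model, artifact_path,
                               registered_model_name=registered_model_name)
        return run.info.run_id


def load_spark_model(run_id, artifact_path="model"):
    """Load a logged model back as a PipelineModel for scoring in a Spark session."""
    import mlflow.spark

    # "runs:/<run_id>/<artifact_path>" is MLflow's run-relative model URI
    return mlflow.spark.load_model("runs:/{}/{}".format(run_id, artifact_path))
```

Usage might look like `run_id = log_spark_model(pipeline.fit(cov_train))` followed by `load_spark_model(run_id).transform(cov_test)`, where `pipeline` is whatever estimator pipeline your program builds.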

Performance tracking with metrics

You log MLflow metrics with the log methods in the Tracking API. These methods support two alternative ways of distinguishing metric values on the x-axis: timestamp and step. timestamp is an optional long value that represents the time at which the metric was logged; it defaults to the current time. step is an optional integer that represents any measure of training progress (number of training iterations, number of epochs, and so on). step defaults to 0 and has the following requirements and properties:
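A small sketch of logging a metric against the step axis with `mlflow.log_metric(key, value, step=...)`. The helper name `log_metric_series` is hypothetical; the logging function is injectable (defaulting to MLflow's) so the loop can be exercised without a tracking server. Call it inside `mlflow.start_run()` so all values land in the same run.

```python
def log_metric_series(key, values, log_metric=None):
    """Log one value per step (0, 1, 2, ...) so the MLflow UI can plot the metric against step."""
    if log_metric is None:
        import mlflow  # imported lazily so the helper can be tried without MLflow

        log_metric = mlflow.log_metric
    for step, value in enumerate(values):
        log_metric(key, value, step=step)
```

For example, `log_metric_series("accuracy", per_epoch_accuracies)` records each epoch's value at its own step instead of overwriting a single point at step 0.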

The code and examples shown here are exclusively in Python, using PySpark.

For this tutorial we are going to

For more information about the dataset, refer to its original source. The dataset is split randomly 70/30 into cov_train and cov_test, so one part can be used for training and the other for testing on unseen data.

(Note: when using a cluster (Spark on top of Hadoop, or any HDFS system), the programs must first be copied to the HDFS file system with hadoop fs -copyFromLocal filename.extension. Additionally, the programs use SparkSession, which is supported only from Spark version 2 onward, so on the cluster switch the Spark version using export SPARK_MAJOR_VERSION=2.)
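A sketch of the read-and-split step, assuming an existing SparkSession `spark` (for example from `SparkSession.builder.appName("cov").getOrCreate()`) and a schema built as described in the StructField section. The function name, the default seed of 42 and any `hdfs://...` path you pass are assumptions for illustration.

```python
def load_and_split(spark, data_path, schema, train_fraction=0.7, seed=42):
    """Read a CSV file with an explicit schema and split it randomly into train/test DataFrames."""
    df = spark.read.csv(data_path, header=True, schema=schema)
    # randomSplit samples each row independently, so the 70/30 sizes are only approximate;
    # a fixed seed makes the split reproducible between runs
    cov_train, cov_test = df.randomSplit([train_fraction, 1.0 - train_fraction], seed=seed)
    return cov_train, cov_test
```

On a cluster, `data_path` would point at the file copied to HDFS rather than at a local path.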

Virtual environment

The cluster environment may not have MLflow installed, and it won't allow system-wide changes. We also want to make sure that our project doesn't interfere with the original directories and scripts. For these reasons, we run our programs inside a Python virtual environment (virtualenv).
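A minimal sketch using the standard-library `venv` module, which needs Python 3.3 or later; on a Python 2 interpreter the separate `virtualenv` package plays the same role. The helper name `create_project_env` is hypothetical.

```python
import os
import venv


def create_project_env(env_dir, with_pip=True):
    """Create an isolated virtual environment and return the path of its Python interpreter."""
    # with_pip=True bootstraps pip via ensurepip so packages such as mlflow can be installed
    venv.create(env_dir, with_pip=with_pip)
    if os.name == "nt":
        return os.path.join(env_dir, "Scripts", "python.exe")
    return os.path.join(env_dir, "bin", "python")
```

Packages are then installed with the environment's own interpreter, e.g. `subprocess.check_call([python, "-m", "pip", "install", "mlflow"])`, so nothing outside `env_dir` is modified.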

StructField & StructType

Both programs are tailored to read any given CSV dataset. To load the dataset in tabular form, you have to pass its schema. In the current implementation, the schema is generated from the column names using StructField and StructType (Spark types). (Tip: to get the column names, use the pandas DataFrame.columns attribute, which holds all the names in an array-like object.)
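A sketch of generating the schema from the column names. It reads the header with the standard `csv` module as an alternative to pandas, and assumes (for illustration only) that every feature is numeric (`DoubleType`) and that a single label column holds integer class IDs. The helper names are hypothetical; the PySpark import sits inside `build_schema` so the header reader can be used on its own.

```python
import csv


def read_header(csv_path):
    """Return the column names from the first row of a CSV file."""
    with open(csv_path, newline="") as f:
        return next(csv.reader(f))


def build_schema(column_names, label_col):
    """Build a Spark schema: DoubleType for features, IntegerType for the label (assumed types)."""
    from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

    return StructType([
        StructField(name, IntegerType() if name == label_col else DoubleType(), True)
        for name in column_names
    ])
```

The resulting `StructType` can be passed as `schema=` to `spark.read.csv`, which avoids a separate schema-inference pass over the data.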

Both programs take the CSV training file and log evaluation metrics to the MLflow UI.

Parameters & Arguments

Common arguments are:

For model-specific arguments, check the program as well as the official Spark MLlib documentation.

Note for running the predict program

The model in the predict program is located via the run_id of the logged MLflow model. The relative path to the Spark model is built using formatted string literals (f-strings). If you train the model on the cluster and call predict on a different system, the relative paths differ, so you have to pass the model to the predict program manually. Also, f-strings are not supported in Python 2 (currently installed on the cluster); they require Python 3.6 or later.
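A sketch of building the model path without f-strings, so the same code runs on Python 2.7 and 3.x. It assumes MLflow's default local file store, where artifacts live under `mlruns/<experiment_id>/<run_id>/artifacts/<artifact_path>`, and that the model was logged to the default experiment `"0"` under the artifact path `"model"`; the `tracking_root` parameter is a hypothetical hook for pointing at a copy of `mlruns` on another machine.

```python
import os


def local_model_path(run_id, experiment_id="0", artifact_path="model", tracking_root="mlruns"):
    """Path of a model logged to MLflow's local file store, built without f-strings."""
    # os.path.join works on Python 2.7 and 3.x; f-strings need Python 3.6+
    return os.path.join(tracking_root, str(experiment_id), run_id, "artifacts", artifact_path)
```

When the predict program runs on a different system, passing an absolute `tracking_root` (for example the directory where the `mlruns` folder was copied) avoids the relative-path mismatch. If both systems share one tracking server, a `runs:/<run_id>/model` URI avoids local paths altogether.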

Performance tracking

For the current task, the ideal approach would be to use Spark's CrossValidator with our parameter grid and record a metric for each individual model. At the time of writing, however, that is only supported when running on a Databricks server (MLlib-MLflow integration). So the performance_metric program shows an outline for manually tracking performance over different values (or even time steps), one hyperparameter at a time, to show the generalized performance of the algorithm. It takes an additional argument:

parser.add_argument("--parameter", dest="parameter", help="parameter", required=True)

which in this case can be maxIter, aggregationDepth, or regParam. The program iterates over a range of values for the selected parameter while keeping the others at their default values. These parameters can also be combined in nested loops to explore other results.
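A sketch of the one-parameter-at-a-time sweep. The defaults are those of `pyspark.ml.classification.LogisticRegression` (`maxIter=100`, `regParam=0.0`, `aggregationDepth=2`); the value ranges and the metric name are hypothetical. Training and logging are passed in as functions so the loop stays independent of Spark and MLflow.

```python
# Defaults of LogisticRegression for the three swept parameters
DEFAULTS = {"maxIter": 100, "regParam": 0.0, "aggregationDepth": 2}

# Hypothetical ranges; choose your own
SEARCH_SPACE = {
    "maxIter": [10, 50, 100, 200],
    "regParam": [0.0, 0.01, 0.1, 1.0],
    "aggregationDepth": [2, 3, 4],
}


def sweep_one_parameter(parameter, train_and_score, log_metric):
    """Vary one hyperparameter, keep the others at their defaults, and log one score per value."""
    if parameter not in SEARCH_SPACE:
        raise ValueError("unsupported parameter: {}".format(parameter))
    results = []
    for step, value in enumerate(SEARCH_SPACE[parameter]):
        params = dict(DEFAULTS, **{parameter: value})  # override only the swept parameter
        score = train_and_score(params)
        # step records the position in the sweep, so MLflow can plot score vs. step
        log_metric("{}_score".format(parameter), score, step=step)
        results.append((value, score))
    return results
```

With Spark and MLflow available, `train_and_score` could be something like `lambda p: evaluator.evaluate(LogisticRegression(**p).fit(cov_train).transform(cov_test))` with a `MulticlassClassificationEvaluator`, and `log_metric` could be `mlflow.log_metric`.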