Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming. MLlib is Apache Spark's scalable machine learning library.
After a standard installation, PySpark is typically run through the plain Python shell or IDLE. For a more intuitive development environment, we can use Jupyter Notebook for Spark development.
To do so, add the following lines to your ~/.bashrc (or ~/.zshrc) file:
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
Note: once PySpark is configured to launch Jupyter Notebook with just the pyspark command, spark-submit is likely to fail with an error when it tries to execute the jupyter command. The solution is to set the variable to python, not jupyter, before using spark-submit:
PYSPARK_DRIVER_PYTHON=python
You can interpret and execute the program cell by cell. Instead of passing arguments on the command line,
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument("--experiment_name", dest="experiment_name", help="experiment_name", default="log_reg_pyspark", required=False)
parser.add_argument("--data_path", dest="data_path", help="data_path", required=True)
parser.add_argument("--parameter", dest="parameter", help="parameter", required=True)
parser.add_argument("--describe", dest="describe", help="Describe data", default=False, action='store_true')
args = parser.parse_args()
pass them as variables.
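In a notebook there is no command line, so one option is to hand the argument list to parse_args directly, and another is to skip the parser and build the namespace by hand. The following is a minimal sketch that assumes the parser defined above; the data path and the parameter value are placeholders, not values taken from this project.

```python
from argparse import ArgumentParser, Namespace

parser = ArgumentParser()
parser.add_argument("--experiment_name", dest="experiment_name", default="log_reg_pyspark")
parser.add_argument("--data_path", dest="data_path", required=True)
parser.add_argument("--parameter", dest="parameter", required=True)
parser.add_argument("--describe", dest="describe", default=False, action="store_true")

# Option 1: pass the arguments as a list instead of letting argparse read sys.argv.
# "data/train.csv" and "maxIter" are placeholder values.
args = parser.parse_args(["--data_path", "data/train.csv", "--parameter", "maxIter"])

# Option 2: skip the parser entirely and set the variables directly.
args_manual = Namespace(
    experiment_name="log_reg_pyspark",
    data_path="data/train.csv",
    parameter="maxIter",
    describe=False,
)

print(args.data_path, args.parameter, args.describe)
```

Either object can then be used by the rest of the notebook exactly as args is used in the script.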
To start a standalone master, run:
./bin/spark-class org.apache.spark.deploy.master.Master
It will give us an IP:PORT.
- For submitting a program stored locally on your own system, you don't need to mention the IP and port.
- For our use case, we pass spark-submit the master URL local[*], which runs Spark locally with as many worker threads as logical cores on your machine. Use local[K] instead to run with any number K of worker threads.
spark-submit --master spark://IP:PORT \
  example.py (additional arguments)
Or, you can copy the programs and their necessary dependencies onto the cluster (in your assigned directory) and run them locally from there.
Multiclass classification is supported via multinomial logistic (softmax) regression. In multinomial logistic regression, the algorithm produces K sets of coefficients, or a matrix of dimension K×J where K is the number of outcome classes and J is the number of features. If the algorithm is fit with an intercept term then a length K vector of intercepts is available.
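To make the shapes concrete, here is a small plain-Python sketch of how a K×J coefficient matrix and a length-K intercept vector turn one feature vector into class probabilities. It is not Spark's implementation, and the numbers are made up; in spark.ml the fitted model exposes the corresponding values as coefficientMatrix and interceptVector.

```python
import math


def softmax_probabilities(coefficients, intercepts, features):
    """Return class probabilities for one feature vector.

    coefficients: K rows of J weights (the K x J matrix)
    intercepts:   K intercepts
    features:     J feature values
    """
    # One linear score per class: w_k . x + b_k
    scores = [
        sum(w * x for w, x in zip(row, features)) + b
        for row, b in zip(coefficients, intercepts)
    ]
    # Subtract the largest score before exponentiating for numerical stability.
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


# Made-up numbers: K = 3 classes, J = 2 features.
coefficients = [[0.5, -1.0], [0.0, 0.0], [-0.5, 1.0]]
intercepts = [0.1, 0.0, -0.1]
probs = softmax_probabilities(coefficients, intercepts, [2.0, 1.0])
predicted_class = probs.index(max(probs))
print(probs, predicted_class)
```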
Random forests are ensembles of decision trees. Random forests combine many decision trees in order to reduce the risk of overfitting. The spark.ml implementation supports random forests for binary and multiclass classification and for regression, using both continuous and categorical features.
For this tutorial we are going to focus on the MLlib library and use MLflow for tracking the Spark models. For a quick start on MLflow, see the official MLflow quickstart.
The mlflow.spark module provides an API for logging and loading Spark MLlib models. This module exports Spark MLlib models with the following flavors:
Spark MLlib (native) format: Allows models to be loaded as Spark Transformers for scoring in a Spark session. Models with this flavor can be loaded as PySpark PipelineModel objects in Python. This is the main flavor and is always produced.
The standard loading and logging functions are:
mlflow.spark.load_model(model_uri, dfs_tmpdir=None)
Load the Spark MLlib model from the path.
mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, dfs_tmpdir=None, sample_input=None, registered_model_name=None)
Log a Spark MLlib model as an MLflow artifact for the current run. This uses the MLlib persistence format and produces an MLflow Model with the Spark flavor.
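As a rough sketch of how the two functions fit together, the helper below logs a fitted model inside a run and loads it back through a runs:/ URI. The function names runs_uri and log_and_reload and the artifact path "model" are illustrative choices, not part of this project, and MLflow and PySpark are assumed to be installed wherever log_and_reload is called.

```python
def runs_uri(run_id, artifact_path="model"):
    """Build the 'runs:/<run_id>/<artifact_path>' URI for a model logged in a run."""
    return "runs:/{}/{}".format(run_id, artifact_path)


def log_and_reload(spark_model, artifact_path="model"):
    """Log a fitted Spark MLlib model in a new run, then load it back."""
    # Imported here so runs_uri stays usable without MLflow installed.
    import mlflow
    import mlflow.spark

    with mlflow.start_run() as run:
        mlflow.spark.log_model(spark_model, artifact_path)
    return mlflow.spark.load_model(runs_uri(run.info.run_id, artifact_path))


print(runs_uri("abc123"))
```

Here "abc123" is a placeholder run ID; a real one is available as run.info.run_id or in the MLflow UI.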
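You log MLflow metrics with log methods in the Tracking API. The log methods support two alternative methods for distinguishing metric values on the x-axis: timestamp and step. timestamp is an optional long value that represents the time that the metric was logged; it defaults to the current time. step is an optional integer that represents any measurement of training progress (number of training iterations, number of epochs, and so on). step defaults to 0 and has the following requirements and properties:
- Must be a valid 64-bit integer value.
- Can be negative.
- Can be out of order in successive write calls. For example, (1, 3, 2) is a valid sequence.
- Can have gaps in the sequence of values specified in successive write calls. For example, (1, 5, 75, -20) is a valid sequence.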
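The sketch below shows the step argument in use: every value is logged under the same key, with its position in the list as the step. So that it runs without a tracking server, the logging function is passed in and a stand-in recorder replaces mlflow.log_metric; the helper name log_metric_series and the accuracy values are made up.

```python
def log_metric_series(log_metric, key, values):
    """Log each value under the same key, using its position as the step."""
    for step, value in enumerate(values):
        log_metric(key, value, step=step)


# Stand-in for mlflow.log_metric that records the calls instead of sending them.
recorded = []


def fake_log_metric(key, value, step=0):
    recorded.append((key, value, step))


log_metric_series(fake_log_metric, "accuracy", [0.71, 0.78, 0.80])
print(recorded)

# With MLflow installed, the real call has the same shape:
#   import mlflow
#   with mlflow.start_run():
#       log_metric_series(mlflow.log_metric, "accuracy", [0.71, 0.78, 0.80])
```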
The code and examples shown here are written exclusively in Python, using PySpark.
For this tutorial we are going to
For more information about the dataset, go here. The dataset is split randomly 70-30 (cov_train and cov_test) so that one part can be used for training and the other for testing on unseen data. *(Note: when using a cluster (Spark on top of Hadoop or any HDFS system), the programs must first be copied to the HDFS file system with hadoop fs -copyFromLocal filename.extension. Additionally, the programs use SparkSession, which is available only from Spark 2.0 onward, so on the cluster switch the Spark version with export SPARK_MAJOR_VERSION=2.)*
The cluster environment may not have MLflow installed, and it won't allow system-wide changes. To also make sure that our project doesn't interfere with the original directories and scripts, it is necessary to run our programs inside a Python virtual environment (virtualenv).
Both programs are tailored to read any given CSV dataset. To view the dataset in tabular form, one has to pass the schema of the dataset. In the current implementation, the schema is generated from the column names using StructField and StructType (Spark SQL types). *(Tip: to get the column names, use the pandas DataFrame.columns attribute, which holds all the column names.)*
Both programs take the training CSV file and log evaluation metrics to the MLflow UI.
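One way to generate such a schema is sketched below. It reads the header with the standard csv module rather than pandas, and it assumes every column can be treated as a nullable double, which may not hold for a given dataset. The column names in the sample are hypothetical, and PySpark is only needed when build_schema is actually called.

```python
import csv
import io


def read_column_names(csv_file):
    """Return the header row of an open CSV file as a list of column names."""
    return next(csv.reader(csv_file))


def build_schema(column_names):
    """Build a Spark schema with one nullable DoubleType field per column.

    Treating every column as a double is an assumption of this sketch.
    """
    # Imported lazily so reading the header does not require PySpark.
    from pyspark.sql.types import DoubleType, StructField, StructType

    return StructType([StructField(name, DoubleType(), True) for name in column_names])


# Tiny in-memory CSV with hypothetical column names.
sample = io.StringIO("feature_1,feature_2,label\n1.0,2.0,0\n")
columns = read_column_names(sample)
print(columns)
```

With a SparkSession named spark, the result could then be passed as spark.read.csv(path, header=True, schema=build_schema(columns)).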
Common arguments are:
For model-specific arguments, check the programs as well as the official Spark MLlib documentation.
The model used in the predict function is picked up via the run_id of the logged MLflow model. The relative path to the Spark model is built using formatted string literals (f-strings). If you train the model on the cluster and call predict on a different system, the relative path will differ, so you will have to pass the model to the predict program manually. Also note that f-strings require Python 3.6 or later, so they are not supported on Python 2 (currently installed on the cluster).
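A version of the path construction that also runs on Python 2 can use str.format instead of an f-string. The sketch below assumes MLflow's classic local file store layout (mlruns/<experiment_id>/<run_id>/artifacts/<artifact_path>), experiment ID "0" and artifact path "model"; these may not match how the programs here log their models, and the function name is hypothetical.

```python
import os


def local_model_path(run_id, experiment_id="0", artifact_path="model", base_dir="mlruns"):
    """Return the path to a logged model in a local MLflow file store.

    Uses str.format rather than an f-string so it also runs on Python 2.
    """
    relative = "{base}/{exp}/{run}/artifacts/{artifact}".format(
        base=base_dir, exp=experiment_id, run=run_id, artifact=artifact_path
    )
    # An absolute path makes the dependence on the working directory explicit.
    return os.path.abspath(relative)


# "abc123" is a placeholder run ID.
print(local_model_path("abc123"))
```

Passing base_dir explicitly is one way to point the predict program at a model directory that was copied over from another system.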
For the current task, the ideal approach would be to use the Spark CrossValidator with our parameter grid and record a metric for each individual model. However, that is currently supported only when running on Databricks (see MLlib-MLflow integration). The performance_metric program therefore shows an outline for manually tracking performance over different values (or even timesteps), one hyperparameter at a time, to show the generalized performance of the algorithm. It takes an additional argument
parser.add_argument("--parameter", dest="parameter", help="parameter", required=True)
which in this case can be maxIter, aggregationDepth or regParam. The program iterates over a range of values for the selected parameter while keeping the others at their default values. We can even combine these parameters in nested loops to see other results.
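The iteration can be sketched as a generator that varies one parameter and leaves the others at their defaults. The defaults below are the spark.ml LogisticRegression defaults; the candidate values and the function name are made up for illustration and are not taken from the performance_metric program.

```python
# Defaults of spark.ml LogisticRegression for the three supported parameters.
DEFAULTS = {"maxIter": 100, "aggregationDepth": 2, "regParam": 0.0}

# Candidate values are illustrative only.
CANDIDATES = {
    "maxIter": [10, 50, 100],
    "aggregationDepth": [2, 3, 4],
    "regParam": [0.0, 0.01, 0.1],
}


def parameter_settings(parameter):
    """Yield (step, settings) pairs that vary one parameter and keep the rest at defaults."""
    if parameter not in CANDIDATES:
        raise ValueError("unsupported parameter: {}".format(parameter))
    for step, value in enumerate(CANDIDATES[parameter]):
        settings = dict(DEFAULTS)
        settings[parameter] = value
        yield step, settings


for step, settings in parameter_settings("regParam"):
    # In a real run, a model would be trained here with
    # LogisticRegression(**settings) and its metric logged with step=step.
    print(step, settings)
```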