Type: Evaluation Essays
Sample donated: Derrick Nunez
Last updated: October 1, 2019
Abstract
Apache Spark is a scalable distributed framework for large-scale data processing that was originally developed by AMPLab [4]. On top of Spark core lies MLlib, a distributed framework for machine learning. It provides common learning algorithms as well as APIs for Scala, Python and Java [5], which makes it convenient to use. This seminar introduces MLlib and shows how it facilitates large-scale machine learning in distributed environments.

1 Introduction
The need to process ever-increasing amounts of data calls for approaches that differ from the traditional ones.
Relational database management systems are not suitable for this, given their reliance on main-memory or disk-based computation [3]. Hadoop MapReduce is also strictly disk-based and consequently slow. Apache Spark addresses these limitations, as it is a fast, in-memory, general-purpose cluster-computing framework [3]. Spark provides four libraries: SparkSQL, GraphX, Spark Streaming and MLlib.
They are all built on top of Spark core. MLlib, Spark's distributed machine learning library, provides tools for running machine learning algorithms on huge datasets [3]. In addition to providing the common machine learning algorithms, it can be extended to meet specific needs thanks to its open-source nature [5]. Using Resilient Distributed Datasets (RDDs) and data parallelism, MLlib provides fast and scalable implementations of common machine learning techniques [1]. Spark provides two machine learning libraries, namely Spark.ml and MLlib.
Spark.ml is not an official name for Spark's machine learning library; it is mainly used to distinguish the DataFrame-based API (Spark.ml) from the RDD-based API (MLlib).

Spark uses a master/worker architecture: a Spark cluster consists of a driver program and multiple executors. The driver, together with the SparkContext that represents the connection to the cluster, resides on the master node, while the executor processes run on worker nodes.
The SparkContext object holds a configuration object called SparkConf, and the SparkContext is used to create and modify datasets.

One of the core concepts around which Spark is built is the Resilient Distributed Dataset (RDD). An RDD encapsulates a very large dataset: it splits the data across the nodes of the cluster, abstracts away the complexity of processing it in a distributed fashion, and provides fault tolerance [1]. Once an RDD is created, one can apply either transformations or actions to it. A transformation applies an operation to an existing dataset and produces a new one, whereas an action computes a result from the dataset and returns it to the driver program (or writes it to storage) [2]. Transformations in Spark are lazy [1]: they are not executed until an action requires their result, at which point Spark builds and runs the Directed Acyclic Graph (DAG) of pending transformations [9].
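Lazy evaluation can be illustrated without a cluster. The sketch below is a hypothetical plain-Python LazyDataset, not Spark's RDD class: map and filter only record operations (a stand-in for the lineage Spark keeps), and nothing is computed until an action such as collect or count replays them.

```python
class LazyDataset:
    """Hypothetical toy dataset illustrating lazy transformations (not Spark's RDD)."""

    def __init__(self, data, ops=None, log=None):
        self._data = list(data)
        self._ops = ops or []                        # recorded transformations ("lineage")
        self.log = log if log is not None else []    # records when work actually runs

    # Transformations: return a new dataset with one more recorded step; nothing runs yet.
    def map(self, f):
        return LazyDataset(self._data, self._ops + [("map", f)], self.log)

    def filter(self, pred):
        return LazyDataset(self._data, self._ops + [("filter", pred)], self.log)

    # Actions: replay the recorded steps in order and return a result.
    def collect(self):
        self.log.append("executed")
        out = self._data
        for kind, f in self._ops:
            out = [f(x) for x in out] if kind == "map" else [x for x in out if f(x)]
        return out

    def count(self):
        return len(self.collect())


ds = LazyDataset(range(10))
even_squares = ds.map(lambda x: x * x).filter(lambda x: x % 2 == 0)  # only recorded
print(even_squares.collect())  # [0, 4, 16, 36, 64]
```

Spark additionally uses the recorded lineage to recompute lost partitions after a failure, which this sketch does not attempt.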
In Spark, each executor has its own independent copy of the variables used in a function. Hence the variables are in a sense 'stateless', as changes made to them on the executors are not propagated back to the driver [3]. Spark provides two types of shared variables, namely broadcast variables and accumulators, which address the inefficiency of sharing read-write variables across different tasks [1].
• Broadcast variables: large or frequently accessed data that is made available to all nodes in the cluster. Broadcast variables are read-only and cached on each machine, which reduces network costs: the data is available to each node whenever it is needed and does not have to be re-transmitted over the network [1].

    # sc is an existing SparkContext (e.g. the one provided by the pyspark shell)
    broadcastVar = sc.broadcast([1, 2, 3])  # ship the list to every executor once
    broadcastVar.value                      # [1, 2, 3]

• Accumulators: unlike broadcast variables, which are read-only, accumulators can be added to. Because the global accumulated value is computed in parallel, Spark restricts the tasks running on the cluster to adding to an accumulator, so that the value is computed correctly.
This means that only the driver program can read the value of an accumulator [1].

    # sc is an existing SparkContext
    accum = sc.accumulator(0)
    sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))  # tasks may only add
    accum.value                                                   # read on the driver: 10

1.1.5 Data types
MLlib supports different data types depending on how the data is distributed: on a single machine, local vectors and local matrices are used, while for distributed storage, distributed matrices are used [1]. Choosing the right format for a distributed matrix is crucial, because converting a distributed matrix to a different format may require a shuffle, which is a very expensive operation in Spark. Distributed matrices fall into the following categories [1]:
• Row Matrix: a row-oriented distributed matrix without meaningful row indices, in which each row is a local vector
• Indexed Row Matrix: like a Row Matrix, but with row indices
• Coordinate Matrix: a distributed matrix stored in coordinate format, i.e. as (row, column, value) entries
• Block Matrix: a distributed matrix made up of blocks, each of which is a local matrix
Before applying machine learning algorithms, the data needs to be converted to one of these data types. There are two types of local vectors, namely dense and sparse vectors. Sparse vectors store only the non-zero entries, which saves storage as well as computation time [2].
A labeled point represents a data point in machine learning: it consists of a label (the value to be predicted) and a feature vector [1]. In MLlib, NumPy arrays and Python lists are treated as dense vectors, while MLlib's SparseVector and SciPy's single-column csc_matrix are treated as sparse vectors.
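The saving offered by sparse vectors comes from keeping only the non-zero positions. The helpers below are hypothetical plain-Python functions that mimic the (size, indices, values) layout of MLlib's SparseVector; in PySpark the same vector could be written as Vectors.sparse(4, [0, 3], [1.0, 3.0]) using pyspark.ml.linalg.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


def to_dense(size, indices, values):
    """Expand a (size, indices, values) triple back into a full list."""
    out = [0.0] * size
    for i, v in zip(indices, values):
        out[i] = v
    return out


def sparse_dot(a, b):
    """Dot product of two sparse triples; only positions non-zero in both contribute."""
    _, ia, va = a
    _, ib, vb = b
    lookup = dict(zip(ib, vb))
    return sum(v * lookup[i] for i, v in zip(ia, va) if i in lookup)


dense = [1.0, 0.0, 0.0, 3.0]
sv = to_sparse(dense)  # (4, [0, 3], [1.0, 3.0])
```

For a vector with millions of mostly-zero features (as produced by text featurization), the sparse triple stores and iterates over only the handful of non-zero entries.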
Just like RDDs, DataFrames are distributed datasets, but with some additional benefits. As illustrated in figure 3, one issue of the RDD-based API was that it did not provide a uniform API across the different algorithms and languages. One of the main purposes of the DataFrame-based API was to address this issue [1].

A pipeline is a sequence of steps that have to be performed in order (e.g., data cleansing, transformation). More precisely, a pipeline is a workflow consisting of transformers and estimators. Both take a DataFrame as input; the difference is that a transformer converts one DataFrame into another (e.g., by appending a new column), whereas an estimator is a learning algorithm that is fit on a DataFrame to produce a transformer. Many ML algorithms take parameters; for example, logistic regression takes maxIter and regParam. In a pipeline, one can try different values of these parameters under different settings. This could also be done without pipelines, but pipelines make the application more maintainable and easier to understand [1].

Figure 4 illustrates a three-stage pipeline, where the boxes in blue are transformers, the box in red is an estimator, and the cylinders denote DataFrames.
The Tokenizer receives raw text documents and splits them into words. HashingTF produces feature vectors from the Tokenizer's output. Lastly, the estimator (logistic regression) trains a logistic regression model, which is itself a transformer [1]. Figure 5 illustrates the transformer returned by the previous step. ML pipelines are an important concept in large-scale machine learning. Figure 6 shows the example from figure 4 in a different form. Even though there are only three components between the 'load data' and 'evaluate' components, each of these components has a few parameters. If these components are to be reused with different data sources or evaluated with different evaluators, copying the code of each component into the respective project would be a cumbersome process.
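The transformer/estimator split can be made concrete with a toy re-creation in plain Python. This is not MLlib's API: rows are dicts standing in for a DataFrame, Tokenizer mirrors the lower-case-and-split behaviour described for MLlib's Tokenizer, and a hypothetical CountEstimator (a stand-in for CountVectorizer, swapped in because a logistic regression would not fit in a short sketch) plays the estimator whose fit() returns a transformer.

```python
class Transformer:
    """Maps a list of rows (dicts) to a new list of rows."""
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    """Learns from rows; fit() returns a Transformer."""
    def fit(self, rows):
        raise NotImplementedError


class Tokenizer(Transformer):
    """Appends a column of lower-cased, whitespace-separated words."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def transform(self, rows):
        return [{**r, self.output_col: r[self.input_col].lower().split()} for r in rows]


class CountModel(Transformer):
    """Fitted model: turns each word list into a vector of term counts."""
    def __init__(self, vocab, input_col, output_col):
        self.vocab, self.input_col, self.output_col = vocab, input_col, output_col

    def transform(self, rows):
        out = []
        for r in rows:
            vec = [0] * len(self.vocab)
            for w in r[self.input_col]:
                if w in self.vocab:  # words unseen during fit() are ignored
                    vec[self.vocab[w]] += 1
            out.append({**r, self.output_col: vec})
        return out


class CountEstimator(Estimator):
    """Hypothetical toy estimator: learns a sorted vocabulary from the training rows."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        words = sorted({w for r in rows for w in r[self.input_col]})
        return CountModel({w: i for i, w in enumerate(words)}, self.input_col, self.output_col)


class PipelineModel(Transformer):
    """A fitted pipeline: applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Pipeline(Estimator):
    """Fits estimators in order, feeding each stage's output to the next stage."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)   # an estimator becomes a transformer
            rows = stage.transform(rows)  # its output is the next stage's input
            fitted.append(stage)
        return PipelineModel(fitted)


train = [{"text": "Spark is fast"}, {"text": "Spark MLlib"}]
model = Pipeline([Tokenizer("text", "words"), CountEstimator("words", "features")]).fit(train)
print(model.transform([{"text": "MLlib on Spark"}])[0]["features"])  # [0, 0, 1, 1]
```

Because the whole pipeline is itself an estimator and its fitted result a single transformer, the same stage list can be refit on another data source or passed to another evaluator without copying per-component code.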
Moreover, when doing ML tuning, these parameters have to be changed and the resulting performance compared. ML pipelines encapsulate several components, which improves the maintainability and reusability of ML components [1].

Before the introduction of Spark, data scientists used to train ML algorithms using statistical languages such as R, and when big data was involved, data engineers implemented these models on Hadoop infrastructure. This is not an ideal approach in terms of accuracy, scalability, efficiency and, most importantly, execution time. MLlib aims to solve these issues [3]. It supports various machine learning techniques such as classification, regression, clustering (K-means, LDA, bisecting K-means, GMM), collaborative filtering and FP-growth [1].

2.1. Data Transformation and processing
Tokenization
A tokenizer receives a string as input and splits it into words. MLlib provides two types of tokenizers:
• The Tokenizer class first converts the input to lower case and then splits it on whitespace.
• The regular expression tokenizer (RegexTokenizer class) is a more advanced tokenizer that splits the input using a regex pattern, which is useful when the input should not be split on whitespace. By setting the gaps parameter to false, the regex pattern can instead be used to match the tokens themselves.

    from pyspark.ml.feature import Tokenizer, RegexTokenizer

    tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
    # "\\W" splits on every non-word character
    regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")

The most common words in a language are referred to as 'stopwords'. These words should be filtered out before further processing.
StopWordsRemover does this by taking the output of the tokenizer and filtering the stopwords out of it. StringIndexer encodes a column of string labels as indices ordered by label frequency, so that the most frequent label gets index 0 [1].

2.2. Feature transformation
2.2.1. TF-IDF
In information retrieval and text mining, TF-IDF (term frequency-inverse document frequency) is a feature vectorization method that reflects how important a term is to a document in the corpus. It is calculated as:

$$IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \qquad TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D)$$

where TF(t, d) is the number of times term t appears in document d, |D| is the total number of documents in the corpus and DF(t, D) is the number of documents containing t. For flexibility, MLlib separates TF and IDF [1]. Two methods can be used for TF:
• HashingTF: a transformer that uses the hashing trick to convert sets of terms into fixed-length feature vectors.
• CountVectorizer: when no predefined dictionary is available, CountVectorizer can be used to extract the vocabulary and convert a collection of text documents into vectors of term counts.
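The text-processing steps described so far can be chained in plain Python to see what each one produces. This is a sketch under stated assumptions, not MLlib code: the stopword list is a tiny illustrative set, zlib.crc32 stands in for the MurmurHash3 function used by Spark's HashingTF, num_features is 16 rather than Spark's default of 2^18, and IDF follows the smoothed formula IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)), applied per hashed column.

```python
import math
import re
import zlib

STOPWORDS = {"a", "an", "the", "is", "of", "and"}  # illustrative only, not Spark's default list


def tokenize(text):
    """Like Tokenizer: lower-case, then split on whitespace."""
    return text.lower().split()


def regex_tokenize(text, pattern=r"\W", gaps=True):
    """Like RegexTokenizer: split on the pattern (gaps=True) or match tokens (gaps=False)."""
    lowered = text.lower()
    parts = re.split(pattern, lowered) if gaps else re.findall(pattern, lowered)
    return [p for p in parts if p]  # drop empty strings left by adjacent separators


def remove_stopwords(words):
    return [w for w in words if w not in STOPWORDS]


def hashing_tf(words, num_features=16):
    """Hashing trick: count each term in the column its hash maps to (collisions possible)."""
    vec = [0.0] * num_features
    for w in words:
        vec[zlib.crc32(w.encode("utf-8")) % num_features] += 1.0
    return vec


def idf(tf_vectors):
    """Smoothed IDF per column: log((|D| + 1) / (DF + 1))."""
    n_docs = len(tf_vectors)
    df = [sum(1 for v in tf_vectors if v[j] > 0) for j in range(len(tf_vectors[0]))]
    return [math.log((n_docs + 1) / (d + 1)) for d in df]


def tf_idf(tf_vectors):
    weights = idf(tf_vectors)
    return [[tf * w for tf, w in zip(v, weights)] for v in tf_vectors]


docs = ["Spark is a fast engine", "Spark MLlib"]
words = [remove_stopwords(tokenize(d)) for d in docs]
vectors = tf_idf([hashing_tf(w) for w in words])
```

Because "spark" occurs in both documents, its IDF is log(3/3) = 0, so its TF-IDF weight vanishes in both vectors; this down-weighting of ubiquitous terms is exactly what IDF is for.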
IDF (inverse document frequency) rescales the columns of feature vectors created by HashingTF or CountVectorizer, increasing the weight of terms that occur rarely in the corpus [1].

Word2Vec transforms documents into distributed vector representations, which are mainly used in natural language processing applications such as parsing, tagging and machine translation [1].

An n-gram is a sequence of n consecutive items taken from a given sequence of strings. The n-gram transformer receives a sequence of strings and returns a sequence of n-grams [1]; for example, the 2-grams of (I, love, Spark) are 'I love' and 'love Spark'.

DCT (discrete cosine transform) converts a length-N real-valued sequence in the time domain into another length-N real-valued sequence in the frequency domain [1].

SQLTransformer executes SQL statements on its input. The 'select' clause can be any field, constant or expression supported by SparkSQL.
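The idea behind SQLTransformer, running a SELECT in which __THIS__ stands for the input dataset, can be mimicked with Python's built-in sqlite3 module. This is an illustrative stand-in, not Spark's implementation: the hypothetical sql_transform helper loads the rows into an in-memory SQLite table and substitutes that table's name for __THIS__, so only SQL that SQLite understands will work.

```python
import sqlite3


def sql_transform(rows, columns, statement):
    """Hypothetical SQLTransformer stand-in backed by SQLite, not Spark SQL.

    rows: list of tuples; columns: column names (trusted identifiers);
    statement: a SELECT that refers to the input table as __THIS__.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(f"CREATE TABLE input_table ({', '.join(columns)})")
        placeholders = ", ".join("?" for _ in columns)
        conn.executemany(f"INSERT INTO input_table VALUES ({placeholders})", rows)
        cursor = conn.execute(statement.replace("__THIS__", "input_table"))
        return [tuple(r) for r in cursor.fetchall()]
    finally:
        conn.close()


rows = [(0, 1.0, 3.0), (2, 2.0, 5.0)]
print(sql_transform(rows, ["id", "v1", "v2"],
                    "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"))
```

The statement is the transformer's only parameter, so the same object can be reused on any input whose columns the SELECT refers to.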
According to [1], SQLTransformer accepts statements of the following form, where __THIS__ refers to the underlying table of the input dataset:

    SELECT a, a + b AS a_b FROM __THIS__
    SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ WHERE a > 5
    SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

Locality-sensitive hashing (LSH) is a hashing technique that uses a family of functions to hash data points into buckets, such that data points that are close to each other are placed in the same bucket with high probability, while data points that are far from each other are likely to be placed in different buckets [1]. The LSH feature transformation takes a column as input and adds the hashed values as a new column, which can be useful for dimensionality reduction [1].

2.2.7.2. Approximate Similarity Join
Approximate similarity join takes two datasets and a user-defined threshold as input, and returns the pairs of rows whose distance is smaller than the threshold. It also adds a distance column containing the true distance between each pair of rows in the output.
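A MinHash-based version of LSH and of the approximate similarity join can be sketched in plain Python for non-empty sets of integers under Jaccard distance. The hash family h(x) = (a·x + b) mod p with p = 2^31 - 1, the random seed and the helper names are assumptions for illustration. Rows become join candidates when they fall into the same bucket for at least one hash function, and candidates are then filtered by their true distance, which is returned with each pair.

```python
import random


def make_minhash(num_hashes, seed=42, prime=2_147_483_647):
    """Return a function mapping a non-empty set of ints to its MinHash signature."""
    rng = random.Random(seed)
    coeffs = [(rng.randrange(1, prime), rng.randrange(0, prime)) for _ in range(num_hashes)]

    def signature(items):
        # One minimum per hash function: similar sets tend to share minima.
        return tuple(min((a * x + b) % prime for x in items) for a, b in coeffs)

    return signature


def jaccard_distance(s, t):
    return 1.0 - len(s & t) / len(s | t)


def approx_similarity_join(left, right, threshold, signature):
    """Return (left_index, right_index, true_distance) for pairs closer than threshold.

    Only pairs sharing a bucket are compared, so some close pairs may be missed;
    that is what makes the join approximate.
    """
    buckets = {}
    for j, t in enumerate(right):
        for h, value in enumerate(signature(t)):
            buckets.setdefault((h, value), set()).add(j)
    results = []
    for i, s in enumerate(left):
        candidates = set()
        for h, value in enumerate(signature(s)):
            candidates |= buckets.get((h, value), set())
        for j in sorted(candidates):
            d = jaccard_distance(s, right[j])
            if d < threshold:
                results.append((i, j, d))
    return results


sig = make_minhash(5)
print(approx_similarity_join([{1, 2, 3}, {10, 11}], [{1, 2, 3}, {20, 21}], 0.5, sig))
```

Using more hash functions raises the chance that truly close pairs collide somewhere, at the cost of more candidates to verify.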
2.2.7.3. Approximate Nearest Neighbor Search
Given a dataset and a key, approximate nearest neighbor search returns approximately a specified number of rows of the dataset that are closest to the key [1].

Logistic regression is a probabilistic predictive analytics technique [2]. MLlib offers two types of logistic regression, namely binomial and multinomial logistic regression, which predict a binary and a multiclass outcome, respectively. The family parameter specifies which variant to use; if it is not set, Spark infers the correct variant for the task.
In MLlib, a summary of the model over the training set can be extracted. In multinomial logistic regression, the algorithm produces K sets of coefficients, where K is the number of outcome classes, i.e. a K × J matrix, where J denotes the number of features. If the algorithm is fit with an intercept term, a length-K vector of intercepts is available. The conditional probabilities of the outcome classes k ∈ {1, 2, …, K} are modeled using the softmax function.
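The softmax model can be evaluated directly: for class k, P(y = k | x) = exp(β_k · x + β_0k) / Σ_k' exp(β_k' · x + β_0k'), where β_k is the k-th row of the K × J coefficient matrix and β_0k the k-th intercept. The plain-Python sketch below computes these probabilities for given coefficients; it does not fit them, and classes are indexed from 0 rather than 1.

```python
import math


def softmax_probabilities(coefficients, intercepts, x):
    """coefficients: K x J nested list; intercepts: length-K list; x: length-J features."""
    margins = [sum(b * xi for b, xi in zip(row, x)) + b0
               for row, b0 in zip(coefficients, intercepts)]
    m = max(margins)                        # shift by the max margin for numerical stability
    exps = [math.exp(z - m) for z in margins]
    total = sum(exps)
    return [e / total for e in exps]


def predict(coefficients, intercepts, x):
    """Return the index of the most probable class (0-based)."""
    probs = softmax_probabilities(coefficients, intercepts, x)
    return max(range(len(probs)), key=probs.__getitem__)


coef = [[1.0, 0.0], [0.0, 1.0], [0.0, 0.0]]  # K = 3 classes, J = 2 features
icpt = [0.0, 0.0, 0.0]
print(predict(coef, icpt, [2.0, 0.0]))  # 0
```

Subtracting the largest margin does not change the ratios but keeps exp() from overflowing for large margins.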
Naïve Bayes classifiers are probabilistic classifiers that apply Bayes' theorem with strong independence assumptions between the features, which is where the name 'naïve' comes from. Spark.ml supports two variants of naïve Bayes, namely multinomial and Bernoulli [1].

Generalized linear regression is a flexible generalization of linear regression in which the response variable follows a distribution from the natural exponential family. A natural exponential family distribution has the form:

$$f_Y(y \mid \theta, \tau) = h(y, \tau) \exp\left( \frac{\theta \cdot y - A(\theta)}{d(\tau)} \right)$$

where θ is the natural parameter and τ is the dispersion parameter. Spark's GeneralizedLinearRegression interface supports specifications of generalized linear models that can be used for prediction problems such as linear regression, Poisson regression and others [1]. Nevertheless, Spark's implementation has some limitations; e.g., only up to 4096 features are supported [3], and the algorithm throws an exception if there are more. Spark's GLM implementation also provides model diagnostics, e.g. residuals, deviance … [1] The available response families in GLM include Gaussian, Binomial, Poisson, Gamma and Tweedie.
Decision trees are a very common classification method: they are a predictive technique that is easy to interpret and can handle data that is not scaled [2]. However, overfitting is a common issue with decision trees [3]. MLlib partitions the data by rows, which makes the implementation scalable and allows training to be distributed. Besides the predicted label, MLlib provides the predicted probability of each class (for classification) or the biased sample variance of the prediction (for regression). Binary and multiclass classification as well as regression are supported in MLlib. The main differences between the MLlib and Spark.ml implementations are that Spark.ml supports pipelines and distinguishes continuous from categorical features via DataFrame metadata.
The inputs are labelCol (the label to predict) and featuresCol (the feature vector). The outputs are predictionCol (the predicted label), rawPredictionCol (a vector whose length is the number of classes, containing the counts of training instance labels at the tree node that makes the prediction) and probabilityCol (a vector whose length is the number of classes, equal to rawPrediction normalized to a multinomial distribution).

Clustering is an unsupervised machine learning technique in which similar entities in a dataset are grouped into the same cluster. MLlib provides different models, including K-means, LDA and Gaussian mixture models [3]. K-means requires the number of clusters to be specified in advance. Spark.ml supports K-means and bisecting K-means (a hierarchical clustering method) [1], as well as the more scalable variant Scalable K-means++ [6]. K-means receives a feature vector column named featuresCol as input and outputs the predicted cluster center in a column named predictionCol [1].

Latent Dirichlet allocation (LDA) is a topic modeling technique used in NLP that is applicable to large text datasets [1].

Sometimes, in order to make use of large high-dimensional data, it might be necessary to leave out some variables and features [3]. MLlib supports two types of dimensionality reduction [1]:
• Singular value decomposition (SVD), which factorizes a matrix A into three matrices [4], A ≈ UΣVᵀ, keeping the top k singular values: U is an m × k orthonormal matrix, Σ is a k × k diagonal matrix whose non-negative singular values are in descending order, and V is an n × k orthonormal matrix.
• Principal component analysis (PCA), which extracts k principal components (uncorrelated variables) from the matrix. Each principal component is computed so that it has the largest possible variance while being uncorrelated with the preceding components [1], [2].

A recommender system aims to predict the preference of a user [3].
The most widely used approaches are collaborative filtering, content-based filtering and hybrid ones [2].

Collaborative filtering is an approach for creating recommender systems [1]. There are two types of collaborative filtering, namely item-based and user-based, which use the preferences or ratings of many users on items, or the similarity of items, to generate recommendations [2]. In model-based methods, which are supported in Spark.ml, the known user-item ratings are used to estimate a model that is then applied to predict ratings for unknown user-item combinations [1].

Cold-start problem
Sometimes, entities in the test data were not present in the training data, and the lack of data about them affects the quality of the algorithm. This happens when new users or items appear that were not present in the training dataset. It can also happen in cross-validation, when the data is split randomly. Spark assigns a NaN prediction to such entities if they are not present in the model; setting the coldStartStrategy parameter to "drop" removes these rows instead [1].

2.3.9. Frequent pattern mining
Finding frequent patterns is one of the initial tasks when dealing with big data. Association rule discovery is a method for finding relations in big datasets.
Association rule discovery can be seen as the problem of detecting frequent patterns in itemsets [7]. A number of algorithms address the frequent pattern mining problem, including the Apriori algorithm and the partition algorithm [7].

MLlib supports a parallel implementation of FP-growth (PFP) [1], which addresses the essential problems of the Apriori algorithm, namely the large number of candidate sets and the repeated scans of the database [8]. The Spark.ml implementation of FP-growth takes the following parameters:
• minSupport: the minimum support for an itemset to be identified as frequent
• minConfidence: the minimum confidence for generating association rules
• numPartitions: the number of partitions used to distribute the work
The FP-growth algorithm performs the following three steps. First, it identifies the frequent items. Then, it uses a tree structure (the FP-tree) to encode the transactions. Finally, the frequent itemsets are extracted from the tree; FP-growth reduces the search cost by using a partition-based, divide-and-conquer method [8].

2.3.10. ML tuning
Optimizing an ML application in terms of both run time and memory usage is referred to as ML tuning [3]. In a production environment, it is generally necessary to know the performance of the model on unseen data in advance [2]. Cross-validation is the process in which the data is divided into two splits, one for training and the other for evaluation [2]. This way, the performance of the algorithm on unseen data (the evaluation data) can be estimated [2].
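A minimal sketch of k-fold splitting and cross-validation in plain Python, assuming rows are indexed 0..n-1 and that the caller supplies hypothetical fit and score functions. In Spark, CrossValidator from pyspark.ml.tuning plays this role (its numFolds parameter sets k), but the code below is not its implementation.

```python
import random


def k_fold_splits(n_rows, k, seed=0):
    """Yield (train_indices, eval_indices) for k non-overlapping folds.

    Every row lands in exactly one evaluation fold; the shuffle seed is an assumption.
    """
    if not 2 <= k <= n_rows:
        raise ValueError("k must be between 2 and the number of rows")
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    folds = [indices[i::k] for i in range(k)]
    for i, eval_fold in enumerate(folds):
        # Train on all other folds, evaluate on fold i.
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield sorted(train), sorted(eval_fold)


def cross_validate(rows, labels, k, fit, score):
    """Average the evaluation score over the k folds."""
    scores = []
    for train_idx, eval_idx in k_fold_splits(len(rows), k):
        model = fit([rows[i] for i in train_idx], [labels[i] for i in train_idx])
        scores.append(score(model, [rows[i] for i in eval_idx], [labels[i] for i in eval_idx]))
    return sum(scores) / len(scores)
```

Each row is used for evaluation exactly once and for training k - 1 times, so the averaged score is less sensitive to one lucky or unlucky split than a single train/evaluation pair.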
A common method is k-fold cross-validation, where the data is divided into k non-overlapping folds [1]. In each round, one fold is used for evaluation and the remaining folds are used to train the model [2]. A cheaper alternative to cross-validation is train-validation split, where each training/evaluation pair is evaluated only once, leading to a less reliable but cheaper result [1].

This seminar presented some of the features of MLlib. Using the DataFrame-based API is recommended, not only because of the features it brings, but also because the RDD-based API will be deprecated in future releases [1].
Given that some of the algorithms of the RDD-based API are not yet implemented in the DataFrame-based API [3], one can combine the implementations of both APIs. MLlib performs well on large datasets because it benefits from Spark's design for iterative computation. Users can conveniently learn to work with MLlib using the extensive documentation provided. In addition, pipelines can result in better maintainability and easier reusability of components, especially in large-scale problems.