# @hidden_cell
# The project token is an authorization token that is used to access project resources like data sources, connections, and used by platform APIs.
from project_lib import Project
project = Project(spark.sparkContext, 'd446469b-781b-49d5-8b17-544d2907527f', 'p-76d40614129be4047348cc3c6041daafbc357db0')
pc = project.project_context
In this Jupyter notebook, you will use Spark and the Spark machine learning library to build a recommender system for movies with a data set from MovieLens. You'll also learn how to manage Spark performance.
Some familiarity with Python is recommended. This notebook runs on Python 3.6 with Spark.
MovieLens is a project developed by GroupLens, a research laboratory at the University of Minnesota. MovieLens provides an online movie recommender application that uses anonymously-collected data to improve recommender algorithms. Anyone can try the app for free and get movie recommendations. To help people develop the best recommendation algorithms, MovieLens also released several data sets. In this notebook, you'll use the latest data set, which has two sizes.
The full data set consists of more than 24 million ratings across more than 40,000 movies by more than 250,000 users. The file size is kept under 1GB by using indexes instead of full string names.
The small data set that you'll use in this notebook is a subset of the full data set. It's generally a good idea to start building a working program with a small data set so that you get faster feedback while you interact with, explore, and debug your data. When you have a fully working program, you can apply the same code to the larger data set, possibly on a larger cluster of processors. You can also minimize memory consumption by limiting the data volume as much as possible, for example, by using indexes.
Apache Spark’s machine learning library makes practical machine learning scalable and easy. The library consists of common machine learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this notebook!), dimensionality reduction, lower-level optimization primitives, and higher-level pipeline APIs.
The library has two packages:
- spark.mllib contains the original API that handles data in RDDs. It's in maintenance mode, but still fully supported.
- spark.ml contains a newer API for constructing ML pipelines. It handles data in DataFrames and is being actively enhanced.
You'll use the spark.ml package in this notebook.
You'll create Spark DataFrames, which are similar to R or pandas DataFrames, but can be distributed on a cluster of Spark executors, which can potentially scale up to thousands of machines. DataFrames are one of the easiest and best performing ways of manipulating data with Spark, but they require structured data in formats or sources such as CSV, Parquet, JSON, or JDBC.
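As a quick illustration (the rows below are made up for this example and are not part of the tutorial's data), a Spark DataFrame can be created directly from local Python objects and inspected like a table:
from pyspark.sql import Row
# Assumes the SparkSession that Watson Studio provides as `spark` (described later in this notebook)
demo = spark.createDataFrame([Row(title='Movie A', year=1995), Row(title='Movie B', year=2001)])
demo.printSchema()
demo.show()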
To download the data:
1. Download the ml-latest.zip file from MovieLens Latest DataSets.
2. Extract the movies.csv and ratings.csv files and add them to your project.
To load the movies.csv file into a Spark DataFrame:
1. Select the movies.csv file and click Insert to code > Insert SparkSession DataFrame. The inserted DataFrame is named df_data_1 by default.
2. Change df_data_n to movies in the last row and four rows above that, and run the cell.
# Click here to create your movies DataFrame
import ibmos2spark
# @hidden_cell
credentials = {
'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
'service_id': 'iam-ServiceId-684aa356-2d31-434b-829d-460ec7cde735',
'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token',
'api_key': '5dx5qPvfyPEs9sQ_zH_OK8indREdPzEbOySdoDE6cc2t'
}
configuration_name = 'os_55fc726f26764f10977ea25721117b57_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
movies = spark.read\
.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
.option('header', 'true')\
.load(cos.url('movies.csv', 'sparkdemo-donotdelete-pr-va3xb7bzcb67ss'))
movies.take(5)
The # @hidden_cell
tag hides your credentials when you use the Share button to share this notebook with people who are not collaborators on your project. Be aware that this tag does NOT prevent those cells from being published when you publish your notebook on GitHub!
Now load the ratings.csv file. Add the file to the Files pane and load the data in one of the following ways:
- Select the ratings.csv file, click Insert to code > Insert SparkSession DataFrame, and change df_data_n to ratings in the first and last row. Then run the cell.
- Run the next cell, which already contains the equivalent code.
Notice that there's much less code this time. That's because your credentials are needed only once in a notebook.
# Click here to create your ratings DataFrame
ratings = spark.read\
.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
.option('header', 'true')\
.load(cos.url('ratings.csv', 'sparkdemo-donotdelete-pr-va3xb7bzcb67ss'))
ratings.take(5)
Spark runs jobs in parallel on a cluster of executors. You need a SparkSession instance to submit jobs. In Watson Studio, a SparkSession is automatically created for you and is available as spark.
Show the Spark version:
print(spark.version)
Show the default parallelism factor, which is the default number of partitions if you do not specify one:
print(sc.defaultParallelism)
Here's how you can see the details of the Spark configuration:
print(sc.getConf().toDebugString())
Spark DataFrames are distributed collections across a cluster of Spark executors. One of the key factors for ensuring that a Spark job is well distributed across the cluster is the number of partitions for a DataFrame. Run the getNumPartitions()
method to show the number of partitions for each DataFrame:
print('Number of partitions for the movies DataFrame: {}'.format(movies.rdd.getNumPartitions()))
print('Number of partitions for the ratings DataFrame: {}'.format(ratings.rdd.getNumPartitions()))
Despite the defaultParallelism setting, the Spark DataFrameReader that you used created only a single partition, which means that all subsequent Spark operations would run on a single executor.
You can run the repartition() method to redistribute the ratings data across multiple partitions. But be aware that repartitioning can consume a great deal of network bandwidth while data is transferred across the cluster! Because Spark DataFrames are immutable, repartition() returns a new DataFrame rather than modifying the existing one.
Create the repartitionedRatings
DataFrame across 10 partitions:
repartitionedRatings = ratings.repartition(10)
print('Number of partitions for the ratings DataFrame: {}'.format(ratings.rdd.getNumPartitions()))
print('Number of partitions for the repartitionedRatings DataFrame: {}'.format(repartitionedRatings.rdd.getNumPartitions()))
One of the key features of Apache Spark is that it leverages memory as much as possible. While an action is in progress, data is loaded into memory and transformed several times before a result is returned, but at the end of the processing the memory is released. You can use the cache() method to retain and persist a data set in memory as soon as you run an action on it.
Cache the repartitioned ratings data in memory to improve performance:
print('Number of ratings: {}'.format(repartitionedRatings.count()))
repartitionedRatings.cache()
When you run the next cell, count()
is the first action performed on the repartitionedRatings
DataFrame since it was cached. Thus, the data is read from object storage, the repartition()
transformation is applied, and the count()
action is run. But because you ran the cache()
transformation on this DataFrame already, Spark won't release the memory.
print('Number of ratings: {}'.format(repartitionedRatings.count()))
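As a side note (not a step you need in this tutorial), you can release a cached DataFrame explicitly once you no longer need it:
# Release the cached data; running this here would simply undo the speed-up demonstrated above
repartitionedRatings.unpersist()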
You'll use the Spark DataFrame API and SparkSQL to look at the data. The Spark DataFrame API and SparkSQL are high-level APIs to query and transform Spark DataFrames.
Show the content of the DataFrame in a table:
movies.show(truncate=False)
Print the schema of the DataFrame:
movies.printSchema()
By default, the schema shows every column as a string. To override this, you can either explicitly specify the schema or configure the inferSchema
parameter in the Spark CSV DataFrameReader.
Set the inferSchema
parameter and then print the schemas.
Use the More option (the three dots in the upper-right panel) to insert the project token. If this is the first time you are using a token in this project, you will see the message "No project access token." Below that message, click Project settings, create a token by clicking New token in the Access tokens section, and then insert the token into the notebook.
The token is added to the first cell of your notebook. Copy that token into the cell below and run it.
You will then be able to use the get_file_url
method to fetch a file from the object storage using Spark.
from project_lib import Project
project = Project(spark.sparkContext, 'd446469b-781b-49d5-8b17-544d2907527f', 'p-76d40614129be4047348cc3c6041daafbc357db0')
pc = project.project_context
movies_file_name = 'movies.csv'
ratings_file_name = 'ratings.csv'
movies = spark.read.csv(project.get_file_url(movies_file_name), header=True, inferSchema=True).repartition(10).cache()
ratings = spark.read.csv(project.get_file_url(ratings_file_name), header=True, inferSchema=True).repartition(10).cache()
movies.printSchema()
ratings.printSchema()
Run the describe()
method to see the count, mean, standard deviation, minimum, and maximum values for the data in each column:
ratings.describe().show()
Not all of these statistics are actually meaningful!
You can use specific methods from the DataFrame API to compute any statistic:
print('Number of different users: {}'.format(ratings.select('userId').distinct().count()))
print('Number of different movies: {}'.format(ratings.select('movieId').distinct().count()))
print('Number of movies with at least one rating strictly higher than 4: {}'.format(ratings.filter('rating > 4').select('movieId').distinct().count()))
You can also leverage your SQL knowledge to query the data. Spark version 2.0 is ANSI SQL-92 compliant and can run the 99 TPC-DS queries.
Find the number of movies with ratings higher than 4 again, this time with SQL:
ratings.createOrReplaceTempView('ratings')
spark.sql('SELECT COUNT(DISTINCT(movieId)) AS nb FROM ratings WHERE rating > 4').show()
You can even query the data file directly, which might be ambiguous for the CSV file format, but can be useful for self-describing data formats like Parquet that embed the schema definition with the data.
ratings_url = project.get_file_url(ratings_file_name)
sql = 'SELECT * FROM csv.`' + ratings_url + '`'
spark.sql(sql).take(2)
You can easily switch between Spark distributed DataFrames and pandas local DataFrames.
!pip install --upgrade pandas
import pandas as pd
ratings.toPandas().head()
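The conversion also works in the other direction. Here's a minimal sketch (pandasRatings and ratingsFromPandas are illustrative names) that distributes a local pandas DataFrame back to the cluster:
# Convert a local pandas DataFrame back into a distributed Spark DataFrame
pandasRatings = ratings.toPandas()
ratingsFromPandas = spark.createDataFrame(pandasRatings)
print(ratingsFromPandas.count())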
You'll use the Seaborn and matplotlib libraries to create graphs. The Seaborn library works with matplotlib to graph statistical data.
Install the Seaborn library:
!pip install --upgrade seaborn
Create a graph of the movies reviewed by users:
import seaborn as sns
%matplotlib inline
ratingsPandas = ratings.toPandas()
sns.lmplot(x='userId', y='movieId', data=ratingsPandas, fit_reg=False);
ratingsPandas.head()
This matrix represents all the movies rated by users, but doesn't distinguish the ratings. Improve the graph by choosing a color palette to color the ratings:
sns.palplot(sns.diverging_palette(10, 133, sep=80, n=10))
Create the graph on a larger scale with the color palette:
lm = sns.lmplot(x='userId', y='movieId', hue='rating', data=ratingsPandas, fit_reg=False, height=10, aspect=2, palette=sns.diverging_palette(10, 133, sep=80, n=10))
axes = lm.axes
axes[0, 0].set_ylim(0, 163949) # max movieId is 163949
axes[0, 0].set_xlim(0, 671) # max userId is 671
lm;
On this matrix, you'll notice gaps in the data: some movies and users seem to be missing. This could be because you're using a subset of the data (the small data set).
Nevertheless, you can identify some patterns. Some users always give positive reviews of movies. Some movies are rated a lot, which could be for different reasons: for example, the first release of the MovieLens website had a much smaller catalog, or those movies are simply more famous.
Now visualize the global distribution of ratings with a violin plot:
sns.violinplot([ratingsPandas.rating]);
These are just two examples of what you can achieve with the rich Python visualization libraries ecosystem. Feel free to explore more!
# Movies sorted by their number of ratings (the userId column holds the count per movie)
ratingsPandas.groupby('movieId').count().sort_values(by='userId', ascending=False)
# Look at one heavily rated movie, movieId 356
movie_356 = ratingsPandas[ratingsPandas.movieId == 356]
movie_356.head()
# Plot the ratings that individual users gave to movie 356
sns.scatterplot(x='rating', y='userId', data=movie_356)
# Plot the distribution of ratings across the whole data set
sns.countplot(x='rating', data=ratingsPandas)
There are different methods for building a recommender system, such as user-based, content-based, or collaborative filtering. Collaborative filtering calculates recommendations based on similarities between users and products. For example, collaborative filtering assumes that users who give similar ratings to the same movies will also have similar opinions on movies that they haven't seen.
The alternating least squares (ALS) algorithm provides collaborative filtering between users and products to find products that the customers might like, based on their previous ratings.
In this case, the ALS algorithm will create a matrix of all users versus all movies. Most cells in the matrix will be empty. An empty cell means the user hasn't reviewed the movie yet. The ALS algorithm will fill in the probable (predicted) ratings, based on similarities between user ratings. The algorithm uses the least squares computation to minimize the estimation errors, and alternates between solving for movie factors and solving for user factors.
The following trivial example gives you an idea of the problem to solve. However, keep in mind that the general problem is much harder because the matrix often has far more missing values.
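Here is a made-up sketch of that idea (the toy ratings below are invented for illustration and are not part of the MovieLens data): pivot a handful of ratings into a user-by-movie matrix and the empty cells, which ALS tries to predict, become visible.
# A toy user x movie matrix, for illustration only
toyRatings = spark.createDataFrame(
    [(1, 'Movie A', 5.0), (1, 'Movie B', 3.0), (2, 'Movie B', 4.0), (3, 'Movie A', 1.0), (3, 'Movie C', 5.0)],
    ['userId', 'title', 'rating'])
toyRatings.groupBy('userId').pivot('title').avg('rating').show()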
Check the size of the ratings matrix:
spark.sql('''
SELECT *, 100 * nb_ratings/matrix_size AS percentage
FROM (
SELECT nb_users, nb_movies, nb_ratings, nb_users * nb_movies AS matrix_size
FROM (
SELECT COUNT(*) AS nb_ratings, COUNT(DISTINCT(movieId)) AS nb_movies, COUNT(DISTINCT(userId)) AS nb_users
FROM ratings
)
)
''').toPandas().head()
Less than 2% of the matrix is filled!
Use the SparkML ALS algorithm to train a model to provide recommendations. The mandatory parameters to the ALS algorithm are the columns that identify the users, the items, and the ratings. Run the fit()
method to train the model:
from pyspark.ml.recommendation import ALS
model = ALS(userCol='userId', itemCol='movieId', ratingCol='rating').fit(ratings)
Run the transform()
method to score the model and output a DataFrame with an additional prediction column that shows the predicted rating:
predictions = model.transform(ratings)
predictions.toPandas().head()
You can see that many of the predictions are close to the actual ratings.
After you apply a model to a data set, you should evaluate the performance of the model by comparing the predicted values with the original values. Use the RegressionEvaluator class to compare continuous values with the root mean squared error. The root mean squared error (RMSE) is the square root of the average of the squared errors between the predicted and the actual values. The lower the RMSE, the more accurate the model.
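In formula form, for ${n}$ predicted ratings $\hat{r}_i$ and actual ratings $r_i$: $RMSE = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(\hat{r}_i - r_i)^2}$.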
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))
You want the performance score to improve with your design iterations so that the model improves. But notice that you just ran the training and the scoring on the same data set. That's something you won't normally do, because you usually want to predict values that you don't already know! Therefore, this result is meaningless. To accurately evaluate the model, it's common practice in machine learning to split the data set between a training data set to train the model and a test data set to compare the predicted results with the original results. This process is called cross-validation. Skipping cross-validation often leads to overfitting, which occurs when the model is too specific to the training data set and does not perform well on a more general data set. This split, train, score, and evaluate cycle is the general iterative process of machine learning.
Split your ratings data set between an 80% training data set and a 20% test data set. Then rerun the steps to train the model on the training set, run it on the test set, and evaluate the performance.
(trainingRatings, testRatings) = ratings.randomSplit([80.0, 20.0])
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating')
model = als.fit(trainingRatings)
predictions = model.transform(testRatings)
predictions.toPandas().head()
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))
You might get the value nan
(not a number) from the previous cell.
A NaN result is due to SPARK-14489: the model can't predict values for users for which there's no training data. A temporary workaround is to exclude rows with predicted NaN values or to replace them with a constant, for instance, the general mean rating. However, to map to a real business problem, the data scientist, in collaboration with the business owner, must define what happens when such an event occurs. For example, you can provide no recommendations for a user until that user has rated a few items. Alternatively, until a user has rated a few items, you can use a recommender system based on the user's profile (that's another recommender system to develop).
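As a side note, if your environment runs Spark 2.2 or later (an assumption about your setup; check the print(spark.version) output above), ALS also accepts a coldStartStrategy='drop' parameter that removes NaN predictions at evaluation time. A minimal sketch (alsWithDrop is an illustrative name):
# Spark 2.2+ only: drop rows with NaN predictions when the model is evaluated
alsWithDrop = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')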
Replace predicted NaN values with the average rating and evaluate the model:
avgRatings = ratings.select('rating').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.fill(avgRatings))))
Now exclude predicted NaN values and evaluate the model:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))
Obviously, the score is worse than with the previous model, but now you're not being fooled by overfitting: this is the level of performance you can actually expect on new incoming data!
If you run the randomSplit()
, fit()
, transform()
, and evaluate()
functions several times, you won't always get the same performance score. This is because the randomSplit()
and ALS()
functions have some randomness. To get a more precise performance score, run the model several times and then compute the average performance score. This process is really close to what is called ${k}$-fold cross validation.
Create a repeatALS()
function that trains, runs, and evaluates the model multiple times:
def repeatALS(data, k=3, userCol='userId', itemCol='movieId', ratingCol='rating', metricName='rmse'):
evaluations = []
for i in range(0, k):
(trainingSet, testingSet) = data.randomSplit([k - 1.0, 1.0])
als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
model = als.fit(trainingSet)
predictions = model.transform(testingSet)
evaluator = RegressionEvaluator(metricName=metricName, labelCol='rating', predictionCol='prediction')
evaluation = evaluator.evaluate(predictions.na.drop())
print('Loop {}: {} = {}'.format(i + 1, metricName, evaluation))
evaluations.append(evaluation)
return sum(evaluations) / float(len(evaluations))
Run repeatALS
four times and average the performance scores:
print('RMSE = {}'.format(repeatALS(ratings, k=4)))
The computed performance score is more stable this way.
Create a kfoldALS()
function that also trains, runs, and evaluates the model, but splits up the data between training and testing data sets in a different way. The original data set is split into k data sets. Each of the k iterations of the function uses a different data set for testing and the other data sets for training.
def kfoldALS(data, k=3, userCol='userId', itemCol='movieId', ratingCol='rating', metricName='rmse'):
evaluations = []
weights = [1.0] * k
splits = data.randomSplit(weights)
for i in range(0, k):
testingSet = splits[i]
trainingSet = spark.createDataFrame(sc.emptyRDD(), data.schema)
for j in range(0, k):
if i == j:
continue
else:
trainingSet = trainingSet.union(splits[j])
als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
model = als.fit(trainingSet)
predictions = model.transform(testingSet)
evaluator = RegressionEvaluator(metricName=metricName, labelCol='rating', predictionCol='prediction')
evaluation = evaluator.evaluate(predictions.na.drop())
print('Loop {}: {} = {}'.format(i + 1, metricName, evaluation))
evaluations.append(evaluation)
return sum(evaluations) / float(len(evaluations))
Compute the average performance score for 4 folds:
print('RMSE = {}'.format(kfoldALS(ratings, k=4)))
Now compute the average performance score for 10 folds:
print('RMSE = {}'.format(kfoldALS(ratings, k=10)))
Recall that you want to minimize the RMSE, and that 10 folds means a training set of 90% of the data, while 4 folds means a training set of 75% of the data. If you choose a value of ${k}$ that is too small, you run into selection bias. On the other hand, a value of ${k}$ that is too large gives an error estimate with high variance and low bias. Therefore, choosing the right ${k}$ is important. Usually, the recommended value of ${k}$ is between 5 and 10, but this can change based on the data set.
So now, how can you improve this model? Machine learning algorithms have hyperparameters that control how the algorithm works.
The ALS algorithm has this signature:
class pyspark.ml.recommendation.ALS(
rank=10,
maxIter=10,
regParam=0.1,
numUserBlocks=10,
numItemBlocks=10,
implicitPrefs=False,
alpha=1.0,
userCol="user",
itemCol="item",
seed=None,
ratingCol="rating",
nonnegative=False,
checkpointInterval=10,
intermediateStorageLevel="MEMORY_AND_DISK",
finalStorageLevel="MEMORY_AND_DISK"
)
The ALS hyperparameters are:
- rank = the number of latent factors in the model
- maxIter = the maximum number of iterations
- regParam = the regularization parameter
To test several values for those hyperparameters and choose the best configuration, it's common practice to define a grid of parameter combinations and to run a grid search over the combinations, evaluating the resulting models and comparing their performance. This process is known as model selection.
The Spark CrossValidator function performs a grid search as well as k-fold cross validation.
Run the CrossValidator function with multiple values for rank
and regParam
:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
(trainingRatings, validationRatings) = ratings.randomSplit([80.0, 20.0])
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
paramGrid = ParamGridBuilder().addGrid(als.rank, [1, 5, 10]).addGrid(als.maxIter, [20]).addGrid(als.regParam, [0.05, 0.1, 0.5]).build()
crossval = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)
cvModel = crossval.fit(trainingRatings)
predictions = cvModel.transform(validationRatings)
print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))
The more folds and parameters you add to the grid, the longer it takes to test every combination. The CrossValidator model records the performance of each combination, which you can get from its avgMetrics attribute. For example, you can graph the results on a plot for analysis.
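Here's a small sketch (assuming the paramGrid and cvModel objects created above) that pairs each parameter combination with its average metric:
# Show the average RMSE across the folds for each combination in the grid
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, metric)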
Unfortunately, because of the SPARK-14489 issue mentioned above, the CrossValidator function can't compute the root mean squared error most of the time and provides incorrect results. You could limit this problem by making the training set much larger than the test set, but that's not a good practice. If you want to learn more about this issue, which is more a conceptual one than a technical one, you can have a look at Nick Pentreath's pull request #12896 (https://github.com/apache/spark/pull/12896). Welcome to the Open Source world!
To recommend movies for a specific user, create a function that applies the trained model, ALSModel
, on the list of movies that the user hasn't yet rated.
Create a recommendMovies
function:
from pyspark.sql.functions import lit
def recommendMovies(model, user, nbRecommendations):
# Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
dataSet = ratings.select('movieId').distinct().withColumn('userId', lit(user))
# Create a Spark DataFrame with the movies that have already been rated by this user
moviesAlreadyRated = ratings.filter(ratings.userId == user).select('movieId', 'userId')
# Apply the recommender system to the data set without the already rated movies to predict ratings
predictions = model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy('prediction', ascending=False).limit(nbRecommendations).select('movieId', 'prediction')
# Join with the movies DataFrame to get the movies titles and genres
recommendations = predictions.join(movies, predictions.movieId == movies.movieId).select(predictions.movieId, movies.title, movies.genres, predictions.prediction)
# recommendations.show(truncate=False)
return recommendations
Now run this function to recommend 10 movies for three different users:
print('Recommendations for user 133:')
recommendMovies(model, 133, 10).toPandas()
print('Recommendations for user 471:')
recommendMovies(model, 471, 10).toPandas()
print('Recommendations for user 496:')
recommendMovies(model, 496, 10).toPandas()
Here are some suggestions to improve your model:
Victor Hatinguais specializes in technology evangelism, training, demos, and proofs of concept around Data Science and Big Data technologies, such as Apache Hadoop and IBM BigInsights, Apache Spark, IBM Streams, and IBM SPSS.
F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4, Article 19 (December 2015), 19 pages. DOI=http://dx.doi.org/10.1145/2827872