In [ ]:
# @hidden_cell
# The project token is an authorization token that is used to access project resources like data sources and connections, and is used by platform APIs.
from project_lib import Project
project = Project(spark.sparkContext, 'd446469b-781b-49d5-8b17-544d2907527f', 'p-76d40614129be4047348cc3c6041daafbc357db0')
pc = project.project_context

Movie recommender system with Spark machine learning

In this Jupyter notebook, you will use Spark and the Spark machine learning library to build a recommender system for movies with a data set from MovieLens. You'll also learn how to manage Spark performance.

Some familiarity with Python is recommended. This notebook runs on Python 3.6 with Spark.

MovieLens

MovieLens is a project developed by GroupLens, a research laboratory at the University of Minnesota. MovieLens provides an online movie recommender application that uses anonymously-collected data to improve recommender algorithms. Anyone can try the app for free and get movie recommendations. To help people develop better recommendation algorithms, MovieLens has also released several data sets. In this notebook, you'll use the latest data set, which is available in two sizes.

The full data set consists of more than 24 million ratings across more than 40,000 movies by more than 250,000 users. The file size is kept under 1GB by using indexes instead of full string names.

The small data set that you'll use in this notebook is a subset of the full data set. It's generally a good idea to start building a working program with a small data set so that you get faster feedback while interacting with, exploring, and debugging your data. When you have a fully working program, you can apply the same code to the larger data set, possibly on a larger cluster of processors. You can also minimize memory consumption by limiting the data volume as much as possible, for example, by using indexes.

Spark machine learning library

Apache Spark’s machine learning library makes practical machine learning scalable and easy. The library consists of common machine learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this notebook!), dimensionality reduction, lower-level optimization primitives, and higher-level pipeline APIs.

The library has two packages:

  • spark.mllib contains the original API, which handles data in RDDs. It's in maintenance mode: it's still supported, but it no longer receives new features.
  • spark.ml contains a newer API for constructing ML pipelines. It handles data in DataFrames. It's being actively enhanced.

You'll use the spark.ml package in this notebook.
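
For reference, here's a minimal sketch of the import style for each package (both ALS classes exist in Spark; the aliases are used only to show the two entry points side by side):

In [ ]:
# RDD-based API (maintenance mode): works with RDDs of Rating objects
from pyspark.mllib.recommendation import ALS as RDDBasedALS, Rating

# DataFrame-based API (actively enhanced): works with Spark DataFrames
from pyspark.ml.recommendation import ALS as DataFrameBasedALS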

1. Load the data

You'll create Spark DataFrames, which are similar to R or pandas DataFrames, but can be distributed on a cluster of Spark executors, which can potentially scale up to thousands of machines. DataFrames are one of the easiest and best performing ways of manipulating data with Spark, but they require structured data in formats or sources such as CSV, Parquet, JSON, or JDBC.
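
As an illustration, here's a minimal sketch of how DataFrames can be created from a few of those sources; the file paths are hypothetical placeholders, not files used in this notebook:

In [ ]:
# Hypothetical paths, shown only to illustrate the DataFrameReader API
df_from_csv = spark.read.csv('some_data.csv', header=True, inferSchema=True)
df_from_parquet = spark.read.parquet('some_data.parquet')
df_from_json = spark.read.json('some_data.json')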

1.1 Download the data from MovieLens

To download the data:

  1. Download the small version of the ml-latest.zip file from MovieLens Latest DataSets.
  2. Unzip the file. The files you'll be using are movies.csv and ratings.csv.

1.2 Load movies.csv

To load the movies.csv file into a Spark DataFrame:

  1. Click the Data button on the notebook action bar (the 1010 button at the top right of this page). Drop the files into the box or browse to select the files. The files are loaded to your object storage and also appear in the Data Assets section of your project.
  2. Click in the following code cell.
  3. Under the movies.csv file, click Insert to code > Insert SparkSession DataFrame. The DataFrame is named df_data_1 by default.
  4. Rename the DataFrame by changing df_data_n to movies in both places where it appears (the assignment and the .take(5) line), and then run the cell.
In [1]:
# Click here to create your movies DataFrame
import ibmos2spark
# @hidden_cell
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'service_id': 'iam-ServiceId-684aa356-2d31-434b-829d-460ec7cde735',
    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token',
    'api_key': '5dx5qPvfyPEs9sQ_zH_OK8indREdPzEbOySdoDE6cc2t'
}

configuration_name = 'os_55fc726f26764f10977ea25721117b57_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
movies = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('movies.csv', 'sparkdemo-donotdelete-pr-va3xb7bzcb67ss'))
movies.take(5)
Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200121181609-0000
KERNEL_ID = 13fc7203-f9a3-455e-a805-fb9d1dfe9e0c
Out[1]:
[Row(movieId='1', title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId='2', title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId='3', title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId='4', title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId='5', title='Father of the Bride Part II (1995)', genres='Comedy')]

The # @hidden_cell tag hides your credentials when you use the Share button to share this notebook with people who are not collaborators on your project. Be aware that this tag does NOT prevent those cells from being published when you publish your notebook on GitHub!

1.3 Load ratings.csv

Now load the ratings.csv file. Add the file to the Files pane and load the data in one of the following ways:

  • Click Insert to code > Insert SparkSession DataFrame, rename the DataFrame by changing df_data_n to ratings in the first and last rows, and then run the cell.
  • Run the code in the next cell.

Notice that there's much less code. That's because your credentials are needed only once in a notebook.

In [2]:
# Click here to create your ratings DataFrame
ratings = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('ratings.csv', 'sparkdemo-donotdelete-pr-va3xb7bzcb67ss'))
ratings.take(5)
Out[2]:
[Row(userId='1', movieId='1', rating='4.0', timestamp='964982703'),
 Row(userId='1', movieId='3', rating='4.0', timestamp='964981247'),
 Row(userId='1', movieId='6', rating='4.0', timestamp='964982224'),
 Row(userId='1', movieId='47', rating='5.0', timestamp='964983815'),
 Row(userId='1', movieId='50', rating='5.0', timestamp='964982931')]

2. Spark performance basics

Spark runs jobs in parallel on a cluster of executors. You need a SparkSession instance to submit jobs. In Watson Studio, a SparkSession is automatically created for you and is available as the spark variable.

Show the Spark version:

In [3]:
print(spark.version)
2.3.3

Show the default parallelism factor, which is the default number of partitions if you do not specify one:

In [4]:
print(sc.defaultParallelism)
2

Here's how you can see the details of the Spark configuration:

In [5]:
print(sc.getConf().toDebugString())
spark.app.id=app-20200121181609-0000
spark.app.name=pyspark-shell
spark.authenticate=true
spark.authenticate.enableSaslEncryption=true
spark.authenticate.secret=a654ceb1-71c5-4682-b107-7b95cdb36645
spark.driver.cores=1
spark.driver.extraClassPath=/usr/local/share/jupyter/kernels/scala/lib/*:/home/spark/shared/user-libs/spark2/*:/home/spark/shared/user-libs/common/*:/home/spark/shared/user-libs/connectors/*:/opt/ibm/connectors/db2/*:/opt/ibm/connectors/others-db-drivers/*:/opt/ibm/third-party/libs/spark2/*:/opt/ibm/third-party/libs/common/*:/opt/ibm/third-party/libs/connectors/*
spark.driver.extraJavaOptions= -Dderby.system.home=/home/spark/.local/share/jupyter/runtime/kernel-13fc7203-f9a3-455e-a805-fb9d1dfe9e0c-20200121_181305 -Dlog4j.logFile=/home/spark/shared/logs/kernel-python3.6-python3.6-20200121_181305.log -Dlog4j.configuration=file:/opt/ibm/jkg/log4j/log4j.properties -Dfile.encoding=UTF-8 
spark.driver.host=172.30.228.245
spark.driver.memory=4g
spark.driver.port=41215
spark.dynamicAllocation.enabled=false
spark.eventLog.dir=file:///home/spark/shared/spark-events
spark.eventLog.enabled=true
spark.executor.cores=1
spark.executor.extraClassPath=/usr/local/share/jupyter/kernels/scala/lib/*:/home/spark/shared/user-libs/spark2/*:/home/spark/shared/user-libs/common/*:/home/spark/shared/user-libs/connectors/*:/opt/ibm/connectors/db2/*:/opt/ibm/connectors/others-db-drivers/*:/opt/ibm/third-party/libs/spark2/*:/opt/ibm/third-party/libs/common/*:/opt/ibm/third-party/libs/connectors/*
spark.executor.id=driver
spark.executor.instances=1
spark.executor.memory=4g
spark.hadoop.fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem
spark.hadoop.fs.s3a.fast.upload=true
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.multipart.size=33554432
spark.hadoop.fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient
spark.hadoop.fs.stocator.cos.scheme=cos
spark.hadoop.fs.stocator.glob.bracket.support=true
spark.hadoop.fs.stocator.scheme.list=cos
spark.history.fs.logDirectory=/home/spark/shared/spark-event
spark.history.ui.port=18080
spark.master=spark://jkg-deployment-13fc7203-f9a3-455e-a805-fb9d1dfe9e0c-6c4868vntd7:7077
spark.master.ui.port=8080
spark.network.crypto.enabled=true
spark.network.crypto.keyLength=256
spark.r.command=/opt/ibm/conda/R/bin/Rscript
spark.rdd.compress=True
spark.serializer.objectStreamReset=100
spark.shuffle.service.enabled=false
spark.shuffle.service.port=7337
spark.sql.catalogImplementation=hive
spark.submit.deployMode=client
spark.ui.enabled=true
spark.ui.killEnabled=false
spark.ui.port=4040
spark.ui.proxyBase=/proxy/app-20200121181609-0000
spark.ui.reverseProxy=true
spark.ui.showConsoleProgress=true
spark.worker.ui.port=8081

Spark DataFrames are distributed collections across a cluster of Spark executors. One of the key factors for ensuring that a Spark job is well distributed across the cluster is the number of partitions for a DataFrame. Run the getNumPartitions() method to show the number of partitions for each DataFrame:

In [6]:
print('Number of partitions for the movies DataFrame: {}'.format(movies.rdd.getNumPartitions()))
print('Number of partitions for the ratings DataFrame: {}'.format(ratings.rdd.getNumPartitions()))
Number of partitions for the movies DataFrame: 1
Number of partitions for the ratings DataFrame: 1

Despite the defaultParallelism setting, the Spark DataFrameReader loaded each file into a single partition, which means that all subsequent Spark operations on these DataFrames would run on a single executor.

You can run the repartition() method to redistribute the ratings data across multiple partitions. But be aware that repartitioning can consume a great deal of network bandwidth while data is shuffled across the cluster! Because Spark DataFrames are immutable, you have to create a new DataFrame to repartition an existing DataFrame.

Create the repartitionedRatings DataFrame across 10 partitions:

In [7]:
repartitionedRatings = ratings.repartition(10)
print('Number of partitions for the ratings DataFrame: {}'.format(ratings.rdd.getNumPartitions()))
print('Number of partitions for the repartitionedRatings DataFrame: {}'.format(repartitionedRatings.rdd.getNumPartitions()))
Number of partitions for the ratings DataFrame: 1
Number of partitions for the repartitionedRatings DataFrame: 10
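
If you only need to reduce the number of partitions, the coalesce() method merges existing partitions without a full shuffle, which is cheaper than repartition(). A minimal sketch (coalescing back down to 2 partitions is only for illustration):

In [ ]:
# coalesce() avoids the full shuffle that repartition() triggers,
# but it can only decrease the number of partitions
coalescedRatings = repartitionedRatings.coalesce(2)
print('Number of partitions after coalesce: {}'.format(coalescedRatings.rdd.getNumPartitions()))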

One of the key features of Apache Spark is that it leverages memory as much as possible. While an action is in progress, data is loaded into memory and transformed several times before a result is returned. At the end of the processing, the memory is released. You can use the cache() method to mark a DataFrame for persistence, so that it's kept in memory the next time an action materializes it.

Load the movies and ratings data sets in memory to improve performance:

In [8]:
print('Number of ratings: {}'.format(repartitionedRatings.count()))
Number of ratings: 100836
In [9]:
repartitionedRatings.cache()
Out[9]:
DataFrame[userId: string, movieId: string, rating: string, timestamp: string]

When you run the next cell, count() is the first action performed on the repartitionedRatings DataFrame since it was cached. Thus, the data is read from object storage, the repartition() transformation is applied, and the count() action is run. But because you already called cache() on this DataFrame, Spark keeps the data in memory after the action completes, so subsequent actions reuse the cached data instead of rereading it from object storage.

In [10]:
print('Number of ratings: {}'.format(repartitionedRatings.count()))
Number of ratings: 100836
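
If you want to check whether the DataFrame is actually persisted, or release the memory once you no longer need it, here's a minimal sketch using standard DataFrame attributes and methods:

In [ ]:
# is_cached and storageLevel report the persistence state of the DataFrame
print(repartitionedRatings.is_cached)
print(repartitionedRatings.storageLevel)

# unpersist() would release the cached data; it's commented out here
# because the cached DataFrame is still used in the next sections
# repartitionedRatings.unpersist()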

3. Explore the data with Spark APIs

You'll use the Spark DataFrame API and SparkSQL to look at the data. The Spark DataFrame API and SparkSQL are high level APIs to query and transform Spark DataFrames.

Show the content of the DataFrame in a table:

In [11]:
movies.show(truncate=False)
+-------+-------------------------------------+-------------------------------------------+
|movieId|title                                |genres                                     |
+-------+-------------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)   |Comedy                                     |
|6      |Heat (1995)                          |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                       |Comedy|Romance                             |
|8      |Tom and Huck (1995)                  |Adventure|Children                         |
|9      |Sudden Death (1995)                  |Action                                     |
|10     |GoldenEye (1995)                     |Action|Adventure|Thriller                  |
|11     |American President, The (1995)       |Comedy|Drama|Romance                       |
|12     |Dracula: Dead and Loving It (1995)   |Comedy|Horror                              |
|13     |Balto (1995)                         |Adventure|Animation|Children               |
|14     |Nixon (1995)                         |Drama                                      |
|15     |Cutthroat Island (1995)              |Action|Adventure|Romance                   |
|16     |Casino (1995)                        |Crime|Drama                                |
|17     |Sense and Sensibility (1995)         |Drama|Romance                              |
|18     |Four Rooms (1995)                    |Comedy                                     |
|19     |Ace Ventura: When Nature Calls (1995)|Comedy                                     |
|20     |Money Train (1995)                   |Action|Comedy|Crime|Drama|Thriller         |
+-------+-------------------------------------+-------------------------------------------+
only showing top 20 rows

Print the schema of the DataFrame:

In [12]:
movies.printSchema()
root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

By default, the schema shows every column as a string. To override this, you can either explicitly specify the schema or configure the inferSchema parameter in the Spark CSV DataFrameReader.
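
For reference, here's a minimal sketch of what explicitly specifying a schema looks like as an alternative to inferSchema (the field names and types match ratings.csv; this cell isn't required for the rest of the notebook):

In [ ]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# An explicit schema avoids the extra pass over the data that inferSchema requires
ratingsSchema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('movieId', IntegerType(), True),
    StructField('rating', DoubleType(), True),
    StructField('timestamp', IntegerType(), True)
])
ratingsWithSchema = spark.read.csv(
    cos.url('ratings.csv', 'sparkdemo-donotdelete-pr-va3xb7bzcb67ss'),
    header=True, schema=ratingsSchema)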

Set the inferSchema parameter and then print the schemas.

Click the More option (the three dots) in the upper right-hand panel and insert a project token. If this is the first time you are using a token in this project, you will see the message "No project access token". Below that message, click the project settings link, create a token by clicking New token in the Access tokens section, and then insert the token into the notebook.

The token is added to the first cell of your notebook. Copy that cell's contents into the cell below and run it. You will then be able to use the get_file_url method to fetch a file from object storage with Spark.

In [23]:
from project_lib import Project
project = Project(spark.sparkContext, 'd446469b-781b-49d5-8b17-544d2907527f', 'p-76d40614129be4047348cc3c6041daafbc357db0')
pc = project.project_context
In [24]:
movies_file_name = 'movies.csv'
ratings_file_name = 'ratings.csv'

movies = spark.read.csv(project.get_file_url(movies_file_name), header=True, inferSchema=True).repartition(10).cache()
ratings = spark.read.csv(project.get_file_url(ratings_file_name), header=True, inferSchema=True).repartition(10).cache()
In [25]:
movies.printSchema()
ratings.printSchema()
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

Run the describe() method to see the count, mean, standard deviation, minimum, and maximum values for the data in each column:

In [26]:
ratings.describe().show()
+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914634999|35530.9871987002|1.0425292390606344| 2.162610359951315E8|
|    min|                 1|               1|               0.5|           828124615|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+

Not all of these statistics are actually meaningful!

You can use specific methods from the DataFrame API to compute any statistic:

In [27]:
print('Number of different users: {}'.format(ratings.select('userId').distinct().count()))
print('Number of different movies: {}'.format(ratings.select('movieId').distinct().count()))
print('Number of movies with at least one rating strictly higher than 4: {}'.format(ratings.filter('rating > 4').select('movieId').distinct().count()))
Number of different users: 610
Number of different movies: 9724
Number of movies with at least one rating strictly higher than 4: 4056

You can also leverage your SQL knowledge to query the data. Since version 2.0, Spark SQL supports the SQL:2003 standard and can run all 99 TPC-DS queries.

Find the number of movies with ratings higher than 4 again, this time with SQL:

In [28]:
ratings.createOrReplaceTempView('ratings')
spark.sql('SELECT COUNT(DISTINCT(movieId)) AS nb FROM ratings WHERE rating > 4').show()
+----+
|  nb|
+----+
|4056|
+----+

You can even query the data file directly. This can be ambiguous for the CSV file format (notice in the output that the header row is returned as a regular data row), but it's useful for self-describing data formats like Parquet that embed the schema definition with the data.

In [29]:
ratings_url = project.get_file_url(ratings_file_name)
sql = 'SELECT * FROM csv.`' + ratings_url + '`'
spark.sql(sql).take(2)
Out[29]:
[Row(_c0='userId', _c1='movieId', _c2='rating', _c3='timestamp'),
 Row(_c0='1', _c1='1', _c2='4.0', _c3='964982703')]

You can easily switch between Spark distributed DataFrames and pandas local DataFrames.

In [30]:
!pip install --upgrade pandas
Collecting pandas
  Downloading https://files.pythonhosted.org/packages/52/3f/f6a428599e0d4497e1595030965b5ba455fd8ade6e977e3c819973c4b41d/pandas-0.25.3-cp36-cp36m-manylinux1_x86_64.whl (10.4MB)
    100% |################################| 10.4MB 1.2MB/s eta 0:00:01
Collecting pytz>=2017.2 (from pandas)
  Downloading https://files.pythonhosted.org/packages/e7/f9/f0b53f88060247251bf481fa6ea62cd0d25bf1b11a87888e53ce5b7c8ad2/pytz-2019.3-py2.py3-none-any.whl (509kB)
    100% |################################| 512kB 3.5MB/s eta 0:00:01
Collecting numpy>=1.13.3 (from pandas)
  Downloading https://files.pythonhosted.org/packages/62/20/4d43e141b5bc426ba38274933ef8e76e85c7adea2c321ecf9ebf7421cedf/numpy-1.18.1-cp36-cp36m-manylinux1_x86_64.whl (20.1MB)
    100% |################################| 20.2MB 814kB/s eta 0:00:01
Collecting python-dateutil>=2.6.1 (from pandas)
  Downloading https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl (227kB)
    100% |################################| 235kB 3.5MB/s eta 0:00:01
Collecting six>=1.5 (from python-dateutil>=2.6.1->pandas)
  Downloading https://files.pythonhosted.org/packages/65/eb/1f97cb97bfc2390a276969c6fae16075da282f5058082d4cb10c6c5c1dba/six-1.14.0-py2.py3-none-any.whl
tensorflow 1.13.1 requires tensorboard<1.14.0,>=1.13.0, which is not installed.
Installing collected packages: pytz, numpy, six, python-dateutil, pandas
Successfully installed numpy-1.18.1 pandas-0.25.3 python-dateutil-2.8.1 pytz-2019.3 six-1.14.0
In [31]:
import pandas as pd

ratings.toPandas().head()
Out[31]:
userId movieId rating timestamp
0 249 5803 3.0 1354225800
1 610 84772 3.5 1493846852
2 599 1982 3.0 1498524557
3 468 317 3.0 831400519
4 169 5873 4.5 1059429313
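
The conversion also works in the other direction: spark.createDataFrame() turns a local pandas DataFrame into a distributed Spark DataFrame. A minimal sketch with a toy pandas DataFrame (the column names are made up for illustration):

In [ ]:
import pandas as pd

localScores = pd.DataFrame({'movieId': [1, 2, 3], 'myScore': [4.5, 3.0, 2.5]})
distributedScores = spark.createDataFrame(localScores)  # back to a Spark DataFrame
distributedScores.show()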

4. Visualize the data

You'll use the Seaborn and matplotlib libraries to create graphs. The Seaborn library works on top of matplotlib to graph statistical data.

Install the Seaborn library:

In [32]:
!pip install --upgrade seaborn
Collecting seaborn
  Downloading https://files.pythonhosted.org/packages/a8/76/220ba4420459d9c4c9c9587c6ce607bf56c25b3d3d2de62056efe482dadc/seaborn-0.9.0-py3-none-any.whl (208kB)
    100% |################################| 215kB 3.2MB/s eta 0:00:01
Collecting scipy>=0.14.0 (from seaborn)
  Downloading https://files.pythonhosted.org/packages/dc/29/162476fd44203116e7980cfbd9352eef9db37c49445d1fec35509022f6aa/scipy-1.4.1-cp36-cp36m-manylinux1_x86_64.whl (26.1MB)
    100% |################################| 26.1MB 680kB/s eta 0:00:01
Collecting pandas>=0.15.2 (from seaborn)
  Using cached https://files.pythonhosted.org/packages/52/3f/f6a428599e0d4497e1595030965b5ba455fd8ade6e977e3c819973c4b41d/pandas-0.25.3-cp36-cp36m-manylinux1_x86_64.whl
Collecting numpy>=1.9.3 (from seaborn)
  Using cached https://files.pythonhosted.org/packages/62/20/4d43e141b5bc426ba38274933ef8e76e85c7adea2c321ecf9ebf7421cedf/numpy-1.18.1-cp36-cp36m-manylinux1_x86_64.whl
Collecting matplotlib>=1.4.3 (from seaborn)
  Downloading https://files.pythonhosted.org/packages/4e/11/06958a2b895a3853206dea1fb2a5b11bf044f626f90745987612af9c8f2c/matplotlib-3.1.2-cp36-cp36m-manylinux1_x86_64.whl (13.1MB)
    100% |################################| 13.1MB 1.2MB/s eta 0:00:01
Collecting python-dateutil>=2.6.1 (from pandas>=0.15.2->seaborn)
  Using cached https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl
Collecting pytz>=2017.2 (from pandas>=0.15.2->seaborn)
  Using cached https://files.pythonhosted.org/packages/e7/f9/f0b53f88060247251bf481fa6ea62cd0d25bf1b11a87888e53ce5b7c8ad2/pytz-2019.3-py2.py3-none-any.whl
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 (from matplotlib>=1.4.3->seaborn)
  Downloading https://files.pythonhosted.org/packages/5d/bc/1e58593167fade7b544bfe9502a26dc860940a79ab306e651e7f13be68c2/pyparsing-2.4.6-py2.py3-none-any.whl (67kB)
    100% |################################| 71kB 2.6MB/s eta 0:00:01
Collecting cycler>=0.10 (from matplotlib>=1.4.3->seaborn)
  Downloading https://files.pythonhosted.org/packages/f7/d2/e07d3ebb2bd7af696440ce7e754c59dd546ffe1bbe732c8ab68b9c834e61/cycler-0.10.0-py2.py3-none-any.whl
Collecting kiwisolver>=1.0.1 (from matplotlib>=1.4.3->seaborn)
  Downloading https://files.pythonhosted.org/packages/f8/a1/5742b56282449b1c0968197f63eae486eca2c35dcd334bab75ad524e0de1/kiwisolver-1.1.0-cp36-cp36m-manylinux1_x86_64.whl (90kB)
    100% |################################| 92kB 3.3MB/s eta 0:00:01
Collecting six>=1.5 (from python-dateutil>=2.6.1->pandas>=0.15.2->seaborn)
  Using cached https://files.pythonhosted.org/packages/65/eb/1f97cb97bfc2390a276969c6fae16075da282f5058082d4cb10c6c5c1dba/six-1.14.0-py2.py3-none-any.whl
Collecting setuptools (from kiwisolver>=1.0.1->matplotlib>=1.4.3->seaborn)
  Downloading https://files.pythonhosted.org/packages/a7/c5/6c1acea1b4ea88b86b03280f3fde1efa04fefecd4e7d2af13e602661cde4/setuptools-45.1.0-py3-none-any.whl (583kB)
    100% |################################| 593kB 3.8MB/s eta 0:00:01
tensorflow 1.13.1 requires tensorboard<1.14.0,>=1.13.0, which is not installed.
Installing collected packages: numpy, scipy, six, python-dateutil, pytz, pandas, pyparsing, cycler, setuptools, kiwisolver, matplotlib, seaborn
Successfully installed cycler-0.10.0 kiwisolver-1.1.0 matplotlib-3.1.2 numpy-1.18.1 pandas-0.25.3 pyparsing-2.4.6 python-dateutil-2.8.1 pytz-2019.3 scipy-1.4.1 seaborn-0.9.0 setuptools-45.1.0 six-1.14.0
Exception:
Traceback (most recent call last):
  File "/opt/ibm/conda/miniconda3.6/lib/python3.6/site-packages/pip/_internal/cli/base_command.py", line 176, in main
    status = self.run(options, args)
  File "/opt/ibm/conda/miniconda3.6/lib/python3.6/site-packages/pip/_internal/commands/install.py", line 441, in run
    options.target_dir, target_temp_dir, options.upgrade
  File "/opt/ibm/conda/miniconda3.6/lib/python3.6/site-packages/pip/_internal/commands/install.py", line 492, in _handle_target_dir
    shutil.rmtree(target_item_dir)
  File "/home/spark/conda/envs/python3.6/lib/python3.6/shutil.py", line 486, in rmtree
    _rmtree_safe_fd(fd, path, onerror)
  File "/home/spark/conda/envs/python3.6/lib/python3.6/shutil.py", line 428, in _rmtree_safe_fd
    onerror(os.rmdir, fullname, sys.exc_info())
  File "/home/spark/conda/envs/python3.6/lib/python3.6/shutil.py", line 426, in _rmtree_safe_fd
    os.rmdir(name, dir_fd=topfd)
OSError: [Errno 39] Directory not empty: 'util'

Create a graph of the movies reviewed by users:

In [33]:
import seaborn as sns
%matplotlib inline

ratingsPandas = ratings.toPandas()
sns.lmplot(x='userId', y='movieId', data=ratingsPandas, fit_reg=False);
In [37]:
ratingsPandas.head()
Out[37]:
userId movieId rating timestamp
0 249 5803 3.0 1354225800
1 610 84772 3.5 1493846852
2 599 1982 3.0 1498524557
3 468 317 3.0 831400519
4 169 5873 4.5 1059429313

This matrix represents all the movies rated by users, but doesn't distinguish the ratings. Improve the graph by choosing a color palette to color the ratings:

In [34]:
sns.palplot(sns.diverging_palette(10, 133, sep=80, n=10))

Create the graph on a larger scale with the color palette:

In [35]:
lm = sns.lmplot(x='userId', y='movieId', hue='rating', data=ratingsPandas, fit_reg=False, height=10, aspect=2, palette=sns.diverging_palette(10, 133, sep=80, n=10))
axes = lm.axes
axes[0, 0].set_ylim(0, 193609) # max movieId in this data set is 193609
axes[0, 0].set_xlim(0, 610) # max userId in this data set is 610
lm;

On this matrix, you'll notice gaps in the data: some movies and users seem to be missing. This could be because you're using a subset of the data (the small data set).

Nevertheless, you can identify some patterns. Some users always give positive reviews of movies. Some movies are rated much more often than others, which could be for different reasons: for example, the first release of the MovieLens website had a much smaller catalog, or those movies are simply more famous.

Now visualize the global distribution of ratings with a violin plot:

In [36]:
sns.violinplot([ratingsPandas.rating]);

These are just a few examples of what you can achieve with the rich ecosystem of Python visualization libraries. Feel free to explore more!

In [58]:
ratingsPandas.groupby('movieId').count().sort_values(by='userId', ascending=False)
Out[58]:
userId rating timestamp
movieId
356 329 329 329
318 317 317 317
296 307 307 307
593 279 279 279
2571 278 278 278
... ... ... ...
4093 1 1 1
4089 1 1 1
58351 1 1 1
4083 1 1 1
193609 1 1 1

9724 rows × 3 columns

In [60]:
movie_356 = ratingsPandas[ratingsPandas.movieId == 356]
movie_356.head()
Out[60]:
userId movieId rating timestamp
1014 57 356 4.0 965795650
1281 588 356 3.0 839316241
1404 42 356 5.0 996218222
1440 1 356 4.0 964980962
1920 434 356 4.5 1270604589
In [65]:
sns.scatterplot(x="rating", y = "userId", data = movie_356)
Out[65]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fb2871bd8d0>
In [70]:
sns.countplot(x='rating', data=ratingsPandas)
Out[70]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fb28426aba8>

5. Build the recommender system

There are different methods for building a recommender system, such as user-based, content-based, or collaborative filtering. Collaborative filtering calculates recommendations based on similarities between users and products. For example, collaborative filtering assumes that users who give similar ratings to the same movies will also have similar opinions on movies that they haven't seen.

The alternating least squares (ALS) algorithm provides collaborative filtering between users and products to find products that the customers might like, based on their previous ratings.

In this case, the ALS algorithm will create a matrix of all users versus all movies. Most cells in the matrix will be empty. An empty cell means the user hasn't reviewed the movie yet. The ALS algorithm will fill in the probable (predicted) ratings, based on similarities between user ratings. The algorithm uses the least squares computation to minimize the estimation errors, and alternates between solving for movie factors and solving for user factors.
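
For reference, the standard explicit-feedback ALS formulation represents each user $u$ by a factor vector $x_u$ and each movie $i$ by a factor vector $y_i$, and minimizes a regularized squared error over the observed ratings (this is the general textbook objective, stated here as background rather than taken from this notebook):

$$\min_{x_*, y_*} \sum_{(u,i) \in \Omega} \left(r_{ui} - x_u^\top y_i\right)^2 + \lambda \left(\sum_u \|x_u\|^2 + \sum_i \|y_i\|^2\right)$$

where $\Omega$ is the set of observed (user, movie) pairs, $r_{ui}$ is the rating that user $u$ gave to movie $i$, and $\lambda$ corresponds to the regParam hyperparameter. The algorithm alternates between fixing the movie factors to solve for the user factors and fixing the user factors to solve for the movie factors, each step being an ordinary least squares problem.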

The following trivial example gives you an idea of the problem to solve. However, keep in mind that the general problem is much harder because the matrix often has far more missing values.

User\Product matrix

Check the size of the ratings matrix:

In [27]:
spark.sql('''
    SELECT *, 100 * nb_ratings/matrix_size AS percentage
    FROM (
        SELECT nb_users, nb_movies, nb_ratings, nb_users * nb_movies AS matrix_size
        FROM (
            SELECT COUNT(*) AS nb_ratings, COUNT(DISTINCT(movieId)) AS nb_movies, COUNT(DISTINCT(userId)) AS nb_users
            FROM ratings
        )
    )
''').toPandas().head()
Out[27]:
nb_users nb_movies nb_ratings matrix_size percentage
0 610 9724 100836 5931640 1.699968

Less than 2% of the matrix is filled!

5.1 Train the model

Use the SparkML ALS algorithm to train a model to provide recommendations. The mandatory parameters to the ALS algorithm are the columns that identify the users, the items, and the ratings. Run the fit() method to train the model:

In [28]:
from pyspark.ml.recommendation import ALS

model = ALS(userCol='userId', itemCol='movieId', ratingCol='rating').fit(ratings)

5.2 Run the model

Run the transform() method to score the model and output a DataFrame with an additional prediction column that shows the predicted rating:

In [29]:
predictions = model.transform(ratings)
predictions.toPandas().head()
Out[29]:
userId movieId rating timestamp prediction
0 191 148 5.0 829760897 4.921849
1 133 471 4.0 843491793 3.274433
2 597 471 2.0 941558175 3.926369
3 385 471 4.0 850766697 3.330407
4 436 471 3.0 833530187 3.446549

You can see that many of the predictions are close to the actual ratings.

5.3 Evaluate the model

After you apply a model to a data set, you should evaluate the performance of the model by comparing the predicted values with the original values. Use the RegressionEvaluator class to compare continuous values with the root mean squared error (RMSE) metric. RMSE is the square root of the average of the squared differences between the predicted and actual values. The lower the RMSE value, the more accurate the model.
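
For reference, with $n$ scored ratings, predicted ratings $\hat{r}_i$, and actual ratings $r_i$:

$$\mathrm{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} \left(\hat{r}_i - r_i\right)^2}$$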

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))
The root mean squared error for our model is: 0.5932807901774939

You want the performance score to improve with each design iteration. But notice that you just ran the training and the scoring on the same data set. That's something you won't normally do, because you usually want to predict values that you don't already know! Therefore, this result is meaningless. To accurately evaluate the model, it's common practice in machine learning to split the data set into a training data set, used to train the model, and a test data set, used to compare the predicted results with the original results. This process is a simple form of cross-validation. Skipping cross-validation often leads to overfitting, which occurs when the model is too specific to the training data set and does not perform well on a more general data set. Here's the general iterative process of machine learning:

Machine Learning

5.4 Split the data set

Split your ratings data set between an 80% training data set and a 20% test data set. Then rerun the steps to train the model on the training set, run it on the test set, and evaluate the performance.

In [31]:
(trainingRatings, testRatings) = ratings.randomSplit([80.0, 20.0])
In [32]:
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating')
model = als.fit(trainingRatings)
predictions = model.transform(testRatings)
In [33]:
predictions.toPandas().head()
Out[33]:
userId movieId rating timestamp prediction
0 191 148 5.0 829760897 NaN
1 597 471 2.0 941558175 4.238353
2 372 471 3.0 874415126 3.275066
3 218 471 4.0 1111624874 2.868498
4 312 471 4.0 1043175564 3.801344
In [34]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))
The root mean squared error for our model is: nan

You might get the value nan (not a number) from the previous cell.

5.5 Handle NaN results

A NaN result is due to SPARK-14489: the model can't predict values for users or items that are not in the training data. A temporary workaround is to exclude rows with predicted NaN values or to replace them with a constant, for instance, the general mean rating. However, to map to a real business problem, the data scientist, in collaboration with the business owner, must define what should happen when such an event occurs. For example, you could provide no recommendations for a user until that user has rated a few items. Alternatively, until a user has rated, say, five items, you could use a user-based recommender system that relies on the user's profile (that's another recommender system to develop).
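
Note that since Spark 2.2, the ALS estimator also accepts a coldStartStrategy parameter; setting it to 'drop' makes the model drop rows with NaN predictions so that evaluation metrics stay defined. A minimal sketch (shown as an alternative, not the approach used in the next cells):

In [ ]:
# coldStartStrategy='drop' removes rows that would get NaN predictions
alsDrop = ALS(userCol='userId', itemCol='movieId', ratingCol='rating',
              coldStartStrategy='drop')
predictionsDrop = alsDrop.fit(trainingRatings).transform(testRatings)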

Replace predicted NaN values with the average rating and evaluate the model:

In [35]:
avgRatings = ratings.select('rating').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.fill(avgRatings))))
The average rating in the dataset is: 3.501556983616962
The root mean squared error for our model is: 0.9000586650712746

Now exclude predicted NaN values and evaluate the model:

In [36]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))
The root mean squared error for our model is: 0.8863126821673949

Obviously, the RMSE is worse than with the previous model, but now the score isn't inflated by overfitting: you can actually expect this level of performance on new incoming data!

5.6 Improve the performance score

If you run the randomSplit(), fit(), transform(), and evaluate() functions several times, you won't always get the same performance score. This is because the randomSplit() and ALS() functions involve some randomness. To get a more reliable performance score, train and evaluate the model several times and then compute the average performance score. This process is very close to what is called $k$-fold cross-validation.

Create a repeatALS() function that trains, runs, and evaluates the model multiple times:

In [37]:
def repeatALS(data, k=3, userCol='userId', itemCol='movieId', ratingCol='rating', metricName='rmse'):
    evaluations = []
    for i in range(0, k):  
        (trainingSet, testingSet) = data.randomSplit([k - 1.0, 1.0])
        als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
        model = als.fit(trainingSet)
        predictions = model.transform(testingSet)
        evaluator = RegressionEvaluator(metricName=metricName, labelCol='rating', predictionCol='prediction')
        evaluation = evaluator.evaluate(predictions.na.drop())
        print('Loop {}: {} = {}'.format(i + 1, metricName, evaluation))
        evaluations.append(evaluation)
    return sum(evaluations) / float(len(evaluations))

Run repeatALS four times and average the performance scores:

In [38]:
print('RMSE = {}'.format(repeatALS(ratings, k=4)))
Loop 1: rmse = 0.8906605382403731
Loop 2: rmse = 0.8837958660975254
Loop 3: rmse = 0.8852492047370137
Loop 4: rmse = 0.8892840940492689
RMSE = 0.8872474257810452

The computed performance score is more stable this way.

Create a kfoldALS() function that also trains, runs, and evaluates the model, but splits up the data between training and testing data sets in a different way. The original data set is split into k data sets. Each of the k iterations of the function uses a different data set for testing and the other data sets for training.

In [39]:
def kfoldALS(data, k=3, userCol='userId', itemCol='movieId', ratingCol='rating', metricName='rmse'):
    evaluations = []
    weights = [1.0] * k
    splits = data.randomSplit(weights)
    for i in range(0, k):  
        testingSet = splits[i]
        trainingSet = spark.createDataFrame(sc.emptyRDD(), data.schema)
        for j in range(0, k):
            if i == j:
                continue
            else:
                trainingSet = trainingSet.union(splits[j])
        als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
        model = als.fit(trainingSet)
        predictions = model.transform(testingSet)
        evaluator = RegressionEvaluator(metricName=metricName, labelCol='rating', predictionCol='prediction')
        evaluation = evaluator.evaluate(predictions.na.drop())
        print('Loop {}: {} = {}'.format(i + 1, metricName, evaluation))
        evaluations.append(evaluation)
    return sum(evaluations) / float(len(evaluations))

Compute the average performance score for 4 folds:

In [40]:
print('RMSE = {}'.format(kfoldALS(ratings, k=4)))
Loop 1: rmse = 0.8807296982315512
Loop 2: rmse = 0.8948281283128832
Loop 3: rmse = 0.8863483036667585
Loop 4: rmse = 0.8856905205249661
RMSE = 0.8868991626840398

Now compute the average performance score for 10 folds:

In [41]:
print('RMSE = {}'.format(kfoldALS(ratings, k=10)))
Loop 1: rmse = 0.8686338039094718
Loop 2: rmse = 0.8797971865659019
Loop 3: rmse = 0.8633948056259702
Loop 4: rmse = 0.8587820848910547
Loop 5: rmse = 0.8704924052449575
Loop 6: rmse = 0.8710890255878764
Loop 7: rmse = 0.8633550866503463
Loop 8: rmse = 0.855704172612134
Loop 9: rmse = 0.8768588912878453
Loop 10: rmse = 0.8814342315779007
RMSE = 0.8689541693953459

Recall that you want to minimize the RMSE, and that 10 folds means a training set of 90% of the data, while 4 folds means a training set of 75% of the data. If you choose a value of $k$ that is too small, you run into a selection bias issue. On the other hand, a value of $k$ that is too large gives an estimate with low bias but high variance, and costs more computation. Therefore, choosing the right $k$ is important. Usually, the recommended value of $k$ is between 5 and 10; however, this can change based on the data set.

5.7 Improve the model

So now, how can you improve this model? Machine learning algorithms have hyperparameters that control how the algorithm works.

The ALS algorithm has this signature:

class pyspark.ml.recommendation.ALS(
        rank=10,
        maxIter=10,
        regParam=0.1,
        numUserBlocks=10,
        numItemBlocks=10,
        implicitPrefs=False,
        alpha=1.0,
        userCol="user",
        itemCol="item",
        seed=None,
        ratingCol="rating",
        nonnegative=False,
        checkpointInterval=10,
        intermediateStorageLevel="MEMORY_AND_DISK",
        finalStorageLevel="MEMORY_AND_DISK"
    )

The ALS hyperparameters are:

  • rank = the number of latent factors in the model
  • maxIter = the maximum number of iterations
  • regParam = the regularization parameter

To test several values for those hyperparameters and choose the best configuration, it's common practice to define a grid of parameter combinations and to run a grid search over those combinations, evaluating the resulting models and comparing their performance. This process is known as model selection.

The Spark CrossValidator function performs a grid search as well as k-fold cross validation.
Run the CrossValidator function with multiple values for rank and regParam:

In [ ]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

(trainingRatings, validationRatings) = ratings.randomSplit([80.0, 20.0])
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

paramGrid = ParamGridBuilder().addGrid(als.rank, [1, 5, 10]).addGrid(als.maxIter, [20]).addGrid(als.regParam, [0.05, 0.1, 0.5]).build()

crossval = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)
cvModel = crossval.fit(trainingRatings)
predictions = cvModel.transform(validationRatings)

print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))

The more folds and parameter values you add to the grid, the longer it takes to test every combination. The fitted CrossValidator model contains the average performance for each combination, which you can read from its avgMetrics attribute. For example, you can graph the results on a plot for analysis.
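
As an example, here's a minimal sketch of how you might pair each parameter combination with its average metric (assuming the cvModel and paramGrid from the previous cell):

In [ ]:
# Each entry of avgMetrics corresponds to one parameter combination in the grid
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    readable = {param.name: value for param, value in params.items()}
    print('{} -> average RMSE = {:.4f}'.format(readable, metric))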

Unfortunately, because of the SPARK-14489 issue mentioned above, the CrossValidator function can't compute the root mean squared error most of the time and provides incorrect results. You could limit this problem by making the training set much larger than the test set, but that's not a good practice. If you want to learn more about this issue, which is more of a conceptual problem than a technical one, have a look at Nick Pentreath's pull request #12896 (https://github.com/apache/spark/pull/12896). Welcome to the Open Source world!

5.8 Recommend movies

To recommend movies for a specific user, create a function that applies the trained model (an ALSModel) to the list of movies that the user hasn't yet rated.

Create a recommendMovies function:

In [44]:
from pyspark.sql.functions import lit

def recommendMovies(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
    dataSet = ratings.select('movieId').distinct().withColumn('userId', lit(user))

    # Create a Spark DataFrame with the movies that have already been rated by this user
    moviesAlreadyRated = ratings.filter(ratings.userId == user).select('movieId', 'userId')

    # Apply the recommender system to the data set without the already rated movies to predict ratings
    predictions = model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy('prediction', ascending=False).limit(nbRecommendations).select('movieId', 'prediction')

    # Join with the movies DataFrame to get the movies titles and genres
    recommendations = predictions.join(movies, predictions.movieId == movies.movieId).select(predictions.movieId, movies.title, movies.genres, predictions.prediction)

#     recommendations.show(truncate=False)
    return recommendations

Now run this function to recommend 10 movies for three different users:

In [45]:
print('Recommendations for user 133:')
recommendMovies(model, 133, 10).toPandas()
Recommendations for user 133:
Out[45]:
movieId title genres prediction
0 3022 General, The (1926) Comedy|War 3.922637
1 33649 Saving Face (2004) Comedy|Drama|Romance 3.883383
2 6818 Come and See (Idi i smotri) (1985) Drama|War 4.181536
3 148881 World of Tomorrow (2015) Animation|Comedy 4.033890
4 25771 Andalusian Dog, An (Chien andalou, Un) (1929) Fantasy 4.053865
5 53 Lamerica (1994) Adventure|Drama 3.857156
6 26258 Topo, El (1970) Fantasy|Western 3.901819
7 898 Philadelphia Story, The (1940) Comedy|Drama|Romance 3.835418
8 30803 3-Iron (Bin-jip) (2004) Drama|Romance 3.827286
9 26326 Holy Mountain, The (Montaña sagrada, La) (1973) Drama 3.805677
In [46]:
print('Recommendations for user 471:')
recommendMovies(model, 471, 10).toPandas()
Recommendations for user 471:
Out[46]:
movieId title genres prediction
0 174909 Logan Lucky (2017) Comedy 4.408311
1 147286 The Adventures of Sherlock Holmes and Doctor W... Crime|Mystery 4.392801
2 188751 Mamma Mia: Here We Go Again! (2018) Comedy|Romance 4.408311
3 2131 Autumn Sonata (Höstsonaten) (1978) Drama 4.438284
4 123 Chungking Express (Chung Hing sam lam) (1994) Drama|Mystery|Romance 4.548064
5 151769 Three from Prostokvashino (1978) Animation 4.392801
6 175397 In the blue sea, in the white foam. (1984) Animation|Children|Fantasy 4.392801
7 51931 Reign Over Me (2007) Drama 4.412847
8 183897 Isle of Dogs (2018) Animation|Comedy 4.430795
9 177593 Three Billboards Outside Ebbing, Missouri (2017) Crime|Drama 4.429290
In [47]:
print('Recommendations for user 496:')
recommendMovies(model, 496, 10).toPandas()
Recommendations for user 496:
Out[47]:
movieId title genres prediction
0 61240 Let the Right One In (Låt den rätte komma in) ... Drama|Fantasy|Horror|Romance 4.771688
1 3379 On the Beach (1959) Drama 4.776227
2 96004 Dragon Ball Z: The History of Trunks (Doragon ... Action|Adventure|Animation 4.776227
3 931 Spellbound (1945) Mystery|Romance|Thriller 4.932300
4 86377 Louis C.K.: Shameless (2007) Comedy 4.945564
5 4967 No Man's Land (2001) Drama|War 5.029913
6 2360 Celebration, The (Festen) (1998) Drama 4.781893
7 3083 All About My Mother (Todo sobre mi madre) (1999) Drama 4.915379
8 971 Cat on a Hot Tin Roof (1958) Drama 4.779697
9 26810 Bad Boy Bubby (1993) Drama 4.927678
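
As a side note, the Spark 2.3 ALSModel also provides built-in helpers such as recommendForUserSubset() and recommendForAllUsers(). A minimal sketch (the output format differs from the recommendMovies function above, and movies the user already rated are not filtered out):

In [ ]:
# Built-in top-10 recommendations for a subset of users
someUsers = ratings.select('userId').distinct().filter('userId IN (133, 471, 496)')
model.recommendForUserSubset(someUsers, 10).show(truncate=False)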

6. Summary and next steps

Here are some suggestions to improve your model:

  • Build a confusion matrix to understand where your model is good and where it is bad. The matrix should be 10x10 where the rows are the ratings, the columns are the predictions, and each cell shows the number of predictions in each class (0.5, 1.0, 1.5, etc.) for each real rating. The matrix diagonal should ideally contain all the values. You could plot this matrix as a heatmap with matplotlib.
  • Bound the predicted ratings between 0.5 and 5 (see the sketch after this list).
  • Use more data, like tags or timestamps.
  • Run the model on the full data set. Compare the execution time and the results. Downloading the full data set to your computer and then loading it into Watson Studio might take a lot of time, depending on your network configuration. You can load the data set directly from GroupLens servers to your Watson Studio environment, but that method isn't described in this notebook.
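
For the second suggestion, here's a minimal sketch of how predicted ratings could be clamped to the valid range with standard Spark SQL functions (applied to the predictions DataFrame produced earlier by model.transform()):

In [ ]:
from pyspark.sql.functions import col, least, greatest, lit

# Clamp every prediction into the valid rating range [0.5, 5.0]
boundedPredictions = predictions.withColumn(
    'prediction', least(greatest(col('prediction'), lit(0.5)), lit(5.0)))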

Author

Victor Hatinguais specializes in technology evangelism, training, demos, and proofs of concept around Data Science and Big Data technologies, such as Apache Hadoop and IBM BigInsights, Apache Spark, IBM Streams, and IBM SPSS.

Data citation

F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4, Article 19 (December 2015), 19 pages. DOI=http://dx.doi.org/10.1145/2827872


Copyright © IBM Corp. 2017-2019. This notebook and its source code are released under the terms of the MIT License.
