Recommender System with Spark and IBM Watson Studio

When your website generates a lot of explicit feedback, e.g. in the form of ratings, that data makes a great starting point for personalizing the user experience.
In this example, we'll follow the collaborative filtering approach and apply the Alternating Least Squares (ALS) algorithm to create a recommendation engine for a website.
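
For reference, ALS factorizes the sparse user-item rating matrix into low-rank user and item factor matrices, alternately solving for one while the other is held fixed. In its standard formulation (not specific to this notebook) it minimizes the regularized squared error

$$\min_{X, Y} \sum_{(u,i) \in \Omega} \left( r_{ui} - x_u^\top y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)$$

where $\Omega$ is the set of observed ratings, $x_u$ and $y_i$ are the user and item factor vectors, and $\lambda$ is the regularization strength (exposed as regParam in Spark ML).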

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("Recommender system in Spark")\
  .getOrCreate()
Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20190816092651-0003
KERNEL_ID = 1d774b1b-3578-4c7a-a81f-b38ab48e200f

Let's look into the MovieLens dataset we are going to use here.
There are two CSV files that we'll merge on the 'movieId' column:

In [2]:
import ibmos2spark
# Credentials are injected by Watson Studio (redacted here)
credentials = {
    'endpoint': 'https://s3.eu-geo.objectstorage.service.networklayer.com',
    'service_id': 'iam-ServiceId-***',
    'iam_service_endpoint': 'https://iam.eu-gb.bluemix.net/oidc/token',
    'api_key': '***'
}

configuration_name = 'os_3f4c7db4c46d4481b29169bb36b6aa49_configs'
# 'sc' is the SparkContext that Watson Studio pre-creates for the notebook
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

movies = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .load(cos.url('movies.csv', 'recommendationengine-donotdelete-pr-1cnfjaobjuzd3x'))
movies.show(5)
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

In [3]:
ratings = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .load(cos.url('ratings.csv', 'recommendationengine-donotdelete-pr-1cnfjaobjuzd3x'))
ratings.show(5)
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
+------+-------+------+----------+
only showing top 5 rows

In [4]:
ratings.join(movies, 'movieId').show(3)
+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|      2|     1|   3.5|1112486027|      Jumanji (1995)|Adventure|Childre...|
|     29|     1|   3.5|1112484676|City of Lost Chil...|Adventure|Drama|F...|
|     32|     1|   3.5|1112484819|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
+-------+------+------+----------+--------------------+--------------------+
only showing top 3 rows

We shall now check whether there are any missing values in the dataset and cast the columns to integers for further analysis. Note that casting 'rating' to an integer truncates half-star ratings (3.5 becomes 3); a variant that preserves them follows the dtypes check below:

In [5]:
df = ratings.select('userId', 'movieId', 'rating')
df.dtypes
Out[5]:
[('userId', 'string'), ('movieId', 'string'), ('rating', 'string')]
In [6]:
from pyspark.sql.functions import isnan

print(df.filter((df['userId'] == "") | df['userId'].isNull() | isnan(df['userId'])).count())
print(df.filter((df['movieId'] == "") | df['movieId'].isNull() | isnan(df['movieId'])).count())
print(df.filter((df['rating'] == "") | df['rating'].isNull() | isnan(df['rating'])).count())
0
0
0
In [7]:
from pyspark.sql.types import IntegerType

df = df.withColumn('userId', df['userId'].cast(IntegerType()))
df = df.withColumn('movieId', df['movieId'].cast(IntegerType()))
df = df.withColumn('rating', df['rating'].cast(IntegerType()))
In [8]:
df.dtypes
Out[8]:
[('userId', 'int'), ('movieId', 'int'), ('rating', 'int')]
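
If you'd rather keep the half-star ratings, cast 'rating' to a double instead; a variant of the cast above (not what this run used, so the downstream numbers would differ):

from pyspark.sql.types import DoubleType

# Preserves ratings such as 3.5 instead of truncating them to 3
df = df.withColumn('rating', df['rating'].cast(DoubleType()))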

Let's now create subsets of data for training and testing the model:

In [9]:
train_test_splits = df.randomSplit([0.8, 0.2])
train = train_test_splits[0].withColumnRenamed('rating', 'label')
test = train_test_splits[1].withColumnRenamed('rating', 'trueRating')
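
As a quick sanity check, note that randomSplit samples rows independently, so the 80/20 proportions are approximate rather than exact:

# Counts will vary slightly between runs; pass a seed to randomSplit for reproducibility
print(train.count(), test.count())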

We'll now train the model with the Alternating Least Squares (ALS) method and generate predictions on the test set:

In [10]:
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=19, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='label')

model = als.fit(train)
prediction = model.transform(test)
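
One caveat: if the random split placed a user or movie in the test set that never occurs in the training set, the model has no factors for it and transform emits NaN for that row. Since Spark 2.2, ALS accepts a coldStartStrategy parameter that drops such rows at prediction time; a minimal variant of the cell above using it:

from pyspark.ml.recommendation import ALS

# Same hyperparameters as above, but drop cold-start rows automatically
# instead of filtering out NaN predictions by hand later
als = ALS(maxIter=19, regParam=0.01, userCol='userId', itemCol='movieId',
          ratingCol='label', coldStartStrategy='drop')
model = als.fit(train)
prediction = model.transform(test)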

Let's take a look at some of the predictions the model made, compare them to the true ratings, and compute the RMSE:

In [11]:
prediction.join(movies, 'movieId').select('userId', 'title', 'prediction', 'trueRating').show(n=10, truncate=False)
+------+--------------------------------+----------+----------+
|userId|title                           |prediction|trueRating|
+------+--------------------------------+----------+----------+
|3673  |Awfully Big Adventure, An (1995)|2.3890615 |2         |
|5186  |Awfully Big Adventure, An (1995)|3.4688776 |2         |
|903   |Awfully Big Adventure, An (1995)|1.2790377 |3         |
|3335  |Awfully Big Adventure, An (1995)|2.2354188 |5         |
|3673  |Guilty as Sin (1993)            |3.3912914 |3         |
|2242  |Guilty as Sin (1993)            |2.434418  |3         |
|4162  |Guilty as Sin (1993)            |2.713019  |3         |
|51    |Guilty as Sin (1993)            |2.0280638 |2         |
|2274  |Guilty as Sin (1993)            |3.0363755 |3         |
|3352  |Hudsucker Proxy, The (1994)     |3.1329057 |3         |
+------+--------------------------------+----------+----------+
only showing top 10 rows

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='trueRating', predictionCol='prediction', metricName='rmse')
# ALS yields NaN predictions for users/movies it never saw during training;
# drop those rows before computing the metric
prediction = prediction.dropna(how='any', subset=['prediction'])
rmse = evaluator.evaluate(prediction)

print('Root Mean Square Error:', rmse)
Root Mean Square Error: 0.9252403069943134

Finally, we are in a position to generate recommendations for all users, or to pick a single user and see what the recommendation system we've built suggests specifically to them:

In [17]:
userRecs = model.recommendForAllUsers(10)
userRecs.show(5)
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[25963, 7.063578...|
|  1591|[[53161, 9.807543...|
|  4101|[[59549, 6.138625...|
|  3794|[[7930, 7.566674]...|
|  6654|[[27888, 8.70235]...|
+------+--------------------+
only showing top 5 rows
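
If you only need recommendations for a few users rather than everyone, Spark 2.3+ also provides recommendForUserSubset, which scores just the users you pass in; a minimal sketch (userId 1000 is picked arbitrarily):

# One-column DataFrame holding the users we want recommendations for
users = spark.createDataFrame([(1000,)], ['userId'])

# Top 10 recommendations restricted to that subset
subset_recs = model.recommendForUserSubset(users, 10)
subset_recs.show(truncate=False)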

The following user-defined function (adapted from a snippet found online) makes it easy to pull the recommendations for a specific user into a flat DataFrame:

In [18]:
import pandas as pd

def get_recs_for_users(recs):
    # Pull the movieId and rating arrays out of the nested 'recommendations' column
    recs = recs.select('recommendations.movieId', 'recommendations.rating')
    movie_ids = recs.select('movieId').toPandas().iloc[0, 0]
    rec_ratings = recs.select('rating').toPandas().iloc[0, 0]
    # Rebuild them as a flat pandas DataFrame, then convert back to Spark
    ratings_matrix = pd.DataFrame(movie_ids, columns=['movieId'])
    ratings_matrix['ratings'] = rec_ratings
    return spark.createDataFrame(ratings_matrix)
In [21]:
from pyspark.sql.functions import col

user_1000_recs = userRecs.filter(col('userId') == 1000)
get_recs_for_users(user_1000_recs).show(10)
+-------+------------------+
|movieId|           ratings|
+-------+------------------+
|   1163|10.180137634277344|
|   2079| 8.391616821289062|
|  66200| 8.203961372375488|
|  48883| 8.014423370361328|
|  59727|7.9298248291015625|
|   5840|  7.76695442199707|
| 102684|7.7248735427856445|
|  50158| 7.722866058349609|
|   5461| 7.702124118804932|
|  64114| 7.623742580413818|
+-------+------------------+
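
To make that table human-readable, we can join the recommendations back to the movies DataFrame; a sketch (remember movies.movieId was loaded as a string, so we cast it first):

from pyspark.sql.types import IntegerType

# Cast movieId so the join keys have matching types
movies_int = movies.withColumn('movieId', movies['movieId'].cast(IntegerType()))

get_recs_for_users(user_1000_recs)\
    .join(movies_int, 'movieId')\
    .orderBy('ratings', ascending=False)\
    .select('title', 'ratings')\
    .show(10, truncate=False)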

Well done! We now have a recommendation system that can offer users a more personalized experience.