When your website generates a lot of explicit feedback, e.g. in the form of ratings, that data makes a great starting point for personalizing the user experience.
In this example, we'll follow the collaborative filtering approach and apply the Alternating Least Squares (ALS) algorithm to build a recommendation engine for a website.
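Before diving in, here is the rough idea behind ALS for explicit ratings (a sketch of the standard formulation, which Spark's implementation follows): the sparse user-movie rating matrix is factored into low-rank user vectors $x_u$ and movie vectors $y_i$, alternating between solving for one set of factors while holding the other fixed, so as to minimize the regularized squared error over the observed ratings:

$$\min_{X,Y} \sum_{(u,i)\,\mathrm{observed}} \left(r_{ui} - x_u^{\top} y_i\right)^2 + \lambda \left(\sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2\right)$$

A predicted rating is then simply the dot product $x_u^{\top} y_i$.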
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Recommender system in Spark") \
    .getOrCreate()
Let's look into the MovieLens data we are going to use here.
There are two CSV files that we'll need to merge on the 'movieId' column:
import ibmos2spark

# replace the redacted values with your own Cloud Object Storage credentials
credentials = {
    'endpoint': 'https://s3.eu-geo.objectstorage.service.networklayer.com',
    'service_id': '***',
    'iam_service_endpoint': 'https://iam.eu-gb.bluemix.net/oidc/token',
    'api_key': '***'
}

configuration_name = 'os_3f4c7db4c46d4481b29169bb36b6aa49_configs'
cos = ibmos2spark.CloudObjectStorage(spark.sparkContext, credentials, configuration_name, 'bluemix_cos')
movies = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load(cos.url('movies.csv', 'recommendationengine-donotdelete-pr-1cnfjaobjuzd3x'))
movies.show(5)
ratings = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load(cos.url('ratings.csv', 'recommendationengine-donotdelete-pr-1cnfjaobjuzd3x'))
ratings.show(5)
ratings.join(movies, 'movieId').show(3)
We shall now check whether there are any missing values in the dataset, then convert the id columns to integers and the ratings to floats for further analysis:
df = ratings.select('userId', 'movieId', 'rating')
df.dtypes
from pyspark.sql.functions import isnan
print(df.filter((df['userId'] == "") | df['userId'].isNull() | isnan(df['userId'])).count())
print(df.filter((df['movieId'] == "") | df['movieId'].isNull() | isnan(df['movieId'])).count())
print(df.filter((df['rating'] == "") | df['rating'].isNull() | isnan(df['rating'])).count())
from pyspark.sql.types import IntegerType, FloatType

df = df.withColumn('userId', df['userId'].cast(IntegerType()))
df = df.withColumn('movieId', df['movieId'].cast(IntegerType()))
# MovieLens ratings come in half-star increments, so cast them to float rather than integer to avoid losing precision
df = df.withColumn('rating', df['rating'].cast(FloatType()))
df.dtypes
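As an optional sanity check, summary statistics of the cleaned rating column make it easy to spot anything odd after the cast:
df.describe('rating').show()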
Let's now create subsets of data for training and testing the model:
train_test_splits = df.randomSplit([0.8, 0.2])
train = train_test_splits[0].withColumnRenamed('rating', 'label')
test = train_test_splits[1].withColumnRenamed('rating', 'trueRating')
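Note that randomSplit shuffles differently on each run; if you need a reproducible split, it also accepts a seed (the value here is arbitrary):
train_test_splits = df.randomSplit([0.8, 0.2], seed=42)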
We'll apply the Alternating Least Squares method to train the model and then generate predictions on the test set:
from pyspark.ml.recommendation import ALS
als = ALS(maxIter=19, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='label')
model = als.fit(train)
prediction = model.transform(test)
Let's take a look at some of the predictions the model made and compare them to the true values. Before computing the RMSE, we drop the rows where the prediction is NaN, which happens for users or movies that appear in the test set but not in the training data:
prediction.join(movies, 'movieId').select('userId', 'title', 'prediction', 'trueRating').show(n=10, truncate=False)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='trueRating', predictionCol='prediction', metricName='rmse')
prediction = prediction.dropna(how='any', subset=["prediction"])
rmse = evaluator.evaluate(prediction)
print('Root Mean Square Error:', rmse)
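The maxIter and regParam values above were picked by hand. As a sketch of how you might tune them instead (assuming Spark 2.2+, whose ALS has a coldStartStrategy parameter that drops NaN predictions inside each fold so the RMSE stays defined):
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

als_cv = ALS(userCol='userId', itemCol='movieId', ratingCol='label', coldStartStrategy='drop')
param_grid = ParamGridBuilder() \
    .addGrid(als_cv.rank, [10, 50]) \
    .addGrid(als_cv.regParam, [0.01, 0.1]) \
    .build()
cv = CrossValidator(estimator=als_cv,
                    estimatorParamMaps=param_grid,
                    evaluator=RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse'),
                    numFolds=3)
cv_model = cv.fit(train)
# avgMetrics holds the mean cross-validated RMSE for each parameter combination
print('Best cross-validated RMSE:', min(cv_model.avgMetrics))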
Finally, we are in a position to generate recommendations for all the users, or to pick a single user and see what the recommendation system we've built suggests specifically to them:
userRecs = model.recommendForAllUsers(10)
userRecs.show(5)
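As a side note, Spark 2.3 and later also ship a built-in way to do the per-user lookup shown next; a minimal sketch:
single_user = df.filter(df['userId'] == 1000).select('userId').distinct()
model.recommendForUserSubset(single_user, 10).show(truncate=False)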
Alternatively, this user-defined function (adapted from one I came across on the Internet) flattens the nested recommendations for a specific user into a tidy DataFrame:
import pandas as pd

def get_recs_for_users(recs):
    # pull the parallel arrays of movie ids and predicted ratings out of the nested 'recommendations' column
    recs = recs.select('recommendations.movieId', 'recommendations.rating')
    movie_ids = recs.select('movieId').toPandas().iloc[0, 0]
    pred_ratings = recs.select('rating').toPandas().iloc[0, 0]
    # rebuild them as a flat pandas DataFrame, then convert back to a Spark DataFrame
    ratings_matrix = pd.DataFrame(movie_ids, columns=['movieId'])
    ratings_matrix['ratings'] = pred_ratings
    return spark.createDataFrame(ratings_matrix)
from pyspark.sql.functions import col
user_1000_recs = userRecs.filter(col('userId') == 1000)
get_recs_for_users(user_1000_recs).show(10)
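Since the flattened output only contains movie ids, we can join it back to the movies DataFrame to see titles instead:
get_recs_for_users(user_1000_recs).join(movies, 'movieId').select('title', 'ratings').show(10, truncate=False)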
Well done! We now have a recommendation system that offers users a more personalized experience.