Collaborative Filtering on Amazon Products With PySpark
@ rushi | Sunday, Jun 19, 2022 | 4 minutes read | Update at Sunday, Jun 19, 2022

Goals

Recommend the top 5 products for a user: RMSE ≈ 1.20

Data set description

  • This is a list of over 34,000 consumer reviews for Amazon products such as the Kindle and Fire TV Stick, provided by Datafiniti’s Product Database. The dataset includes basic product information, rating, review text, and more for each product.
  • Note that this is a sample of a large dataset. The full dataset is available through Datafiniti.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('rs').getOrCreate()
df = spark.read.csv('Amazon_Consumer_Reviews.csv', inferSchema=True, header=True)
df.printSchema()
root
 |-- id: string (nullable = true)
 |-- dateAdded: timestamp (nullable = true)
 |-- dateUpdated: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- asins: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- primaryCategories: string (nullable = true)
 |-- imageURLs: string (nullable = true)
 |-- keys: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- manufacturerNumber: string (nullable = true)
 |-- reviews.date: string (nullable = true)
 |-- reviews.dateSeen: string (nullable = true)
 |-- reviews.didPurchase: string (nullable = true)
 |-- reviews.doRecommend: boolean (nullable = true)
 |-- reviews.id: string (nullable = true)
 |-- reviews.numHelpful: integer (nullable = true)
 |-- reviews.rating: integer (nullable = true)
 |-- reviews.sourceURLs: string (nullable = true)
 |-- reviews.text: string (nullable = true)
 |-- reviews.title: string (nullable = true)
 |-- reviews.username: string (nullable = true)
 |-- sourceURLs: string (nullable = true)
from pyspark.sql.functions import col
# dots in column names collide with Spark's struct-field syntax, so strip them
def rename_cols(df):
    for column in df.columns:
        new_column = column.replace('.', '')
        df = df.withColumnRenamed(column, new_column)
    return df
df_2 = rename_cols(df)
df_2.columns
['id',
 'dateAdded',
 'dateUpdated',
 'name',
 'asins',
 'brand',
 'categories',
 'primaryCategories',
 'imageURLs',
 'keys',
 'manufacturer',
 'manufacturerNumber',
 'reviewsdate',
 'reviewsdateSeen',
 'reviewsdidPurchase',
 'reviewsdoRecommend',
 'reviewsid',
 'reviewsnumHelpful',
 'reviewsrating',
 'reviewssourceURLs',
 'reviewstext',
 'reviewstitle',
 'reviewsusername',
 'sourceURLs']
df_3 = df_2.select('reviewsusername', 'id', 'reviewsrating')
df_3.groupBy('reviewsusername').count().orderBy('count', ascending=False).show(10)
+-----------------+-----+
|  reviewsusername|count|
+-----------------+-----+
|ByAmazon Customer|  889|
|             Mike|   62|
|ByKindle Customer|   45|
|             Dave|   44|
|            Chris|   38|
|             John|   30|
|             Nana|   28|
|        Anonymous|   27|
|           ByMike|   26|
|             Brad|   26|
+-----------------+-----+
only showing top 10 rows
df_3.groupBy('reviewsrating').count().orderBy('count', ascending=False).show(10)
+-------------+-----+
|reviewsrating|count|
+-------------+-----+
|            5|19831|
|            4| 5621|
|            3| 1205|
|            1|  965|
|            2|  617|
|            0|   91|
|           16|    1|
|           44|    1|
+-------------+-----+
# drop the two malformed ratings (44 and 16)
df_4 = df_3.filter("reviewsrating != 44 AND reviewsrating != 16")
from pyspark.sql.functions import lit
from pyspark.ml.feature import StringIndexer, IndexToString
stringIndexer = StringIndexer(inputCol="id", outputCol="id_int")
model = stringIndexer.fit(df_4)
df_5 = model.transform(df_4)
df_5.groupBy('id_int').count().orderBy('count', ascending=False).show(10)
+------+-----+
|id_int|count|
+------+-----+
|   0.0| 8343|
|   1.0| 3728|
|   2.0| 2443|
|   3.0| 2370|
|   4.0| 1676|
|   5.0| 1425|
|   6.0| 1212|
|   7.0| 1024|
|   8.0|  987|
|   9.0|  883|
+------+-----+
only showing top 10 rows
stringIndexer = StringIndexer(inputCol="reviewsusername", outputCol="userid")
model = stringIndexer.fit(df_5)
df_6 = model.transform(df_5)
df_6.show(5)
+----------------+--------------------+-------------+------+-------+
| reviewsusername|                  id|reviewsrating|id_int| userid|
+----------------+--------------------+-------------+------+-------+
|      Byger yang|AVpgNzjwLJeJML43Kpxn|            3|   0.0|12659.0|
|            ByMG|AVpgNzjwLJeJML43Kpxn|            4|   0.0|  153.0|
|BySharon Lambert|AVpgNzjwLJeJML43Kpxn|            5|   0.0|11560.0|
|   Bymark sexson|AVpgNzjwLJeJML43Kpxn|            5|   0.0|12926.0|
|         Bylinda|AVpgNzjwLJeJML43Kpxn|            5|   0.0|  953.0|
+----------------+--------------------+-------------+------+-------+
only showing top 5 rows
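IndexToString, imported above but never used in the flow below, can reverse an indexed column back to its original labels. A minimal sketch, assuming the fitted product indexer was kept under a separate name such as id_model (a hypothetical name; the code above reuses the variable model for both indexers):

from pyspark.ml.feature import IndexToString
# id_model: the fitted StringIndexerModel for the product id column (hypothetical name)
converter = IndexToString(inputCol='id_int', outputCol='id_orig', labels=id_model.labels)
converter.transform(df_6).select('id', 'id_int', 'id_orig').show(3)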
# 75/25 random split (pass a seed for reproducible splits)
train, test = df_6.randomSplit([0.75,0.25])
train.count()
21251
test.count()
7079
from pyspark.ml.recommendation import ALS
# nonnegative=True constrains the latent factors to be non-negative;
# coldStartStrategy="drop" removes NaN predictions for users/items unseen in training
rs = ALS(maxIter=10, regParam=0.01, userCol='userid', itemCol='id_int',
         ratingCol='reviewsrating', nonnegative=True, coldStartStrategy="drop")
rs = rs.fit(train)
pred = rs.transform(test)

RMSE comes out to roughly 1.20, which is quite satisfactory for a small dataset of about 21,000 training rows. A lower RMSE could be achieved with a hybrid recommender system.

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='reviewsrating')
rmse = evaluator.evaluate(pred)
print(rmse)
1.2034306578134335
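One way to chase a lower RMSE without switching to a hybrid system is to tune the ALS rank and regularization with cross-validation. A minimal sketch, assuming the grid values below are plausible starting points rather than tuned choices:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

als = ALS(userCol='userid', itemCol='id_int', ratingCol='reviewsrating',
          nonnegative=True, coldStartStrategy='drop')
grid = (ParamGridBuilder()
        .addGrid(als.rank, [10, 20, 50])
        .addGrid(als.regParam, [0.01, 0.1])
        .build())
# reuses the RMSE evaluator defined above; 3-fold cross-validation over the grid
cv = CrossValidator(estimator=als, estimatorParamMaps=grid,
                    evaluator=evaluator, numFolds=3)
best_model = cv.fit(train).bestModel  # fold-averaged best ALS model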

Top 5 recommended products for userid 20

# find the products userid 20 has not rated yet
total_prod = df_6.select('id_int').distinct()
used_prod = df_6.filter(df_6['userid'] == 20).select('id_int').distinct()
used_prod = used_prod.withColumnRenamed("id_int", "id_int_used")
joined = total_prod.join(used_prod, total_prod.id_int == used_prod.id_int_used, how='left')
new_prod = joined.where(col('id_int_used').isNull()).select(col('id_int')).distinct()
new_prod = new_prod.withColumn("userid", lit(20))
new_prod.show(5)
+------+------+
|id_int|userid|
+------+------+
|   8.0|    20|
|   7.0|    20|
|  49.0|    20|
|  29.0|    20|
|  64.0|    20|
+------+------+
only showing top 5 rows
# score the unseen products with the trained ALS model and keep the top 5
rec = rs.transform(new_prod).orderBy('prediction', ascending=False)
rec.createTempView('rec')
rec_5 = spark.sql('SELECT id_int FROM rec LIMIT 5')
prod_id = rec_5.join(df_6, rec_5.id_int == df_6.id_int, how='left')
prod_id.select('id').join(df_2, prod_id.id == df_2.id, how='left').select('name').distinct().show()
+--------------------+
|                name|
+--------------------+
|Amazon Fire TV Ga...|
|AmazonBasics Vent...|
|Amazon Echo Show ...|
|AmazonBasics Nylo...|
|All-New Kindle Oa...|
+--------------------+
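For reference, the fitted ALSModel also provides built-in recommenders that replace most of the manual join above. A minimal sketch using recommendForUserSubset on the fitted model rs; note that, unlike the manual filter, it may recommend products the user has already rated:

# recommendForUserSubset returns, per user, an array of (id_int, predicted rating) structs
users = df_6.select('userid').distinct().filter('userid = 20')
rs.recommendForUserSubset(users, 5).show(truncate=False)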
