pyspark ALS

2019-07-30

pyspark ALS

from functools import reduce
import math
from os.path import abspath

from pyspark.sql import SparkSession

 

def cosSim(v1, v2):
    """Return the cosine similarity of two equal-length numeric vectors.

    Args:
        v1, v2: sequences of numbers (here: ALS item-factor vectors).

    Returns:
        float in [-1.0, 1.0]; dot(v1, v2) / (|v1| * |v2|).

    Raises:
        ZeroDivisionError: if either vector has zero magnitude.
    """
    # Original used the Python 2 builtin `reduce`, which is a NameError on
    # Python 3 without `from functools import reduce`; sum() is the idiom.
    dot = sum(a * b for a, b in zip(v1, v2))
    norm1 = math.sqrt(sum(a * a for a in v1))
    norm2 = math.sqrt(sum(b * b for b in v2))
    return dot / (norm1 * norm2)

 

# Build per-item top-10 similar-item lists from implicit-feedback ALS factors
# and write them to HDFS as "itemid,neighbour1,...,neighbour10" text lines.
# FIX: original lines used typographic quotes (‘...‘), which are a SyntaxError.
warehouse_location = abspath('spark-warehouse')

spark = (SparkSession.builder
         .appName("pyspark als")
         .config("spark.sql.warehouse.dir", warehouse_location)
         .enableHiveSupport()
         .getOrCreate())

# Implicit score = interaction count per (user, item).
# NOTE(review): "table" looks like a placeholder Hive table name — confirm.
data = spark.sql("select userid,itemid,count(*) as score from table group by userid,itemid")

from pyspark.mllib.recommendation import Rating

ratings = data.rdd.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))

from pyspark.mllib.recommendation import ALS

# Implicit-feedback ALS with 10 latent factors per item.
model = ALS.trainImplicit(ratings, rank=10)

# productFeatures() yields (itemid, factor-array) pairs.
jf = model.productFeatures()
jf = spark.createDataFrame(jf)
jf = jf.withColumnRenamed("_1", "id").withColumnRenamed("_2", "features")

# Attach each item's (code, city_code) attributes to its factor vector.
vvv = spark.sql("select itemid,code,city_code from table")
vjf = jf.join(vvv, jf.id == vvv.itemid)
cvjf = vjf.select("itemid", "code", "city_code", "features")

# Right-hand side of the self-join, with every column prefixed by "x".
# FIX: original renamed "itemid" to "xjobid", but the filter/select below use
# "xitemid" — that mismatch raises an AnalysisException at runtime.
jcvjf = (cvjf.withColumnRenamed("itemid", "xitemid")
             .withColumnRenamed("code", "xcode")
             .withColumnRenamed("city_code", "xcity_code")
             .withColumnRenamed("features", "xfeatures"))

# Only compare items sharing the same (code, city_code) bucket, and drop
# each item's pairing with itself.
jjj = cvjf.join(jcvjf, (cvjf.code == jcvjf.xcode) & (cvjf.city_code == jcvjf.xcity_code))
jj = jjj.filter("itemid!=xitemid").select("itemid", "xitemid", "features", "xfeatures")

# Score each candidate pair by cosine similarity of the ALS factors.
sjj = jj.rdd.map(lambda x: (x[0], x[1], cosSim(x[2], x[3])))
gsj = sjj.groupBy(lambda x: x[0])
# Keep the 10 most similar neighbours per item: "itemid,n1,...,n10".
sj3 = gsj.map(lambda x: str(x[0]) + ',' + ','.join(
    str(m[1]) for m in sorted(x[1], key=lambda n: n[2], reverse=True)[:10]))

# print(sj3.collect())  # debug only

sj3.saveAsTextFile("/hdfs/py_als_test")

spark.stop()

pyspark ALS

pyspark ALS

原文地址:https://www.cnblogs.com/kayy/p/11272329.html