读取数据,进行类型处理(数组到Vector)
from pyspark.ml.linalg import Vectors
# 选取部分数据做测试
article_vector = w2v.spark.sql("select article_id, articlevector from article_vector where channel_id=18 limit 10")
train = articlevector.select(['article_id', 'articleVector'])def _array_to_vector(row):return row.article_id, Vectors.dense(row.articleVector)train = train.rdd.map(_array_to_vector).toDF(['article_id', 'articleVector'])
BRP进行FIT
from pyspark.ml.feature import BucketedRandomProjectionLSHbrp = BucketedRandomProjectionLSH(inputCol='articleVector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)
model = brp.fit(train)
计算相似的文章以及相似度
similar = model.approxSimilarityJoin(test, train, 2.0, distCol='EuclideanDistance')similar.sort(['EuclideanDistance']).show()
对于计算出来的相似度,是要在推荐的时候使用。那么我们所知的是,HIVE只适合在离线分析时候使用,因为运行速度慢,所以只能将相似度存储到HBASE当中
目的:将所有文章对应相似度文章及其相似度保存
步骤:
我们需要建立一个HBase存储文章相似度的表
create 'article_similar', 'similar'# 存储格式如下:key:为article_id, 'similar:article_id', 结果为相似度
put 'article_similar', '1', 'similar:1', 0.2
put 'article_similar', '1', 'similar:2', 0.34
put 'article_similar', '1', 'similar:3', 0.267
put 'article_similar', '1', 'similar:4', 0.56
put 'article_similar', '1', 'similar:5', 0.7
put 'article_similar', '1', 'similar:6', 0.819
put 'article_similar', '1', 'similar:8', 0.28
定义保存HBASE函数,确保我们的happybase连接hbase启动成功,Thrift服务打开。hbase集群出现退出等问题常见原因,配置文件hadoop目录,地址等,还有
def save_hbase(partition):import happybasepool = happybase.ConnectionPool(size=3, host='hadoop-master')with pool.connection() as conn:# 建议表的连接table = conn.table('article_similar')for row in partition:if row.datasetA.article_id == row.datasetB.article_id:passelse:table.put(str(row.datasetA.article_id).encode(),{"similar:{}".format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})# 手动关闭所有的连接conn.close()similar.foreachPartition(save_hbase)
上一篇:经典同步问题