Hive + Spark (loop expansion): computing active-user retention
From the activity table snapshotted on day 10 we can compute the two-day retention of users active on day 8 and the one-day retention of users active on day 9; in general, a snapshot taken on day T yields the (T - d)-day retention for any earlier day d.

Historical active-range table:

hive>create table active_range_test(
user_id         string,
first_dt        string,
range_start     string,
range_end       string
)
partitioned by (dt string)
row format delimited fields terminated by ','
;

Data:
range.txt (historical data)

linux>vi range.txt
a1,2023-03-01,2023-03-01,2023-03-04
a1,2023-03-01,2023-03-06,2023-03-08
b1,2023-03-01,2023-03-03,2023-03-04
b1,2023-03-01,2023-03-05,2023-03-08
c1,2023-03-01,2023-03-01,2023-03-06
c1,2023-03-01,2023-03-07,9999-12-31
d1,2023-03-05,2023-03-05,9999-12-31
#user a1 was active 2023-03-01 through 2023-03-04, and again 2023-03-06 through 2023-03-08

Load the data:

hive>load data local inpath '/root/tmp_data/data/range.txt' into table active_range_test partition (dt='2023-03-10');
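
Before moving on, it helps to confirm the range model: a user counts as active on day D when some row satisfies range_start <= D <= range_end, and range_end = '9999-12-31' marks a streak that is still open. A quick sanity check, shown as a sketch here (it assumes the `spark` session and `testhivedb` database set up in the Spark job below):

// Who was active on 2023-03-02? A row matches when the day falls inside its range.
spark.sql(
  """
    |select distinct user_id
    |from testhivedb.active_range_test
    |where dt = '2023-03-10'
    |  and range_start <= '2023-03-02'
    |  and range_end   >= '2023-03-02'
    |""".stripMargin).show()
// Expected from the sample data: a1 (03-01..03-04) and c1 (03-01..03-06).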
Spark SQL (computing active-user retention)
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>LogTest</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>scala-demo-project</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency>
    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>0.11.0.2</version></dependency>
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-dbcp2</artifactId><version>2.1.1</version></dependency>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.3</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc_2.11</artifactId><version>3.1.0</version></dependency>
    <dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc-config_2.11</artifactId><version>3.1.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.69</version></dependency>
    <dependency><groupId>ch.hsr</groupId><artifactId>geohash</artifactId><version>1.3.0</version></dependency>
    <dependency><groupId>org.mongodb.spark</groupId><artifactId>mongo-spark-connector_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version></dependency>
    <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.3</version></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api-scala_2.11</artifactId><version>11.0</version></dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>cn.kgc.kafak.demo.ThreadProducer</mainClass>
                </transformer>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
ActiveRetentionDemo (uses the utility class to compute active-user retention)
package ActiveRetentionDemo

// Active-user retention
import common.utils.DateUtilDemo
import org.apache.spark.sql.{SaveMode, SparkSession}

object ActiveRetentionDemo {
  /**
   * Reads the range data from Hive, processes it, and writes the result back to Hive.
   *
   * @param args
   */
  // Note: start the metastore service on the Hive server first: hive --service metastore
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("test")
      .enableHiveSupport()
      // .config("mapreduce.app-submission.cross-platform", "true") // when submitting from Windows
      .config("hive.metastore.uris", "thrift://192.168.58.200:9083")
      .getOrCreate()
    import spark.implicits._

    val df = spark.sql("select * from testhivedb.active_range_test where dt='2023-03-10'")
    // keep only open-ended ranges, i.e. users whose current streak is still running
    val filtered = df.where("dt='2023-03-10' and range_end='9999-12-31'")

    // expand each open range into one (user_id, date) row per active day, up to dt - 1
    val result = filtered.flatMap(row => {
      val userId = row.getAs[String]("user_id")
      val startDate = row.getAs[String]("range_start")
      val dt = row.getAs[String]("dt")
      val endDate = DateUtilDemo.addDays(-1, dt) // dt - 1
      val days = DateUtilDemo.getDayDiff(startDate, endDate)
      for (i <- 0L to days) yield (userId, DateUtilDemo.addDays(i.toInt, startDate))
    }).toDF("user_id", "dt")
    // result.show() prints:
    // +-------+----------+
    // |user_id|        dt|
    // +-------+----------+
    // |     c1|2023-03-07|
    // |     c1|2023-03-08|
    // |     c1|2023-03-09|
    // |     d1|2023-03-05|
    // |     d1|2023-03-06|
    // |     d1|2023-03-07|
    // |     d1|2023-03-08|
    // |     d1|2023-03-09|
    // +-------+----------+

    result.createTempView("v_result")
    spark.sql(
      """
        |select
        |  dt as dt,
        |  datediff('2023-03-11', dt) as days,
        |  count(1) as counts
        |from v_result
        |group by dt
        |""".stripMargin).show()
    spark.stop()
    // Result:
    // +----------+----+------+
    // |        dt|days|counts|
    // +----------+----+------+
    // |2023-03-06|   5|     1|
    // |2023-03-05|   6|     1|
    // |2023-03-07|   4|     2|
    // |2023-03-08|   3|     2|
    // |2023-03-09|   2|     2|
    // +----------+----+------+
  }
}
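
SaveMode is imported above but never used, which suggests the counts were meant to be persisted rather than printed. A sketch of how that last step might look in place of the final show() (the target table name active_retention_result is hypothetical):

// Sketch: persist the per-day retention counts back to Hive instead of show().
// Assumes the same spark session and the v_result temp view registered above;
// "testhivedb.active_retention_result" is a hypothetical target table.
val retention = spark.sql(
  """
    |select dt, datediff('2023-03-11', dt) as days, count(1) as counts
    |from v_result
    |group by dt
    |""".stripMargin)
retention.write.mode(SaveMode.Overwrite).saveAsTable("testhivedb.active_retention_result")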
DateUtilDemo (utility class)
package common.utils

import org.apache.commons.lang.time.DateFormatUtils
import org.apache.commons.lang3.time.DateUtils

object DateUtilDemo {
  def getDayDiff(start: String, end: String): Long = {
    val startDate = DateUtils.parseDate(start, "yyyy-MM-dd")
    val endDate = DateUtils.parseDate(end, "yyyy-MM-dd")
    var diff: Long = 0
    if (startDate != null && endDate != null) {
      diff = (endDate.getTime - startDate.getTime) / DateUtils.MILLIS_PER_DAY
    }
    diff
  }

  def addDays(days: Int, start: String): String = {
    val startDate = DateUtils.parseDate(start, "yyyy-MM-dd")
    val result = DateUtils.addDays(startDate, days)
    DateFormatUtils.format(result, "yyyy-MM-dd")
  }

  def main(args: Array[String]): Unit = {
    // println(getDayDiff("2023-3-5", "2023-3-12"))  // end minus start; prints 7
    // println(addDays(1, "2023-03-12"))             // +1 day; prints 2023-03-13
  }
}
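
To see how these two helpers drive the range expansion, here is a small standalone check (a sketch; the object name DateUtilDemoCheck is hypothetical, and the values come from user c1's open range in the sample data):

package common.utils

// Replays the expansion for user c1 (range_start = 2023-03-07, dt = 2023-03-10).
object DateUtilDemoCheck {
  def main(args: Array[String]): Unit = {
    val dt = "2023-03-10"                                     // snapshot partition
    val endDate = DateUtilDemo.addDays(-1, dt)                // "2023-03-09", i.e. dt - 1
    val days = DateUtilDemo.getDayDiff("2023-03-07", endDate) // 2
    val expanded = (0L to days).map(i => DateUtilDemo.addDays(i.toInt, "2023-03-07"))
    println(expanded.mkString(","))  // 2023-03-07,2023-03-08,2023-03-09
  }
}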

Error: WARN [hive.metastore] - Failed to connect to the MetaStore Server...
org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect

Fix:
Run hive --service metastore on the Hive server:

linux>hive --service metastore

Error 2: Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.elapsedMillis()J
Cause: Spark 2.4.8 does not bundle the Google Guava library, or the Guava version on the classpath is too new (Guava 16.0 removed the elapsedMillis method).

Fix: there are two options.
Option 1: add the Google Guava library to pom.xml:

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>15.0</version>
</dependency>

Option 2 (recommended): add the HDFS client dependency to pom.xml; hadoop-client 2.7.x transitively brings in an older Guava (11.x) that still has elapsedMillis:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.3</version>
</dependency>

Once the code is written, package it and upload the jar to the Hive server.

--Create the custom function

hive>add jar /home/vagrant/udf-demo-1.0-SNAPSHOT-jar-with-dependencies.jar;
hive>create temporary function dateDivide as 'ActionDivideDemo.ActionDivideDemo';
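
The source of ActionDivideDemo is not shown in this post. Judging from the test below, dateDivide(n, d) returns the n dates from d-2 back down to d-(n+1), comma-separated. A minimal sketch that reproduces that behavior, using Hive's legacy UDF API (a hypothetical reconstruction, not the author's actual code):

package ActionDivideDemo

import org.apache.commons.lang3.time.{DateFormatUtils, DateUtils}
import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical reconstruction inferred from the observed output:
// dateDivide(n, d) yields the n dates from d-2 down to d-(n+1), comma-separated.
class ActionDivideDemo extends UDF {
  def evaluate(days: Int, date: String): String = {
    val base = DateUtils.parseDate(date, "yyyy-MM-dd")
    (1 to days)
      .map(i => DateFormatUtils.format(DateUtils.addDays(base, -(i + 1)), "yyyy-MM-dd"))
      .mkString(",")
  }
}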

--Test

hive>select dateDivide(7,'2023-03-12');
Returns:
+----------------------------------------------------+--+
|                        _c0                         |
+----------------------------------------------------+--+
| 2023-03-10,2023-03-09,2023-03-08,2023-03-07,2023-03-06,2023-03-05,2023-03-04 |
+----------------------------------------------------+--+
1 row selected (0.483 seconds)

--Compute active-user retention

hive>with one as(
select 
user_id,
first_dt,
range_start,
range_end
from active_range_test 
where dt = '2023-03-10' and range_end='9999-12-31'
),
two as(
select 
user_id,
split(dateDivide(datediff('2023-03-11',range_start),'2023-03-11'),',') as days 
from one
),
three as(
select 
dt
from two 
lateral view explode(days) z as dt
)
select 
dt,
datediff('2023-03-11',dt)-1 as days,
count(1) as users
from three
group by dt;
Result:
+-------------+-------+--------+--+
|     dt      | days  | users  |
+-------------+-------+--------+--+
| 2023-03-04  | 6     | 1      |
| 2023-03-05  | 5     | 1      |
| 2023-03-06  | 4     | 2      |
| 2023-03-07  | 3     | 2      |
| 2023-03-08  | 2     | 2      |
| 2023-03-09  | 1     | 2      |
+-------------+-------+--------+--+
