From the activity-range table snapshotted on day 10 we can compute, for example, the two-day retention of users active on day 8 and the one-day retention of users active on day 9.
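To make the day counting concrete: with the snapshot partition dt='2023-03-10', a user active on day d has been retained for datediff('2023-03-11', d) - 1 days. A minimal stand-alone Scala sketch of that arithmetic (java.time is used here only for illustration; the jobs below use Hive's datediff and the DateUtilDemo helpers):

import java.time.LocalDate
import java.time.temporal.ChronoUnit

object RetentionDaysSketch {
  // Days of retention for a user active on activeDt, as seen from the
  // 2023-03-10 snapshot: datediff('2023-03-11', activeDt) - 1.
  def retentionDays(activeDt: String): Long =
    ChronoUnit.DAYS.between(LocalDate.parse(activeDt), LocalDate.parse("2023-03-11")) - 1

  def main(args: Array[String]): Unit = {
    println(retentionDays("2023-03-08")) // 2 -> two-day retention
    println(retentionDays("2023-03-09")) // 1 -> one-day retention
  }
}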
hive>create table active_range_test(
user_id string,
first_dt string,
range_start string,
range_end string
)
partitioned by (dt string)
row format delimited fields terminated by ','
;
Data:
range.txt (historical data)
linux>vi range.txt
a1,2023-03-01,2023-03-01,2023-03-04
a1,2023-03-01,2023-03-06,2023-03-08
b1,2023-03-01,2023-03-03,2023-03-04
b1,2023-03-01,2023-03-05,2023-03-08
c1,2023-03-01,2023-03-01,2023-03-06
c1,2023-03-01,2023-03-07,9999-12-31
d1,2023-03-05,2023-03-05,9999-12-31
# User a1 was active from 2023-03-01 to 2023-03-04 and again from 2023-03-06 to 2023-03-08
Load the data:
hive>load data local inpath '/root/tmp_data/data/range.txt' into table active_range_test partition (dt='2023-03-10');
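A range_end of '9999-12-31' marks an open range, i.e. the user is still active as of the snapshot. The job below unrolls every open range into one row per active day, from range_start up to dt minus one day. A minimal stand-alone Scala sketch of that expansion (plain collections, no Spark; the real job does the same inside a DataFrame flatMap):

import java.time.LocalDate

object RangeExpandSketch {
  // Unroll an open activity range into one (user_id, day) pair per day,
  // from rangeStart through snapshotDt - 1 (here 2023-03-09).
  def expand(userId: String, rangeStart: String, snapshotDt: String): Seq[(String, String)] = {
    val start = LocalDate.parse(rangeStart)
    val end = LocalDate.parse(snapshotDt).minusDays(1)
    Iterator.iterate(start)(_.plusDays(1))
      .takeWhile(!_.isAfter(end))
      .map(d => (userId, d.toString))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // Open ranges from range.txt: c1 active since 2023-03-07, d1 since 2023-03-05.
    (expand("c1", "2023-03-07", "2023-03-10") ++ expand("d1", "2023-03-05", "2023-03-10"))
      .foreach(println) // (c1,2023-03-07) ... (d1,2023-03-09)
  }
}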
Spark SQL (computing active-user retention), pom.xml:
Project: modelVersion 4.0.0, org.example:LogTest:1.0-SNAPSHOT (scala-demo-project, http://www.example.com), encoding UTF-8, source/target 1.8
Dependencies (groupId:artifactId:version):
org.scala-lang:scala-library:2.11.8
org.apache.kafka:kafka_2.11:0.11.0.2
org.apache.commons:commons-dbcp2:2.1.1
mysql:mysql-connector-java:5.1.47
org.apache.hadoop:hadoop-client:2.7.3
org.apache.spark:spark-sql_2.11:2.2.0
org.apache.spark:spark-streaming_2.11:2.2.0
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.0
org.scalikejdbc:scalikejdbc_2.11:3.1.0
org.scalikejdbc:scalikejdbc-config_2.11:3.1.0
org.apache.spark:spark-hive_2.11:2.2.0
org.apache.spark:spark-graphx_2.11:2.2.0
com.alibaba:fastjson:1.2.69
ch.hsr:geohash:1.3.0
org.mongodb.spark:mongo-spark-connector_2.11:2.2.0
com.alibaba:druid:1.1.10
redis.clients:jedis:2.9.3
org.apache.logging.log4j:log4j-api-scala_2.11:11.0
Build plugins:
net.alchim31.maven:scala-maven-plugin:3.2.2 (compile and testCompile goals)
org.apache.maven.plugins:maven-shade-plugin:2.4.3 (shade goal bound to package; main class cn.kgc.kafak.demo.ThreadProducer; artifact set *:*; excludes META-INF/*.SF, META-INF/*.DSA, META-INF/*.RSA)
ActiveRetentionDemo (uses the utility class to compute active-user retention)
package ActiveRetentionDemo
// Active-user retention
import common.utils.DateUtilDemo
import org.apache.spark.sql.{SaveMode, SparkSession}

object ActiveRetentionDemo {

  /**
   * Read the activity-range data from Hive, process it, and write the result back to Hive.
   * Note: start the metastore on the Hive server first: hive --service metastore
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("test")
      .enableHiveSupport()
      // .config("mapreduce.app-submission.cross-platform", "true") // when running on Windows
      .config("hive.metastore.uris", "thrift://192.168.58.200:9083")
      .getOrCreate()
    import spark.implicits._

    val df = spark.sql("select * from testhivedb.active_range_test where dt='2023-03-10'")
    // keep only the open ranges (users still active as of the snapshot)
    val filtered = df.where("dt='2023-03-10' and range_end='9999-12-31'")

    // unroll each open range into one (user_id, day) row per day, from range_start to dt - 1
    val result = filtered.flatMap(row => {
      val userId = row.getAs[String]("user_id")
      val startDate = row.getAs[String]("range_start")
      val dt = row.getAs[String]("dt")
      val endDate = DateUtilDemo.addDays(-1, dt)           // dt minus 1 day
      val days = DateUtilDemo.getDayDiff(startDate, endDate)
      for (i <- 0L to days) yield (userId, DateUtilDemo.addDays(i.toInt, startDate))
    }).toDF("user_id", "dt")
    // .show()

    /** Result of the expansion above:
     * +-------+----------+
     * |user_id|        dt|
     * +-------+----------+
     * |     c1|2023-03-07|
     * |     c1|2023-03-08|
     * |     c1|2023-03-09|
     * |     d1|2023-03-05|
     * |     d1|2023-03-06|
     * |     d1|2023-03-07|
     * |     d1|2023-03-08|
     * |     d1|2023-03-09|
     * +-------+----------+
     */
    // result.show() // running this also prints the table above

    result.createTempView("v_result")
    spark.sql(
      """
        |select
        |  dt as dt,
        |  datediff('2023-03-11', dt) as days,
        |  count(1) as counts
        |from v_result
        |group by dt
        |""".stripMargin).show()
    spark.stop()

    // Result:
    // +----------+----+------+
    // |        dt|days|counts|
    // +----------+----+------+
    // |2023-03-06|   5|     1|
    // |2023-03-05|   6|     1|
    // |2023-03-07|   4|     2|
    // |2023-03-08|   3|     2|
    // |2023-03-09|   2|     2|
    // +----------+----+------+
  }
}
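The doc comment above mentions writing the result back to Hive, and SaveMode is already imported, but the job only calls show(). A minimal sketch of persisting the aggregate instead, placed inside main() after v_result is registered; the target table name testhivedb.active_retention_result is a hypothetical choice, not part of the original notes:

    // Hypothetical: persist the aggregate instead of (or in addition to) show().
    val retention = spark.sql(
      """
        |select dt, datediff('2023-03-11', dt) as days, count(1) as counts
        |from v_result
        |group by dt
        |""".stripMargin)

    retention.write
      .mode(SaveMode.Overwrite)                            // SaveMode is already imported
      .saveAsTable("testhivedb.active_retention_result")   // hypothetical target table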
DateUtilDemo (utility class)
package common.utils

import org.apache.commons.lang.time.DateFormatUtils
import org.apache.commons.lang3.time.DateUtils

object DateUtilDemo {

  // Number of whole days between two yyyy-MM-dd dates (end minus start)
  def getDayDiff(start: String, end: String): Long = {
    val startDate = DateUtils.parseDate(start, "yyyy-MM-dd")
    val endDate = DateUtils.parseDate(end, "yyyy-MM-dd")
    var diff: Long = 0
    if (startDate != null && endDate != null) {
      diff = (endDate.getTime - startDate.getTime) / DateUtils.MILLIS_PER_DAY
    }
    diff
  }

  // Add (a possibly negative number of) days to a yyyy-MM-dd date string
  def addDays(days: Int, start: String): String = {
    val startDate = DateUtils.parseDate(start, "yyyy-MM-dd")
    val result = DateUtils.addDays(startDate, days)
    DateFormatUtils.format(result, "yyyy-MM-dd")
  }

  def main(args: Array[String]): Unit = {
    // println(getDayDiff("2023-3-5", "2023-3-12")) // end minus start, returns 7
    // println(addDays(1, "2023-03-12"))            // +1 day, returns 2023-03-13
  }
}
Error: WARN [hive.metastore] - Failed to connect to the MetaStore Server...
org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
Solution:
Run hive --service metastore on the Hive server:
linux>hive --service metastore
Error 2: Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.elapsedMillis()J
Cause: Spark 2.4.8 does not bundle the Google Guava library, or the Guava version on the classpath is too new (Guava 16.0 removed the elapsedMillis method).
Fix: there are two options:
Option 1: add the Google Guava library to pom.xml:
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>15.0</version>
</dependency>
Option 2 (recommended): add the HDFS client dependency to pom.xml:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>
Once the code is written, package it into a jar and upload it to the Hive server.
-- Create the custom function
hive>add jar /home/vagrant/udf-demo-1.0-SNAPSHOT-jar-with-dependencies.jar;
hive>create temporary function dateDivide as 'ActionDivideDemo.ActionDivideDemo';
-- Test
hive>select dateDivide(7,'2023-03-12');
Returns:
+----------------------------------------------------+--+
| _c0 |
+----------------------------------------------------+--+
| 2023-03-10,2023-03-09,2023-03-08,2023-03-07,2023-03-06,2023-03-05,2023-03-04 |
+----------------------------------------------------+--+
1 row selected (0.483 seconds)
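The source of the ActionDivideDemo UDF itself is not included in these notes. The following is a hypothetical Scala sketch that reproduces the behaviour shown above (n comma-separated dates, counting back from two days before the given end date), assuming Hive's old-style org.apache.hadoop.hive.ql.exec.UDF API and a hive-exec dependency on the classpath:

package ActionDivideDemo

import java.time.LocalDate
import org.apache.hadoop.hive.ql.exec.UDF

// Hypothetical reconstruction of dateDivide: returns `days` comma-separated
// dates, starting two days before `end` and counting backwards, matching the
// sample output above (dateDivide(7,'2023-03-12') -> 2023-03-10 ... 2023-03-04).
class ActionDivideDemo extends UDF {
  def evaluate(days: Int, end: String): String = {
    val endDate = LocalDate.parse(end)
    (2 to days + 1).map(i => endDate.minusDays(i).toString).mkString(",")
  }
}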
-- Compute active-user retention
hive>with one as(
select
user_id,
first_dt,
range_start,
range_end
from active_range_test
where dt = '2023-03-10' and range_end='9999-12-31'
),
two as(
select
user_id,
split(dateDivide(datediff('2023-03-11',range_start),'2023-03-11'),',') as days
from one
),
three as(
select
dt
from two
lateral view explode(days) z as dt
)
select
dt,
datediff('2023-03-11',dt)-1 as days,
count(1) as users
from three
group by dt;
Result:
+-------------+-------+--------+--+
| dt | days | users |
+-------------+-------+--------+--+
| 2023-03-04 | 6 | 1 |
| 2023-03-05 | 5 | 1 |
| 2023-03-06 | 4 | 2 |
| 2023-03-07 | 3 | 2 |
| 2023-03-08 | 2 | 2 |
| 2023-03-09 | 1 | 2 |
+-------------+-------+--------+--+