保存每个mid的首次访问日期,每条进入该算子的访问记录,都会把mid对应的首次访问时间读取出来,跟当前日期进行比较,只有首次访问时间不为空,且首次访问时间早于当日的,则认为该访客是老访客,否则是新访客。
同时如果是新访客且没有访问记录的话,会写入首次访问时间。
is_new为0不用修复,已经访问过,状态是正确的;为1时才会出现状态不准确的情况。
日志格式如下:
// TODO 5 修复新老访客状态// 按照设备id进行分组// 匿名内部类
// jsonObjDS.keyBy(new KeySelector() {
// @Override
// public String getKey(JSONObject jsonObj) throws Exception {
// return jsonObj.getJSONObject("common").getString("mid");
// }
// });// 按照设备id进行分组// lambda表达式KeyedStream keyedDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));// 修复,修改json中的某个属性值SingleOutputStreamOperator jsonObjWithNew = keyedDS.map(new RichMapFunction() {// 不能在声明的时候初始化状态,这时还不能获取到RuntimeContext// 富函数只有在调用到open方法时,才可以获取到RuntimeContextprivate ValueState lastVisitDateState;private SimpleDateFormat sdf;@Overridepublic void open(Configuration parameters) throws Exception {lastVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor("lastVisitDateState", Types.STRING));sdf = new SimpleDateFormat("yyyyMMdd");}@Overridepublic JSONObject map(JSONObject jsonObj) throws Exception {String isNew = jsonObj.getJSONObject("common").getString("is_new");if (isNew.equals("1")) {// 如果之前系统判断是新访客,可能出现错误// 将之前的访问信息保存到状态中String lastVisitDate = lastVisitDateState.value();String curVisitDate = sdf.format(jsonObj.getLong("ts"));// 判断状态中的上次访日期是否为空if (lastVisitDate != null && lastVisitDate.length() > 0) {// 访问过// 判断是否在同一天访问if (!lastVisitDate.equals(curVisitDate)) {isNew = "0";jsonObj.getJSONObject("common").put("is_new", isNew);}} else {lastVisitDateState.update(curVisitDate);}}return jsonObj;}});
根据日志数据内容,将日志数据分为3类,页面日志、启动日志和曝光日志。页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流。
// TODO 6 按照日志类型对日志进行分流
// 启动 -- 启动侧输出流
// 曝光 -- 曝光侧输出流
// 页面 -- 主流
// 声明侧输出流标签
OutputTag startTag = new OutputTag("start"){};
OutputTag displayTag = new OutputTag("display"){};
// 使用Flink侧输出流进行分流
SingleOutputStreamOperator pageDS = jsonObjWithNew.process(new ProcessFunction() {@Overridepublic void processElement(JSONObject jsonObj, Context context, Collector out) throws Exception {// 获取启动jsonObjJSONObject startJsonObj = jsonObj.getJSONObject("start");String jsonStr = jsonObj.toJSONString();Long ts = jsonObj.getLong("ts");// 判断是否为启动日志if (startJsonObj != null && startJsonObj.size() > 0) {// 放到启动侧输出流context.output(startTag, jsonStr);} else {// 如果不是启动日志,那么其他日志都属于页面日志,放到主流中out.collect(jsonStr);String pageId = jsonObj.getJSONObject("page").getString("page_id");// 判断是否是曝光日志,曝光日志是一个数组JSONArray displaysArr = jsonObj.getJSONArray("displays");if (displaysArr != null && displaysArr.size() > 0) {// 遍历数据,获取每一条曝光数据for (int i = 0; i < displaysArr.size(); i++) {JSONObject displayJsonObj = displaysArr.getJSONObject(i);displayJsonObj.put("ts", ts);displayJsonObj.put("page_id", pageId);// 将曝光日志输出到曝光侧输出流context.output(displayTag, displayJsonObj.toJSONString());}}}}}
);
// 获取不同流数据,输出测试
DataStream startDS = pageDS.getSideOutput(startTag);
DataStream displayDs = pageDS.getSideOutput(displayTag);
pageDS.print("主流:");
startDS.print("启动流:");
displayDs.print("曝光流:");
开启需要的环境,运行程序,执行日志数据生成jar包,观察输出结果。
在MyKafkaUtil中封装获取kafka生产者的方法。
// 获取kafka的生产者
public static FlinkKafkaProducer getKafkaSink(String topic){return new FlinkKafkaProducer(KAFKA_SERVER,topic,new SimpleStringSchema());
}
// TODO 7 将不同流的数据写到kafka的dwd不同主题中
pageDS.addSink(MyKafkaUtil.getKafkaSink("dwd_page_log"));
startDS.addSink(MyKafkaUtil.getKafkaSink("dwd_start_log"));
displayDs.addSink(MyKafkaUtil.getKafkaSink("dwd_diaplay_log"));
# 开启三个kafka消费者
kfkcon.sh dwd_page_log
kfkcon.sh dwd_start_log
kfkcon.sh dwd_display_log
# 启动程序,生成日志数据,观察结果
结果如下图:
日志数据分流执行流程
需要启动的进程
生成模拟生成日志的jar包
将生成的日志发送给Nginx
Nginx接收到数据之后,进行请求转发,将请求发送给101,102,103上的日志采集服务
日志采集服务对数据进行输出、落盘以及发送到kafka的ods_base_log
BaseLogApp从ods_base_log读取数据
结构转换 String -> JSONObject
状态修复 分组、修复
分流
将分流后的数据写到kafka的dwd不同主题中