网站日志实时分析之 Flink 处理实时热门和 PVUV 统计
实时热门统计
操作步骤:
- 先从Kafka读取消费数据
- 使用map算子对数据进行预处理
- 过滤数据,只留住pv数据
- 使用timewindow,每隔10秒创建一个20秒的window
- 然后将窗口自定义预聚合,并且兹定于窗口函数,按指定输入输出case操作数据
- 上面操作时候返回的是DataStream,那么就根据timestampEnd进行keyby
- 使用底层API操作,对每个时间窗口内的数据进行排序,取top
package com.ongbo.hotAnalysis
import java.sql.Timestamp
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
/*
*定义输入数据的样例类
*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
//定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems {
def main(args: Array[String]): Unit = {
//1:创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//设置为事件事件
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//2:读取数据
/*kafka源*/
val properties = new Properties()
properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")
properties.setProperty("group.id","web-consumer-group")
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset","latest")
val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))
// val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv")
.map(data =>{
System.out.println("data:"+data)
val dataArray = data.split(",")
// if(dataArray(0).equals("ij"))
UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
//3:transform处理数据
val processStream = dataStream
//筛选出埋点pv数据
.filter(_.behavior.equals("pv"))
//先对itemID进行分组
.keyBy(_.itemId)
//然后设置timeWindow,size为1小时,步长为5分钟的滑动窗口
.timeWindow(Time.seconds(20), Time.seconds(10))
//窗口聚合,按道理说应该不用窗口聚合,但是因为达到的数据可能时间顺序会扰乱,所以聚合后要keyby
.aggregate(new CountAgg(), new WindowResult())
.keyBy(_.windowEnd) //按照窗口分组
.process(new TopNHotItems(10))
//sink:输出数据
processStream.print("processStream::")
// dataStream.print()
//执行
env.execute("hot Items Job")
}
}
/*自定义预聚合函数*/
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{
//累加器初始值
override def createAccumulator(): Long = 0
//每来一次就加一
override def add(in: UserBehavior, acc: Long): Long = acc+1
//
override def getResult(acc: Long): Long = acc
override def merge(acc: Long, acc1: Long): Long = acc + acc1
}
//自定义窗口函数,输出ItemViewCount
class WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{
override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))
}
}
//自定义处理函数
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
private var itemState: ListState[ItemViewCount] = _
override def open(parameters: Configuration): Unit = {
itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
}
override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
//把每条数据存入状态列表
itemState.add(value)
//注册一个定时器
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
}
//定时器触发时,对所有的数据排序,并输出结果
override def onTimer(timestamp: Long, ctx: _root_.org.apache.flink.streaming.api.functions.KeyedProcessFunction[Long, _root_.com.ongbo.hotAnalysis.ItemViewCount, _root_.scala.Predef.String]#OnTimerContext, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {
//将所有state中的数据取出,放到一个list Buffer中
val allItems: ListBuffer[ItemViewCount] = new ListBuffer()
import scala.collection.JavaConversions._
for(item <- itemState.get()){
allItems += item
}
//按照点计量count大小排序,sortBy默认是升序,并且取前三个
val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)
//清空状态
itemState.clear()
//格式化输出排名结果
val result : StringBuilder = new StringBuilder
result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
//输出每一个商品信息
for(i<- sortedItems.indices){
val currentItem = sortedItems(i)
result.append("No").append(i+1).append(":")
.append(" 商品ID:").append(currentItem.itemId)
.append(" 浏览量:").append(currentItem.count).append("\n")
}
result.append("============================\n")
//控制输出频率
Thread.sleep(1000)
out.collect(result.toString())
}
}
/*自定义预聚合函数计算平均数*/
class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{
override def createAccumulator(): (Long, Int) = (0L,0)
override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)
override def getResult(acc: (Long, Int)): Double = acc._1 /acc._2
override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)
}
实时PV统计
这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。
这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。
package com.ongbo.NetWorkFlow_Analysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/*
*定义输入数据的样例类
*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
object PageVies {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
//用相对路径定义数据集
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
.map(data =>{
val dataArray = data.split(",")
UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter(_.behavior.equals("pv"))
.map(data => ("pv", 1))
.keyBy(_._1)
.timeWindow(Time.hours(1))
.sum(1)
dataStream.print("pv count")
env.execute("PV")
}
}
实时UV统计:布隆过滤器
我们统计UV需要注意,很多重复的user会占用到内存,所以我们采用布隆过滤器优化,减少Flink缓存user从而降低性能。而且将数据count保存在Redis,可以给后端使用的。
package com.ongbo.NetWorkFlow_Analysis
import com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
object UvWithBloom {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
//用相对路径定义数据集
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv")
.map(data =>{
val dataArray = data.split(",")
UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter(_.behavior.equals("pv"))
.map( data => ("dummyKey",data.userId))
.keyBy(_._1)
.timeWindow(Time.hours(1))
.trigger(new MyTrigger())
.process(new UvCountWithBloom())
dataStream.print()
env.execute()
}
}
//自定义窗口触发器
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{
override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
//每来一条数据就直接触发窗口操作,并清空所有状态
TriggerResult.FIRE_AND_PURGE
}
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] {
// 定义Redis连接
lazy val jedis = new Jedis("114.116.219.97",5000)
//29位,也就是64M
lazy val bloom = new Bloom(1 << 29)
override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
//位图的存储方式 , key是windowwen,value是位图
val storeKey = context.window.getEnd.toString
var count = 0L
//把每个窗口的count值,也存入Redis表里,存放内容位(windowEnd,uccount),所以要先从Redis中读取
if(jedis.hget("count",storeKey) != null){
// System.out.println(v)
count = jedis.hget("count",storeKey).toLong
}
//用布隆过滤器判断当前用户是否已经存在
val userId = elements.last._2.toString
val offset = bloom.hash(userId, 61)
//定义一个标志位,判断Redis位图中有没有这一位
val isExist = jedis.getbit(storeKey, offset)
if(!isExist){
//如果不存在位图对应位置变成1,count+1
jedis.setbit(storeKey,offset,true)
jedis.hset("count",storeKey,(count+1).toString)
out.collect(UvCount(storeKey.toLong,count+1))
}else{
out.collect(UvCount(storeKey.toLong,count))
}
}
}
class Bloom(size: Long) extends Serializable{
//位图大小
private val cap = if(size>0) size else 1 << 27
//定义Hash函数
def hash(value: String, seed: Int) : Long = {
var result:Long = 0L
for(i <- 0 until value.length){
result = result * seed + value.charAt(i)
}
result & (cap-1)
}
}
欢迎点赞+收藏+转发朋友圈素质三连