基于 Flink 实现的商品实时推荐系统(附源码)

前言

之前一直给大家推荐的是关于 Flink 的介绍和知识点,以及关于 Flink 里面使用这些知识点的一些简单的 demo,地址在:https://github.com/zhisheng17/flink-learning 。总的来说,现在还挺缺这种真正实战的项目分享出来,尤其是把源代码分享出来的,近期我在 GitHub 观察到一个不错的 Flink 项目,然后也和作者交流了下,于是在这里做一个分享。所以,那些平时问我有没有 Flink 项目的可以看过来了。地址在 :https://github.com/CheckChe0803/flink-recommandSystem-demo 下面介绍下这个项目。

1. 系统架构 v2.0

  • 1.1 系统架构图

  • 1.2模块说明

  • a.在日志数据模块(flink-2-hbase)中,又主要分为6个Flink任务:

  • 用户-产品浏览历史 -> 实现基于协同过滤的推荐逻辑

    通过Flink去记录用户浏览过这个类目下的哪些产品,为后面的基于Item的协同过滤做准备
    实时的记录用户的评分到Hbase中,为后续离线处理做准备.

    数据存储在Hbase的p_history表

  • 用户-兴趣 -> 实现基于上下文的推荐逻辑

    根据用户对同一个产品的操作计算兴趣度,计算规则通过操作间隔时间(如购物 - 浏览 < 100s)则判定为一次兴趣事件
    通过Flink的ValueState实现,如果用户的操作Action=3(收藏),则清除这个产品的state,如果超过100s没有出现Action=3的事件,也会清除这个state

    数据存储在Hbase的u_interest表

  • 用户画像计算 -> 实现基于标签的推荐逻辑

    v1.0按照三个维度去计算用户画像,分别是用户的颜色兴趣,用户的产地兴趣,和用户的风格兴趣.根据日志不断的修改用户画像的数据,记录在Hbase中.

    数据存储在Hbase的user表

  • 产品画像记录 -> 实现基于标签的推荐逻辑

    用两个维度记录产品画像,一个是喜爱该产品的年龄段,另一个是性别

    数据存储在Hbase的prod表

  • 事实热度榜 -> 实现基于热度的推荐逻辑

    通过Flink时间窗口机制,统计当前时间的实时热度,并将数据缓存在Redis中.

    通过Flink的窗口机制计算实时热度,使用ListState保存一次热度榜

    数据存储在redis中,按照时间戳存储list

  • 日志导入

    从Kafka接收的数据直接导入进Hbase事实表,保存完整的日志log,日志中包含了用户Id,用户操作的产品id,操作时间,行为(如购买,点击,推荐等).

    数据按时间窗口统计数据大屏需要的数据,返回前段展示

    数据存储在Hbase的con表

  • b. web模块

  • 前台用户界面

    该页面返回给用户推荐的产品list

  • 后台监控页面

    该页面返回给管理员指标监控

2.推荐引擎逻辑说明

  • 2.1 基于热度的推荐逻辑

    现阶段推荐逻辑图

    根据用户特征,重新排序热度榜,之后根据两种推荐算法计算得到的产品相关度评分,为每个热度榜中的产品推荐几个关联的产品

  • 2.2 基于产品画像的产品相似度计算方法

    基于产品画像的推荐逻辑依赖于产品画像和热度榜两个维度,产品画像有三个特征,包含color/country/style三个角度,通过计算用户对该类目产品的评分来过滤热度榜上的产品

    在已经有产品画像的基础上,计算item与item之间的关联系,通过余弦相似度来计算两两之间的评分,最后在已有物品选中的情况下推荐关联性更高的产品.

  • 2.3 基于协同过滤的产品相似度计算方法

    根据产品用户表(Hbase) 去计算公式得到相似度评分:

3. 前台推荐页面

当前推荐结果分为3列,分别是热度榜推荐,协同过滤推荐和产品画像推荐

4. 后台数据大屏

在后台上显示推荐系统的实时数据,数据来自其他Flink计算模块的结果.目前包含热度榜和1小时日志接入量两个指标. 
真实数据位置在resource/database.sql

5. 部署说明

以下的部署均使用Docker,对于搭建一套复杂的系统,使用docker来部署各种服务中间件再合适不过了。这里有一套简单的Docker入门系列


需要的服务有:Mysql、Redis、Hbase和Kafka

Mysql

1docker pull mysql:5.7
2
3docker run --name local-mysql -p 3308:3306  -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7

简单介绍一下命令,先拉取镜像,然后指定参数启动容器

  • --name local-mysql 容器名为local-mysql

  • -p 3308:3306 宿主机与容器的端口映射为3308:3306 即你访问宿主机的3308就是访问容器的3306端口,需要理解下

  • -e MYSQL_ROOT_PASSWORD=123456 容器内的变量名MYSQL_ROOT_PASSWORD对应的值为123456 即mysql的root密码为123456

  • -d 后台启动

Redis

1$ docker run --name local-redis -p 6379:6379 -d redis

Hbase

 1docker pull harisekhon/hbase
2
3docker run -d -h base-server \
4        -p 2181:2181 \
5        -p 8080:8080 \
6        -p 8085:8085 \
7        -p 9090:9090 \
8        -p 9000:9000 \
9        -p 9095:9095 \
10        -p 16000:16000 \
11        -p 16010:16010 \
12        -p 16201:16201 \
13        -p 16301:16301 \
14        -p 16020:16020\
15        --name hbase \
16        harisekhon/hbase

Hbase用到的端口,可以参考一下详细教程
启动成功之后我们可以访问http://localhost:16010/master-status登录Web界面

:point_right: 快速实现SpringBoot集成Hbase

Kafka

考虑到更好的区别这些端口,我这里启动了一个虚拟机,在虚拟机中在用dokcer安装Kafka,过程如下

 1## pull images
2docker pull wurstmeister/zookeeper
3docker pull wurstmeister/kafka
4docker pull sheepkiller/kafka-manager
5
6docker run -d --name zookeeper --publish 2181:2181 \
7  --volume /etc/localtime:/etc/localtime \
8  --restart=always \
9  wurstmeister/zookeeper
10
11
12## run kafka
13docker run --name kafka \
14  -p 9092:9092 \
15  --link zookeeper:zookeeper \
16  -e KAFKA_ADVERTISED_HOST_NAME=192.168.1.8 \
17  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
18  -d  wurstmeister/kafka  
19
20## run kafka manager
21docker run -d \
22  --link zookeeper:zookeeper \
23  -p 9000:9000  \
24  -e ZK_HOSTS="zookeeper:2181" \
25  hlebalbau/kafka-manager:stable \
26  -Dpidfile.path=/dev/null

如果想设置webui 的权限,可以这样设置

1KAFKA_MANAGER_AUTH_ENABLED: "true"
2KAFKA_MANAGER_USERNAME: username
3KAFKA_MANAGER_PASSWORD: password

容器启动成功之后就可以在页面访问localhost:9000查看Kafkfa的管理界面。

:point_right: 快速实现SpringBoot集成Kafka

启动服务

以下的操作是在IDEA下完成

1、将上述部署的几个服务的ip和端口号分别配置在flink-2-hbase和web服务中;

2、在flink-2-hbase中的根目录执行mvn clean install,目的是将其打包并放置在本地仓库中;

3、分别启动task目录下的task(直接在idea中右键启动就行了);

4、把SchedulerJob启动起来,定时的去计算协同过滤和用户画像所需要的分数;

5、在idea中打开web项目,等待其自动引入flink-2-hbase生成的jar包之后,再启动服务就ok了;

注意:所有的服务启动后,因为没有任何的点击记录,所以就是随机从数据库取得产品,这里需要你在推荐页面随便点击,等有了一定的历史数据之后,就能实现实时推荐的效果了

6. 下一步工作

  1. 添加flink任务监控

  2. 完善数据大屏,显示更详细的指标

  3. 统计召回率/准确率等业务指标

END


关注我

公众号(zhisheng)里回复 面经、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章

Flink 实战

1、《从0到1学习Flink》—— Apache Flink 介绍
2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3、《从0到1学习Flink》—— Flink 配置文件详解
4、《从0到1学习Flink》—— Data Source 介绍
5、《从0到1学习Flink》—— 如何自定义 Data Source ?
6、《从0到1学习Flink》—— Data Sink 介绍
7、《从0到1学习Flink》—— 如何自定义 Data Sink ?
8、《从0到1学习Flink》—— Flink Data transformation(转换)
9、《从0到1学习Flink》—— 介绍 Flink 中的 Stream Windows
10、《从0到1学习Flink》—— Flink 中的几种 Time 详解
11、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 ElasticSearch
12、《从0到1学习Flink》—— Flink 项目如何运行?
13、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 Kafka
14、《从0到1学习Flink》—— Flink JobManager 高可用性配置
15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了
19、大数据“重磅炸弹”——实时计算框架 Flink
20、《Flink 源码解析》—— 源码编译运行
21、为什么说流处理即未来?
22、OPPO数据中台之基石:基于Flink SQL构建实数据仓库
23、流计算框架 Flink 与 Storm 的性能对比
24、Flink状态管理和容错机制介绍
25、原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
26、Apache Flink 是如何管理好内存的?
27、从0到1学习Flink》——Flink 中这样管理配置,你知道?
28、从0到1学习Flink》——Flink 不可以连续 Split(分流)?
29、Flink 从0到1学习—— 分享四本 Flink 的书和二十多篇 Paper 论文
30、360深度实践:Flink与Storm协议级对比
31、Apache Flink 1.9 重大特性提前解读
32、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了
33、美团点评基于 Flink 的实时数仓建设实践
34、Flink 灵魂两百问,这谁顶得住?
35、一文搞懂 Flink 的 Exactly Once 和 At Least Once
36、你公司到底需不需要引入实时计算引擎?
37、Flink 从0到1学习 —— 如何使用 Side Output 来分流?
38、一文让你彻底了解大数据实时计算引擎 Flink

Flink 源码解析



知识星球里面可以看到下面文章

喜欢就点个"在看"呗^_^