大数据分析Day04使用DLI Flink SQL进行电商实时业务数据分析
大数据分析Day04使用DLI Flink SQL进行电商实时业务数据分析
场景描述
-
多种方式接入:web方式访问、app的方式访问、微信小程序访问
-
电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等指标,从而能在显示大屏上实时展示相关数据
-
假设平台已经将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户ID、用户姓名、订单地区ID等信息。
-
根据当前可以获取到的业务数据,实时统计每种渠道的相关指标,输出存储到数据库中,并进行大屏展示。
场景方案
步骤1:注册账号
- 注册华为云账号并进行实名认证
- 已完成华为云账号注册和实名认证,可跳过该步骤
步骤2:创建资源
创建资源时注意以下几点:
-
Kafka与MySQL实例创建时需指定VPC,该VPC需提前创建好,且网段不与后续创建的DLI队列网段冲突.
-
Kafka+MySQl : 192, DLI:172
-
【按需】模式的购买产品
-
(1)创建VPC:【VPC】–【创建虚拟私有云】
-
(2)购买队列:【DLI】–【队列管理】–【购买队列】
步骤3:创建DMS topic并获取连接地址
- 选择“分布式消息服务DMS”,单击进入DMS服务、控制台页面。
- 在“Kafka专享版”页面找到创建的Kafka实例。单击“基本信息”,获取“连接地址”
10.0.0.202:9092,10.0.0.83:9092,10.0.0.138:9092
- 单击“Topic管理”,创建一个Topic:trade_order_detail_info
步骤4:创建RDS数据库表
- 购买MySQL数据库
- 登录
- 创建数据库
- 创建表
DROP TABLE `dli-demo`.`trade_channel_collect`;
CREATE TABLE `dli-demo`.`trade_channel_collect` (
`begin_time` VARCHAR(32) NOT NULL,
`channel_code` VARCHAR(32) NOT NULL,
`channel_name` VARCHAR(32) NULL,
`cur_gmv` DOUBLE UNSIGNED NULL,
`cur_order_user_count` BIGINT UNSIGNED NULL,
`cur_order_count` BIGINT UNSIGNED NULL,
`last_pay_time` VARCHAR(32) NULL,
`flink_current_time` VARCHAR(32) NULL,
PRIMARY KEY (`begin_time`, `channel_code`)
) ENGINE = InnoDB
DEFAULT CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci
COMMENT = '各渠道的销售总额实时统计';
步骤5:创建增强型跨源打通网络
-
【DLI】,找到【数据湖探索】,【全局配置】-【服务授权】,选中【VPC Administrator】
-
【跨源连接】-【增强型跨源】-【创建】
-
【绑定队列】选择您所创建的通用队列,【虚拟私有云】和【子网】选择 Kafka 与 MySQL 实例所在的 VPC 与子网
-
测试队列与RDS、DMS实例连通性。点击【队列管理】,选择您所使用的队列,点击【更多】-【测试地址连通性】。输入前序步骤3-2获取的DMS Kafka实例连接地址和步骤4-2获取的RDS MySQL实例内网地址,进行网络连通性测试。测试结果显示可达,则DLI队列与Kafka、MySQL实例的网络已经联通。
注:10没有调通,后重新改为192(MySQL和Kafka)和172(DLI)
-
配置安全组 3306 9092
-
修改后可达
步骤6:创建并提交Flink作业
– 【DLI控制台】–【作业管理】,选择【Flink作业】–【创建作业】,选择作业类型为:Flink OpenSource SQL,名称自定义。
- 注:Kafka连接地址,mysql连接地址、数据库名称,用户名和密码修改为自己前面的地址
--********************************************************************--
-- 数据源:trade_order_detail_info (订单详情宽表)
--********************************************************************--
create table trade_order_detail (
order_id string, -- 订单ID
order_channel string, -- 渠道
order_time string, -- 订单创建时间
pay_amount double, -- 订单金额
real_pay double, -- 实际付费金额
pay_time string, -- 付费时间
user_id string, -- 用户ID
user_name string, -- 用户名
area_id string -- 地区ID
) with (
"connector.type" = "kafka",
"connector.version" = "0.10",
"connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka连接地址
"connector.properties.group.id" = "trade_order", -- Kafka groupID
"connector.topic" = "trade_order_detail_info", -- Kafka topic
"format.type" = "json",
"connector.startup-mode" = "latest-offset"
);
--********************************************************************--
-- 结果表:trade_channel_collect (各渠道的销售总额实时统计)
--********************************************************************--
create table trade_channel_collect(
begin_time string, --统计数据的开始时间
channel_code string, -- 渠道编号
channel_name string, -- 渠道名
cur_gmv double, -- 当天GMV
cur_order_user_count bigint, -- 当天付款人数
cur_order_count bigint, -- 当天付款订单数
last_pay_time string, -- 最近结算时间
flink_current_time string,
primary key (begin_time, channel_code) not enforced
) with (
"connector.type" = "jdbc",
"connector.url" = "jdbc:mysql://xxxx:3306/xxxx", -- mysql连接地址,jdbc格式
"connector.table" = "xxxx", -- mysql表名
"connector.driver" = "com.mysql.jdbc.Driver",
"connector.username" = "xxx", -- mysql用户名
"connector.password" = "xxxx", -- mysql密码
"connector.write.flush.max-rows" = "1000",
"connector.write.flush.interval" = "1s"
);
--********************************************************************--
-- 临时中间表
--********************************************************************--
create view tmp_order_detail
as
select *
, case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、appShop、miniAppShop、other]
, case when t.order_channel = "webShop" then _UTF16"网页商城"
when t.order_channel = "appShop" then _UTF16"app商城"
when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
else _UTF16"其他" end as channel_name --渠道名称
from (
select *
, row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据
, concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
, concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
from trade_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)
and real_pay is not null
) t
where t.rn = 1;
-- 按渠道统计各个指标
insert into trade_channel_collect
select
begin_time --统计数据的开始时间
, channel_code
, channel_name
, cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV
, count(distinct user_id) as cur_order_user_count --当天付款人数
, count(1) as cur_order_count --当天付款订单数
, max(pay_time) as last_pay_time --最近结算时间
, cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间
from tmp_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
group by begin_time, channel_code, channel_name;
首先,我们先定义一个Kafka源表,用来从Kafka指定topic中读取消费数据;再定义一个结果表,用来通过JDBC向MySQL中写入结果数据。
创建源表和结果表以后,需要实现相应的处理逻辑,以实现各个指标的统计。
为了简化最终的处理逻辑,使用创建视图进行数据预处理。首先利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00后的订单数据进行统计(为了方便模拟数据的构造,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string),请注意)。然后根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name的值,从而获取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。
最后,我们根据需要对相应指标进行统计和筛选,并将结果写入到结果表中。
- 点赞
- 收藏
- 关注作者
评论(0)