带你体验数据湖DLI Flink SQL进行电商实时业务数据分析丨【我的华为云体验之旅】

举报
zekelove 发表于 2021/12/18 23:08:19 2021/12/18
【摘要】 带你体验一下数据湖DLI,Flink SQL进行电商实时业务数据分析,过程还是比较复杂,所需的资源也比较多,需要提前准备ECS(弹性云服务器)、VPC(虚拟私有云)、DMS(分布式消息服务)、RDS(云数据库)、DLI(数据湖探索)、EIP(弹性公网IP),如果这些资源没有使用过最好先了解一下再来体验。

今天就来带你体验一下数据湖DLI,Flink SQL进行电商实时业务数据分析,如果使用华为云上的帮助文档,可能你会遇到一些问题,下面就把体验过程中遇到的一些问题分享给大家,减少走弯路。

华为云的官方地址:https://support.huaweicloud.cn/bestpractice-dli/dli_05_0006.html

目标任务

使用DLI Flink完成电商业务实时数据的分析处理,获取各个渠道的销售汇总数据。

资源准备

我们需要创建:ECS(弹性云服务器),VPC(虚拟私有云)、DMS(分布式消息服务)、RDS(云数据库),DLI(数据湖探索)、EIP(弹性公网IP),所有资源都要在同一个区域,这样相互之间才能连通。

如果不想开通公网IP,提前下载:JDK,Kafka客户端,通过 ECS 远程登录进行上传并安装。

JDK1.8下载地址https://www.oracle.com/java/technologies/downloads/#java8

Kafka 1.1.0版本实例的下载地址:https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

Kafka 2.3.0版本实例的下载地址:https://archive.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz

ECS远程登录

安装JDK

# 两种安装方式 
tar -zxf jdk-8u144-linux-x64.tgz
rpm -ivh jdk-8u144-linux-x64.rpm

ECS安装Java JDK或JRE,配置JAVA_HOME与PATH环境变量,使用命令 vi .bash_profile 打开文件,添加如下行:

export JAVA_HOME=/opt/java/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH

执行source .bash_profile命令使修改生效

# 验证是否安装成功
java -version

数据库创建表

先创建一个数据库,打开SQL查询窗口,执行下面创建表的脚本。

CREATE TABLE `trade_channel_collect` (
  `begin_time` varchar(32) DEFAULT NULL,
  `channel_name` varchar(32) DEFAULT NULL,
  `channel_code` varchar(32) DEFAULT NULL,
  `cur_gmv` double unsigned DEFAULT NULL,
  `cur_order_user_count` bigint(20) unsigned DEFAULT NULL COMMENT '付款人数',
  `cur_order_count` bigint(20) unsigned DEFAULT NULL COMMENT '付款订单数',
  `last_pay_time` varchar(32) DEFAULT NULL,
  `flink_current_time` varchar(32) DEFAULT NULL COMMENT 'Flink数据处理时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `trade_order_detail_info` (
  `order_id` varchar(32) DEFAULT NULL COMMENT '订单ID',
  `order_channel` varchar(32) DEFAULT NULL COMMENT '订单来源渠道',
  `order_time` varchar(32) DEFAULT NULL COMMENT '订单时间',
  `pay_amount` double unsigned DEFAULT NULL COMMENT '金额',
  `real_pay` double unsigned DEFAULT NULL COMMENT '支付金额',
  `pay_time` varchar(32) DEFAULT NULL,
  `user_id` varchar(32) DEFAULT NULL,
  `user_name` varchar(32) DEFAULT NULL,
  `area_id` varchar(32) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

创建Kafka

创建好了Kafka,点名称进入后创建Topic。

创建Flink作业

打开数据湖 DLI,作业管理里面创建Flink作业。

创建作业的时候右侧“所属队列”要选择所创建的DLI通用队列,如果该步骤无法选择创建的CCE队列,则可能是因为没有提前开通CCE队列使用权限。请在官网提工单申请开通CCE队列使用权限后,再重新创建DLI CCE队列,继续后续步骤。

--********************************************************************--
-- 数据源:trade_order_detail_info (订单详情宽表)
--********************************************************************--
create table trade_order_detail (
  order_id string,      -- 订单ID
  order_channel string,   -- 渠道
  order_time string,     -- 订单创建时间
  pay_amount double,     -- 订单金额
  real_pay double,      -- 实际付费金额
  pay_time string,      -- 付费时间
  user_id string,      -- 用户ID
  user_name string,     -- 用户名
  area_id string       -- 地区ID
) with (
  "connector.type" = "kafka",
  "connector.version" = "0.10",
  "connector.properties.bootstrap.servers" = "192.168.0.69:9092", -- Kafka连接地址
  "connector.properties.group.id" = "kafka-test",   -- Kafka groupID
  "connector.topic" = "trade_order_detail_info",     -- Kafka topic
  "format.type" = "json",
  "connector.startup-mode" = "latest-offset"
);
 
--********************************************************************--
-- 结果表:trade_channel_collect (各渠道的销售总额实时统计)
--********************************************************************--
create table trade_channel_collect(
  begin_time string,       --统计数据的开始时间
  channel_code string,      -- 渠道编号
  channel_name string,      -- 渠道名
  cur_gmv double,         -- 当天GMV
  cur_order_user_count bigint, -- 当天付款人数
  cur_order_count bigint,    -- 当天付款订单数
  last_pay_time string,     -- 最近结算时间
  flink_current_time string,
  primary key (begin_time, channel_code) not enforced
) with (
  "connector.type" = "jdbc",
  "connector.url" = "jdbc:mysql://192.168.0.241:3306/db_csdn_zeke",    -- mysql连接地址,jdbc格式
  "connector.table" = "trade_channel_collect",            -- mysql表名
  "connector.driver" = "com.mysql.jdbc.Driver",
  "connector.username" = "root",                    -- mysql用户名
  "connector.password" = "Passw0rd",                   -- mysql密码
  "connector.write.flush.max-rows" = "1000",
  "connector.write.flush.interval" = "1s"
);
 
--********************************************************************--
-- 临时中间表
--********************************************************************--
create view tmp_order_detail
as
select *
    , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
           else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、appShop、miniAppShop、other]
    , case when t.order_channel = "webShop" then _UTF16"网页商城"
           when t.order_channel = "appShop" then _UTF16"app商城"
           when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
           else _UTF16"其他" end as channel_name --渠道名称
from (
    select *
        , row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
    from trade_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)
    and real_pay is not null
) t
where t.rn = 1;
 
-- 按渠道统计各个指标
insert into trade_channel_collect
select
      begin_time  --统计数据的开始时间
    , channel_code
    , channel_name
    , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV
    , count(distinct user_id) as cur_order_user_count --当天付款人数
    , count(1) as cur_order_count --当天付款订单数
    , max(pay_time) as last_pay_time --最近结算时间
    , cast(LOCALTIMESTAMP as string) as flink_current_time --flink任务中的当前时间
from tmp_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
group by begin_time, channel_code, channel_name;

注意修改地方:Kafka连接配置,数据库连接配置

创建队列

创建队列一定要在所有准备工作完成后在创建,它的按需收费计算规则是创建就收费,并且以整点为结算时间,并不是按照使用时长

创建跨源连接

Kafka客户端发送数据

使用Kafka客户端向指定topic发送数据,模拟实时数据流,命令如下:

# 进入目录 kafka_2.11-2.3.0/bin
# sh ./kafka-console-producer.sh --broker-list kafka连接地址 --topic 指定topic
sh ./kafka-console-producer.sh --broker-list 192.168.0.69:9092 --topic trade_order_detail_info
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}

注意:在ECS服务器命令窗口输入JSON数据,数据不能存在换行。

Flink作业启动后会自动执行

数据库查看结果

温馨提示

文章内容如果写的存在问题欢迎留言指出,让我们共同交流,共同探讨,共同进步~~~

文章如果对你有帮助,动动你的小手点个赞,鼓励一下,给我前行的动力。

【我的华为云体验之旅】有奖征文火热进行中:https://bbs.huaweicloud.cn/blogs/309059

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。