【最佳实践】使用Flink SQL实时分析日志信息
【摘要】 从DIS数据源读取数据,利用CS服务的Flink SQL作业,实时分析日志信息,并向OBS输出结果数据
场景概述
本次实践从DIS数据源读取数据,利用CS服务的Flink SQL作业,实时分析日志信息,并向OBS输出结果数据。
本次实践基本流程如下所示:
创建DIS通道和OBS桶
本次实践从DIS读取数据,流处理后将数据写入OBS,需先创建DIS输入通道和OBS桶。
- 创建DIS通道
- 登录公有云管理控制台。
- 在“服务列表”中,选择“EI企业智能 > 数据接入服务”,进入DIS管理控制台。
- 单击“购买接入通道”,进入“购买接入通道”页面,配置通道信息。
- 区域:我们需要将DIS,OBS和CS服务设置为统一区域,此处我们选择“华北-北京1”。
- 通道名称:input-dis。
- 源数据类型:选择CSV。
- 其他参数配置保持默认。
图1 创建DIS通道
- 单击“立即购买”,确认通道规格信息后,单击“提交”。
- 单击“返回通道列表”,我们可以看到已经创建成功的input-dis通道。此通道用来作为CS服务的输入通道。
- 创建OBS桶。
- 在“服务列表”中,选择“存储 > 对象存储服务”,进入OBS管理控制台。
- 单击“创建桶”,进入“创建桶”页面,配置桶信息。
- 区域:选择“华北-北京1”。
- 桶名称:output-obs。
- 其他参数配置保持默认。
图2 创建OBS桶
- 单击“立即创建”,我们可以在桶列表中看到已经创建成功的桶。
- 单击桶名称output-obs,然后在左侧导航栏中单击“对象”,单击“新建文件夹”,输入文件夹名称logInfos。在output-obs桶内创建logInfos目录,用来保存输出数据。
图3 创建桶文件夹
创建Flink SQL作业
- 在“服务列表”中,选择“EI企业智能 >实时流计算服务”,进入CS管理控制台。
如果是首次登录CS管理控制台,请先根据页面提示申请服务并授权。
- 单击“新建作业”,弹出“新建作业”窗口,配置作业信息。
- 类型:选择“Flink SQL作业“。
- 名称:test。
- 模板:选择“DIS-CS-OBS样例模板”。
- 其他参数保持默认。
图4 创建Flink SQL作业
- 单击“确认”,进入“编辑”页面,编辑作业。
在SQL编辑区域内可以看到“DIS-CS-OBS样例模板”的内容,本次演示就用该样例模板进行运行。
SQL编辑器中包含三部分内容:
- source数据源:在with 语句中配置,实现与DIS的“input-dis”通道对接,使CS服务可以从“input-dis”通道实时获取数据。需要配置以下参数:
- type = "dis" :类型选择dis。
- region = "cn-north-1" :作业所在的区域, cn-north-1 表示华北-北京一区域。
- channel = "input-dis" :DIS通道名称。
- partition_count = "1" :DIS通道的分区数。
- encode = "csv" :数据编码方式,选择CSV。
- field_delimiter = "\\|\\|" :当编码格式为CSV时,属性之间的分隔符。
- quote = "\u005c\u0022" :指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。
- offset = "0" :偏移量,offset="0"时,表示CS服务从DIS服务的第0条数据开始处理。
详细信息请参见配置DIS输入流。
- sink输出源:在with 语句中配置,实现与OBS桶对接,使CS服务能将处理结果输出到OBS桶中。需要配置以下参数:
- type = "obs" :类型选择obs
- region = "cn-north-1" :作业所在的区域, cn-north-1 表示华北-北京一区域。
- encode = "csv" :数据编码方式,选择CSV。
- field_delimiter = "," :当编码格式为CSV时,属性之间的分隔符。
- row_delimiter = "\n" :行分隔符。
- obs_dir = "output-obs/logInfos" :文件存储目录,格式为{桶名}/{目录名}。
- file_prefix = "log_out" :输出文件名的前缀,默认为temp。
- rolling_size = "100m" :单个文件最大允许大小。
详细信息请参见配置OBS输出流。
- SQL 查询,示例如下:
INSERT INTO log_out SELECT http_host,forward_ip,cast(cast(msec * 1000 as bigint) + 28800000 as timestamp),status,request_length, bytes_sent,string_to_array(request, '\\ ')[1],string_to_array(request, '\\ ')[2],http_referer,http_user_agent, upstream_cache_status,upstream_status,request_time,cookie_DedeUserID_cookie_sid_sent_http_logdata,upstream_response_time, upstream_addr, case IP_TO_PROVINCE(forward_ip) when "Guangxi" then "Guangxi Zhuang Autonomous Region" when "Ningxia" then "Ningxia Hui Autonomous Region" when "Taiwan" then "Taiwan Province" when "Macao" then "Macau" else IP_TO_PROVINCE(forward_ip) end, case when http_user_agent like "%Chrome%" then "Chrome" when http_user_agent like "%Firefox%" then "Firefox" when http_user_agent like "%Safari%" then "Safari" else "Others" end FROM log_infos;
- source数据源:在with 语句中配置,实现与DIS的“input-dis”通道对接,使CS服务可以从“input-dis”通道实时获取数据。需要配置以下参数:
- 设置作业运行参数。
- SPU:Stream Processing Units 流处理单元,一个SPU为1核4G的资源,每SPU 0.5元/小时。最低2个SPU起。
- 并行数:Flink作业算子并行度,缺省为1。
- 开启Checkpoint:是否开启Flink快照。
- 保存作业日志:是否保存作业日志到OBS桶中。
- 开启作业异常告警:作业异常后是否推送SMN消息(邮件和短信)。
图5 设置作业运行参数
- 完成作业编辑并且设置运行参数后,单击“语义校验”,确保语义校验成功。只有语义校验成功后,才可以执行“调试”、“提交”或“启动”作业的操作。
- 语义校验成功后,单击“提交”,进入作业配置确认页面,作业配置确认无误后,单击“确认”将作业提交并启动。
提交作业后,系统将自动跳转到“作业管理”页面,新创建的作业将显示在作业列表中,在“状态”列中我们可以查看作业的状态。作业提交成功后,状态将变为“运行中”。
发送DIS数据,查看结果
这里使用DIS Agent向云上DIS通道发送CSV结构的数据,DIS Agent是一个本地运行的代理,监控本地文件变化,一旦文件中有新的数据追加,就会即时把新增的数据发送到DIS通道中,类似flume。
DIS Agent详细使用指南,请参见使用Agent上传数据。
- 启动DIS Agent。
- 下载DIS Agent:https://dis-publish.obs-website.cn-north-1.myhuaweicloud.com/dis-agent-1.1.0.zip。
- 将下载的DIS Agent在本地解压。
- 修改conf/agent.yml。
--- # 不变 region: cn-north-1 # user ak (get from 'My Credential') ak: 填写你的AK # user sk (get from 'My Credential') sk: 填写你的SK ak/sk:进入console控制台->右上角 我的账号 选择"我的凭证"-> "管理访问秘钥"->"新增访问秘钥" # user project id (get from 'My Credential') projectId: 进入console控制台->右上角 我的账号 选择"我的凭证"-> "项目列表"中选择"cn-north-1"对应的"项目ID" # 不变。 endpoint: https://dis.cn-north-1.myhuaweicloud.com:20004 # config each flow to monitor file. flows: # 填写您创建的DIS通道的名称 - DISStream: input-dis # 填写数据文件所在的路径 filePattern: D:/disagent-cw/dis-agent-1.1.0/data/*.log # 不变
- 启动DIS Agent。
- Linux环境:bin/start-dis-agent.sh
- Windows环境:bin/start-dis-agent.bat
- 发送DIS数据。
将您的数据文件放置在agent.yml中配置的文件路径处,您可以通过在本地写个小程序,如下所示,向文件中追加数据。您需将该小程序同样移到agent.yml中配置的文件路径处。
1 2 3 4 5 6
import time for idx in range(10000): with open("test.log", mode = "a+") as f: f.write("api.huaweicloud.cn||45.249.212.44||15421010072.675||200||651||228||POST /x/report/heartbeat HTTP/1.1||-||Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0||-||200||0.033||-.918nw0fj-||0.033||140.206.227.10:80" + "\n" + "api.huaweicloud.cn||45.249.212.52||15421010072.875||200||651||228||POST /details/jobs HTTP/1.1||-||Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0||-||200||0.033||-.918nw0fj-||0.033||140.206.227.10:80" + "\n") time.sleep(60)
- 登录OBS管理控制台,进入“output-obs” 桶下“logInfos”目录,单击“下载”,查看输出结果。
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)