实时即未来,大数据项目车联网之驾驶行程入库(16)
theme: smartblue
持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第16天,点击查看活动详情
1. 驾驶行程入库
驾驶行程数据筛选入库,为第六节的行程指标的分析数据
驾驶行程入库结果字段:
-
vin :车架号
-
lastSoc:上次报文soc
-
lastMileage:上次报文里程数
-
tripStartTime:行程开始时间
-
start_BMS_SOC:行程开始soc
-
start_longitude:行程开始经度
-
start_latitude:行程开始纬度
-
start_mileage:行程开始里程
-
end_BMS_SOC:结束soc
-
end_longitude:结束经度
-
end_latitude:结束纬度
-
end_mileage:结束里程
-
tripEndTime:行程结束时间
-
mileage:行程里程消耗
-
max_speed:最高行驶车速
-
soc_comsuption:soc消耗
-
time_comsuption:行程消耗时间
-
total_low_speed_nums:总低速的次数
-
total_medium_speed_nums:总中速的次数
-
total_high_speed_nums:总高速次数
-
Low_BMS_SOC:低速soc消耗
-
Medium_BMS_SOC:中速soc消耗
-
High_BMS_SOC:高速soc消耗
-
Low_BMS_Mileage:低速里程
-
Medium_BMS_Mileage:中速里程
-
High_BMS_Mileage:高速里程
-
tripStatus:是否为异常行程(0:正常行程 1:异常行程(只有一个采样点))
1.1 驾驶行程主任务定义
主类中复用已定义好的session window处理的数据,主类中定义驾驶行程的window function和指定数据写入hbase的自定义hbase sink
//TODO 9)根据vin进行分组
KeyedStream<ItcastDataObj, String> keyedStream = tripDriveWatermark.keyBy(ItcastDataObj::getVin);
1.2 自定义驾驶行程window function
实现window function接口,定义接口参数<输入数据类型,输出数据类型,key的类型, TimeWindow>
/**
* 实现驾驶行程数据的计算逻辑
*/
public class DriveTripWindowFunction implements WindowFunction<ItcastDataObj, TripModel, String, TimeWindow> {
}
定义驾驶行程结果对象:TripModel
/**
* 定义驾驶行程计算结果对应的JavaBean对象
*/
如需实现数据写入hdfs,要实现toHiveString方法
完整的TripModel对象
实现重写apply方法逻辑
/**
* 窗口划分的每一个车辆15分钟内的行程,计算行程结果,返回TripModel对象
* @param key
* @param timeWindow
* @param iterable
* @param collector
* @throws Exception
*/
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<ItcastDataObj> iterable, Collector<TripModel> collector) throws Exception {
//窗口内的数据有水位线,因此需要对窗口内的数据进行排序,否则拼接出来的数据是不准确的
//todo 1:先将迭代器转换成集合对象
ArrayList<ItcastDataObj> itcastDataObjArrayList = Lists.newArrayList(iterable);
//todo 2:对每一个会话窗口内的元素进行排序操作
itcastDataObjArrayList.sort(((o1, o2) -> {
//如果第一个元素对象的TerminalTimeStamp,大于第二个元素对象的TerminalTimeStamp
if(o1.getTerminalTimeStamp()> o2.getTerminalTimeStamp()){
//升序排序,就会交换两个对象的值
return 1;
}else if(o1.getTerminalTimeStamp() < o2.getTerminalTimeStamp()){
return -1;
}else{
return 0;
}
}));
TripModel returnState = new TripModel();
getTripInfo(itcastDataObjArrayList, returnState);
//返回转换后的javaBean对象
collector.collect(returnState);
}
行程指标计算,方法:getTripInfo
获得list第一条数据、或的list中最后一条数据
从第一条数据中得到vin、tripStartTime、start_BMS_SOC、start_longitude、start_latitude、start_mileage
从最后一条数据中得到tripEndTime、end_BMS_SOC、end_longitude、end_latitude、end_mileage、mileage、time_comsuption、lastSoc、lastMileage
遍历list,计算得到soc_comsuption、max_speed、total_low_speed_nums、low_BMS_SOC、low_BMS_Mileage、total_medium_speed_nums、medium_BMS_SOC、medium_BMS_Mileage、total_high_speed_nums、high_BMS_SOC、high_BMS_Mileage、lastSoc、lastMileage
/**
* 驾驶行程指标计算
* @param itcastDataObjArrays
* @return
*/
1.3 驾驶行程分析结果数据写入hbase
驾驶行程分析数据写入hbase,需自定义hbase sink
主任务类中定义
// todo 14)驾驶行程入hbase库TRIPDB:strip_division tripdataresult.addSink(new TripDivisionHBaseSink(“TRIPDB:trip_division”));
在hbase中创建行程划分实时数据分析结果表
create_namespace 'TRIPDB’list_namespace
自定义sink:****TripDivisionHBaseSink****
封装habsePut方法,返回Put对象
驾驶行程分析结果,用于驾驶行程的分析,如:速度、里程、油耗、行程开始时间、行程结束时间等指标分析
- 点赞
- 收藏
- 关注作者
评论(0)