使用JAVA版Paho框架开发原生MQTT接口
说明:阅读该文档之前需要对Mqtt有一定的了解,这里不对Mqtt知识作介绍,对Mqtt的了解请自行搜索学习。主要说明一下用一个简单的Demo样例,实现和IoT平台的对接,上报数据,下发命令等
一、注册设备
1. 开发中心 注册设备
(1)查看产品信息
产品信息中的 协议类型 必须为MQTT
(2)注册设备
设备管理—>新增真实设备—>选择上面开发好的产品—>接入方式选择 直连
(3)注册设备成功之后需要保存的信息
保存设备ID和密钥,利用其构建clientID
2. 设备管理/设备接入 注册设备
(1)查看产品模型
进入设备管理界面—>产品模型
如果没有产品模型,可以点击右上角,从产品中心导入或者是本地导入
注意:产品的协议类型必须为MQTT
(2)注册设备
进入设备管理界面—>设备—>设备注册—>创建
(3)注册设备成功之后需要保存的信息
保存设备ID和密钥,利用其构建clientID
3. 北向接口 注册设备
注册设备(密码方式)
https://support.huaweicloud.cn/api-IoT/iot_06_0005.html
二、IoT平台提供的原生MQTT接口
https://support.huaweicloud.cn/api-IoT/iot_06_3002.html
Java
本篇文档基于eclipse的paho框架,该框架网上资料较多,可自行百度搜索学习。
代码中在Maven依赖上加载:
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
或者直接引用jar包:org.eclipse.paho.client.mqttv3-1.2.0.jar
1.MQTT CONNECT连接鉴权
(1) 主要是证书的配置,certFile就是证书的路径
(2) 其他参数的配置请参考源码:)
Java
private static MqttClient mqttClient; //连接地址每个局点不一样,比如开发中心是:"ssl://iot-acc-dev.huaweicloud.cn:8883" private static String url = "ssl://xx.xx.xx.xx:8883"; //注册直连设备的时候返回的设备ID private static String deviceID = "9a57a-***-***-816b3e"; //注册直连设备的时候返回的秘钥 private static String secret = "cbd*******3abv"; private static String curTime = curTimeStamp(); private static String password = makePwd(secret, curTime); //clientID参考API文档拼装 private static String clientId = deviceID + "_0_0_" + curTime; mqttClient = new MqttClient(url, clientId, new MemoryPersistence()); // 设置回调,这里主要写了接收消息之后的响应 mqttClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String content = new String(message.getPayload(), "utf-8"); System.out.println("收到mqtt消息,topic: " + topic + " ,content: " + content); // 设备响应命令 commandRsp(); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { System.out.println("mqtt 发送完成!"); } @Override public void connectionLost(Throwable arg0) { System.out.println("mqtt 失去了连接!"); } }); // 连接(MQTT CONNECT连接鉴权) mqttConnection(); // 发布(设备上报数据) publish(); // 订阅(设备接收命令) subscribe(); public static MqttClient mqttConnection() { if (mqttClient != null) { try { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setKeepAliveInterval(20); options.setConnectionTimeout(100); options.setUserName(deviceID); options.setPassword(password.toCharArray()); //证书配置,mqtt.jks是平台提供的证书 String certFile = "../ca.jks"; String certPWD = "IoT@2019"; InputStream stream = new FileInputStream(certFile); SSLContext sslContext = SSLContext.getInstance("TLS"); KeyStore ks = KeyStore.getInstance("JKS"); ks.load(stream, certPWD.toCharArray()); TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); tmf.init(ks); TrustManager[] tm = tmf.getTrustManagers(); sslContext.init(null, tm, new SecureRandom()); SocketFactory factory = sslContext.getSocketFactory(); options.setSocketFactory(factory); mqttClient.connect(options); } catch (Exception e) { e.printStackTrace(); } } return mqttClient; }
2.设备上报数据
数据上报就是往平台指定的topic上发布数据
Java
public static void publish() { try { String message = "" + "{ \n" + "\"msgType\":\"deviceReq\", \n" + "\"data\": [{ \n"+ "\"serviceId\":\"Storage\", \n" + "\"serviceData\":{\n" + "\"storage\": 22\n" + "}\n" + "}] \n"+ "}"; MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(1); mqttMessage.setPayload(message.getBytes()); String pubTopic = "/huawei/v1/devices/" + deviceID + "/data/json"; mqttClient.publish(pubTopic, mqttMessage, null, null); } catch (Exception e) { e.printStackTrace(); } }
3.设备接收命令
命令接收就是订阅平台指定的topic,平台往该topic发送命令时,设备端就能收到
Java
//先订阅平台的topic public static void subscribe() { String subtopic = "/huawei/v1/devices/" + deviceID + "/command/json"; mqttClient.subscribe(subtopic, 1); } // 在回调函数里面重写messageArrived方法,打印收到的消息 @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String content = new String(message.getPayload(), "utf-8"); System.out.println("content:" + content); // 设备响应命令 commandRsp(); } }
4.设备响应命令
应用服务器要需要调用“订阅平台业务数据”API订阅“commandRsp”类型的通知后,才能接收到设备对控制命令的应答;
先订阅topic(/huawei/v1/devices/{deviceId}/command/{codecMode})接收到命令,然后往另外一个topic(/huawei/v1/devices/{deviceId}/data/{codecMode})发数据响应给平台,就视为对这条命令的响应,但是要注意,数据上报和命令响应的topic虽然是相同的,但是他们上报的结构体是有区别的
Java
public static void commandRsp() { try { String message = "\n" + "{ \n" + "\"msgType\":\"deviceRsp\", \n" + "\"mid\":1,\n\"errcode\":0, \n"+ "\"body\":{\n" + "\"result\": 0\n" + "}\n" + "}"; MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(1); mqttMessage.setPayload(message.getBytes()); System.out.println("message" + message); String RspTopic = "/huawei/v1/devices/" + deviceID + "/data/json"; mqttClient.publish(RspTopic, mqttMessage, null, null); } catch (Exception e) { e.printStackTrace(); } }
5.工具类
获取当前时间
public static String curTimeStamp() { String TIMESTAMP_FORMAT = "yyyyMMddHH"; SimpleDateFormat sdf = new SimpleDateFormat(TIMESTAMP_FORMAT); String curTimeStamp = sdf.format(new Date(System.currentTimeMillis())); return curTimeStamp; }
生成 password
public static String makePwd(String secret, String curTimeStamp) { String passWord = null; try { Mac sha256_HMAC = Mac.getInstance("HmacSHA256"); SecretKeySpec secret_key = new SecretKeySpec(curTimeStamp.getBytes(), "HmacSHA256"); sha256_HMAC.init(secret_key); byte[] bytes = sha256_HMAC.doFinal(secret.getBytes()); passWord = byteArrayToHexString(bytes); } catch (Exception e) { System.out.println("Error HmacSHA256 ===========" + e.getMessage()); } return passWord; } public static String byteArrayToHexString(byte[] b) { StringBuilder hs = new StringBuilder(); String stmp; for (int n = 0; b != null && n < b.length; n++) { stmp = Integer.toHexString(b[n] & 0XFF); if (stmp.length() == 1) hs.append('0'); hs.append(stmp); } return hs.toString().toLowerCase(); }
- 点赞
- 收藏
- 关注作者
评论(0)