使用JAVA版Paho框架开发原生MQTT接口

举报
华为梁工 发表于 2020/01/15 15:59:09 2020/01/15
【摘要】 说明:阅读该文档之前需要对Mqtt有一定的了解,这里不对Mqtt知识作介绍,对Mqtt的了解请自行搜索学习。主要说明一下用一个简单的Demo样例,实现和IoT平台的对接,上报数据,下发命令等

说明:阅读该文档之前需要对Mqtt有一定的了解,这里不对Mqtt知识作介绍,对Mqtt的了解请自行搜索学习。主要说明一下用一个简单的Demo样例,实现和IoT平台的对接,上报数据,下发命令等

一、注册设备

1. 开发中心 注册设备

(1)查看产品信息

产品信息中的 协议类型 必须为MQTT

01.PNG

(2)注册设备

设备管理—>新增真实设备—>选择上面开发好的产品—>接入方式选择 直连

02.PNG

(3)注册设备成功之后需要保存的信息

保存设备ID和密钥,利用其构建clientID

03.PNG

2. 设备管理/设备接入 注册设备

(1)查看产品模型

进入设备管理界面—>产品模型

如果没有产品模型,可以点击右上角,从产品中心导入或者是本地导入

注意:产品的协议类型必须为MQTT

06.PNG

(2)注册设备

进入设备管理界面—>设备—>设备注册—>创建

04.PNG

(3)注册设备成功之后需要保存的信息

保存设备ID和密钥,利用其构建clientID

05.PNG

3. 北向接口 注册设备

注册设备(密码方式)

https://support.huaweicloud.cn/api-IoT/iot_06_0005.html

二、IoT平台提供的原生MQTT接口

https://support.huaweicloud.cn/api-IoT/iot_06_3002.html

  • Java

本篇文档基于eclipse的paho框架,该框架网上资料较多,可自行百度搜索学习。

代码中在Maven依赖上加载:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

或者直接引用jar包:org.eclipse.paho.client.mqttv3-1.2.0.jar

1.MQTT CONNECT连接鉴权

(1) 主要是证书的配置,certFile就是证书的路径

(2) 其他参数的配置请参考源码:)

  • Java

private static MqttClient mqttClient;

//连接地址每个局点不一样,比如开发中心是:"ssl://iot-acc-dev.huaweicloud.cn:8883"
private static String url = "ssl://xx.xx.xx.xx:8883";
//注册直连设备的时候返回的设备ID
private static String deviceID = "9a57a-***-***-816b3e";
//注册直连设备的时候返回的秘钥
private static String secret = "cbd*******3abv";

private static String curTime = curTimeStamp();
private static String password = makePwd(secret, curTime);
//clientID参考API文档拼装
private static String clientId = deviceID + "_0_0_" + curTime;

mqttClient = new MqttClient(url, clientId, new MemoryPersistence());

// 设置回调,这里主要写了接收消息之后的响应
mqttClient.setCallback(new MqttCallback() {            
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
           String content = new String(message.getPayload(), "utf-8");
        System.out.println("收到mqtt消息,topic: " + topic + " ,content: " + content);

        // 设备响应命令
        commandRsp();
    }            
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        System.out.println("mqtt 发送完成!");
    }
    @Override
    public void connectionLost(Throwable arg0) {
        System.out.println("mqtt 失去了连接!");
    }
});

// 连接(MQTT CONNECT连接鉴权)
mqttConnection();

// 发布(设备上报数据)
publish();

// 订阅(设备接收命令)
subscribe();

public static MqttClient mqttConnection() {
    if (mqttClient != null) {
        try {
            MqttConnectOptions options = new MqttConnectOptions();

            options.setCleanSession(true);
            options.setKeepAliveInterval(20);
            options.setConnectionTimeout(100);
            options.setUserName(deviceID);
            options.setPassword(password.toCharArray());

            //证书配置,mqtt.jks是平台提供的证书
            String certFile = "../ca.jks";
            String certPWD = "IoT@2019";

            InputStream stream = new FileInputStream(certFile);
            SSLContext sslContext = SSLContext.getInstance("TLS");
            KeyStore ks = KeyStore.getInstance("JKS");
            ks.load(stream, certPWD.toCharArray());
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(ks);
            TrustManager[] tm = tmf.getTrustManagers();
            sslContext.init(null, tm, new SecureRandom());
            SocketFactory factory = sslContext.getSocketFactory();

            options.setSocketFactory(factory);

            mqttClient.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
        return mqttClient;
}

2.设备上报数据

数据上报就是往平台指定的topic上发布数据

  • Java

public static void publish() {
    try {
        String message = "" + "{ \n" + "\"msgType\":\"deviceReq\", \n" + "\"data\": [{ \n"+ "\"serviceId\":\"Storage\", \n" + "\"serviceData\":{\n" + "\"storage\": 22\n" + "}\n" + "}] \n"+ "}";
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(1);
        mqttMessage.setPayload(message.getBytes());

        String pubTopic = "/huawei/v1/devices/" + deviceID + "/data/json";

        mqttClient.publish(pubTopic, mqttMessage, null, null);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

3.设备接收命令

命令接收就是订阅平台指定的topic,平台往该topic发送命令时,设备端就能收到

  • Java

//先订阅平台的topic
public static void subscribe() {
    String subtopic = "/huawei/v1/devices/" + deviceID + "/command/json";
    mqttClient.subscribe(subtopic, 1);
}

// 在回调函数里面重写messageArrived方法,打印收到的消息
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    String content = new String(message.getPayload(), "utf-8");
    System.out.println("content:" + content);

    // 设备响应命令
    commandRsp();
    }
}

4.设备响应命令

  1. 应用服务器要需要调用“订阅平台业务数据”API订阅“commandRsp”类型的通知后,才能接收到设备对控制命令的应答;

  2. 先订阅topic(/huawei/v1/devices/{deviceId}/command/{codecMode})接收到命令,然后往另外一个topic(/huawei/v1/devices/{deviceId}/data/{codecMode})发数据响应给平台,就视为对这条命令的响应,但是要注意,数据上报和命令响应的topic虽然是相同的,但是他们上报的结构体是有区别的

  • Java

public static void commandRsp() {
    try {
        String message = "\n" + "{ \n" + "\"msgType\":\"deviceRsp\", \n" + "\"mid\":1,\n\"errcode\":0, \n"+ "\"body\":{\n" + "\"result\": 0\n" + "}\n" + "}";
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(1);
        mqttMessage.setPayload(message.getBytes());
        System.out.println("message" + message);

        String RspTopic = "/huawei/v1/devices/" + deviceID + "/data/json";

        mqttClient.publish(RspTopic, mqttMessage, null, null);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

5.工具类

获取当前时间

public static String curTimeStamp() {
    String TIMESTAMP_FORMAT = "yyyyMMddHH";
    SimpleDateFormat sdf = new SimpleDateFormat(TIMESTAMP_FORMAT);
    String curTimeStamp = sdf.format(new Date(System.currentTimeMillis()));
    return curTimeStamp;
}

生成 password

public static String makePwd(String secret, String curTimeStamp) {
    String passWord = null;
    try {
        Mac sha256_HMAC = Mac.getInstance("HmacSHA256");
        SecretKeySpec secret_key = new SecretKeySpec(curTimeStamp.getBytes(), "HmacSHA256");
        sha256_HMAC.init(secret_key);
        byte[] bytes = sha256_HMAC.doFinal(secret.getBytes());
        passWord = byteArrayToHexString(bytes);
    } catch (Exception e) {
        System.out.println("Error HmacSHA256 ===========" + e.getMessage());
    }
    return passWord;
}

public static String byteArrayToHexString(byte[] b) {
    StringBuilder hs = new StringBuilder();
    String stmp;
    for (int n = 0; b != null && n < b.length; n++) {
        stmp = Integer.toHexString(b[n] & 0XFF);
        if (stmp.length() == 1)
            hs.append('0');
        hs.append(stmp);
    }
       return hs.toString().toLowerCase();
}


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。