MQTT简介

MQTT简介

六月 27, 2018

MQTT(消息队列遥测传输)ISO标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件

IBM公司的安迪·斯坦福-克拉克及 Cirrus Link 公司的阿兰·尼普于1999年撰写了该协议的第一个版本。

  该协议的可用性取决于该协议的使用环境。IBM公司在2013年就向结构化资讯标准促进组织提交了 MQTT 3.1 版规范,并附有相关章程,以确保只能对规范进行少量更改。MQTT-SN是针对非 TCP/IP 网络上的嵌入式设备主要协议的变种,与此类似的还有ZigBee协议。

  纵观行业的发展历程,“MQTT”中的“MQ” 是来自于 IBM 的 MQ 系列消息队列产品线。然而通常队列本身不需要作为标准功能来支持。

  可选协议包含了高级消息队列协议,面向文本的消息传递协议,互联网工程任务组约束应用协议,可扩展消息与存在协议,数据分发服务,OPC UA以及 web 应用程序消息传递协议。
– 以上内容摘自维基百科

EMQ 2.0

EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。MQTT轻量的 (Lightweight)发布订阅模式 (PubSub)物联网消息协议
EMQ 项目设计目标是承载移动终端或物联网终端海量 MQTT 连接,并实现在海量物联网设备间快速低延时消息路由:

  1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万100万连接。
  2. 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
  3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
  4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、WebSocket 或私有协议支持。

MQTT 发布订阅模式简述

MQTT发布/订阅(Publish/Subscribe) 模式的消息协议,与 HTTP 协议请求响应(Request/Response) 模式不同。
MQTT 发布者与订阅者之间通过 主题(Topic) 进行消息路由,主题 (Topic) 格式类似 Unix 文件路径,例如:

1
2
3
4
5
6
sensor/1/temperature 
chat/room/subject
presence/user/feng
sensor/1/#
sensor/+/temperature
uber/drivers/joe/inbox

主题(Topic) 支持 ‘+’ 、‘#’ 的通配符,‘+’ 通配一个层级,‘#’ 通配多个层级(必须在末尾)。
注解
  初接触MQTT协议的用户,通常会向通配符的’过滤主题’发布广播消息,MQTT 协议不支持这种模式,需从订阅侧设计广播主题(Topic)。 例如 Android 推送,向所有广州用户,推送某类本地消息,客户端获得 GIS 位置后,可订阅 ‘news/city/guangzhou’ 主题。  

EMQ安装

下载地址

LINUX服务器安装(CENTOS)

RPM 安装 (其他平台类似)

1
$ rpm -ivh emqttd-centos7-v2.1.2-1.el7.centos.x86_64.rpm

Erlang/OTP R19 依赖 lksctp-tools 库

1
$ yum install lksctp-tools

由于emqttd是用Erlang语言编写的,所以,在Linux下安装时,需要先安装Erlang。 按需安装

1
2
3
4
5
6
7
8
9
## 安装依赖库
$ sudo yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
## 获取源码包
$ wget http://www.erlang.org/download/otp_src_R13B04.tar.gz
## 解压、编译、安装
$ tar xfvz otp_src_R13B04.tar.gz
$ cd otp_src_R13B04/
$ ./configure --with-ssl
$ sudo make install

EMQ 配置文件:

1
/etc/emqttd/emq.conf

插件配置文件:

1
/etc/emqttd/plugins/*.conf。

日志文件

1
/var/log/emqttd

数据文件

1
/var/lib/emqttd/

启动停止
$ systemctl start|stop|restart emqttd.service

1
$ emqttd console|start|stop|restart

console : 控制台运行,控制台输出状态信息,调试用
start : 守护进程模式启动
stop : 停止
restart : 重启

EMQ 消息服务器进程状态查询

1
$ emqttd_ctl status

正常情况下,查询命令返回:

1
2
3
$ emqttd_ctl status
Node 'emq@127.0.0.1' is started
emqttd 2.3.9 is running

EMQ 消息服务器状态监控 URL

1
http://localhost:8080/status

管理页面(基于Dashboard插件)

1
2
http://host:18083/
如:http://123.123.123.123:18083 或 http://localhost:18083

Dashboard插件配置文件位置

1
/etc/emqttd/plugins/emq_dashboard.conf

默认配置内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
## SSL Handshake timeout.
##
## Value: Duration
## dashboard.listener.https.handshake_timeout = 15s

## See: 'listener.ssl.<name>.ciphers' in emq.conf
##
## Value: Ciphers
## dashboard.listener.https.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA

## See: 'listener.ssl.<name>.secure_renegotiate' in emq.conf
##
## Value: on | off
## dashboard.listener.https.secure_renegotiate = off

## See: 'listener.ssl.<name>.reuse_sessions' in emq.conf
##
## Value: on | off
## dashboard.listener.https.reuse_sessions = on

## See: 'listener.ssl.<name>.honor_cipher_order' in emq.conf
##
## Value: on | off
## dashboard.listener.https.honor_cipher_order = on

可选参数还有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
## HTTP Listener
dashboard.listener.http = 18083
dashboard.listener.http.acceptors = 2
dashboard.listener.http.max_clients = 512

## HTTPS Listener
## dashboard.listener.https = 18084
## dashboard.listener.https.acceptors = 2
## dashboard.listener.https.max_clients = 512
## dashboard.listener.https.handshake_timeout = 15
## dashboard.listener.https.certfile = etc/certs/cert.pem
## dashboard.listener.https.keyfile = etc/certs/key.pem
## dashboard.listener.https.cacertfile = etc/certs/cacert.pem
## dashboard.listener.https.verify = verify_peer
## dashboard.listener.https.fail_if_no_peer_cert = true

MQTT(EMQ)使用

关键字介绍

网络连接 NETWORK CONNECTION

MQTT使用的底层传输协议基础设施。

  • 客户端使用它连接服务端。
  • 它提供有序的、可靠的、双向字节流传输。

应用消息 APPLICATION MESSAGE

MQTT协议通过网络传输应用数据。应用消息通过MQTT传输时,它们有关联的服务质量(QoS)和主题(Topic)。

客户端 CLIENT

使用MQTT的程序或设备。客户端总是通过网络连接到服务端。它可以

  • 发布应用消息给其它相关的客户端。
  • 订阅以请求接受相关的应用消息。
  • 取消订阅以移除接受应用消息的请求。
  • 从服务端断开连接。

服务端 SERVER

一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。服务端

  • 接受来自客户端的网络连接。
  • 接受客户端发布的应用消息。
  • 处理客户端的订阅和取消订阅请求。
  • 转发应用消息给符合条件的已订阅客户端。

订阅 SUBSCRIPTION

订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。会话的每个订阅都有一个不同的主题过滤器。

主题名 TOPIC NAME

附加在应用消息上的一个标签,服务端已知且与订阅匹配。服务端发送应用消息的一个副本给每一个匹配的客户端订阅。

主题过滤器 TOPIC FILTER

订阅中包含的一个表达式,用于表示相关的一个或多个主题。主题过滤器可以使用通配符。

会话 SESSION

客户端和服务端之间的状态交互。一些会话持续时长与网络连接一样,另一些可以在客户端和服务端的多个连续网络连接间扩展。

控制报文 MQTT CONTROL PACKET

通过网络连接发送的信息数据包。MQTT规范定义了十四种不同类型的控制报文,其中一个(PUBLISH报文)用于传输应用消息。

服务质量 QOS

这是消息的模式,目前有3个值,分别代表不同的消息策略或者说质量

  • 0 : 只发送一次,不管成功与否。消耗最低。
  • 1 : 至少1次,保证到达,但是可能存在重复。
  • 2 : 只有一次,保证到达并且只有一次成功。消耗最高。

客户端订阅-JAVA

Eclipse Paho

单向认证

1) 将 cacert.pem 重命名为 cacert.crt
2) 加入JDK证书信任,jdk1.8.0_121\jre\lib\security目录下运行命令:

1
$ keytool -import -alias cacert -keystore cacerts -file 路径/cacert.crt

3) 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.cert.CertificateException;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttPublishSample {

public static void main(String[] args) throws KeyManagementException, CertificateException, FileNotFoundException, IOException, KeyStoreException {

String topic = "MQTT Examples";
String content = "Message from MqttPublishSample";
int qos = 2;//服务质量,G7中建议为0或者1
String broker = "ssl://10.110.111.251:8883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();

try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: "+content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}

双向认证

1) 证书转换,将 client-key.pem 转换成 pkcs8 格式的证书

1
2
$ openssl pkcs8 -topk8 -inform PEM -in client-key.pem -outform PEM -nocrypt -out client-key-pkcs8.pem

2) 将 cacert.pem 重命名为 cacert.crt

3) 将 client-cert.pem 重命名为 client-cert.crt

4) 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.security.KeyFactory;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.security.interfaces.RSAPrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

import org.apache.commons.codec.binary.Base64;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PahoTlsExample {

private static void connect() {

String topic = "MQTT Examples";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "ssl://10.110.111.251:8883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();

try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
SSLSocketFactory factory = null;
try {
factory = getSSLSocktet("F:/emq/cacert/cacert.crt","F:/emq/cacert/client-cert.crt","F:/emq/cacert/client-key-pkcs8.pem","brt123");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
connOpts.setSocketFactory(factory);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: "+content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}

}

private static SSLSocketFactory getSSLSocktet(String caPath,String crtPath, String keyPath, String password) throws Exception {
// CA certificate is used to authenticate server
CertificateFactory cAf = CertificateFactory.getInstance("X.509");
FileInputStream caIn = new FileInputStream(caPath);
X509Certificate ca = (X509Certificate) cAf.generateCertificate(caIn);
KeyStore caKs = KeyStore.getInstance("JKS");
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", ca);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
tmf.init(caKs);

CertificateFactory cf = CertificateFactory.getInstance("X.509");
FileInputStream crtIn = new FileInputStream(crtPath);
X509Certificate caCert = (X509Certificate) cf.generateCertificate(crtIn);

crtIn.close();
// client key and certificates are sent to server so it can authenticate
// us
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("certificate", caCert);
ks.setKeyEntry("private-key", getPrivateKey(keyPath), password.toCharArray(),
new java.security.cert.Certificate[]{caCert} );
KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
kmf.init(ks, password.toCharArray());

// finally, create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1");

context.init(kmf.getKeyManagers(),tmf.getTrustManagers(), new SecureRandom());

return context.getSocketFactory();
}

public static PrivateKey getPrivateKey(String path) throws Exception{

org.apache.commons.codec.binary.Base64 base64=new Base64();
byte[] buffer= base64.decode(getPem(path));

PKCS8EncodedKeySpec keySpec= new PKCS8EncodedKeySpec(buffer);
KeyFactory keyFactory= KeyFactory.getInstance("RSA");
return (RSAPrivateKey) keyFactory.generatePrivate(keySpec);

}

private static String getPem(String path) throws Exception{
FileInputStream fin=new FileInputStream(path);
BufferedReader br= new BufferedReader(new InputStreamReader(fin));
String readLine= null;
StringBuilder sb= new StringBuilder();
while((readLine= br.readLine())!=null){
if(readLine.charAt(0)=='-'){
continue;
}else{
sb.append(readLine);
sb.append('\r');
}
}
fin.close();
return sb.toString();
}
}