1. 启动ActiveMQ:
2. 安装jdk和eclipse
Jdk 官方下载:https://www.oracle.com/cn/java/technologies/downloads/
jdk安装测试:
eclipse官方下载网站免安装版,解压缩就可以使用:
https://www.eclipse.org/downloads/packages/
3.下载Paho Mqtt的jar包:
Index of /repositories/paho/org/eclipse/paho/mqtt-client/0.4.1-SNAPSHOT
4.打开eclipse,创建Java project,右击项目,添加路径libs,把上面下载的Paho Mqtt的jar包放进libs路径, Add external jar...
5.编写代码
src源码路径添加两个class:MqttTest,MyMqtt,代码如下:
MqttTest.java:
public class MyMqtt {
//Mqtt服务器地址,由于本地电脑,所以用本地回环地址作为服务器地址
private String host = "tcp://127.0.0.1:1883";
//登录ActiveMQ的账号信息
private String userName = "admin";
private String passWord = "admin";
//连接服务器的客户端实例client
private MqttClient client;
//存放订阅的主题
private MqttTopic mqttTopic;
//当前主题
private String myTopic = "Topics/htjs/serverToPhone";
//发送消息的主体
private MqttMessage message;
public MyMqtt(String id,MqttCallback callback,boolean cleanSession) {
// TODO Auto-generated constructor stub
try {
//配置连接属性
client = new MqttClient(host,id,new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(cleanSession);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
//设置回调函数
if(callback == null) {
client.setCallback(new MqttCallback(){
@Override
public void connectionLost(Throwable arg0) {
// TODO Auto-generated method stub
System.out.println(id+"connectLost"+arg0);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
System.out.println(id+"deliveryComplete"+arg0);
}
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
// TODO Auto-generated method stub
System.out.println(id+"messageArrived"+arg1.toString());
}});
}else{
client.setCallback(callback);
}
//发起连接
client.connect(options);
}catch(MqttException e) {
e.printStackTrace();
}
}
//添加订阅信息的方法
public void subscribe(String[] topicFilters,int[] qos) {
try {
client.subscribe(topicFilters,qos);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//添加发布消息的方法
public void sendMessage(String msg) {
sendMessage(myTopic,msg);
}
private void sendMessage(String topic, String msg) {
// TODO Auto-generated method stub
try {
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
message.setPayload(msg.getBytes());
mqttTopic = client.getTopic(topic);
//发布主题
MqttDeliveryToken token = mqttTopic.publish(message);
token.waitForCompletion();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
MyMqtt.java:
public class MqttTest {
private static String[] myTopics = {"Topics/Light/SwitchOn","Topics/Light/SwitchOff"};
private static int[] myQos = {2,2};
public static void main(String[] args) {
// TODO Auto-generated method stub
MyMqtt myMqtt = new MyMqtt("client",null,false);
myMqtt.subscribe(myTopics, myQos);
System.out.println("client start...");
}
}
6. 运行代码,注意这时候ActiveMQ是已经启动的!Java编写的id为client的客户端已经启动成功:
7.通信测试
下载Paho MQTT客户端程序(Paho):
Index of /repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.ui.app/1.1.1
解压缩,进入路径,启动paho客户端
添加连接,输入本机1883的服务器地址,连接client客户端,订阅client发布的主题:Topics/htjs/serverToPhone,发布开灯的主题:Topics/Light/SwitchOn,发送的信息为:{value:80}:
打开eclipe项目,可以收到paho客户端刚才发过来的主题和信息:
8.总结
首先开启ActiveMQ服务器平台,用java编写client客户端,发布主题:Topics/htjs/serverToPhone,订阅主题:Topics/Light/SwitchOn,Topics/Light/SwitchOff,在打开paho客户端,连接client客户端,订阅主题:Topics/htjs/serverToPhone,发布主题和消息:Topics/Light/SwitchOn,{value:80},最后在client客户端收到信息80,网络架构如下图所示: