@Configuration
public class MqttConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
/**
* 订阅的bean名称
*/
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
/**
* 发布的bean名称
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.producer.clientId}")
private String producerClientId;
@Value("${mqtt.producer.defaultTopic}")
private String producerDefaultTopic;
@Value("${mqtt.consumer.clientId}")
private String consumerClientId;
@Value("${mqtt.consumer.defaultTopic}")
private String consumerDefaultTopic;
/**
* MQTT连接器选项
*
* @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions}
*/
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
// 设置连接的用户名
options.setUserName(username);
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置连接地址+端口
options.setServerURIs(url.split(","));
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
/**
* MQTT客户端
*
* @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/**
* MQTT信息通道(生产者)
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(生产者)
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
// 在这里进行mqttOutboundChannel的相关设置
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
producerClientId,
mqttClientFactory());
// 如果设置成true,发送消息时将不会阻塞
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(producerDefaultTopic);
return messageHandler;
}
/**
* MQTT消息订阅绑定(消费者)
*
* @return {@link org.springframework.integration.core.MessageProducer}
*/
@Bean
public MessageProducer inbound() {
// 可以同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
consumerClientId, mqttClientFactory(),//客户端名称
consumerDefaultTopic.split(","));//消费的topic
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT信息通道(消费者)
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(消费者)
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
//ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
/* return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOGGER.info("=========订阅到的msg{}", message.getPayload());
}
};*/
return message -> {
String payload = message.getPayload().toString();
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// 根据topic分别进行消息处理。
if (topic.equals("/sys/WG585LL0720041300111/up")) {
LOGGER.info("/sys/WG585LL0720041300111/up: 处理消息 " + payload);
} else if (topic.equals("/WG585LL0720041300111/down")) {
LOGGER.info("/WG585WLAN202001111001/down: 处理消息 " + payload);
} else {
LOGGER.info(topic + ": 其他消息 " + payload);
}
};
}