spring-integration连接MQTT全过程

目录
  • 首先需要引入spring-integration-mqt的包
  • MQTT的配置比较简单
  • 其中ChanneName是一个常量类
  • 数据发送网关只是一个接口
  • MQTT服务器有数据下发时
  • 最后是参数配置文件
  • 总结

MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。

首先需要引入spring-integration-mqt的包

这里只需要引入这一个包即可。

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
     <version>5.3.1.RELEASE</version>
</dependency>

MQTT的配置比较简单

和spring-integration集成一样,需要配置相对应的入站、出站就可以了

具体配置如下:

package org.noka.serialservice.config;
 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.noka.serialservice.service.MsgSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.support.MessageBuilder;
 
/**--------------------------------------------------------------
 * MQTT 数据转发服务
 * mqtt.services MQTT服务地址不配置时,不会启用该服务
 * 检测mqtt.services这个参数是否配置,以确定是否启用MQTT服务
 * @author  xiefangjian@163.com
 * @version 1.0.0
 **------------------------------------------------------------*/
@EnableIntegration
@Configuration
@ConditionalOnProperty("mqtt.services")
public class MQTTConfig implements ApplicationListener<ApplicationEvent> {
    private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class);
 
    private final MsgSendService msgSendService;//发布消息到消息中间件接口
 
    @Value("${mqtt.appid:mqtt_id}")
    private String appid;//客户端ID
 
    @Value("${mqtt.input.topic:mqtt_input_topic}")
    private String[] inputTopic;//订阅主题,可以是多个主题
 
    @Value("${mqtt.out.topic:mqtt_out_topic}")
    private String[] outTopic;//发布主题,可以是多个主题
 
    @Value("${mqtt.services:#{null}}")
    private String[] mqttServices;//服务器地址以及端口
 
    @Value("${mqtt.user:#{null}}")
    private String user;//用户名
 
    @Value("${mqtt.password:#{null}}")
    private String password;//密码
 
    @Value("${mqtt.KeepAliveInterval:300}")
    private Integer KeepAliveInterval;//心跳时间,默认为5分钟
 
    @Value("${mqtt.CleanSession:false}")
    private Boolean CleanSession;//是否不保持session,默认为session保持
 
    @Value("${mqtt.AutomaticReconnect:true}")
    private Boolean AutomaticReconnect;//是否自动重联,默认为开启自动重联
 
    @Value("${mqtt.CompletionTimeout:30000}")
    private Long CompletionTimeout;//连接超时,默认为30秒
 
    @Value("${mqtt.Qos:1}")
    private Integer Qos;//通信质量,详见MQTT协议
 
 
    public MQTTConfig(MsgSendService msgSendService) {
        this.msgSendService = msgSendService;
    }
 
    /**
     * MQTT连接配置
     * @return 连接工厂
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类
        MqttConnectOptions options = new MqttConnectOptions();//连接参数
        options.setServerURIs(mqttServices);//连接地址
        if(null!=user) {
            options.setUserName(user);//用户名
        }
        if(null!=password) {
            options.setPassword(password.toCharArray());//密码
        }
        options.setKeepAliveInterval(KeepAliveInterval);//心跳时间
        options.setAutomaticReconnect(AutomaticReconnect);//断开是否自动重联
        options.setCleanSession(CleanSession);//保持session
        factory.setConnectionOptions(options);
        return factory;
    }
 
    /**
     * 入站管道
     * @param mqttPahoClientFactory
     * @return
     */
    @Bean
    public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立订阅连接
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);//bytes类型接收
        adapter.setCompletionTimeout(CompletionTimeout);//连接超时的时间
        adapter.setConverter(converter);
        adapter.setQos(Qos);//消息质量
        adapter.setOutputChannelName(ChannelName.INPUT_DATA);//输入管道名称
        return adapter;
    }
    /**
     * 向服务器发送数据管道绑定
     * @param connectionFactory tcp连接工厂类
     * @return 消息管道对象
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT)
    public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) {
        //创建一个新的出站管道,由于MQTT的发布与订阅是两个独立的连接,因此客户端的ID(即APPID)不能与订阅时所使用的ID一样,否则在服务端会认为是同一个客户端,而造成连接失败
        MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);//bytes类型接收
        outGate.setAsync(true);
        outGate.setCompletionTimeout(CompletionTimeout);//设置连接超时时时
        outGate.setDefaultQos(Qos);//设置通信质量
        outGate.setConverter(converter);
        return outGate;
    }
 
    /**
     * MQTT连接时调用的方法
     * @param event
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof MqttSubscribedEvent) {
            String msg = "OK";
            /**------------------连接时需要发送起始消息,写在这里-------------**/
            msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build());
        }
    }
}

其中ChanneName是一个常量类

来标识入站、出站管道的名称,以便在其它需要的地方使用,实现方法如下:

/** -----------------------------------------
 * 管道名称常量类
 * @author  xiefangjian@163.com
 * @version 1.0.0
 ** ---------------------------------------**/
public class ChannelName {
    public final static String INPUT_DATA="input_data";//入站管道
    public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道
    public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名称
}

此时所有配置完成,接下来需要做的就是处理接收到的数据和发布数据,以上配置完成以后,接收和发送数据都是通过数据管道来完成,配置的是数据管道名称。

数据发送网关只是一个接口

用于向指定的数据管道里面发送数据,实现如下:

package org.noka.serialservice.service;
 
import org.noka.serialservice.config.ChannelName;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
 
/**----------------------------------------------------------------
 * 发送消息网关,其它需要发向服务器发送消息时,调用该接口
 * @author  xiefangjian@163.com
 * @version  1.0.0
 **--------------------------------------------------------------**/
@MessagingGateway
@Component
public interface MsgGateway {
    /**
     * MQTT 发送网关
     * @param a 主题,可以指定不同的数据发布主题,在消息中间件里面体现为不同的消息队列
     * @param out 消息内容
     */
    @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT)
    void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out);
}

在需要的地方,可以向下面这样调用这个接口,向MQTT服务器发送消息

//topic为主题名称,out为消息内容
msgGateway.send(topic, out);

MQTT服务器有数据下发时

会自动调将数据放入配置的入站数据管道中,在需要接收数据的地方,向下面这样配置即可

    /**
     * 服务器有数据下发
     * 用ServiceActivator配置需要接收的数据管道名称,当该管道里面的数据时,会自动调用该方法
     * @param in 服务器有数据下发时,序列化后的对象,这里使用byte数组
     */
    @ServiceActivator(inputChannel = ChannelName.INPUT_DATA)
    public void upCase(Message<byte[]> in) {
        logger.info("[net service data]========================================");
        logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服务器下发的数据
        logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16进制方式打印服务器下发的数据
        serialService.send(in.getPayload());//将服务器下发的数据转发给串口
    }

最后是参数配置文件

#--------MQTT---------------------------
#设备ID,唯一标识
mqtt.appid=mqtt_id
#订阅主题,多个主题用逗号分隔
mqtt.input.topic=mqtt_input_topic
#发布主题
mqtt.out.topic=mqtt_out_topic,aac
#MQTT服务器地址,可以是多个地址
mqtt.services=tcp://47.244.191.41:1883
#mqtt用户名,默认无
#mqtt.user=guest
#mqtt密码,默认无
#mqtt.password=guest
#心跳间隔时间,默认3000
#mqtt.KeepAliveInterval=3000
#是否不保持session,默认false
#mqtt.CleanSession=false
#是否自动连接,默认true
#mqtt.AutomaticReconnect=true
#连接超时,默认30000
#mqtt.CompletionTimeout=30000
#传输质量,默认1
#mqtt.Qos=1

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 解决spring-integration-mqtt频繁报Lost connection错误问题

    目录 问题描述 解决过程 总结 问题描述 在之前的博客介绍了如何在 Spring Boot 集成 MQTT,后面使用中没有发现问题,最近发现一直报错: Lost connection: Connection lost; retrying...Lost connection: 已断开连接; retrying... 解决过程 网上说是因为 client ID 重复,最开始是不相信的,因为我测试只启动了一个客户端.但是却怎么都定位不到异常原因,用重新回到 client ID 重复的这个思路上来: 因为

  • 使用java 实现mqtt两种常用方式

    目录 前言 Paho Java 库实现 spring boot集成mqtt 核心代码 总结 前言 在开发MQTT时有两种方式一种是使用Paho Java 原生库来完成,一种是使用spring boot 来完成. Paho Java 库实现 Eclipse Paho Java Client (opens new window)是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android).Eclipse Paho J

  • 源码解读Spring-Integration执行过程

    一,前言 Spring-Integration基于Spring,在应用程序中启用了轻量级消息传递,并支持通过声明式适配器与外部系统集成.这一段官网的介绍,概况了整个Integration的用途.个人感觉消息传递是真正的重点. 如上图所示,典型的生产者-消费者模式,中间通过一个特定的通道进行数据传输,说到这,是不是隐隐感觉到queue的存在.确实事实上这个所谓的通道默认就是用的 blockingqueue. Spring-Integration网上的资料是真少,再加上源码分析的是更少.关于Spri

  • spring-integration连接MQTT全过程

    目录 首先需要引入spring-integration-mqt的包 MQTT的配置比较简单 其中ChanneName是一个常量类 数据发送网关只是一个接口 MQTT服务器有数据下发时 最后是参数配置文件 总结 MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实

  • Spring boot 集成 MQTT详情

    目录 一.简介 二.主要特性 三.集成步骤 1.引入相关jar包 2.核心配置类 3.网关配置 4.编写测试类 5.yml配置信息 一.简介 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,可以以极少的代码和有限的带宽为连接远程设备提供实时可靠的消息服务.目前在物联网.小型设备.移动应用等方面有较广泛的应用. 二.主要特性 (1)使用发布/订阅消

  • Spring Integration概述与怎么使用详解

    目录 一.Spring Integration是什么? 二.为什么要用Spring Integration? 三.怎么使用 总结 一.Spring Integration是什么? spring-integration是一个功能强大的EIP(Enterprise Integration Patterns),即企业集成模式.多个系统或者功能,少不了消息交互 ,Spring intergration的出现解决了系统与系统之间的功能之间消息交互的问题. 二.为什么要用Spring Integration

  • 详解Spring Hibernate连接oracle数据库的配置

    详解Spring Hibernate连接oracle数据库的配置 jdbc.properties文件配置如下  driverClassName=oracle.jdbc.driver.OracleDriver url=jdbc\:oracle\:thin\:@localhost\:1521\: database=OA username=oa password=oa initialSize=2 maxActive=10 maxIdle=2 minIdle=2 removeAbandoned=true

  • Spring Boot 连接LDAP的方法

    本文是Spring Boot系列文集中关于LDAP连接相关操作的一文.仅仅涉及基本的使用ODM来快速实现LDAP增删改查操作.详细的关于Spring LDAP的其他操作,可以参考翻译的官方文档. 本文目的:使用Spring Boot构建项目,帮助读者快速配置并使用Spring LDAP操作LDAP.大致步骤如下: 1.创建Spring Boot项目(约1分钟) 2.添加pom.xml文件中Spring LDAP依赖(约1分钟) 3.配置Spring LDAP连接信息(约1分钟) 4.创建实体类作

  • Spring Boot连接超时导致502错误的实战案例

    1.问题描述 内部系统之间通过Nginx来实现路由转发. 但最近发现有一个系统,经常报502错误,每天达到上百次,完全无法忍受. 2. 原因排查 于是进行排查, 发现配置人员把连接超时时间(server.tomcat.connection-timeout)的单位,理解为秒,实际上是毫秒. SpringBoot的部分配置如下: # Tomcat server: tomcat: uri-encoding: UTF-8 max-threads: 1000 min-spare-threads: 30 c

  • Spring Boot高级教程之Spring Boot连接MySql数据库

    Spring Boot可以大大简化持久化任务,几乎不需要写SQL语句,在之前章节"Spring Boot 构建框架"中我们新建了一个Spring Boot应用程序,本章在原有的工程中与数据库建立连接. Spring Boot有两种方法与数据库建立连接,一种是使用JdbcTemplate,另一种集成Mybatis,下面分别为大家介绍一下如何集成和使用这两种方式. 1. 使用JdbcTemplate <dependency> <groupId>mysql</g

  • Spring boot 连接多数据源过程详解

    1.application.yml中添加两个datasource server: port: 8080 spring: application: name: king datasource: master: type: com.zaxxer.hikari.HikariDataSource jdbc-url: jdbc:mysql://localhost:3306/king?useUnicode=yes&characterEncoding=UTF-8&serverTimezone=UTC d

  • Spring boot连接MySQL 8.0可能出现的问题

    前言 在学习任何一个后端技术,如果不让数据库参与进来,那只能说在学习过程中都不算完整的. 以前用的是5.7版本的MySQL,在学习实践Springboot的时候顺带升级了一下8.0,遇到了一些坑,在这记录一下,有碰到同类问题的童鞋需要自取. 下面话不多说了,来一起看看详细的介绍吧 1.使用 navicat连接发现报错1251- Client does not support authentication protocol 错误 这个笔者查询资料发现是新版本的加密规则变了,在mysql8之后,加密

  • Spring整合Mybatis的全过程

    1.Spring配置文件 1.1配置数据库连接池 <!--读取文件--> <util:properties id="config" location="classpath:Config/db.properties"/> <!--配置数据库连接池--> <bean id="source" class="org.apache.commons.dbcp.BasicDataSource">

随机推荐