개발 공부 기록/02. Spring Boot

Spring Boot에서 MQTT 사용해보기

박세류 2024. 2. 13. 21:26

MQTT란?

Message Queue for Telemetry Transport로써, IoT기기와 게이트웨이가 통신하기 위해 만들어진 프로토콜이다.

 

MQTT의 특징

  1. Connection oriented: 연결된 클라이언트와 브로커 간에 지속적인 연결을 유지하기 위한 세션을 사용
  2. 퍼블리셔(publisher)서브스크라이버(subscriber) : 퍼블리셔는 특정 주제(Topic)에 대한 메세지를 보내고, 서브스크라이버는 그 주제(Topic)를 구독하여 메세지를 받는다.
  3. 브로커(Borker): MQTT 메세지를 중개하고 관리하는 서버. 클라이언트(퍼블리셔 역할을 함)는 브로커에 연결하여 메세지를 발행 or 수신한다.
  4. QOS(Quality of Service): 메시지의 전달 품질을 나타내는 등급. 0, 1, 2 의 3가지 레벨이 존재한다.
    • 0 : 최대 한번 전송하며 메시지를 전달 할 뿐 클라이언트의 수신을 보장하지 않는다. (손실 가능성)
    • 1 : 적어도 한 번 이상 전송되며, 브로커는 수신 확인을 보내고, 수신 확인을 받은 후 불확실 하다면 정해진 횟수만큼 메시지를 재전송한다.
    • 2 : 메시지는 한 번만 전송하며, 구독하는 클라이언트가 정확히 수신할 수 있음을 보장한다.
  5. LWT (Last will and testament)
    • LWT는 유언이라는 의미로 브로커와 클라이언트의 연결이 끊어지면 다른 구독자들에게 메시지가 전송되는 기능이다.

MQTT 이미지

 

그림을 살펴보자면 Broker라는 중계서버가 있고, 그 Broker를 통해 센서와 다른 장비들이 통신한다.

기기들이 broker에게 데이터를 전송하는 과정이 publish, 다른 장비들이 broker에게 데이터를 요청하는 과정을 subscribe라고 한다.

또한 토픽은 데이터의 주제를 나타내며, 해당 topic을 subscribe하는 클라이언트랑만 메시지를 송/수신 하는것이다.

 

 

그럼 Spring Boot로 구현해보자

0. 준비물 

Spring Boot 프로젝트(서버) 2개, 브로커 서버, 테스트할 도구

Spring Boot 1개는 위 이미지의 센서 부분을 해서 메시지를 보내줄 publisher 역할을 하는 서버이고,

나머지 하나는 다른 장비인 클라이언트 해서 메시지를 받는 subscriber 역할을 하는 서버로 사용 할 예정이다.

 

1. Eclipse Mosquitto로 브로커 서버 만들기

 

Download

Source mosquitto-2.0.18.tar.gz (GPG signature) Git source code repository (github.com) Older downloads are available at https://mosquitto.org/files/ Binary Installation The binary packages listed be

mosquitto.org

에서 Mosquitto를 다운로드하고 설치한다.

이후, 환경변수를 설정해 주거나 해당 설치경로로 가서 mosquitto 명령을 실행해 브로커 서버를 실행해 준다.

서버가 열린상태

기본적으로 보안과정이 없는 상태로 1883 포트에 열리므로, 이 점 알아두자.

 

2. Spring boot 서버들의 build.gradle에 Paho Mqtt 라이브러리를 추가한다.

// https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'

Mqtt 통신을 도와주는 라이브러리이다.

 

 

3. MQTT 클라이언트 설정(config) 파일 만들기 

 

  • publisher 쪽
@Configuration
public class MqttConfig {

    @Value("tcp://localhost:1883")
    private String brokerUrl;

    @Value("publisher")
    private String clientId;

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setCleanSession(true);
        return options;
    }

    @Bean
    public MqttClient mqttClient() {
        try {
            MqttClient mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
            mqttClient.connect(mqttConnectOptions());
            return mqttClient;
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}

토픽으로 클라이언트에 publish 해주는 쪽 이므로 해당과 같이 작성한다.

 

  • subscriber 쪽 
@Configuration
public class MqttConfig {

    @Value("tcp://localhost:1883")
    private String brokerUrl;

    @Value("subscriber")
    private String clientId;

    @Value("living/#")
    private String topic;

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setCleanSession(true);
        return options;
    }


    @Bean
    public MqttClient mqttClient() {
        try {
            MqttClient mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());

            mqttClient.connect(mqttConnectOptions());
            mqttClient.subscribe(topic);
            return mqttClient;
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}

위 코드와 크게 다를건 없지만, 구독할 topic과 broker 서버에 날리는 메서드를 추가해준다.

이렇게 모든 서버에 브로커 서버의 주소와 포트를 지정해 연결해 준다.

 

4. Publisher 서버에서 MQTT 토픽과 메시지를 발행(Publish) 해준다.

  • Service 파일
@Service
public class MqttPublisherService {

    private MqttClient mqttClient;

    public MqttPublisherService(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }

    public void sendMessage(String topic, String message) {
        try {
            mqttClient.publish(topic, new MqttMessage(message.getBytes()));
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}

 

 

5. Subscriber 서버에서는 발행한 메시지를 수신해본다.

  • MqttCallback을 구현하여 MQTT 메시지를 받는다
@Component
public class MqttReceiver implements MqttCallback {

    @Autowired
    private MqttClient mqttClient;

    @PostConstruct
    public void init() {
        mqttClient.setCallback(this);
    }

    @Override
    public void connectionLost(Throwable cause) {
        // 연결이 끊어졌을 때 처리 로직
        System.out.println("연결이 끊어졌을 때 처리 로직");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // 메시지가 도착했을 때 처리 로직
        System.out.printf("메세지 도착 : topic: %s / message : %s", topic, message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // 메시지 전송이 완료되었을 때 처리 로직
        System.out.println("메시지 전송이 완료되었을 때 처리 로직");
    }
}

publisher 쪽에도 해당 코드가 유용하게 쓰일 수 있을 것 같다. 

 

 

P.S.  MQTT 토픽의 계층 구조
MQTT의 토픽은 계층적 구조를 가질 수 있으며 '/'로 구분한다.

 

  1. '+' (단일 계층 와일드카드) : 'home/+/temperature'는 'home/livingroom/temperature' 나 'home/kitchen/temperature' 모두 포함한다는 것이다.
  2. '#'(멀티 계층 와일드카드) : 'home/#'는 'home'으로 시작하는 모든 토픽의 하위 계층을 포함한다는 뜻이다. 

 

6. 테스트

메시지가 정상적으로 전송/수신 되었는지 확인해 보자.

테스트로 사용할 컨트롤러를 publisher쪽에 만들어보자.

@RestController
public class MqttPublisherController {

    MqttPublisherService mqttPublisherService;

    public MqttPublisherController(MqttPublisherService mqttPublisherService) {
        this.mqttPublisherService = mqttPublisherService;
    }

    @PostMapping("/mqtt/publish")
    public void SendTopicAndMessage(@RequestBody MqttRequestDto mqttRequestDto) {
        mqttPublisherService.sendMessage(mqttRequestDto.getTopic(), mqttRequestDto.getMessage());
    }
}

 

두 서버와 브로커 서버를 가동 후, 간단하게 포스트맨으로 확인 해 보자.

양식에 맞게 보내면,

이와같이 subscriber 쪽에서 메시지를 수신함을 확인 할 수 있다.

 

프로젝트 구조와 코드 전문은 다음에서 확인 할 수 있다.

https://github.com/ParkSeryu/MqttPublisherPractice.git

 

GitHub - ParkSeryu/MqttPublisherPractice

Contribute to ParkSeryu/MqttPublisherPractice development by creating an account on GitHub.

github.com

https://github.com/ParkSeryu/MqttSubscriberPractice.git

 

GitHub - ParkSeryu/MqttSubscriberPractice

Contribute to ParkSeryu/MqttSubscriberPractice development by creating an account on GitHub.

github.com

 

728x90