MQTT란?
Message Queue for Telemetry Transport로써, IoT기기와 게이트웨이가 통신하기 위해 만들어진 프로토콜이다.
MQTT의 특징
- Connection oriented: 연결된 클라이언트와 브로커 간에 지속적인 연결을 유지하기 위한 세션을 사용
- 퍼블리셔(publisher)와 서브스크라이버(subscriber) : 퍼블리셔는 특정 주제(Topic)에 대한 메세지를 보내고, 서브스크라이버는 그 주제(Topic)를 구독하여 메세지를 받는다.
- 브로커(Borker): MQTT 메세지를 중개하고 관리하는 서버. 클라이언트(퍼블리셔 역할을 함)는 브로커에 연결하여 메세지를 발행 or 수신한다.
- QOS(Quality of Service): 메시지의 전달 품질을 나타내는 등급. 0, 1, 2 의 3가지 레벨이 존재한다.
- 0 : 최대 한번 전송하며 메시지를 전달 할 뿐 클라이언트의 수신을 보장하지 않는다. (손실 가능성)
- 1 : 적어도 한 번 이상 전송되며, 브로커는 수신 확인을 보내고, 수신 확인을 받은 후 불확실 하다면 정해진 횟수만큼 메시지를 재전송한다.
- 2 : 메시지는 한 번만 전송하며, 구독하는 클라이언트가 정확히 수신할 수 있음을 보장한다.
- LWT (Last will and testament)
- LWT는 유언이라는 의미로 브로커와 클라이언트의 연결이 끊어지면 다른 구독자들에게 메시지가 전송되는 기능이다.
그림을 살펴보자면 Broker라는 중계서버가 있고, 그 Broker를 통해 센서와 다른 장비들이 통신한다.
기기들이 broker에게 데이터를 전송하는 과정이 publish, 다른 장비들이 broker에게 데이터를 요청하는 과정을 subscribe라고 한다.
또한 토픽은 데이터의 주제를 나타내며, 해당 topic을 subscribe하는 클라이언트랑만 메시지를 송/수신 하는것이다.
그럼 Spring Boot로 구현해보자
0. 준비물
Spring Boot 프로젝트(서버) 2개, 브로커 서버, 테스트할 도구
Spring Boot 1개는 위 이미지의 센서 부분을 해서 메시지를 보내줄 publisher 역할을 하는 서버이고,
나머지 하나는 다른 장비인 클라이언트 해서 메시지를 받는 subscriber 역할을 하는 서버로 사용 할 예정이다.
1. Eclipse Mosquitto로 브로커 서버 만들기
에서 Mosquitto를 다운로드하고 설치한다.
이후, 환경변수를 설정해 주거나 해당 설치경로로 가서 mosquitto 명령을 실행해 브로커 서버를 실행해 준다.
기본적으로 보안과정이 없는 상태로 1883 포트에 열리므로, 이 점 알아두자.
2. Spring boot 서버들의 build.gradle에 Paho Mqtt 라이브러리를 추가한다.
// https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
Mqtt 통신을 도와주는 라이브러리이다.
3. MQTT 클라이언트 설정(config) 파일 만들기
- publisher 쪽
@Configuration
public class MqttConfig {
@Value("tcp://localhost:1883")
private String brokerUrl;
@Value("publisher")
private String clientId;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setCleanSession(true);
return options;
}
@Bean
public MqttClient mqttClient() {
try {
MqttClient mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
mqttClient.connect(mqttConnectOptions());
return mqttClient;
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
토픽으로 클라이언트에 publish 해주는 쪽 이므로 해당과 같이 작성한다.
- subscriber 쪽
@Configuration
public class MqttConfig {
@Value("tcp://localhost:1883")
private String brokerUrl;
@Value("subscriber")
private String clientId;
@Value("living/#")
private String topic;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setCleanSession(true);
return options;
}
@Bean
public MqttClient mqttClient() {
try {
MqttClient mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
mqttClient.connect(mqttConnectOptions());
mqttClient.subscribe(topic);
return mqttClient;
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
위 코드와 크게 다를건 없지만, 구독할 topic과 broker 서버에 날리는 메서드를 추가해준다.
이렇게 모든 서버에 브로커 서버의 주소와 포트를 지정해 연결해 준다.
4. Publisher 서버에서 MQTT 토픽과 메시지를 발행(Publish) 해준다.
- Service 파일
@Service
public class MqttPublisherService {
private MqttClient mqttClient;
public MqttPublisherService(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
public void sendMessage(String topic, String message) {
try {
mqttClient.publish(topic, new MqttMessage(message.getBytes()));
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
5. Subscriber 서버에서는 발행한 메시지를 수신해본다.
- MqttCallback을 구현하여 MQTT 메시지를 받는다
@Component
public class MqttReceiver implements MqttCallback {
@Autowired
private MqttClient mqttClient;
@PostConstruct
public void init() {
mqttClient.setCallback(this);
}
@Override
public void connectionLost(Throwable cause) {
// 연결이 끊어졌을 때 처리 로직
System.out.println("연결이 끊어졌을 때 처리 로직");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 메시지가 도착했을 때 처리 로직
System.out.printf("메세지 도착 : topic: %s / message : %s", topic, message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 메시지 전송이 완료되었을 때 처리 로직
System.out.println("메시지 전송이 완료되었을 때 처리 로직");
}
}
publisher 쪽에도 해당 코드가 유용하게 쓰일 수 있을 것 같다.
P.S. MQTT 토픽의 계층 구조
MQTT의 토픽은 계층적 구조를 가질 수 있으며 '/'로 구분한다.
- '+' (단일 계층 와일드카드) : 'home/+/temperature'는 'home/livingroom/temperature' 나 'home/kitchen/temperature' 모두 포함한다는 것이다.
- '#'(멀티 계층 와일드카드) : 'home/#'는 'home'으로 시작하는 모든 토픽의 하위 계층을 포함한다는 뜻이다.
6. 테스트
메시지가 정상적으로 전송/수신 되었는지 확인해 보자.
테스트로 사용할 컨트롤러를 publisher쪽에 만들어보자.
@RestController
public class MqttPublisherController {
MqttPublisherService mqttPublisherService;
public MqttPublisherController(MqttPublisherService mqttPublisherService) {
this.mqttPublisherService = mqttPublisherService;
}
@PostMapping("/mqtt/publish")
public void SendTopicAndMessage(@RequestBody MqttRequestDto mqttRequestDto) {
mqttPublisherService.sendMessage(mqttRequestDto.getTopic(), mqttRequestDto.getMessage());
}
}
두 서버와 브로커 서버를 가동 후, 간단하게 포스트맨으로 확인 해 보자.
양식에 맞게 보내면,
이와같이 subscriber 쪽에서 메시지를 수신함을 확인 할 수 있다.
프로젝트 구조와 코드 전문은 다음에서 확인 할 수 있다.
https://github.com/ParkSeryu/MqttPublisherPractice.git
https://github.com/ParkSeryu/MqttSubscriberPractice.git
'개발 공부 기록 > 02. Spring Boot' 카테고리의 다른 글
[SpringBoot] 프로젝트 만들 때 들어가는 Group? Artifact? 얘내는 뭐지? (0) | 2024.04.01 |
---|---|
Mybatis 기본적인 사용법 정리 (0) | 2024.01.25 |
SpringBoot와 MongoDB 연동하기 (1) | 2024.01.24 |
JPA, Hibernate, Spring Data JPA란? (0) | 2023.12.11 |
스프링 컨테이너를 다루는 방법 (0) | 2023.12.10 |