proxibuy/lib/services/mqtt_service.dart

123 lines
3.9 KiB
Dart

// lib/services/mqtt_service.dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:math';
import 'package:flutter/foundation.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
/// Thin wrapper around [MqttServerClient] that exchanges JSON objects with the
/// broker at [server]:[port].
///
/// Incoming publications whose payload decodes to a JSON object are
/// re-broadcast on [messages]; anything else is logged and dropped.
class MqttService {
  /// Backing field for [client]; `null` until [connect] has been called.
  MqttServerClient? _client;

  /// Subscription to the active client's `updates` stream, kept so that it can
  /// be cancelled on reconnect and in [dispose].
  StreamSubscription<List<MqttReceivedMessage<MqttMessage>>>?
      _updatesSubscription;

  /// Host of the MQTT broker.
  final String server = '5.75.200.241';

  /// TCP port of the MQTT broker.
  final int port = 1883;

  final StreamController<Map<String, dynamic>> _messageStreamController =
      StreamController.broadcast();

  /// The underlying MQTT client.
  ///
  /// Throws a [StateError] when read before [connect] has been called (or
  /// before a client has been assigned).
  MqttServerClient get client {
    final current = _client;
    if (current == null) {
      throw StateError('MQTT client is not initialised; call connect() first.');
    }
    return current;
  }

  set client(MqttServerClient value) => _client = value;

  /// Broadcast stream of every received payload that decoded to a JSON object.
  Stream<Map<String, dynamic>> get messages => _messageStreamController.stream;

  /// Whether a client exists and reports the connected state.
  ///
  /// Safe to call before [connect]; returns `false` in that case.
  bool get isConnected =>
      _client?.connectionStatus?.state == MqttConnectionState.connected;

  /// Opens a new session with the broker, authenticating with [token] as the
  /// password.
  ///
  /// Any client left over from a previous call is disconnected first. Failures
  /// are logged rather than thrown; check [isConnected] after awaiting to find
  /// out whether the connection was established.
  Future<void> connect(String token) async {
    // Release the previous session so that repeated calls do not leak sockets
    // or stack up listeners.
    _releaseClient();

    // Six hex digits: the bound is exclusive, so 0x1000000 covers 000000-ffffff.
    final suffix =
        Random().nextInt(0x1000000).toRadixString(16).padLeft(6, '0');
    final clientId = 'nest-$suffix';
    const username = 'ignored';

    // Work on a local reference so the callbacks below stay bound to this
    // session even if `connect` is called again later.
    final mqttClient = MqttServerClient.withPort(server, clientId, port)
      // Protocol-level logging is verbose; keep it out of release builds.
      ..logging(on: kDebugMode)
      ..keepAlivePeriod = 60
      ..autoReconnect = false
      ..setProtocolV311();
    _client = mqttClient;

    debugPrint('--- [MQTT] Attempting to connect...');
    debugPrint('--- [MQTT] Server: $server:$port');
    debugPrint('--- [MQTT] ClientID: $clientId');

    mqttClient.connectionMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .startClean()
        .authenticateAs(username, token);

    mqttClient.onConnected = () {
      debugPrint('✅ [MQTT] Connected successfully.');
      _listenForUpdates(mqttClient);
    };
    mqttClient.onDisconnected = () {
      debugPrint('❌ [MQTT] Disconnected.');
    };
    mqttClient.onSubscribed = (String topic) {
      debugPrint('✅ [MQTT] Subscribed to topic: $topic');
    };
    mqttClient.pongCallback = () {
      debugPrint('🏓 [MQTT] Ping response received');
    };

    try {
      await mqttClient.connect();
      // `connect` can complete without throwing while the session is not
      // actually established, so verify the resulting state explicitly.
      if (mqttClient.connectionStatus?.state !=
          MqttConnectionState.connected) {
        debugPrint(
          '❌ [MQTT] Connection failed - status: '
          '${mqttClient.connectionStatus}',
        );
        mqttClient.disconnect();
      }
    } on NoConnectionException catch (e) {
      debugPrint('❌ [MQTT] Connection failed - No Connection Exception: $e');
      mqttClient.disconnect();
    } on SocketException catch (e) {
      debugPrint('❌ [MQTT] Connection failed - Socket Exception: $e');
      mqttClient.disconnect();
    } catch (e) {
      debugPrint('❌ [MQTT] Connection failed - General Exception: $e');
      mqttClient.disconnect();
    }
  }

  /// Subscribes to [topic] at QoS 1; logs a warning when not connected.
  void subscribe(String topic) {
    if (!isConnected) {
      debugPrint('⚠️ [MQTT] Cannot subscribe. Client is not connected.');
      return;
    }
    client.subscribe(topic, MqttQos.atLeastOnce);
  }

  /// JSON-encodes [message] and publishes it to [topic] at QoS 1.
  ///
  /// Logs a warning and does nothing when not connected. Throws whatever
  /// `json.encode` throws if [message] contains a value it cannot encode.
  void publish(String topic, Map<String, dynamic> message) {
    if (!isConnected) {
      debugPrint('⚠️ [MQTT] Cannot publish. Client is not connected.');
      return;
    }

    final payloadString = json.encode(message);
    // Send real UTF-8 so non-ASCII text survives the round trip and matches
    // the UTF-8 decoding applied to incoming payloads.
    final builder = MqttClientPayloadBuilder()..addUTF8String(payloadString);

    debugPrint('>>>>> [MQTT] Publishing Data >>>>>');
    debugPrint('>>>>> [MQTT] Topic: $topic');
    debugPrint('>>>>> [MQTT] Payload: $payloadString');
    debugPrint('>>>>> ======================= >>>>>');

    client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);
  }

  /// Stops listening, disconnects the client (if any) and closes [messages].
  ///
  /// Safe to call even if [connect] was never called.
  void dispose() {
    debugPrint('--- [MQTT] Disposing MQTT Service.');
    // Cancel the listener before closing the controller so that a late
    // message cannot be added to a closed stream.
    _releaseClient();
    _messageStreamController.close();
  }

  /// Cancels the active `updates` listener and disconnects the current client.
  void _releaseClient() {
    _updatesSubscription?.cancel();
    _updatesSubscription = null;
    _client?.disconnect();
  }

  /// Replaces the active `updates` listener with one bound to [source].
  void _listenForUpdates(MqttServerClient source) {
    _updatesSubscription?.cancel();
    _updatesSubscription = source.updates?.listen(_handleUpdates);
  }

  /// Decodes every publication in [batch] and forwards JSON objects to
  /// [messages].
  void _handleUpdates(List<MqttReceivedMessage<MqttMessage>> batch) {
    // A single event may carry several messages; handle each one.
    for (final received in batch) {
      final publication = received.payload;
      if (publication is! MqttPublishMessage) {
        continue;
      }

      try {
        final payload = utf8.decode(publication.payload.message);
        debugPrint('<<<<< [MQTT] Received Data <<<<<');
        debugPrint('<<<<< [MQTT] Topic: ${received.topic}');
        debugPrint('<<<<< [MQTT] Payload as String: $payload');
        debugPrint('<<<<< ======================== <<<<<');

        final decoded = json.decode(payload);
        if (decoded is! Map<String, dynamic>) {
          debugPrint(
            '❌ [MQTT] Error decoding received JSON: expected a JSON object '
            'but got ${decoded.runtimeType}',
          );
          continue;
        }

        // The service may have been disposed while this message was in
        // flight; adding to a closed controller would throw.
        if (_messageStreamController.isClosed) {
          return;
        }
        _messageStreamController.add(decoded);
      } on FormatException catch (e) {
        // Raised both for malformed UTF-8 and for malformed JSON.
        debugPrint('❌ [MQTT] Error decoding received JSON: $e');
      }
    }
  }
}