proxibuy/lib/services/mqtt_service.dart

100 lines
3.3 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'package:flutter/foundation.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
/// Wrapper around an [MqttServerClient] that exposes incoming JSON messages
/// as a broadcast stream.
///
/// Every message received on a subscribed topic is expected to carry a UTF-8
/// encoded JSON object. Decoded objects are emitted on [messages]; callers
/// that only need the next message can use [awaitFirstMessage].
class MqttService {
  /// Creates a service that talks to the broker at [server] on [port].
  ///
  /// Both parameters default to the values that used to be hard-coded, so
  /// `MqttService()` keeps its previous behaviour.
  MqttService({this.server = '5.75.197.180', this.port = 1883});

  /// The underlying MQTT client; `null` until [connect] has been called.
  MqttServerClient? client;

  /// Host name or IP address of the MQTT broker.
  final String server;

  /// TCP port of the MQTT broker.
  final int port;

  final StreamController<Map<String, dynamic>> _messageStreamController =
      StreamController.broadcast();

  /// Active listener on the client's update stream, kept so that it can be
  /// cancelled on reconnect and in [dispose].
  StreamSubscription<List<MqttReceivedMessage<MqttMessage>>>?
      _updatesSubscription;

  Completer<Map<String, dynamic>>? _firstMessageCompleter;

  /// Broadcast stream of every successfully decoded JSON object.
  Stream<Map<String, dynamic>> get messages => _messageStreamController.stream;

  /// Whether the underlying client currently reports a live connection.
  bool get isConnected =>
      client?.connectionStatus?.state == MqttConnectionState.connected;

  /// Returns a future that completes with the next decoded message.
  ///
  /// The future completes with an error if the next payload cannot be decoded
  /// or if [connect] fails while the future is pending. While a previous
  /// request is still pending the same future is returned, so an earlier
  /// caller is never left waiting on a completer that was replaced.
  Future<Map<String, dynamic>> awaitFirstMessage() {
    final pending = _firstMessageCompleter;
    if (pending != null && !pending.isCompleted) {
      return pending.future;
    }
    final completer = Completer<Map<String, dynamic>>();
    _firstMessageCompleter = completer;
    return completer.future;
  }

  /// Connects to the broker, authenticating with [token] as the password.
  ///
  /// A new random client identifier is generated for every call. Any client
  /// left over from an earlier call is disconnected first so that repeated
  /// calls do not leak connections or stream listeners.
  ///
  /// Connection errors are logged and reported through a pending
  /// [awaitFirstMessage] future; they are not rethrown. Check [isConnected]
  /// afterwards to find out whether the connection was established.
  Future<void> connect(String token) async {
    await _updatesSubscription?.cancel();
    _updatesSubscription = null;
    client?.disconnect();

    final suffix = Random().nextInt(0xFFFFFF).toRadixString(16).padLeft(6, '0');
    final clientId = 'nest-$suffix';

    final mqttClient = MqttServerClient.withPort(server, clientId, port);
    client = mqttClient;

    // Protocol tracing is verbose, so it is limited to debug builds.
    mqttClient.logging(on: kDebugMode);
    mqttClient.keepAlivePeriod = 60;
    mqttClient.autoReconnect = true;
    mqttClient.setProtocolV311();
    mqttClient.connectionMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .startClean()
        .authenticateAs('ignored', token);

    // The callbacks capture the local client instead of dereferencing the
    // mutable [client] field, which may have been replaced when they fire.
    mqttClient.onConnected = () {
      debugPrint('✅ [MQTT] Connected successfully.');
      _listenForUpdates(mqttClient);
    };
    mqttClient.onDisconnected = () => debugPrint('❌ [MQTT] Disconnected.');
    mqttClient.onSubscribed =
        (String topic) => debugPrint('✅ [MQTT] Subscribed to topic: $topic');

    try {
      await mqttClient.connect();
    } catch (e, stackTrace) {
      // Deliberately best-effort: the failure is logged and forwarded to a
      // pending first-message waiter rather than thrown at the caller.
      debugPrint('❌ [MQTT] Connection failed: $e');
      mqttClient.disconnect();
      _failFirstMessage(e, stackTrace);
    }
  }

  /// Subscribes to [topic] with QoS 1 (at least once).
  ///
  /// Does nothing except log a message when the client is not connected.
  void subscribe(String topic) {
    if (!isConnected) {
      debugPrint('⚠️ [MQTT] Cannot subscribe to $topic: not connected.');
      return;
    }
    client?.subscribe(topic, MqttQos.atLeastOnce);
  }

  /// Publishes [message] as UTF-8 encoded JSON to [topic] with QoS 1.
  ///
  /// Does nothing except log a message when the client is not connected.
  void publish(String topic, Map<String, dynamic> message) {
    if (!isConnected) {
      debugPrint('⚠️ [MQTT] Cannot publish to $topic: not connected.');
      return;
    }
    // JSON text is UTF-8 on the wire; the UTF-8 builder method keeps
    // non-ASCII characters intact.
    final builder = MqttClientPayloadBuilder()
      ..addUTF8String(json.encode(message));
    final payload = builder.payload;
    if (payload == null) {
      return;
    }
    client?.publishMessage(topic, MqttQos.atLeastOnce, payload);
  }

  /// Cancels the update listener, disconnects the client and closes
  /// [messages]. The service must not be used afterwards.
  void dispose() {
    _updatesSubscription?.cancel();
    _updatesSubscription = null;
    client?.disconnect();
    _messageStreamController.close();
  }

  /// Attaches the single update listener for [mqttClient].
  void _listenForUpdates(MqttServerClient mqttClient) {
    // Ignore callbacks from a client that connect() has since replaced.
    if (!identical(client, mqttClient)) {
      return;
    }
    final updates = mqttClient.updates;
    if (updates == null) {
      return;
    }
    // Make sure at most one listener is active even if the connected callback
    // fires more than once for the same client.
    _updatesSubscription?.cancel();
    _updatesSubscription = updates.listen(_handleUpdates);
  }

  /// Processes every publish message contained in one update batch.
  void _handleUpdates(List<MqttReceivedMessage<MqttMessage>> events) {
    for (final event in events) {
      final message = event.payload;
      if (message is MqttPublishMessage) {
        _handlePublish(message);
      }
    }
  }

  /// Decodes one publish payload and forwards it to the listeners.
  void _handlePublish(MqttPublishMessage message) {
    try {
      final text = utf8.decode(message.payload.message);
      final Object? decoded = json.decode(text);
      if (decoded is! Map<String, dynamic>) {
        throw FormatException('Expected a JSON object', text);
      }
      if (!_messageStreamController.isClosed) {
        _messageStreamController.add(decoded);
      }
      final pending = _firstMessageCompleter;
      if (pending != null && !pending.isCompleted) {
        pending.complete(decoded);
      }
    } on FormatException catch (e, stackTrace) {
      debugPrint("❌ [MQTT] Error decoding JSON: $e");
      _failFirstMessage(e, stackTrace);
    }
  }

  /// Completes a pending first-message future with [error], if there is one.
  void _failFirstMessage(Object error, [StackTrace? stackTrace]) {
    final pending = _firstMessageCompleter;
    if (pending != null && !pending.isCompleted) {
      pending.completeError(error, stackTrace);
    }
  }
}