import 'dart:async';
import 'dart:convert';
import 'dart:math';

import 'package:flutter/foundation.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

/// Wraps an [MqttServerClient] and exposes incoming JSON payloads as a
/// broadcast stream of decoded maps.
///
/// Call [connect] with an auth token, then [subscribe] to topics. Decoded
/// payloads are delivered on [messages]; [awaitFirstMessage] offers a one-shot
/// future for the next decoded payload. Call [dispose] when finished.
class MqttService {
  /// The underlying MQTT client; `null` until [connect] has been called.
  MqttServerClient? client;

  /// Address of the MQTT broker.
  final String server = '5.75.197.180';

  /// TCP port of the MQTT broker.
  final int port = 1883;

  final StreamController<Map<String, dynamic>> _messageStreamController =
      StreamController<Map<String, dynamic>>.broadcast();

  /// Subscription to the current client's update stream.
  ///
  /// Tracked so that a repeated connection replaces the listener instead of
  /// stacking a second one, and so [dispose] can cancel it.
  StreamSubscription<List<MqttReceivedMessage<MqttMessage>>>?
      _updatesSubscription;

  Completer<Map<String, dynamic>>? _firstMessageCompleter;

  /// Every successfully decoded JSON object received from the broker.
  Stream<Map<String, dynamic>> get messages => _messageStreamController.stream;

  /// Whether the current [client] reports a connected state.
  bool get isConnected =>
      client?.connectionStatus?.state == MqttConnectionState.connected;

  /// Returns a future that completes with the next decoded message.
  ///
  /// The future completes with an error if the next payload cannot be decoded
  /// or if [connect] fails. While a previous request is still pending, the
  /// same future is returned so that earlier callers are not left waiting on
  /// a completer that has been discarded.
  Future<Map<String, dynamic>> awaitFirstMessage() {
    final pending = _firstMessageCompleter;
    if (pending != null && !pending.isCompleted) {
      return pending.future;
    }
    final completer = Completer<Map<String, dynamic>>();
    _firstMessageCompleter = completer;
    return completer.future;
  }

  /// Connects to the broker, authenticating with [token] as the password.
  ///
  /// Any client left over from an earlier call is disconnected first.
  /// Connection failures are logged and forwarded to a pending
  /// [awaitFirstMessage] future; they are not rethrown, so callers should
  /// check [isConnected] afterwards.
  Future<void> connect(String token) async {
    _releaseClient();

    // The upper bound of nextInt is exclusive, so 0x1000000 covers the whole
    // six-hex-digit range.
    final suffix =
        Random().nextInt(0x1000000).toRadixString(16).padLeft(6, '0');
    final clientId = 'nest-$suffix';

    final mqttClient = MqttServerClient.withPort(server, clientId, port);
    // NOTE(review): library logging is enabled unconditionally; confirm this
    // is intended for release builds.
    mqttClient.logging(on: true);
    mqttClient
      ..keepAlivePeriod = 60
      ..autoReconnect = true
      ..setProtocolV311()
      ..connectionMessage = MqttConnectMessage()
          .withClientIdentifier(clientId)
          .startClean()
          // The username is a placeholder; the token travels as the password.
          .authenticateAs('ignored', token)
      ..onConnected = () {
        debugPrint('✅ [MQTT] Connected successfully.');
        // Bind to this specific client rather than the mutable field, which
        // may point at a newer client by the time the callback runs.
        _listenForUpdates(mqttClient);
      }
      ..onDisconnected = () {
        debugPrint('❌ [MQTT] Disconnected.');
      }
      ..onSubscribed = (String topic) {
        debugPrint('✅ [MQTT] Subscribed to topic: $topic');
      };
    client = mqttClient;

    try {
      await mqttClient.connect();
    } catch (e) {
      // Deliberately broad: any connection failure is reported the same way.
      debugPrint('❌ [MQTT] Connection failed: $e');
      mqttClient.disconnect();
      _failFirstMessage(e);
    }
  }

  /// Subscribes to [topic] at QoS 1; logs and does nothing when disconnected.
  void subscribe(String topic) {
    if (!isConnected) {
      debugPrint('⚠️ [MQTT] Not connected; cannot subscribe to $topic');
      return;
    }
    client?.subscribe(topic, MqttQos.atLeastOnce);
  }

  /// Publishes [message] as JSON to [topic] at QoS 1.
  ///
  /// Logs and does nothing when disconnected.
  void publish(String topic, Map<String, dynamic> message) {
    if (!isConnected) {
      debugPrint('⚠️ [MQTT] Not connected; cannot publish to $topic');
      return;
    }
    // Encode as UTF-8 so non-ASCII characters survive the round trip.
    final builder = MqttClientPayloadBuilder()
      ..addUTF8String(json.encode(message));
    client?.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);
  }

  /// Cancels the update listener, disconnects, and closes [messages].
  void dispose() {
    _releaseClient();
    _messageStreamController.close();
  }

  /// Cancels the update listener and disconnects the current client, if any.
  void _releaseClient() {
    _updatesSubscription?.cancel();
    _updatesSubscription = null;
    client?.disconnect();
  }

  /// Replaces the update listener with one bound to [mqttClient].
  void _listenForUpdates(MqttServerClient mqttClient) {
    final updates = mqttClient.updates;
    if (updates == null) {
      debugPrint('⚠️ [MQTT] Update stream unavailable after connect.');
      return;
    }
    _updatesSubscription?.cancel();
    _updatesSubscription = updates.listen(_handleUpdates);
  }

  /// Decodes every publish message in [events], not only the first one.
  void _handleUpdates(List<MqttReceivedMessage<MqttMessage>> events) {
    for (final event in events) {
      final message = event.payload;
      if (message is MqttPublishMessage) {
        _handlePublish(message);
      }
    }
  }

  /// Decodes one publish payload as a UTF-8 JSON object and forwards it.
  void _handlePublish(MqttPublishMessage message) {
    try {
      final text = utf8.decode(message.payload.message);
      final decoded = json.decode(text);
      if (decoded is! Map<String, dynamic>) {
        throw FormatException('Expected a JSON object', text);
      }
      if (!_messageStreamController.isClosed) {
        _messageStreamController.add(decoded);
      }
      final completer = _firstMessageCompleter;
      if (completer != null && !completer.isCompleted) {
        completer.complete(decoded);
      }
    } on FormatException catch (e) {
      debugPrint('❌ [MQTT] Error decoding JSON: $e');
      _failFirstMessage(e);
    }
  }

  /// Completes a pending [awaitFirstMessage] future with [error].
  void _failFirstMessage(Object error) {
    final completer = _firstMessageCompleter;
    if (completer != null && !completer.isCompleted) {
      completer.completeError(error);
    }
  }
}