import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:math';

import 'package:flutter/foundation.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

/// Wraps an [MqttServerClient] and exchanges JSON objects with the broker.
///
/// Incoming publish messages whose payload decodes to a JSON object are
/// re-emitted on [messages]. [publish] JSON-encodes a map and sends it with
/// at-least-once QoS.
class MqttService {
  /// The underlying MQTT client; `null` until [connect] has been called.
  MqttServerClient? client;

  /// Host of the broker this service connects to.
  final String server = '5.75.197.180';

  /// TCP port of the broker.
  ///
  /// NOTE(review): 1883 is conventionally the unencrypted MQTT port, and the
  /// token passed to [connect] is sent as the password. Confirm whether a TLS
  /// endpoint should be used instead.
  final int port = 1883;

  /// Broadcast sink behind [messages].
  final StreamController<Map<String, dynamic>> _messageStreamController =
      StreamController<Map<String, dynamic>>.broadcast();

  /// Active subscription to the client's update stream, if any.
  ///
  /// Tracked so it can be cancelled before a new listener is attached and
  /// when the service is disposed.
  StreamSubscription<Object?>? _updatesSubscription;

  /// Broadcast stream of every received payload that decoded to a JSON object.
  Stream<Map<String, dynamic>> get messages => _messageStreamController.stream;

  /// Whether [client] exists and currently reports a connected state.
  bool get isConnected =>
      client?.connectionStatus?.state == MqttConnectionState.connected;

  /// Creates a fresh client with a random `nest-xxxxxx` id and connects it,
  /// authenticating with [token] as the password.
  ///
  /// Connection failures are logged and the client is disconnected; they are
  /// not rethrown, so callers should check [isConnected] afterwards.
  Future<void> connect(String token) async {
    // Tear down any previous session first. Otherwise the old client (which
    // has auto-reconnect enabled) would be orphaned but stay alive, and its
    // listener would keep feeding [messages].
    unawaited(_updatesSubscription?.cancel());
    _updatesSubscription = null;
    client?.disconnect();

    final suffix = Random().nextInt(0xFFFFFF).toRadixString(16).padLeft(6, '0');
    final clientId = 'nest-$suffix';

    final mqtt = MqttServerClient.withPort(server, clientId, port);
    client = mqtt;

    // Protocol-level library logging is only useful while developing.
    mqtt.logging(on: kDebugMode);
    mqtt.keepAlivePeriod = 60;
    mqtt.autoReconnect = true;
    mqtt.setProtocolV311();

    debugPrint('--- [MQTT] Attempting to connect...');
    debugPrint('--- [MQTT] Server: $server:$port');
    debugPrint('--- [MQTT] ClientID: $clientId');

    // The user name is a fixed placeholder; only the token is variable.
    mqtt.connectionMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .startClean()
        .authenticateAs('ignored', token);

    mqtt.onConnected = () {
      debugPrint('✅ [MQTT] Connected successfully.');
      _listenForUpdates(mqtt);
    };
    mqtt.onDisconnected = () {
      debugPrint('❌ [MQTT] Disconnected.');
    };
    mqtt.onAutoReconnect = () {
      debugPrint('↪️ [MQTT] Auto-reconnecting...');
    };
    mqtt.onAutoReconnected = () {
      debugPrint('✅ [MQTT] Auto-reconnected successfully.');
    };
    mqtt.onSubscribed = (String topic) {
      debugPrint('✅ [MQTT] Subscribed to topic: $topic');
    };
    mqtt.pongCallback = () {
      debugPrint('🏓 [MQTT] Ping response received');
    };

    try {
      await mqtt.connect();
    } on NoConnectionException catch (e) {
      debugPrint('❌ [MQTT] Connection failed - No Connection Exception: $e');
      mqtt.disconnect();
    } on SocketException catch (e) {
      debugPrint('❌ [MQTT] Connection failed - Socket Exception: $e');
      mqtt.disconnect();
    } catch (e) {
      // Deliberately broad: connecting is best-effort and must never crash
      // the caller, whatever the client library throws.
      debugPrint('❌ [MQTT] Connection failed - General Exception: $e');
      mqtt.disconnect();
    }
  }

  /// Attaches the single listener that forwards [mqtt]'s received messages
  /// to [messages].
  ///
  /// Any earlier listener is cancelled first, so running this more than once
  /// (for example if the connected callback fires again) cannot cause
  /// duplicate deliveries.
  void _listenForUpdates(MqttServerClient mqtt) {
    unawaited(_updatesSubscription?.cancel());
    _updatesSubscription = null;

    final updates = mqtt.updates;
    if (updates == null) {
      debugPrint('⚠️ [MQTT] No update stream available. '
          'Incoming messages will not be delivered.');
      return;
    }

    _updatesSubscription = updates.listen((events) {
      // A notification may carry several messages; handle every one of them.
      for (final event in events) {
        final message = event.payload;
        if (message is! MqttPublishMessage) continue;

        // JSON travels as UTF-8. Malformed bytes become U+FFFD instead of
        // throwing inside the stream listener.
        final payload =
            utf8.decode(message.payload.message, allowMalformed: true);

        debugPrint('<<<<< [MQTT] Received Data <<<<<');
        debugPrint('<<<<< [MQTT] Topic: ${event.topic}');
        debugPrint('<<<<< [MQTT] Payload as String: $payload');
        debugPrint('<<<<< ======================== <<<<<');

        _emitJsonObject(payload);
      }
    });
  }

  /// Decodes [payload] as JSON and, if it is a JSON object, adds it to
  /// [messages]. Anything else is logged and dropped.
  void _emitJsonObject(String payload) {
    try {
      final decoded = json.decode(payload);
      if (decoded is! Map<String, dynamic>) {
        debugPrint('❌ [MQTT] Error decoding received JSON: '
            'expected a JSON object but got ${decoded.runtimeType}');
        return;
      }
      // A message can still arrive while the service is being disposed.
      if (!_messageStreamController.isClosed) {
        _messageStreamController.add(decoded);
      }
    } on FormatException catch (e) {
      debugPrint('❌ [MQTT] Error decoding received JSON: $e');
    }
  }

  /// Subscribes to [topic] with at-least-once QoS.
  ///
  /// Logs a warning and does nothing when the client is not connected.
  void subscribe(String topic) {
    if (!isConnected) {
      debugPrint('⚠️ [MQTT] Cannot subscribe. Client is not connected.');
      return;
    }
    client?.subscribe(topic, MqttQos.atLeastOnce);
  }

  /// Publishes [message], JSON-encoded, to [topic] with at-least-once QoS.
  ///
  /// Logs a warning and does nothing when the client is not connected.
  void publish(String topic, Map<String, dynamic> message) {
    if (!isConnected) {
      debugPrint('⚠️ [MQTT] Cannot publish. Client is not connected.');
      return;
    }

    final payloadString = json.encode(message);
    // Encode as UTF-8 so non-ASCII characters in the JSON survive the trip.
    final builder = MqttClientPayloadBuilder()..addUTF8String(payloadString);

    debugPrint('>>>>> [MQTT] Publishing Data >>>>>');
    debugPrint('>>>>> [MQTT] Topic: $topic');
    debugPrint('>>>>> [MQTT] Payload: $payloadString');
    debugPrint('>>>>> ======================= >>>>>');

    client?.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);
  }

  /// Stops listening, disconnects the client and closes [messages].
  void dispose() {
    debugPrint('--- [MQTT] Disposing MQTT Service.');
    // Stop the producer before closing the sink so that no late message is
    // added to a closed controller.
    unawaited(_updatesSubscription?.cancel());
    _updatesSubscription = null;
    client?.disconnect();
    unawaited(_messageStreamController.close());
  }
}