import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:math'; import 'package:flutter/foundation.dart'; import 'package:mqtt_client/mqtt_client.dart'; import 'package:mqtt_client/mqtt_server_client.dart'; class MqttService { late MqttServerClient client; final String server = '5.75.197.180'; final int port = 1883; final StreamController> _messageStreamController = StreamController.broadcast(); Stream> get messages => _messageStreamController.stream; bool get isConnected => client.connectionStatus?.state == MqttConnectionState.connected; Future connect(String token) async { final String clientId = 'nest-' + Random().nextInt(0xFFFFFF).toRadixString(16).padLeft(6, '0'); final String username = 'ignored'; final String password = token; client = MqttServerClient.withPort(server, clientId, port); client.logging(on: true); client.keepAlivePeriod = 60; client.autoReconnect = true; client.setProtocolV311(); debugPrint('--- [MQTT] Attempting to connect...'); debugPrint('--- [MQTT] Server: $server:$port'); debugPrint('--- [MQTT] ClientID: $clientId'); final connMessage = MqttConnectMessage() .withClientIdentifier(clientId) .startClean() .authenticateAs(username, password); client.connectionMessage = connMessage; client.onConnected = () { debugPrint('✅ [MQTT] Connected successfully.'); client.updates!.listen((List> c) { final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage; final String payload = MqttPublishPayload.bytesToStringAsString(recMess.payload.message); debugPrint('<<<<< [MQTT] Received Data <<<<<'); debugPrint('<<<<< [MQTT] Topic: ${c[0].topic}'); debugPrint('<<<<< [MQTT] Payload as String: $payload'); debugPrint('<<<<< ======================== <<<<<'); try { final Map jsonPayload = json.decode(payload); _messageStreamController.add(jsonPayload); } catch (e) { debugPrint("❌ [MQTT] Error decoding received JSON: $e"); } }); }; client.onDisconnected = () { debugPrint('❌ [MQTT] Disconnected.'); }; client.onAutoReconnect = () { debugPrint('↪️ [MQTT] Auto-reconnecting...'); }; client.onAutoReconnected = () { debugPrint('✅ [MQTT] Auto-reconnected successfully.'); }; client.onSubscribed = (String topic) { debugPrint('✅ [MQTT] Subscribed to topic: $topic'); }; client.pongCallback = () { debugPrint('🏓 [MQTT] Ping response received'); }; try { await client.connect(); } on NoConnectionException catch (e) { debugPrint('❌ [MQTT] Connection failed - No Connection Exception: $e'); client.disconnect(); } on SocketException catch (e) { debugPrint('❌ [MQTT] Connection failed - Socket Exception: $e'); client.disconnect(); } catch (e) { debugPrint('❌ [MQTT] Connection failed - General Exception: $e'); client.disconnect(); } } void subscribe(String topic) { if (isConnected) { client.subscribe(topic, MqttQos.atLeastOnce); } else { debugPrint("⚠️ [MQTT] Cannot subscribe. Client is not connected."); } } void publish(String topic, Map message) { if (isConnected) { final builder = MqttClientPayloadBuilder(); final payloadString = json.encode(message); builder.addString(payloadString); debugPrint('>>>>> [MQTT] Publishing Data >>>>>'); debugPrint('>>>>> [MQTT] Topic: $topic'); debugPrint('>>>>> [MQTT] Payload: $payloadString'); debugPrint('>>>>> ======================= >>>>>'); client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!); } else { debugPrint("⚠️ [MQTT] Cannot publish. Client is not connected."); } } void dispose() { debugPrint("--- [MQTT] Disposing MQTT Service."); _messageStreamController.close(); client.disconnect(); } }