From 6fd475658a9a50160949a9d5aaf823364153af81 Mon Sep 17 00:00:00 2001 From: Shabirmean Date: Tue, 22 Dec 2015 11:30:17 +0530 Subject: [PATCH 1/2] Refactoring XMPP and MQTT Connectors for VirtualFireAlarm --- .../agent/advanced/sidhdhi/SidhdhiQuery.java | 16 +- .../mqtt/FireAlarmMQTTCommunicator.java | 80 +++-- .../xmpp/FireAlarmXMPPCommunicator.java | 108 ++++--- .../constants/VirtualFireAlarmConstants.java | 4 +- .../service/VirtualFireAlarmService.java | 145 +++++---- ...ava => VirtualFireAlarmMQTTConnector.java} | 86 +++--- .../VirtualFireAlarmXMPPConnector.java | 278 +++++++++++++----- .../util/VirtualFireAlarmServiceUtils.java | 28 -- .../src/main/webapp/META-INF/resources.xml | 4 +- .../src/main/webapp/WEB-INF/cxf-servlet.xml | 20 +- .../device/mgt/iot/DeviceController.java | 3 +- .../iot/controlqueue/xmpp/XmppAccount.java | 5 - .../controlqueue/xmpp/XmppServerClient.java | 91 +++++- .../mgt/iot/transport/TransportHandler.java | 13 +- .../transport/mqtt/MQTTTransportHandler.java | 9 +- .../transport/xmpp/XMPPTransportHandler.java | 41 ++- 16 files changed, 559 insertions(+), 372 deletions(-) rename components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/{VirtualFireAlarmMQTTSubscriber.java => VirtualFireAlarmMQTTConnector.java} (78%) diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java index 7ce14bdee9..09e8a92799 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java @@ -43,7 +43,7 @@ import java.nio.file.Files; import java.nio.file.Paths; /** - * This class reads the sonar reading and injects values + * This class reads the humidity reading and injects values * to the siddhiEngine for processing on a routine basis * also if the siddhiquery is updated the class takes * care of re-initializing same. @@ -104,8 +104,8 @@ public class SidhdhiQuery implements Runnable { //Sending events to Siddhi try { - int sonarReading = AgentManager.getInstance().getTemperature(); - inputHandler.send(new Object[]{"FIRE_1", sonarReading}); + int humidityReading = AgentManager.getInstance().getTemperature(); + inputHandler.send(new Object[]{"FIRE_1", humidityReading}); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); @@ -142,14 +142,14 @@ public class SidhdhiQuery implements Runnable { /** - * Read sonar data from API URL + * Read humidity data from API URL * - * @param sonarAPIUrl + * @param humidityAPIUrl * @return */ - private String readSonarData(String sonarAPIUrl) { + private String readHumidityData(String humidityAPIUrl) { HttpClient client = new DefaultHttpClient(); - HttpGet request = new HttpGet(sonarAPIUrl); + HttpGet request = new HttpGet(humidityAPIUrl); String responseStr = null; try { HttpResponse response = client.execute(request); @@ -161,7 +161,7 @@ public class SidhdhiQuery implements Runnable { } catch (IOException e) { //log.error("Exception encountered while trying to make get request."); - log.error("Error while reading sonar reading from file!"); + log.error("Error while reading humidity reading from file!"); return responseStr; } return responseStr; diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/communication/mqtt/FireAlarmMQTTCommunicator.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/communication/mqtt/FireAlarmMQTTCommunicator.java index 4f560791d3..83d9ba79f2 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/communication/mqtt/FireAlarmMQTTCommunicator.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/communication/mqtt/FireAlarmMQTTCommunicator.java @@ -103,6 +103,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { String deviceID = agentManager.getAgentConfigs().getDeviceId(); String receivedMessage; String replyMessage; + String securePayLoad; try { receivedMessage = AgentUtilOperations.extractMessageFromPayload(message.toString()); @@ -116,56 +117,53 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { String[] controlSignal = receivedMessage.split(":"); // message- ":" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") - switch (controlSignal[0].toUpperCase()) { - case AgentConstants.BULB_CONTROL: - boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON); + try { + switch (controlSignal[0].toUpperCase()) { + case AgentConstants.BULB_CONTROL: + boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON); + agentManager.changeAlarmStatus(stateToSwitch); + log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'"); + break; - agentManager.changeAlarmStatus(stateToSwitch); - log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'"); - break; + case AgentConstants.TEMPERATURE_CONTROL: + int currentTemperature = agentManager.getTemperature(); - case AgentConstants.TEMPERATURE_CONTROL: - int currentTemperature = agentManager.getTemperature(); + String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'"; + log.info(AgentConstants.LOG_APPENDER + replyTemperature); - String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'"; - log.info(AgentConstants.LOG_APPENDER + replyTemperature); + String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, + serverName, deviceOwner, deviceID); - String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, - serverName, deviceOwner, deviceID); - replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; + replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + publishToQueue(tempPublishTopic, securePayLoad); + break; - try { - publishToQueue(tempPublishTopic, replyMessage); - } catch (TransportHandlerException e) { - log.error(AgentConstants.LOG_APPENDER + - "MQTT - Publishing, reply message to the MQTT Queue at: " + - agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); - } - break; + case AgentConstants.HUMIDITY_CONTROL: + int currentHumidity = agentManager.getHumidity(); - case AgentConstants.HUMIDITY_CONTROL: - int currentHumidity = agentManager.getHumidity(); + String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'"; + log.info(AgentConstants.LOG_APPENDER + replyHumidity); - String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'"; - log.info(AgentConstants.LOG_APPENDER + replyHumidity); + String humidPublishTopic = String.format( + AgentConstants.MQTT_PUBLISH_TOPIC, serverName, deviceOwner, deviceID); - String humidPublishTopic = String.format( - AgentConstants.MQTT_PUBLISH_TOPIC, serverName, deviceOwner, deviceID); - replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; + replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + publishToQueue(humidPublishTopic, securePayLoad); + break; - try { - publishToQueue(humidPublishTopic, replyMessage); - } catch (TransportHandlerException e) { - log.error(AgentConstants.LOG_APPENDER + - "MQTT - Publishing, reply message to the MQTT Queue at: " + - agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); - } - break; - - default: - log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] + - "' is invalid and not-supported for this device-type"); - break; + default: + log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] + + "' is invalid and not-supported for this device-type"); + break; + } + } catch (AgentCoreOperationException e) { + log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e); + } catch (TransportHandlerException e) { + log.error(AgentConstants.LOG_APPENDER + + "MQTT - Publishing, reply message to the MQTT Queue at: " + + agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); } } diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/communication/xmpp/FireAlarmXMPPCommunicator.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/communication/xmpp/FireAlarmXMPPCommunicator.java index 21dedf8367..5d2f5564bb 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/communication/xmpp/FireAlarmXMPPCommunicator.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/communication/xmpp/FireAlarmXMPPCommunicator.java @@ -21,10 +21,12 @@ package org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.communication.xmpp import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jivesoftware.smack.packet.Message; -import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.xmpp.XMPPTransportHandler; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentConstants; -import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentManager; +import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentUtilOperations; +import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.exception.AgentCoreOperationException; +import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.TransportHandlerException; +import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.xmpp.XMPPTransportHandler; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -94,8 +96,7 @@ public class FireAlarmXMPPCommunicator extends XMPPTransportHandler { } }; - connectorServiceHandler = service.scheduleAtFixedRate(connect, 0, timeoutInterval, - TimeUnit.MILLISECONDS); + connectorServiceHandler = service.scheduleAtFixedRate(connect, 0, timeoutInterval, TimeUnit.MILLISECONDS); } /** @@ -110,63 +111,76 @@ public class FireAlarmXMPPCommunicator extends XMPPTransportHandler { final AgentManager agentManager = AgentManager.getInstance(); String from = xmppMessage.getFrom(); String message = xmppMessage.getBody(); - log.info(AgentConstants.LOG_APPENDER + "Received XMPP message [" + message + "] from " + - from); - + String receivedMessage; String replyMessage; - String[] controlSignal = message.split(":"); + String securePayLoad; + + try { + receivedMessage = AgentUtilOperations.extractMessageFromPayload(message); + log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received"); + } catch (AgentCoreOperationException e) { + log.warn(AgentConstants.LOG_APPENDER + "Could not extract message from payload.", e); + return; + } + + String[] controlSignal = receivedMessage.split(":"); //message- ":" format. (ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") + try { + switch (controlSignal[0].toUpperCase()) { + case AgentConstants.BULB_CONTROL: + if (controlSignal.length != 2) { + replyMessage = "BULB controls need to be in the form - 'BULB:{ON|OFF}'"; + log.warn(replyMessage); + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + sendXMPPMessage(xmppAdminJID, securePayLoad, "CONTROL-REPLY"); + break; + } - switch (controlSignal[0].toUpperCase()) { - case AgentConstants.BULB_CONTROL: - if (controlSignal.length != 2) { - replyMessage = "BULB controls need to be in the form - 'BULB:{ON|OFF}'"; - log.warn(replyMessage); - sendXMPPMessage(xmppAdminJID, replyMessage, "CONTROL-REPLY"); + agentManager.changeAlarmStatus(controlSignal[1].equals(AgentConstants.CONTROL_ON)); + log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'"); break; - } - agentManager.changeAlarmStatus(controlSignal[1].equals(AgentConstants.CONTROL_ON)); - log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + - controlSignal[1] + "'"); - break; + case AgentConstants.TEMPERATURE_CONTROL: + int currentTemperature = agentManager.getTemperature(); - case AgentConstants.TEMPERATURE_CONTROL: - int currentTemperature = agentManager.getTemperature(); - - String replyTemperature = - "The current temperature was read to be: '" + currentTemperature + - "C'"; - log.info(AgentConstants.LOG_APPENDER + replyTemperature); + String replyTemperature = + "The current temperature was read to be: '" + currentTemperature + + "C'"; + log.info(AgentConstants.LOG_APPENDER + replyTemperature); - replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; - sendXMPPMessage(xmppAdminJID, replyMessage, "CONTROL-REPLY"); - break; + replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + sendXMPPMessage(xmppAdminJID, securePayLoad, "CONTROL-REPLY"); + break; - case AgentConstants.HUMIDITY_CONTROL: - int currentHumidity = agentManager.getHumidity(); + case AgentConstants.HUMIDITY_CONTROL: + int currentHumidity = agentManager.getHumidity(); - String replyHumidity = - "The current humidity was read to be: '" + currentHumidity + "%'"; - log.info(AgentConstants.LOG_APPENDER + replyHumidity); + String replyHumidity = "The current humidity was read to be: '" + currentHumidity + "%'"; + log.info(AgentConstants.LOG_APPENDER + replyHumidity); - replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; - sendXMPPMessage(xmppAdminJID, replyMessage, "CONTROL-REPLY"); - break; + replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + sendXMPPMessage(xmppAdminJID, securePayLoad, "CONTROL-REPLY"); + break; - default: - replyMessage = "'" + controlSignal[0] + - "' is invalid and not-supported for this device-type"; - log.warn(replyMessage); - sendXMPPMessage(xmppAdminJID, replyMessage, "CONTROL-ERROR"); - break; + default: + replyMessage = "'" + controlSignal[0] + "' is invalid and not-supported for this device-type"; + log.warn(replyMessage); + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + sendXMPPMessage(xmppAdminJID, securePayLoad, "CONTROL-ERROR"); + break; + } + } catch (AgentCoreOperationException e) { + log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e); } } + @Override - public void processIncomingMessage() { + public void publishDeviceData() { final AgentManager agentManager = AgentManager.getInstance(); int publishInterval = agentManager.getPushInterval(); @@ -225,13 +239,15 @@ public class FireAlarmXMPPCommunicator extends XMPPTransportHandler { terminatorThread.start(); } + @Override - public void publishDeviceData(String... publishData) { + public void processIncomingMessage() { } @Override - public void publishDeviceData() { + public void publishDeviceData(String... publishData) { } + } diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/plugin/constants/VirtualFireAlarmConstants.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/plugin/constants/VirtualFireAlarmConstants.java index 0190b20013..fc44c2b9b4 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/plugin/constants/VirtualFireAlarmConstants.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/plugin/constants/VirtualFireAlarmConstants.java @@ -27,8 +27,8 @@ public class VirtualFireAlarmConstants { public static final String URL_PREFIX = "http://"; public static final String BULB_CONTEXT = "/BULB/"; - public static final String SONAR_CONTEXT = "/HUMIDITY/"; + public static final String HUMIDITY_CONTEXT = "/HUMIDITY/"; public static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/"; - public static final String SENSOR_TEMPERATURE = "temperature"; + public static final String SENSOR_TEMP = "temperature"; } diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/VirtualFireAlarmService.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/VirtualFireAlarmService.java index dc8f605a4e..5a48a7fedc 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/VirtualFireAlarmService.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/VirtualFireAlarmService.java @@ -42,12 +42,13 @@ import org.wso2.carbon.device.mgt.iot.exception.AccessTokenException; import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord; +import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.util.ZipArchive; import org.wso2.carbon.device.mgt.iot.util.ZipUtil; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.dto.DeviceJSON; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.exception.VirtualFireAlarmException; -import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmMQTTSubscriber; +import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmMQTTConnector; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmXMPPConnector; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VerificationManager; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VirtualFireAlarmServiceUtils; @@ -98,7 +99,7 @@ public class VirtualFireAlarmService { public static final String MQTT_PROTOCOL = "MQTT"; private VerificationManager verificationManager; - private VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber; + private VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector; private VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector; private ConcurrentHashMap deviceToIpMap = new ConcurrentHashMap<>(); @@ -125,7 +126,7 @@ public class VirtualFireAlarmService { @Override public void run() { virtualFireAlarmXMPPConnector.initConnector(); - virtualFireAlarmXMPPConnector.connectAndLogin(); + virtualFireAlarmXMPPConnector.connect(); } }; @@ -136,23 +137,23 @@ public class VirtualFireAlarmService { /** * - * @param virtualFireAlarmMQTTSubscriber + * @param virtualFireAlarmMQTTConnector */ - public void setVirtualFireAlarmMQTTSubscriber( - final VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber) { - this.virtualFireAlarmMQTTSubscriber = virtualFireAlarmMQTTSubscriber; - - Runnable xmppStarter = new Runnable() { - @Override - public void run() { - virtualFireAlarmMQTTSubscriber.initConnector(); - virtualFireAlarmMQTTSubscriber.connect(); - } - }; - - Thread xmppStarterThread = new Thread(xmppStarter); - xmppStarterThread.setDaemon(true); - xmppStarterThread.start(); + public void setVirtualFireAlarmMQTTConnector( + final VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector) { + this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector; + +// Runnable xmppStarter = new Runnable() { +// @Override +// public void run() { + virtualFireAlarmMQTTConnector.initConnector(); + virtualFireAlarmMQTTConnector.connect(); +// } +// }; +// +// Thread xmppStarterThread = new Thread(xmppStarter); +// xmppStarterThread.setDaemon(true); +// xmppStarterThread.start(); } /** @@ -175,8 +176,8 @@ public class VirtualFireAlarmService { * * @return */ - public VirtualFireAlarmMQTTSubscriber getVirtualFireAlarmMQTTSubscriber() { - return virtualFireAlarmMQTTSubscriber; + public VirtualFireAlarmMQTTConnector getVirtualFireAlarmMQTTConnector() { + return virtualFireAlarmMQTTConnector; } /* --------------------------------------------------------------------------------------- @@ -619,20 +620,22 @@ public class VirtualFireAlarmService { VirtualFireAlarmServiceUtils.sendCommandViaHTTP(deviceHTTPEndpoint, callUrlPattern, true); break; + case MQTT_PROTOCOL: - String resource = VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""); - virtualFireAlarmMQTTSubscriber.publishDeviceData(owner, deviceId, resource, switchToState); + String mqttResource = VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""); + virtualFireAlarmMQTTConnector.publishDeviceData(owner, deviceId, mqttResource, switchToState); break; + case XMPP_PROTOCOL: - VirtualFireAlarmServiceUtils.sendCommandViaXMPP(owner, deviceId, - VirtualFireAlarmConstants.BULB_CONTEXT, - switchToState, virtualFireAlarmXMPPConnector); + String xmppResource = VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""); + virtualFireAlarmXMPPConnector.publishDeviceData(owner, deviceId, xmppResource, switchToState); break; + default: response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); return; } - } catch (DeviceManagementException e) { + } catch (DeviceManagementException | TransportHandlerException e) { log.error("Failed to send switch-bulb request to device [" + deviceId + "] via " + protocolString); response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); return; @@ -650,23 +653,22 @@ public class VirtualFireAlarmService { * @param response * @return */ - @Path("controller/readsonar") + @Path("controller/readhumidity") @GET - @Feature( code="VIRTUALFIREALARM_READSONAR", name="Read Sonar", - description="Read Sonar Readings from Virtual Fire Alarm") - public String requestSonarReading(@HeaderParam("owner") String owner, - @HeaderParam("deviceId") String deviceId, - @HeaderParam("protocol") String protocol, - @Context HttpServletResponse response) { + @Feature( code="VIRTUALFIREALARM_READHUMIDITY", name="Read Humidity", + description="Read Humidity Readings from Virtual Fire Alarm") + public String requestHumidity(@HeaderParam("owner") String owner, + @HeaderParam("deviceId") String deviceId, + @HeaderParam("protocol") String protocol, + @Context HttpServletResponse response) { String replyMsg = ""; DeviceValidator deviceValidator = new DeviceValidator(); try { - if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, - VirtualFireAlarmConstants - .DEVICE_TYPE))) { + if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier( + deviceId, VirtualFireAlarmConstants.DEVICE_TYPE))) { response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); - return "Unauthorized Access"; + return "Unauthorized Access Attempt"; } } catch (DeviceManagementException e) { replyMsg = e.getErrorMessage(); @@ -677,8 +679,7 @@ public class VirtualFireAlarmService { String protocolString = protocol.toUpperCase(); if (log.isDebugEnabled()) { - log.debug("Sending request to read sonar value of device [" + deviceId + "] via " + - protocolString); + log.debug("Sending request to read humidity value of device [" + deviceId + "] via " + protocolString); } try { @@ -686,26 +687,24 @@ public class VirtualFireAlarmService { case HTTP_PROTOCOL: String deviceHTTPEndpoint = deviceToIpMap.get(deviceId); if (deviceHTTPEndpoint == null) { - replyMsg = - "IP not registered for device: " + deviceId + " of owner: " + owner; + replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner; response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); return replyMsg; } replyMsg = VirtualFireAlarmServiceUtils.sendCommandViaHTTP(deviceHTTPEndpoint, - VirtualFireAlarmConstants.SONAR_CONTEXT, + VirtualFireAlarmConstants.HUMIDITY_CONTEXT, false); break; case MQTT_PROTOCOL: - String resource = VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""); - virtualFireAlarmMQTTSubscriber.publishDeviceData(owner, deviceId, resource, ""); + String mqttResource = VirtualFireAlarmConstants.HUMIDITY_CONTEXT.replace("/", ""); + virtualFireAlarmMQTTConnector.publishDeviceData(owner, deviceId, mqttResource, ""); break; case XMPP_PROTOCOL: - VirtualFireAlarmServiceUtils.sendCommandViaXMPP(owner, deviceId, - VirtualFireAlarmConstants.SONAR_CONTEXT, "", - virtualFireAlarmXMPPConnector); + String xmppResource = VirtualFireAlarmConstants.HUMIDITY_CONTEXT.replace("/", ""); + virtualFireAlarmXMPPConnector.publishDeviceData(owner, deviceId, xmppResource, ""); break; default: @@ -713,14 +712,14 @@ public class VirtualFireAlarmService { response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); return replyMsg; } - } catch (DeviceManagementException e) { - replyMsg = e.getErrorMessage(); + } catch (DeviceManagementException | TransportHandlerException e) { + replyMsg = e.getMessage(); response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); return replyMsg; } response.setStatus(Response.Status.OK.getStatusCode()); - replyMsg = "The current sonar reading of the device is " + replyMsg; + replyMsg = "The current humidity reading of the device is " + replyMsg; return replyMsg; } @@ -747,9 +746,8 @@ public class VirtualFireAlarmService { DeviceValidator deviceValidator = new DeviceValidator(); try { - if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, - VirtualFireAlarmConstants - .DEVICE_TYPE))) { + if (!deviceValidator.isExist(owner, SUPER_TENANT, + new DeviceIdentifier(deviceId, VirtualFireAlarmConstants.DEVICE_TYPE))) { response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); } } catch (DeviceManagementException e) { @@ -759,9 +757,8 @@ public class VirtualFireAlarmService { String protocolString = protocol.toUpperCase(); if (log.isDebugEnabled()) { - log.debug( - "Sending request to read virtual-firealarm-temperature of device [" + deviceId + - "] via " + protocolString); + log.debug("Sending request to read virtual-firealarm-temperature of device " + + "[" + deviceId + "] via " + protocolString); } try { @@ -772,35 +769,33 @@ public class VirtualFireAlarmService { response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); } - String temperatureValue = VirtualFireAlarmServiceUtils. - sendCommandViaHTTP(deviceHTTPEndpoint, - VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, - false); + String temperatureValue = VirtualFireAlarmServiceUtils.sendCommandViaHTTP( + deviceHTTPEndpoint, + VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, + false); SensorDataManager.getInstance().setSensorRecord(deviceId, - VirtualFireAlarmConstants.SENSOR_TEMPERATURE, + VirtualFireAlarmConstants.SENSOR_TEMP, temperatureValue, Calendar.getInstance().getTimeInMillis()); break; case MQTT_PROTOCOL: - String resource = VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""); - virtualFireAlarmMQTTSubscriber.publishDeviceData(owner, deviceId, resource, ""); + String mqttResource = VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""); + virtualFireAlarmMQTTConnector.publishDeviceData(owner, deviceId, mqttResource, ""); break; case XMPP_PROTOCOL: - VirtualFireAlarmServiceUtils.sendCommandViaXMPP(owner, deviceId, - VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, "", - virtualFireAlarmXMPPConnector); + String xmppResource = VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""); + virtualFireAlarmMQTTConnector.publishDeviceData(owner, deviceId, xmppResource, ""); break; default: response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); } sensorRecord = SensorDataManager.getInstance().getSensorRecord(deviceId, - VirtualFireAlarmConstants - .SENSOR_TEMPERATURE); - } catch (DeviceManagementException | DeviceControllerException e) { + VirtualFireAlarmConstants.SENSOR_TEMP); + } catch (DeviceManagementException | DeviceControllerException | TransportHandlerException e) { response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); } @@ -826,18 +821,16 @@ public class VirtualFireAlarmService { if (registeredIp == null) { log.warn("Unregistered IP: Temperature Data Received from an un-registered IP " + - deviceIp + " for device ID - " + deviceId); + deviceIp + " for device ID - " + deviceId); response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); return; } else if (!registeredIp.equals(deviceIp)) { - log.warn("Conflicting IP: Received IP is " + deviceIp + ". Device with ID " + - deviceId + - " is already registered under some other IP. Re-registration " + - "required"); + log.warn("Conflicting IP: Received IP is " + deviceIp + ". Device with ID " + deviceId + + " is already registered under some other IP. Re-registration required"); response.setStatus(Response.Status.CONFLICT.getStatusCode()); return; } - SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMPERATURE, + SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMP, String.valueOf(temperature), Calendar.getInstance().getTimeInMillis()); diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTSubscriber.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTConnector.java similarity index 78% rename from components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTSubscriber.java rename to components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTConnector.java index 76273eb84a..d8a45678c4 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTSubscriber.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTConnector.java @@ -22,10 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; -import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttSubscriber; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager; import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler; @@ -41,25 +39,24 @@ import java.security.PublicKey; import java.util.Calendar; import java.util.UUID; -public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler { - private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTSubscriber.class); +@SuppressWarnings("no JAX-WS annotation") +public class VirtualFireAlarmMQTTConnector extends MQTTTransportHandler { + private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTConnector.class); - private static final String serverName = - DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName(); - private static final String subscribeTopic = - serverName + File.separator + "+" + File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + - File.separator + "+" + File.separator + "publisher"; + private static String serverName; + private static String subscribeTopic; + private static String iotServerSubscriber; - private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); - private String mqttEndpoint; - - private VirtualFireAlarmMQTTSubscriber() { + private VirtualFireAlarmMQTTConnector() { super(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic); } - public void initConnector() { - mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint(); + public void initConnector(){ + iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); + serverName = DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName(); + subscribeTopic = serverName + File.separator + "+" + File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + + File.separator + "+" + File.separator + "publisher"; } @Override @@ -75,8 +72,7 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler { try { Thread.sleep(timeoutInterval); } catch (InterruptedException ex) { - //TODO: Need to print exception - log.error("MQTT-Subscriber: Thread Sleep Interrupt Exception"); + log.error("MQTT-Subscriber: Thread Sleep Interrupt Exception.", ex); } } } @@ -126,7 +122,7 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler { } else if (actualMessage.contains("TEMPERATURE")) { String temperatureValue = actualMessage.split(":")[1]; - SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMPERATURE, + SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMP, temperatureValue, Calendar.getInstance().getTimeInMillis()); } @@ -137,22 +133,13 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler { } } - - @Override - public void publishDeviceData() { - - } - - @Override - public void processIncomingMessage() { - - } - - @Override - public void publishDeviceData(String... publishData) { + public void publishDeviceData(String... publishData) throws TransportHandlerException { if (publishData.length != 4) { - + String errorMsg = "Incorrect number of arguments received to SEND-MQTT Message. " + + "Need to be [owner, deviceId, resource{BULB/TEMP}, state{ON/OFF or null}]"; + log.error(errorMsg); + throw new TransportHandlerException(errorMsg); } String deviceOwner = publishData[0]; @@ -181,9 +168,10 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler { publishToQueue(publishTopic, pushMessage); } catch (VirtualFireAlarmException e) { - log.error("Preparing Secure payload failed", e); - } catch (TransportHandlerException e) { - log.warn("Data Publish attempt to topic - [" + publishTopic + "] failed for payload [" + pushMessage + "]"); + String errorMsg = "Preparing Secure payload failed for device - [" + deviceId + "] of owner - " + + "[" + deviceOwner + "]."; + log.error(errorMsg); + throw new TransportHandlerException(errorMsg, e); } } @@ -197,13 +185,15 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler { closeConnection(); } catch (MqttException e) { if (log.isDebugEnabled()) { - log.warn("Unable to 'STOP' MQTT connection at broker at: " + mqttBrokerEndPoint); + log.warn("Unable to 'STOP' MQTT connection at broker at: " + mqttBrokerEndPoint + + " for device-type - " + VirtualFireAlarmConstants.DEVICE_TYPE, e); } try { Thread.sleep(timeoutInterval); } catch (InterruptedException e1) { - log.error("MQTT-Terminator: Thread Sleep Interrupt Exception"); + log.error("MQTT-Terminator: Thread Sleep Interrupt Exception at device-type - " + + VirtualFireAlarmConstants.DEVICE_TYPE, e1); } } } @@ -214,4 +204,26 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler { terminatorThread.setDaemon(true); terminatorThread.start(); } + + + @Override + public void publishDeviceData() { + // nothing to do + } + + @Override + public void publishDeviceData(MqttMessage publishData) throws TransportHandlerException { + // nothing to do + } + + + @Override + public void processIncomingMessage() { + // nothing to do + } + + @Override + public void processIncomingMessage(MqttMessage message) throws TransportHandlerException { + // nothing to do + } } diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmXMPPConnector.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmXMPPConnector.java index 3405541905..285783d850 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmXMPPConnector.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmXMPPConnector.java @@ -20,49 +20,122 @@ package org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jivesoftware.smack.packet.Message; -import org.wso2.carbon.device.mgt.common.DeviceManagementException; +import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager; +import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppAccount; import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig; -import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConnector; +import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppServerClient; +import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager; +import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; +import org.wso2.carbon.device.mgt.iot.transport.xmpp.XMPPTransportHandler; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants; +import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.exception.VirtualFireAlarmException; +import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VerificationManager; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VirtualFireAlarmServiceUtils; +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.security.PrivateKey; +import java.security.PublicKey; import java.util.Calendar; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; -public class VirtualFireAlarmXMPPConnector extends XmppConnector { +@SuppressWarnings("no JAX-WS annotation") +public class VirtualFireAlarmXMPPConnector extends XMPPTransportHandler { private static Log log = LogFactory.getLog(VirtualFireAlarmXMPPConnector.class); private static String xmppServerIP; - // private static int xmppServerPort; - private static String xmppAdminUsername; - private static String xmppAdminPassword; - private static String xmppAdminAccountJID; + private static String xmppVFireAlarmAdminUsername; + private static String xmppVFireAlarmAdminAccountJID; + private static final String V_FIREALARM_XMPP_PASSWORD = "vfirealarm@123"; + private ScheduledFuture connectorServiceHandler; + private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); private VirtualFireAlarmXMPPConnector() { - super(XmppConfig.getInstance().getXmppServerIP(), - XmppConfig.getInstance().getSERVER_CONNECTION_PORT()); + super(XmppConfig.getInstance().getXmppServerIP(), XmppConfig.getInstance().getSERVER_CONNECTION_PORT()); } public void initConnector() { + String serverName = + DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName(); + xmppVFireAlarmAdminUsername = serverName + "_" + VirtualFireAlarmConstants.DEVICE_TYPE; + xmppServerIP = XmppConfig.getInstance().getXmppServerIP(); - xmppAdminUsername = XmppConfig.getInstance().getXmppUsername(); - xmppAdminPassword = XmppConfig.getInstance().getXmppPassword(); - xmppAdminAccountJID = xmppAdminUsername + "@" + xmppServerIP; + xmppVFireAlarmAdminAccountJID = xmppVFireAlarmAdminUsername + "@" + xmppServerIP; + createXMPPAccountForDeviceType(); } - public void connectAndLogin() { + public void createXMPPAccountForDeviceType() { + boolean accountExists = false; + XmppServerClient xmppServerClient = new XmppServerClient(); + try { - super.connectAndLogin(xmppAdminUsername, xmppAdminPassword, null); - super.setMessageFilterOnReceiver(xmppAdminAccountJID); - } catch (DeviceManagementException e) { - log.error("Connect/Login attempt to XMPP Server at: " + xmppServerIP + " failed"); - retryXMPPConnection(); + accountExists = xmppServerClient.doesXMPPUserAccountExist(xmppVFireAlarmAdminUsername); + } catch (DeviceControllerException e) { + String errorMsg = "An error was encountered whilst trying to check whether Server XMPP account exists " + + "for device-type - " + VirtualFireAlarmConstants.DEVICE_TYPE; + log.error(errorMsg, e); } + + if (!accountExists) { + XmppAccount xmppAccount = new XmppAccount(); + + xmppAccount.setAccountName(xmppVFireAlarmAdminUsername); + xmppAccount.setUsername(xmppVFireAlarmAdminUsername); + xmppAccount.setPassword(V_FIREALARM_XMPP_PASSWORD); + xmppAccount.setEmail(""); + + try { + boolean xmppCreated = xmppServerClient.createXMPPAccount(xmppAccount); + if (!xmppCreated) { + log.warn("Server XMPP Account was not created for device-type - " + + VirtualFireAlarmConstants.DEVICE_TYPE + + ". Check whether XMPP is enabled in \"devicemgt-config.xml\" & restart."); + } else { + log.info("Server XMPP Account [" + xmppVFireAlarmAdminUsername + + "] was not created for device-type - " + VirtualFireAlarmConstants.DEVICE_TYPE); + } + } catch (DeviceControllerException e) { + String errorMsg = + "An error was encountered whilst trying to create Server XMPP account for device-type - " + + VirtualFireAlarmConstants.DEVICE_TYPE; + log.error(errorMsg, e); + } + } + } + + + @Override + public void connect() { + Runnable connector = new Runnable() { + public void run() { + if (!isConnected()) { + try { + connectToServer(); + loginToServer(xmppVFireAlarmAdminUsername, V_FIREALARM_XMPP_PASSWORD, null); + setFilterOnReceiver(xmppVFireAlarmAdminAccountJID); + + } catch (TransportHandlerException e) { + if (log.isDebugEnabled()) { + log.warn("Connection/Login to XMPP server at: " + server + " as " + + xmppVFireAlarmAdminUsername + " failed for device-type [" + + VirtualFireAlarmConstants.DEVICE_TYPE + "].", e); + } + } + } + } + }; + + connectorServiceHandler = service.scheduleAtFixedRate(connector, 0, timeoutInterval, TimeUnit.MILLISECONDS); } @Override - protected void processXMPPMessage(Message xmppMessage) { + public void processIncomingMessage(Message xmppMessage) throws TransportHandlerException { String from = xmppMessage.getFrom(); String subject = xmppMessage.getSubject(); String message = xmppMessage.getBody(); @@ -75,80 +148,139 @@ public class VirtualFireAlarmXMPPConnector extends XmppConnector { String owner = from.substring(indexOfSlash + 1, from.length()); if (log.isDebugEnabled()) { - log.debug("Received XMPP message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}"); + log.debug("Received XMPP message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]"); } - if (subject != null) { - switch (subject) { - case "PUBLISHER": - float temperature = Float.parseFloat(message.split(":")[1]); - if (!VirtualFireAlarmServiceUtils.publishToDAS(owner, deviceId, temperature)) { - log.error("XMPP Connector: Publishing data to DAS failed."); - } + try { + PublicKey clientPublicKey = VirtualFireAlarmServiceUtils.getDevicePublicKey(deviceId); + PrivateKey serverPrivateKey = VerificationManager.getServerPrivateKey(); + String actualMessage = VirtualFireAlarmServiceUtils.extractMessageFromPayload(message, serverPrivateKey, + clientPublicKey); + if (log.isDebugEnabled()) { + log.debug("XMPP: Received Message [" + actualMessage + "] from: [" + from + "]"); + } - if (log.isDebugEnabled()) { - log.debug("XMPP: Publisher Message [" + message + "] from [" + from + "]"); - log.debug("XMPP Connector: Published data to DAS successfully."); - } - break; - case "CONTROL-REPLY": - if (log.isDebugEnabled()) { - log.debug("XMPP: Reply Message [" + message + "] from [" + from + "]"); - } - String tempVal = message.split(":")[1]; - SensorDataManager.getInstance().setSensorRecord(deviceId, - VirtualFireAlarmConstants.SENSOR_TEMPERATURE, - tempVal, - Calendar.getInstance().getTimeInMillis()); - break; - default: - if (log.isDebugEnabled()) { - log.warn("Unknown XMPP Message [" + message + "] from [" + from + "] received"); - } - break; + if (subject != null) { + switch (subject) { + case "PUBLISHER": + float temperature = Float.parseFloat(actualMessage.split(":")[1]); + if (!VirtualFireAlarmServiceUtils.publishToDAS(owner, deviceId, temperature)) { + log.error("XMPP Connector: Publishing VirtualFirealarm data to DAS failed."); + } + + if (log.isDebugEnabled()) { + log.debug("XMPP: Publisher Message [" + actualMessage + "] from [" + from + "] " + + "was successfully published to DAS"); + } + break; + + case "CONTROL-REPLY": + String tempVal = actualMessage.split(":")[1]; + SensorDataManager.getInstance().setSensorRecord(deviceId, + VirtualFireAlarmConstants.SENSOR_TEMP, + tempVal, + Calendar.getInstance().getTimeInMillis()); + break; + + default: + if (log.isDebugEnabled()) { + log.warn("Unknown XMPP Message [" + actualMessage + "] from [" + from + "] received"); + } + break; + } } + } catch (VirtualFireAlarmException e) { + String errorMsg = + "CertificateManagementService failure oo Signature-Verification/Decryption was unsuccessful."; + log.error(errorMsg, e); } } else { log.warn("Received XMPP message from client with unexpected JID [" + from + "]."); } } - private void retryXMPPConnection() { - Thread retryToConnect = new Thread() { - @Override - public void run() { + @Override + public void publishDeviceData(String... publishData) throws TransportHandlerException { + if (publishData.length != 4) { + String errorMsg = "Incorrect number of arguments received to SEND-MQTT Message. " + + "Need to be [owner, deviceId, resource{BULB/TEMP}, state{ON/OFF or null}]"; + log.error(errorMsg); + throw new TransportHandlerException(errorMsg); + } - while (true) { - if (!isConnected()) { - if (log.isDebugEnabled()) { - log.debug("Re-trying to reach XMPP Server...."); - } + String deviceOwner = publishData[0]; + String deviceId = publishData[1]; + String resource = publishData[2]; + String state = publishData[3]; - try { - VirtualFireAlarmXMPPConnector.super.connectAndLogin(xmppAdminUsername, - xmppAdminPassword, - null); - VirtualFireAlarmXMPPConnector.super.setMessageFilterOnReceiver( - xmppAdminAccountJID); - } catch (DeviceManagementException e1) { - if (log.isDebugEnabled()) { - log.debug("Attempt to re-connect to XMPP-Server failed"); - } - } - } else { - break; + try { + PublicKey devicePublicKey = VirtualFireAlarmServiceUtils.getDevicePublicKey(deviceId); + PrivateKey serverPrivateKey = VerificationManager.getServerPrivateKey(); + + String actualMessage = resource + ":" + state; + String encryptedMsg = VirtualFireAlarmServiceUtils.prepareSecurePayLoad(actualMessage, + devicePublicKey, + serverPrivateKey); + + String clientToConnect = deviceId + "@" + xmppServerIP + File.separator + deviceOwner; + sendXMPPMessage(clientToConnect, encryptedMsg, "CONTROL-REQUEST"); + + } catch (VirtualFireAlarmException e) { + String errorMsg = "Preparing Secure payload failed for device - [" + deviceId + "] of owner - " + + "[" + deviceOwner + "]."; + log.error(errorMsg); + throw new TransportHandlerException(errorMsg, e); + } + } + + + @Override + public void disconnect() { + Runnable stopConnection = new Runnable() { + public void run() { + while (isConnected()) { + connectorServiceHandler.cancel(true); + closeConnection(); + if (log.isDebugEnabled()) { + log.warn("Unable to 'STOP' connection to XMPP server at: " + server + + " for user - " + xmppVFireAlarmAdminUsername); } try { - Thread.sleep(5000); + Thread.sleep(timeoutInterval); } catch (InterruptedException e1) { - log.error("XMPP: Thread Sleep Interrupt Exception"); + log.error("XMPP-Terminator: Thread Sleep Interrupt Exception for " + + VirtualFireAlarmConstants.DEVICE_TYPE + " type.", e1); } + } } }; - retryToConnect.setDaemon(true); - retryToConnect.start(); + Thread terminatorThread = new Thread(stopConnection); + terminatorThread.setDaemon(true); + terminatorThread.start(); + } + + + @Override + public void processIncomingMessage(Message message, String... messageParams) throws TransportHandlerException { + // nothing to do + } + + @Override + public void processIncomingMessage() throws TransportHandlerException { + // nothing to do + } + + @Override + public void publishDeviceData() throws TransportHandlerException { + // nothing to do + } + + @Override + public void publishDeviceData(Message publishData) throws TransportHandlerException { + // nothing to do } } + diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/util/VirtualFireAlarmServiceUtils.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/util/VirtualFireAlarmServiceUtils.java index 960d4d8d34..5d1063ebbf 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/util/VirtualFireAlarmServiceUtils.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/util/VirtualFireAlarmServiceUtils.java @@ -32,18 +32,11 @@ import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException; import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService; import org.wso2.carbon.device.mgt.common.DeviceManagementException; -import org.wso2.carbon.device.mgt.iot.DeviceController; -import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig; -import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants; -import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.VirtualFireAlarmService; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.exception.VirtualFireAlarmException; -import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmMQTTSubscriber; -import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmXMPPConnector; import javax.ws.rs.HttpMethod; import java.io.BufferedReader; -import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; @@ -157,27 +150,6 @@ public class VirtualFireAlarmServiceUtils { return responseMsg; } - public static void sendCommandViaXMPP(String deviceOwner, String deviceId, String resource, - String state, VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) - throws DeviceManagementException { - - String xmppServerDomain = XmppConfig.getInstance().getXmppEndpoint(); - int indexOfChar = xmppServerDomain.lastIndexOf(File.separator); - if (indexOfChar != -1) { - xmppServerDomain = xmppServerDomain.substring((indexOfChar + 1), xmppServerDomain.length()); - } - - indexOfChar = xmppServerDomain.indexOf(":"); - if (indexOfChar != -1) { - xmppServerDomain = xmppServerDomain.substring(0, indexOfChar); - } - - String clientToConnect = deviceId + "@" + xmppServerDomain + File.separator + deviceOwner; - String message = resource.replace("/", "") + ":" + state; - - virtualFireAlarmXMPPConnector.sendXMPPMessage(clientToConnect, message, "CONTROL-REQUEST"); - } - /* --------------------------------------------------------------------------------------- Utility methods relevant to creating and sending http requests --------------------------------------------------------------------------------------- */ diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/webapp/META-INF/resources.xml b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/webapp/META-INF/resources.xml index 8f83e0507e..b5b75ba027 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/webapp/META-INF/resources.xml +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/webapp/META-INF/resources.xml @@ -87,8 +87,8 @@ Any GET - http://localhost:9763/virtual_firealarm/controller/controller/readsonar - /controller/readsonar + http://localhost:9763/virtual_firealarm/controller/controller/readhumidity + /controller/readhumidity Any diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml index 2f54a57e36..d05204ddc8 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml @@ -24,26 +24,12 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd"> - - - - - - - - - - - - - - - + @@ -55,8 +41,8 @@ - + diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/DeviceController.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/DeviceController.java index 0ef2b7599f..b200fb4e1b 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/DeviceController.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/DeviceController.java @@ -70,8 +70,7 @@ public class DeviceController { Class dataStoreClass = Class.forName(handlerClass); if (DataStoreConnector.class.isAssignableFrom(dataStoreClass)) { - DataStoreConnector dataStoreConnector = - (DataStoreConnector) dataStoreClass.newInstance(); + DataStoreConnector dataStoreConnector = (DataStoreConnector) dataStoreClass.newInstance(); String dataStoreName = dataStore.getName(); if (dataStore.isEnabled()) { dataStoresMap.put(dataStoreName, dataStoreConnector); diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppAccount.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppAccount.java index 0fd6f5c32b..cf0986b5a0 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppAccount.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppAccount.java @@ -18,11 +18,6 @@ package org.wso2.carbon.device.mgt.iot.controlqueue.xmpp; -import java.util.Map; - -/** - * Created by smean-MAC on 7/24/15. - */ public class XmppAccount { private String username; private String password; diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java index 34e14a3136..ca6d16fe5e 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java @@ -25,6 +25,7 @@ import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.util.EntityUtils; @@ -33,6 +34,7 @@ import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.exception.IoTException; import org.wso2.carbon.device.mgt.iot.util.IoTUtil; +import javax.ws.rs.core.MediaType; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; @@ -46,7 +48,9 @@ public class XmppServerClient implements ControlQueueConnector { private static final String XMPP_SERVER_API_CONTEXT = "/plugins/restapi/v1"; private static final String USERS_API = "/users"; + @SuppressWarnings("unused") private static final String GROUPS_API = "/groups"; + @SuppressWarnings("unused") private static final String APPLICATION_JSON_MT = "application/json"; private String xmppEndpoint; @@ -68,9 +72,7 @@ public class XmppServerClient implements ControlQueueConnector { @Override public void enqueueControls(HashMap deviceControls) throws DeviceControllerException { - if (xmppEnabled) { - - } else { + if (!xmppEnabled) { log.warn("XMPP set to false in 'devicemgt-config.xml'"); } } @@ -79,8 +81,7 @@ public class XmppServerClient implements ControlQueueConnector { if (xmppEnabled) { String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API; if (log.isDebugEnabled()) { - log.debug("The API Endpoint URL of the XMPP Server is set to: " + - xmppUsersAPIEndpoint); + log.debug("The API Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint); } String encodedString = xmppUsername + ":" + xmppPassword; @@ -104,22 +105,23 @@ public class XmppServerClient implements ControlQueueConnector { " ]" + " }" + "}"; - StringEntity requestEntity = null; + + StringEntity requestEntity; try { - requestEntity = new StringEntity(jsonRequest,"application/json","UTF-8"); + requestEntity = new StringEntity(jsonRequest, MediaType.APPLICATION_JSON , StandardCharsets.UTF_8.toString()); } catch (UnsupportedEncodingException e) { return false; } - URL xmppUserApiUrl = null; + URL xmppUserApiUrl; try { xmppUserApiUrl = new URL(xmppUsersAPIEndpoint); } catch (MalformedURLException e) { - String errMsg = "Malformed URL + " + xmppUsersAPIEndpoint; + String errMsg = "Malformed XMPP URL + " + xmppUsersAPIEndpoint; log.error(errMsg); throw new DeviceControllerException(errMsg); } - HttpClient httpClient = null; + HttpClient httpClient; try { httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); } catch (Exception e) { @@ -127,13 +129,11 @@ public class XmppServerClient implements ControlQueueConnector { + xmppUserApiUrl.getProtocol()); return false; } - HttpPost httpPost = new HttpPost(xmppUsersAPIEndpoint); + HttpPost httpPost = new HttpPost(xmppUsersAPIEndpoint); httpPost.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); httpPost.setEntity(requestEntity); - - try { HttpResponse httpResponse = httpClient.execute(httpPost); @@ -159,4 +159,69 @@ public class XmppServerClient implements ControlQueueConnector { return false; } } + + + public boolean doesXMPPUserAccountExist(String username) throws DeviceControllerException { + if (xmppEnabled) { + String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API + "/" + username; + if (log.isDebugEnabled()) { + log.debug("The API Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint); + } + + String encodedString = xmppUsername + ":" + xmppPassword; + encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8))); + String authorizationHeader = "Basic " + encodedString; + + URL xmppUserApiUrl; + try { + xmppUserApiUrl = new URL(xmppUsersAPIEndpoint); + } catch (MalformedURLException e) { + String errMsg = "Malformed XMPP URL + " + xmppUsersAPIEndpoint; + log.error(errMsg); + throw new DeviceControllerException(errMsg, e); + } + + HttpClient httpClient; + try { + httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); + } catch (Exception e) { + String errorMsg = "Error on getting a http client for port :" + xmppUserApiUrl.getPort() + + " protocol :" + xmppUserApiUrl.getProtocol(); + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + + HttpGet httpGet = new HttpGet(xmppUsersAPIEndpoint); + httpGet.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); + + try { + HttpResponse httpResponse = httpClient.execute(httpGet); + + if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + String response = IoTUtil.getResponseString(httpResponse); + if (log.isDebugEnabled()) { + log.debug("XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() + + "' for checking existence of account [" + username + "] with message:\n" + + response + "\nProbably, an account with this username does not exist."); + } + return false; + } + + } catch (IOException | IoTException e) { + String errorMsg = "Error occured whilst trying a 'GET' at : " + xmppUsersAPIEndpoint + + "\nError: " + e.getMessage(); + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + + if (log.isDebugEnabled()) { + log.debug("XMPP Server already has an account for the username - [" + username + "]."); + } + return true; + } else { + String warnMsg = "XMPP set to false in 'devicemgt-config.xml'"; + log.error(warnMsg); + throw new DeviceControllerException(warnMsg); + } + } } diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/TransportHandler.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/TransportHandler.java index ed64da5fb9..f597b5db36 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/TransportHandler.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/TransportHandler.java @@ -34,14 +34,17 @@ public interface TransportHandler { boolean isConnected(); - //TODO:: Any errors needs to be thrown ahead - void processIncomingMessage(T message, String... messageParams); + void processIncomingMessage() throws TransportHandlerException; - void processIncomingMessage(); + void processIncomingMessage(T message) throws TransportHandlerException; - void publishDeviceData(String... publishData); + void processIncomingMessage(T message, String... messageParams) throws TransportHandlerException; - void publishDeviceData(); + void publishDeviceData() throws TransportHandlerException; + + void publishDeviceData(T publishData) throws TransportHandlerException; + + void publishDeviceData(String... publishData) throws TransportHandlerException; void disconnect(); } diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/mqtt/MQTTTransportHandler.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/mqtt/MQTTTransportHandler.java index d3c3fafda1..1e58e68120 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/mqtt/MQTTTransportHandler.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/mqtt/MQTTTransportHandler.java @@ -309,12 +309,17 @@ public abstract class MQTTTransportHandler @Override public void messageArrived(final String topic, final MqttMessage mqttMessage) { if (log.isDebugEnabled()) { - log.info("Got an MQTT message '" + mqttMessage.toString() + "' for topic '" + topic + "'."); + log.debug("Got an MQTT message '" + mqttMessage.toString() + "' for topic '" + topic + "'."); } Thread messageProcessorThread = new Thread() { public void run() { - processIncomingMessage(mqttMessage, topic); + try { + processIncomingMessage(mqttMessage, topic); + } catch (TransportHandlerException e) { + log.error("An error occurred when trying to process received MQTT message [" + mqttMessage + "] " + + "for topic [" + topic + "].", e); + } } }; messageProcessorThread.setDaemon(true); diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/xmpp/XMPPTransportHandler.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/xmpp/XMPPTransportHandler.java index 838e0a1659..455a417d47 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/xmpp/XMPPTransportHandler.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/transport/xmpp/XMPPTransportHandler.java @@ -61,7 +61,6 @@ public abstract class XMPPTransportHandler implements TransportHandler private static final int DEFAULT_XMPP_PORT = 5222; private XMPPConnection connection; private int port; - private ConnectionConfiguration config; private PacketFilter filter; private PacketListener listener; @@ -71,6 +70,7 @@ public abstract class XMPPTransportHandler implements TransportHandler * * @param server the IP of the XMPP server. */ + @SuppressWarnings("unused") protected XMPPTransportHandler(String server) { this.server = server; this.port = DEFAULT_XMPP_PORT; @@ -99,6 +99,7 @@ public abstract class XMPPTransportHandler implements TransportHandler * @param port the XMPP server's port to connect to. (default - 5222) * @param timeoutInterval the timeout interval to use for the connection and reconnection */ + @SuppressWarnings("unused") protected XMPPTransportHandler(String server, int port, int timeoutInterval) { this.server = server; this.port = port; @@ -112,6 +113,7 @@ public abstract class XMPPTransportHandler implements TransportHandler * @param millis the time in millis to be set as the time-out-limit whilst waiting for a * XMPP-reply. */ + @SuppressWarnings("unused") public void setTimeoutInterval(int millis) { this.timeoutInterval = millis; } @@ -135,7 +137,7 @@ public abstract class XMPPTransportHandler implements TransportHandler log.info(String.format("Initializing connection to XMPP Server at %1$s via port " + "%2$d.", server, port)); SmackConfiguration.setPacketReplyTimeout(timeoutInterval); - config = new ConnectionConfiguration(server, port); + ConnectionConfiguration config = new ConnectionConfiguration(server, port); // TODO:: Need to enable SASL-Authentication appropriately config.setSASLAuthenticationEnabled(false); config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled); @@ -214,6 +216,7 @@ public abstract class XMPPTransportHandler implements TransportHandler * * @param senderJID the JID (XMPP-Account ID of the sender) to which the filter is to be set. */ + @SuppressWarnings("unused") protected void setFilterOnSender(String senderJID) { filter = new AndFilter(new PacketTypeFilter(Message.class), new FromContainsFilter( senderJID)); @@ -224,7 +227,12 @@ public abstract class XMPPTransportHandler implements TransportHandler final Message xmppMessage = (Message) packet; Thread msgProcessThread = new Thread() { public void run() { - processIncomingMessage(xmppMessage); + try { + processIncomingMessage(xmppMessage); + } catch (TransportHandlerException e) { + log.error("An error occurred when trying to process received XMPP message " + + "[" + xmppMessage.getBody() + "].", e); + } } }; msgProcessThread.setDaemon(true); @@ -255,7 +263,12 @@ public abstract class XMPPTransportHandler implements TransportHandler final Message xmppMessage = (Message) packet; Thread msgProcessThread = new Thread() { public void run() { - processIncomingMessage(xmppMessage); + try { + processIncomingMessage(xmppMessage); + } catch (TransportHandlerException e) { + log.error("An error occurred when trying to process received XMPP message " + + "[" + xmppMessage.getBody() + "].", e); + } } }; msgProcessThread.setDaemon(true); @@ -280,6 +293,7 @@ public abstract class XMPPTransportHandler implements TransportHandler * if false: then the filter is set with 'OR' operator (senderJID | * receiverJID) */ + @SuppressWarnings("unused") protected void setMessageFilterAndListener(String senderJID, String receiverJID, boolean andCondition) { PacketFilter jidFilter; @@ -300,7 +314,12 @@ public abstract class XMPPTransportHandler implements TransportHandler final Message xmppMessage = (Message) packet; Thread msgProcessThread = new Thread() { public void run() { - processIncomingMessage(xmppMessage); + try { + processIncomingMessage(xmppMessage); + } catch (TransportHandlerException e) { + log.error("An error occurred when trying to process received XMPP message " + + "[" + xmppMessage.getBody() + "].", e); + } } }; msgProcessThread.setDaemon(true); @@ -319,6 +338,7 @@ public abstract class XMPPTransportHandler implements TransportHandler * @param JID the JID (XMPP Account ID) to which the message is to be sent to. * @param message the XMPP-Message that is to be sent. */ + @SuppressWarnings("unused") protected void sendXMPPMessage(String JID, String message) { sendXMPPMessage(JID, message, "XMPP-Message"); } @@ -351,20 +371,11 @@ public abstract class XMPPTransportHandler implements TransportHandler protected void sendXMPPMessage(String JID, Message xmppMessage) { connection.sendPacket(xmppMessage); if (log.isDebugEnabled()) { - log.debug("Message: '" + xmppMessage.getBody() + "' sent to XMPP JID [" + JID + - "] sent successfully."); + log.debug("Message: '" + xmppMessage.getBody() + "' sent to XMPP JID [" + JID + "] sent successfully."); } } - /** - * Disables default debugger provided by the XMPPConnection. - */ - protected void disableDebugger() { - connection.DEBUG_ENABLED = false; - } - - /** * Closes the connection to the XMPP Server. */ From 5f8751cd957bded7713204f798a08fdcc0da2d58 Mon Sep 17 00:00:00 2001 From: Shabirmean Date: Tue, 22 Dec 2015 12:29:48 +0530 Subject: [PATCH 2/2] Fixes to the VirtualFireAlarmMQTTConnector --- .../service/VirtualFireAlarmService.java | 1 - .../VirtualFireAlarmMQTTConnector.java | 17 +++++++---------- .../iot/controlqueue/xmpp/XmppServerClient.java | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/VirtualFireAlarmService.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/VirtualFireAlarmService.java index 1ea0aca206..9631cc6f2e 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/VirtualFireAlarmService.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/VirtualFireAlarmService.java @@ -147,7 +147,6 @@ public class VirtualFireAlarmService { // Runnable xmppStarter = new Runnable() { // @Override // public void run() { - virtualFireAlarmMQTTConnector.initConnector(); virtualFireAlarmMQTTConnector.connect(); // } // }; diff --git a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTConnector.java b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTConnector.java index d8a45678c4..49170235fc 100644 --- a/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTConnector.java +++ b/components/device-mgt-iot-virtualfirealarm/org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/service/transport/VirtualFireAlarmMQTTConnector.java @@ -43,22 +43,19 @@ import java.util.UUID; public class VirtualFireAlarmMQTTConnector extends MQTTTransportHandler { private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTConnector.class); - private static String serverName; - private static String subscribeTopic; - private static String iotServerSubscriber; + private static String serverName = DeviceManagementConfigurationManager.getInstance(). + getDeviceManagementServerInfo().getName(); + + private static String subscribeTopic = serverName + File.separator + "+" + File.separator + + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator + "+" + File.separator + "publisher"; + + private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); private VirtualFireAlarmMQTTConnector() { super(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic); } - public void initConnector(){ - iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); - serverName = DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName(); - subscribeTopic = serverName + File.separator + "+" + File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + - File.separator + "+" + File.separator + "publisher"; - } - @Override public void connect() { Runnable connector = new Runnable() { diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java index ca6d16fe5e..043adafdaa 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java @@ -220,7 +220,7 @@ public class XmppServerClient implements ControlQueueConnector { return true; } else { String warnMsg = "XMPP set to false in 'devicemgt-config.xml'"; - log.error(warnMsg); + log.warn(warnMsg); throw new DeviceControllerException(warnMsg); } }