From 2cf022ef550996a3e2a73cfb81c097aab571d38e Mon Sep 17 00:00:00 2001 From: Shabirmean Date: Wed, 23 Dec 2015 15:08:27 +0530 Subject: [PATCH] Deleting all active xmpp-sessions during server shutdown --- .../api/DigitalDisplayControllerService.java | 4 +- .../api/transport/CommunicationHandler.java | 15 - .../CommunicationHandlerException.java | 38 -- .../transport/MQTTCommunicationHandler.java | 345 ------------ ...igitalDisplayMqttCommunicationHandler.java | 85 +-- .../controlqueue/xmpp/XmppServerClient.java | 502 +++++++++++------- .../IotDeviceManagementServiceComponent.java | 216 ++++---- 7 files changed, 497 insertions(+), 708 deletions(-) delete mode 100644 components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/CommunicationHandler.java delete mode 100644 components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/CommunicationHandlerException.java delete mode 100644 components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/MQTTCommunicationHandler.java diff --git a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/DigitalDisplayControllerService.java b/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/DigitalDisplayControllerService.java index 120a89b0a7..c5b8995835 100644 --- a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/DigitalDisplayControllerService.java +++ b/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/DigitalDisplayControllerService.java @@ -26,7 +26,7 @@ import org.wso2.carbon.apimgt.annotations.device.feature.Feature; import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.exception.DigitalDisplayException; -import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.CommunicationHandlerException; +import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.util.DigitalDisplayMqttCommunicationHandler; import org.wso2.carbon.device.mgt.iot.digitaldisplay.constants.DigitalDisplayConstants; @@ -466,7 +466,7 @@ public class DigitalDisplayControllerService { try { digitalDisplayMqttCommunicationHandler.publishToDigitalDisplay(topic, payload, 2, true); - } catch (CommunicationHandlerException e) { + } catch (TransportHandlerException e) { String errorMessage = "Error publishing data to device with ID " + deviceId; throw new DigitalDisplayException(errorMessage, e); } diff --git a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/CommunicationHandler.java b/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/CommunicationHandler.java deleted file mode 100644 index 9c165fef62..0000000000 --- a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/CommunicationHandler.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport; - -public interface CommunicationHandler { - int DEFAULT_TIMEOUT_INTERVAL = 5000; // millis ~ 10 sec - - void connect(); - - boolean isConnected(); - - void processIncomingMessage(T message, String... messageParams); - - void processIncomingMessage(); - - void disconnect(); -} diff --git a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/CommunicationHandlerException.java b/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/CommunicationHandlerException.java deleted file mode 100644 index 447ae6f1b1..0000000000 --- a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/CommunicationHandlerException.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport; - -public class CommunicationHandlerException extends Exception { - private static final long serialVersionUID = 2736466230451105440L; - - private String errorMessage; - - public String getErrorMessage() { - return errorMessage; - } - - public void setErrorMessage(String errorMessage) { - this.errorMessage = errorMessage; - } - - public CommunicationHandlerException(String msg, Exception nestedEx) { - super(msg, nestedEx); - setErrorMessage(msg); - } - - public CommunicationHandlerException(String message, Throwable cause) { - super(message, cause); - setErrorMessage(message); - } - - public CommunicationHandlerException(String msg) { - super(msg); - setErrorMessage(msg); - } - - public CommunicationHandlerException() { - super(); - } - - public CommunicationHandlerException(Throwable cause) { - super(cause); - } -} diff --git a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/MQTTCommunicationHandler.java b/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/MQTTCommunicationHandler.java deleted file mode 100644 index c441f892c5..0000000000 --- a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/transport/MQTTCommunicationHandler.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.eclipse.paho.client.mqttv3.*; - -import java.io.File; -import java.nio.charset.StandardCharsets; - -/** - * This class contains the IoT-Server specific implementation for all the MQTT functionality. - * This includes connecting to a MQTT Broker & subscribing to the appropriate MQTT-topic, action - * plan upon losing connection or successfully delivering a message to the broker and processing - * incoming messages. Makes use of the 'Paho-MQTT' library provided by Eclipse Org. - *

- * It is an abstract class that implements the common interface "CommunicationHandler" and the - * "MqttCallback". Whilst providing some methods which handle key MQTT relevant tasks, this class - * implements only the most generic methods of the "CommunicationHandler" interface. The rest of - * the methods are left for any extended concrete-class to implement as per its need. - */ -public abstract class MQTTCommunicationHandler - implements MqttCallback, CommunicationHandler { - private static final Log log = LogFactory.getLog(MQTTCommunicationHandler.class); - - public static final int DEFAULT_MQTT_QUALITY_OF_SERVICE = 0; - - private MqttClient client; - private String clientId; - private MqttConnectOptions options; - private String clientWillTopic; - - protected String mqttBrokerEndPoint; - protected int timeoutInterval; - protected String subscribeTopic; - - /** - * Constructor for the MQTTCommunicationHandler which takes in the owner, type of the device - * and the MQTT Broker URL and the topic to subscribe. - * - * @param deviceOwner the owner of the device. - * @param deviceType the CDMF Device-Type of the device. - * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint. - * @param subscribeTopic the MQTT topic to which the client is to be subscribed - */ - protected MQTTCommunicationHandler(String deviceOwner, String deviceType, - String mqttBrokerEndPoint, - String subscribeTopic) { - this.clientId = deviceOwner + ":" + deviceType; - this.subscribeTopic = subscribeTopic; - this.clientWillTopic = deviceType + File.separator + "disconnection"; - this.mqttBrokerEndPoint = mqttBrokerEndPoint; - this.timeoutInterval = DEFAULT_TIMEOUT_INTERVAL; - this.initSubscriber(); - } - - /** - * Constructor for the MQTTCommunicationHandler which takes in the owner, type of the device - * and the MQTT Broker URL and the topic to subscribe. Additionally this constructor takes in - * the reconnection-time interval between successive attempts to connect to the broker. - * - * @param deviceOwner the owner of the device. - * @param deviceType the CDMF Device-Type of the device. - * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint. - * @param subscribeTopic the MQTT topic to which the client is to be subscribed - * @param intervalInMillis the time interval in MILLI-SECONDS between successive - * attempts to connect to the broker. - */ - protected MQTTCommunicationHandler(String deviceOwner, String deviceType, - String mqttBrokerEndPoint, String subscribeTopic, - int intervalInMillis) { - this.clientId = deviceOwner + ":" + deviceType; - this.subscribeTopic = subscribeTopic; - this.clientWillTopic = deviceType + File.separator + "disconnection"; - this.mqttBrokerEndPoint = mqttBrokerEndPoint; - this.timeoutInterval = intervalInMillis; - this.initSubscriber(); - } - - public void setTimeoutInterval(int timeoutInterval) { - this.timeoutInterval = timeoutInterval; - } - - /** - * Initializes the MQTT-Client. Creates a client using the given MQTT-broker endpoint and the - * clientId (which is constructed by a concatenation of [deviceOwner]:[deviceType]). Also sets - * the client's options parameter with the clientWillTopic (in-case of connection failure) and - * other info. Also sets the call-back this current class. - */ - private void initSubscriber() { - try { - - client = new MqttClient(this.mqttBrokerEndPoint, clientId, null); - log.info("MQTT subscriber was created with ClientID : " + clientId); - } catch (MqttException ex) { - String errorMsg = "MQTT Client Error\n" + "\tReason: " + ex.getReasonCode() + - "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " + - ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() + - "\n\tException: " + ex; - log.error(errorMsg); - } - - options = new MqttConnectOptions(); - options.setCleanSession(false); - options.setWill(clientWillTopic, "Connection-Lost".getBytes(StandardCharsets.UTF_8), 2, - true); - client.setCallback(this); - } - - /** - * Checks whether the connection to the MQTT-Broker persists. - * - * @return true if the client is connected to the MQTT-Broker, else false. - */ - @Override - public boolean isConnected() { - return client.isConnected(); - } - - - /** - * Connects to the MQTT-Broker and if successfully established connection. - * - * @throws CommunicationHandlerException in the event of 'Connecting to' the MQTT broker fails. - */ - protected void connectToQueue() throws CommunicationHandlerException { - try { - client.connect(options); - - if (log.isDebugEnabled()) { - log.debug("Subscriber connected to queue at: " + this.mqttBrokerEndPoint); - } - } catch (MqttSecurityException ex) { - String errorMsg = "MQTT Security Exception when connecting to queue\n" + "\tReason: " + - " " + - ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() + - "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " + - ex.getCause() + "\n\tException: " + ex; - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - throw new CommunicationHandlerException(errorMsg, ex); - - } catch (MqttException ex) { - String errorMsg = "MQTT Exception when connecting to queue\n" + "\tReason: " + - ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() + - "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " + - ex.getCause() + "\n\tException: " + ex; - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - throw new CommunicationHandlerException(errorMsg, ex); - } - } - - /** - * Subscribes to the MQTT-Topic specific to this MQTT Client. (The MQTT-Topic specific to the - * device is taken in as a constructor parameter of this class) . - * - * @throws CommunicationHandlerException in the event of 'Subscribing to' the MQTT broker - * fails. - */ - protected void subscribeToQueue() throws CommunicationHandlerException { - try { - client.subscribe(subscribeTopic, 0); - log.info("Subscriber '" + clientId + "' subscribed to topic: " + subscribeTopic); - } catch (MqttException ex) { - String errorMsg = "MQTT Exception when trying to subscribe to topic: " + - subscribeTopic + "\n\tReason: " + ex.getReasonCode() + - "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " + - ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() + - "\n\tException: " + ex; - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - - throw new CommunicationHandlerException(errorMsg, ex); - } - } - - - /** - * This method is used to publish reply-messages for the control signals received. - * Invocation of this method calls its overloaded-method with a QoS equal to that of the - * default value. - * - * @param topic the topic to which the reply message is to be published. - * @param payLoad the reply-message (payload) of the MQTT publish action. - */ - protected void publishToQueue(String topic, String payLoad) - throws CommunicationHandlerException { - publishToQueue(topic, payLoad, DEFAULT_MQTT_QUALITY_OF_SERVICE, false); - } - - /** - * This is an overloaded method that publishes MQTT reply-messages for control signals - * received form the IoT-Server. - * - * @param topic the topic to which the reply message is to be published - * @param payLoad the reply-message (payload) of the MQTT publish action. - * @param qos the Quality-of-Service of the current publish action. - * Could be 0(At-most once), 1(At-least once) or 2(Exactly once) - */ - protected void publishToQueue(String topic, String payLoad, int qos, boolean retained) - throws CommunicationHandlerException { - try { - client.publish(topic, payLoad.getBytes(StandardCharsets.UTF_8), qos, retained); - if (log.isDebugEnabled()) { - log.debug("Message: " + payLoad + " to MQTT topic [" + topic + - "] published successfully"); - } - } catch (MqttException ex) { - String errorMsg = - "MQTT Client Error" + "\n\tReason: " + ex.getReasonCode() + "\n\tMessage: " + - ex.getMessage() + "\n\tLocalMsg: " + ex.getLocalizedMessage() + - "\n\tCause: " + ex.getCause() + "\n\tException: " + ex; - log.info(ex); - throw new CommunicationHandlerException(errorMsg, ex); - } - } - - - protected void publishToQueue(String topic, MqttMessage message) - throws CommunicationHandlerException { - try { - client.publish(topic, message); - if (log.isDebugEnabled()) { - log.debug("Message: " + message.toString() + " to MQTT topic [" + topic + - "] published successfully"); - } - } catch (MqttException ex) { - String errorMsg = - "MQTT Client Error" + "\n\tReason: " + ex.getReasonCode() + "\n\tMessage: " + - ex.getMessage() + "\n\tLocalMsg: " + ex.getLocalizedMessage() + - "\n\tCause: " + ex.getCause() + "\n\tException: " + ex; - log.info(errorMsg); - throw new CommunicationHandlerException(errorMsg, ex); - } - } - - - /** - * Callback method which is triggered once the MQTT client losers its connection to the broker. - * Spawns a new thread that executes necessary actions to try and reconnect to the endpoint. - * - * @param throwable a Throwable Object containing the details as to why the failure occurred. - */ - @Override - public void connectionLost(Throwable throwable) { - log.warn("Lost Connection for client: " + this.clientId + - " to " + this.mqttBrokerEndPoint + ".\nThis was due to - " + - throwable.getMessage()); - - - Thread reconnectThread = new Thread() { - public void run() { - connect(); - } - }; - reconnectThread.setDaemon(true); - reconnectThread.start(); - } - - /** - * Callback method which is triggered upon receiving a MQTT Message from the broker. Spawns a - * new thread that executes any actions to be taken with the received message. - * - * @param topic the MQTT-Topic to which the received message was published to and the - * client was subscribed to. - * @param mqttMessage the actual MQTT-Message that was received from the broker. - */ - @Override - public void messageArrived(final String topic, final MqttMessage mqttMessage) { - if (log.isDebugEnabled()) { - log.info("Got an MQTT message '" + mqttMessage.toString() + "' for topic '" + topic + - "'."); - } - - Thread messageProcessorThread = new Thread() { - public void run() { - processIncomingMessage(mqttMessage, topic); - } - }; - messageProcessorThread.setDaemon(true); - messageProcessorThread.start(); - } - - /** - * Callback method which gets triggered upon successful completion of a message delivery to - * the broker. - * - * @param iMqttDeliveryToken the MQTT-DeliveryToken which includes the details about the - * specific message delivery. - */ - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - String message = ""; - try { - if (iMqttDeliveryToken.isComplete()) { - if (iMqttDeliveryToken.getMessage() != null){ - message = iMqttDeliveryToken.getMessage().toString(); - } - } else { - log.error("MQTT Message not delivered"); - } - } catch (MqttException e) { - log.error( - "Error occurred whilst trying to read the message from the MQTT delivery token."); - } - String topic = iMqttDeliveryToken.getTopics()[0]; - String client = iMqttDeliveryToken.getClient().getClientId(); - - if (log.isDebugEnabled()) { - log.debug("Message - '" + message + "' of client [" + client + "] for the topic (" + - topic + ") was delivered successfully."); - } - } - - /** - * Closes the connection to the MQTT Broker. - */ - public void closeConnection() throws MqttException { - if (client != null && isConnected()) { - client.disconnect(); - } - } -} - diff --git a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/util/DigitalDisplayMqttCommunicationHandler.java b/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/util/DigitalDisplayMqttCommunicationHandler.java index 66529a9e0d..154c9cd207 100644 --- a/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/util/DigitalDisplayMqttCommunicationHandler.java +++ b/components/device-mgt-iot-digitaldisplay/org.wso2.carbon.device.mgt.iot.digitaldisplay.api/src/main/java/org/wso2/carbon/device/mgt/iot/digitaldisplay/api/util/DigitalDisplayMqttCommunicationHandler.java @@ -5,32 +5,35 @@ import org.apache.commons.logging.LogFactory; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; -import org.wso2.carbon.device.mgt.iot.digitaldisplay.constants.DigitalDisplayConstants; -import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.CommunicationHandlerException; -import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.MQTTCommunicationHandler; import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.websocket.DigitalDisplayWebSocketServerEndPoint; +import org.wso2.carbon.device.mgt.iot.digitaldisplay.constants.DigitalDisplayConstants; +import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; +import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler; import java.io.File; import java.util.UUID; import java.util.concurrent.ScheduledFuture; +//import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.CommunicationHandlerException; +//import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.MQTTCommunicationHandler; -public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHandler { + +public class DigitalDisplayMqttCommunicationHandler extends MQTTTransportHandler { private static Log log = LogFactory.getLog(DigitalDisplayMqttCommunicationHandler.class); private static final String subscribeTopic = - "wso2"+ File.separator+"iot"+File.separator+"+"+File.separator+ - DigitalDisplayConstants.DEVICE_TYPE+File.separator+"+"+File.separator+ + "wso2" + File.separator + "iot" + File.separator + "+" + File.separator + + DigitalDisplayConstants.DEVICE_TYPE + File.separator + "+" + File.separator + "digital_display_publisher"; - private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0,5); + private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); private ScheduledFuture dataPushServiceHandler; private DigitalDisplayMqttCommunicationHandler() { super(iotServerSubscriber, DigitalDisplayConstants.DEVICE_TYPE, - MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic); + MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic); } public ScheduledFuture getDataPushServiceHandler() { @@ -42,15 +45,15 @@ public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHan Runnable connect = new Runnable() { @Override public void run() { - while (!isConnected()){ + while (!isConnected()) { try { log.info("Trying to Connect.."); connectToQueue(); subscribeToQueue(); - } catch (CommunicationHandlerException e) { + } catch (TransportHandlerException e) { log.warn("Connection/Subscription to MQTT Broker at: " + - mqttBrokerEndPoint + " failed"); + mqttBrokerEndPoint + " failed"); try { Thread.sleep(timeoutInterval); @@ -76,36 +79,32 @@ public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHan String topic = messageParams[0]; - String ownerAndId = topic.replace("wso2"+File.separator+"iot"+File.separator,""); - ownerAndId = ownerAndId.replace(File.separator+ DigitalDisplayConstants.DEVICE_TYPE+File.separator,":"); - ownerAndId = ownerAndId.replace(File.separator+"digital_display_publisher",""); + String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, ""); + ownerAndId = ownerAndId.replace(File.separator + DigitalDisplayConstants.DEVICE_TYPE + File.separator, ":"); + ownerAndId = ownerAndId.replace(File.separator + "digital_display_publisher", ""); String owner = ownerAndId.split(":")[0]; String deviceId = ownerAndId.split(":")[1]; - String [] messageData = message.toString().split(":"); + String[] messageData = message.toString().split(":"); log.info("Received MQTT message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}"); - if(messageData.length == 3){ - String randomId = messageData[0]; - String requestMessage = messageData[1]; - String result = messageData[2]; - log.info("Return result " + result + " for Request " + requestMessage); - DigitalDisplayWebSocketServerEndPoint.sendMessage(randomId, result); + if (messageData.length == 3) { + String randomId = messageData[0]; + String requestMessage = messageData[1]; + String result = messageData[2]; + log.info("Return result " + result + " for Request " + requestMessage); + DigitalDisplayWebSocketServerEndPoint.sendMessage(randomId, result); } } - @Override - public void processIncomingMessage() { - - } - - public void publishToDigitalDisplay(String topic, String payLoad, int qos, boolean retained) throws CommunicationHandlerException { - log.info(topic + " " + payLoad); - publishToQueue(topic, payLoad, qos, retained); + public void publishToDigitalDisplay(String topic, String payLoad, int qos, boolean retained) + throws TransportHandlerException { + log.info(topic + " " + payLoad); + publishToQueue(topic, payLoad, qos, retained); } @Override @@ -120,7 +119,7 @@ public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHan } catch (MqttException e) { if (log.isDebugEnabled()) { log.warn("Unable to 'STOP' MQTT connection at broker at: " + - mqttBrokerEndPoint); + mqttBrokerEndPoint); } try { @@ -138,4 +137,30 @@ public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHan terminatorThread.start(); } + + @Override + public void publishDeviceData() throws TransportHandlerException { + + } + + @Override + public void publishDeviceData(MqttMessage publishData) throws TransportHandlerException { + + } + + @Override + public void publishDeviceData(String... publishData) throws TransportHandlerException { + + } + + @Override + public void processIncomingMessage() { + + } + + @Override + public void processIncomingMessage(MqttMessage message) throws TransportHandlerException { + + } + } diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java index 0b07b14241..8639ee392a 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/controlqueue/xmpp/XmppServerClient.java @@ -25,10 +25,13 @@ import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.util.EntityUtils; +import org.json.JSONArray; +import org.json.JSONObject; import org.wso2.carbon.device.mgt.iot.controlqueue.ControlQueueConnector; import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.exception.IoTException; @@ -44,184 +47,323 @@ import java.util.HashMap; public class XmppServerClient implements ControlQueueConnector { - private static final Log log = LogFactory.getLog(XmppServerClient.class); - - private static final String XMPP_SERVER_API_CONTEXT = "/plugins/restapi/v1"; - private static final String USERS_API = "/users"; - @SuppressWarnings("unused") - private static final String GROUPS_API = "/groups"; - @SuppressWarnings("unused") - private static final String APPLICATION_JSON_MT = "application/json"; - - private String xmppEndpoint; - private String xmppUsername; - private String xmppPassword; - private boolean xmppEnabled = false; - - public XmppServerClient() { - } - - @Override - public void initControlQueue() { - xmppEndpoint = XmppConfig.getInstance().getXmppEndpoint(); - xmppUsername = XmppConfig.getInstance().getXmppUsername(); - xmppPassword = XmppConfig.getInstance().getXmppPassword(); - xmppEnabled = XmppConfig.getInstance().isEnabled(); - } - - @Override - public void enqueueControls(HashMap deviceControls) - throws DeviceControllerException { - if (!xmppEnabled) { - log.warn("XMPP set to false in 'devicemgt-config.xml'"); - } - } - - public boolean createXMPPAccount(XmppAccount newUserAccount) throws DeviceControllerException { - if (xmppEnabled) { - String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API; - if (log.isDebugEnabled()) { - log.debug("The API Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint); - } - - String encodedString = xmppUsername + ":" + xmppPassword; - encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8))); - String authorizationHeader = "Basic " + encodedString; - String jsonRequest ="{\n" + - " \"username\": \""+newUserAccount.getUsername()+"\"," + - " \"password\": \""+newUserAccount.getPassword()+"\"," + - " \"name\": \""+newUserAccount.getAccountName()+"\"," + - " \"email\": \""+newUserAccount.getEmail()+"\"," + - " \"properties\": {" + - " \"property\": [" + - " {" + - " \"@key\": \"console.rows_per_page\"," + - " \"@value\": \"user-summary=8\"" + - " }," + - " {" + - " \"@key\": \"console.order\"," + - " \"@value\": \"session-summary=1\"" + - " }" + - " ]" + - " }" + - "}"; - - StringEntity requestEntity; - try { - requestEntity = new StringEntity(jsonRequest, MediaType.APPLICATION_JSON , StandardCharsets.UTF_8.toString()); - } catch (UnsupportedEncodingException e) { - return false; - } - - URL xmppUserApiUrl; - try { - xmppUserApiUrl = new URL(xmppUsersAPIEndpoint); - } catch (MalformedURLException e) { - String errMsg = "Malformed XMPP URL + " + xmppUsersAPIEndpoint; - log.error(errMsg); - throw new DeviceControllerException(errMsg); - } - HttpClient httpClient; - try { - httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); - } catch (Exception e) { - log.error("Error on getting a http client for port :" + xmppUserApiUrl.getPort() + " protocol :" - + xmppUserApiUrl.getProtocol()); - return false; - } - - HttpPost httpPost = new HttpPost(xmppUsersAPIEndpoint); - httpPost.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); - httpPost.setEntity(requestEntity); - - try { - HttpResponse httpResponse = httpClient.execute(httpPost); - - if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) { - String response = IoTUtil.getResponseString(httpResponse); - String errorMsg = "XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() + - "' for account creation with error:\n" + response; - log.error(errorMsg); - throw new DeviceControllerException(errorMsg); - } else { - EntityUtils.consume(httpResponse.getEntity()); - return true; - } - } catch (IOException | IoTException e) { - String errorMsg = - "Error occured whilst trying a 'POST' at : " + xmppUsersAPIEndpoint + " error: " + e.getMessage(); - log.error(errorMsg); - throw new DeviceControllerException(errorMsg, e); - } - - } else { - log.warn("XMPP set to false in 'devicemgt-config.xml'"); - return false; - } - } - - - public boolean doesXMPPUserAccountExist(String username) throws DeviceControllerException { - if (xmppEnabled) { - String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API + "/" + username; - if (log.isDebugEnabled()) { - log.debug("The API Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint); - } - - String encodedString = xmppUsername + ":" + xmppPassword; - encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8))); - String authorizationHeader = "Basic " + encodedString; - - URL xmppUserApiUrl; - try { - xmppUserApiUrl = new URL(xmppUsersAPIEndpoint); - } catch (MalformedURLException e) { - String errMsg = "Malformed XMPP URL + " + xmppUsersAPIEndpoint; - log.error(errMsg); - throw new DeviceControllerException(errMsg, e); - } - - HttpClient httpClient; - try { - httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); - } catch (Exception e) { - String errorMsg = "Error on getting a http client for port :" + xmppUserApiUrl.getPort() + - " protocol :" + xmppUserApiUrl.getProtocol(); - log.error(errorMsg); - throw new DeviceControllerException(errorMsg, e); - } - - HttpGet httpGet = new HttpGet(xmppUsersAPIEndpoint); - httpGet.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); - - try { - HttpResponse httpResponse = httpClient.execute(httpGet); - - if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - String response = IoTUtil.getResponseString(httpResponse); - if (log.isDebugEnabled()) { - log.debug("XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() + - "' for checking existence of account [" + username + "] with message:\n" + - response + "\nProbably, an account with this username does not exist."); - } - return false; - } - - } catch (IOException | IoTException e) { - String errorMsg = "Error occured whilst trying a 'GET' at : " + xmppUsersAPIEndpoint + - "\nError: " + e.getMessage(); - log.error(errorMsg); - throw new DeviceControllerException(errorMsg, e); - } - - if (log.isDebugEnabled()) { - log.debug("XMPP Server already has an account for the username - [" + username + "]."); - } - return true; - } else { - String warnMsg = "XMPP set to false in 'devicemgt-config.xml'"; - log.warn(warnMsg); - throw new DeviceControllerException(warnMsg); - } - } + private static final Log log = LogFactory.getLog(XmppServerClient.class); + + private static final String XMPP_SERVER_API_CONTEXT = "/plugins/restapi/v1"; + private static final String XMPP_USERS_API = "/users"; + private static final String XMPP_SESSIONS_API = "/sessions"; + @SuppressWarnings("unused") + private static final String XMPP_GROUPS_API = "/groups"; + @SuppressWarnings("unused") + private static final String APPLICATION_JSON_MT = "application/json"; + private static final String DEVICEMGT_CONFIG_FILE = "devicemgt-config.xml"; + + private String xmppEndpoint; + private String xmppUsername; + private String xmppPassword; + private boolean xmppEnabled = false; + + public XmppServerClient() { + } + + @Override + public void initControlQueue() { + xmppEndpoint = XmppConfig.getInstance().getXmppEndpoint(); + xmppUsername = XmppConfig.getInstance().getXmppUsername(); + xmppPassword = XmppConfig.getInstance().getXmppPassword(); + xmppEnabled = XmppConfig.getInstance().isEnabled(); + } + + @Override + public void enqueueControls(HashMap deviceControls) + throws DeviceControllerException { + if (!xmppEnabled) { + log.warn(String.format("XMPP set to false in [%s]", DEVICEMGT_CONFIG_FILE)); + } + } + + public boolean createXMPPAccount(XmppAccount newUserAccount) throws DeviceControllerException { + if (xmppEnabled) { + String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + XMPP_USERS_API; + if (log.isDebugEnabled()) { + log.debug("The Create-UserAccount Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint); + } + + String encodedString = xmppUsername + ":" + xmppPassword; + encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8))); + String authorizationHeader = "Basic " + encodedString; + String jsonRequest = "{\n" + + " \"username\": \"" + newUserAccount.getUsername() + "\"," + + " \"password\": \"" + newUserAccount.getPassword() + "\"," + + " \"name\": \"" + newUserAccount.getAccountName() + "\"," + + " \"email\": \"" + newUserAccount.getEmail() + "\"," + + " \"properties\": {" + + " \"property\": [" + + " {" + + " \"@key\": \"console.rows_per_page\"," + + " \"@value\": \"user-summary=8\"" + + " }," + + " {" + + " \"@key\": \"console.order\"," + + " \"@value\": \"session-summary=1\"" + + " }" + + " ]" + + " }" + + "}"; + + StringEntity requestEntity; + try { + requestEntity = new StringEntity(jsonRequest, MediaType.APPLICATION_JSON, + StandardCharsets.UTF_8.toString()); + } catch (UnsupportedEncodingException e) { + return false; + } + + URL xmppUserApiUrl; + try { + xmppUserApiUrl = new URL(xmppUsersAPIEndpoint); + } catch (MalformedURLException e) { + String errMsg = "Malformed XMPP URL + " + xmppUsersAPIEndpoint; + log.error(errMsg); + throw new DeviceControllerException(errMsg); + } + HttpClient httpClient; + try { + httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); + } catch (Exception e) { + log.error("Error on getting a http client for port :" + xmppUserApiUrl.getPort() + " protocol :" + + xmppUserApiUrl.getProtocol()); + return false; + } + + HttpPost httpPost = new HttpPost(xmppUsersAPIEndpoint); + httpPost.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); + httpPost.setEntity(requestEntity); + + try { + HttpResponse httpResponse = httpClient.execute(httpPost); + + if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) { + String response = IoTUtil.getResponseString(httpResponse); + String errorMsg = "XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() + + "' for account creation with error:\n" + response; + log.error(errorMsg); + throw new DeviceControllerException(errorMsg); + } else { + EntityUtils.consume(httpResponse.getEntity()); + return true; + } + } catch (IOException | IoTException e) { + String errorMsg = "Error occured whilst trying a 'POST' at : " + xmppUsersAPIEndpoint; + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + + } else { + log.warn(String.format("XMPP set to false in [%s]", DEVICEMGT_CONFIG_FILE)); + return false; + } + } + + + public boolean doesXMPPUserAccountExist(String username) throws DeviceControllerException { + if (xmppEnabled) { + String xmppCheckUserAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + XMPP_USERS_API + "/" + username; + if (log.isDebugEnabled()) { + log.debug("The Check-User-Account Endpoint URL of the XMPP Server is set to: " + + xmppCheckUserAPIEndpoint); + } + + String encodedString = xmppUsername + ":" + xmppPassword; + encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8))); + String authorizationHeader = "Basic " + encodedString; + + URL xmppUserApiUrl; + try { + xmppUserApiUrl = new URL(xmppCheckUserAPIEndpoint); + } catch (MalformedURLException e) { + String errMsg = "Malformed XMPP URL + " + xmppCheckUserAPIEndpoint; + log.error(errMsg); + throw new DeviceControllerException(errMsg, e); + } + + HttpClient httpClient; + try { + httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); + } catch (Exception e) { + String errorMsg = "Error on getting a http client for port :" + xmppUserApiUrl.getPort() + + " protocol :" + xmppUserApiUrl.getProtocol(); + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + + HttpGet httpGet = new HttpGet(xmppCheckUserAPIEndpoint); + httpGet.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); + + try { + HttpResponse httpResponse = httpClient.execute(httpGet); + + if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + String response = IoTUtil.getResponseString(httpResponse); + if (log.isDebugEnabled()) { + log.debug("XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() + + "' for checking existence of account [" + username + "] with message:\n" + + response + "\nProbably, an account with this username does not exist."); + } + return false; + } + + } catch (IOException | IoTException e) { + String errorMsg = "Error occured whilst trying a 'GET' at : " + xmppCheckUserAPIEndpoint; + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + + if (log.isDebugEnabled()) { + log.debug("XMPP Server already has an account for the username - [" + username + "]."); + } + return true; + } else { + String warnMsg = String.format("XMPP set to false in [%s]", DEVICEMGT_CONFIG_FILE); + log.warn(warnMsg); + throw new DeviceControllerException(warnMsg); + } + } + + + public JSONArray getAllCurrentUserSessions() throws DeviceControllerException { + if (xmppEnabled) { + JSONArray xmppSessions; + String xmppSessionsAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + XMPP_SESSIONS_API; + + if (log.isDebugEnabled()) { + log.debug("The Get-Sessions Endpoint URL of the XMPP Server is set to: " + xmppSessionsAPIEndpoint); + } + + String encodedString = xmppUsername + ":" + xmppPassword; + encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8))); + String authorizationHeader = "Basic " + encodedString; + + URL xmppUserApiUrl; + try { + xmppUserApiUrl = new URL(xmppSessionsAPIEndpoint); + } catch (MalformedURLException e) { + String errMsg = "Malformed XMPP URL + " + xmppSessionsAPIEndpoint; + log.error(errMsg); + throw new DeviceControllerException(errMsg, e); + } + + HttpClient httpClient; + try { + httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); + } catch (Exception e) { + String errorMsg = "Error on getting a http client for port :" + xmppUserApiUrl.getPort() + + " protocol :" + xmppUserApiUrl.getProtocol(); + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + + HttpGet httpGet = new HttpGet(xmppSessionsAPIEndpoint); + httpGet.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); + httpGet.addHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON); + + try { + HttpResponse httpResponse = httpClient.execute(httpGet); + + if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + String errorMsg = "XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() + + "' for checking current XMPP Sessions."; + log.error(errorMsg); + throw new DeviceControllerException(errorMsg); + } + + String response = IoTUtil.getResponseString(httpResponse); + xmppSessions = new JSONObject(response).getJSONArray("session"); + return xmppSessions; + + } catch (IOException | IoTException e) { + String errorMsg = "Error occured whilst trying a 'GET' at : " + xmppSessionsAPIEndpoint; + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + + } else { + String warnMsg = String.format("XMPP set to false in [%s]", DEVICEMGT_CONFIG_FILE); + log.warn(warnMsg); + throw new DeviceControllerException(warnMsg); + } + } + + + public void deleteCurrentXmppSessions() throws DeviceControllerException { + JSONArray xmppSessionsArray; + + try { + xmppSessionsArray = getAllCurrentUserSessions(); + } catch (DeviceControllerException e) { + if (e.getMessage().contains(DEVICEMGT_CONFIG_FILE)) { + log.warn(String.format("XMPP set to false in [%s]", DEVICEMGT_CONFIG_FILE)); + return; + } else { + throw e; + } + } + + if (xmppSessionsArray.length() != 0) { + String xmppSessionsAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + XMPP_SESSIONS_API; + String encodedString = xmppUsername + ":" + xmppPassword; + encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8))); + String authorizationHeader = "Basic " + encodedString; + + if (log.isDebugEnabled()) { + log.debug("The Get-Sessions Endpoint URL of the XMPP Server is set to: " + xmppSessionsAPIEndpoint); + } + + URL xmppUserApiUrl; + try { + xmppUserApiUrl = new URL(xmppSessionsAPIEndpoint); + } catch (MalformedURLException e) { + String errMsg = "Malformed XMPP URL + " + xmppSessionsAPIEndpoint; + log.error(errMsg); + throw new DeviceControllerException(errMsg, e); + } + + HttpClient httpClient; + try { + httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); + } catch (Exception e) { + String errorMsg = "Error on getting a http client for port :" + xmppUserApiUrl.getPort() + + " protocol :" + xmppUserApiUrl.getProtocol(); + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + + for (int i = 0; i < xmppSessionsArray.length(); i++) { + + String sessionName = xmppSessionsArray.getJSONObject(i).getString("username"); + String xmppUserSessionsAPIEndpoint = xmppSessionsAPIEndpoint + "/" + sessionName; + + HttpDelete httpDelete = new HttpDelete(xmppUserSessionsAPIEndpoint); + httpDelete.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); + + try { + HttpResponse httpResponse = httpClient.execute(httpDelete); + + if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + String errorMsg = + "XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() + + "' for checking current XMPP Sessions."; + log.error(errorMsg); + throw new DeviceControllerException(errorMsg); + } + + } catch (IOException e) { + String errorMsg = "Error occured whilst trying a 'DELETE' user-session [" + sessionName + "] " + + "at : " + xmppUserSessionsAPIEndpoint; + log.error(errorMsg); + throw new DeviceControllerException(errorMsg, e); + } + } + } + } } diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/internal/IotDeviceManagementServiceComponent.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/internal/IotDeviceManagementServiceComponent.java index 4c204abb04..068342dbb8 100644 --- a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/internal/IotDeviceManagementServiceComponent.java +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/internal/IotDeviceManagementServiceComponent.java @@ -27,16 +27,19 @@ import org.wso2.carbon.databridge.core.DataBridgeReceiverService; import org.wso2.carbon.device.mgt.iot.DeviceController; import org.wso2.carbon.device.mgt.iot.UserManagement; import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTEventsStatisticsClient; +import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTUsageStatisticsClient; +import org.wso2.carbon.device.mgt.iot.config.devicetype.IotDeviceTypeConfigurationManager; import org.wso2.carbon.device.mgt.iot.config.devicetype.datasource.IotDeviceTypeConfig; +import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager; +import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig; +import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppServerClient; +import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.service.DeviceTypeService; import org.wso2.carbon.device.mgt.iot.service.DeviceTypeServiceImpl; import org.wso2.carbon.device.mgt.iot.startup.StartupUrlPrinter; -import org.wso2.carbon.device.mgt.iot.util.iotdevice.exception.IotDeviceMgtPluginException; -import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager; -import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTUsageStatisticsClient; -import org.wso2.carbon.device.mgt.iot.config.devicetype.IotDeviceTypeConfigurationManager; import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.IotDeviceManagementDAOFactory; import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.util.IotDeviceManagementDAOUtil; +import org.wso2.carbon.device.mgt.iot.util.iotdevice.exception.IotDeviceMgtPluginException; import org.wso2.carbon.ndatasource.core.DataSourceService; import org.wso2.carbon.user.core.service.RealmService; import org.wso2.carbon.utils.ConfigurationContextService; @@ -74,7 +77,8 @@ import java.util.Map; public class IotDeviceManagementServiceComponent { private static final Log log = LogFactory.getLog(IotDeviceManagementServiceComponent.class); - public static ConfigurationContextService configurationContextService; + public static ConfigurationContextService configurationContextService; + protected void activate(ComponentContext ctx) { if (log.isDebugEnabled()) { log.debug("Activating Iot Device Management Service Component"); @@ -82,69 +86,83 @@ public class IotDeviceManagementServiceComponent { try { - BundleContext bundleContext = ctx.getBundleContext(); /* Initialize the data source configuration */ - DeviceManagementConfigurationManager.getInstance().initConfig(); - IotDeviceTypeConfigurationManager.getInstance().initConfig(); - Map dsConfigMap = - IotDeviceTypeConfigurationManager.getInstance().getIotDeviceTypeConfigMap(); - - IotDeviceManagementDAOFactory.init(dsConfigMap); - - - - String setupOption = System.getProperty("setup"); - if (setupOption != null) { - if (log.isDebugEnabled()) { - log.debug( - "-Dsetup is enabled. Iot Device management repository schema initialization is about " + - "to begin"); - } - try { - for (String pluginType : dsConfigMap.keySet()){ - IotDeviceManagementDAOUtil - .setupIotDeviceManagementSchema( - IotDeviceManagementDAOFactory.getDataSourceMap - ().get(pluginType), pluginType); - } - } catch (IotDeviceMgtPluginException e) { - log.error( - "Exception occurred while initializing mobile device management database schem ", - e); - } - } - - IoTCommonDataHolder.getInstance().initialize(); + BundleContext bundleContext = ctx.getBundleContext(); /* Initialize the data source + configuration */ + DeviceManagementConfigurationManager.getInstance().initConfig(); + IotDeviceTypeConfigurationManager.getInstance().initConfig(); + Map dsConfigMap = + IotDeviceTypeConfigurationManager.getInstance().getIotDeviceTypeConfigMap(); + + IotDeviceManagementDAOFactory.init(dsConfigMap); + + + String setupOption = System.getProperty("setup"); + if (setupOption != null) { + if (log.isDebugEnabled()) { + log.debug( + "-Dsetup is enabled. Iot Device management repository schema initialization is about " + + "to begin"); + } + try { + for (String pluginType : dsConfigMap.keySet()) { + IotDeviceManagementDAOUtil + .setupIotDeviceManagementSchema( + IotDeviceManagementDAOFactory.getDataSourceMap + ().get(pluginType), pluginType); + } + } catch (IotDeviceMgtPluginException e) { + log.error( + "Exception occurred while initializing mobile device management database schem ", + e); + } + } + + IoTCommonDataHolder.getInstance().initialize(); + + //TODO: handle + + DeviceController.init(); + IoTUsageStatisticsClient.initializeDataSource(); + IoTEventsStatisticsClient.initializeDataSource(); + UserManagement.registerApiAccessRoles(); - //TODO: handle - DeviceController.init(); - IoTUsageStatisticsClient.initializeDataSource(); - IoTEventsStatisticsClient.initializeDataSource(); - UserManagement.registerApiAccessRoles(); + bundleContext.registerService(DeviceTypeService.class.getName(), + new DeviceTypeServiceImpl(), null); + if (log.isDebugEnabled()) { + log.debug("Iot Device Management Service Component has been successfully activated"); + } - bundleContext.registerService(DeviceTypeService.class.getName(), - new DeviceTypeServiceImpl(), null); + bundleContext.registerService(ServerStartupObserver.class, new StartupUrlPrinter(), null); + } catch (Throwable e) { + log.error("Error occurred while activating Iot Device Management Service Component", e); + } + } - if (log.isDebugEnabled()) { - log.debug("Iot Device Management Service Component has been successfully activated"); - } + protected void deactivate(ComponentContext ctx) { + XmppConfig xmppConfig = XmppConfig.getInstance(); - bundleContext.registerService(ServerStartupObserver.class, new StartupUrlPrinter(), null); - } catch (Throwable e) { - log.error("Error occurred while activating Iot Device Management Service Component", e); - } - } + try { + if (xmppConfig.isEnabled()) { + XmppServerClient xmppServerClient = new XmppServerClient(); + xmppServerClient.initControlQueue(); + xmppServerClient.deleteCurrentXmppSessions(); + } + } catch (DeviceControllerException e) { + String errorMsg = "An error occurred whilst trying to delete all existing XMPP login sessions at " + + "[" + xmppConfig.getXmppEndpoint() + "]."; + log.error(errorMsg, e); + } - protected void deactivate(ComponentContext ctx) { - if (log.isDebugEnabled()) { - log.debug("De-activating Iot Device Management Service Component"); - } + if (log.isDebugEnabled()) { + log.debug("De-activating Iot Device Management Service Component"); + } - } + } - protected void setDataSourceService(DataSourceService dataSourceService) { - /* This is to avoid iot device management component getting initialized before the + protected void setDataSourceService(DataSourceService dataSourceService) { + /* This is to avoid iot device management component getting initialized before the underlying datasources are registered */ if (log.isDebugEnabled()) { @@ -156,45 +174,47 @@ public class IotDeviceManagementServiceComponent { //do nothing } - protected void setConfigurationContextService(ConfigurationContextService configurationContextService) { - if (log.isDebugEnabled()) { - log.debug("Setting ConfigurationContextService"); - } - - IotDeviceManagementServiceComponent.configurationContextService=configurationContextService; - - } - - protected void unsetConfigurationContextService(ConfigurationContextService configurationContextService) { - if (log.isDebugEnabled()) { - log.debug("Un-setting ConfigurationContextService"); - } - IotDeviceManagementServiceComponent.configurationContextService=null; - } - - /** - * Sets Realm Service - * @param realmService associated realm service reference - */ - protected void setRealmService(RealmService realmService) { - if (log.isDebugEnabled()) { - log.debug("Setting Realm Service"); - - } - UserManagement.setRealmService(realmService); - - } - - /** - * Unsets Realm Service - * @param realmService associated realm service reference - */ - protected void unsetRealmService(RealmService realmService) { - if (log.isDebugEnabled()) { - log.debug("Unsetting Realm Service"); - } - UserManagement.setRealmService(realmService); - } + protected void setConfigurationContextService(ConfigurationContextService configurationContextService) { + if (log.isDebugEnabled()) { + log.debug("Setting ConfigurationContextService"); + } + + IotDeviceManagementServiceComponent.configurationContextService = configurationContextService; + + } + + protected void unsetConfigurationContextService(ConfigurationContextService configurationContextService) { + if (log.isDebugEnabled()) { + log.debug("Un-setting ConfigurationContextService"); + } + IotDeviceManagementServiceComponent.configurationContextService = null; + } + + /** + * Sets Realm Service + * + * @param realmService associated realm service reference + */ + protected void setRealmService(RealmService realmService) { + if (log.isDebugEnabled()) { + log.debug("Setting Realm Service"); + + } + UserManagement.setRealmService(realmService); + + } + + /** + * Unsets Realm Service + * + * @param realmService associated realm service reference + */ + protected void unsetRealmService(RealmService realmService) { + if (log.isDebugEnabled()) { + log.debug("Unsetting Realm Service"); + } + UserManagement.setRealmService(realmService); + } /** * Sets DataBridge Receiver Service