Deleting all active xmpp-sessions during server shutdown

Shabirmean 9 years ago
parent 060c2dd079
commit 2cf022ef55

@ -26,7 +26,7 @@ import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.exception.DigitalDisplayException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.CommunicationHandlerException;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.util.DigitalDisplayMqttCommunicationHandler;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.constants.DigitalDisplayConstants;
@ -466,7 +466,7 @@ public class DigitalDisplayControllerService {
try {
digitalDisplayMqttCommunicationHandler.publishToDigitalDisplay(topic, payload, 2, true);
} catch (CommunicationHandlerException e) {
} catch (TransportHandlerException e) {
String errorMessage = "Error publishing data to device with ID " + deviceId;
throw new DigitalDisplayException(errorMessage, e);
}

@ -1,15 +0,0 @@
package org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport;
public interface CommunicationHandler<T> {
int DEFAULT_TIMEOUT_INTERVAL = 5000; // millis ~ 10 sec
void connect();
boolean isConnected();
void processIncomingMessage(T message, String... messageParams);
void processIncomingMessage();
void disconnect();
}

@ -1,38 +0,0 @@
package org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport;
public class CommunicationHandlerException extends Exception {
private static final long serialVersionUID = 2736466230451105440L;
private String errorMessage;
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public CommunicationHandlerException(String msg, Exception nestedEx) {
super(msg, nestedEx);
setErrorMessage(msg);
}
public CommunicationHandlerException(String message, Throwable cause) {
super(message, cause);
setErrorMessage(message);
}
public CommunicationHandlerException(String msg) {
super(msg);
setErrorMessage(msg);
}
public CommunicationHandlerException() {
super();
}
public CommunicationHandlerException(Throwable cause) {
super(cause);
}
}

@ -1,345 +0,0 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.*;
import java.io.File;
import java.nio.charset.StandardCharsets;
/**
* This class contains the IoT-Server specific implementation for all the MQTT functionality.
* This includes connecting to a MQTT Broker & subscribing to the appropriate MQTT-topic, action
* plan upon losing connection or successfully delivering a message to the broker and processing
* incoming messages. Makes use of the 'Paho-MQTT' library provided by Eclipse Org.
* <p/>
* It is an abstract class that implements the common interface "CommunicationHandler" and the
* "MqttCallback". Whilst providing some methods which handle key MQTT relevant tasks, this class
* implements only the most generic methods of the "CommunicationHandler" interface. The rest of
* the methods are left for any extended concrete-class to implement as per its need.
*/
public abstract class MQTTCommunicationHandler
implements MqttCallback, CommunicationHandler<MqttMessage> {
private static final Log log = LogFactory.getLog(MQTTCommunicationHandler.class);
public static final int DEFAULT_MQTT_QUALITY_OF_SERVICE = 0;
private MqttClient client;
private String clientId;
private MqttConnectOptions options;
private String clientWillTopic;
protected String mqttBrokerEndPoint;
protected int timeoutInterval;
protected String subscribeTopic;
/**
* Constructor for the MQTTCommunicationHandler which takes in the owner, type of the device
* and the MQTT Broker URL and the topic to subscribe.
*
* @param deviceOwner the owner of the device.
* @param deviceType the CDMF Device-Type of the device.
* @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint.
* @param subscribeTopic the MQTT topic to which the client is to be subscribed
*/
protected MQTTCommunicationHandler(String deviceOwner, String deviceType,
String mqttBrokerEndPoint,
String subscribeTopic) {
this.clientId = deviceOwner + ":" + deviceType;
this.subscribeTopic = subscribeTopic;
this.clientWillTopic = deviceType + File.separator + "disconnection";
this.mqttBrokerEndPoint = mqttBrokerEndPoint;
this.timeoutInterval = DEFAULT_TIMEOUT_INTERVAL;
this.initSubscriber();
}
/**
* Constructor for the MQTTCommunicationHandler which takes in the owner, type of the device
* and the MQTT Broker URL and the topic to subscribe. Additionally this constructor takes in
* the reconnection-time interval between successive attempts to connect to the broker.
*
* @param deviceOwner the owner of the device.
* @param deviceType the CDMF Device-Type of the device.
* @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint.
* @param subscribeTopic the MQTT topic to which the client is to be subscribed
* @param intervalInMillis the time interval in MILLI-SECONDS between successive
* attempts to connect to the broker.
*/
protected MQTTCommunicationHandler(String deviceOwner, String deviceType,
String mqttBrokerEndPoint, String subscribeTopic,
int intervalInMillis) {
this.clientId = deviceOwner + ":" + deviceType;
this.subscribeTopic = subscribeTopic;
this.clientWillTopic = deviceType + File.separator + "disconnection";
this.mqttBrokerEndPoint = mqttBrokerEndPoint;
this.timeoutInterval = intervalInMillis;
this.initSubscriber();
}
public void setTimeoutInterval(int timeoutInterval) {
this.timeoutInterval = timeoutInterval;
}
/**
* Initializes the MQTT-Client. Creates a client using the given MQTT-broker endpoint and the
* clientId (which is constructed by a concatenation of [deviceOwner]:[deviceType]). Also sets
* the client's options parameter with the clientWillTopic (in-case of connection failure) and
* other info. Also sets the call-back this current class.
*/
private void initSubscriber() {
try {
client = new MqttClient(this.mqttBrokerEndPoint, clientId, null);
log.info("MQTT subscriber was created with ClientID : " + clientId);
} catch (MqttException ex) {
String errorMsg = "MQTT Client Error\n" + "\tReason: " + ex.getReasonCode() +
"\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " +
ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() +
"\n\tException: " + ex;
log.error(errorMsg);
}
options = new MqttConnectOptions();
options.setCleanSession(false);
options.setWill(clientWillTopic, "Connection-Lost".getBytes(StandardCharsets.UTF_8), 2,
true);
client.setCallback(this);
}
/**
* Checks whether the connection to the MQTT-Broker persists.
*
* @return true if the client is connected to the MQTT-Broker, else false.
*/
@Override
public boolean isConnected() {
return client.isConnected();
}
/**
* Connects to the MQTT-Broker and if successfully established connection.
*
* @throws CommunicationHandlerException in the event of 'Connecting to' the MQTT broker fails.
*/
protected void connectToQueue() throws CommunicationHandlerException {
try {
client.connect(options);
if (log.isDebugEnabled()) {
log.debug("Subscriber connected to queue at: " + this.mqttBrokerEndPoint);
}
} catch (MqttSecurityException ex) {
String errorMsg = "MQTT Security Exception when connecting to queue\n" + "\tReason: " +
" " +
ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() +
"\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " +
ex.getCause() + "\n\tException: " + ex;
if (log.isDebugEnabled()) {
log.debug(errorMsg);
}
throw new CommunicationHandlerException(errorMsg, ex);
} catch (MqttException ex) {
String errorMsg = "MQTT Exception when connecting to queue\n" + "\tReason: " +
ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() +
"\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " +
ex.getCause() + "\n\tException: " + ex;
if (log.isDebugEnabled()) {
log.debug(errorMsg);
}
throw new CommunicationHandlerException(errorMsg, ex);
}
}
/**
* Subscribes to the MQTT-Topic specific to this MQTT Client. (The MQTT-Topic specific to the
* device is taken in as a constructor parameter of this class) .
*
* @throws CommunicationHandlerException in the event of 'Subscribing to' the MQTT broker
* fails.
*/
protected void subscribeToQueue() throws CommunicationHandlerException {
try {
client.subscribe(subscribeTopic, 0);
log.info("Subscriber '" + clientId + "' subscribed to topic: " + subscribeTopic);
} catch (MqttException ex) {
String errorMsg = "MQTT Exception when trying to subscribe to topic: " +
subscribeTopic + "\n\tReason: " + ex.getReasonCode() +
"\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " +
ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() +
"\n\tException: " + ex;
if (log.isDebugEnabled()) {
log.debug(errorMsg);
}
throw new CommunicationHandlerException(errorMsg, ex);
}
}
/**
* This method is used to publish reply-messages for the control signals received.
* Invocation of this method calls its overloaded-method with a QoS equal to that of the
* default value.
*
* @param topic the topic to which the reply message is to be published.
* @param payLoad the reply-message (payload) of the MQTT publish action.
*/
protected void publishToQueue(String topic, String payLoad)
throws CommunicationHandlerException {
publishToQueue(topic, payLoad, DEFAULT_MQTT_QUALITY_OF_SERVICE, false);
}
/**
* This is an overloaded method that publishes MQTT reply-messages for control signals
* received form the IoT-Server.
*
* @param topic the topic to which the reply message is to be published
* @param payLoad the reply-message (payload) of the MQTT publish action.
* @param qos the Quality-of-Service of the current publish action.
* Could be 0(At-most once), 1(At-least once) or 2(Exactly once)
*/
protected void publishToQueue(String topic, String payLoad, int qos, boolean retained)
throws CommunicationHandlerException {
try {
client.publish(topic, payLoad.getBytes(StandardCharsets.UTF_8), qos, retained);
if (log.isDebugEnabled()) {
log.debug("Message: " + payLoad + " to MQTT topic [" + topic +
"] published successfully");
}
} catch (MqttException ex) {
String errorMsg =
"MQTT Client Error" + "\n\tReason: " + ex.getReasonCode() + "\n\tMessage: " +
ex.getMessage() + "\n\tLocalMsg: " + ex.getLocalizedMessage() +
"\n\tCause: " + ex.getCause() + "\n\tException: " + ex;
log.info(ex);
throw new CommunicationHandlerException(errorMsg, ex);
}
}
protected void publishToQueue(String topic, MqttMessage message)
throws CommunicationHandlerException {
try {
client.publish(topic, message);
if (log.isDebugEnabled()) {
log.debug("Message: " + message.toString() + " to MQTT topic [" + topic +
"] published successfully");
}
} catch (MqttException ex) {
String errorMsg =
"MQTT Client Error" + "\n\tReason: " + ex.getReasonCode() + "\n\tMessage: " +
ex.getMessage() + "\n\tLocalMsg: " + ex.getLocalizedMessage() +
"\n\tCause: " + ex.getCause() + "\n\tException: " + ex;
log.info(errorMsg);
throw new CommunicationHandlerException(errorMsg, ex);
}
}
/**
* Callback method which is triggered once the MQTT client losers its connection to the broker.
* Spawns a new thread that executes necessary actions to try and reconnect to the endpoint.
*
* @param throwable a Throwable Object containing the details as to why the failure occurred.
*/
@Override
public void connectionLost(Throwable throwable) {
log.warn("Lost Connection for client: " + this.clientId +
" to " + this.mqttBrokerEndPoint + ".\nThis was due to - " +
throwable.getMessage());
Thread reconnectThread = new Thread() {
public void run() {
connect();
}
};
reconnectThread.setDaemon(true);
reconnectThread.start();
}
/**
* Callback method which is triggered upon receiving a MQTT Message from the broker. Spawns a
* new thread that executes any actions to be taken with the received message.
*
* @param topic the MQTT-Topic to which the received message was published to and the
* client was subscribed to.
* @param mqttMessage the actual MQTT-Message that was received from the broker.
*/
@Override
public void messageArrived(final String topic, final MqttMessage mqttMessage) {
if (log.isDebugEnabled()) {
log.info("Got an MQTT message '" + mqttMessage.toString() + "' for topic '" + topic +
"'.");
}
Thread messageProcessorThread = new Thread() {
public void run() {
processIncomingMessage(mqttMessage, topic);
}
};
messageProcessorThread.setDaemon(true);
messageProcessorThread.start();
}
/**
* Callback method which gets triggered upon successful completion of a message delivery to
* the broker.
*
* @param iMqttDeliveryToken the MQTT-DeliveryToken which includes the details about the
* specific message delivery.
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
String message = "";
try {
if (iMqttDeliveryToken.isComplete()) {
if (iMqttDeliveryToken.getMessage() != null){
message = iMqttDeliveryToken.getMessage().toString();
}
} else {
log.error("MQTT Message not delivered");
}
} catch (MqttException e) {
log.error(
"Error occurred whilst trying to read the message from the MQTT delivery token.");
}
String topic = iMqttDeliveryToken.getTopics()[0];
String client = iMqttDeliveryToken.getClient().getClientId();
if (log.isDebugEnabled()) {
log.debug("Message - '" + message + "' of client [" + client + "] for the topic (" +
topic + ") was delivered successfully.");
}
}
/**
* Closes the connection to the MQTT Broker.
*/
public void closeConnection() throws MqttException {
if (client != null && isConnected()) {
client.disconnect();
}
}
}

@ -5,26 +5,29 @@ import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.constants.DigitalDisplayConstants;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.CommunicationHandlerException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.MQTTCommunicationHandler;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.websocket.DigitalDisplayWebSocketServerEndPoint;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.constants.DigitalDisplayConstants;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler;
import java.io.File;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
//import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.CommunicationHandlerException;
//import org.wso2.carbon.device.mgt.iot.digitaldisplay.api.transport.MQTTCommunicationHandler;
public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHandler {
public class DigitalDisplayMqttCommunicationHandler extends MQTTTransportHandler {
private static Log log = LogFactory.getLog(DigitalDisplayMqttCommunicationHandler.class);
private static final String subscribeTopic =
"wso2"+ File.separator+"iot"+File.separator+"+"+File.separator+
DigitalDisplayConstants.DEVICE_TYPE+File.separator+"+"+File.separator+
"wso2" + File.separator + "iot" + File.separator + "+" + File.separator +
DigitalDisplayConstants.DEVICE_TYPE + File.separator + "+" + File.separator +
"digital_display_publisher";
private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0,5);
private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
private ScheduledFuture<?> dataPushServiceHandler;
@ -42,13 +45,13 @@ public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHan
Runnable connect = new Runnable() {
@Override
public void run() {
while (!isConnected()){
while (!isConnected()) {
try {
log.info("Trying to Connect..");
connectToQueue();
subscribeToQueue();
} catch (CommunicationHandlerException e) {
} catch (TransportHandlerException e) {
log.warn("Connection/Subscription to MQTT Broker at: " +
mqttBrokerEndPoint + " failed");
@ -76,19 +79,19 @@ public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHan
String topic = messageParams[0];
String ownerAndId = topic.replace("wso2"+File.separator+"iot"+File.separator,"");
ownerAndId = ownerAndId.replace(File.separator+ DigitalDisplayConstants.DEVICE_TYPE+File.separator,":");
ownerAndId = ownerAndId.replace(File.separator+"digital_display_publisher","");
String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + DigitalDisplayConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "digital_display_publisher", "");
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
String [] messageData = message.toString().split(":");
String[] messageData = message.toString().split(":");
log.info("Received MQTT message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}");
if(messageData.length == 3){
if (messageData.length == 3) {
String randomId = messageData[0];
String requestMessage = messageData[1];
String result = messageData[2];
@ -98,12 +101,8 @@ public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHan
}
@Override
public void processIncomingMessage() {
}
public void publishToDigitalDisplay(String topic, String payLoad, int qos, boolean retained) throws CommunicationHandlerException {
public void publishToDigitalDisplay(String topic, String payLoad, int qos, boolean retained)
throws TransportHandlerException {
log.info(topic + " " + payLoad);
publishToQueue(topic, payLoad, qos, retained);
}
@ -138,4 +137,30 @@ public class DigitalDisplayMqttCommunicationHandler extends MQTTCommunicationHan
terminatorThread.start();
}
@Override
public void publishDeviceData() throws TransportHandlerException {
}
@Override
public void publishDeviceData(MqttMessage publishData) throws TransportHandlerException {
}
@Override
public void publishDeviceData(String... publishData) throws TransportHandlerException {
}
@Override
public void processIncomingMessage() {
}
@Override
public void processIncomingMessage(MqttMessage message) throws TransportHandlerException {
}
}

@ -25,10 +25,13 @@ import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import org.wso2.carbon.device.mgt.iot.controlqueue.ControlQueueConnector;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.exception.IoTException;
@ -47,11 +50,13 @@ public class XmppServerClient implements ControlQueueConnector {
private static final Log log = LogFactory.getLog(XmppServerClient.class);
private static final String XMPP_SERVER_API_CONTEXT = "/plugins/restapi/v1";
private static final String USERS_API = "/users";
private static final String XMPP_USERS_API = "/users";
private static final String XMPP_SESSIONS_API = "/sessions";
@SuppressWarnings("unused")
private static final String GROUPS_API = "/groups";
private static final String XMPP_GROUPS_API = "/groups";
@SuppressWarnings("unused")
private static final String APPLICATION_JSON_MT = "application/json";
private static final String DEVICEMGT_CONFIG_FILE = "devicemgt-config.xml";
private String xmppEndpoint;
private String xmppUsername;
@ -73,25 +78,25 @@ public class XmppServerClient implements ControlQueueConnector {
public void enqueueControls(HashMap<String, String> deviceControls)
throws DeviceControllerException {
if (!xmppEnabled) {
log.warn("XMPP <Enabled> set to false in 'devicemgt-config.xml'");
log.warn(String.format("XMPP <Enabled> set to false in [%s]", DEVICEMGT_CONFIG_FILE));
}
}
public boolean createXMPPAccount(XmppAccount newUserAccount) throws DeviceControllerException {
if (xmppEnabled) {
String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API;
String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + XMPP_USERS_API;
if (log.isDebugEnabled()) {
log.debug("The API Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint);
log.debug("The Create-UserAccount Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint);
}
String encodedString = xmppUsername + ":" + xmppPassword;
encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8)));
String authorizationHeader = "Basic " + encodedString;
String jsonRequest ="{\n" +
" \"username\": \""+newUserAccount.getUsername()+"\"," +
" \"password\": \""+newUserAccount.getPassword()+"\"," +
" \"name\": \""+newUserAccount.getAccountName()+"\"," +
" \"email\": \""+newUserAccount.getEmail()+"\"," +
String jsonRequest = "{\n" +
" \"username\": \"" + newUserAccount.getUsername() + "\"," +
" \"password\": \"" + newUserAccount.getPassword() + "\"," +
" \"name\": \"" + newUserAccount.getAccountName() + "\"," +
" \"email\": \"" + newUserAccount.getEmail() + "\"," +
" \"properties\": {" +
" \"property\": [" +
" {" +
@ -108,7 +113,8 @@ public class XmppServerClient implements ControlQueueConnector {
StringEntity requestEntity;
try {
requestEntity = new StringEntity(jsonRequest, MediaType.APPLICATION_JSON , StandardCharsets.UTF_8.toString());
requestEntity = new StringEntity(jsonRequest, MediaType.APPLICATION_JSON,
StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException e) {
return false;
}
@ -148,14 +154,13 @@ public class XmppServerClient implements ControlQueueConnector {
return true;
}
} catch (IOException | IoTException e) {
String errorMsg =
"Error occured whilst trying a 'POST' at : " + xmppUsersAPIEndpoint + " error: " + e.getMessage();
String errorMsg = "Error occured whilst trying a 'POST' at : " + xmppUsersAPIEndpoint;
log.error(errorMsg);
throw new DeviceControllerException(errorMsg, e);
}
} else {
log.warn("XMPP <Enabled> set to false in 'devicemgt-config.xml'");
log.warn(String.format("XMPP <Enabled> set to false in [%s]", DEVICEMGT_CONFIG_FILE));
return false;
}
}
@ -163,9 +168,10 @@ public class XmppServerClient implements ControlQueueConnector {
public boolean doesXMPPUserAccountExist(String username) throws DeviceControllerException {
if (xmppEnabled) {
String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API + "/" + username;
String xmppCheckUserAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + XMPP_USERS_API + "/" + username;
if (log.isDebugEnabled()) {
log.debug("The API Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint);
log.debug("The Check-User-Account Endpoint URL of the XMPP Server is set to: " +
xmppCheckUserAPIEndpoint);
}
String encodedString = xmppUsername + ":" + xmppPassword;
@ -174,9 +180,9 @@ public class XmppServerClient implements ControlQueueConnector {
URL xmppUserApiUrl;
try {
xmppUserApiUrl = new URL(xmppUsersAPIEndpoint);
xmppUserApiUrl = new URL(xmppCheckUserAPIEndpoint);
} catch (MalformedURLException e) {
String errMsg = "Malformed XMPP URL + " + xmppUsersAPIEndpoint;
String errMsg = "Malformed XMPP URL + " + xmppCheckUserAPIEndpoint;
log.error(errMsg);
throw new DeviceControllerException(errMsg, e);
}
@ -191,7 +197,7 @@ public class XmppServerClient implements ControlQueueConnector {
throw new DeviceControllerException(errorMsg, e);
}
HttpGet httpGet = new HttpGet(xmppUsersAPIEndpoint);
HttpGet httpGet = new HttpGet(xmppCheckUserAPIEndpoint);
httpGet.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader);
try {
@ -208,8 +214,7 @@ public class XmppServerClient implements ControlQueueConnector {
}
} catch (IOException | IoTException e) {
String errorMsg = "Error occured whilst trying a 'GET' at : " + xmppUsersAPIEndpoint +
"\nError: " + e.getMessage();
String errorMsg = "Error occured whilst trying a 'GET' at : " + xmppCheckUserAPIEndpoint;
log.error(errorMsg);
throw new DeviceControllerException(errorMsg, e);
}
@ -219,9 +224,146 @@ public class XmppServerClient implements ControlQueueConnector {
}
return true;
} else {
String warnMsg = "XMPP <Enabled> set to false in 'devicemgt-config.xml'";
String warnMsg = String.format("XMPP <Enabled> set to false in [%s]", DEVICEMGT_CONFIG_FILE);
log.warn(warnMsg);
throw new DeviceControllerException(warnMsg);
}
}
public JSONArray getAllCurrentUserSessions() throws DeviceControllerException {
if (xmppEnabled) {
JSONArray xmppSessions;
String xmppSessionsAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + XMPP_SESSIONS_API;
if (log.isDebugEnabled()) {
log.debug("The Get-Sessions Endpoint URL of the XMPP Server is set to: " + xmppSessionsAPIEndpoint);
}
String encodedString = xmppUsername + ":" + xmppPassword;
encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8)));
String authorizationHeader = "Basic " + encodedString;
URL xmppUserApiUrl;
try {
xmppUserApiUrl = new URL(xmppSessionsAPIEndpoint);
} catch (MalformedURLException e) {
String errMsg = "Malformed XMPP URL + " + xmppSessionsAPIEndpoint;
log.error(errMsg);
throw new DeviceControllerException(errMsg, e);
}
HttpClient httpClient;
try {
httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol());
} catch (Exception e) {
String errorMsg = "Error on getting a http client for port :" + xmppUserApiUrl.getPort() +
" protocol :" + xmppUserApiUrl.getProtocol();
log.error(errorMsg);
throw new DeviceControllerException(errorMsg, e);
}
HttpGet httpGet = new HttpGet(xmppSessionsAPIEndpoint);
httpGet.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader);
httpGet.addHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON);
try {
HttpResponse httpResponse = httpClient.execute(httpGet);
if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
String errorMsg = "XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() +
"' for checking current XMPP Sessions.";
log.error(errorMsg);
throw new DeviceControllerException(errorMsg);
}
String response = IoTUtil.getResponseString(httpResponse);
xmppSessions = new JSONObject(response).getJSONArray("session");
return xmppSessions;
} catch (IOException | IoTException e) {
String errorMsg = "Error occured whilst trying a 'GET' at : " + xmppSessionsAPIEndpoint;
log.error(errorMsg);
throw new DeviceControllerException(errorMsg, e);
}
} else {
String warnMsg = String.format("XMPP <Enabled> set to false in [%s]", DEVICEMGT_CONFIG_FILE);
log.warn(warnMsg);
throw new DeviceControllerException(warnMsg);
}
}
public void deleteCurrentXmppSessions() throws DeviceControllerException {
JSONArray xmppSessionsArray;
try {
xmppSessionsArray = getAllCurrentUserSessions();
} catch (DeviceControllerException e) {
if (e.getMessage().contains(DEVICEMGT_CONFIG_FILE)) {
log.warn(String.format("XMPP <Enabled> set to false in [%s]", DEVICEMGT_CONFIG_FILE));
return;
} else {
throw e;
}
}
if (xmppSessionsArray.length() != 0) {
String xmppSessionsAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + XMPP_SESSIONS_API;
String encodedString = xmppUsername + ":" + xmppPassword;
encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8)));
String authorizationHeader = "Basic " + encodedString;
if (log.isDebugEnabled()) {
log.debug("The Get-Sessions Endpoint URL of the XMPP Server is set to: " + xmppSessionsAPIEndpoint);
}
URL xmppUserApiUrl;
try {
xmppUserApiUrl = new URL(xmppSessionsAPIEndpoint);
} catch (MalformedURLException e) {
String errMsg = "Malformed XMPP URL + " + xmppSessionsAPIEndpoint;
log.error(errMsg);
throw new DeviceControllerException(errMsg, e);
}
HttpClient httpClient;
try {
httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol());
} catch (Exception e) {
String errorMsg = "Error on getting a http client for port :" + xmppUserApiUrl.getPort() +
" protocol :" + xmppUserApiUrl.getProtocol();
log.error(errorMsg);
throw new DeviceControllerException(errorMsg, e);
}
for (int i = 0; i < xmppSessionsArray.length(); i++) {
String sessionName = xmppSessionsArray.getJSONObject(i).getString("username");
String xmppUserSessionsAPIEndpoint = xmppSessionsAPIEndpoint + "/" + sessionName;
HttpDelete httpDelete = new HttpDelete(xmppUserSessionsAPIEndpoint);
httpDelete.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader);
try {
HttpResponse httpResponse = httpClient.execute(httpDelete);
if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
String errorMsg =
"XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() +
"' for checking current XMPP Sessions.";
log.error(errorMsg);
throw new DeviceControllerException(errorMsg);
}
} catch (IOException e) {
String errorMsg = "Error occured whilst trying a 'DELETE' user-session [" + sessionName + "] " +
"at : " + xmppUserSessionsAPIEndpoint;
log.error(errorMsg);
throw new DeviceControllerException(errorMsg, e);
}
}
}
}
}

@ -27,16 +27,19 @@ import org.wso2.carbon.databridge.core.DataBridgeReceiverService;
import org.wso2.carbon.device.mgt.iot.DeviceController;
import org.wso2.carbon.device.mgt.iot.UserManagement;
import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTEventsStatisticsClient;
import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTUsageStatisticsClient;
import org.wso2.carbon.device.mgt.iot.config.devicetype.IotDeviceTypeConfigurationManager;
import org.wso2.carbon.device.mgt.iot.config.devicetype.datasource.IotDeviceTypeConfig;
import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppServerClient;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.service.DeviceTypeService;
import org.wso2.carbon.device.mgt.iot.service.DeviceTypeServiceImpl;
import org.wso2.carbon.device.mgt.iot.startup.StartupUrlPrinter;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.exception.IotDeviceMgtPluginException;
import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager;
import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTUsageStatisticsClient;
import org.wso2.carbon.device.mgt.iot.config.devicetype.IotDeviceTypeConfigurationManager;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.IotDeviceManagementDAOFactory;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.util.IotDeviceManagementDAOUtil;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.exception.IotDeviceMgtPluginException;
import org.wso2.carbon.ndatasource.core.DataSourceService;
import org.wso2.carbon.user.core.service.RealmService;
import org.wso2.carbon.utils.ConfigurationContextService;
@ -75,6 +78,7 @@ public class IotDeviceManagementServiceComponent {
private static final Log log = LogFactory.getLog(IotDeviceManagementServiceComponent.class);
public static ConfigurationContextService configurationContextService;
protected void activate(ComponentContext ctx) {
if (log.isDebugEnabled()) {
log.debug("Activating Iot Device Management Service Component");
@ -82,7 +86,8 @@ public class IotDeviceManagementServiceComponent {
try {
BundleContext bundleContext = ctx.getBundleContext(); /* Initialize the data source configuration */
BundleContext bundleContext = ctx.getBundleContext(); /* Initialize the data source
configuration */
DeviceManagementConfigurationManager.getInstance().initConfig();
IotDeviceTypeConfigurationManager.getInstance().initConfig();
Map<String, IotDeviceTypeConfig> dsConfigMap =
@ -91,7 +96,6 @@ public class IotDeviceManagementServiceComponent {
IotDeviceManagementDAOFactory.init(dsConfigMap);
String setupOption = System.getProperty("setup");
if (setupOption != null) {
if (log.isDebugEnabled()) {
@ -100,7 +104,7 @@ public class IotDeviceManagementServiceComponent {
"to begin");
}
try {
for (String pluginType : dsConfigMap.keySet()){
for (String pluginType : dsConfigMap.keySet()) {
IotDeviceManagementDAOUtil
.setupIotDeviceManagementSchema(
IotDeviceManagementDAOFactory.getDataSourceMap
@ -137,6 +141,20 @@ public class IotDeviceManagementServiceComponent {
}
protected void deactivate(ComponentContext ctx) {
XmppConfig xmppConfig = XmppConfig.getInstance();
try {
if (xmppConfig.isEnabled()) {
XmppServerClient xmppServerClient = new XmppServerClient();
xmppServerClient.initControlQueue();
xmppServerClient.deleteCurrentXmppSessions();
}
} catch (DeviceControllerException e) {
String errorMsg = "An error occurred whilst trying to delete all existing XMPP login sessions at " +
"[" + xmppConfig.getXmppEndpoint() + "].";
log.error(errorMsg, e);
}
if (log.isDebugEnabled()) {
log.debug("De-activating Iot Device Management Service Component");
}
@ -161,7 +179,7 @@ public class IotDeviceManagementServiceComponent {
log.debug("Setting ConfigurationContextService");
}
IotDeviceManagementServiceComponent.configurationContextService=configurationContextService;
IotDeviceManagementServiceComponent.configurationContextService = configurationContextService;
}
@ -169,11 +187,12 @@ public class IotDeviceManagementServiceComponent {
if (log.isDebugEnabled()) {
log.debug("Un-setting ConfigurationContextService");
}
IotDeviceManagementServiceComponent.configurationContextService=null;
IotDeviceManagementServiceComponent.configurationContextService = null;
}
/**
* Sets Realm Service
*
* @param realmService associated realm service reference
*/
protected void setRealmService(RealmService realmService) {
@ -187,6 +206,7 @@ public class IotDeviceManagementServiceComponent {
/**
* Unsets Realm Service
*
* @param realmService associated realm service reference
*/
protected void unsetRealmService(RealmService realmService) {

Loading…
Cancel
Save