diff --git a/modules/samples/pom.xml b/modules/samples/pom.xml
index 52091079..92ccda50 100644
--- a/modules/samples/pom.xml
+++ b/modules/samples/pom.xml
@@ -381,10 +381,17 @@
provided
-
-
-
-
+
+
+ org.igniterealtime.smack.wso2
+ smack
+ ${smack.wso2.version}
+
+
+ org.igniterealtime.smack.wso2
+ smackx
+ ${smackx.wso2.version}
+
@@ -418,35 +425,32 @@
0.4.0
-
-
2.6.1
1.9.0
1.1.1
-
-
-
0.9.2-SNAPSHOT
[0.8.0, 2.0.0)
1.0.0-SNAPSHOT
-
3.1.0.wso2v2
3.0.0.wso2v1
2.6.1
-
-
1.7
1.7
1.0.3
+
+ 3.0.4.wso2v1
+ 3.0.4.wso2v1
+ 0.4.0
+
diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml
index 3db470bd..d5ded8e9 100644
--- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml
+++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml
@@ -98,6 +98,16 @@
org.wso2.carbon.device.mgt.analytics
+
+
+ org.igniterealtime.smack.wso2
+ smack
+
+
+ org.igniterealtime.smack.wso2
+ smackx
+
+
diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java
index aa5f6b76..575eb120 100644
--- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java
+++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java
@@ -23,6 +23,8 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.jivesoftware.smack.packet.Message;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException;
import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService;
@@ -30,11 +32,14 @@ import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.common.DeviceController;
import org.wso2.carbon.device.mgt.iot.common.DeviceValidator;
+import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants
.VirtualFireAlarmConstants;
+import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt.MQTTClient;
+import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp.XMPPClient;
import org.wso2.carbon.utils.CarbonUtils;
import javax.servlet.http.HttpServletResponse;
@@ -63,8 +68,10 @@ import java.util.concurrent.Future;
public class VirtualFireAlarmControllerService {
private static Log log = LogFactory.getLog(VirtualFireAlarmControllerService.class);
+
//TODO; replace this tenant domain
private final String SUPER_TENANT = "carbon.super";
+
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature";
@@ -78,9 +85,53 @@ public class VirtualFireAlarmControllerService {
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
+ private static ConcurrentHashMap deviceToIpMap = new ConcurrentHashMap();
+ private static XMPPClient xmppClient;
+ private static MQTTClient mqttClient;
+ private static final String mqttServerSubscribeTopic = "wso2/iot/+/" + VirtualFireAlarmConstants.DEVICE_TYPE + "/+/reply";
+ private static final String iotServerSubscriber = "IoT-Server";
+
+ static{
+ String xmppServer = XmppConfig.getInstance().getXmppControlQueue().getServerURL();
+ int indexOfChar = xmppServer.lastIndexOf('/');
+ if (indexOfChar != -1) {
+ xmppServer = xmppServer.substring((indexOfChar + 1), xmppServer.length());
+ }
+
+ int xmppPort = Integer.parseInt(XmppConfig.getInstance().getSERVER_CONNECTION_PORT());
+ xmppClient = new XMPPClient(xmppServer, xmppPort) {
+ @Override
+ protected void processXMPPMessage(Message xmppMessage) {
+
+ }
+ };
+
+ String xmppUsername = XmppConfig.getInstance().getXmppUsername();
+ String xmppPassword = XmppConfig.getInstance().getXmppPassword();
+
+ try {
+ xmppClient.connectAndLogin(xmppUsername, xmppPassword, "iotServer");
+ } catch (DeviceManagementException e) {
+ e.printStackTrace();
+ }
+
+ xmppClient.setMessageFilterAndListener("");
+
+ String mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint();
+ mqttClient = new MQTTClient(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, mqttEndpoint, mqttServerSubscribeTopic) {
+ @Override
+ protected void postMessageArrived(String topic, MqttMessage message) {
+
+ }
+ };
+
+ try {
+ mqttClient.connectAndSubscribe();
+ } catch (DeviceManagementException e) {
+ e.printStackTrace();
+ }
+ }
- private static ConcurrentHashMap deviceToIpMap =
- new ConcurrentHashMap();
@Path("/register/{owner}/{deviceId}/{ip}")
@POST
diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmManagerService.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmManagerService.java
index c75937cc..9a58df9a 100644
--- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmManagerService.java
+++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmManagerService.java
@@ -306,15 +306,6 @@ public class VirtualFireAlarmManagerService {
newXmppAccount.setAccountName(owner + "_" + deviceId);
newXmppAccount.setUsername(deviceId);
newXmppAccount.setPassword(accessToken);
-
- String xmppEndPoint = XmppConfig.getInstance().getXmppControlQueue().getServerURL();
-
- int indexOfChar = xmppEndPoint.lastIndexOf('/');
-
- if (indexOfChar != -1) {
- xmppEndPoint = xmppEndPoint.substring((indexOfChar + 1), xmppEndPoint.length());
- }
-
newXmppAccount.setEmail(deviceId + "@wso2.com");
XmppServerClient xmppServerClient = new XmppServerClient();
diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java
new file mode 100644
index 00000000..975d7c75
--- /dev/null
+++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.MqttSecurityException;
+import org.wso2.carbon.device.mgt.common.DeviceManagementException;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class contains the Agent specific implementation for all the MQTT functionality. This
+ * includes connecting to a MQTT Broker & subscribing to the appropriate MQTT-topic, action plan
+ * upon losing connection or successfully delivering a message to the broker and processing
+ * incoming messages. Makes use of the 'Paho-MQTT' library provided by Eclipse Org.
+ *
+ * It is an abstract class with an abstract method 'postMessageArrived' allowing the user to have
+ * their own implementation of the actions to be taken upon receiving a message to the subscribed
+ * MQTT-Topic.
+ */
+public abstract class MQTTClient implements MqttCallback {
+ private static final Log log = LogFactory.getLog(MQTTClient.class);
+
+ private MqttClient client;
+ private String clientId;
+ private MqttConnectOptions options;
+ private String subscribeTopic;
+ private String clientWillTopic;
+ private String mqttBrokerEndPoint;
+ private int reConnectionInterval;
+
+ /**
+ * Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT
+ * Broker URL and the topic to subscribe.
+ *
+ * @param deviceOwner the owner of the device.
+ * @param deviceType the CDMF Device-Type of the device.
+ * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint.
+ * @param subscribeTopic the MQTT topic to which the client is to be subscribed
+ */
+ protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint,
+ String subscribeTopic) {
+ this.clientId = deviceOwner + ":" + deviceType;
+ this.subscribeTopic = subscribeTopic;
+ this.clientWillTopic = deviceType + File.separator + "disconnection";
+ this.mqttBrokerEndPoint = mqttBrokerEndPoint;
+ this.reConnectionInterval = 5000;
+ this.initSubscriber();
+ }
+
+ /**
+ * Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT
+ * Broker URL and the topic to subscribe. Additionally this constructor takes in the
+ * reconnection-time interval between successive attempts to connect to the broker.
+ *
+ * @param deviceOwner the owner of the device.
+ * @param deviceType the CDMF Device-Type of the device.
+ * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint.
+ * @param subscribeTopic the MQTT topic to which the client is to be subscribed
+ * @param reConnectionInterval time interval in SECONDS between successive attempts to connect
+ * to the broker.
+ */
+ protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint,
+ String subscribeTopic, int reConnectionInterval) {
+ this.clientId = deviceOwner + ":" + deviceType;
+ this.subscribeTopic = subscribeTopic;
+ this.clientWillTopic = deviceType + File.separator + "disconnection";
+ this.mqttBrokerEndPoint = mqttBrokerEndPoint;
+ this.reConnectionInterval = reConnectionInterval;
+ this.initSubscriber();
+ }
+
+ /**
+ * Initializes the MQTT-Client.
+ * Creates a client using the given MQTT-broker endpoint and the clientId (which is
+ * constructed by a concatenation of [deviceOwner]:[deviceType]). Also sets the client's
+ * options parameter with the clientWillTopic (in-case of connection failure) and other info.
+ * Also sets the call-back this current class.
+ */
+ private void initSubscriber() {
+ try {
+ client = new MqttClient(this.mqttBrokerEndPoint, clientId, null);
+ log.info("MQTT subscriber was created with ClientID : " + clientId);
+ } catch (MqttException ex) {
+ String errorMsg = "MQTT Client Error\n" + "\tReason: " + ex.getReasonCode() +
+ "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " +
+ ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() +
+ "\n\tException: " + ex;
+ log.error(errorMsg);
+ }
+
+ options = new MqttConnectOptions();
+ options.setCleanSession(false);
+ options.setWill(clientWillTopic, "connection crashed".getBytes(StandardCharsets.UTF_8), 2,
+ true);
+ client.setCallback(this);
+ }
+
+ /**
+ * Checks whether the connection to the MQTT-Broker persists.
+ *
+ * @return true if the client is connected to the MQTT-Broker, else false.
+ */
+ public boolean isConnected() {
+ return client.isConnected();
+ }
+
+ /**
+ * Connects to the MQTT-Broker and if successfully established connection, then tries to
+ * subscribe to the MQTT-Topic specific to the device. (The MQTT-Topic specific to the
+ * device is created is taken in as a constructor parameter of this class) .
+ *
+ * @throws DeviceManagementException in the event of 'Connecting to' or 'Subscribing to' the
+ * MQTT broker fails.
+ */
+ public void connectAndSubscribe() throws DeviceManagementException {
+ try {
+ client.connect(options);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Subscriber connected to queue at: " + this.mqttBrokerEndPoint);
+ }
+ } catch (MqttSecurityException ex) {
+ String errorMsg = "MQTT Security Exception when connecting to queue\n" + "\tReason: " +
+ " " +
+ ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() +
+ "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " +
+ ex.getCause() + "\n\tException: " + ex; //throw
+ if (log.isDebugEnabled()) {
+ log.debug(errorMsg);
+ }
+ throw new DeviceManagementException(errorMsg, ex);
+
+ } catch (MqttException ex) {
+ String errorMsg = "MQTT Exception when connecting to queue\n" + "\tReason: " +
+ ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() +
+ "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " +
+ ex.getCause() + "\n\tException: " + ex; //throw
+ if (log.isDebugEnabled()) {
+ log.debug(errorMsg);
+ }
+ throw new DeviceManagementException(errorMsg, ex);
+ }
+
+ try {
+ client.subscribe(subscribeTopic, 0);
+
+ log.info("Subscriber - " + clientId + " subscribed to topic: " + subscribeTopic);
+ } catch (MqttException ex) {
+ String errorMsg = "MQTT Exception when trying to subscribe to topic: " +
+ subscribeTopic + "\n\tReason: " + ex.getReasonCode() +
+ "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " +
+ ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() +
+ "\n\tException: " + ex;
+ if (log.isDebugEnabled()) {
+ log.debug(errorMsg);
+ }
+ throw new DeviceManagementException(errorMsg, ex);
+ }
+ }
+
+ /**
+ * Callback method which is triggered once the MQTT client losers its connection to the broker.
+ * A scheduler thread is spawned to continuously re-attempt and connect to the broker and
+ * subscribe to the device's topic. This thread is scheduled to execute after every break
+ * equal to that of the 'reConnectionInterval' of the MQTTClient.
+ *
+ * @param throwable a Throwable Object containing the details as to why the failure occurred.
+ */
+ public void connectionLost(Throwable throwable) {
+ log.warn("Lost Connection for client: " + this.clientId + " to " + this.mqttBrokerEndPoint + ".\nThis was due to - " + throwable.getMessage());
+
+ Runnable reSubscriber = new Runnable() {
+ @Override
+ public void run() {
+ if (!isConnected()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Subscriber reconnecting to queue........");
+ }
+ try {
+ connectAndSubscribe();
+ } catch (DeviceManagementException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Could not reconnect and subscribe to ControlQueue.");
+ }
+ }
+ } else {
+ return;
+ }
+ }
+ };
+
+ ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+ service.scheduleAtFixedRate(reSubscriber, 0, this.reConnectionInterval, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Callback method which is triggered upon receiving a MQTT Message from the broker. Spawns a
+ * new thread that executes any actions to be taken with the received message.
+ *
+ * @param topic the MQTT-Topic to which the received message was published to and the
+ * client was subscribed to.
+ * @param mqttMessage the actual MQTT-Message that was received from the broker.
+ */
+ public void messageArrived(final String topic, final MqttMessage mqttMessage) {
+ Thread subscriberThread = new Thread() {
+ public void run() {
+ postMessageArrived(topic, mqttMessage);
+ }
+ };
+ subscriberThread.start();
+ }
+
+ /**
+ * Callback method which gets triggered upon successful completion of a message delivery to
+ * the broker.
+ *
+ * @param iMqttDeliveryToken the MQTT-DeliveryToken which includes the details about the
+ * specific message delivery.
+ */
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ String message = "";
+ try {
+ message = iMqttDeliveryToken.getMessage().toString();
+ } catch (MqttException e) {
+ log.error("Error occurred whilst trying to read the message from the MQTT delivery token.");
+ }
+ String topic = iMqttDeliveryToken.getTopics()[0];
+ String client = iMqttDeliveryToken.getClient().getClientId();
+ log.info("Message - '" + message + "' of client [" + client + "] for the topic (" + topic + ") was delivered successfully.");
+ }
+
+ /**
+ * This is an abstract method used for post processing the received MQTT-message. This
+ * method will be implemented as per requirement at the time of creating an object of this
+ * class.
+ *
+ * @param topic The topic for which the message was received for.
+ * @param message The message received for the subscription to the above topic.
+ */
+ protected abstract void postMessageArrived(String topic, MqttMessage message);
+
+ /**
+ * Gets the MQTTClient object.
+ *
+ * @return the MQTTClient object.
+ */
+ public MqttClient getClient() {
+ return client;
+ }
+
+}
+
diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java
new file mode 100644
index 00000000..8391b1cb
--- /dev/null
+++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jivesoftware.smack.ConnectionConfiguration;
+import org.jivesoftware.smack.PacketListener;
+import org.jivesoftware.smack.SmackConfiguration;
+import org.jivesoftware.smack.XMPPConnection;
+import org.jivesoftware.smack.XMPPException;
+import org.jivesoftware.smack.filter.AndFilter;
+import org.jivesoftware.smack.filter.FromContainsFilter;
+import org.jivesoftware.smack.filter.PacketFilter;
+import org.jivesoftware.smack.filter.PacketTypeFilter;
+import org.jivesoftware.smack.packet.Message;
+import org.jivesoftware.smack.packet.Packet;
+import org.wso2.carbon.device.mgt.common.DeviceManagementException;
+
+/**
+ * This class contains the Agent specific implementation for all the XMPP functionality. This
+ * includes connecting to a XMPP Server & Login using the device's XMPP-Account, Setting
+ * listeners and filters on incoming XMPP messages and Sending XMPP replies for control signals
+ * received. Makes use of the 'Smack-XMPP' library provided by jivesoftware/igniterealtime.
+ *
+ * It is an abstract class with an abstract method 'processXMPPMessage' allowing the user to have
+ * their own implementation of the actions to be taken upon receiving an appropriate XMPP message.
+ */
+public abstract class XMPPClient {
+ private static final Log log = LogFactory.getLog(XMPPClient.class);
+
+ private int replyTimeoutInterval = 500; // millis
+ private String server;
+ private int port;
+ private ConnectionConfiguration config;
+ private XMPPConnection connection;
+ private PacketFilter filter;
+ private PacketListener listener;
+
+ /**
+ * Constructor for XMPPClient passing server-IP and the XMPP-port.
+ *
+ * @param server the IP of the XMPP server.
+ * @param port the XMPP server's port to connect to. (default - 5222)
+ */
+ public XMPPClient(String server, int port) {
+ this.server = server;
+ this.port = port;
+ initXMPPClient();
+ }
+
+ /**
+ * Initializes the XMPP Client.
+ * Sets the time-out-limit whilst waiting for XMPP-replies from server. Creates the XMPP
+ * configurations to connect to the server and creates the XMPPConnection object used for
+ * connecting and Logging-In.
+ */
+ private void initXMPPClient() {
+ log.info(String.format(
+ "Initializing connection to XMPP Server at %1$s via port %2$d......", server,
+ port));
+ SmackConfiguration.setPacketReplyTimeout(replyTimeoutInterval);
+ config = new ConnectionConfiguration(server, port);
+// TODO:: Need to enable SASL-Authentication appropriately
+ config.setSASLAuthenticationEnabled(false);
+ config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled);
+ connection = new XMPPConnection(config);
+ }
+
+ /**
+ * Connects to the XMPP-Server and if successfully established connection, then tries to Log
+ * in using the device's XMPP Account credentials. (The XMPP-Account specific to the device is
+ * created in the XMPP server whilst downloading the Agent from the IoT Server) .
+ *
+ * @param username the username of the device's XMPP-Account.
+ * @param password the password of the device's XMPP-Account.
+ * @param resource the resource the resource, specific to the XMPP-Account to which the login
+ * is made to
+ * @throws DeviceManagementException in the event of 'Connecting to' or 'Logging into' the
+ * XMPP server fails.
+ */
+ public void connectAndLogin(String username, String password, String resource)
+ throws DeviceManagementException {
+ try {
+ connection.connect();
+ log.info(String.format(
+ "Connection to XMPP Server at %1$s established successfully......", server));
+
+ } catch (XMPPException xmppExcepion) {
+ String errorMsg =
+ "Connection attempt to the XMPP Server at " + server + " via port " + port +
+ " failed.";
+ log.info(errorMsg);
+ throw new DeviceManagementException(errorMsg, xmppExcepion);
+ }
+
+ if (connection.isConnected()) {
+ try {
+ if (resource == null) {
+ connection.login(username, password);
+ log.info(String.format("Logged into XMPP Server at %1$s as user %2$s......", server, username));
+ } else {
+ connection.login(username, password, resource);
+ log.info(String.format(
+ "Logged into XMPP Server at %1$s as user %2$s on resource %3$s......",
+ server, username, resource));
+ }
+ } catch (XMPPException xmppException) {
+ String errorMsg =
+ "Login attempt to the XMPP Server at " + server + " with username - " +
+ username + " failed.";
+ log.info(errorMsg);
+ throw new DeviceManagementException(errorMsg, xmppException);
+ }
+ }
+ }
+
+ /**
+ * Sets a filter on all the incoming XMPP-Messages for the JID (XMPP-Account ID) passed in.
+ * Also creates a listener for the incoming messages and connects the listener to the
+ * XMPPConnection alongside the set filter.
+ *
+ * @param senderJID the JID (XMPP-Account ID) to which the filter is to be set.
+ */
+ public void setMessageFilterAndListener(String senderJID) {
+ filter = new AndFilter(new PacketTypeFilter(Message.class), new
+ FromContainsFilter(
+ senderJID));
+ listener = new PacketListener() {
+ @Override
+ public void processPacket(Packet packet) {
+ if (packet instanceof Message) {
+ final Message xmppMessage = (Message) packet;
+ Thread msgProcessThread = new Thread() {
+ public void run() {
+ processXMPPMessage(xmppMessage);
+ }
+ };
+ msgProcessThread.start();
+ }
+ }
+ };
+
+ connection.addPacketListener(listener, filter);
+ }
+
+
+
+ /**
+ * Sends an XMPP message
+ *
+ * @param JID the JID (XMPP Account ID) to which the message is to be sent to.
+ * @param message the XMPP-Message that is to be sent.
+ */
+ public void sendXMPPMessage(String JID, String message) {
+ sendXMPPMessage(JID, message, "Reply-From-Device");
+ if (log.isDebugEnabled()) {
+ log.debug("Message: " + message + " to XMPP JID [" + JID + "] sent successfully");
+ }
+ }
+
+ /**
+ * Overloaded method to send an XMPP message. Includes the subject to be mentioned in the
+ * message that is sent.
+ *
+ * @param JID the JID (XMPP Account ID) to which the message is to be sent to.
+ * @param message the XMPP-Message that is to be sent.
+ * @param subject the subject that the XMPP-Message would carry.
+ */
+ public void sendXMPPMessage(String JID, String message, String subject) {
+ Message xmppMessage = new Message();
+ xmppMessage.setTo(JID);
+ xmppMessage.setSubject(subject);
+ xmppMessage.setBody(message);
+ xmppMessage.setType(Message.Type.chat);
+ connection.sendPacket(xmppMessage);
+ }
+
+ /**
+ * Checks whether the connection to the XMPP-Server persists.
+ *
+ * @return true if the client is connected to the XMPP-Server, else false.
+ */
+ public boolean isConnected() {
+ return connection.isConnected();
+ }
+
+ /**
+ * Sets the client's time-out-limit whilst waiting for XMPP-replies from server.
+ *
+ * @param millis the time in millis to be set as the time-out-limit whilst waiting for a
+ * XMPP-reply.
+ */
+ public void setReplyTimeoutInterval(int millis) {
+ this.replyTimeoutInterval = millis;
+ }
+
+ /**
+ * Disables default debugger provided by the XMPPConnection.
+ */
+ public void disableDebugger() {
+ connection.DEBUG_ENABLED = false;
+ }
+
+ /**
+ * Closes the connection to the XMPP Server.
+ */
+ public void closeConnection() {
+ if (connection != null && connection.isConnected()) {
+ connection.disconnect();
+ }
+ }
+
+ /**
+ * This is an abstract method used for post processing the received XMPP-message. This
+ * method will be implemented as per requirement at the time of creating an object of this
+ * class.
+ *
+ * @param xmppMessage the xmpp message received by the listener.
+ */
+ protected abstract void processXMPPMessage(Message xmppMessage);
+
+}
+
+