From d384942f693352e02bd890494e5244d19e538469 Mon Sep 17 00:00:00 2001 From: Rasika Perera Date: Mon, 12 Oct 2015 08:12:52 +0530 Subject: [PATCH 1/2] adding policy monitor service for firealarm --- .../pom.xml | 18 +++ .../FireAlarmPolicyMonitoringService.java | 106 ++++++++++++++++++ modules/samples/pom.xml | 23 +++- modules/samples/sample_pom.xml | 7 ++ pom.xml | 10 ++ 5 files changed, 161 insertions(+), 3 deletions(-) create mode 100644 modules/samples/firealarm/src/org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/firealarm/plugin/impl/FireAlarmPolicyMonitoringService.java diff --git a/modules/samples/firealarm/src/org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl/pom.xml b/modules/samples/firealarm/src/org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl/pom.xml index 765e8edf..cbe014c7 100644 --- a/modules/samples/firealarm/src/org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl/pom.xml +++ b/modules/samples/firealarm/src/org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl/pom.xml @@ -67,9 +67,12 @@ org.w3c.dom, org.wso2.carbon.device.mgt.common.*, org.wso2.carbon.device.mgt.common, + org.wso2.carbon.policy.mgt.common, + org.wso2.carbon.policy.mgt.common.*, org.wso2.carbon.context.*, org.wso2.carbon.ndatasource.core, org.wso2.carbon.device.mgt.iot.common.*, + com.google.gson.* @@ -97,6 +100,17 @@ org.wso2.carbon.logging + + org.wso2.carbon.devicemgt + org.wso2.carbon.policy.mgt.core + provided + + + org.wso2.carbon.devicemgt + org.wso2.carbon.policy.mgt.common + provided + + org.wso2.carbon.devicemgt org.wso2.carbon.device.mgt.common @@ -111,6 +125,10 @@ org.wso2.carbon.device.mgt.iot.common + + com.google.code.gson + gson + diff --git a/modules/samples/firealarm/src/org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/firealarm/plugin/impl/FireAlarmPolicyMonitoringService.java b/modules/samples/firealarm/src/org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/firealarm/plugin/impl/FireAlarmPolicyMonitoringService.java new file mode 100644 index 00000000..4ee496e1 --- /dev/null +++ b/modules/samples/firealarm/src/org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/firealarm/plugin/impl/FireAlarmPolicyMonitoringService.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.context.CarbonContext; +import org.wso2.carbon.device.mgt.common.Device; +import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException; +import org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.constants.FireAlarmConstants; +import org.wso2.carbon.policy.mgt.common.Policy; +import org.wso2.carbon.policy.mgt.common.monitor.ComplianceData; +import org.wso2.carbon.policy.mgt.common.monitor.ComplianceFeature; +import org.wso2.carbon.policy.mgt.common.monitor.PolicyComplianceException; +import org.wso2.carbon.policy.mgt.common.spi.PolicyMonitoringService; +import org.wso2.carbon.device.mgt.iot.common.transport.mqtt.MqttPublisher; + +import java.util.ArrayList; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class FireAlarmPolicyMonitoringService implements PolicyMonitoringService { + + private static Log log = LogFactory.getLog(FireAlarmPolicyMonitoringService.class); + + @Override + public void notifyDevices(List list) throws PolicyComplianceException { + String userName = CarbonContext.getThreadLocalCarbonContext().getUsername(); + MqttPublisher mqttPublisher = new MqttPublisher(); + try { + mqttPublisher.initControlQueue(); + mqttPublisher.publish("Raspberry-Policy-sender", + "/iot/policymgt/govern/" + FireAlarmConstants.DEVICE_TYPE + "/" + + userName, userName.getBytes(UTF_8)); + } catch (DeviceControllerException e) { + log.error("Error on notifying "+FireAlarmConstants.DEVICE_TYPE+" devices, message not sent."); + } + } + + @Override + public ComplianceData checkPolicyCompliance(DeviceIdentifier deviceIdentifier, Policy policy, + Object compliancePayload) throws PolicyComplianceException { + if (log.isDebugEnabled()) { + log.debug("Checking policy compliance status of device '" + deviceIdentifier.getId() + "'"); + } + ComplianceData complianceData = new ComplianceData(); + if (compliancePayload == null || policy == null) { + return complianceData; + } + List complianceFeatures = new ArrayList(); + + // Parsing json string to get compliance features. + JsonElement jsonElement; + if (compliancePayload instanceof String) { + jsonElement = new JsonParser().parse((String) compliancePayload); + } else { + throw new PolicyComplianceException("Invalid policy compliance payload"); + } + JsonArray jsonArray = jsonElement.getAsJsonArray(); + Gson gson = new Gson(); + ComplianceFeature complianceFeature; + + for (JsonElement element : jsonArray) { + complianceFeature = gson.fromJson(element, ComplianceFeature.class); + complianceFeatures.add(complianceFeature); + } + + complianceData.setComplianceFeatures(complianceFeatures); + + for (ComplianceFeature cf : complianceFeatures) { + if (!cf.isCompliant()) { + complianceData.setStatus(false); + break; + } + } + return complianceData; + } + + @Override + public String getType() { + return FireAlarmConstants.DEVICE_TYPE; + } +} \ No newline at end of file diff --git a/modules/samples/pom.xml b/modules/samples/pom.xml index 52091079..8c1de612 100644 --- a/modules/samples/pom.xml +++ b/modules/samples/pom.xml @@ -96,6 +96,18 @@ ${carbon.device.mgt.version} provided + + org.wso2.carbon.devicemgt + org.wso2.carbon.policy.mgt.core + ${carbon.device.mgt.version} + provided + + + org.wso2.carbon.devicemgt + org.wso2.carbon.policy.mgt.common + ${carbon.device.mgt.version} + provided + @@ -211,6 +223,12 @@ + + com.google.code.gson + gson + ${google.gson.version} + + @@ -417,11 +435,10 @@ 7.0.34.wso2v2 0.4.0 + 2.2.4 - - - 2.6.1 + 2.6.1 1.9.0 1.1.1 diff --git a/modules/samples/sample_pom.xml b/modules/samples/sample_pom.xml index 76e1dc33..20b641a5 100644 --- a/modules/samples/sample_pom.xml +++ b/modules/samples/sample_pom.xml @@ -99,6 +99,13 @@ system ${basedir}/../../../../repository/components/plugins/org.wso2.carbon.device.mgt.analytics_0.9.2.SNAPSHOT.jar + + org.wso2.carbon.devicemgt + org.wso2.carbon.policy.mgt.common + ${carbon.device.mgt.version} + system + ${basedir}/../../../../repository/components/plugins/org.wso2.carbon.policy.mgt.common_0.9.2.SNAPSHOT.jar + diff --git a/pom.xml b/pom.xml index cd191614..2e0e8f13 100644 --- a/pom.xml +++ b/pom.xml @@ -412,6 +412,16 @@ org.wso2.carbon.device.mgt.analytics ${carbon.device.mgt.version} + + org.wso2.carbon.devicemgt + org.wso2.carbon.policy.mgt.core + ${carbon.device.mgt.version} + + + org.wso2.carbon.devicemgt + org.wso2.carbon.policy.mgt.common + ${carbon.device.mgt.version} + From 45c0d9bc03c8bc24bd4827eb0311b209416ef116 Mon Sep 17 00:00:00 2001 From: Shabir Mohamed Date: Mon, 12 Oct 2015 10:46:04 +0530 Subject: [PATCH 2/2] added XMPPClient and MQTTClient for receiving incoming messages --- modules/samples/pom.xml | 28 +- .../pom.xml | 10 + .../VirtualFireAlarmControllerService.java | 55 +++- .../impl/VirtualFireAlarmManagerService.java | 9 - .../service/impl/util/mqtt/MQTTClient.java | 280 ++++++++++++++++++ .../service/impl/util/xmpp/XMPPClient.java | 241 +++++++++++++++ 6 files changed, 600 insertions(+), 23 deletions(-) create mode 100644 modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java create mode 100644 modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java diff --git a/modules/samples/pom.xml b/modules/samples/pom.xml index 52091079..92ccda50 100644 --- a/modules/samples/pom.xml +++ b/modules/samples/pom.xml @@ -381,10 +381,17 @@ provided - - - - + + + org.igniterealtime.smack.wso2 + smack + ${smack.wso2.version} + + + org.igniterealtime.smack.wso2 + smackx + ${smackx.wso2.version} + @@ -418,35 +425,32 @@ 0.4.0 - - 2.6.1 1.9.0 1.1.1 - - - 0.9.2-SNAPSHOT [0.8.0, 2.0.0) 1.0.0-SNAPSHOT - 3.1.0.wso2v2 3.0.0.wso2v1 2.6.1 - - 1.7 1.7 1.0.3 + + 3.0.4.wso2v1 + 3.0.4.wso2v1 + 0.4.0 + diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml index 3db470bd..d5ded8e9 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml @@ -98,6 +98,16 @@ org.wso2.carbon.device.mgt.analytics + + + org.igniterealtime.smack.wso2 + smack + + + org.igniterealtime.smack.wso2 + smackx + + diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java index aa5f6b76..575eb120 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java @@ -23,6 +23,8 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.concurrent.FutureCallback; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.jivesoftware.smack.packet.Message; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException; import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService; @@ -30,11 +32,14 @@ import org.wso2.carbon.device.mgt.common.DeviceIdentifier; import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.iot.common.DeviceController; import org.wso2.carbon.device.mgt.iot.common.DeviceValidator; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig; import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON; import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants .VirtualFireAlarmConstants; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt.MQTTClient; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp.XMPPClient; import org.wso2.carbon.utils.CarbonUtils; import javax.servlet.http.HttpServletResponse; @@ -63,8 +68,10 @@ import java.util.concurrent.Future; public class VirtualFireAlarmControllerService { private static Log log = LogFactory.getLog(VirtualFireAlarmControllerService.class); + //TODO; replace this tenant domain private final String SUPER_TENANT = "carbon.super"; + @Context //injected response proxy supporting multiple thread private HttpServletResponse response; private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature"; @@ -78,9 +85,53 @@ public class VirtualFireAlarmControllerService { public static final String HTTP_PROTOCOL = "HTTP"; public static final String MQTT_PROTOCOL = "MQTT"; + private static ConcurrentHashMap deviceToIpMap = new ConcurrentHashMap(); + private static XMPPClient xmppClient; + private static MQTTClient mqttClient; + private static final String mqttServerSubscribeTopic = "wso2/iot/+/" + VirtualFireAlarmConstants.DEVICE_TYPE + "/+/reply"; + private static final String iotServerSubscriber = "IoT-Server"; + + static{ + String xmppServer = XmppConfig.getInstance().getXmppControlQueue().getServerURL(); + int indexOfChar = xmppServer.lastIndexOf('/'); + if (indexOfChar != -1) { + xmppServer = xmppServer.substring((indexOfChar + 1), xmppServer.length()); + } + + int xmppPort = Integer.parseInt(XmppConfig.getInstance().getSERVER_CONNECTION_PORT()); + xmppClient = new XMPPClient(xmppServer, xmppPort) { + @Override + protected void processXMPPMessage(Message xmppMessage) { + + } + }; + + String xmppUsername = XmppConfig.getInstance().getXmppUsername(); + String xmppPassword = XmppConfig.getInstance().getXmppPassword(); + + try { + xmppClient.connectAndLogin(xmppUsername, xmppPassword, "iotServer"); + } catch (DeviceManagementException e) { + e.printStackTrace(); + } + + xmppClient.setMessageFilterAndListener(""); + + String mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint(); + mqttClient = new MQTTClient(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, mqttEndpoint, mqttServerSubscribeTopic) { + @Override + protected void postMessageArrived(String topic, MqttMessage message) { + + } + }; + + try { + mqttClient.connectAndSubscribe(); + } catch (DeviceManagementException e) { + e.printStackTrace(); + } + } - private static ConcurrentHashMap deviceToIpMap = - new ConcurrentHashMap(); @Path("/register/{owner}/{deviceId}/{ip}") @POST diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmManagerService.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmManagerService.java index c75937cc..9a58df9a 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmManagerService.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmManagerService.java @@ -306,15 +306,6 @@ public class VirtualFireAlarmManagerService { newXmppAccount.setAccountName(owner + "_" + deviceId); newXmppAccount.setUsername(deviceId); newXmppAccount.setPassword(accessToken); - - String xmppEndPoint = XmppConfig.getInstance().getXmppControlQueue().getServerURL(); - - int indexOfChar = xmppEndPoint.lastIndexOf('/'); - - if (indexOfChar != -1) { - xmppEndPoint = xmppEndPoint.substring((indexOfChar + 1), xmppEndPoint.length()); - } - newXmppAccount.setEmail(deviceId + "@wso2.com"); XmppServerClient xmppServerClient = new XmppServerClient(); diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java new file mode 100644 index 00000000..975d7c75 --- /dev/null +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttSecurityException; +import org.wso2.carbon.device.mgt.common.DeviceManagementException; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class contains the Agent specific implementation for all the MQTT functionality. This + * includes connecting to a MQTT Broker & subscribing to the appropriate MQTT-topic, action plan + * upon losing connection or successfully delivering a message to the broker and processing + * incoming messages. Makes use of the 'Paho-MQTT' library provided by Eclipse Org. + *

+ * It is an abstract class with an abstract method 'postMessageArrived' allowing the user to have + * their own implementation of the actions to be taken upon receiving a message to the subscribed + * MQTT-Topic. + */ +public abstract class MQTTClient implements MqttCallback { + private static final Log log = LogFactory.getLog(MQTTClient.class); + + private MqttClient client; + private String clientId; + private MqttConnectOptions options; + private String subscribeTopic; + private String clientWillTopic; + private String mqttBrokerEndPoint; + private int reConnectionInterval; + + /** + * Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT + * Broker URL and the topic to subscribe. + * + * @param deviceOwner the owner of the device. + * @param deviceType the CDMF Device-Type of the device. + * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint. + * @param subscribeTopic the MQTT topic to which the client is to be subscribed + */ + protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint, + String subscribeTopic) { + this.clientId = deviceOwner + ":" + deviceType; + this.subscribeTopic = subscribeTopic; + this.clientWillTopic = deviceType + File.separator + "disconnection"; + this.mqttBrokerEndPoint = mqttBrokerEndPoint; + this.reConnectionInterval = 5000; + this.initSubscriber(); + } + + /** + * Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT + * Broker URL and the topic to subscribe. Additionally this constructor takes in the + * reconnection-time interval between successive attempts to connect to the broker. + * + * @param deviceOwner the owner of the device. + * @param deviceType the CDMF Device-Type of the device. + * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint. + * @param subscribeTopic the MQTT topic to which the client is to be subscribed + * @param reConnectionInterval time interval in SECONDS between successive attempts to connect + * to the broker. + */ + protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint, + String subscribeTopic, int reConnectionInterval) { + this.clientId = deviceOwner + ":" + deviceType; + this.subscribeTopic = subscribeTopic; + this.clientWillTopic = deviceType + File.separator + "disconnection"; + this.mqttBrokerEndPoint = mqttBrokerEndPoint; + this.reConnectionInterval = reConnectionInterval; + this.initSubscriber(); + } + + /** + * Initializes the MQTT-Client. + * Creates a client using the given MQTT-broker endpoint and the clientId (which is + * constructed by a concatenation of [deviceOwner]:[deviceType]). Also sets the client's + * options parameter with the clientWillTopic (in-case of connection failure) and other info. + * Also sets the call-back this current class. + */ + private void initSubscriber() { + try { + client = new MqttClient(this.mqttBrokerEndPoint, clientId, null); + log.info("MQTT subscriber was created with ClientID : " + clientId); + } catch (MqttException ex) { + String errorMsg = "MQTT Client Error\n" + "\tReason: " + ex.getReasonCode() + + "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " + + ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() + + "\n\tException: " + ex; + log.error(errorMsg); + } + + options = new MqttConnectOptions(); + options.setCleanSession(false); + options.setWill(clientWillTopic, "connection crashed".getBytes(StandardCharsets.UTF_8), 2, + true); + client.setCallback(this); + } + + /** + * Checks whether the connection to the MQTT-Broker persists. + * + * @return true if the client is connected to the MQTT-Broker, else false. + */ + public boolean isConnected() { + return client.isConnected(); + } + + /** + * Connects to the MQTT-Broker and if successfully established connection, then tries to + * subscribe to the MQTT-Topic specific to the device. (The MQTT-Topic specific to the + * device is created is taken in as a constructor parameter of this class) . + * + * @throws DeviceManagementException in the event of 'Connecting to' or 'Subscribing to' the + * MQTT broker fails. + */ + public void connectAndSubscribe() throws DeviceManagementException { + try { + client.connect(options); + + if (log.isDebugEnabled()) { + log.debug("Subscriber connected to queue at: " + this.mqttBrokerEndPoint); + } + } catch (MqttSecurityException ex) { + String errorMsg = "MQTT Security Exception when connecting to queue\n" + "\tReason: " + + " " + + ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() + + "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " + + ex.getCause() + "\n\tException: " + ex; //throw + if (log.isDebugEnabled()) { + log.debug(errorMsg); + } + throw new DeviceManagementException(errorMsg, ex); + + } catch (MqttException ex) { + String errorMsg = "MQTT Exception when connecting to queue\n" + "\tReason: " + + ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() + + "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " + + ex.getCause() + "\n\tException: " + ex; //throw + if (log.isDebugEnabled()) { + log.debug(errorMsg); + } + throw new DeviceManagementException(errorMsg, ex); + } + + try { + client.subscribe(subscribeTopic, 0); + + log.info("Subscriber - " + clientId + " subscribed to topic: " + subscribeTopic); + } catch (MqttException ex) { + String errorMsg = "MQTT Exception when trying to subscribe to topic: " + + subscribeTopic + "\n\tReason: " + ex.getReasonCode() + + "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " + + ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() + + "\n\tException: " + ex; + if (log.isDebugEnabled()) { + log.debug(errorMsg); + } + throw new DeviceManagementException(errorMsg, ex); + } + } + + /** + * Callback method which is triggered once the MQTT client losers its connection to the broker. + * A scheduler thread is spawned to continuously re-attempt and connect to the broker and + * subscribe to the device's topic. This thread is scheduled to execute after every break + * equal to that of the 'reConnectionInterval' of the MQTTClient. + * + * @param throwable a Throwable Object containing the details as to why the failure occurred. + */ + public void connectionLost(Throwable throwable) { + log.warn("Lost Connection for client: " + this.clientId + " to " + this.mqttBrokerEndPoint + ".\nThis was due to - " + throwable.getMessage()); + + Runnable reSubscriber = new Runnable() { + @Override + public void run() { + if (!isConnected()) { + if (log.isDebugEnabled()) { + log.debug("Subscriber reconnecting to queue........"); + } + try { + connectAndSubscribe(); + } catch (DeviceManagementException e) { + if (log.isDebugEnabled()) { + log.debug("Could not reconnect and subscribe to ControlQueue."); + } + } + } else { + return; + } + } + }; + + ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + service.scheduleAtFixedRate(reSubscriber, 0, this.reConnectionInterval, TimeUnit.SECONDS); + } + + /** + * Callback method which is triggered upon receiving a MQTT Message from the broker. Spawns a + * new thread that executes any actions to be taken with the received message. + * + * @param topic the MQTT-Topic to which the received message was published to and the + * client was subscribed to. + * @param mqttMessage the actual MQTT-Message that was received from the broker. + */ + public void messageArrived(final String topic, final MqttMessage mqttMessage) { + Thread subscriberThread = new Thread() { + public void run() { + postMessageArrived(topic, mqttMessage); + } + }; + subscriberThread.start(); + } + + /** + * Callback method which gets triggered upon successful completion of a message delivery to + * the broker. + * + * @param iMqttDeliveryToken the MQTT-DeliveryToken which includes the details about the + * specific message delivery. + */ + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + String message = ""; + try { + message = iMqttDeliveryToken.getMessage().toString(); + } catch (MqttException e) { + log.error("Error occurred whilst trying to read the message from the MQTT delivery token."); + } + String topic = iMqttDeliveryToken.getTopics()[0]; + String client = iMqttDeliveryToken.getClient().getClientId(); + log.info("Message - '" + message + "' of client [" + client + "] for the topic (" + topic + ") was delivered successfully."); + } + + /** + * This is an abstract method used for post processing the received MQTT-message. This + * method will be implemented as per requirement at the time of creating an object of this + * class. + * + * @param topic The topic for which the message was received for. + * @param message The message received for the subscription to the above topic. + */ + protected abstract void postMessageArrived(String topic, MqttMessage message); + + /** + * Gets the MQTTClient object. + * + * @return the MQTTClient object. + */ + public MqttClient getClient() { + return client; + } + +} + diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java new file mode 100644 index 00000000..8391b1cb --- /dev/null +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java @@ -0,0 +1,241 @@ +/* + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jivesoftware.smack.ConnectionConfiguration; +import org.jivesoftware.smack.PacketListener; +import org.jivesoftware.smack.SmackConfiguration; +import org.jivesoftware.smack.XMPPConnection; +import org.jivesoftware.smack.XMPPException; +import org.jivesoftware.smack.filter.AndFilter; +import org.jivesoftware.smack.filter.FromContainsFilter; +import org.jivesoftware.smack.filter.PacketFilter; +import org.jivesoftware.smack.filter.PacketTypeFilter; +import org.jivesoftware.smack.packet.Message; +import org.jivesoftware.smack.packet.Packet; +import org.wso2.carbon.device.mgt.common.DeviceManagementException; + +/** + * This class contains the Agent specific implementation for all the XMPP functionality. This + * includes connecting to a XMPP Server & Login using the device's XMPP-Account, Setting + * listeners and filters on incoming XMPP messages and Sending XMPP replies for control signals + * received. Makes use of the 'Smack-XMPP' library provided by jivesoftware/igniterealtime. + *

+ * It is an abstract class with an abstract method 'processXMPPMessage' allowing the user to have + * their own implementation of the actions to be taken upon receiving an appropriate XMPP message. + */ +public abstract class XMPPClient { + private static final Log log = LogFactory.getLog(XMPPClient.class); + + private int replyTimeoutInterval = 500; // millis + private String server; + private int port; + private ConnectionConfiguration config; + private XMPPConnection connection; + private PacketFilter filter; + private PacketListener listener; + + /** + * Constructor for XMPPClient passing server-IP and the XMPP-port. + * + * @param server the IP of the XMPP server. + * @param port the XMPP server's port to connect to. (default - 5222) + */ + public XMPPClient(String server, int port) { + this.server = server; + this.port = port; + initXMPPClient(); + } + + /** + * Initializes the XMPP Client. + * Sets the time-out-limit whilst waiting for XMPP-replies from server. Creates the XMPP + * configurations to connect to the server and creates the XMPPConnection object used for + * connecting and Logging-In. + */ + private void initXMPPClient() { + log.info(String.format( + "Initializing connection to XMPP Server at %1$s via port %2$d......", server, + port)); + SmackConfiguration.setPacketReplyTimeout(replyTimeoutInterval); + config = new ConnectionConfiguration(server, port); +// TODO:: Need to enable SASL-Authentication appropriately + config.setSASLAuthenticationEnabled(false); + config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled); + connection = new XMPPConnection(config); + } + + /** + * Connects to the XMPP-Server and if successfully established connection, then tries to Log + * in using the device's XMPP Account credentials. (The XMPP-Account specific to the device is + * created in the XMPP server whilst downloading the Agent from the IoT Server) . + * + * @param username the username of the device's XMPP-Account. + * @param password the password of the device's XMPP-Account. + * @param resource the resource the resource, specific to the XMPP-Account to which the login + * is made to + * @throws DeviceManagementException in the event of 'Connecting to' or 'Logging into' the + * XMPP server fails. + */ + public void connectAndLogin(String username, String password, String resource) + throws DeviceManagementException { + try { + connection.connect(); + log.info(String.format( + "Connection to XMPP Server at %1$s established successfully......", server)); + + } catch (XMPPException xmppExcepion) { + String errorMsg = + "Connection attempt to the XMPP Server at " + server + " via port " + port + + " failed."; + log.info(errorMsg); + throw new DeviceManagementException(errorMsg, xmppExcepion); + } + + if (connection.isConnected()) { + try { + if (resource == null) { + connection.login(username, password); + log.info(String.format("Logged into XMPP Server at %1$s as user %2$s......", server, username)); + } else { + connection.login(username, password, resource); + log.info(String.format( + "Logged into XMPP Server at %1$s as user %2$s on resource %3$s......", + server, username, resource)); + } + } catch (XMPPException xmppException) { + String errorMsg = + "Login attempt to the XMPP Server at " + server + " with username - " + + username + " failed."; + log.info(errorMsg); + throw new DeviceManagementException(errorMsg, xmppException); + } + } + } + + /** + * Sets a filter on all the incoming XMPP-Messages for the JID (XMPP-Account ID) passed in. + * Also creates a listener for the incoming messages and connects the listener to the + * XMPPConnection alongside the set filter. + * + * @param senderJID the JID (XMPP-Account ID) to which the filter is to be set. + */ + public void setMessageFilterAndListener(String senderJID) { + filter = new AndFilter(new PacketTypeFilter(Message.class), new + FromContainsFilter( + senderJID)); + listener = new PacketListener() { + @Override + public void processPacket(Packet packet) { + if (packet instanceof Message) { + final Message xmppMessage = (Message) packet; + Thread msgProcessThread = new Thread() { + public void run() { + processXMPPMessage(xmppMessage); + } + }; + msgProcessThread.start(); + } + } + }; + + connection.addPacketListener(listener, filter); + } + + + + /** + * Sends an XMPP message + * + * @param JID the JID (XMPP Account ID) to which the message is to be sent to. + * @param message the XMPP-Message that is to be sent. + */ + public void sendXMPPMessage(String JID, String message) { + sendXMPPMessage(JID, message, "Reply-From-Device"); + if (log.isDebugEnabled()) { + log.debug("Message: " + message + " to XMPP JID [" + JID + "] sent successfully"); + } + } + + /** + * Overloaded method to send an XMPP message. Includes the subject to be mentioned in the + * message that is sent. + * + * @param JID the JID (XMPP Account ID) to which the message is to be sent to. + * @param message the XMPP-Message that is to be sent. + * @param subject the subject that the XMPP-Message would carry. + */ + public void sendXMPPMessage(String JID, String message, String subject) { + Message xmppMessage = new Message(); + xmppMessage.setTo(JID); + xmppMessage.setSubject(subject); + xmppMessage.setBody(message); + xmppMessage.setType(Message.Type.chat); + connection.sendPacket(xmppMessage); + } + + /** + * Checks whether the connection to the XMPP-Server persists. + * + * @return true if the client is connected to the XMPP-Server, else false. + */ + public boolean isConnected() { + return connection.isConnected(); + } + + /** + * Sets the client's time-out-limit whilst waiting for XMPP-replies from server. + * + * @param millis the time in millis to be set as the time-out-limit whilst waiting for a + * XMPP-reply. + */ + public void setReplyTimeoutInterval(int millis) { + this.replyTimeoutInterval = millis; + } + + /** + * Disables default debugger provided by the XMPPConnection. + */ + public void disableDebugger() { + connection.DEBUG_ENABLED = false; + } + + /** + * Closes the connection to the XMPP Server. + */ + public void closeConnection() { + if (connection != null && connection.isConnected()) { + connection.disconnect(); + } + } + + /** + * This is an abstract method used for post processing the received XMPP-message. This + * method will be implemented as per requirement at the time of creating an object of this + * class. + * + * @param xmppMessage the xmpp message received by the listener. + */ + protected abstract void processXMPPMessage(Message xmppMessage); + +} + +