Merge remote-tracking branch 'upstream/build-pack' into build-pack

application-manager-new
charithag 9 years ago
commit 454a403a22

@ -67,9 +67,12 @@
org.w3c.dom, org.w3c.dom,
org.wso2.carbon.device.mgt.common.*, org.wso2.carbon.device.mgt.common.*,
org.wso2.carbon.device.mgt.common, org.wso2.carbon.device.mgt.common,
org.wso2.carbon.policy.mgt.common,
org.wso2.carbon.policy.mgt.common.*,
org.wso2.carbon.context.*, org.wso2.carbon.context.*,
org.wso2.carbon.ndatasource.core, org.wso2.carbon.ndatasource.core,
org.wso2.carbon.device.mgt.iot.common.*, org.wso2.carbon.device.mgt.iot.common.*,
com.google.gson.*
</Import-Package> </Import-Package>
<Export-Package> <Export-Package>
@ -97,6 +100,17 @@
<artifactId>org.wso2.carbon.logging</artifactId> <artifactId>org.wso2.carbon.logging</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.policy.mgt.core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.policy.mgt.common</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.devicemgt</groupId> <groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.device.mgt.common</artifactId> <artifactId>org.wso2.carbon.device.mgt.common</artifactId>
@ -111,6 +125,10 @@
<artifactId>org.wso2.carbon.device.mgt.iot.common</artifactId> <artifactId>org.wso2.carbon.device.mgt.iot.common</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies> </dependencies>

@ -0,0 +1,106 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.impl;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.constants.FireAlarmConstants;
import org.wso2.carbon.policy.mgt.common.Policy;
import org.wso2.carbon.policy.mgt.common.monitor.ComplianceData;
import org.wso2.carbon.policy.mgt.common.monitor.ComplianceFeature;
import org.wso2.carbon.policy.mgt.common.monitor.PolicyComplianceException;
import org.wso2.carbon.policy.mgt.common.spi.PolicyMonitoringService;
import org.wso2.carbon.device.mgt.iot.common.transport.mqtt.MqttPublisher;
import java.util.ArrayList;
import java.util.List;
import static java.nio.charset.StandardCharsets.UTF_8;
public class FireAlarmPolicyMonitoringService implements PolicyMonitoringService {
private static Log log = LogFactory.getLog(FireAlarmPolicyMonitoringService.class);
@Override
public void notifyDevices(List<Device> list) throws PolicyComplianceException {
String userName = CarbonContext.getThreadLocalCarbonContext().getUsername();
MqttPublisher mqttPublisher = new MqttPublisher();
try {
mqttPublisher.initControlQueue();
mqttPublisher.publish("Raspberry-Policy-sender",
"/iot/policymgt/govern/" + FireAlarmConstants.DEVICE_TYPE + "/"
+ userName, userName.getBytes(UTF_8));
} catch (DeviceControllerException e) {
log.error("Error on notifying "+FireAlarmConstants.DEVICE_TYPE+" devices, message not sent.");
}
}
@Override
public ComplianceData checkPolicyCompliance(DeviceIdentifier deviceIdentifier, Policy policy,
Object compliancePayload) throws PolicyComplianceException {
if (log.isDebugEnabled()) {
log.debug("Checking policy compliance status of device '" + deviceIdentifier.getId() + "'");
}
ComplianceData complianceData = new ComplianceData();
if (compliancePayload == null || policy == null) {
return complianceData;
}
List<ComplianceFeature> complianceFeatures = new ArrayList<ComplianceFeature>();
// Parsing json string to get compliance features.
JsonElement jsonElement;
if (compliancePayload instanceof String) {
jsonElement = new JsonParser().parse((String) compliancePayload);
} else {
throw new PolicyComplianceException("Invalid policy compliance payload");
}
JsonArray jsonArray = jsonElement.getAsJsonArray();
Gson gson = new Gson();
ComplianceFeature complianceFeature;
for (JsonElement element : jsonArray) {
complianceFeature = gson.fromJson(element, ComplianceFeature.class);
complianceFeatures.add(complianceFeature);
}
complianceData.setComplianceFeatures(complianceFeatures);
for (ComplianceFeature cf : complianceFeatures) {
if (!cf.isCompliant()) {
complianceData.setStatus(false);
break;
}
}
return complianceData;
}
@Override
public String getType() {
return FireAlarmConstants.DEVICE_TYPE;
}
}

@ -96,6 +96,18 @@
<version>${carbon.device.mgt.version}</version> <version>${carbon.device.mgt.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.policy.mgt.core</artifactId>
<version>${carbon.device.mgt.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.policy.mgt.common</artifactId>
<version>${carbon.device.mgt.version}</version>
<scope>provided</scope>
</dependency>
<!--IOT dependencies--> <!--IOT dependencies-->
@ -211,6 +223,12 @@
<!--<scope>provided</scope>--> <!--<scope>provided</scope>-->
</dependency> </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${google.gson.version}</version>
</dependency>
<!--Analytics dependencies--> <!--Analytics dependencies-->
<dependency> <dependency>
@ -381,10 +399,17 @@
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<!--Dependencies on XMPP Client Library-->
<dependency>
<groupId>org.igniterealtime.smack.wso2</groupId>
<artifactId>smack</artifactId>
<version>${smack.wso2.version}</version>
</dependency>
<dependency>
<groupId>org.igniterealtime.smack.wso2</groupId>
<artifactId>smackx</artifactId>
<version>${smackx.wso2.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
@ -417,36 +442,34 @@
<orbit.tomcat.jdbc.pooling.version>7.0.34.wso2v2</orbit.tomcat.jdbc.pooling.version> <orbit.tomcat.jdbc.pooling.version>7.0.34.wso2v2</orbit.tomcat.jdbc.pooling.version>
<eclipse.paho.version>0.4.0</eclipse.paho.version> <eclipse.paho.version>0.4.0</eclipse.paho.version>
<google.gson.version>2.2.4</google.gson.version>
<!-- CXF version --> <cxf.version>2.6.1</cxf.version>
<!-- CXF version -->
<cxf.version>2.6.1</cxf.version>
<jackson.version>1.9.0</jackson.version> <jackson.version>1.9.0</jackson.version>
<javax.ws.rs.version>1.1.1</javax.ws.rs.version> <javax.ws.rs.version>1.1.1</javax.ws.rs.version>
<!-- Device Management --> <!-- Device Management -->
<carbon.device.mgt.version>0.9.2-SNAPSHOT</carbon.device.mgt.version> <carbon.device.mgt.version>0.9.2-SNAPSHOT</carbon.device.mgt.version>
<carbon.device.mgt.version.range>[0.8.0, 2.0.0)</carbon.device.mgt.version.range> <carbon.device.mgt.version.range>[0.8.0, 2.0.0)</carbon.device.mgt.version.range>
<!-- IOT Device Management --> <!-- IOT Device Management -->
<carbon.iot.device.mgt.version>1.0.0-SNAPSHOT</carbon.iot.device.mgt.version> <carbon.iot.device.mgt.version>1.0.0-SNAPSHOT</carbon.iot.device.mgt.version>
<commons-httpclient.orbit.version>3.1.0.wso2v2</commons-httpclient.orbit.version> <commons-httpclient.orbit.version>3.1.0.wso2v2</commons-httpclient.orbit.version>
<commons-json.version>3.0.0.wso2v1</commons-json.version> <commons-json.version>3.0.0.wso2v1</commons-json.version>
<jackson.fasterxml.version>2.6.1</jackson.fasterxml.version> <jackson.fasterxml.version>2.6.1</jackson.fasterxml.version>
<!-- Source code --> <!-- Source code -->
<wso2.maven.compiler.source>1.7</wso2.maven.compiler.source> <wso2.maven.compiler.source>1.7</wso2.maven.compiler.source>
<wso2.maven.compiler.target>1.7</wso2.maven.compiler.target> <wso2.maven.compiler.target>1.7</wso2.maven.compiler.target>
<carbon.analytics.version>1.0.3</carbon.analytics.version> <carbon.analytics.version>1.0.3</carbon.analytics.version>
<!--XMPP/MQTT Version-->
<smack.wso2.version>3.0.4.wso2v1</smack.wso2.version>
<smackx.wso2.version>3.0.4.wso2v1</smackx.wso2.version>
<paho.mqtt.version>0.4.0</paho.mqtt.version>
</properties> </properties>

@ -99,6 +99,13 @@
<scope>system</scope> <scope>system</scope>
<systemPath>${basedir}/../../../../repository/components/plugins/org.wso2.carbon.device.mgt.analytics_0.9.2.SNAPSHOT.jar</systemPath> <systemPath>${basedir}/../../../../repository/components/plugins/org.wso2.carbon.device.mgt.analytics_0.9.2.SNAPSHOT.jar</systemPath>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.policy.mgt.common</artifactId>
<version>${carbon.device.mgt.version}</version>
<scope>system</scope>
<systemPath>${basedir}/../../../../repository/components/plugins/org.wso2.carbon.policy.mgt.common_0.9.2.SNAPSHOT.jar</systemPath>
</dependency>
<!--IOT dependencies--> <!--IOT dependencies-->

@ -98,6 +98,16 @@
<artifactId>org.wso2.carbon.device.mgt.analytics</artifactId> <artifactId>org.wso2.carbon.device.mgt.analytics</artifactId>
</dependency> </dependency>
<!--Dependencies on XMPP Client Library-->
<dependency>
<groupId>org.igniterealtime.smack.wso2</groupId>
<artifactId>smack</artifactId>
</dependency>
<dependency>
<groupId>org.igniterealtime.smack.wso2</groupId>
<artifactId>smackx</artifactId>
</dependency>
</dependencies> </dependencies>

@ -23,6 +23,8 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback; import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jivesoftware.smack.packet.Message;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException; import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException;
import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService; import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService;
@ -30,11 +32,14 @@ import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.common.DeviceController; import org.wso2.carbon.device.mgt.iot.common.DeviceController;
import org.wso2.carbon.device.mgt.iot.common.DeviceValidator; import org.wso2.carbon.device.mgt.iot.common.DeviceValidator;
import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig; import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON; import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants
.VirtualFireAlarmConstants; .VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt.MQTTClient;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp.XMPPClient;
import org.wso2.carbon.utils.CarbonUtils; import org.wso2.carbon.utils.CarbonUtils;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -63,8 +68,10 @@ import java.util.concurrent.Future;
public class VirtualFireAlarmControllerService { public class VirtualFireAlarmControllerService {
private static Log log = LogFactory.getLog(VirtualFireAlarmControllerService.class); private static Log log = LogFactory.getLog(VirtualFireAlarmControllerService.class);
//TODO; replace this tenant domain //TODO; replace this tenant domain
private final String SUPER_TENANT = "carbon.super"; private final String SUPER_TENANT = "carbon.super";
@Context //injected response proxy supporting multiple thread @Context //injected response proxy supporting multiple thread
private HttpServletResponse response; private HttpServletResponse response;
private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature"; private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature";
@ -78,9 +85,53 @@ public class VirtualFireAlarmControllerService {
public static final String HTTP_PROTOCOL = "HTTP"; public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT"; public static final String MQTT_PROTOCOL = "MQTT";
private static ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<String, String>();
private static XMPPClient xmppClient;
private static MQTTClient mqttClient;
private static final String mqttServerSubscribeTopic = "wso2/iot/+/" + VirtualFireAlarmConstants.DEVICE_TYPE + "/+/reply";
private static final String iotServerSubscriber = "IoT-Server";
static{
String xmppServer = XmppConfig.getInstance().getXmppControlQueue().getServerURL();
int indexOfChar = xmppServer.lastIndexOf('/');
if (indexOfChar != -1) {
xmppServer = xmppServer.substring((indexOfChar + 1), xmppServer.length());
}
int xmppPort = Integer.parseInt(XmppConfig.getInstance().getSERVER_CONNECTION_PORT());
xmppClient = new XMPPClient(xmppServer, xmppPort) {
@Override
protected void processXMPPMessage(Message xmppMessage) {
}
};
String xmppUsername = XmppConfig.getInstance().getXmppUsername();
String xmppPassword = XmppConfig.getInstance().getXmppPassword();
try {
xmppClient.connectAndLogin(xmppUsername, xmppPassword, "iotServer");
} catch (DeviceManagementException e) {
e.printStackTrace();
}
xmppClient.setMessageFilterAndListener("");
String mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint();
mqttClient = new MQTTClient(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, mqttEndpoint, mqttServerSubscribeTopic) {
@Override
protected void postMessageArrived(String topic, MqttMessage message) {
}
};
try {
mqttClient.connectAndSubscribe();
} catch (DeviceManagementException e) {
e.printStackTrace();
}
}
private static ConcurrentHashMap<String, String> deviceToIpMap =
new ConcurrentHashMap<String, String>();
@Path("/register/{owner}/{deviceId}/{ip}") @Path("/register/{owner}/{deviceId}/{ip}")
@POST @POST

@ -306,15 +306,6 @@ public class VirtualFireAlarmManagerService {
newXmppAccount.setAccountName(owner + "_" + deviceId); newXmppAccount.setAccountName(owner + "_" + deviceId);
newXmppAccount.setUsername(deviceId); newXmppAccount.setUsername(deviceId);
newXmppAccount.setPassword(accessToken); newXmppAccount.setPassword(accessToken);
String xmppEndPoint = XmppConfig.getInstance().getXmppControlQueue().getServerURL();
int indexOfChar = xmppEndPoint.lastIndexOf('/');
if (indexOfChar != -1) {
xmppEndPoint = xmppEndPoint.substring((indexOfChar + 1), xmppEndPoint.length());
}
newXmppAccount.setEmail(deviceId + "@wso2.com"); newXmppAccount.setEmail(deviceId + "@wso2.com");
XmppServerClient xmppServerClient = new XmppServerClient(); XmppServerClient xmppServerClient = new XmppServerClient();

@ -0,0 +1,280 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* This class contains the Agent specific implementation for all the MQTT functionality. This
* includes connecting to a MQTT Broker & subscribing to the appropriate MQTT-topic, action plan
* upon losing connection or successfully delivering a message to the broker and processing
* incoming messages. Makes use of the 'Paho-MQTT' library provided by Eclipse Org.
* <p/>
* It is an abstract class with an abstract method 'postMessageArrived' allowing the user to have
* their own implementation of the actions to be taken upon receiving a message to the subscribed
* MQTT-Topic.
*/
public abstract class MQTTClient implements MqttCallback {
private static final Log log = LogFactory.getLog(MQTTClient.class);
private MqttClient client;
private String clientId;
private MqttConnectOptions options;
private String subscribeTopic;
private String clientWillTopic;
private String mqttBrokerEndPoint;
private int reConnectionInterval;
/**
* Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT
* Broker URL and the topic to subscribe.
*
* @param deviceOwner the owner of the device.
* @param deviceType the CDMF Device-Type of the device.
* @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint.
* @param subscribeTopic the MQTT topic to which the client is to be subscribed
*/
protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint,
String subscribeTopic) {
this.clientId = deviceOwner + ":" + deviceType;
this.subscribeTopic = subscribeTopic;
this.clientWillTopic = deviceType + File.separator + "disconnection";
this.mqttBrokerEndPoint = mqttBrokerEndPoint;
this.reConnectionInterval = 5000;
this.initSubscriber();
}
/**
* Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT
* Broker URL and the topic to subscribe. Additionally this constructor takes in the
* reconnection-time interval between successive attempts to connect to the broker.
*
* @param deviceOwner the owner of the device.
* @param deviceType the CDMF Device-Type of the device.
* @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint.
* @param subscribeTopic the MQTT topic to which the client is to be subscribed
* @param reConnectionInterval time interval in SECONDS between successive attempts to connect
* to the broker.
*/
protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint,
String subscribeTopic, int reConnectionInterval) {
this.clientId = deviceOwner + ":" + deviceType;
this.subscribeTopic = subscribeTopic;
this.clientWillTopic = deviceType + File.separator + "disconnection";
this.mqttBrokerEndPoint = mqttBrokerEndPoint;
this.reConnectionInterval = reConnectionInterval;
this.initSubscriber();
}
/**
* Initializes the MQTT-Client.
* Creates a client using the given MQTT-broker endpoint and the clientId (which is
* constructed by a concatenation of [deviceOwner]:[deviceType]). Also sets the client's
* options parameter with the clientWillTopic (in-case of connection failure) and other info.
* Also sets the call-back this current class.
*/
private void initSubscriber() {
try {
client = new MqttClient(this.mqttBrokerEndPoint, clientId, null);
log.info("MQTT subscriber was created with ClientID : " + clientId);
} catch (MqttException ex) {
String errorMsg = "MQTT Client Error\n" + "\tReason: " + ex.getReasonCode() +
"\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " +
ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() +
"\n\tException: " + ex;
log.error(errorMsg);
}
options = new MqttConnectOptions();
options.setCleanSession(false);
options.setWill(clientWillTopic, "connection crashed".getBytes(StandardCharsets.UTF_8), 2,
true);
client.setCallback(this);
}
/**
* Checks whether the connection to the MQTT-Broker persists.
*
* @return true if the client is connected to the MQTT-Broker, else false.
*/
public boolean isConnected() {
return client.isConnected();
}
/**
* Connects to the MQTT-Broker and if successfully established connection, then tries to
* subscribe to the MQTT-Topic specific to the device. (The MQTT-Topic specific to the
* device is created is taken in as a constructor parameter of this class) .
*
* @throws DeviceManagementException in the event of 'Connecting to' or 'Subscribing to' the
* MQTT broker fails.
*/
public void connectAndSubscribe() throws DeviceManagementException {
try {
client.connect(options);
if (log.isDebugEnabled()) {
log.debug("Subscriber connected to queue at: " + this.mqttBrokerEndPoint);
}
} catch (MqttSecurityException ex) {
String errorMsg = "MQTT Security Exception when connecting to queue\n" + "\tReason: " +
" " +
ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() +
"\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " +
ex.getCause() + "\n\tException: " + ex; //throw
if (log.isDebugEnabled()) {
log.debug(errorMsg);
}
throw new DeviceManagementException(errorMsg, ex);
} catch (MqttException ex) {
String errorMsg = "MQTT Exception when connecting to queue\n" + "\tReason: " +
ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() +
"\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " +
ex.getCause() + "\n\tException: " + ex; //throw
if (log.isDebugEnabled()) {
log.debug(errorMsg);
}
throw new DeviceManagementException(errorMsg, ex);
}
try {
client.subscribe(subscribeTopic, 0);
log.info("Subscriber - " + clientId + " subscribed to topic: " + subscribeTopic);
} catch (MqttException ex) {
String errorMsg = "MQTT Exception when trying to subscribe to topic: " +
subscribeTopic + "\n\tReason: " + ex.getReasonCode() +
"\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " +
ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() +
"\n\tException: " + ex;
if (log.isDebugEnabled()) {
log.debug(errorMsg);
}
throw new DeviceManagementException(errorMsg, ex);
}
}
/**
* Callback method which is triggered once the MQTT client losers its connection to the broker.
* A scheduler thread is spawned to continuously re-attempt and connect to the broker and
* subscribe to the device's topic. This thread is scheduled to execute after every break
* equal to that of the 'reConnectionInterval' of the MQTTClient.
*
* @param throwable a Throwable Object containing the details as to why the failure occurred.
*/
public void connectionLost(Throwable throwable) {
log.warn("Lost Connection for client: " + this.clientId + " to " + this.mqttBrokerEndPoint + ".\nThis was due to - " + throwable.getMessage());
Runnable reSubscriber = new Runnable() {
@Override
public void run() {
if (!isConnected()) {
if (log.isDebugEnabled()) {
log.debug("Subscriber reconnecting to queue........");
}
try {
connectAndSubscribe();
} catch (DeviceManagementException e) {
if (log.isDebugEnabled()) {
log.debug("Could not reconnect and subscribe to ControlQueue.");
}
}
} else {
return;
}
}
};
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(reSubscriber, 0, this.reConnectionInterval, TimeUnit.SECONDS);
}
/**
* Callback method which is triggered upon receiving a MQTT Message from the broker. Spawns a
* new thread that executes any actions to be taken with the received message.
*
* @param topic the MQTT-Topic to which the received message was published to and the
* client was subscribed to.
* @param mqttMessage the actual MQTT-Message that was received from the broker.
*/
public void messageArrived(final String topic, final MqttMessage mqttMessage) {
Thread subscriberThread = new Thread() {
public void run() {
postMessageArrived(topic, mqttMessage);
}
};
subscriberThread.start();
}
/**
* Callback method which gets triggered upon successful completion of a message delivery to
* the broker.
*
* @param iMqttDeliveryToken the MQTT-DeliveryToken which includes the details about the
* specific message delivery.
*/
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
String message = "";
try {
message = iMqttDeliveryToken.getMessage().toString();
} catch (MqttException e) {
log.error("Error occurred whilst trying to read the message from the MQTT delivery token.");
}
String topic = iMqttDeliveryToken.getTopics()[0];
String client = iMqttDeliveryToken.getClient().getClientId();
log.info("Message - '" + message + "' of client [" + client + "] for the topic (" + topic + ") was delivered successfully.");
}
/**
* This is an abstract method used for post processing the received MQTT-message. This
* method will be implemented as per requirement at the time of creating an object of this
* class.
*
* @param topic The topic for which the message was received for.
* @param message The message received for the subscription to the above topic.
*/
protected abstract void postMessageArrived(String topic, MqttMessage message);
/**
* Gets the MQTTClient object.
*
* @return the MQTTClient object.
*/
public MqttClient getClient() {
return client;
}
}

@ -0,0 +1,241 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.FromContainsFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
/**
* This class contains the Agent specific implementation for all the XMPP functionality. This
* includes connecting to a XMPP Server & Login using the device's XMPP-Account, Setting
* listeners and filters on incoming XMPP messages and Sending XMPP replies for control signals
* received. Makes use of the 'Smack-XMPP' library provided by jivesoftware/igniterealtime.
* <p/>
* It is an abstract class with an abstract method 'processXMPPMessage' allowing the user to have
* their own implementation of the actions to be taken upon receiving an appropriate XMPP message.
*/
public abstract class XMPPClient {
private static final Log log = LogFactory.getLog(XMPPClient.class);
private int replyTimeoutInterval = 500; // millis
private String server;
private int port;
private ConnectionConfiguration config;
private XMPPConnection connection;
private PacketFilter filter;
private PacketListener listener;
/**
* Constructor for XMPPClient passing server-IP and the XMPP-port.
*
* @param server the IP of the XMPP server.
* @param port the XMPP server's port to connect to. (default - 5222)
*/
public XMPPClient(String server, int port) {
this.server = server;
this.port = port;
initXMPPClient();
}
/**
* Initializes the XMPP Client.
* Sets the time-out-limit whilst waiting for XMPP-replies from server. Creates the XMPP
* configurations to connect to the server and creates the XMPPConnection object used for
* connecting and Logging-In.
*/
private void initXMPPClient() {
log.info(String.format(
"Initializing connection to XMPP Server at %1$s via port %2$d......", server,
port));
SmackConfiguration.setPacketReplyTimeout(replyTimeoutInterval);
config = new ConnectionConfiguration(server, port);
// TODO:: Need to enable SASL-Authentication appropriately
config.setSASLAuthenticationEnabled(false);
config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled);
connection = new XMPPConnection(config);
}
/**
* Connects to the XMPP-Server and if successfully established connection, then tries to Log
* in using the device's XMPP Account credentials. (The XMPP-Account specific to the device is
* created in the XMPP server whilst downloading the Agent from the IoT Server) .
*
* @param username the username of the device's XMPP-Account.
* @param password the password of the device's XMPP-Account.
* @param resource the resource the resource, specific to the XMPP-Account to which the login
* is made to
* @throws DeviceManagementException in the event of 'Connecting to' or 'Logging into' the
* XMPP server fails.
*/
public void connectAndLogin(String username, String password, String resource)
throws DeviceManagementException {
try {
connection.connect();
log.info(String.format(
"Connection to XMPP Server at %1$s established successfully......", server));
} catch (XMPPException xmppExcepion) {
String errorMsg =
"Connection attempt to the XMPP Server at " + server + " via port " + port +
" failed.";
log.info(errorMsg);
throw new DeviceManagementException(errorMsg, xmppExcepion);
}
if (connection.isConnected()) {
try {
if (resource == null) {
connection.login(username, password);
log.info(String.format("Logged into XMPP Server at %1$s as user %2$s......", server, username));
} else {
connection.login(username, password, resource);
log.info(String.format(
"Logged into XMPP Server at %1$s as user %2$s on resource %3$s......",
server, username, resource));
}
} catch (XMPPException xmppException) {
String errorMsg =
"Login attempt to the XMPP Server at " + server + " with username - " +
username + " failed.";
log.info(errorMsg);
throw new DeviceManagementException(errorMsg, xmppException);
}
}
}
/**
* Sets a filter on all the incoming XMPP-Messages for the JID (XMPP-Account ID) passed in.
* Also creates a listener for the incoming messages and connects the listener to the
* XMPPConnection alongside the set filter.
*
* @param senderJID the JID (XMPP-Account ID) to which the filter is to be set.
*/
public void setMessageFilterAndListener(String senderJID) {
filter = new AndFilter(new PacketTypeFilter(Message.class), new
FromContainsFilter(
senderJID));
listener = new PacketListener() {
@Override
public void processPacket(Packet packet) {
if (packet instanceof Message) {
final Message xmppMessage = (Message) packet;
Thread msgProcessThread = new Thread() {
public void run() {
processXMPPMessage(xmppMessage);
}
};
msgProcessThread.start();
}
}
};
connection.addPacketListener(listener, filter);
}
/**
* Sends an XMPP message
*
* @param JID the JID (XMPP Account ID) to which the message is to be sent to.
* @param message the XMPP-Message that is to be sent.
*/
public void sendXMPPMessage(String JID, String message) {
sendXMPPMessage(JID, message, "Reply-From-Device");
if (log.isDebugEnabled()) {
log.debug("Message: " + message + " to XMPP JID [" + JID + "] sent successfully");
}
}
/**
* Overloaded method to send an XMPP message. Includes the subject to be mentioned in the
* message that is sent.
*
* @param JID the JID (XMPP Account ID) to which the message is to be sent to.
* @param message the XMPP-Message that is to be sent.
* @param subject the subject that the XMPP-Message would carry.
*/
public void sendXMPPMessage(String JID, String message, String subject) {
Message xmppMessage = new Message();
xmppMessage.setTo(JID);
xmppMessage.setSubject(subject);
xmppMessage.setBody(message);
xmppMessage.setType(Message.Type.chat);
connection.sendPacket(xmppMessage);
}
/**
* Checks whether the connection to the XMPP-Server persists.
*
* @return true if the client is connected to the XMPP-Server, else false.
*/
public boolean isConnected() {
return connection.isConnected();
}
/**
* Sets the client's time-out-limit whilst waiting for XMPP-replies from server.
*
* @param millis the time in millis to be set as the time-out-limit whilst waiting for a
* XMPP-reply.
*/
public void setReplyTimeoutInterval(int millis) {
this.replyTimeoutInterval = millis;
}
/**
* Disables default debugger provided by the XMPPConnection.
*/
public void disableDebugger() {
connection.DEBUG_ENABLED = false;
}
/**
* Closes the connection to the XMPP Server.
*/
public void closeConnection() {
if (connection != null && connection.isConnected()) {
connection.disconnect();
}
}
/**
* This is an abstract method used for post processing the received XMPP-message. This
* method will be implemented as per requirement at the time of creating an object of this
* class.
*
* @param xmppMessage the xmpp message received by the listener.
*/
protected abstract void processXMPPMessage(Message xmppMessage);
}

@ -412,6 +412,16 @@
<artifactId>org.wso2.carbon.device.mgt.analytics</artifactId> <artifactId>org.wso2.carbon.device.mgt.analytics</artifactId>
<version>${carbon.device.mgt.version}</version> <version>${carbon.device.mgt.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.policy.mgt.core</artifactId>
<version>${carbon.device.mgt.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.policy.mgt.common</artifactId>
<version>${carbon.device.mgt.version}</version>
</dependency>
<!--IOT dependencies--> <!--IOT dependencies-->

Loading…
Cancel
Save