From c13e5ac62005ad852e7be077acef766463ff333d Mon Sep 17 00:00:00 2001 From: Shabir Mohamed Date: Tue, 13 Oct 2015 13:28:26 +0530 Subject: [PATCH] Added MQTT-Subscriber and XMPP-Connector for the virtual_firealarm deviceType --- .../impl/ArduinoControllerService.java | 2 +- .../impl/util/MqttArduinoSubscriber.java | 5 +- modules/samples/pom.xml | 3 +- .../constants/VirtualFireAlarmConstants.java | 5 + .../plugin/impl/VirtualFireAlarmManager.java | 1 - ...alFirealarmManagementServiceComponent.java | 119 ++++---- .../pom.xml | 10 - .../VirtualFireAlarmControllerService.java | 207 ++++++------- .../util/VirtualFireAlarmMQTTSubscriber.java | 81 +++++ .../util/VirtualFireAlarmXMPPConnector.java | 88 ++++++ .../service/impl/util/mqtt/MQTTClient.java | 280 ------------------ .../service/impl/util/xmpp/XMPPClient.java | 241 --------------- .../src/main/webapp/WEB-INF/cxf-servlet.xml | 11 +- pom.xml | 17 ++ 14 files changed, 358 insertions(+), 712 deletions(-) create mode 100644 modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java create mode 100644 modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java delete mode 100644 modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java delete mode 100644 modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java diff --git a/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/ArduinoControllerService.java b/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/ArduinoControllerService.java index f0545137..e64e9c30 100644 --- a/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/ArduinoControllerService.java +++ b/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/ArduinoControllerService.java @@ -48,7 +48,7 @@ public class ArduinoControllerService { public void setMqttArduinoSubscriber(MqttArduinoSubscriber mqttArduinoSubscriber) { ArduinoControllerService.mqttArduinoSubscriber = mqttArduinoSubscriber; try { - mqttArduinoSubscriber.subscribe(); + mqttArduinoSubscriber.connectAndSubscribe(); } catch (DeviceManagementException e) { log.error(e.getErrorMessage()); } diff --git a/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/util/MqttArduinoSubscriber.java b/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/util/MqttArduinoSubscriber.java index f465c7b0..0c854153 100644 --- a/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/util/MqttArduinoSubscriber.java +++ b/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/util/MqttArduinoSubscriber.java @@ -26,10 +26,12 @@ import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttSubscriber; import java.io.File; import java.util.LinkedList; +import java.util.UUID; public class MqttArduinoSubscriber extends MqttSubscriber { private static Log log = LogFactory.getLog(MqttArduinoSubscriber.class); + private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0,5); private static final String subscribetopic = "wso2" + File.separator + "iot" + File.separator + "+" + File.separator + ArduinoConstants.DEVICE_TYPE + File.separator + "#"; @@ -37,8 +39,7 @@ public class MqttArduinoSubscriber extends MqttSubscriber { //make it singleton private MqttArduinoSubscriber() { - - super("Subscriber", ArduinoConstants.DEVICE_TYPE, MqttConfig.getInstance().getMqttQueueEndpoint(), + super(iotServerSubscriber, ArduinoConstants.DEVICE_TYPE, MqttConfig.getInstance().getMqttQueueEndpoint(), subscribetopic); } diff --git a/modules/samples/pom.xml b/modules/samples/pom.xml index 2fcdc9ae..17040df8 100644 --- a/modules/samples/pom.xml +++ b/modules/samples/pom.xml @@ -201,7 +201,7 @@ org.eclipse.paho mqtt-client - ${eclipse.paho.version} + ${paho.mqtt.version} provided @@ -441,7 +441,6 @@ 7.0.52.wso2v5 7.0.34.wso2v2 - 0.4.0 2.2.4 diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/constants/VirtualFireAlarmConstants.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/constants/VirtualFireAlarmConstants.java index b7f0c43e..e780ee83 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/constants/VirtualFireAlarmConstants.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/constants/VirtualFireAlarmConstants.java @@ -22,4 +22,9 @@ public class VirtualFireAlarmConstants { public final static String DEVICE_PLUGIN_DEVICE_ID = "VIRTUAL_FIREALARM_DEVICE_ID"; public final static String STATE_ON = "ON"; public final static String STATE_OFF = "OFF"; + + public static final String URL_PREFIX = "http://"; + public static final String BULB_CONTEXT = "/BULB/"; + public static final String SONAR_CONTEXT = "/HUMIDITY/"; + public static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/"; } diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/impl/VirtualFireAlarmManager.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/impl/VirtualFireAlarmManager.java index 4defa1b7..eefd20ae 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/impl/VirtualFireAlarmManager.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/impl/VirtualFireAlarmManager.java @@ -49,7 +49,6 @@ public class VirtualFireAlarmManager implements DeviceManager { private static final Log log = LogFactory.getLog(VirtualFireAlarmManager.class); - @Override public FeatureManager getFeatureManager() { return null; diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/internal/VirtualFirealarmManagementServiceComponent.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/internal/VirtualFirealarmManagementServiceComponent.java index 858f9df9..ddc17eb6 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/internal/VirtualFirealarmManagementServiceComponent.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/internal/VirtualFirealarmManagementServiceComponent.java @@ -25,11 +25,13 @@ import org.osgi.framework.ServiceRegistration; import org.osgi.service.component.ComponentContext; import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService; import org.wso2.carbon.device.mgt.iot.common.service.DeviceTypeService; -import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl.VirtualFireAlarmManagerService; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl + .VirtualFireAlarmManagerService; /** - * @scr.component name="org.wso2.carbon.device.mgt.iot.firealarm.internal.VirtualFirealarmManagementServiceComponent" + * @scr.component name="org.wso2.carbon.device.mgt.iot.firealarm.internal + * .VirtualFirealarmManagementServiceComponent" * immediate="true" * @scr.reference name="wso2.carbon.device.mgt.iot.common.DeviceTypeService" * interface="org.wso2.carbon.device.mgt.iot.common.service.DeviceTypeService" @@ -39,66 +41,69 @@ import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl.Virtu * unbind="unsetDeviceTypeService" */ public class VirtualFirealarmManagementServiceComponent { - - private ServiceRegistration firealarmServiceRegRef; - - - private static final Log log = LogFactory.getLog(VirtualFirealarmManagementServiceComponent.class); - protected void activate(ComponentContext ctx) { - if (log.isDebugEnabled()) { - log.debug("Activating Virtual Firealarm Device Management Service Component"); - } - try { - BundleContext bundleContext = ctx.getBundleContext(); - - - firealarmServiceRegRef = - bundleContext.registerService(DeviceManagementService.class.getName(), - new VirtualFireAlarmManagerService(), - null); - - - - if (log.isDebugEnabled()) { - log.debug("Virtual Firealarm Device Management Service Component has been successfully activated"); - } - } catch (Throwable e) { - log.error("Error occurred while activating Virtual Firealarm Device Management Service Component", e); - } - } - - protected void deactivate(ComponentContext ctx) { - if (log.isDebugEnabled()) { - log.debug("De-activating Virtual Firealarm Device Management Service Component"); - } - try { - if (firealarmServiceRegRef != null) { - firealarmServiceRegRef.unregister(); - } - - if (log.isDebugEnabled()) { - log.debug( - "Virtual Firealarm Device Management Service Component has been successfully de-activated"); - } - } catch (Throwable e) { - log.error("Error occurred while de-activating Virtual Firealarm Device Management bundle", e); - } - } - - protected void setDeviceTypeService(DeviceTypeService deviceTypeService) { + private ServiceRegistration firealarmServiceRegRef; + + private static final Log log = LogFactory.getLog( + VirtualFirealarmManagementServiceComponent.class); + + protected void activate(ComponentContext ctx) { + if (log.isDebugEnabled()) { + log.debug("Activating Virtual Firealarm Device Management Service Component"); + } + try { + BundleContext bundleContext = ctx.getBundleContext(); + firealarmServiceRegRef = + bundleContext.registerService(DeviceManagementService.class.getName(), + new VirtualFireAlarmManagerService(), null); + + if (log.isDebugEnabled()) { + log.debug( + "Virtual Firealarm Device Management Service Component has been " + + "successfully activated"); + } + } catch (Throwable e) { + log.error( + "Error occurred while activating Virtual Firealarm Device Management Service " + + "Component", + e); + } + } + + protected void deactivate(ComponentContext ctx) { + if (log.isDebugEnabled()) { + log.debug("De-activating Virtual Firealarm Device Management Service Component"); + } + try { + if (firealarmServiceRegRef != null) { + firealarmServiceRegRef.unregister(); + } + + if (log.isDebugEnabled()) { + log.debug( + "Virtual Firealarm Device Management Service Component has been " + + "successfully de-activated"); + } + } catch (Throwable e) { + log.error( + "Error occurred while de-activating Virtual Firealarm Device Management " + + "bundle", + e); + } + } + + protected void setDeviceTypeService(DeviceTypeService deviceTypeService) { /* This is to avoid this component getting initialized before the common registered */ - if (log.isDebugEnabled()) { - log.debug("Data source service set to mobile service component"); - } - } + if (log.isDebugEnabled()) { + log.debug("Data source service set to mobile service component"); + } + } - protected void unsetDeviceTypeService(DeviceTypeService deviceTypeService) { - //do nothing - } + protected void unsetDeviceTypeService(DeviceTypeService deviceTypeService) { + //do nothing + } - } diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml index d5ded8e9..3db470bd 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml @@ -98,16 +98,6 @@ org.wso2.carbon.device.mgt.analytics - - - org.igniterealtime.smack.wso2 - smack - - - org.igniterealtime.smack.wso2 - smackx - - diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java index 575eb120..53c1eff2 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmControllerService.java @@ -23,8 +23,6 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.concurrent.FutureCallback; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.jivesoftware.smack.packet.Message; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException; import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService; @@ -32,14 +30,15 @@ import org.wso2.carbon.device.mgt.common.DeviceIdentifier; import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.iot.common.DeviceController; import org.wso2.carbon.device.mgt.iot.common.DeviceValidator; -import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig; import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException; -import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON; import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants .VirtualFireAlarmConstants; -import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt.MQTTClient; -import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp.XMPPClient; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.VirtualFireAlarmMqttSubscriber; + + +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.VirtualFireAlarmXmppConnector; import org.wso2.carbon.utils.CarbonUtils; import javax.servlet.http.HttpServletResponse; @@ -75,74 +74,49 @@ public class VirtualFireAlarmControllerService { @Context //injected response proxy supporting multiple thread private HttpServletResponse response; private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature"; - - private static final String URL_PREFIX = "http://"; - private static final String BULB_CONTEXT = "/BULB/"; - private static final String SONAR_CONTEXT = "/SONAR/"; - private static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/"; - public static final String XMPP_PROTOCOL = "XMPP"; public static final String HTTP_PROTOCOL = "HTTP"; public static final String MQTT_PROTOCOL = "MQTT"; - private static ConcurrentHashMap deviceToIpMap = new ConcurrentHashMap(); - private static XMPPClient xmppClient; - private static MQTTClient mqttClient; - private static final String mqttServerSubscribeTopic = "wso2/iot/+/" + VirtualFireAlarmConstants.DEVICE_TYPE + "/+/reply"; - private static final String iotServerSubscriber = "IoT-Server"; - - static{ - String xmppServer = XmppConfig.getInstance().getXmppControlQueue().getServerURL(); - int indexOfChar = xmppServer.lastIndexOf('/'); - if (indexOfChar != -1) { - xmppServer = xmppServer.substring((indexOfChar + 1), xmppServer.length()); - } - - int xmppPort = Integer.parseInt(XmppConfig.getInstance().getSERVER_CONNECTION_PORT()); - xmppClient = new XMPPClient(xmppServer, xmppPort) { - @Override - protected void processXMPPMessage(Message xmppMessage) { - - } - }; - - String xmppUsername = XmppConfig.getInstance().getXmppUsername(); - String xmppPassword = XmppConfig.getInstance().getXmppPassword(); - - try { - xmppClient.connectAndLogin(xmppUsername, xmppPassword, "iotServer"); - } catch (DeviceManagementException e) { - e.printStackTrace(); - } - - xmppClient.setMessageFilterAndListener(""); - - String mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint(); - mqttClient = new MQTTClient(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, mqttEndpoint, mqttServerSubscribeTopic) { - @Override - protected void postMessageArrived(String topic, MqttMessage message) { + private static VirtualFireAlarmMqttSubscriber virtualFireAlarmMqttSubscriber; + private static VirtualFireAlarmXmppConnector virtualFireAlarmXmppConnector; + private static ConcurrentHashMap deviceToIpMap = + new ConcurrentHashMap(); + + public void setVirtualFireAlarmXmppConnector( + VirtualFireAlarmXmppConnector virtualFireAlarmXmppConnector) { + VirtualFireAlarmControllerService.virtualFireAlarmXmppConnector = + virtualFireAlarmXmppConnector; + virtualFireAlarmXmppConnector.initConnector(); + virtualFireAlarmXmppConnector.connectAndLogin(); + } - } - }; + public void setVirtualFireAlarmMqttSubscriber( + VirtualFireAlarmMqttSubscriber virtualFireAlarmMqttSubscriber) { + VirtualFireAlarmControllerService.virtualFireAlarmMqttSubscriber = + virtualFireAlarmMqttSubscriber; + virtualFireAlarmMqttSubscriber.initConnector(); + virtualFireAlarmMqttSubscriber.connectAndSubscribe(); + } - try { - mqttClient.connectAndSubscribe(); - } catch (DeviceManagementException e) { - e.printStackTrace(); - } + public VirtualFireAlarmXmppConnector getVirtualFireAlarmXmppConnector() { + return virtualFireAlarmXmppConnector; } + public VirtualFireAlarmMqttSubscriber getVirtualFireAlarmMqttSubscriber() { + return virtualFireAlarmMqttSubscriber; + } @Path("/register/{owner}/{deviceId}/{ip}") @POST public String registerDeviceIP(@PathParam("owner") String owner, - @PathParam("deviceId") String deviceId, - @PathParam("ip") String deviceIP, - @Context HttpServletResponse response) { + @PathParam("deviceId") String deviceId, + @PathParam("ip") String deviceIP, + @Context HttpServletResponse response) { String result; log.info("Got register call from IP: " + deviceIP + " for Device ID: " + deviceId + - " of owner: " + owner); + " of owner: " + owner); deviceToIpMap.put(deviceId, deviceIP); @@ -162,10 +136,10 @@ public class VirtualFireAlarmControllerService { @Path("/bulb/{state}") @POST public void switchBulb(@HeaderParam("owner") String owner, - @HeaderParam("deviceId") String deviceId, - @HeaderParam("protocol") String protocol, - @PathParam("state") String state, - @Context HttpServletResponse response) { + @HeaderParam("deviceId") String deviceId, + @HeaderParam("protocol") String protocol, + @PathParam("state") String state, + @Context HttpServletResponse response) { try { DeviceValidator deviceValidator = new DeviceValidator(); @@ -196,27 +170,28 @@ public class VirtualFireAlarmControllerService { } String protocolString = protocol.toUpperCase(); - String callUrlPattern = BULB_CONTEXT + switchToState; + String callUrlPattern = VirtualFireAlarmConstants.BULB_CONTEXT + switchToState; - log.info("Sending command: '" + callUrlPattern + "' to virtual-firealarm at: " + deviceIP + " " + - "via" + " " + protocolString); + log.info("Sending command: '" + callUrlPattern + "' to virtual-firealarm at: " + deviceIP + + " " + + "via" + " " + protocolString); try { switch (protocolString) { case HTTP_PROTOCOL: - sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true); + sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true); break; case MQTT_PROTOCOL: - sendCommandViaMQTT(owner, deviceId, BULB_CONTEXT.replace("/", ""), - switchToState); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""), + switchToState); break; case XMPP_PROTOCOL: // requestBulbChangeViaXMPP(switchToState, response); - sendCommandViaXMPP(owner, deviceId, BULB_CONTEXT, switchToState); + sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT, switchToState); break; default: if (protocolString == null) { - sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true); + sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true); } else { response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); return; @@ -225,7 +200,7 @@ public class VirtualFireAlarmControllerService { } } catch (DeviceManagementException e) { log.error("Failed to send command '" + callUrlPattern + "' to: " + deviceIP + " via" + - " " + protocol); + " " + protocol); response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); return; } @@ -237,16 +212,16 @@ public class VirtualFireAlarmControllerService { @Path("/readsonar") @GET public String requestSonarReading(@HeaderParam("owner") String owner, - @HeaderParam("deviceId") String deviceId, - @HeaderParam("protocol") String protocol, - @Context HttpServletResponse response) { + @HeaderParam("deviceId") String deviceId, + @HeaderParam("protocol") String protocol, + @Context HttpServletResponse response) { String replyMsg = ""; DeviceValidator deviceValidator = new DeviceValidator(); try { if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, VirtualFireAlarmConstants - .DEVICE_TYPE))) { + .DEVICE_TYPE))) { response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); return "Unauthorized Access"; } @@ -267,25 +242,24 @@ public class VirtualFireAlarmControllerService { try { switch (protocol) { case HTTP_PROTOCOL: - log.info("Sending request to read sonar value at : " + deviceIp + - " via " + HTTP_PROTOCOL); + log.info("Sending request to read sonar value at : " + deviceIp + " via " + HTTP_PROTOCOL); + replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.SONAR_CONTEXT, false); + break; - replyMsg = sendCommandViaHTTP(deviceIp, 80, SONAR_CONTEXT, false); + case MQTT_PROTOCOL: + log.info("Sending request to read sonar value at : " + deviceIp + " via " + MQTT_PROTOCOL); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""), ""); break; case XMPP_PROTOCOL: - log.info("Sending request to read sonar value at : " + deviceIp + - " via " + - XMPP_PROTOCOL); - replyMsg = sendCommandViaXMPP(owner, deviceId, SONAR_CONTEXT, "."); + log.info("Sending request to read sonar value at : " + deviceIp + " via " + XMPP_PROTOCOL); + replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT, "."); break; default: if (protocol == null) { - log.info("Sending request to read sonar value at : " + deviceIp + - " via " + HTTP_PROTOCOL); - - replyMsg = sendCommandViaHTTP(deviceIp, 80, SONAR_CONTEXT, false); + log.info("Sending request to read sonar value at : " + deviceIp + " via " + HTTP_PROTOCOL); + replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.SONAR_CONTEXT, false); } else { replyMsg = "Requested protocol '" + protocol + "' is not supported"; response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); @@ -308,16 +282,16 @@ public class VirtualFireAlarmControllerService { @Path("/readtemperature") @GET public String requestTemperature(@HeaderParam("owner") String owner, - @HeaderParam("deviceId") String deviceId, - @HeaderParam("protocol") String protocol, - @Context HttpServletResponse response) { + @HeaderParam("deviceId") String deviceId, + @HeaderParam("protocol") String protocol, + @Context HttpServletResponse response) { String replyMsg = ""; DeviceValidator deviceValidator = new DeviceValidator(); try { if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, VirtualFireAlarmConstants - .DEVICE_TYPE))) { + .DEVICE_TYPE))) { response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); return "Unauthorized Access"; } @@ -338,26 +312,25 @@ public class VirtualFireAlarmControllerService { try { switch (protocol) { case HTTP_PROTOCOL: - log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + - " via " + HTTP_PROTOCOL); + log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + HTTP_PROTOCOL); + replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, false); + break; - replyMsg = sendCommandViaHTTP(deviceIp, 80, TEMPERATURE_CONTEXT, false); + case MQTT_PROTOCOL: + log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + MQTT_PROTOCOL); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""), ""); break; case XMPP_PROTOCOL: - log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + - " via " + - XMPP_PROTOCOL); - replyMsg = sendCommandViaXMPP(owner, deviceId, TEMPERATURE_CONTEXT, "."); + log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + XMPP_PROTOCOL); + replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, "."); // replyMsg = requestTemperatureViaXMPP(response); break; default: if (protocol == null) { - log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + - " via " + HTTP_PROTOCOL); - - replyMsg = sendCommandViaHTTP(deviceIp, 80, TEMPERATURE_CONTEXT, false); + log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + HTTP_PROTOCOL); + replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, false); } else { replyMsg = "Requested protocol '" + protocol + "' is not supported"; response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); @@ -439,12 +412,14 @@ public class VirtualFireAlarmControllerService { if (registeredIp == null) { log.warn("Unregistered IP: Temperature Data Received from an un-registered IP " + - deviceIp + " for device ID - " + deviceId); + deviceIp + " for device ID - " + deviceId); response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); return; } else if (!registeredIp.equals(deviceIp)) { log.warn("Conflicting IP: Received IP is " + deviceIp + ". Device with ID " + - deviceId + " is already registered under some other IP. Re-registration " + "required"); + deviceId + + " is already registered under some other IP. Re-registration " + + "required"); response.setStatus(Response.Status.CONFLICT.getStatusCode()); return; } @@ -454,12 +429,13 @@ public class VirtualFireAlarmControllerService { ctx.setTenantDomain(SUPER_TENANT, true); DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx .getOSGiService(DeviceAnalyticsService.class, null); - Object metdaData[] = {dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId, - System.currentTimeMillis()}; + Object metdaData[] = + {dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId, + System.currentTimeMillis()}; Object payloadData[] = {temperature}; try { deviceAnalyticsService.publishEvent(TEMPERATURE_STREAM_DEFINITION, "1.0.0", - metdaData, new Object[0], payloadData); + metdaData, new Object[0], payloadData); } catch (DataPublisherConfigurationException e) { response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -551,7 +527,7 @@ public class VirtualFireAlarmControllerService { private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource, - String state) throws DeviceManagementException { + String state) throws DeviceManagementException { String replyMsg = ""; String scriptArguments = ""; @@ -636,7 +612,7 @@ public class VirtualFireAlarmControllerService { private boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource, - String state) throws DeviceManagementException { + String state) throws DeviceManagementException { boolean result = false; DeviceController deviceController = new DeviceController(); @@ -644,7 +620,7 @@ public class VirtualFireAlarmControllerService { try { result = deviceController.publishMqttControl(deviceOwner, VirtualFireAlarmConstants.DEVICE_TYPE, - deviceId, resource, state); + deviceId, resource, state); } catch (DeviceControllerException e) { String errorMsg = "Error whilst trying to publish to MQTT Queue"; log.error(errorMsg); @@ -655,16 +631,16 @@ public class VirtualFireAlarmControllerService { private String sendCommandViaHTTP(final String deviceIp, int deviceServerPort, - String callUrlPattern, - boolean fireAndForgot) + String callUrlPattern, + boolean fireAndForgot) throws DeviceManagementException { if (deviceServerPort == 0) { - deviceServerPort = 80; + deviceServerPort = 9090; } String responseMsg = ""; - String urlString = URL_PREFIX + deviceIp + ":" + deviceServerPort + callUrlPattern; + String urlString = VirtualFireAlarmConstants.URL_PREFIX + deviceIp + ":" + deviceServerPort + callUrlPattern; if (log.isDebugEnabled()) { log.debug(urlString); @@ -740,7 +716,7 @@ public class VirtualFireAlarmControllerService { /* This methods creates and returns a http connection object */ private HttpURLConnection getHttpConnection(String urlString) throws - DeviceManagementException { + DeviceManagementException { URL connectionUrl = null; HttpURLConnection httpConnection = null; @@ -803,5 +779,4 @@ public class VirtualFireAlarmControllerService { return completeResponse.toString(); } - } diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java new file mode 100644 index 00000000..09bee147 --- /dev/null +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java @@ -0,0 +1,81 @@ +package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.wso2.carbon.device.mgt.common.DeviceManagementException; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttSubscriber; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants + .VirtualFireAlarmConstants; + +import java.io.File; +import java.util.UUID; + +public class VirtualFireAlarmMqttSubscriber extends MqttSubscriber { + private static Log log = LogFactory.getLog(VirtualFireAlarmMqttSubscriber.class); + + private static final String subscribeTopic = + "wso2" + File.separator + "iot" + File.separator + "+" + File.separator + + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator + "+" + File.separator + + "reply"; + private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); + private static String mqttEndpoint; + + private VirtualFireAlarmMqttSubscriber() { + super(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, + MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic); + } + + public void initConnector() { + mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint(); + } + + public void connectAndSubscribe() { + try { + super.connectAndSubscribe(); + } catch (DeviceManagementException e) { + log.error("Subscription to MQTT Broker at: " + mqttEndpoint + " failed"); + retryMQTTSubscription(); + } + } + + @Override + protected void postMessageArrived(String topic, MqttMessage message) { + log.info("Message " + message.toString() + " was received for topic: " + topic); + } + + private void retryMQTTSubscription() { + Thread retryToSubscribe = new Thread() { + @Override + public void run() { + while (true) { + if (!isConnected()) { + if (log.isDebugEnabled()) { + log.debug("Subscriber re-trying to reach MQTT queue...."); + } + + try { + VirtualFireAlarmMqttSubscriber.super.connectAndSubscribe(); + } catch (DeviceManagementException e1) { + if (log.isDebugEnabled()) { + log.debug("Attempt to re-connect to MQTT-Queue failed"); + } + } + } else { + break; + } + + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + log.error("MQTT: Thread S;eep Interrupt Exception"); + } + } + } + }; + + retryToSubscribe.setDaemon(true); + retryToSubscribe.start(); + } +} diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java new file mode 100644 index 00000000..a4ff2539 --- /dev/null +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java @@ -0,0 +1,88 @@ +package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jivesoftware.smack.packet.Message; +import org.wso2.carbon.device.mgt.common.DeviceManagementException; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConnector; + +public class VirtualFireAlarmXmppConnector extends XmppConnector { + private static Log log = LogFactory.getLog(VirtualFireAlarmXmppConnector.class); + + private static String xmppServerIP; + // private static int xmppServerPort; + private static String xmppAdminUsername; + private static String xmppAdminPassword; + private static String xmppAdminAccountJID; + + private VirtualFireAlarmXmppConnector() { + super(XmppConfig.getInstance().getXmppServerIP(), + XmppConfig.getInstance().getSERVER_CONNECTION_PORT()); + } + + public void initConnector() { + xmppServerIP = XmppConfig.getInstance().getXmppServerIP(); + xmppAdminUsername = XmppConfig.getInstance().getXmppUsername(); + xmppAdminPassword = XmppConfig.getInstance().getXmppPassword(); + xmppAdminAccountJID = xmppAdminUsername + "@" + xmppServerIP; + } + + public void connectAndLogin() { + try { + super.connectAndLogin(xmppAdminUsername, xmppAdminPassword, null); + super.setMessageFilterOnReceiver(xmppAdminAccountJID); + } catch (DeviceManagementException e) { + log.error("Connect/Login attempt to XMPP Server at: " + xmppServerIP + " failed"); + retryXMPPConnection(); + } + } + + @Override + protected void processXMPPMessage(Message xmppMessage) { + String from = xmppMessage.getFrom(); + String message = xmppMessage.getBody(); + log.info("Received XMPP message '" + message + "' from " + from); + } + + private void retryXMPPConnection() { + Thread retryToConnect = new Thread() { + @Override + public void run() { + + while (true) { + if (!isConnected()) { + if (log.isDebugEnabled()) { + log.debug("Re-trying to reach XMPP Server...."); + } + + try { + VirtualFireAlarmXmppConnector.super.connectAndLogin(xmppAdminUsername, + xmppAdminPassword, + null); + VirtualFireAlarmXmppConnector.super.setMessageFilterOnReceiver( + xmppAdminAccountJID); + } catch (DeviceManagementException e1) { + if (log.isDebugEnabled()) { + log.debug("Attempt to re-connect to XMPP-Server failed"); + } + } + } else { + break; + } + + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + log.error("XMPP: Thread Sleep Interrupt Exception"); + } + } + } + }; + + retryToConnect.setDaemon(true); + retryToConnect.start(); + } + + +} diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java deleted file mode 100644 index 975d7c75..00000000 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttSecurityException; -import org.wso2.carbon.device.mgt.common.DeviceManagementException; - -import java.io.File; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * This class contains the Agent specific implementation for all the MQTT functionality. This - * includes connecting to a MQTT Broker & subscribing to the appropriate MQTT-topic, action plan - * upon losing connection or successfully delivering a message to the broker and processing - * incoming messages. Makes use of the 'Paho-MQTT' library provided by Eclipse Org. - *

- * It is an abstract class with an abstract method 'postMessageArrived' allowing the user to have - * their own implementation of the actions to be taken upon receiving a message to the subscribed - * MQTT-Topic. - */ -public abstract class MQTTClient implements MqttCallback { - private static final Log log = LogFactory.getLog(MQTTClient.class); - - private MqttClient client; - private String clientId; - private MqttConnectOptions options; - private String subscribeTopic; - private String clientWillTopic; - private String mqttBrokerEndPoint; - private int reConnectionInterval; - - /** - * Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT - * Broker URL and the topic to subscribe. - * - * @param deviceOwner the owner of the device. - * @param deviceType the CDMF Device-Type of the device. - * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint. - * @param subscribeTopic the MQTT topic to which the client is to be subscribed - */ - protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint, - String subscribeTopic) { - this.clientId = deviceOwner + ":" + deviceType; - this.subscribeTopic = subscribeTopic; - this.clientWillTopic = deviceType + File.separator + "disconnection"; - this.mqttBrokerEndPoint = mqttBrokerEndPoint; - this.reConnectionInterval = 5000; - this.initSubscriber(); - } - - /** - * Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT - * Broker URL and the topic to subscribe. Additionally this constructor takes in the - * reconnection-time interval between successive attempts to connect to the broker. - * - * @param deviceOwner the owner of the device. - * @param deviceType the CDMF Device-Type of the device. - * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint. - * @param subscribeTopic the MQTT topic to which the client is to be subscribed - * @param reConnectionInterval time interval in SECONDS between successive attempts to connect - * to the broker. - */ - protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint, - String subscribeTopic, int reConnectionInterval) { - this.clientId = deviceOwner + ":" + deviceType; - this.subscribeTopic = subscribeTopic; - this.clientWillTopic = deviceType + File.separator + "disconnection"; - this.mqttBrokerEndPoint = mqttBrokerEndPoint; - this.reConnectionInterval = reConnectionInterval; - this.initSubscriber(); - } - - /** - * Initializes the MQTT-Client. - * Creates a client using the given MQTT-broker endpoint and the clientId (which is - * constructed by a concatenation of [deviceOwner]:[deviceType]). Also sets the client's - * options parameter with the clientWillTopic (in-case of connection failure) and other info. - * Also sets the call-back this current class. - */ - private void initSubscriber() { - try { - client = new MqttClient(this.mqttBrokerEndPoint, clientId, null); - log.info("MQTT subscriber was created with ClientID : " + clientId); - } catch (MqttException ex) { - String errorMsg = "MQTT Client Error\n" + "\tReason: " + ex.getReasonCode() + - "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " + - ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() + - "\n\tException: " + ex; - log.error(errorMsg); - } - - options = new MqttConnectOptions(); - options.setCleanSession(false); - options.setWill(clientWillTopic, "connection crashed".getBytes(StandardCharsets.UTF_8), 2, - true); - client.setCallback(this); - } - - /** - * Checks whether the connection to the MQTT-Broker persists. - * - * @return true if the client is connected to the MQTT-Broker, else false. - */ - public boolean isConnected() { - return client.isConnected(); - } - - /** - * Connects to the MQTT-Broker and if successfully established connection, then tries to - * subscribe to the MQTT-Topic specific to the device. (The MQTT-Topic specific to the - * device is created is taken in as a constructor parameter of this class) . - * - * @throws DeviceManagementException in the event of 'Connecting to' or 'Subscribing to' the - * MQTT broker fails. - */ - public void connectAndSubscribe() throws DeviceManagementException { - try { - client.connect(options); - - if (log.isDebugEnabled()) { - log.debug("Subscriber connected to queue at: " + this.mqttBrokerEndPoint); - } - } catch (MqttSecurityException ex) { - String errorMsg = "MQTT Security Exception when connecting to queue\n" + "\tReason: " + - " " + - ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() + - "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " + - ex.getCause() + "\n\tException: " + ex; //throw - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - throw new DeviceManagementException(errorMsg, ex); - - } catch (MqttException ex) { - String errorMsg = "MQTT Exception when connecting to queue\n" + "\tReason: " + - ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() + - "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " + - ex.getCause() + "\n\tException: " + ex; //throw - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - throw new DeviceManagementException(errorMsg, ex); - } - - try { - client.subscribe(subscribeTopic, 0); - - log.info("Subscriber - " + clientId + " subscribed to topic: " + subscribeTopic); - } catch (MqttException ex) { - String errorMsg = "MQTT Exception when trying to subscribe to topic: " + - subscribeTopic + "\n\tReason: " + ex.getReasonCode() + - "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " + - ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() + - "\n\tException: " + ex; - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - throw new DeviceManagementException(errorMsg, ex); - } - } - - /** - * Callback method which is triggered once the MQTT client losers its connection to the broker. - * A scheduler thread is spawned to continuously re-attempt and connect to the broker and - * subscribe to the device's topic. This thread is scheduled to execute after every break - * equal to that of the 'reConnectionInterval' of the MQTTClient. - * - * @param throwable a Throwable Object containing the details as to why the failure occurred. - */ - public void connectionLost(Throwable throwable) { - log.warn("Lost Connection for client: " + this.clientId + " to " + this.mqttBrokerEndPoint + ".\nThis was due to - " + throwable.getMessage()); - - Runnable reSubscriber = new Runnable() { - @Override - public void run() { - if (!isConnected()) { - if (log.isDebugEnabled()) { - log.debug("Subscriber reconnecting to queue........"); - } - try { - connectAndSubscribe(); - } catch (DeviceManagementException e) { - if (log.isDebugEnabled()) { - log.debug("Could not reconnect and subscribe to ControlQueue."); - } - } - } else { - return; - } - } - }; - - ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - service.scheduleAtFixedRate(reSubscriber, 0, this.reConnectionInterval, TimeUnit.SECONDS); - } - - /** - * Callback method which is triggered upon receiving a MQTT Message from the broker. Spawns a - * new thread that executes any actions to be taken with the received message. - * - * @param topic the MQTT-Topic to which the received message was published to and the - * client was subscribed to. - * @param mqttMessage the actual MQTT-Message that was received from the broker. - */ - public void messageArrived(final String topic, final MqttMessage mqttMessage) { - Thread subscriberThread = new Thread() { - public void run() { - postMessageArrived(topic, mqttMessage); - } - }; - subscriberThread.start(); - } - - /** - * Callback method which gets triggered upon successful completion of a message delivery to - * the broker. - * - * @param iMqttDeliveryToken the MQTT-DeliveryToken which includes the details about the - * specific message delivery. - */ - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - String message = ""; - try { - message = iMqttDeliveryToken.getMessage().toString(); - } catch (MqttException e) { - log.error("Error occurred whilst trying to read the message from the MQTT delivery token."); - } - String topic = iMqttDeliveryToken.getTopics()[0]; - String client = iMqttDeliveryToken.getClient().getClientId(); - log.info("Message - '" + message + "' of client [" + client + "] for the topic (" + topic + ") was delivered successfully."); - } - - /** - * This is an abstract method used for post processing the received MQTT-message. This - * method will be implemented as per requirement at the time of creating an object of this - * class. - * - * @param topic The topic for which the message was received for. - * @param message The message received for the subscription to the above topic. - */ - protected abstract void postMessageArrived(String topic, MqttMessage message); - - /** - * Gets the MQTTClient object. - * - * @return the MQTTClient object. - */ - public MqttClient getClient() { - return client; - } - -} - diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java deleted file mode 100644 index 8391b1cb..00000000 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jivesoftware.smack.ConnectionConfiguration; -import org.jivesoftware.smack.PacketListener; -import org.jivesoftware.smack.SmackConfiguration; -import org.jivesoftware.smack.XMPPConnection; -import org.jivesoftware.smack.XMPPException; -import org.jivesoftware.smack.filter.AndFilter; -import org.jivesoftware.smack.filter.FromContainsFilter; -import org.jivesoftware.smack.filter.PacketFilter; -import org.jivesoftware.smack.filter.PacketTypeFilter; -import org.jivesoftware.smack.packet.Message; -import org.jivesoftware.smack.packet.Packet; -import org.wso2.carbon.device.mgt.common.DeviceManagementException; - -/** - * This class contains the Agent specific implementation for all the XMPP functionality. This - * includes connecting to a XMPP Server & Login using the device's XMPP-Account, Setting - * listeners and filters on incoming XMPP messages and Sending XMPP replies for control signals - * received. Makes use of the 'Smack-XMPP' library provided by jivesoftware/igniterealtime. - *

- * It is an abstract class with an abstract method 'processXMPPMessage' allowing the user to have - * their own implementation of the actions to be taken upon receiving an appropriate XMPP message. - */ -public abstract class XMPPClient { - private static final Log log = LogFactory.getLog(XMPPClient.class); - - private int replyTimeoutInterval = 500; // millis - private String server; - private int port; - private ConnectionConfiguration config; - private XMPPConnection connection; - private PacketFilter filter; - private PacketListener listener; - - /** - * Constructor for XMPPClient passing server-IP and the XMPP-port. - * - * @param server the IP of the XMPP server. - * @param port the XMPP server's port to connect to. (default - 5222) - */ - public XMPPClient(String server, int port) { - this.server = server; - this.port = port; - initXMPPClient(); - } - - /** - * Initializes the XMPP Client. - * Sets the time-out-limit whilst waiting for XMPP-replies from server. Creates the XMPP - * configurations to connect to the server and creates the XMPPConnection object used for - * connecting and Logging-In. - */ - private void initXMPPClient() { - log.info(String.format( - "Initializing connection to XMPP Server at %1$s via port %2$d......", server, - port)); - SmackConfiguration.setPacketReplyTimeout(replyTimeoutInterval); - config = new ConnectionConfiguration(server, port); -// TODO:: Need to enable SASL-Authentication appropriately - config.setSASLAuthenticationEnabled(false); - config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled); - connection = new XMPPConnection(config); - } - - /** - * Connects to the XMPP-Server and if successfully established connection, then tries to Log - * in using the device's XMPP Account credentials. (The XMPP-Account specific to the device is - * created in the XMPP server whilst downloading the Agent from the IoT Server) . - * - * @param username the username of the device's XMPP-Account. - * @param password the password of the device's XMPP-Account. - * @param resource the resource the resource, specific to the XMPP-Account to which the login - * is made to - * @throws DeviceManagementException in the event of 'Connecting to' or 'Logging into' the - * XMPP server fails. - */ - public void connectAndLogin(String username, String password, String resource) - throws DeviceManagementException { - try { - connection.connect(); - log.info(String.format( - "Connection to XMPP Server at %1$s established successfully......", server)); - - } catch (XMPPException xmppExcepion) { - String errorMsg = - "Connection attempt to the XMPP Server at " + server + " via port " + port + - " failed."; - log.info(errorMsg); - throw new DeviceManagementException(errorMsg, xmppExcepion); - } - - if (connection.isConnected()) { - try { - if (resource == null) { - connection.login(username, password); - log.info(String.format("Logged into XMPP Server at %1$s as user %2$s......", server, username)); - } else { - connection.login(username, password, resource); - log.info(String.format( - "Logged into XMPP Server at %1$s as user %2$s on resource %3$s......", - server, username, resource)); - } - } catch (XMPPException xmppException) { - String errorMsg = - "Login attempt to the XMPP Server at " + server + " with username - " + - username + " failed."; - log.info(errorMsg); - throw new DeviceManagementException(errorMsg, xmppException); - } - } - } - - /** - * Sets a filter on all the incoming XMPP-Messages for the JID (XMPP-Account ID) passed in. - * Also creates a listener for the incoming messages and connects the listener to the - * XMPPConnection alongside the set filter. - * - * @param senderJID the JID (XMPP-Account ID) to which the filter is to be set. - */ - public void setMessageFilterAndListener(String senderJID) { - filter = new AndFilter(new PacketTypeFilter(Message.class), new - FromContainsFilter( - senderJID)); - listener = new PacketListener() { - @Override - public void processPacket(Packet packet) { - if (packet instanceof Message) { - final Message xmppMessage = (Message) packet; - Thread msgProcessThread = new Thread() { - public void run() { - processXMPPMessage(xmppMessage); - } - }; - msgProcessThread.start(); - } - } - }; - - connection.addPacketListener(listener, filter); - } - - - - /** - * Sends an XMPP message - * - * @param JID the JID (XMPP Account ID) to which the message is to be sent to. - * @param message the XMPP-Message that is to be sent. - */ - public void sendXMPPMessage(String JID, String message) { - sendXMPPMessage(JID, message, "Reply-From-Device"); - if (log.isDebugEnabled()) { - log.debug("Message: " + message + " to XMPP JID [" + JID + "] sent successfully"); - } - } - - /** - * Overloaded method to send an XMPP message. Includes the subject to be mentioned in the - * message that is sent. - * - * @param JID the JID (XMPP Account ID) to which the message is to be sent to. - * @param message the XMPP-Message that is to be sent. - * @param subject the subject that the XMPP-Message would carry. - */ - public void sendXMPPMessage(String JID, String message, String subject) { - Message xmppMessage = new Message(); - xmppMessage.setTo(JID); - xmppMessage.setSubject(subject); - xmppMessage.setBody(message); - xmppMessage.setType(Message.Type.chat); - connection.sendPacket(xmppMessage); - } - - /** - * Checks whether the connection to the XMPP-Server persists. - * - * @return true if the client is connected to the XMPP-Server, else false. - */ - public boolean isConnected() { - return connection.isConnected(); - } - - /** - * Sets the client's time-out-limit whilst waiting for XMPP-replies from server. - * - * @param millis the time in millis to be set as the time-out-limit whilst waiting for a - * XMPP-reply. - */ - public void setReplyTimeoutInterval(int millis) { - this.replyTimeoutInterval = millis; - } - - /** - * Disables default debugger provided by the XMPPConnection. - */ - public void disableDebugger() { - connection.DEBUG_ENABLED = false; - } - - /** - * Closes the connection to the XMPP Server. - */ - public void closeConnection() { - if (connection != null && connection.isConnected()) { - connection.disconnect(); - } - } - - /** - * This is an abstract method used for post processing the received XMPP-message. This - * method will be implemented as per requirement at the time of creating an object of this - * class. - * - * @param xmppMessage the xmpp message received by the listener. - */ - protected abstract void processXMPPMessage(Message xmppMessage); - -} - - diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml index dc3ddaab..94f945ac 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml @@ -23,11 +23,12 @@ http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd"> - + - + + @@ -46,6 +47,12 @@ + + + + + + diff --git a/pom.xml b/pom.xml index 2e0e8f13..faca78c4 100644 --- a/pom.xml +++ b/pom.xml @@ -804,6 +804,19 @@ commons-httpclient ${orbit.version.commons-httpclient} + + + + org.igniterealtime.smack.wso2 + smack + ${smack.wso2.version} + + + org.igniterealtime.smack.wso2 + smackx + ${smackx.wso2.version} + + @@ -927,6 +940,10 @@ 2.4 1.1.1 2.0.0.wso2v1 + + + 3.0.4.wso2v1 + 3.0.4.wso2v1