diff --git a/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/ArduinoControllerService.java b/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/ArduinoControllerService.java index f0545137..e64e9c30 100644 --- a/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/ArduinoControllerService.java +++ b/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/ArduinoControllerService.java @@ -48,7 +48,7 @@ public class ArduinoControllerService { public void setMqttArduinoSubscriber(MqttArduinoSubscriber mqttArduinoSubscriber) { ArduinoControllerService.mqttArduinoSubscriber = mqttArduinoSubscriber; try { - mqttArduinoSubscriber.subscribe(); + mqttArduinoSubscriber.connectAndSubscribe(); } catch (DeviceManagementException e) { log.error(e.getErrorMessage()); } diff --git a/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/util/MqttArduinoSubscriber.java b/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/util/MqttArduinoSubscriber.java index f465c7b0..0c854153 100644 --- a/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/util/MqttArduinoSubscriber.java +++ b/modules/samples/arduino/src/org.wso2.carbon.device.mgt.iot.sample.arduino.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/arduino/service/impl/util/MqttArduinoSubscriber.java @@ -26,10 +26,12 @@ import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttSubscriber; import java.io.File; import java.util.LinkedList; +import java.util.UUID; public class MqttArduinoSubscriber extends MqttSubscriber { private static Log log = LogFactory.getLog(MqttArduinoSubscriber.class); + private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0,5); private static final String subscribetopic = "wso2" + File.separator + "iot" + File.separator + "+" + File.separator + ArduinoConstants.DEVICE_TYPE + File.separator + "#"; @@ -37,8 +39,7 @@ public class MqttArduinoSubscriber extends MqttSubscriber { //make it singleton private MqttArduinoSubscriber() { - - super("Subscriber", ArduinoConstants.DEVICE_TYPE, MqttConfig.getInstance().getMqttQueueEndpoint(), + super(iotServerSubscriber, ArduinoConstants.DEVICE_TYPE, MqttConfig.getInstance().getMqttQueueEndpoint(), subscribetopic); } diff --git a/modules/samples/pom.xml b/modules/samples/pom.xml index 2fcdc9ae..17040df8 100644 --- a/modules/samples/pom.xml +++ b/modules/samples/pom.xml @@ -201,7 +201,7 @@ <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>mqtt-client</artifactId> - <version>${eclipse.paho.version}</version> + <version>${paho.mqtt.version}</version> <scope>provided</scope> </dependency> <dependency> @@ -441,7 +441,6 @@ <orbit.tomcat.version>7.0.52.wso2v5</orbit.tomcat.version> <orbit.tomcat.jdbc.pooling.version>7.0.34.wso2v2</orbit.tomcat.jdbc.pooling.version> - <eclipse.paho.version>0.4.0</eclipse.paho.version> <google.gson.version>2.2.4</google.gson.version> diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/constants/VirtualFireAlarmConstants.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/constants/VirtualFireAlarmConstants.java index b7f0c43e..e780ee83 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/constants/VirtualFireAlarmConstants.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/constants/VirtualFireAlarmConstants.java @@ -22,4 +22,9 @@ public class VirtualFireAlarmConstants { public final static String DEVICE_PLUGIN_DEVICE_ID = "VIRTUAL_FIREALARM_DEVICE_ID"; public final static String STATE_ON = "ON"; public final static String STATE_OFF = "OFF"; + + public static final String URL_PREFIX = "http://"; + public static final String BULB_CONTEXT = "/BULB/"; + public static final String SONAR_CONTEXT = "/HUMIDITY/"; + public static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/"; } diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/impl/VirtualFireAlarmManager.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/impl/VirtualFireAlarmManager.java index 4defa1b7..eefd20ae 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/impl/VirtualFireAlarmManager.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/impl/VirtualFireAlarmManager.java @@ -49,7 +49,6 @@ public class VirtualFireAlarmManager implements DeviceManager { private static final Log log = LogFactory.getLog(VirtualFireAlarmManager.class); - @Override public FeatureManager getFeatureManager() { return null; diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/internal/VirtualFirealarmManagementServiceComponent.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/internal/VirtualFirealarmManagementServiceComponent.java index 858f9df9..ddc17eb6 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/internal/VirtualFirealarmManagementServiceComponent.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/plugin/internal/VirtualFirealarmManagementServiceComponent.java @@ -25,11 +25,13 @@ import org.osgi.framework.ServiceRegistration; import org.osgi.service.component.ComponentContext; import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService; import org.wso2.carbon.device.mgt.iot.common.service.DeviceTypeService; -import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl.VirtualFireAlarmManagerService; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl + .VirtualFireAlarmManagerService; /** - * @scr.component name="org.wso2.carbon.device.mgt.iot.firealarm.internal.VirtualFirealarmManagementServiceComponent" + * @scr.component name="org.wso2.carbon.device.mgt.iot.firealarm.internal + * .VirtualFirealarmManagementServiceComponent" * immediate="true" * @scr.reference name="wso2.carbon.device.mgt.iot.common.DeviceTypeService" * interface="org.wso2.carbon.device.mgt.iot.common.service.DeviceTypeService" @@ -39,66 +41,69 @@ import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl.Virtu * unbind="unsetDeviceTypeService" */ public class VirtualFirealarmManagementServiceComponent { - - private ServiceRegistration firealarmServiceRegRef; - - - private static final Log log = LogFactory.getLog(VirtualFirealarmManagementServiceComponent.class); - protected void activate(ComponentContext ctx) { - if (log.isDebugEnabled()) { - log.debug("Activating Virtual Firealarm Device Management Service Component"); - } - try { - BundleContext bundleContext = ctx.getBundleContext(); - - - firealarmServiceRegRef = - bundleContext.registerService(DeviceManagementService.class.getName(), - new VirtualFireAlarmManagerService(), - null); - - - - if (log.isDebugEnabled()) { - log.debug("Virtual Firealarm Device Management Service Component has been successfully activated"); - } - } catch (Throwable e) { - log.error("Error occurred while activating Virtual Firealarm Device Management Service Component", e); - } - } - - protected void deactivate(ComponentContext ctx) { - if (log.isDebugEnabled()) { - log.debug("De-activating Virtual Firealarm Device Management Service Component"); - } - try { - if (firealarmServiceRegRef != null) { - firealarmServiceRegRef.unregister(); - } - - if (log.isDebugEnabled()) { - log.debug( - "Virtual Firealarm Device Management Service Component has been successfully de-activated"); - } - } catch (Throwable e) { - log.error("Error occurred while de-activating Virtual Firealarm Device Management bundle", e); - } - } - - protected void setDeviceTypeService(DeviceTypeService deviceTypeService) { + private ServiceRegistration firealarmServiceRegRef; + + private static final Log log = LogFactory.getLog( + VirtualFirealarmManagementServiceComponent.class); + + protected void activate(ComponentContext ctx) { + if (log.isDebugEnabled()) { + log.debug("Activating Virtual Firealarm Device Management Service Component"); + } + try { + BundleContext bundleContext = ctx.getBundleContext(); + firealarmServiceRegRef = + bundleContext.registerService(DeviceManagementService.class.getName(), + new VirtualFireAlarmManagerService(), null); + + if (log.isDebugEnabled()) { + log.debug( + "Virtual Firealarm Device Management Service Component has been " + + "successfully activated"); + } + } catch (Throwable e) { + log.error( + "Error occurred while activating Virtual Firealarm Device Management Service " + + "Component", + e); + } + } + + protected void deactivate(ComponentContext ctx) { + if (log.isDebugEnabled()) { + log.debug("De-activating Virtual Firealarm Device Management Service Component"); + } + try { + if (firealarmServiceRegRef != null) { + firealarmServiceRegRef.unregister(); + } + + if (log.isDebugEnabled()) { + log.debug( + "Virtual Firealarm Device Management Service Component has been " + + "successfully de-activated"); + } + } catch (Throwable e) { + log.error( + "Error occurred while de-activating Virtual Firealarm Device Management " + + "bundle", + e); + } + } + + protected void setDeviceTypeService(DeviceTypeService deviceTypeService) { /* This is to avoid this component getting initialized before the common registered */ - if (log.isDebugEnabled()) { - log.debug("Data source service set to mobile service component"); - } - } + if (log.isDebugEnabled()) { + log.debug("Data source service set to mobile service component"); + } + } - protected void unsetDeviceTypeService(DeviceTypeService deviceTypeService) { - //do nothing - } + protected void unsetDeviceTypeService(DeviceTypeService deviceTypeService) { + //do nothing + } - } diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml index d5ded8e9..3db470bd 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/pom.xml @@ -98,16 +98,6 @@ <artifactId>org.wso2.carbon.device.mgt.analytics</artifactId> </dependency> - <!--Dependencies on XMPP Client Library--> - <dependency> - <groupId>org.igniterealtime.smack.wso2</groupId> - <artifactId>smack</artifactId> - </dependency> - <dependency> - <groupId>org.igniterealtime.smack.wso2</groupId> - <artifactId>smackx</artifactId> - </dependency> - </dependencies> diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmService.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmService.java index a181949e..d75a3236 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmService.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmService.java @@ -24,8 +24,6 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.concurrent.FutureCallback; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.jivesoftware.smack.packet.Message; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException; import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService; @@ -39,7 +37,6 @@ import org.wso2.carbon.device.mgt.iot.common.DeviceValidator; import org.wso2.carbon.device.mgt.iot.common.apimgt.AccessTokenInfo; import org.wso2.carbon.device.mgt.iot.common.apimgt.TokenClient; import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppAccount; -import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig; import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppServerClient; import org.wso2.carbon.device.mgt.iot.common.exception.AccessTokenException; @@ -49,8 +46,10 @@ import org.wso2.carbon.device.mgt.iot.common.util.ZipUtil; import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants .VirtualFireAlarmConstants; import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON; -import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt.MQTTClient; -import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp.XMPPClient; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util + .VirtualFireAlarmMQTTSubscriber; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util + .VirtualFireAlarmXMPPConnector; import org.wso2.carbon.utils.CarbonUtils; import javax.servlet.http.HttpServletResponse; @@ -98,63 +97,36 @@ public class VirtualFireAlarmService { private HttpServletResponse response; private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature"; - - private static final String URL_PREFIX = "http://"; - private static final String BULB_CONTEXT = "/BULB/"; - private static final String SONAR_CONTEXT = "/SONAR/"; - private static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/"; - public static final String XMPP_PROTOCOL = "XMPP"; public static final String HTTP_PROTOCOL = "HTTP"; public static final String MQTT_PROTOCOL = "MQTT"; - private static ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<String, String>(); - private static XMPPClient xmppClient; - private static MQTTClient mqttClient; - private static final String mqttServerSubscribeTopic = "wso2/iot/+/" + VirtualFireAlarmConstants.DEVICE_TYPE + "/+/reply"; - private static final String iotServerSubscriber = "IoT-Server"; - -// static{ -// String xmppServer = XmppConfig.getInstance().getXmppControlQueue().getServerURL(); -// int indexOfChar = xmppServer.lastIndexOf('/'); -// if (indexOfChar != -1) { -// xmppServer = xmppServer.substring((indexOfChar + 1), xmppServer.length()); -// } -// -// int xmppPort = Integer.parseInt(XmppConfig.getInstance().getSERVER_CONNECTION_PORT()); -// xmppClient = new XMPPClient(xmppServer, xmppPort) { -// @Override -// protected void processXMPPMessage(Message xmppMessage) { -// -// } -// }; -// -// String xmppUsername = XmppConfig.getInstance().getXmppUsername(); -// String xmppPassword = XmppConfig.getInstance().getXmppPassword(); -// -// try { -// xmppClient.connectAndLogin(xmppUsername, xmppPassword, "iotServer"); -// } catch (DeviceManagementException e) { -// e.printStackTrace(); -// } -// -// xmppClient.setMessageFilterAndListener(""); -// -// String mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint(); -// mqttClient = new MQTTClient(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, mqttEndpoint, mqttServerSubscribeTopic) { -// @Override -// protected void postMessageArrived(String topic, MqttMessage message) { -// -// } -// }; -// -// try { -// mqttClient.connectAndSubscribe(); -// } catch (DeviceManagementException e) { -// e.printStackTrace(); -// } -// } + private static VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber; + private static VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector; + private static ConcurrentHashMap<String, String> deviceToIpMap = + new ConcurrentHashMap<String, String>(); + public void setVirtualFireAlarmXMPPConnector( + VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) { + this.virtualFireAlarmXMPPConnector = virtualFireAlarmXMPPConnector; + virtualFireAlarmXMPPConnector.initConnector(); + virtualFireAlarmXMPPConnector.connectAndLogin(); + } + + public void setVirtualFireAlarmMQTTSubscriber( + VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber) { + this.virtualFireAlarmMQTTSubscriber = virtualFireAlarmMQTTSubscriber; + virtualFireAlarmMQTTSubscriber.initConnector(); + virtualFireAlarmMQTTSubscriber.connectAndSubscribe(); + } + + public VirtualFireAlarmXMPPConnector getVirtualFireAlarmXMPPConnector() { + return virtualFireAlarmXMPPConnector; + } + + public VirtualFireAlarmMQTTSubscriber getVirtualFireAlarmMQTTSubscriber() { + return virtualFireAlarmMQTTSubscriber; + } @Path("manager/device/register") @PUT @@ -171,7 +143,6 @@ public class VirtualFireAlarmService { response.setStatus(Response.Status.CONFLICT.getStatusCode()); return false; } - Device device = new Device(); device.setDeviceIdentifier(deviceId); EnrolmentInfo enrolmentInfo = new EnrolmentInfo(); @@ -510,7 +481,7 @@ public class VirtualFireAlarmService { } String protocolString = protocol.toUpperCase(); - String callUrlPattern = BULB_CONTEXT + switchToState; + String callUrlPattern = VirtualFireAlarmConstants.BULB_CONTEXT + switchToState; log.info("Sending command: '" + callUrlPattern + "' to virtual-firealarm at: " + deviceIP + " " + "via" + " " + protocolString); @@ -518,19 +489,18 @@ public class VirtualFireAlarmService { try { switch (protocolString) { case HTTP_PROTOCOL: - sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true); + sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true); break; case MQTT_PROTOCOL: - sendCommandViaMQTT(owner, deviceId, BULB_CONTEXT.replace("/", ""), - switchToState); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""), + switchToState); break; case XMPP_PROTOCOL: -// requestBulbChangeViaXMPP(switchToState, response); - sendCommandViaXMPP(owner, deviceId, BULB_CONTEXT, switchToState); + sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT, switchToState); break; default: if (protocolString == null) { - sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true); + sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true); } else { response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); return; @@ -539,7 +509,7 @@ public class VirtualFireAlarmService { } } catch (DeviceManagementException e) { log.error("Failed to send command '" + callUrlPattern + "' to: " + deviceIP + " via" + - " " + protocol); + " " + protocol); response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); return; } @@ -581,25 +551,24 @@ public class VirtualFireAlarmService { try { switch (protocol) { case HTTP_PROTOCOL: - log.info("Sending request to read sonar value at : " + deviceIp + - " via " + HTTP_PROTOCOL); + log.info("Sending request to read sonar value at : " + deviceIp + " via " + HTTP_PROTOCOL); + replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.SONAR_CONTEXT, false); + break; - replyMsg = sendCommandViaHTTP(deviceIp, 80, SONAR_CONTEXT, false); + case MQTT_PROTOCOL: + log.info("Sending request to read sonar value at : " + deviceIp + " via " + MQTT_PROTOCOL); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""), ""); break; case XMPP_PROTOCOL: - log.info("Sending request to read sonar value at : " + deviceIp + - " via " + - XMPP_PROTOCOL); - replyMsg = sendCommandViaXMPP(owner, deviceId, SONAR_CONTEXT, "."); + log.info("Sending request to read sonar value at : " + deviceIp + " via " + XMPP_PROTOCOL); + replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT, "."); break; default: if (protocol == null) { - log.info("Sending request to read sonar value at : " + deviceIp + - " via " + HTTP_PROTOCOL); - - replyMsg = sendCommandViaHTTP(deviceIp, 80, SONAR_CONTEXT, false); + log.info("Sending request to read sonar value at : " + deviceIp + " via " + HTTP_PROTOCOL); + replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.SONAR_CONTEXT, false); } else { replyMsg = "Requested protocol '" + protocol + "' is not supported"; response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); @@ -630,8 +599,8 @@ public class VirtualFireAlarmService { DeviceValidator deviceValidator = new DeviceValidator(); try { if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, - VirtualFireAlarmConstants - .DEVICE_TYPE))) { + VirtualFireAlarmConstants + .DEVICE_TYPE))) { response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); return "Unauthorized Access"; } @@ -652,26 +621,24 @@ public class VirtualFireAlarmService { try { switch (protocol) { case HTTP_PROTOCOL: - log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + - " via " + HTTP_PROTOCOL); + log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + HTTP_PROTOCOL); + replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, false); + break; - replyMsg = sendCommandViaHTTP(deviceIp, 80, TEMPERATURE_CONTEXT, false); + case MQTT_PROTOCOL: + log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + MQTT_PROTOCOL); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""), ""); break; case XMPP_PROTOCOL: - log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + - " via " + - XMPP_PROTOCOL); - replyMsg = sendCommandViaXMPP(owner, deviceId, TEMPERATURE_CONTEXT, "."); -// replyMsg = requestTemperatureViaXMPP(response); + log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + XMPP_PROTOCOL); + replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, "."); break; default: if (protocol == null) { - log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + - " via " + HTTP_PROTOCOL); - - replyMsg = sendCommandViaHTTP(deviceIp, 80, TEMPERATURE_CONTEXT, false); + log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + HTTP_PROTOCOL); + replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, false); } else { replyMsg = "Requested protocol '" + protocol + "' is not supported"; response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); @@ -690,55 +657,6 @@ public class VirtualFireAlarmService { return replyMsg; } - -// public String requestTemperatureViaXMPP(@Context HttpServletResponse response) { -// String replyMsg = ""; -// -// String sep = File.separator; -// String scriptsFolder = "repository" + sep + "resources" + sep + "scripts"; -// String scriptPath = CarbonUtils.getCarbonHome() + sep + scriptsFolder + sep -// + "xmpp_client.py -r Temperature"; -// String command = "python " + scriptPath; -// -// replyMsg = executeCommand(command); -// -// response.setStatus(HttpStatus.SC_OK); -// return replyMsg; -// } - - -// public String requestSonarViaXMPP(@Context HttpServletResponse response) { -// String replyMsg = ""; -// -// String sep = File.separator; -// String scriptsFolder = "repository" + sep + "resources" + sep + "scripts"; -// String scriptPath = CarbonUtils.getCarbonHome() + sep + scriptsFolder + sep -// + "xmpp_client.py -r Sonar"; -// String command = "python " + scriptPath; -// -// replyMsg = executeCommand(command); -// -// response.setStatus(HttpStatus.SC_OK); -// return replyMsg; -// } - - -// public String requestBulbChangeViaXMPP(String state, -// @Context HttpServletResponse response) { -// String replyMsg = ""; -// -// String sep = File.separator; -// String scriptsFolder = "repository" + sep + "resources" + sep + "scripts"; -// String scriptPath = CarbonUtils.getCarbonHome() + sep + scriptsFolder + sep -// + "xmpp_client.py -r Bulb -s " + state; -// String command = "python " + scriptPath; -// -// replyMsg = executeCommand(command); -// -// response.setStatus(HttpStatus.SC_OK); -// return replyMsg; -// } - @Path("controller/push_temperature") @POST @Consumes(MediaType.APPLICATION_JSON) @@ -753,12 +671,12 @@ public class VirtualFireAlarmService { if (registeredIp == null) { log.warn("Unregistered IP: Temperature Data Received from an un-registered IP " + - deviceIp + " for device ID - " + deviceId); + deviceIp + " for device ID - " + deviceId); response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); return; } else if (!registeredIp.equals(deviceIp)) { log.warn("Conflicting IP: Received IP is " + deviceIp + ". Device with ID " + - deviceId + " is already registered under some other IP. Re-registration " + "required"); + deviceId + " is already registered under some other IP. Re-registration " + "required"); response.setStatus(Response.Status.CONFLICT.getStatusCode()); return; } @@ -957,7 +875,7 @@ public class VirtualFireAlarmService { try { result = deviceController.publishMqttControl(deviceOwner, - VirtualFireAlarmConstants.DEVICE_TYPE, + VirtualFireAlarmConstants.DEVICE_TYPE, deviceId, resource, state); } catch (DeviceControllerException e) { String errorMsg = "Error whilst trying to publish to MQTT Queue"; @@ -974,11 +892,11 @@ public class VirtualFireAlarmService { throws DeviceManagementException { if (deviceServerPort == 0) { - deviceServerPort = 80; + deviceServerPort = 9090; } String responseMsg = ""; - String urlString = URL_PREFIX + deviceIp + ":" + deviceServerPort + callUrlPattern; + String urlString = VirtualFireAlarmConstants.URL_PREFIX + deviceIp + ":" + deviceServerPort + callUrlPattern; if (log.isDebugEnabled()) { log.debug(urlString); @@ -1117,5 +1035,4 @@ public class VirtualFireAlarmService { return completeResponse.toString(); } - } diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java new file mode 100644 index 00000000..ab5703f0 --- /dev/null +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java @@ -0,0 +1,81 @@ +package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.wso2.carbon.device.mgt.common.DeviceManagementException; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttSubscriber; +import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants + .VirtualFireAlarmConstants; + +import java.io.File; +import java.util.UUID; + +public class VirtualFireAlarmMQTTSubscriber extends MqttSubscriber { + private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTSubscriber.class); + + private static final String subscribeTopic = + "wso2" + File.separator + "iot" + File.separator + "+" + File.separator + + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator + "+" + File.separator + + "reply"; + private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); + private static String mqttEndpoint; + + private VirtualFireAlarmMQTTSubscriber() { + super(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, + MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic); + } + + public void initConnector() { + mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint(); + } + + public void connectAndSubscribe() { + try { + super.connectAndSubscribe(); + } catch (DeviceManagementException e) { + log.error("Subscription to MQTT Broker at: " + mqttEndpoint + " failed"); + retryMQTTSubscription(); + } + } + + @Override + protected void postMessageArrived(String topic, MqttMessage message) { + log.info("Message " + message.toString() + " was received for topic: " + topic); + } + + private void retryMQTTSubscription() { + Thread retryToSubscribe = new Thread() { + @Override + public void run() { + while (true) { + if (!isConnected()) { + if (log.isDebugEnabled()) { + log.debug("Subscriber re-trying to reach MQTT queue...."); + } + + try { + VirtualFireAlarmMQTTSubscriber.super.connectAndSubscribe(); + } catch (DeviceManagementException e1) { + if (log.isDebugEnabled()) { + log.debug("Attempt to re-connect to MQTT-Queue failed"); + } + } + } else { + break; + } + + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + log.error("MQTT: Thread S;eep Interrupt Exception"); + } + } + } + }; + + retryToSubscribe.setDaemon(true); + retryToSubscribe.start(); + } +} diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java new file mode 100644 index 00000000..dce69034 --- /dev/null +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java @@ -0,0 +1,88 @@ +package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jivesoftware.smack.packet.Message; +import org.wso2.carbon.device.mgt.common.DeviceManagementException; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig; +import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConnector; + +public class VirtualFireAlarmXMPPConnector extends XmppConnector { + private static Log log = LogFactory.getLog(VirtualFireAlarmXMPPConnector.class); + + private static String xmppServerIP; + // private static int xmppServerPort; + private static String xmppAdminUsername; + private static String xmppAdminPassword; + private static String xmppAdminAccountJID; + + private VirtualFireAlarmXMPPConnector() { + super(XmppConfig.getInstance().getXmppServerIP(), + XmppConfig.getInstance().getSERVER_CONNECTION_PORT()); + } + + public void initConnector() { + xmppServerIP = XmppConfig.getInstance().getXmppServerIP(); + xmppAdminUsername = XmppConfig.getInstance().getXmppUsername(); + xmppAdminPassword = XmppConfig.getInstance().getXmppPassword(); + xmppAdminAccountJID = xmppAdminUsername + "@" + xmppServerIP; + } + + public void connectAndLogin() { + try { + super.connectAndLogin(xmppAdminUsername, xmppAdminPassword, null); + super.setMessageFilterOnReceiver(xmppAdminAccountJID); + } catch (DeviceManagementException e) { + log.error("Connect/Login attempt to XMPP Server at: " + xmppServerIP + " failed"); + retryXMPPConnection(); + } + } + + @Override + protected void processXMPPMessage(Message xmppMessage) { + String from = xmppMessage.getFrom(); + String message = xmppMessage.getBody(); + log.info("Received XMPP message '" + message + "' from " + from); + } + + private void retryXMPPConnection() { + Thread retryToConnect = new Thread() { + @Override + public void run() { + + while (true) { + if (!isConnected()) { + if (log.isDebugEnabled()) { + log.debug("Re-trying to reach XMPP Server...."); + } + + try { + VirtualFireAlarmXMPPConnector.super.connectAndLogin(xmppAdminUsername, + xmppAdminPassword, + null); + VirtualFireAlarmXMPPConnector.super.setMessageFilterOnReceiver( + xmppAdminAccountJID); + } catch (DeviceManagementException e1) { + if (log.isDebugEnabled()) { + log.debug("Attempt to re-connect to XMPP-Server failed"); + } + } + } else { + break; + } + + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + log.error("XMPP: Thread Sleep Interrupt Exception"); + } + } + } + }; + + retryToConnect.setDaemon(true); + retryToConnect.start(); + } + + +} diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java deleted file mode 100644 index 975d7c75..00000000 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/mqtt/MQTTClient.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttSecurityException; -import org.wso2.carbon.device.mgt.common.DeviceManagementException; - -import java.io.File; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * This class contains the Agent specific implementation for all the MQTT functionality. This - * includes connecting to a MQTT Broker & subscribing to the appropriate MQTT-topic, action plan - * upon losing connection or successfully delivering a message to the broker and processing - * incoming messages. Makes use of the 'Paho-MQTT' library provided by Eclipse Org. - * <p/> - * It is an abstract class with an abstract method 'postMessageArrived' allowing the user to have - * their own implementation of the actions to be taken upon receiving a message to the subscribed - * MQTT-Topic. - */ -public abstract class MQTTClient implements MqttCallback { - private static final Log log = LogFactory.getLog(MQTTClient.class); - - private MqttClient client; - private String clientId; - private MqttConnectOptions options; - private String subscribeTopic; - private String clientWillTopic; - private String mqttBrokerEndPoint; - private int reConnectionInterval; - - /** - * Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT - * Broker URL and the topic to subscribe. - * - * @param deviceOwner the owner of the device. - * @param deviceType the CDMF Device-Type of the device. - * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint. - * @param subscribeTopic the MQTT topic to which the client is to be subscribed - */ - protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint, - String subscribeTopic) { - this.clientId = deviceOwner + ":" + deviceType; - this.subscribeTopic = subscribeTopic; - this.clientWillTopic = deviceType + File.separator + "disconnection"; - this.mqttBrokerEndPoint = mqttBrokerEndPoint; - this.reConnectionInterval = 5000; - this.initSubscriber(); - } - - /** - * Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT - * Broker URL and the topic to subscribe. Additionally this constructor takes in the - * reconnection-time interval between successive attempts to connect to the broker. - * - * @param deviceOwner the owner of the device. - * @param deviceType the CDMF Device-Type of the device. - * @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint. - * @param subscribeTopic the MQTT topic to which the client is to be subscribed - * @param reConnectionInterval time interval in SECONDS between successive attempts to connect - * to the broker. - */ - protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint, - String subscribeTopic, int reConnectionInterval) { - this.clientId = deviceOwner + ":" + deviceType; - this.subscribeTopic = subscribeTopic; - this.clientWillTopic = deviceType + File.separator + "disconnection"; - this.mqttBrokerEndPoint = mqttBrokerEndPoint; - this.reConnectionInterval = reConnectionInterval; - this.initSubscriber(); - } - - /** - * Initializes the MQTT-Client. - * Creates a client using the given MQTT-broker endpoint and the clientId (which is - * constructed by a concatenation of [deviceOwner]:[deviceType]). Also sets the client's - * options parameter with the clientWillTopic (in-case of connection failure) and other info. - * Also sets the call-back this current class. - */ - private void initSubscriber() { - try { - client = new MqttClient(this.mqttBrokerEndPoint, clientId, null); - log.info("MQTT subscriber was created with ClientID : " + clientId); - } catch (MqttException ex) { - String errorMsg = "MQTT Client Error\n" + "\tReason: " + ex.getReasonCode() + - "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " + - ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() + - "\n\tException: " + ex; - log.error(errorMsg); - } - - options = new MqttConnectOptions(); - options.setCleanSession(false); - options.setWill(clientWillTopic, "connection crashed".getBytes(StandardCharsets.UTF_8), 2, - true); - client.setCallback(this); - } - - /** - * Checks whether the connection to the MQTT-Broker persists. - * - * @return true if the client is connected to the MQTT-Broker, else false. - */ - public boolean isConnected() { - return client.isConnected(); - } - - /** - * Connects to the MQTT-Broker and if successfully established connection, then tries to - * subscribe to the MQTT-Topic specific to the device. (The MQTT-Topic specific to the - * device is created is taken in as a constructor parameter of this class) . - * - * @throws DeviceManagementException in the event of 'Connecting to' or 'Subscribing to' the - * MQTT broker fails. - */ - public void connectAndSubscribe() throws DeviceManagementException { - try { - client.connect(options); - - if (log.isDebugEnabled()) { - log.debug("Subscriber connected to queue at: " + this.mqttBrokerEndPoint); - } - } catch (MqttSecurityException ex) { - String errorMsg = "MQTT Security Exception when connecting to queue\n" + "\tReason: " + - " " + - ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() + - "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " + - ex.getCause() + "\n\tException: " + ex; //throw - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - throw new DeviceManagementException(errorMsg, ex); - - } catch (MqttException ex) { - String errorMsg = "MQTT Exception when connecting to queue\n" + "\tReason: " + - ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() + - "\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " + - ex.getCause() + "\n\tException: " + ex; //throw - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - throw new DeviceManagementException(errorMsg, ex); - } - - try { - client.subscribe(subscribeTopic, 0); - - log.info("Subscriber - " + clientId + " subscribed to topic: " + subscribeTopic); - } catch (MqttException ex) { - String errorMsg = "MQTT Exception when trying to subscribe to topic: " + - subscribeTopic + "\n\tReason: " + ex.getReasonCode() + - "\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " + - ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() + - "\n\tException: " + ex; - if (log.isDebugEnabled()) { - log.debug(errorMsg); - } - throw new DeviceManagementException(errorMsg, ex); - } - } - - /** - * Callback method which is triggered once the MQTT client losers its connection to the broker. - * A scheduler thread is spawned to continuously re-attempt and connect to the broker and - * subscribe to the device's topic. This thread is scheduled to execute after every break - * equal to that of the 'reConnectionInterval' of the MQTTClient. - * - * @param throwable a Throwable Object containing the details as to why the failure occurred. - */ - public void connectionLost(Throwable throwable) { - log.warn("Lost Connection for client: " + this.clientId + " to " + this.mqttBrokerEndPoint + ".\nThis was due to - " + throwable.getMessage()); - - Runnable reSubscriber = new Runnable() { - @Override - public void run() { - if (!isConnected()) { - if (log.isDebugEnabled()) { - log.debug("Subscriber reconnecting to queue........"); - } - try { - connectAndSubscribe(); - } catch (DeviceManagementException e) { - if (log.isDebugEnabled()) { - log.debug("Could not reconnect and subscribe to ControlQueue."); - } - } - } else { - return; - } - } - }; - - ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - service.scheduleAtFixedRate(reSubscriber, 0, this.reConnectionInterval, TimeUnit.SECONDS); - } - - /** - * Callback method which is triggered upon receiving a MQTT Message from the broker. Spawns a - * new thread that executes any actions to be taken with the received message. - * - * @param topic the MQTT-Topic to which the received message was published to and the - * client was subscribed to. - * @param mqttMessage the actual MQTT-Message that was received from the broker. - */ - public void messageArrived(final String topic, final MqttMessage mqttMessage) { - Thread subscriberThread = new Thread() { - public void run() { - postMessageArrived(topic, mqttMessage); - } - }; - subscriberThread.start(); - } - - /** - * Callback method which gets triggered upon successful completion of a message delivery to - * the broker. - * - * @param iMqttDeliveryToken the MQTT-DeliveryToken which includes the details about the - * specific message delivery. - */ - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - String message = ""; - try { - message = iMqttDeliveryToken.getMessage().toString(); - } catch (MqttException e) { - log.error("Error occurred whilst trying to read the message from the MQTT delivery token."); - } - String topic = iMqttDeliveryToken.getTopics()[0]; - String client = iMqttDeliveryToken.getClient().getClientId(); - log.info("Message - '" + message + "' of client [" + client + "] for the topic (" + topic + ") was delivered successfully."); - } - - /** - * This is an abstract method used for post processing the received MQTT-message. This - * method will be implemented as per requirement at the time of creating an object of this - * class. - * - * @param topic The topic for which the message was received for. - * @param message The message received for the subscription to the above topic. - */ - protected abstract void postMessageArrived(String topic, MqttMessage message); - - /** - * Gets the MQTTClient object. - * - * @return the MQTTClient object. - */ - public MqttClient getClient() { - return client; - } - -} - diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java deleted file mode 100644 index 8391b1cb..00000000 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/xmpp/XMPPClient.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jivesoftware.smack.ConnectionConfiguration; -import org.jivesoftware.smack.PacketListener; -import org.jivesoftware.smack.SmackConfiguration; -import org.jivesoftware.smack.XMPPConnection; -import org.jivesoftware.smack.XMPPException; -import org.jivesoftware.smack.filter.AndFilter; -import org.jivesoftware.smack.filter.FromContainsFilter; -import org.jivesoftware.smack.filter.PacketFilter; -import org.jivesoftware.smack.filter.PacketTypeFilter; -import org.jivesoftware.smack.packet.Message; -import org.jivesoftware.smack.packet.Packet; -import org.wso2.carbon.device.mgt.common.DeviceManagementException; - -/** - * This class contains the Agent specific implementation for all the XMPP functionality. This - * includes connecting to a XMPP Server & Login using the device's XMPP-Account, Setting - * listeners and filters on incoming XMPP messages and Sending XMPP replies for control signals - * received. Makes use of the 'Smack-XMPP' library provided by jivesoftware/igniterealtime. - * <p/> - * It is an abstract class with an abstract method 'processXMPPMessage' allowing the user to have - * their own implementation of the actions to be taken upon receiving an appropriate XMPP message. - */ -public abstract class XMPPClient { - private static final Log log = LogFactory.getLog(XMPPClient.class); - - private int replyTimeoutInterval = 500; // millis - private String server; - private int port; - private ConnectionConfiguration config; - private XMPPConnection connection; - private PacketFilter filter; - private PacketListener listener; - - /** - * Constructor for XMPPClient passing server-IP and the XMPP-port. - * - * @param server the IP of the XMPP server. - * @param port the XMPP server's port to connect to. (default - 5222) - */ - public XMPPClient(String server, int port) { - this.server = server; - this.port = port; - initXMPPClient(); - } - - /** - * Initializes the XMPP Client. - * Sets the time-out-limit whilst waiting for XMPP-replies from server. Creates the XMPP - * configurations to connect to the server and creates the XMPPConnection object used for - * connecting and Logging-In. - */ - private void initXMPPClient() { - log.info(String.format( - "Initializing connection to XMPP Server at %1$s via port %2$d......", server, - port)); - SmackConfiguration.setPacketReplyTimeout(replyTimeoutInterval); - config = new ConnectionConfiguration(server, port); -// TODO:: Need to enable SASL-Authentication appropriately - config.setSASLAuthenticationEnabled(false); - config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled); - connection = new XMPPConnection(config); - } - - /** - * Connects to the XMPP-Server and if successfully established connection, then tries to Log - * in using the device's XMPP Account credentials. (The XMPP-Account specific to the device is - * created in the XMPP server whilst downloading the Agent from the IoT Server) . - * - * @param username the username of the device's XMPP-Account. - * @param password the password of the device's XMPP-Account. - * @param resource the resource the resource, specific to the XMPP-Account to which the login - * is made to - * @throws DeviceManagementException in the event of 'Connecting to' or 'Logging into' the - * XMPP server fails. - */ - public void connectAndLogin(String username, String password, String resource) - throws DeviceManagementException { - try { - connection.connect(); - log.info(String.format( - "Connection to XMPP Server at %1$s established successfully......", server)); - - } catch (XMPPException xmppExcepion) { - String errorMsg = - "Connection attempt to the XMPP Server at " + server + " via port " + port + - " failed."; - log.info(errorMsg); - throw new DeviceManagementException(errorMsg, xmppExcepion); - } - - if (connection.isConnected()) { - try { - if (resource == null) { - connection.login(username, password); - log.info(String.format("Logged into XMPP Server at %1$s as user %2$s......", server, username)); - } else { - connection.login(username, password, resource); - log.info(String.format( - "Logged into XMPP Server at %1$s as user %2$s on resource %3$s......", - server, username, resource)); - } - } catch (XMPPException xmppException) { - String errorMsg = - "Login attempt to the XMPP Server at " + server + " with username - " + - username + " failed."; - log.info(errorMsg); - throw new DeviceManagementException(errorMsg, xmppException); - } - } - } - - /** - * Sets a filter on all the incoming XMPP-Messages for the JID (XMPP-Account ID) passed in. - * Also creates a listener for the incoming messages and connects the listener to the - * XMPPConnection alongside the set filter. - * - * @param senderJID the JID (XMPP-Account ID) to which the filter is to be set. - */ - public void setMessageFilterAndListener(String senderJID) { - filter = new AndFilter(new PacketTypeFilter(Message.class), new - FromContainsFilter( - senderJID)); - listener = new PacketListener() { - @Override - public void processPacket(Packet packet) { - if (packet instanceof Message) { - final Message xmppMessage = (Message) packet; - Thread msgProcessThread = new Thread() { - public void run() { - processXMPPMessage(xmppMessage); - } - }; - msgProcessThread.start(); - } - } - }; - - connection.addPacketListener(listener, filter); - } - - - - /** - * Sends an XMPP message - * - * @param JID the JID (XMPP Account ID) to which the message is to be sent to. - * @param message the XMPP-Message that is to be sent. - */ - public void sendXMPPMessage(String JID, String message) { - sendXMPPMessage(JID, message, "Reply-From-Device"); - if (log.isDebugEnabled()) { - log.debug("Message: " + message + " to XMPP JID [" + JID + "] sent successfully"); - } - } - - /** - * Overloaded method to send an XMPP message. Includes the subject to be mentioned in the - * message that is sent. - * - * @param JID the JID (XMPP Account ID) to which the message is to be sent to. - * @param message the XMPP-Message that is to be sent. - * @param subject the subject that the XMPP-Message would carry. - */ - public void sendXMPPMessage(String JID, String message, String subject) { - Message xmppMessage = new Message(); - xmppMessage.setTo(JID); - xmppMessage.setSubject(subject); - xmppMessage.setBody(message); - xmppMessage.setType(Message.Type.chat); - connection.sendPacket(xmppMessage); - } - - /** - * Checks whether the connection to the XMPP-Server persists. - * - * @return true if the client is connected to the XMPP-Server, else false. - */ - public boolean isConnected() { - return connection.isConnected(); - } - - /** - * Sets the client's time-out-limit whilst waiting for XMPP-replies from server. - * - * @param millis the time in millis to be set as the time-out-limit whilst waiting for a - * XMPP-reply. - */ - public void setReplyTimeoutInterval(int millis) { - this.replyTimeoutInterval = millis; - } - - /** - * Disables default debugger provided by the XMPPConnection. - */ - public void disableDebugger() { - connection.DEBUG_ENABLED = false; - } - - /** - * Closes the connection to the XMPP Server. - */ - public void closeConnection() { - if (connection != null && connection.isConnected()) { - connection.disconnect(); - } - } - - /** - * This is an abstract method used for post processing the received XMPP-message. This - * method will be implemented as per requirement at the time of creating an object of this - * class. - * - * @param xmppMessage the xmpp message received by the listener. - */ - protected abstract void processXMPPMessage(Message xmppMessage); - -} - - diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml index 83787aff..e4caa0b6 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/webapp/WEB-INF/cxf-servlet.xml @@ -17,37 +17,45 @@ --> <beans xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns:jaxrs="http://cxf.apache.org/jaxrs" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:jaxrs="http://cxf.apache.org/jaxrs" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd"> - <!--<jaxrs:server id="FireAlarmController" address="/controller">--> - <!--<jaxrs:serviceBeans>--> - <!--<bean id="VirtualFireAlarmControllerService"--> - <!--class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmControllerService">--> - <!--<!–<property name="mqttFireAlarmSubscriber" ref="mqttSubscriber"/>–>--> - <!--</bean>--> - <!--</jaxrs:serviceBeans>--> - <!--<jaxrs:providers>--> - <!--<bean class="org.codehaus.jackson.jaxrs.JacksonJsonProvider" />--> - <!--</jaxrs:providers>--> - <!--</jaxrs:server>--> - - <jaxrs:server id="VirtualFireAlarm" address="/"> - <jaxrs:serviceBeans> - <bean id="VirtualFireAlarmService" - class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmService"/> - </jaxrs:serviceBeans> - <jaxrs:providers> - <bean class="org.codehaus.jackson.jaxrs.JacksonJsonProvider" /> - </jaxrs:providers> - </jaxrs:server> - - - <!--<bean id="mqttSubscriber" class="org.wso2.carbon.device.mgt.iot.firealarm.api.util.MQTTFirealarmSubscriber" >--> - <!-- --> - <!--</bean>--> + <!--<jaxrs:server id="VirtualFireAlarmController" address="/controller">--> + <!--<jaxrs:serviceBeans>--> + <!--<bean id="VirtualFireAlarmControllerService"--> + <!--class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmControllerService">--> + <!--<property name="virtualFireAlarmMQTTSubscriber" ref="mqttSubscriberBean"/>--> + <!--<property name="virtualFireAlarmXMPPConnector" ref="xmppConnectorBean"/>--> + <!--</bean>--> + <!--</jaxrs:serviceBeans>--> + <!--<jaxrs:providers>--> + <!--<bean class="org.codehaus.jackson.jaxrs.JacksonJsonProvider" />--> + <!--</jaxrs:providers>--> + <!--</jaxrs:server>--> + + <jaxrs:server id="VirtualFireAlarm" address="/"> + <jaxrs:serviceBeans> + <bean id="VirtualFireAlarmService" + class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmService"> + <property name="virtualFireAlarmMQTTSubscriber" ref="mqttSubscriberBean"/> + <property name="virtualFireAlarmXMPPConnector" ref="xmppConnectorBean"/> + </bean> + </jaxrs:serviceBeans> + <jaxrs:providers> + <bean class="org.codehaus.jackson.jaxrs.JacksonJsonProvider"/> + </jaxrs:providers> + </jaxrs:server> + + + <bean id="mqttSubscriberBean" + class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.VirtualFireAlarmMQTTSubscriber"> + </bean> + <bean id="xmppConnectorBean" + class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.VirtualFireAlarmXMPPConnector"> + </bean> + </beans> diff --git a/pom.xml b/pom.xml index 2e0e8f13..faca78c4 100644 --- a/pom.xml +++ b/pom.xml @@ -804,6 +804,19 @@ <artifactId>commons-httpclient</artifactId> <version>${orbit.version.commons-httpclient}</version> </dependency> + + <!--Dependencies on XMPP Client Library--> + <dependency> + <groupId>org.igniterealtime.smack.wso2</groupId> + <artifactId>smack</artifactId> + <version>${smack.wso2.version}</version> + </dependency> + <dependency> + <groupId>org.igniterealtime.smack.wso2</groupId> + <artifactId>smackx</artifactId> + <version>${smackx.wso2.version}</version> + </dependency> + </dependencies> </dependencyManagement> @@ -927,6 +940,10 @@ <commons-io.version>2.4</commons-io.version> <jsr311-api.version>1.1.1</jsr311-api.version> <commons-json.version>2.0.0.wso2v1</commons-json.version> + + <!--XMPP/MQTT Version--> + <smack.wso2.version>3.0.4.wso2v1</smack.wso2.version> + <smackx.wso2.version>3.0.4.wso2v1</smackx.wso2.version> </properties> <scm>