From 75b4e5ff27393069981f1fc0dff8a4cfa6f1a27f Mon Sep 17 00:00:00 2001 From: Shabirmean Date: Tue, 20 Oct 2015 20:21:02 +0530 Subject: [PATCH] changes to the VirtualFireAlarm service to send and receive XMPP,MQTT messages --- .../service/impl/VirtualFireAlarmService.java | 470 +++++++----------- .../util/VirtualFireAlarmMQTTSubscriber.java | 19 +- .../util/VirtualFireAlarmXMPPConnector.java | 19 +- 3 files changed, 213 insertions(+), 295 deletions(-) diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmService.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmService.java index fb24456b..bb157fc2 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmService.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/VirtualFireAlarmService.java @@ -52,6 +52,7 @@ import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util .VirtualFireAlarmXMPPConnector; import org.wso2.carbon.utils.CarbonUtils; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; @@ -107,17 +108,37 @@ public class VirtualFireAlarmService { new ConcurrentHashMap(); public void setVirtualFireAlarmXMPPConnector( - VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) { + final VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) { this.virtualFireAlarmXMPPConnector = virtualFireAlarmXMPPConnector; - virtualFireAlarmXMPPConnector.initConnector(); - virtualFireAlarmXMPPConnector.connectAndLogin(); + + Runnable mqttStarter = new Runnable() { + @Override + public void run() { + virtualFireAlarmXMPPConnector.initConnector(); + virtualFireAlarmXMPPConnector.connectAndLogin(); + } + }; + + Thread mqttStarterThread = new Thread(mqttStarter); + mqttStarterThread.setDaemon(true); + mqttStarterThread.start(); } public void setVirtualFireAlarmMQTTSubscriber( - VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber) { + final VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber) { this.virtualFireAlarmMQTTSubscriber = virtualFireAlarmMQTTSubscriber; - virtualFireAlarmMQTTSubscriber.initConnector(); - virtualFireAlarmMQTTSubscriber.connectAndSubscribe(); + + Runnable xmppStarter = new Runnable() { + @Override + public void run() { + virtualFireAlarmMQTTSubscriber.initConnector(); + virtualFireAlarmMQTTSubscriber.connectAndSubscribe(); + } + }; + + Thread xmppStarterThread = new Thread(xmppStarter); + xmppStarterThread.setDaemon(true); + xmppStarterThread.start(); } public VirtualFireAlarmXMPPConnector getVirtualFireAlarmXMPPConnector() { @@ -128,6 +149,10 @@ public class VirtualFireAlarmService { return virtualFireAlarmMQTTSubscriber; } + /* --------------------------------------------------------------------------------------- + Device management specific APIs + Also contains utility methods required for the execution of these APIs + --------------------------------------------------------------------------------------- */ @Path("manager/device/register") @PUT public boolean register(@QueryParam("deviceId") String deviceId, @@ -413,18 +438,27 @@ public class VirtualFireAlarmService { return Long.toString(l, Character.MAX_RADIX); } - @Path("controller/register/{owner}/{deviceId}/{ip}") + /* --------------------------------------------------------------------------------------- + Device specific APIs - Control APIs + Data-Publishing APIs + Also contains utility methods required for the execution of these APIs + --------------------------------------------------------------------------------------- */ + @Path("controller/register/{owner}/{deviceId}/{ip}/{port}") @POST public String registerDeviceIP(@PathParam("owner") String owner, @PathParam("deviceId") String deviceId, @PathParam("ip") String deviceIP, - @Context HttpServletResponse response) { + @PathParam("port") String devicePort, + @Context HttpServletResponse response, + @Context HttpServletRequest request) { + + //TODO:: Need to get IP from the request itself String result; log.info("Got register call from IP: " + deviceIP + " for Device ID: " + deviceId + " of owner: " + owner); - deviceToIpMap.put(deviceId, deviceIP); + String deviceHttpEndpoint = deviceIP + ":" + devicePort; + deviceToIpMap.put(deviceId, deviceHttpEndpoint); result = "Device-IP Registered"; response.setStatus(Response.Status.OK.getStatusCode()); @@ -436,7 +470,6 @@ public class VirtualFireAlarmService { return result; } - /* Service to switch "ON" and "OFF" the Virtual FireAlarm bulb Called by an external client intended to control the Virtual FireAlarm bulb */ @Path("controller/bulb/{state}") @@ -469,45 +502,34 @@ public class VirtualFireAlarmService { return; } - String deviceIP = deviceToIpMap.get(deviceId); - if (deviceIP == null) { - response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); - return; - } - String protocolString = protocol.toUpperCase(); String callUrlPattern = VirtualFireAlarmConstants.BULB_CONTEXT + switchToState; - log.info("Sending command: '" + callUrlPattern + "' to virtual-firealarm at: " + deviceIP + - " " + - "via" + " " + protocolString); + log.info("Sending request to switch-bulb of device [" + deviceId + "] via " + protocolString); try { switch (protocolString) { case HTTP_PROTOCOL: - sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true); + String deviceHTTPEndpoint = deviceToIpMap.get(deviceId); + if (deviceHTTPEndpoint == null) { + response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); + return; + } + + sendCommandViaHTTP(deviceHTTPEndpoint, callUrlPattern, true); break; case MQTT_PROTOCOL: - sendCommandViaMQTT(owner, deviceId, - VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""), - switchToState); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""), switchToState); break; case XMPP_PROTOCOL: - sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT, - switchToState); + sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT, switchToState); break; default: - if (protocolString == null) { - sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true); - } else { - response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); - return; - } - break; + response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); + return; } } catch (DeviceManagementException e) { - log.error("Failed to send command '" + callUrlPattern + "' to: " + deviceIP + " via" + - " " + protocol); + log.error("Failed to send switch-bulb request to device [" + deviceId + "] via " + protocolString); response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); return; } @@ -526,9 +548,7 @@ public class VirtualFireAlarmService { DeviceValidator deviceValidator = new DeviceValidator(); try { - if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, - VirtualFireAlarmConstants - .DEVICE_TYPE))) { + if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, VirtualFireAlarmConstants.DEVICE_TYPE))) { response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); return "Unauthorized Access"; } @@ -538,51 +558,34 @@ public class VirtualFireAlarmService { return replyMsg; } - String deviceIp = deviceToIpMap.get(deviceId); - - if (deviceIp == null) { - replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner; - response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); - return replyMsg; - } + String protocolString = protocol.toUpperCase(); + log.info("Sending request to read sonar value of device [" + deviceId + "] via " + protocolString); try { - switch (protocol) { + switch (protocolString) { case HTTP_PROTOCOL: - log.info("Sending request to read sonar value at : " + deviceIp + " via " + - HTTP_PROTOCOL); - replyMsg = sendCommandViaHTTP(deviceIp, 9090, - VirtualFireAlarmConstants.SONAR_CONTEXT, false); + String deviceHTTPEndpoint = deviceToIpMap.get(deviceId); + if (deviceHTTPEndpoint == null) { + replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner; + response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); + return replyMsg; + } + + replyMsg = sendCommandViaHTTP(deviceHTTPEndpoint, VirtualFireAlarmConstants.SONAR_CONTEXT, false); break; case MQTT_PROTOCOL: - log.info("Sending request to read sonar value at : " + deviceIp + " via " + - MQTT_PROTOCOL); - sendCommandViaMQTT(owner, deviceId, - VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""), - ""); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""), ""); break; case XMPP_PROTOCOL: - log.info("Sending request to read sonar value at : " + deviceIp + " via " + - XMPP_PROTOCOL); - replyMsg = sendCommandViaXMPP(owner, deviceId, - VirtualFireAlarmConstants.SONAR_CONTEXT, "."); + replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT, ""); break; default: - if (protocol == null) { - log.info("Sending request to read sonar value at : " + deviceIp + " via " + - HTTP_PROTOCOL); - replyMsg = sendCommandViaHTTP(deviceIp, 9090, - VirtualFireAlarmConstants.SONAR_CONTEXT, - false); - } else { - replyMsg = "Requested protocol '" + protocol + "' is not supported"; - response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); - return replyMsg; - } - break; + replyMsg = "Requested protocol '" + protocolString + "' is not supported"; + response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); + return replyMsg; } } catch (DeviceManagementException e) { replyMsg = e.getErrorMessage(); @@ -606,9 +609,7 @@ public class VirtualFireAlarmService { DeviceValidator deviceValidator = new DeviceValidator(); try { - if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, - VirtualFireAlarmConstants - .DEVICE_TYPE))) { + if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, VirtualFireAlarmConstants.DEVICE_TYPE))) { response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); return "Unauthorized Access"; } @@ -618,55 +619,35 @@ public class VirtualFireAlarmService { return replyMsg; } - String deviceIp = deviceToIpMap.get(deviceId); + String protocolString = protocol.toUpperCase(); - if (deviceIp == null) { - replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner; - response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); - return replyMsg; - } + log.info("Sending request to read virtual-firealarm-temperature of device [" + deviceId + "] via " + protocolString); try { - switch (protocol) { + switch (protocolString) { case HTTP_PROTOCOL: - log.info("Sending request to read virtual-firealarm-temperature at : " + - deviceIp + " via " + HTTP_PROTOCOL); - replyMsg = sendCommandViaHTTP(deviceIp, 9090, - VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, - false); + String deviceHTTPEndpoint = deviceToIpMap.get(deviceId); + if (deviceHTTPEndpoint == null) { + replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner; + response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); + return replyMsg; + } + + replyMsg = sendCommandViaHTTP(deviceHTTPEndpoint, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, false); break; case MQTT_PROTOCOL: - log.info("Sending request to read virtual-firealarm-temperature at : " + - deviceIp + " via " + MQTT_PROTOCOL); - sendCommandViaMQTT(owner, deviceId, - VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", - ""), - ""); + sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""), ""); break; case XMPP_PROTOCOL: - log.info("Sending request to read virtual-firealarm-temperature at : " + - deviceIp + " via " + XMPP_PROTOCOL); - replyMsg = sendCommandViaXMPP(owner, deviceId, - VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, - "."); + replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, ""); break; default: - if (protocol == null) { - log.info("Sending request to read virtual-firealarm-temperature at : " + - deviceIp + " via " + HTTP_PROTOCOL); - replyMsg = sendCommandViaHTTP(deviceIp, 9090, - VirtualFireAlarmConstants - .TEMPERATURE_CONTEXT, - false); - } else { - replyMsg = "Requested protocol '" + protocol + "' is not supported"; - response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); - return replyMsg; - } - break; + replyMsg = "Requested protocol '" + protocolString + "' is not supported"; + response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); + return replyMsg; } } catch (DeviceManagementException e) { replyMsg = e.getErrorMessage(); @@ -682,8 +663,7 @@ public class VirtualFireAlarmService { @Path("controller/push_temperature") @POST @Consumes(MediaType.APPLICATION_JSON) - public void pushTemperatureData( - final DeviceJSON dataMsg, @Context HttpServletResponse response) { + public void pushTemperatureData(final DeviceJSON dataMsg, @Context HttpServletResponse response) { boolean result; String deviceId = dataMsg.deviceId; String deviceIp = dataMsg.reply; @@ -708,11 +688,9 @@ public class VirtualFireAlarmService { PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); ctx.setTenantDomain(SUPER_TENANT, true); - DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx - .getOSGiService(DeviceAnalyticsService.class, null); - Object metdaData[] = - {dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId, - System.currentTimeMillis()}; + DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx.getOSGiService( + DeviceAnalyticsService.class, null); + Object metdaData[] = {dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId, System.currentTimeMillis()}; Object payloadData[] = {temperature}; try { deviceAnalyticsService.publishEvent(TEMPERATURE_STREAM_DEFINITION, "1.0.0", @@ -723,151 +701,8 @@ public class VirtualFireAlarmService { } finally { PrivilegedCarbonContext.endTenantFlow(); } - - - } - - - /* Service to push all the sensor data collected by the FireAlarm - Called by the FireAlarm device */ - -// @Path("/pushalarmdata") -// @POST -// @Consumes(MediaType.APPLICATION_JSON) -// public void pushAlarmData(final DeviceJSON dataMsg, @Context HttpServletResponse response) { -// boolean result; -// String sensorValues = dataMsg.value; -// log.info("Recieved Sensor Data Values: " + sensorValues); -// -// String sensors[] = sensorValues.split(":"); -// try { -// if (sensors.length == 3) { -// String temperature = sensors[0]; -// String bulb = sensors[1]; -// String sonar = sensors[2]; - -// sensorValues = "Temperature:" + temperature + "C\tBulb Status:" + bulb + -// "\t\tSonar Status:" + sonar; -// log.info(sensorValues); -// DeviceController deviceController = new DeviceController(); -// result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants -// .DEVICE_TYPE, -// dataMsg.deviceId, -// System.currentTimeMillis(), "DeviceData", -// temperature, "TEMPERATURE"); -// -// if (!result) { -// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); -// log.error("Error whilst pushing temperature: " + sensorValues); -// return; -// } -// -// result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants -// .DEVICE_TYPE, -// dataMsg.deviceId, -// System.currentTimeMillis(), "DeviceData", -// bulb, -// "BULB"); -// -// if (!result) { -// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); -// log.error("Error whilst pushing Bulb data: " + sensorValues); -// return; -// } -// -// result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants -// .DEVICE_TYPE, -// dataMsg.deviceId, -// System.currentTimeMillis(), "DeviceData", -// sonar, -// "SONAR"); -// -// if (!result) { -// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); -// log.error("Error whilst pushing Sonar data: " + sensorValues); -// } -// -// } else { -// DeviceController deviceController = new DeviceController(); -// result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants -// .DEVICE_TYPE, -// dataMsg.deviceId, -// System.currentTimeMillis(), "DeviceData", -// dataMsg.value, dataMsg.reply); -// if (!result) { -// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); -// log.error("Error whilst pushing sensor data: " + sensorValues); -// } -// } -// -// } catch (UnauthorizedException e) { -// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); -// log.error("Data Push Attempt Failed at Publisher: " + e.getMessage()); -// } -// } - - - private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource, - String state) throws DeviceManagementException { - - String replyMsg = ""; - String scriptArguments = ""; - String command = ""; - - String seperator = File.separator; - String xmppServerURL = XmppConfig.getInstance().getXmppEndpoint(); - int indexOfChar = xmppServerURL.lastIndexOf(seperator); - if (indexOfChar != -1) { - xmppServerURL = xmppServerURL.substring((indexOfChar + 1), xmppServerURL.length()); - } - - indexOfChar = xmppServerURL.indexOf(":"); - if (indexOfChar != -1) { - xmppServerURL = xmppServerURL.substring(0, indexOfChar); - } - - String xmppAdminUName = XmppConfig.getInstance().getXmppUsername(); - String xmppAdminPass = XmppConfig.getInstance().getXmppPassword(); - - String xmppAdminUserLogin = xmppAdminUName + "@" + xmppServerURL + seperator + deviceOwner; - String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner; - - String scriptsFolder = "repository" + seperator + "resources" + seperator + "scripts"; - String scriptPath = CarbonUtils.getCarbonHome() + seperator + scriptsFolder + seperator - + "xmpp_client.py "; - - scriptArguments = - "-j " + xmppAdminUserLogin + " -p " + xmppAdminPass + " -c " + clientToConnect + - " -r " + resource + " -s " + state; - command = "python " + scriptPath + scriptArguments; - - if (log.isDebugEnabled()) { - log.debug("Connecting to XMPP Server via Admin credentials: " + xmppAdminUserLogin); - log.debug("Trying to contact xmpp device account: " + clientToConnect); - log.debug("Arguments used for the scripts: '" + scriptArguments + "'"); - log.debug("Command exceuted: '" + command + "'"); - } - -// switch (resource) { -// case BULB_CONTEXT: -// scriptArguments = "-r Bulb -s " + state; -// command = "python " + scriptPath + scriptArguments; -// break; -// case SONAR_CONTEXT: -// scriptArguments = "-r Sonar"; -// command = "python " + scriptPath + scriptArguments; -// break; -// case TEMPERATURE_CONTEXT: -// scriptArguments = "-r Temperature"; -// command = "python " + scriptPath + scriptArguments; -// break; -// } - - replyMsg = executeCommand(command); - return replyMsg; } - private String executeCommand(String command) { StringBuffer output = new StringBuffer(); @@ -891,39 +726,10 @@ public class VirtualFireAlarmService { } - - private boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource, - String state) throws DeviceManagementException { - - boolean result = false; - DeviceController deviceController = new DeviceController(); - - try { - result = deviceController.publishMqttControl(deviceOwner, - VirtualFireAlarmConstants.DEVICE_TYPE, - deviceId, resource, state); - } catch (DeviceControllerException e) { - String errorMsg = "Error whilst trying to publish to MQTT Queue"; - log.error(errorMsg); - throw new DeviceManagementException(errorMsg, e); - } - return result; - } - - - private String sendCommandViaHTTP(final String deviceIp, int deviceServerPort, - String callUrlPattern, - boolean fireAndForgot) - throws DeviceManagementException { - - if (deviceServerPort == 0) { - deviceServerPort = 9090; - } + private String sendCommandViaHTTP(final String deviceHTTPEndpoint, String urlContext, boolean fireAndForgot) throws DeviceManagementException { String responseMsg = ""; - String urlString = - VirtualFireAlarmConstants.URL_PREFIX + deviceIp + ":" + deviceServerPort + - callUrlPattern; + String urlString = VirtualFireAlarmConstants.URL_PREFIX + deviceHTTPEndpoint + urlContext; if (log.isDebugEnabled()) { log.debug(urlString); @@ -988,13 +794,93 @@ public class VirtualFireAlarmService { } } } - } return responseMsg; } - /* Utility methods relevant to creating and sending http requests */ + private boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource, String state) throws DeviceManagementException { + + boolean result = false; + DeviceController deviceController = new DeviceController(); + + try { + result = deviceController.publishMqttControl(deviceOwner, + VirtualFireAlarmConstants.DEVICE_TYPE, + deviceId, resource, state); + } catch (DeviceControllerException e) { + String errorMsg = "Error whilst trying to publish to MQTT Queue"; + log.error(errorMsg); + throw new DeviceManagementException(errorMsg, e); + } + return result; + } + + private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource, String state) throws DeviceManagementException { + + String replyMsg = ""; +// String scriptArguments = ""; +// String command = ""; + + String seperator = File.separator; + String xmppServerURL = XmppConfig.getInstance().getXmppEndpoint(); + int indexOfChar = xmppServerURL.lastIndexOf(seperator); + if (indexOfChar != -1) { + xmppServerURL = xmppServerURL.substring((indexOfChar + 1), xmppServerURL.length()); + } + + indexOfChar = xmppServerURL.indexOf(":"); + if (indexOfChar != -1) { + xmppServerURL = xmppServerURL.substring(0, indexOfChar); + } + +// String xmppAdminUName = XmppConfig.getInstance().getXmppUsername(); +// String xmppAdminPass = XmppConfig.getInstance().getXmppPassword(); +// String xmppAdminUserLogin = xmppAdminUName + "@" + xmppServerURL + seperator + deviceOwner; +// String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner; + String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner; + String message = resource.replace("/","") + ":" + state; + + virtualFireAlarmXMPPConnector.sendXMPPMessage(clientToConnect, message, "CONTROL-REQUEST"); + +// String scriptsFolder = "repository" + seperator + "resources" + seperator + "scripts"; +// String scriptPath = CarbonUtils.getCarbonHome() + seperator + scriptsFolder + seperator +// + "xmpp_client.py "; +// +// scriptArguments = +// "-j " + xmppAdminUserLogin + " -p " + xmppAdminPass + " -c " + clientToConnect + +// " -r " + resource + " -s " + state; +// command = "python " + scriptPath + scriptArguments; +// +// if (log.isDebugEnabled()) { +// log.debug("Connecting to XMPP Server via Admin credentials: " + xmppAdminUserLogin); +// log.debug("Trying to contact xmpp device account: " + clientToConnect); +// log.debug("Arguments used for the scripts: '" + scriptArguments + "'"); +// log.debug("Command exceuted: '" + command + "'"); +// } + +// switch (resource) { +// case BULB_CONTEXT: +// scriptArguments = "-r Bulb -s " + state; +// command = "python " + scriptPath + scriptArguments; +// break; +// case SONAR_CONTEXT: +// scriptArguments = "-r Sonar"; +// command = "python " + scriptPath + scriptArguments; +// break; +// case TEMPERATURE_CONTEXT: +// scriptArguments = "-r Temperature"; +// command = "python " + scriptPath + scriptArguments; +// break; +// } + +// replyMsg = executeCommand(command); + return replyMsg; + } + + /* --------------------------------------------------------------------------------------- + Utility methods relevant to creating and sending http requests + --------------------------------------------------------------------------------------- */ /* This methods creates and returns a http connection object */ diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java index ab5703f0..036482d7 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmMQTTSubscriber.java @@ -15,10 +15,12 @@ import java.util.UUID; public class VirtualFireAlarmMQTTSubscriber extends MqttSubscriber { private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTSubscriber.class); +// wso2/iot/shabirmean/virtual_firealarm/t4ctwq8qfl11/publisher private static final String subscribeTopic = "wso2" + File.separator + "iot" + File.separator + "+" + File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator + "+" + File.separator + - "reply"; + "publisher"; + private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); private static String mqttEndpoint; @@ -42,7 +44,20 @@ public class VirtualFireAlarmMQTTSubscriber extends MqttSubscriber { @Override protected void postMessageArrived(String topic, MqttMessage message) { - log.info("Message " + message.toString() + " was received for topic: " + topic); + String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, ""); + ownerAndId = ownerAndId.replace(File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator, ":"); + ownerAndId = ownerAndId.replace(File.separator + "publisher", ""); + + String owner = ownerAndId.split(":")[0]; + String deviceId = ownerAndId.split(":")[1]; + + log.info("Received MQTT message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}"); + + if (message.toString().contains("PUBLISHER")) { + log.info("Received MQTT publisher message [" + message.toString() + "] topic: [" + topic + "]"); + } else { + log.info("Received MQTT reply message [" + message.toString() + "] topic: [" + topic + "]"); + } } private void retryMQTTSubscription() { diff --git a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java index dce69034..9cadaa65 100644 --- a/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java +++ b/modules/samples/virtual_firealarm/src/org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/sample/virtual/firealarm/service/impl/util/VirtualFireAlarmXMPPConnector.java @@ -41,8 +41,25 @@ public class VirtualFireAlarmXMPPConnector extends XmppConnector { @Override protected void processXMPPMessage(Message xmppMessage) { String from = xmppMessage.getFrom(); + String subject = xmppMessage.getSubject(); String message = xmppMessage.getBody(); - log.info("Received XMPP message '" + message + "' from " + from); + + int indexOfAt = from.indexOf("@"); + int indexOfSlash = from.indexOf("/"); + + String deviceId = from.substring(0, indexOfAt); + String owner = from.substring(indexOfSlash + 1, from.length()); + + log.info("Received XMPP message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}"); + + if (subject.equals("PUBLISHER")) { + log.info("Received XMPP publisher message [" + message + "] from [" + from + "]"); + } else if(subject.equals("CONTROL-REPLY")) { + log.info("Received XMPP reply message [" + message + "] from [" + from + "]"); + } else { + log.info("Received SOME XMPP message [" + message + "] from " + from + "]"); + } + } private void retryXMPPConnection() {