diff --git a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java index 0d263201c..aae520f8e 100644 --- a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java +++ b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java @@ -75,7 +75,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { agentManager.updateAgentStatus("Connected to MQTT Queue"); } catch (TransportHandlerException e) { log.warn(AgentConstants.LOG_APPENDER + "Connection to MQTT Broker at: " + mqttBrokerEndPoint + - " failed.\n Will retry in " + timeoutInterval + " milli-seconds."); + " failed.\n Will retry in " + timeoutInterval + " milli-seconds."); } try { @@ -115,68 +115,67 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { try { receivedMessage = message.toString(); - if(!receivedMessage.contains("policyDefinition")){ + if (!receivedMessage.contains("policyDefinition")) { receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage); - String[] controlSignal = receivedMessage.split(":"); - // message- ":" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") + String[] controlSignal = receivedMessage.split(":"); + // message- ":" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") - try { - switch (controlSignal[0].toUpperCase()) { - case AgentConstants.BULB_CONTROL: - boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON); - agentManager.changeAlarmStatus(stateToSwitch); - log.info( - AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'"); - break; + try { + switch (controlSignal[0].toUpperCase()) { + case AgentConstants.BULB_CONTROL: + boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON); + agentManager.changeAlarmStatus(stateToSwitch); + log.info( + AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'"); + break; - case AgentConstants.TEMPERATURE_CONTROL: - int currentTemperature = agentManager.getTemperature(); + case AgentConstants.TEMPERATURE_CONTROL: + int currentTemperature = agentManager.getTemperature(); - String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'"; - log.info(AgentConstants.LOG_APPENDER + replyTemperature); + String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'"; + log.info(AgentConstants.LOG_APPENDER + replyTemperature); - String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID); + String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID); - replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; - securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); - publishToQueue(tempPublishTopic, securePayLoad); - break; + replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + publishToQueue(tempPublishTopic, securePayLoad); + break; - case AgentConstants.HUMIDITY_CONTROL: - int currentHumidity = agentManager.getHumidity(); + case AgentConstants.HUMIDITY_CONTROL: + int currentHumidity = agentManager.getHumidity(); - String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'"; - log.info(AgentConstants.LOG_APPENDER + replyHumidity); + String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'"; + log.info(AgentConstants.LOG_APPENDER + replyHumidity); - String humidPublishTopic = String.format( - AgentConstants.MQTT_PUBLISH_TOPIC,tenantDomain, deviceID); + String humidPublishTopic = String.format( + AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID); - replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; - securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); - publishToQueue(humidPublishTopic, securePayLoad); - break; + replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + publishToQueue(humidPublishTopic, securePayLoad); + break; case AgentConstants.POLICY_REVOKE: break; - default: - log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] + - "' is invalid and not-supported for this device-type"); - break; - } - } catch (AgentCoreOperationException e) { - log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e); - } catch (TransportHandlerException e) { - log.error(AgentConstants.LOG_APPENDER + - "MQTT - Publishing, reply message to the MQTT Queue at: " + - agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); - } + default: + log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] + + "' is invalid and not-supported for this device-type"); + break; + } + } catch (AgentCoreOperationException e) { + log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e); + } catch (TransportHandlerException e) { + log.error(AgentConstants.LOG_APPENDER + + "MQTT - Publishing, reply message to the MQTT Queue at: " + + agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); + } } else { JSONObject jsonMessage = new JSONObject(receivedMessage); updateCEPPolicy(jsonMessage.getString("policyDefinition")); - } log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received"); } catch (AgentCoreOperationException e) { @@ -209,13 +208,13 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { pushMessage.setRetained(false); String topic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, - agentManager.getAgentConfigs().getTenantDomain(), - agentManager.getAgentConfigs().getDeviceId()); + agentManager.getAgentConfigs().getTenantDomain(), + agentManager.getAgentConfigs().getDeviceId()); publishToQueue(topic, pushMessage); log.info(AgentConstants.LOG_APPENDER + "Message: '" + message + "' published to MQTT Queue at [" + - agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" + - topic + "]"); + agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" + + topic + "]"); } catch (TransportHandlerException e) { log.warn(AgentConstants.LOG_APPENDER + "Data Publish attempt to topic - [" + @@ -227,7 +226,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { }; dataPushServiceHandler = service.scheduleAtFixedRate(pushDataRunnable, publishInterval, publishInterval, - TimeUnit.SECONDS); + TimeUnit.SECONDS); } diff --git a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java index f08e8eb0a..9aa8b31ab 100644 --- a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java +++ b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java @@ -69,15 +69,9 @@ public class SidhdhiQuery implements Runnable { //Start the execution plan with pre-defined or previously persisted Siddhi query File f = new File(sidhdhiQueryPath); while (true) { - - if (f.exists()) { - //AgentUtilOperations.writeToFile("", sidhdhiQueryPath); - - - StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke(); - - - + if (f.exists()) { + //AgentUtilOperations.writeToFile("", sidhdhiQueryPath); + StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke(); //Check if there is new policy update available if (AgentManager.isUpdated()) { System.out.print("### Policy Update Detected!"); @@ -86,7 +80,6 @@ public class SidhdhiQuery implements Runnable { startExecutionPlan = new StartExecutionPlan().invoke(); } InputHandler inputHandler = startExecutionPlan.getInputHandler(); - //Sending events to Siddhi try { int humidityReading = AgentManager.getInstance().getTemperature(); @@ -119,7 +112,7 @@ public class SidhdhiQuery implements Runnable { public static String readFile(String path, Charset encoding) { byte[] encoded = new byte[0]; try { - if(new File(sidhdhiQueryPath).exists()){ + if (new File(sidhdhiQueryPath).exists()) { encoded = Files.readAllBytes(Paths.get(path)); } } catch (IOException e) { @@ -176,7 +169,7 @@ public class SidhdhiQuery implements Runnable { siddhiManager.addCallback("bulbOnStream", new StreamCallback() { @Override public void receive(Event[] events) { - // System.out.println("Bulb on Event Fired!"); + // System.out.println("Bulb on Event Fired!"); if (events.length > 0) { if (!AgentManager.getInstance().isAlarmOn()) { AgentManager.getInstance().changeAlarmStatus(true); @@ -189,7 +182,7 @@ public class SidhdhiQuery implements Runnable { siddhiManager.addCallback("bulbOffStream", new StreamCallback() { @Override public void receive(Event[] inEvents) { - // System.out.println("Bulb off Event Fired"); + // System.out.println("Bulb off Event Fired"); if (AgentManager.getInstance().isAlarmOn()) { AgentManager.getInstance().changeAlarmStatus(false); System.out.println("#### Performed HTTP call! OFF."); @@ -197,12 +190,10 @@ public class SidhdhiQuery implements Runnable { } }); - //Retrieving InputHandler to push events into Siddhi inputHandler = siddhiManager.getInputHandler("fireAlarmEventStream"); - //Starting event processing - // System.out.println("Execution Plan Started!"); + // System.out.println("Execution Plan Started!"); return this; } }