From 6cfe4ac3e74fb55ea6911b3e45790c389868a185 Mon Sep 17 00:00:00 2001 From: Nirothipan Date: Wed, 28 Jun 2017 15:10:22 +0530 Subject: [PATCH 1/2] Fix for default policy updates in virtual fire alarm --- .../mqtt/FireAlarmMQTTCommunicator.java | 1 + .../agent/advanced/core/AgentManager.java | 4 +- .../agent/advanced/sidhdhi/SidhdhiQuery.java | 56 ++++++++++--------- .../agent/core/AgentManager.java | 4 +- 4 files changed, 35 insertions(+), 30 deletions(-) diff --git a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java index 4ad470671..0d263201c 100644 --- a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java +++ b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java @@ -176,6 +176,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { } else { JSONObject jsonMessage = new JSONObject(receivedMessage); updateCEPPolicy(jsonMessage.getString("policyDefinition")); + } log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received"); } catch (AgentCoreOperationException e) { diff --git a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/core/AgentManager.java b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/core/AgentManager.java index 78e3b19cf..79ba03500 100644 --- a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/core/AgentManager.java +++ b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/core/AgentManager.java @@ -125,8 +125,8 @@ public class AgentManager { agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator); } catch (TransportHandlerException e) { - log.error("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() + - ", provided in the configuration file is invalid."); + log.info("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() + + ", provided in the configuration file is invalid. XMPP is not configured."); } String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(), agentConfigs.getDeviceId()); diff --git a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java index 66829bbdf..f08e8eb0a 100644 --- a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java +++ b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java @@ -68,32 +68,34 @@ public class SidhdhiQuery implements Runnable { //Start the execution plan with pre-defined or previously persisted Siddhi query File f = new File(sidhdhiQueryPath); + while (true) { - if (!f.exists()) { - AgentUtilOperations.writeToFile("", sidhdhiQueryPath); - } + if (f.exists()) { + //AgentUtilOperations.writeToFile("", sidhdhiQueryPath); - StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke(); - while (true) { + StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke(); - //Check if there is new policy update available - if (AgentManager.isUpdated()) { - System.out.print("### Policy Update Detected!"); - //Restart execution plan with new query - restartSiddhi(); - startExecutionPlan = new StartExecutionPlan().invoke(); - } - InputHandler inputHandler = startExecutionPlan.getInputHandler(); - - //Sending events to Siddhi - try { - int humidityReading = AgentManager.getInstance().getTemperature(); - inputHandler.send(new Object[]{"FIRE_1", humidityReading}); - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - break; + + + //Check if there is new policy update available + if (AgentManager.isUpdated()) { + System.out.print("### Policy Update Detected!"); + //Restart execution plan with new query + restartSiddhi(); + startExecutionPlan = new StartExecutionPlan().invoke(); + } + InputHandler inputHandler = startExecutionPlan.getInputHandler(); + + //Sending events to Siddhi + try { + int humidityReading = AgentManager.getInstance().getTemperature(); + inputHandler.send(new Object[]{"FIRE_1", humidityReading}); + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } } } } @@ -117,7 +119,9 @@ public class SidhdhiQuery implements Runnable { public static String readFile(String path, Charset encoding) { byte[] encoded = new byte[0]; try { - encoded = Files.readAllBytes(Paths.get(path)); + if(new File(sidhdhiQueryPath).exists()){ + encoded = Files.readAllBytes(Paths.get(path)); + } } catch (IOException e) { log.error("Error reading Sidhdhi query from file."); } @@ -172,7 +176,7 @@ public class SidhdhiQuery implements Runnable { siddhiManager.addCallback("bulbOnStream", new StreamCallback() { @Override public void receive(Event[] events) { - System.out.println("Bulb on Event Fired!"); + // System.out.println("Bulb on Event Fired!"); if (events.length > 0) { if (!AgentManager.getInstance().isAlarmOn()) { AgentManager.getInstance().changeAlarmStatus(true); @@ -185,7 +189,7 @@ public class SidhdhiQuery implements Runnable { siddhiManager.addCallback("bulbOffStream", new StreamCallback() { @Override public void receive(Event[] inEvents) { - System.out.println("Bulb off Event Fired"); + // System.out.println("Bulb off Event Fired"); if (AgentManager.getInstance().isAlarmOn()) { AgentManager.getInstance().changeAlarmStatus(false); System.out.println("#### Performed HTTP call! OFF."); @@ -198,7 +202,7 @@ public class SidhdhiQuery implements Runnable { inputHandler = siddhiManager.getInputHandler("fireAlarmEventStream"); //Starting event processing - System.out.println("Execution Plan Started!"); + // System.out.println("Execution Plan Started!"); return this; } } diff --git a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/core/AgentManager.java b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/core/AgentManager.java index 4ad7f0037..df3435c27 100644 --- a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/core/AgentManager.java +++ b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/core/AgentManager.java @@ -114,8 +114,8 @@ public class AgentManager { agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator); } catch (TransportHandlerException e) { - log.error("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() + - ", provided in the configuration file is invalid."); + log.info("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() + + ", provided in the configuration file is invalid. XMPP is not configured."); } String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(), agentConfigs.getDeviceId()); From 2c3462006e20a3887d1651852c7ad72ef7233283 Mon Sep 17 00:00:00 2001 From: Nirothipan Date: Thu, 29 Jun 2017 09:23:41 +0530 Subject: [PATCH 2/2] code formatting --- .../mqtt/FireAlarmMQTTCommunicator.java | 97 +++++++++---------- .../agent/advanced/sidhdhi/SidhdhiQuery.java | 23 ++--- 2 files changed, 55 insertions(+), 65 deletions(-) diff --git a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java index 0d263201c..aae520f8e 100644 --- a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java +++ b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/communication/mqtt/FireAlarmMQTTCommunicator.java @@ -75,7 +75,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { agentManager.updateAgentStatus("Connected to MQTT Queue"); } catch (TransportHandlerException e) { log.warn(AgentConstants.LOG_APPENDER + "Connection to MQTT Broker at: " + mqttBrokerEndPoint + - " failed.\n Will retry in " + timeoutInterval + " milli-seconds."); + " failed.\n Will retry in " + timeoutInterval + " milli-seconds."); } try { @@ -115,68 +115,67 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { try { receivedMessage = message.toString(); - if(!receivedMessage.contains("policyDefinition")){ + if (!receivedMessage.contains("policyDefinition")) { receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage); - String[] controlSignal = receivedMessage.split(":"); - // message- ":" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") + String[] controlSignal = receivedMessage.split(":"); + // message- ":" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") - try { - switch (controlSignal[0].toUpperCase()) { - case AgentConstants.BULB_CONTROL: - boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON); - agentManager.changeAlarmStatus(stateToSwitch); - log.info( - AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'"); - break; + try { + switch (controlSignal[0].toUpperCase()) { + case AgentConstants.BULB_CONTROL: + boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON); + agentManager.changeAlarmStatus(stateToSwitch); + log.info( + AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'"); + break; - case AgentConstants.TEMPERATURE_CONTROL: - int currentTemperature = agentManager.getTemperature(); + case AgentConstants.TEMPERATURE_CONTROL: + int currentTemperature = agentManager.getTemperature(); - String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'"; - log.info(AgentConstants.LOG_APPENDER + replyTemperature); + String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'"; + log.info(AgentConstants.LOG_APPENDER + replyTemperature); - String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID); + String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID); - replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; - securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); - publishToQueue(tempPublishTopic, securePayLoad); - break; + replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + publishToQueue(tempPublishTopic, securePayLoad); + break; - case AgentConstants.HUMIDITY_CONTROL: - int currentHumidity = agentManager.getHumidity(); + case AgentConstants.HUMIDITY_CONTROL: + int currentHumidity = agentManager.getHumidity(); - String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'"; - log.info(AgentConstants.LOG_APPENDER + replyHumidity); + String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'"; + log.info(AgentConstants.LOG_APPENDER + replyHumidity); - String humidPublishTopic = String.format( - AgentConstants.MQTT_PUBLISH_TOPIC,tenantDomain, deviceID); + String humidPublishTopic = String.format( + AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID); - replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; - securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); - publishToQueue(humidPublishTopic, securePayLoad); - break; + replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; + securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage); + publishToQueue(humidPublishTopic, securePayLoad); + break; case AgentConstants.POLICY_REVOKE: break; - default: - log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] + - "' is invalid and not-supported for this device-type"); - break; - } - } catch (AgentCoreOperationException e) { - log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e); - } catch (TransportHandlerException e) { - log.error(AgentConstants.LOG_APPENDER + - "MQTT - Publishing, reply message to the MQTT Queue at: " + - agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); - } + default: + log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] + + "' is invalid and not-supported for this device-type"); + break; + } + } catch (AgentCoreOperationException e) { + log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e); + } catch (TransportHandlerException e) { + log.error(AgentConstants.LOG_APPENDER + + "MQTT - Publishing, reply message to the MQTT Queue at: " + + agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); + } } else { JSONObject jsonMessage = new JSONObject(receivedMessage); updateCEPPolicy(jsonMessage.getString("policyDefinition")); - } log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received"); } catch (AgentCoreOperationException e) { @@ -209,13 +208,13 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { pushMessage.setRetained(false); String topic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, - agentManager.getAgentConfigs().getTenantDomain(), - agentManager.getAgentConfigs().getDeviceId()); + agentManager.getAgentConfigs().getTenantDomain(), + agentManager.getAgentConfigs().getDeviceId()); publishToQueue(topic, pushMessage); log.info(AgentConstants.LOG_APPENDER + "Message: '" + message + "' published to MQTT Queue at [" + - agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" + - topic + "]"); + agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" + + topic + "]"); } catch (TransportHandlerException e) { log.warn(AgentConstants.LOG_APPENDER + "Data Publish attempt to topic - [" + @@ -227,7 +226,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler { }; dataPushServiceHandler = service.scheduleAtFixedRate(pushDataRunnable, publishInterval, publishInterval, - TimeUnit.SECONDS); + TimeUnit.SECONDS); } diff --git a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java index f08e8eb0a..9aa8b31ab 100644 --- a/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java +++ b/components/device-types/virtual-fire-alarm-plugin/org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.impl/src/main/java/org/wso2/carbon/device/mgt/iot/virtualfirealarm/agent/advanced/sidhdhi/SidhdhiQuery.java @@ -69,15 +69,9 @@ public class SidhdhiQuery implements Runnable { //Start the execution plan with pre-defined or previously persisted Siddhi query File f = new File(sidhdhiQueryPath); while (true) { - - if (f.exists()) { - //AgentUtilOperations.writeToFile("", sidhdhiQueryPath); - - - StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke(); - - - + if (f.exists()) { + //AgentUtilOperations.writeToFile("", sidhdhiQueryPath); + StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke(); //Check if there is new policy update available if (AgentManager.isUpdated()) { System.out.print("### Policy Update Detected!"); @@ -86,7 +80,6 @@ public class SidhdhiQuery implements Runnable { startExecutionPlan = new StartExecutionPlan().invoke(); } InputHandler inputHandler = startExecutionPlan.getInputHandler(); - //Sending events to Siddhi try { int humidityReading = AgentManager.getInstance().getTemperature(); @@ -119,7 +112,7 @@ public class SidhdhiQuery implements Runnable { public static String readFile(String path, Charset encoding) { byte[] encoded = new byte[0]; try { - if(new File(sidhdhiQueryPath).exists()){ + if (new File(sidhdhiQueryPath).exists()) { encoded = Files.readAllBytes(Paths.get(path)); } } catch (IOException e) { @@ -176,7 +169,7 @@ public class SidhdhiQuery implements Runnable { siddhiManager.addCallback("bulbOnStream", new StreamCallback() { @Override public void receive(Event[] events) { - // System.out.println("Bulb on Event Fired!"); + // System.out.println("Bulb on Event Fired!"); if (events.length > 0) { if (!AgentManager.getInstance().isAlarmOn()) { AgentManager.getInstance().changeAlarmStatus(true); @@ -189,7 +182,7 @@ public class SidhdhiQuery implements Runnable { siddhiManager.addCallback("bulbOffStream", new StreamCallback() { @Override public void receive(Event[] inEvents) { - // System.out.println("Bulb off Event Fired"); + // System.out.println("Bulb off Event Fired"); if (AgentManager.getInstance().isAlarmOn()) { AgentManager.getInstance().changeAlarmStatus(false); System.out.println("#### Performed HTTP call! OFF."); @@ -197,12 +190,10 @@ public class SidhdhiQuery implements Runnable { } }); - //Retrieving InputHandler to push events into Siddhi inputHandler = siddhiManager.getInputHandler("fireAlarmEventStream"); - //Starting event processing - // System.out.println("Execution Plan Started!"); + // System.out.println("Execution Plan Started!"); return this; } }