Fix for default policy updates in virtual fire alarm

revert-dabc3590
Nirothipan 8 years ago
parent a17b8a16ec
commit 6cfe4ac3e7

@ -176,6 +176,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
} else { } else {
JSONObject jsonMessage = new JSONObject(receivedMessage); JSONObject jsonMessage = new JSONObject(receivedMessage);
updateCEPPolicy(jsonMessage.getString("policyDefinition")); updateCEPPolicy(jsonMessage.getString("policyDefinition"));
} }
log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received"); log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received");
} catch (AgentCoreOperationException e) { } catch (AgentCoreOperationException e) {

@ -125,8 +125,8 @@ public class AgentManager {
agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator); agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator);
} catch (TransportHandlerException e) { } catch (TransportHandlerException e) {
log.error("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() + log.info("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
", provided in the configuration file is invalid."); ", provided in the configuration file is invalid. XMPP is not configured.");
} }
String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(), String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(),
agentConfigs.getDeviceId()); agentConfigs.getDeviceId());

@ -68,32 +68,34 @@ public class SidhdhiQuery implements Runnable {
//Start the execution plan with pre-defined or previously persisted Siddhi query //Start the execution plan with pre-defined or previously persisted Siddhi query
File f = new File(sidhdhiQueryPath); File f = new File(sidhdhiQueryPath);
while (true) {
if (!f.exists()) { if (f.exists()) {
AgentUtilOperations.writeToFile("", sidhdhiQueryPath); //AgentUtilOperations.writeToFile("", sidhdhiQueryPath);
}
StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke();
while (true) { StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke();
//Check if there is new policy update available
if (AgentManager.isUpdated()) {
System.out.print("### Policy Update Detected!"); //Check if there is new policy update available
//Restart execution plan with new query if (AgentManager.isUpdated()) {
restartSiddhi(); System.out.print("### Policy Update Detected!");
startExecutionPlan = new StartExecutionPlan().invoke(); //Restart execution plan with new query
} restartSiddhi();
InputHandler inputHandler = startExecutionPlan.getInputHandler(); startExecutionPlan = new StartExecutionPlan().invoke();
}
//Sending events to Siddhi InputHandler inputHandler = startExecutionPlan.getInputHandler();
try {
int humidityReading = AgentManager.getInstance().getTemperature(); //Sending events to Siddhi
inputHandler.send(new Object[]{"FIRE_1", humidityReading}); try {
Thread.sleep(3000); int humidityReading = AgentManager.getInstance().getTemperature();
} catch (InterruptedException e) { inputHandler.send(new Object[]{"FIRE_1", humidityReading});
e.printStackTrace(); Thread.sleep(3000);
break; } catch (InterruptedException e) {
e.printStackTrace();
break;
}
} }
} }
} }
@ -117,7 +119,9 @@ public class SidhdhiQuery implements Runnable {
public static String readFile(String path, Charset encoding) { public static String readFile(String path, Charset encoding) {
byte[] encoded = new byte[0]; byte[] encoded = new byte[0];
try { try {
encoded = Files.readAllBytes(Paths.get(path)); if(new File(sidhdhiQueryPath).exists()){
encoded = Files.readAllBytes(Paths.get(path));
}
} catch (IOException e) { } catch (IOException e) {
log.error("Error reading Sidhdhi query from file."); log.error("Error reading Sidhdhi query from file.");
} }
@ -172,7 +176,7 @@ public class SidhdhiQuery implements Runnable {
siddhiManager.addCallback("bulbOnStream", new StreamCallback() { siddhiManager.addCallback("bulbOnStream", new StreamCallback() {
@Override @Override
public void receive(Event[] events) { public void receive(Event[] events) {
System.out.println("Bulb on Event Fired!"); // System.out.println("Bulb on Event Fired!");
if (events.length > 0) { if (events.length > 0) {
if (!AgentManager.getInstance().isAlarmOn()) { if (!AgentManager.getInstance().isAlarmOn()) {
AgentManager.getInstance().changeAlarmStatus(true); AgentManager.getInstance().changeAlarmStatus(true);
@ -185,7 +189,7 @@ public class SidhdhiQuery implements Runnable {
siddhiManager.addCallback("bulbOffStream", new StreamCallback() { siddhiManager.addCallback("bulbOffStream", new StreamCallback() {
@Override @Override
public void receive(Event[] inEvents) { public void receive(Event[] inEvents) {
System.out.println("Bulb off Event Fired"); // System.out.println("Bulb off Event Fired");
if (AgentManager.getInstance().isAlarmOn()) { if (AgentManager.getInstance().isAlarmOn()) {
AgentManager.getInstance().changeAlarmStatus(false); AgentManager.getInstance().changeAlarmStatus(false);
System.out.println("#### Performed HTTP call! OFF."); System.out.println("#### Performed HTTP call! OFF.");
@ -198,7 +202,7 @@ public class SidhdhiQuery implements Runnable {
inputHandler = siddhiManager.getInputHandler("fireAlarmEventStream"); inputHandler = siddhiManager.getInputHandler("fireAlarmEventStream");
//Starting event processing //Starting event processing
System.out.println("Execution Plan Started!"); // System.out.println("Execution Plan Started!");
return this; return this;
} }
} }

@ -114,8 +114,8 @@ public class AgentManager {
agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator); agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator);
} catch (TransportHandlerException e) { } catch (TransportHandlerException e) {
log.error("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() + log.info("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
", provided in the configuration file is invalid."); ", provided in the configuration file is invalid. XMPP is not configured.");
} }
String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(), String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(),
agentConfigs.getDeviceId()); agentConfigs.getDeviceId());

Loading…
Cancel
Save