Merge pull request #729 from Nirothipan/master

Virtual fire alarm advance agent startup siddhi exception
revert-dabc3590
Madhawa Perera 8 years ago committed by GitHub
commit eca3aa773e

@ -115,7 +115,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
try {
receivedMessage = message.toString();
if(!receivedMessage.contains("policyDefinition")){
if (!receivedMessage.contains("policyDefinition")) {
receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage);
String[] controlSignal = receivedMessage.split(":");
@ -150,7 +150,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
log.info(AgentConstants.LOG_APPENDER + replyHumidity);
String humidPublishTopic = String.format(
AgentConstants.MQTT_PUBLISH_TOPIC,tenantDomain, deviceID);
AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID);
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);

@ -125,8 +125,8 @@ public class AgentManager {
agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator);
} catch (TransportHandlerException e) {
log.error("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
", provided in the configuration file is invalid.");
log.info("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
", provided in the configuration file is invalid. XMPP is not configured.");
}
String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(),
agentConfigs.getDeviceId());

@ -68,15 +68,10 @@ public class SidhdhiQuery implements Runnable {
//Start the execution plan with pre-defined or previously persisted Siddhi query
File f = new File(sidhdhiQueryPath);
if (!f.exists()) {
AgentUtilOperations.writeToFile("", sidhdhiQueryPath);
}
StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke();
while (true) {
if (f.exists()) {
//AgentUtilOperations.writeToFile("", sidhdhiQueryPath);
StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke();
//Check if there is new policy update available
if (AgentManager.isUpdated()) {
System.out.print("### Policy Update Detected!");
@ -85,7 +80,6 @@ public class SidhdhiQuery implements Runnable {
startExecutionPlan = new StartExecutionPlan().invoke();
}
InputHandler inputHandler = startExecutionPlan.getInputHandler();
//Sending events to Siddhi
try {
int humidityReading = AgentManager.getInstance().getTemperature();
@ -97,6 +91,7 @@ public class SidhdhiQuery implements Runnable {
}
}
}
}
/**
* Re-Initialize SiddhiManager
@ -117,7 +112,9 @@ public class SidhdhiQuery implements Runnable {
public static String readFile(String path, Charset encoding) {
byte[] encoded = new byte[0];
try {
if (new File(sidhdhiQueryPath).exists()) {
encoded = Files.readAllBytes(Paths.get(path));
}
} catch (IOException e) {
log.error("Error reading Sidhdhi query from file.");
}
@ -172,7 +169,7 @@ public class SidhdhiQuery implements Runnable {
siddhiManager.addCallback("bulbOnStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
System.out.println("Bulb on Event Fired!");
// System.out.println("Bulb on Event Fired!");
if (events.length > 0) {
if (!AgentManager.getInstance().isAlarmOn()) {
AgentManager.getInstance().changeAlarmStatus(true);
@ -185,7 +182,7 @@ public class SidhdhiQuery implements Runnable {
siddhiManager.addCallback("bulbOffStream", new StreamCallback() {
@Override
public void receive(Event[] inEvents) {
System.out.println("Bulb off Event Fired");
// System.out.println("Bulb off Event Fired");
if (AgentManager.getInstance().isAlarmOn()) {
AgentManager.getInstance().changeAlarmStatus(false);
System.out.println("#### Performed HTTP call! OFF.");
@ -193,12 +190,10 @@ public class SidhdhiQuery implements Runnable {
}
});
//Retrieving InputHandler to push events into Siddhi
inputHandler = siddhiManager.getInputHandler("fireAlarmEventStream");
//Starting event processing
System.out.println("Execution Plan Started!");
// System.out.println("Execution Plan Started!");
return this;
}
}

@ -114,8 +114,8 @@ public class AgentManager {
agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator);
} catch (TransportHandlerException e) {
log.error("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
", provided in the configuration file is invalid.");
log.info("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
", provided in the configuration file is invalid. XMPP is not configured.");
}
String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(),
agentConfigs.getDeviceId());

Loading…
Cancel
Save