|
|
|
@ -75,7 +75,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
agentManager.updateAgentStatus("Connected to MQTT Queue");
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Connection to MQTT Broker at: " + mqttBrokerEndPoint +
|
|
|
|
|
" failed.\n Will retry in " + timeoutInterval + " milli-seconds.");
|
|
|
|
|
" failed.\n Will retry in " + timeoutInterval + " milli-seconds.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
@ -115,68 +115,67 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
receivedMessage = message.toString();
|
|
|
|
|
if(!receivedMessage.contains("policyDefinition")){
|
|
|
|
|
if (!receivedMessage.contains("policyDefinition")) {
|
|
|
|
|
receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage);
|
|
|
|
|
|
|
|
|
|
String[] controlSignal = receivedMessage.split(":");
|
|
|
|
|
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
|
|
|
|
|
String[] controlSignal = receivedMessage.split(":");
|
|
|
|
|
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
switch (controlSignal[0].toUpperCase()) {
|
|
|
|
|
case AgentConstants.BULB_CONTROL:
|
|
|
|
|
boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON);
|
|
|
|
|
agentManager.changeAlarmStatus(stateToSwitch);
|
|
|
|
|
log.info(
|
|
|
|
|
AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'");
|
|
|
|
|
break;
|
|
|
|
|
try {
|
|
|
|
|
switch (controlSignal[0].toUpperCase()) {
|
|
|
|
|
case AgentConstants.BULB_CONTROL:
|
|
|
|
|
boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON);
|
|
|
|
|
agentManager.changeAlarmStatus(stateToSwitch);
|
|
|
|
|
log.info(
|
|
|
|
|
AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'");
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case AgentConstants.TEMPERATURE_CONTROL:
|
|
|
|
|
int currentTemperature = agentManager.getTemperature();
|
|
|
|
|
case AgentConstants.TEMPERATURE_CONTROL:
|
|
|
|
|
int currentTemperature = agentManager.getTemperature();
|
|
|
|
|
|
|
|
|
|
String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'";
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + replyTemperature);
|
|
|
|
|
String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'";
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + replyTemperature);
|
|
|
|
|
|
|
|
|
|
String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID);
|
|
|
|
|
String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID);
|
|
|
|
|
|
|
|
|
|
replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
|
|
|
|
|
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
|
|
|
|
publishToQueue(tempPublishTopic, securePayLoad);
|
|
|
|
|
break;
|
|
|
|
|
replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
|
|
|
|
|
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
|
|
|
|
publishToQueue(tempPublishTopic, securePayLoad);
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case AgentConstants.HUMIDITY_CONTROL:
|
|
|
|
|
int currentHumidity = agentManager.getHumidity();
|
|
|
|
|
case AgentConstants.HUMIDITY_CONTROL:
|
|
|
|
|
int currentHumidity = agentManager.getHumidity();
|
|
|
|
|
|
|
|
|
|
String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'";
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + replyHumidity);
|
|
|
|
|
String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'";
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + replyHumidity);
|
|
|
|
|
|
|
|
|
|
String humidPublishTopic = String.format(
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC,tenantDomain, deviceID);
|
|
|
|
|
String humidPublishTopic = String.format(
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID);
|
|
|
|
|
|
|
|
|
|
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
|
|
|
|
|
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
|
|
|
|
publishToQueue(humidPublishTopic, securePayLoad);
|
|
|
|
|
break;
|
|
|
|
|
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
|
|
|
|
|
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
|
|
|
|
publishToQueue(humidPublishTopic, securePayLoad);
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case AgentConstants.POLICY_REVOKE:
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] +
|
|
|
|
|
"' is invalid and not-supported for this device-type");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.error(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"MQTT - Publishing, reply message to the MQTT Queue at: " +
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] +
|
|
|
|
|
"' is invalid and not-supported for this device-type");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.error(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"MQTT - Publishing, reply message to the MQTT Queue at: " +
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
JSONObject jsonMessage = new JSONObject(receivedMessage);
|
|
|
|
|
updateCEPPolicy(jsonMessage.getString("policyDefinition"));
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received");
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
@ -209,13 +208,13 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
pushMessage.setRetained(false);
|
|
|
|
|
|
|
|
|
|
String topic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC,
|
|
|
|
|
agentManager.getAgentConfigs().getTenantDomain(),
|
|
|
|
|
agentManager.getAgentConfigs().getDeviceId());
|
|
|
|
|
agentManager.getAgentConfigs().getTenantDomain(),
|
|
|
|
|
agentManager.getAgentConfigs().getDeviceId());
|
|
|
|
|
|
|
|
|
|
publishToQueue(topic, pushMessage);
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Message: '" + message + "' published to MQTT Queue at [" +
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" +
|
|
|
|
|
topic + "]");
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" +
|
|
|
|
|
topic + "]");
|
|
|
|
|
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Data Publish attempt to topic - [" +
|
|
|
|
@ -227,7 +226,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
dataPushServiceHandler = service.scheduleAtFixedRate(pushDataRunnable, publishInterval, publishInterval,
|
|
|
|
|
TimeUnit.SECONDS);
|
|
|
|
|
TimeUnit.SECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|