|
|
|
@ -85,7 +85,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Subscription to MQTT Broker at: " +
|
|
|
|
|
mqttBrokerEndPoint + " failed");
|
|
|
|
|
mqttBrokerEndPoint + " failed");
|
|
|
|
|
agentManager.updateAgentStatus("Subscription to broker failed.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -108,7 +108,6 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
public void processIncomingMessage(MqttMessage message, String... messageParams) {
|
|
|
|
|
final AgentManager agentManager = AgentManager.getInstance();
|
|
|
|
|
String tenantDomain = agentManager.getAgentConfigs().getTenantDomain();
|
|
|
|
|
String deviceOwner = agentManager.getAgentConfigs().getDeviceOwner();
|
|
|
|
|
String deviceID = agentManager.getAgentConfigs().getDeviceId();
|
|
|
|
|
String receivedMessage;
|
|
|
|
|
String replyMessage;
|
|
|
|
@ -118,16 +117,6 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
receivedMessage = message.toString();
|
|
|
|
|
if(!receivedMessage.contains("policyDefinition")){
|
|
|
|
|
receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage);
|
|
|
|
|
}else{
|
|
|
|
|
JSONObject jsonMessage = new JSONObject(receivedMessage);
|
|
|
|
|
updateCEPPolicy(jsonMessage.getString("policyDefinition"));
|
|
|
|
|
}
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received");
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Could not extract message from payload.", e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String[] controlSignal = receivedMessage.split(":");
|
|
|
|
|
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
|
|
|
|
@ -168,9 +157,8 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
publishToQueue(humidPublishTopic, securePayLoad);
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case AgentConstants.POLICY_SIGNAL:
|
|
|
|
|
String policy = controlSignal[1];
|
|
|
|
|
updateCEPPolicy(policy);
|
|
|
|
|
case AgentConstants.POLICY_REVOKE:
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] +
|
|
|
|
@ -185,6 +173,17 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
JSONObject jsonMessage = new JSONObject(receivedMessage);
|
|
|
|
|
updateCEPPolicy(jsonMessage.getString("policyDefinition"));
|
|
|
|
|
}
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received");
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Could not extract message from payload.", e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -219,7 +218,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Data Publish attempt to topic - [" +
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC + "] failed for payload [" + message + "]");
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC + "] failed for payload [" + message + "]");
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
|
|
|
|
|
}
|
|
|
|
@ -247,15 +246,15 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
} catch (MqttException e) {
|
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"Unable to 'STOP' MQTT connection at broker at: " +
|
|
|
|
|
mqttBrokerEndPoint);
|
|
|
|
|
"Unable to 'STOP' MQTT connection at broker at: " +
|
|
|
|
|
mqttBrokerEndPoint);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
Thread.sleep(timeoutInterval);
|
|
|
|
|
} catch (InterruptedException e1) {
|
|
|
|
|
log.error(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"MQTT-Terminator: Thread Sleep Interrupt Exception");
|
|
|
|
|
"MQTT-Terminator: Thread Sleep Interrupt Exception");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|