|
|
|
@ -25,10 +25,11 @@ import org.apache.commons.logging.LogFactory;
|
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
|
|
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.core.AgentConstants;
|
|
|
|
|
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.core.AgentManager;
|
|
|
|
|
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.core.AgentUtilOperations;
|
|
|
|
|
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.exception.AgentCoreOperationException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.transport.TransportHandlerException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.transport.mqtt.MQTTTransportHandler;
|
|
|
|
|
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.advanced.core.AgentManager;
|
|
|
|
|
|
|
|
|
|
import java.io.File;
|
|
|
|
|
import java.io.FileOutputStream;
|
|
|
|
@ -52,6 +53,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
super(deviceOwner, deviceType, mqttBrokerEndPoint, subscribeTopic);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unused")
|
|
|
|
|
public FireAlarmMQTTCommunicator(String deviceOwner, String deviceType,
|
|
|
|
|
String mqttBrokerEndPoint, String subscribeTopic,
|
|
|
|
|
int intervalInMillis) {
|
|
|
|
@ -62,6 +64,8 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
return dataPushServiceHandler;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//TODO:: Terminate logs with a period
|
|
|
|
|
//TODO: Need to print exceptions
|
|
|
|
|
@Override
|
|
|
|
|
public void connect() {
|
|
|
|
|
final AgentManager agentManager = AgentManager.getInstance();
|
|
|
|
@ -70,21 +74,27 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
while (!isConnected()) {
|
|
|
|
|
try {
|
|
|
|
|
connectToQueue();
|
|
|
|
|
subscribeToQueue();
|
|
|
|
|
agentManager.updateAgentStatus("Connected to MQTT Queue");
|
|
|
|
|
publishDeviceData(agentManager.getPushInterval());
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Connection to MQTT Broker at: " + mqttBrokerEndPoint +
|
|
|
|
|
" failed.\n Will retry in " + timeoutInterval + " milli-seconds.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
subscribeToQueue();
|
|
|
|
|
agentManager.updateAgentStatus("Subscribed to MQTT Queue");
|
|
|
|
|
publishDeviceData();
|
|
|
|
|
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"Connection/Subscription to MQTT Broker at: " +
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Subscription to MQTT Broker at: " +
|
|
|
|
|
mqttBrokerEndPoint + " failed");
|
|
|
|
|
agentManager.updateAgentStatus("Subscription to broker failed.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
Thread.sleep(timeoutInterval);
|
|
|
|
|
} catch (InterruptedException ex) {
|
|
|
|
|
log.error(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"MQTT-Subscriber: Thread Sleep Interrupt Exception");
|
|
|
|
|
}
|
|
|
|
|
log.error(AgentConstants.LOG_APPENDER + "MQTT: Connect-Thread Sleep Interrupt Exception.");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -95,22 +105,32 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
connectorThread.start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void processIncomingMessage(MqttMessage message, String... messageParams) {
|
|
|
|
|
final AgentManager agentManager = AgentManager.getInstance();
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Message " + message.toString() + " was received");
|
|
|
|
|
|
|
|
|
|
String serverName = agentManager.getAgentConfigs().getServerName();
|
|
|
|
|
String deviceOwner = agentManager.getAgentConfigs().getDeviceOwner();
|
|
|
|
|
String deviceID = agentManager.getAgentConfigs().getDeviceId();
|
|
|
|
|
String receivedMessage;
|
|
|
|
|
String replyMessage;
|
|
|
|
|
String securePayLoad;
|
|
|
|
|
|
|
|
|
|
String[] controlSignal = message.toString().split(":");
|
|
|
|
|
// log.info("########## Incoming Message : " + controlSignal[0]);
|
|
|
|
|
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
|
|
|
|
|
try {
|
|
|
|
|
receivedMessage = AgentUtilOperations.extractMessageFromPayload(message.toString());
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received");
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Could not extract message from payload.", e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (isJSONValid(message.toString())) {
|
|
|
|
|
JsonObject jobj = new Gson().fromJson(message.toString(), JsonObject.class);
|
|
|
|
|
String[] controlSignal = receivedMessage.split(":");
|
|
|
|
|
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
if (isJSONValid(receivedMessage)) {
|
|
|
|
|
JsonObject jobj = new Gson().fromJson(receivedMessage, JsonObject.class);
|
|
|
|
|
String type = jobj.get("type").toString();
|
|
|
|
|
|
|
|
|
|
if (type.equalsIgnoreCase("\"policy\"")) {
|
|
|
|
@ -121,9 +141,9 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
switch (controlSignal[0].toUpperCase()) {
|
|
|
|
|
case AgentConstants.BULB_CONTROL:
|
|
|
|
|
boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON);
|
|
|
|
|
|
|
|
|
|
agentManager.changeAlarmStatus(stateToSwitch);
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'");
|
|
|
|
|
log.info(
|
|
|
|
|
AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'");
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case AgentConstants.TEMPERATURE_CONTROL:
|
|
|
|
@ -132,17 +152,12 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'";
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + replyTemperature);
|
|
|
|
|
|
|
|
|
|
String tempPublishTopic = String.format(
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC, deviceOwner, deviceID);
|
|
|
|
|
replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
|
|
|
|
|
String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC,
|
|
|
|
|
serverName, deviceOwner, deviceID);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
publishToQueue(tempPublishTopic, replyMessage);
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.error(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"MQTT - Publishing, reply message to the MQTT Queue at: " +
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
|
|
|
|
|
}
|
|
|
|
|
replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
|
|
|
|
|
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
|
|
|
|
publishToQueue(tempPublishTopic, securePayLoad);
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case AgentConstants.HUMIDITY_CONTROL:
|
|
|
|
@ -152,16 +167,11 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + replyHumidity);
|
|
|
|
|
|
|
|
|
|
String humidPublishTopic = String.format(
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC, deviceOwner, deviceID);
|
|
|
|
|
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC, serverName, deviceOwner, deviceID);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
publishToQueue(humidPublishTopic, replyMessage);
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.error(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"MQTT - Publishing, reply message to the MQTT Queue at: " +
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
|
|
|
|
|
}
|
|
|
|
|
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
|
|
|
|
|
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
|
|
|
|
publishToQueue(humidPublishTopic, securePayLoad);
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
@ -170,46 +180,55 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.error(AgentConstants.LOG_APPENDER +
|
|
|
|
|
"MQTT - Publishing, reply message to the MQTT Queue at: " +
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void publishDeviceData(int publishInterval) {
|
|
|
|
|
public void publishDeviceData() {
|
|
|
|
|
final AgentManager agentManager = AgentManager.getInstance();
|
|
|
|
|
int publishInterval = agentManager.getPushInterval();
|
|
|
|
|
Runnable pushDataRunnable = new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
int currentTemperature = agentManager.getTemperature();
|
|
|
|
|
String payLoad =
|
|
|
|
|
"PUBLISHER:" + AgentConstants.TEMPERATURE_CONTROL + ":" +
|
|
|
|
|
currentTemperature;
|
|
|
|
|
String message = "PUBLISHER:" + AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
String payLoad = AgentUtilOperations.prepareSecurePayLoad(message);
|
|
|
|
|
|
|
|
|
|
MqttMessage pushMessage = new MqttMessage();
|
|
|
|
|
pushMessage.setPayload(payLoad.getBytes(StandardCharsets.UTF_8));
|
|
|
|
|
pushMessage.setQos(DEFAULT_MQTT_QUALITY_OF_SERVICE);
|
|
|
|
|
pushMessage.setRetained(true);
|
|
|
|
|
pushMessage.setRetained(false);
|
|
|
|
|
|
|
|
|
|
String topic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC,
|
|
|
|
|
agentManager.getAgentConfigs().getServerName(),
|
|
|
|
|
agentManager.getAgentConfigs().getDeviceOwner(),
|
|
|
|
|
agentManager.getAgentConfigs().getDeviceId());
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
publishToQueue(topic, pushMessage);
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Message: '" + pushMessage +
|
|
|
|
|
"' published to MQTT Queue at [" +
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() +
|
|
|
|
|
"] under topic [" + topic + "]");
|
|
|
|
|
log.info(AgentConstants.LOG_APPENDER + "Message: '" + message + "' published to MQTT Queue at [" +
|
|
|
|
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" +
|
|
|
|
|
topic + "]");
|
|
|
|
|
|
|
|
|
|
} catch (TransportHandlerException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Data Publish attempt to topic - [" +
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC + "] failed for payload [" +
|
|
|
|
|
payLoad + "]");
|
|
|
|
|
AgentConstants.MQTT_PUBLISH_TOPIC + "] failed for payload [" + message + "]");
|
|
|
|
|
} catch (AgentCoreOperationException e) {
|
|
|
|
|
log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
dataPushServiceHandler = service.scheduleAtFixedRate(pushDataRunnable, publishInterval,
|
|
|
|
|
publishInterval, TimeUnit.SECONDS);
|
|
|
|
|
dataPushServiceHandler = service.scheduleAtFixedRate(pushDataRunnable, publishInterval, publishInterval,
|
|
|
|
|
TimeUnit.SECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -218,8 +237,12 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
Runnable stopConnection = new Runnable() {
|
|
|
|
|
public void run() {
|
|
|
|
|
while (isConnected()) {
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
|
|
if (dataPushServiceHandler != null) {
|
|
|
|
|
dataPushServiceHandler.cancel(true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
closeConnection();
|
|
|
|
|
|
|
|
|
|
} catch (MqttException e) {
|
|
|
|
@ -250,6 +273,10 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void publishDeviceData(String... publishData) {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean isJSONValid(String JSON_STRING) {
|
|
|
|
|
try {
|
|
|
|
|