Pasindu 8 years ago
commit 1f2e157210

@ -85,7 +85,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
} catch (TransportHandlerException e) { } catch (TransportHandlerException e) {
log.warn(AgentConstants.LOG_APPENDER + "Subscription to MQTT Broker at: " + log.warn(AgentConstants.LOG_APPENDER + "Subscription to MQTT Broker at: " +
mqttBrokerEndPoint + " failed"); mqttBrokerEndPoint + " failed");
agentManager.updateAgentStatus("Subscription to broker failed."); agentManager.updateAgentStatus("Subscription to broker failed.");
} }
@ -118,16 +118,6 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
receivedMessage = message.toString(); receivedMessage = message.toString();
if(!receivedMessage.contains("policyDefinition")){ if(!receivedMessage.contains("policyDefinition")){
receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage); receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage);
}else{
JSONObject jsonMessage = new JSONObject(receivedMessage);
updateCEPPolicy(jsonMessage.getString("policyDefinition"));
}
log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received");
} catch (AgentCoreOperationException e) {
log.warn(AgentConstants.LOG_APPENDER + "Could not extract message from payload.", e);
return;
}
String[] controlSignal = receivedMessage.split(":"); String[] controlSignal = receivedMessage.split(":");
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") // message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
@ -168,9 +158,8 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
publishToQueue(humidPublishTopic, securePayLoad); publishToQueue(humidPublishTopic, securePayLoad);
break; break;
case AgentConstants.POLICY_SIGNAL: case AgentConstants.POLICY_REVOKE:
String policy = controlSignal[1]; break;
updateCEPPolicy(policy);
default: default:
log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] + log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] +
@ -185,6 +174,17 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
} }
} else {
JSONObject jsonMessage = new JSONObject(receivedMessage);
updateCEPPolicy(jsonMessage.getString("policyDefinition"));
}
log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received");
} catch (AgentCoreOperationException e) {
log.warn(AgentConstants.LOG_APPENDER + "Could not extract message from payload.", e);
return;
}
} }
@Override @Override
@ -247,8 +247,8 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
} catch (MqttException e) { } catch (MqttException e) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.warn(AgentConstants.LOG_APPENDER + log.warn(AgentConstants.LOG_APPENDER +
"Unable to 'STOP' MQTT connection at broker at: " + "Unable to 'STOP' MQTT connection at broker at: " +
mqttBrokerEndPoint); mqttBrokerEndPoint);
} }
try { try {

@ -57,7 +57,8 @@ public class AgentConstants {
--------------------------------------------------------------------------------------- */ --------------------------------------------------------------------------------------- */
public static final int DEFAULT_MQTT_RECONNECTION_INTERVAL = 2; // time in seconds public static final int DEFAULT_MQTT_RECONNECTION_INTERVAL = 2; // time in seconds
public static final int DEFAULT_MQTT_QUALITY_OF_SERVICE = 0; public static final int DEFAULT_MQTT_QUALITY_OF_SERVICE = 0;
public static final String MQTT_SUBSCRIBE_TOPIC = "%s/" + DEVICE_TYPE + "/%s/#"; //public static final String MQTT_SUBSCRIBE_TOPIC = "%s/" + DEVICE_TYPE + "/%s/#";
public static final String MQTT_SUBSCRIBE_TOPIC = "%s/" + DEVICE_TYPE + "/%s/operation/#";
public static final String MQTT_PUBLISH_TOPIC = "%s/" + DEVICE_TYPE + "/%s/temperature"; public static final String MQTT_PUBLISH_TOPIC = "%s/" + DEVICE_TYPE + "/%s/temperature";
/* --------------------------------------------------------------------------------------- /* ---------------------------------------------------------------------------------------
XMPP Connection specific information XMPP Connection specific information
@ -104,7 +105,7 @@ public class AgentConstants {
--------------------------------------------------------------------------------------- */ --------------------------------------------------------------------------------------- */
public static final String BULB_CONTROL = "BULB"; public static final String BULB_CONTROL = "BULB";
public static final String TEMPERATURE_CONTROL = "TEMPERATURE"; public static final String TEMPERATURE_CONTROL = "TEMPERATURE";
public static final String POLICY_SIGNAL = "POLICY"; public static final String POLICY_REVOKE = "POLICY_REVOKE";
public static final String HUMIDITY_CONTROL = "HUMIDITY"; public static final String HUMIDITY_CONTROL = "HUMIDITY";
public static final String CONTROL_ON = "ON"; public static final String CONTROL_ON = "ON";
public static final String CONTROL_OFF = "OFF"; public static final String CONTROL_OFF = "OFF";

@ -63,7 +63,7 @@ public class AgentConstants {
--------------------------------------------------------------------------------------- */ --------------------------------------------------------------------------------------- */
public static final int DEFAULT_MQTT_RECONNECTION_INTERVAL = 2; // time in seconds public static final int DEFAULT_MQTT_RECONNECTION_INTERVAL = 2; // time in seconds
public static final int DEFAULT_MQTT_QUALITY_OF_SERVICE = 0; public static final int DEFAULT_MQTT_QUALITY_OF_SERVICE = 0;
public static final String MQTT_SUBSCRIBE_TOPIC = "%s/" + DEVICE_TYPE + "/%s"; public static final String MQTT_SUBSCRIBE_TOPIC = "%s/" + DEVICE_TYPE + "/%s/operation/#";
public static final String MQTT_PUBLISH_TOPIC = "%s/" + DEVICE_TYPE + "/%s/temperature"; public static final String MQTT_PUBLISH_TOPIC = "%s/" + DEVICE_TYPE + "/%s/temperature";
/* --------------------------------------------------------------------------------------- /* ---------------------------------------------------------------------------------------

@ -34,6 +34,7 @@ import org.wso2.carbon.device.mgt.common.group.mgt.DeviceGroupConstants;
import org.wso2.carbon.device.mgt.common.operation.mgt.Operation; import org.wso2.carbon.device.mgt.common.operation.mgt.Operation;
import org.wso2.carbon.device.mgt.common.operation.mgt.OperationManagementException; import org.wso2.carbon.device.mgt.common.operation.mgt.OperationManagementException;
import org.wso2.carbon.device.mgt.core.operation.mgt.CommandOperation; import org.wso2.carbon.device.mgt.core.operation.mgt.CommandOperation;
import org.wso2.carbon.device.mgt.core.operation.mgt.ConfigOperation;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.constants.VirtualFireAlarmConstants; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.constants.VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.dto.SensorRecord; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.dto.SensorRecord;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.util.APIUtil; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.util.APIUtil;
@ -96,9 +97,8 @@ public class VirtualFireAlarmServiceImpl implements VirtualFireAlarmService {
String publishTopic = APIUtil.getTenantDomainOftheUser() + "/" String publishTopic = APIUtil.getTenantDomainOftheUser() + "/"
+ VirtualFireAlarmConstants.DEVICE_TYPE + "/" + deviceId; + VirtualFireAlarmConstants.DEVICE_TYPE + "/" + deviceId;
Operation commandOp = new CommandOperation(); ConfigOperation commandOp = new ConfigOperation();
commandOp.setCode("buzz"); commandOp.setCode("buzz");
commandOp.setType(Operation.Type.COMMAND);
commandOp.setEnabled(true); commandOp.setEnabled(true);
commandOp.setPayLoad(actualMessage); commandOp.setPayLoad(actualMessage);

Loading…
Cancel
Save