Implementing null checks

Ace 9 years ago
parent b57e23817e
commit 55d97dd335

@ -88,58 +88,60 @@ public class ArduinoMQTTConnector extends MQTTTransportHandler {
@Override @Override
public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException { public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException {
String topic = messageParams[0]; if(messageParams.length != 0) {
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received. String topic = messageParams[0];
String ownerAndId = topic.replace(serverName + File.separator, ""); // owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
ownerAndId = ownerAndId.replace(File.separator + ArduinoConstants.DEVICE_TYPE + File.separator, ":"); String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + ArduinoConstants.DEVICE_TYPE + File.separator, ":");
String owner = ownerAndId.split(":")[0]; String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1]; String deviceId = ownerAndId.split(":")[1];
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]"); log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
} }
int lastIndex = message.toString().lastIndexOf(":"); int lastIndex = message.toString().lastIndexOf(":");
String msgContext = message.toString().substring(lastIndex + 1); String msgContext = message.toString().substring(lastIndex + 1);
LinkedList<String> deviceControlList; LinkedList<String> deviceControlList;
LinkedList<String> replyMessageList; LinkedList<String> replyMessageList;
if (msgContext.equals(MESSAGE_TO_SEND) || msgContext.equals(ArduinoConstants.STATE_ON) || msgContext.equals( if (msgContext.equals(MESSAGE_TO_SEND) || msgContext.equals(ArduinoConstants.STATE_ON) || msgContext.equals(
ArduinoConstants.STATE_OFF)) { ArduinoConstants.STATE_OFF)) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Received a control message: "); log.debug("Received a control message: ");
log.debug("Control message topic: " + topic); log.debug("Control message topic: " + topic);
log.debug("Control message: " + message.toString()); log.debug("Control message: " + message.toString());
} }
synchronized (ArduinoControllerService.getInternalControlsQueue()) { synchronized (ArduinoControllerService.getInternalControlsQueue()) {
deviceControlList = ArduinoControllerService.getInternalControlsQueue().get(deviceId); deviceControlList = ArduinoControllerService.getInternalControlsQueue().get(deviceId);
if (deviceControlList == null) { if (deviceControlList == null) {
ArduinoControllerService.getInternalControlsQueue() ArduinoControllerService.getInternalControlsQueue()
.put(deviceId, deviceControlList = new LinkedList<String>()); .put(deviceId, deviceControlList = new LinkedList<String>());
}
} }
} deviceControlList.add(message.toString());
deviceControlList.add(message.toString());
} else if (msgContext.equals(MESSAGE_RECEIVED)) { } else if (msgContext.equals(MESSAGE_RECEIVED)) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Received reply from a device: "); log.debug("Received reply from a device: ");
log.debug("Reply message topic: " + topic); log.debug("Reply message topic: " + topic);
log.debug("Reply message: " + message.toString().substring(0, lastIndex)); log.debug("Reply message: " + message.toString().substring(0, lastIndex));
} }
synchronized (ArduinoControllerService.getReplyMsgQueue()) { synchronized (ArduinoControllerService.getReplyMsgQueue()) {
replyMessageList = ArduinoControllerService.getReplyMsgQueue().get(deviceId); replyMessageList = ArduinoControllerService.getReplyMsgQueue().get(deviceId);
if (replyMessageList == null) { if (replyMessageList == null) {
ArduinoControllerService.getReplyMsgQueue() ArduinoControllerService.getReplyMsgQueue()
.put(deviceId, replyMessageList = new LinkedList<>()); .put(deviceId, replyMessageList = new LinkedList<>());
}
} }
replyMessageList.add(message.toString());
} }
replyMessageList.add(message.toString());
} }
} }

@ -65,28 +65,30 @@ public class DigitalDisplayMQTTConnector extends MQTTTransportHandler {
@Override @Override
public void processIncomingMessage(MqttMessage message, String... messageParams) { public void processIncomingMessage(MqttMessage message, String... messageParams) {
String topic = messageParams[0]; if(messageParams.length != 0) {
String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, ""); String topic = messageParams[0];
ownerAndId = ownerAndId.replace(File.separator + DigitalDisplayConstants.DEVICE_TYPE + File.separator, ":"); String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + "digital_display_publisher", ""); ownerAndId = ownerAndId.replace(File.separator + DigitalDisplayConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "digital_display_publisher", "");
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1]; String owner = ownerAndId.split(":")[0];
String[] messageData = message.toString().split(":"); String deviceId = ownerAndId.split(":")[1];
String[] messageData = message.toString().split(":");
if (log.isDebugEnabled()){
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]"); if (log.isDebugEnabled()) {
} log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
}
if (messageData.length == 3) { if (messageData.length == 3) {
String randomId = messageData[0]; String randomId = messageData[0];
String requestMessage = messageData[1]; String requestMessage = messageData[1];
String result = messageData[2]; String result = messageData[2];
if(log.isDebugEnabled()){ if (log.isDebugEnabled()) {
log.debug("Return result " + result + " for Request " + requestMessage); log.debug("Return result " + result + " for Request " + requestMessage);
}
DigitalDisplayWebSocketServerEndPoint.sendMessage(randomId, result);
} }
DigitalDisplayWebSocketServerEndPoint.sendMessage(randomId, result);
} }
} }

@ -86,37 +86,39 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
@Override @Override
public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException { public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException {
String topic = messageParams[0]; if(messageParams.length != 0) {
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received. String topic = messageParams[0];
String ownerAndId = topic.replace(serverName + File.separator, ""); // owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
ownerAndId = ownerAndId.replace(File.separator + RaspberrypiConstants.DEVICE_TYPE + File.separator, ":"); String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + "publisher", ""); ownerAndId = ownerAndId.replace(File.separator + RaspberrypiConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
String receivedMessage = message.toString();
if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
log.debug("Message [" + receivedMessage + "] topic: [" + topic + "]");
}
if (receivedMessage.contains("PUBLISHER")) {
float temperature = Float.parseFloat(receivedMessage.split(":")[2]);
if (!RaspberrypiServiceUtils.publishToDAS(owner, deviceId, temperature)) { String owner = ownerAndId.split(":")[0];
log.error("MQTT Subscriber: Publishing data to DAS failed."); String deviceId = ownerAndId.split(":")[1];
} String receivedMessage = message.toString();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully."); log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
log.debug("Message [" + receivedMessage + "] topic: [" + topic + "]");
} }
} else if (receivedMessage.contains("TEMPERATURE")) { if (receivedMessage.contains("PUBLISHER")) {
String temperatureValue = receivedMessage.split(":")[1]; float temperature = Float.parseFloat(receivedMessage.split(":")[2]);
SensorDataManager.getInstance().setSensorRecord(deviceId, RaspberrypiConstants.SENSOR_TEMPERATURE,
temperatureValue, if (!RaspberrypiServiceUtils.publishToDAS(owner, deviceId, temperature)) {
Calendar.getInstance().getTimeInMillis()); log.error("MQTT Subscriber: Publishing data to DAS failed.");
}
if (log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully.");
}
} else if (receivedMessage.contains("TEMPERATURE")) {
String temperatureValue = receivedMessage.split(":")[1];
SensorDataManager.getInstance().setSensorRecord(deviceId, RaspberrypiConstants.SENSOR_TEMPERATURE,
temperatureValue,
Calendar.getInstance().getTimeInMillis());
}
} }
} }

@ -122,54 +122,56 @@ public class VirtualFireAlarmMQTTConnector extends MQTTTransportHandler {
*/ */
@Override @Override
public void processIncomingMessage(MqttMessage mqttMessage, String... messageParams) { public void processIncomingMessage(MqttMessage mqttMessage, String... messageParams) {
String topic = messageParams[0]; if(messageParams.length != 0) {
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received. String topic = messageParams[0];
String ownerAndId = topic.replace(serverName + File.separator, ""); // owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
ownerAndId = ownerAndId.replace(File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator, ":"); String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + "publisher", ""); ownerAndId = ownerAndId.replace(File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
String owner = ownerAndId.split(":")[0]; String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1]; String deviceId = ownerAndId.split(":")[1];
if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
}
String actualMessage;
try {
// the hash-code of the deviceId is used as the alias for device certificates during SCEP enrollment.
// hence, the same is used here to fetch the device-specific-certificate from the key store.
PublicKey clientPublicKey = VirtualFireAlarmServiceUtils.getDevicePublicKey(deviceId);
PrivateKey serverPrivateKey = SecurityManager.getServerPrivateKey();
// the MQTT-messages from VirtualFireAlarm devices are in the form {"Msg":<MESSAGE>, "Sig":<SIGNATURE>}
actualMessage = VirtualFireAlarmServiceUtils.extractMessageFromPayload(mqttMessage.toString(),
serverPrivateKey, clientPublicKey);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("MQTT: Received Message [" + actualMessage + "] topic: [" + topic + "]"); log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
} }
if (actualMessage.contains("PUBLISHER")) { String actualMessage;
float temperature = Float.parseFloat(actualMessage.split(":")[2]); try {
// the hash-code of the deviceId is used as the alias for device certificates during SCEP enrollment.
if (!VirtualFireAlarmServiceUtils.publishToDAS(owner, deviceId, temperature)) { // hence, the same is used here to fetch the device-specific-certificate from the key store.
log.error("MQTT Subscriber: Publishing data to DAS failed."); PublicKey clientPublicKey = VirtualFireAlarmServiceUtils.getDevicePublicKey(deviceId);
} PrivateKey serverPrivateKey = SecurityManager.getServerPrivateKey();
// the MQTT-messages from VirtualFireAlarm devices are in the form {"Msg":<MESSAGE>, "Sig":<SIGNATURE>}
actualMessage = VirtualFireAlarmServiceUtils.extractMessageFromPayload(mqttMessage.toString(),
serverPrivateKey, clientPublicKey);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully."); log.debug("MQTT: Received Message [" + actualMessage + "] topic: [" + topic + "]");
} }
} else if (actualMessage.contains("TEMPERATURE")) { if (actualMessage.contains("PUBLISHER")) {
String temperatureValue = actualMessage.split(":")[1]; float temperature = Float.parseFloat(actualMessage.split(":")[2]);
SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMP,
temperatureValue, if (!VirtualFireAlarmServiceUtils.publishToDAS(owner, deviceId, temperature)) {
Calendar.getInstance().getTimeInMillis()); log.error("MQTT Subscriber: Publishing data to DAS failed.");
}
if (log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully.");
}
} else if (actualMessage.contains("TEMPERATURE")) {
String temperatureValue = actualMessage.split(":")[1];
SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMP,
temperatureValue,
Calendar.getInstance().getTimeInMillis());
}
} catch (VirtualFireAlarmException e) {
String errorMsg =
"CertificateManagementService failure oo Signature-Verification/Decryption was unsuccessful.";
log.error(errorMsg, e);
} }
} catch (VirtualFireAlarmException e) {
String errorMsg =
"CertificateManagementService failure oo Signature-Verification/Decryption was unsuccessful.";
log.error(errorMsg, e);
} }
} }

Loading…
Cancel
Save