@ -96,7 +96,7 @@ public class ConnectedCupMQTTConnector extends MQTTTransportHandler {
MqttMessage pushMessage = new MqttMessage();
String publishTopic =
serverName + File.separator + deviceOwner + File.separator +
"wso2" + File.separator + deviceOwner + File.separator +
ConnectedCupConstants.DEVICE_TYPE + File.separator + deviceId;
try {
@ -110,7 +110,7 @@ public class ConnectedCupMQTTConnector extends MQTTTransportHandler {
publishToQueue(publishTopic, pushMessage);
} catch (Exception e) {
String errorMsg = "Preparing Secure payload failed for device - [" + deviceId + "] of owner - " +
String errorMsg = "Preparing payload failed for device - [" + deviceId + "] of owner - " +
"[" + deviceOwner + "].";
throw new TransportHandlerException(errorMsg, e);
@ -122,33 +122,55 @@ public class ConnectedCupMQTTConnector extends MQTTTransportHandler {
public void processIncomingMessage(MqttMessage mqttMessage, String... strings) throws TransportHandlerException {
String topic = strings[0];
String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, "");
String ownerAndId = topic.replace("wso2" + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + ConnectedCupConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "connectedcup_publisher", "");
ownerAndId = ownerAndId.replace(File.separator + "connected_publisher", "");
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
// String actualMessage = mqttMessage.toString();
String[] messageData = mqttMessage.toString().split(":");
Float value = Float.valueOf(messageData[1]);
// if (actualMessage.contains("PUBLISHER")) {
// float temperature = Float.parseFloat(actualMessage.split(":")[2]);
// if (!ConnectedCupServiceUtils.publishToDAS(owner, deviceId, messageData[0], value)) {
// log.error("MQTT Subscriber: Publishing data to DAS failed.");
// }
// if (log.isDebugEnabled()) {
// log.debug("MQTT Subscriber: Published data to DAS successfully.");
// }
// } else if (actualMessage.contains("TEMPERATURE")) {
// String temperatureValue = actualMessage.split(":")[1];
// SensorDataManager.getInstance().setSensorRecord(deviceId, ConnectedCupConstants.SENSOR_TEMPERATURE,
// temperatureValue,
// Calendar.getInstance().getTimeInMillis());
// }
switch(messageData[0]) {
case "temperature":
SensorDataManager.getInstance().setSensorRecord(deviceId, ConnectedCupConstants.SENSOR_TEMPERATURE,
case "temperature": SensorDataManager.getInstance().setSensorRecord(deviceId, ConnectedCupConstants.SENSOR_TEMPERATURE,
ConnectedCupServiceUtils.publishToDAS(owner, deviceId, value);
case "coffeelevel":
SensorDataManager.getInstance().setSensorRecord(deviceId, ConnectedCupConstants.SENSOR_TEMPERATURE,
case "coffeelevel": SensorDataManager.getInstance().setSensorRecord(deviceId, ConnectedCupConstants.SENSOR_LEVEL,
ConnectedCupServiceUtils.publishToDAS(owner, deviceId, value);
log.info("Received MQTT message for OWNER: " + owner + " DEVICE.ID: " + deviceId + " | Command: " +
messageData[0] +" " + messageData[1] );
ConnectedCupServiceUtils.publishToDAS(owner, deviceId, messageData[0], value);
if (log.isDebugEnabled()) {
log.debug("Received MQTT message for OWNER: " + owner + " DEVICE.ID: " + deviceId + " | Command: " +
messageData[0] +" " + messageData[1] );