diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java new file mode 100644 index 0000000000..e3a89d0180 --- /dev/null +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java @@ -0,0 +1,89 @@ +/* +* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +* +* WSO2 Inc. licenses this file to you under the Apache License, +* Version 2.0 (the "License"); you may not use this file except +* in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.wso2.carbon.device.mgt.input.adapter.extension.transformer; + +import com.jayway.jsonpath.JsonPath; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer; + +import java.util.Map; + +/** + * This holds the default implementation of ContentTransformer + */ +public class MQTTContentTransformer implements ContentTransformer { + private static final String MQTT_CONTENT_TRANSFORMER = "iot-mqtt"; + private static final String TOPIC = "topic"; + private static String JSON_ARRAY_START_CHAR = "["; + + private static final Log log = LogFactory.getLog(MQTTContentTransformer.class); + + @Override + public String getType() { + return MQTT_CONTENT_TRANSFORMER; + } + + @Override + public Object transform(Object messagePayload, Map dynamicProperties) { + String topic = (String) dynamicProperties.get(TOPIC); + String topics[] = topic.split("/"); + String deviceId = topics[2]; + String deviceType = topics[1]; + String message = (String) messagePayload; + try { + if (message.startsWith(JSON_ARRAY_START_CHAR)) { + return processMultipleEvents(message, deviceId, deviceType); + } else { + return processSingleEvent(message, deviceId, deviceType); + } + } catch (ParseException e) { + log.error("Invalid input " + message, e); + return false; + } + } + + private String processSingleEvent(String msg, String deviceIdFromTopic, String deviceIdJsonPath) + throws ParseException { + JSONParser parser = new JSONParser(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("deviceId", deviceIdFromTopic); + JSONObject eventObject = new JSONObject(); + eventObject.put("payloadData", parser.parse(msg)); + eventObject.put("metaData", jsonObject); + JSONObject event = new JSONObject(); + event.put("event", eventObject); + return event.toJSONString(); + } + + private String processMultipleEvents(String msg, String deviceIdFromTopic, String deviceIdJsonPath) + throws ParseException { + JSONParser jsonParser = new JSONParser(); + JSONArray jsonArray = (JSONArray) jsonParser.parse(msg); + JSONArray eventsArray = new JSONArray(); + for (int i = 0; i < jsonArray.size(); i++) { + eventsArray.add(i, processSingleEvent(jsonArray.get(i).toString(), deviceIdFromTopic, deviceIdJsonPath)); + } + return eventsArray.toJSONString(); + } +} diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java index 3322acb9db..f395178b5a 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java @@ -33,10 +33,10 @@ public class MQTTContentValidator implements ContentValidator { private static final String JSON_ARRAY_START_CHAR = "["; private static final Log log = LogFactory.getLog(MQTTContentValidator.class); private static final String CDMF_MQTT_CONTENT_VALIDATOR = "iot-mqtt"; - public static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId"; - public static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId"; - public static final String TOPIC = "topic"; - public static final int DEVICE_ID_TOPIC_HIERARCHY_INDEX = 2; + private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId"; + private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId"; + private static final String TOPIC = "topic"; + private static final int DEVICE_ID_TOPIC_HIERARCHY_INDEX = 2; @Override public String getType() { @@ -47,15 +47,14 @@ public class MQTTContentValidator implements ContentValidator { public ContentInfo validate(Object msgPayload, Map dynamicParams) { String topic = (String) dynamicParams.get(TOPIC); String topics[] = topic.split("/"); - String deviceIdJsonPath = DEVICE_ID_JSON_PATH; int deviceIdInTopicHierarchyLevelIndex = DEVICE_ID_TOPIC_HIERARCHY_INDEX; String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex]; boolean status; String message = (String) msgPayload; if (message.startsWith(JSON_ARRAY_START_CHAR)) { - status = processMultipleEvents(message, deviceIdFromTopic, deviceIdJsonPath); + status = processMultipleEvents(message, deviceIdFromTopic, DEVICE_ID_JSON_PATH); } else { - status = processSingleEvent(message, deviceIdFromTopic, deviceIdJsonPath); + status = processSingleEvent(message, deviceIdFromTopic, DEVICE_ID_JSON_PATH); } return new ContentInfo(status, msgPayload); } diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/DeviceTypeOperationAdapterSubscription.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/DeviceTypeOperationAdapterSubscription.java index b2896991fc..7dcda874c5 100644 --- a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/DeviceTypeOperationAdapterSubscription.java +++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/DeviceTypeOperationAdapterSubscription.java @@ -21,7 +21,6 @@ package org.wso2.carbon.device.mgt.mqtt.notification.listener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.context.PrivilegedCarbonContext; -import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext; import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationExecutionFailedException; import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder; import org.wso2.carbon.event.input.adapter.core.InputEventAdapterSubscription; @@ -48,10 +47,10 @@ public class DeviceTypeOperationAdapterSubscription implements InputEventAdapter try { PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(PrivilegedCarbonContext. getThreadLocalCarbonContext().getUserRealm().getRealmConfiguration().getAdminUserName()); - NotificationContext notificationContext = notificationMessage.getNotificationContext(); - deviceType = notificationContext.getDeviceId().getType(); - MqttNotificationDataHolder.getInstance().getDeviceManagementProviderService() - .executePullNotification(deviceType, notificationContext); + deviceType = notificationMessage.getDeviceIdentifier().getType(); + MqttNotificationDataHolder.getInstance().getDeviceManagementProviderService(). + updatePullNotificationOperation(notificationMessage.getDeviceIdentifier(), + notificationMessage.getOperation()); } catch (UserStoreException e) { log.error("Failed to retrieve tenant username", e); } catch (PullNotificationExecutionFailedException e) { diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/NotificationMessage.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/NotificationMessage.java index 3fdb6e7bc1..dfd17e29da 100644 --- a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/NotificationMessage.java +++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/NotificationMessage.java @@ -18,16 +18,19 @@ */ package org.wso2.carbon.device.mgt.mqtt.notification.listener; -import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext; +import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.common.operation.mgt.Operation; public class NotificationMessage { private String tenantDomain; - private NotificationContext notificationContext; + private DeviceIdentifier deviceIdentifier; + private Operation operation; - public NotificationMessage(String tenantDomain, NotificationContext notificationContext) { + public NotificationMessage(String tenantDomain, DeviceIdentifier deviceIdentifier,Operation operation) { this.tenantDomain = tenantDomain; - this.notificationContext = notificationContext; + this.operation = operation; + this.deviceIdentifier = deviceIdentifier; } public String getTenantDomain() { @@ -38,12 +41,20 @@ public class NotificationMessage { this.tenantDomain = tenantDomain; } - public NotificationContext getNotificationContext() { - return notificationContext; + public Operation getOperation() { + return operation; } public void setNotificationContext( - NotificationContext notificationContext) { - this.notificationContext = notificationContext; + Operation operation) { + this.operation = operation; + } + + public DeviceIdentifier getDeviceIdentifier() { + return deviceIdentifier; + } + + public void setDeviceIdentifier(DeviceIdentifier deviceIdentifier) { + this.deviceIdentifier = deviceIdentifier; } } diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java index 18b9a331b0..1f5a2417f6 100644 --- a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java +++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java @@ -19,11 +19,9 @@ package org.wso2.carbon.device.mgt.mqtt.notification.listener; import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; -import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext; -import org.wso2.carbon.device.mgt.common.pull.notification.NotificationPayload; +import org.wso2.carbon.device.mgt.common.operation.mgt.Operation; import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer; import java.util.Map; @@ -50,10 +48,8 @@ public class PullNotificationMqttContentTransformer implements ContentTransforme String deviceId = topicParams[2]; Gson gson = new Gson(); try { - NotificationPayload notificationPayload = gson.fromJson((String) message, NotificationPayload.class); - NotificationContext notificationContext = new NotificationContext - (new DeviceIdentifier(deviceId, deviceType), notificationPayload); - return new NotificationMessage(tenantDomain, notificationContext); + Operation operation = gson.fromJson((String) message, Operation.class); + return new NotificationMessage(tenantDomain, new DeviceIdentifier(deviceId, deviceType),operation); } catch (Exception e) { //Avoid notification listener to fail. return new Object(); diff --git a/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/pom.xml b/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/pom.xml index ccfcaef415..3e8c65d6b6 100644 --- a/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/pom.xml +++ b/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/pom.xml @@ -12,7 +12,7 @@ org.wso2.carbon.device.mgt.notification.listener.feature pom - 3.0.37-SNAPSHOT + 3.0.38-SNAPSHOT WSO2 Carbon - Notification Listener http://wso2.org This feature contains the core bundles required iot core listeners diff --git a/pom.xml b/pom.xml index e0f052795b..ad77c31462 100644 --- a/pom.xml +++ b/pom.xml @@ -1157,7 +1157,7 @@ 1.1.1 - 2.0.69-SNAPSHOT + 2.0.71-SNAPSHOT [2.0.0, 3.0.0)