diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/internal/InputAdapterServiceComponent.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/internal/InputAdapterServiceComponent.java index b7370fa3aa..ee41c354dc 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/internal/InputAdapterServiceComponent.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/internal/InputAdapterServiceComponent.java @@ -25,19 +25,20 @@ import org.wso2.carbon.device.mgt.input.adapter.extension.ContentValidator; import org.wso2.carbon.device.mgt.input.adapter.extension.InputAdapterExtensionService; import org.wso2.carbon.device.mgt.input.adapter.extension.InputAdapterExtensionServiceImpl; import org.wso2.carbon.device.mgt.input.adapter.extension.transformer.DefaultContentTransformer; +import org.wso2.carbon.device.mgt.input.adapter.extension.transformer.MQTTContentTransformer; import org.wso2.carbon.device.mgt.input.adapter.extension.validator.DefaultContentValidator; import org.wso2.carbon.device.mgt.input.adapter.extension.validator.HTTPContentValidator; import org.wso2.carbon.device.mgt.input.adapter.extension.validator.MQTTContentValidator; /** * @scr.component name="input.adapter.extension.adapterService.component" immediate="true" - * @scr.reference name="InputAdapterServiceComponent.service" + * @scr.reference name="InputAdapterServiceComponent.content.validator.service" * interface="org.wso2.carbon.device.mgt.input.adapter.extension.ContentValidator" * cardinality="0..n" * policy="dynamic" * bind="setContentValidator" * unbind="unsetContentValidator" - * * @scr.reference name="InputAdapterServiceComponent.service" + * @scr.reference name="InputAdapterServiceComponent.transformer.service" * interface="org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer" * cardinality="0..n" * policy="dynamic" @@ -56,6 +57,7 @@ public class InputAdapterServiceComponent { } InputAdapterServiceDataHolder.getInstance().addContentTransformer(new DefaultContentTransformer()); + InputAdapterServiceDataHolder.getInstance().addContentTransformer(new MQTTContentTransformer()); InputAdapterServiceDataHolder.getInstance().addContentValidator(new DefaultContentValidator()); InputAdapterServiceDataHolder.getInstance().addContentValidator(new HTTPContentValidator()); InputAdapterServiceDataHolder.getInstance().addContentValidator(new MQTTContentValidator()); @@ -87,7 +89,7 @@ public class InputAdapterServiceComponent { InputAdapterServiceDataHolder.getInstance().addContentTransformer(contentTransformer); } - protected void unsetContentValidator(ContentTransformer contentTransformer) { + protected void unsetContentTransformer(ContentTransformer contentTransformer) { if (log.isDebugEnabled()) { log.debug("Un-setting ContentTransformer Service"); } diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java index e3a89d0180..fba9340096 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java @@ -33,7 +33,7 @@ import java.util.Map; * This holds the default implementation of ContentTransformer */ public class MQTTContentTransformer implements ContentTransformer { - private static final String MQTT_CONTENT_TRANSFORMER = "iot-mqtt"; + private static final String MQTT_CONTENT_TRANSFORMER = "device-meta-transformer"; private static final String TOPIC = "topic"; private static String JSON_ARRAY_START_CHAR = "["; diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java index f395178b5a..20261ab6d9 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java @@ -32,7 +32,7 @@ import java.util.Map; public class MQTTContentValidator implements ContentValidator { private static final String JSON_ARRAY_START_CHAR = "["; private static final Log log = LogFactory.getLog(MQTTContentValidator.class); - private static final String CDMF_MQTT_CONTENT_VALIDATOR = "iot-mqtt"; + private static final String CDMF_MQTT_CONTENT_VALIDATOR = "deviceid-topic-content-validator"; private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId"; private static final String TOPIC = "topic"; diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/MQTTEventAdapterFactory.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/MQTTEventAdapterFactory.java index f5f93b17a7..54f0ec364b 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/MQTTEventAdapterFactory.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/MQTTEventAdapterFactory.java @@ -114,7 +114,7 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory { contentTransformer.setRequired(false); contentTransformer.setHint( resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_TRANSFORMER_TYPE_HINT)); - contentTransformer.setDefaultValue(MQTTEventAdapterConstants.DEFAULT); + contentTransformer.setDefaultValue(MQTTEventAdapterConstants.EMPTY); propertyList.add(contentTransformer); // set clientId diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java index 4d83480569..968faa4311 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java @@ -271,8 +271,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { ContentInfo contentInfo; Map dynamicProperties = new HashMap<>(); dynamicProperties.put(MQTTEventAdapterConstants.TOPIC, topic); - msgText = (String) contentTransformer.transform(msgText, dynamicProperties); - contentInfo = contentValidator.validate(msgText, dynamicProperties); + Object transformedMessage = contentTransformer.transform(msgText, dynamicProperties); + contentInfo = contentValidator.validate(transformedMessage, dynamicProperties); if (contentInfo != null && contentInfo.isValidContent()) { inputEventAdapterListener.onEvent(contentInfo.getMessage()); } diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java index 32669bd965..b32e7d6d93 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java @@ -106,9 +106,17 @@ public class MQTTBrokerConnectionConfiguration { this.dcrUrl = PropertyUtils .replaceMqttProperty(globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL)); this.contentValidatorType = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE); + String contentValidatorTypeLocal = eventAdapterConfiguration.getProperties() + .get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE); if (contentValidatorType == null || contentValidatorType.isEmpty()) { this.contentValidatorType = eventAdapterConfiguration.getProperties() .get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE); + } else if (contentValidatorTypeLocal != null && !contentValidatorTypeLocal.equals(MQTTEventAdapterConstants.EMPTY)) { + this.contentValidatorType = eventAdapterConfiguration.getProperties() + .get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE); + } + if (this.contentValidatorType.equals(MQTTEventAdapterConstants.EMPTY)) { + this.contentValidatorType = MQTTEventAdapterConstants.DEFAULT; } String cleanSession = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION); if (cleanSession == null || cleanSession.isEmpty()) { diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java index 0a0b09e2a7..7c80d8a1d3 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java @@ -61,6 +61,7 @@ public class MQTTEventAdapterConstants { public static final String CLIENT_SECRET = "clientSecret"; public static final String CLIENT_NAME = "client_name"; public static final String DEFAULT = "default"; + public static final String EMPTY = ""; public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = ""; public static final String TOPIC = "topic"; public static final String PAYLOAD = "payload"; diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java index 1f5a2417f6..476b4eba0c 100644 --- a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java +++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java @@ -32,7 +32,7 @@ import java.util.Map; */ public class PullNotificationMqttContentTransformer implements ContentTransformer { - public static final String MQTT_NOTIFICATION_MESSAGE_TRANSFORMER = "mqtt-notification-transformer"; + public static final String MQTT_NOTIFICATION_MESSAGE_TRANSFORMER = "mqtt-operation-transformer"; @Override public String getType() { @@ -53,8 +53,6 @@ public class PullNotificationMqttContentTransformer implements ContentTransforme } catch (Exception e) { //Avoid notification listener to fail. return new Object(); - } finally { - PrivilegedCarbonContext.endTenantFlow(); } } diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/PullNotificationListenerServiceComponent.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/PullNotificationListenerServiceComponent.java index df15fe2cf2..7e94759154 100644 --- a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/PullNotificationListenerServiceComponent.java +++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/PullNotificationListenerServiceComponent.java @@ -60,7 +60,7 @@ public class PullNotificationListenerServiceComponent { BundleContext bundleContext = componentContext.getBundleContext(); bundleContext.registerService(ServerStartupObserver.class.getName(), new PullNotificationStartupListener(), null); - bundleContext.registerService(ContentTransformer.class.getName(), new PullNotificationMqttContentTransformer(), null); + bundleContext.registerService(ContentTransformer.class, new PullNotificationMqttContentTransformer(), null); } catch (Throwable e) { log.error("Error occurred while initializing pull notification provider implementation bundle", e); } diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/util/MqttNotificationListener.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/util/MqttNotificationListener.java index 3f0c4048f5..08800272d6 100644 --- a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/util/MqttNotificationListener.java +++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/util/MqttNotificationListener.java @@ -43,6 +43,8 @@ public class MqttNotificationListener { private static final String JSON = "json"; private static final String NAME = "iot_core_server_adapter"; private static final String CONTENT_TRANSFORMER_TYPE = "contentTransformer"; + private static final String MQTT_CONTENT_VALIDATOR_TYPE = "contentValidator"; + private static final String MQTT_CONTENT_VALIDATOR = "default"; public static void setupMqttInputAdapter() { @@ -54,6 +56,7 @@ public class MqttNotificationListener { mqttAdapterProperties.put(TOPIC, SUBSCRIBED_TOPIC); mqttAdapterProperties.put(CONTENT_TRANSFORMER_TYPE, PullNotificationMqttContentTransformer.MQTT_NOTIFICATION_MESSAGE_TRANSFORMER); + mqttAdapterProperties.put(MQTT_CONTENT_VALIDATOR_TYPE, MQTT_CONTENT_VALIDATOR); inputEventAdapterConfiguration.setProperties(mqttAdapterProperties); try { PrivilegedCarbonContext.startTenantFlow();