Few changes after testing mqtt and http flow

revert-dabc3590
ayyoob 8 years ago
parent 6242fde431
commit 796de7456f

@ -25,19 +25,20 @@ import org.wso2.carbon.device.mgt.input.adapter.extension.ContentValidator;
import org.wso2.carbon.device.mgt.input.adapter.extension.InputAdapterExtensionService; import org.wso2.carbon.device.mgt.input.adapter.extension.InputAdapterExtensionService;
import org.wso2.carbon.device.mgt.input.adapter.extension.InputAdapterExtensionServiceImpl; import org.wso2.carbon.device.mgt.input.adapter.extension.InputAdapterExtensionServiceImpl;
import org.wso2.carbon.device.mgt.input.adapter.extension.transformer.DefaultContentTransformer; import org.wso2.carbon.device.mgt.input.adapter.extension.transformer.DefaultContentTransformer;
import org.wso2.carbon.device.mgt.input.adapter.extension.transformer.MQTTContentTransformer;
import org.wso2.carbon.device.mgt.input.adapter.extension.validator.DefaultContentValidator; import org.wso2.carbon.device.mgt.input.adapter.extension.validator.DefaultContentValidator;
import org.wso2.carbon.device.mgt.input.adapter.extension.validator.HTTPContentValidator; import org.wso2.carbon.device.mgt.input.adapter.extension.validator.HTTPContentValidator;
import org.wso2.carbon.device.mgt.input.adapter.extension.validator.MQTTContentValidator; import org.wso2.carbon.device.mgt.input.adapter.extension.validator.MQTTContentValidator;
/** /**
* @scr.component name="input.adapter.extension.adapterService.component" immediate="true" * @scr.component name="input.adapter.extension.adapterService.component" immediate="true"
* @scr.reference name="InputAdapterServiceComponent.service" * @scr.reference name="InputAdapterServiceComponent.content.validator.service"
* interface="org.wso2.carbon.device.mgt.input.adapter.extension.ContentValidator" * interface="org.wso2.carbon.device.mgt.input.adapter.extension.ContentValidator"
* cardinality="0..n" * cardinality="0..n"
* policy="dynamic" * policy="dynamic"
* bind="setContentValidator" * bind="setContentValidator"
* unbind="unsetContentValidator" * unbind="unsetContentValidator"
* * @scr.reference name="InputAdapterServiceComponent.service" * @scr.reference name="InputAdapterServiceComponent.transformer.service"
* interface="org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer" * interface="org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer"
* cardinality="0..n" * cardinality="0..n"
* policy="dynamic" * policy="dynamic"
@ -56,6 +57,7 @@ public class InputAdapterServiceComponent {
} }
InputAdapterServiceDataHolder.getInstance().addContentTransformer(new DefaultContentTransformer()); InputAdapterServiceDataHolder.getInstance().addContentTransformer(new DefaultContentTransformer());
InputAdapterServiceDataHolder.getInstance().addContentTransformer(new MQTTContentTransformer());
InputAdapterServiceDataHolder.getInstance().addContentValidator(new DefaultContentValidator()); InputAdapterServiceDataHolder.getInstance().addContentValidator(new DefaultContentValidator());
InputAdapterServiceDataHolder.getInstance().addContentValidator(new HTTPContentValidator()); InputAdapterServiceDataHolder.getInstance().addContentValidator(new HTTPContentValidator());
InputAdapterServiceDataHolder.getInstance().addContentValidator(new MQTTContentValidator()); InputAdapterServiceDataHolder.getInstance().addContentValidator(new MQTTContentValidator());
@ -87,7 +89,7 @@ public class InputAdapterServiceComponent {
InputAdapterServiceDataHolder.getInstance().addContentTransformer(contentTransformer); InputAdapterServiceDataHolder.getInstance().addContentTransformer(contentTransformer);
} }
protected void unsetContentValidator(ContentTransformer contentTransformer) { protected void unsetContentTransformer(ContentTransformer contentTransformer) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Un-setting ContentTransformer Service"); log.debug("Un-setting ContentTransformer Service");
} }

@ -33,7 +33,7 @@ import java.util.Map;
* This holds the default implementation of ContentTransformer * This holds the default implementation of ContentTransformer
*/ */
public class MQTTContentTransformer implements ContentTransformer { public class MQTTContentTransformer implements ContentTransformer {
private static final String MQTT_CONTENT_TRANSFORMER = "iot-mqtt"; private static final String MQTT_CONTENT_TRANSFORMER = "device-meta-transformer";
private static final String TOPIC = "topic"; private static final String TOPIC = "topic";
private static String JSON_ARRAY_START_CHAR = "["; private static String JSON_ARRAY_START_CHAR = "[";

@ -32,7 +32,7 @@ import java.util.Map;
public class MQTTContentValidator implements ContentValidator { public class MQTTContentValidator implements ContentValidator {
private static final String JSON_ARRAY_START_CHAR = "["; private static final String JSON_ARRAY_START_CHAR = "[";
private static final Log log = LogFactory.getLog(MQTTContentValidator.class); private static final Log log = LogFactory.getLog(MQTTContentValidator.class);
private static final String CDMF_MQTT_CONTENT_VALIDATOR = "iot-mqtt"; private static final String CDMF_MQTT_CONTENT_VALIDATOR = "deviceid-topic-content-validator";
private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId";
private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId";
private static final String TOPIC = "topic"; private static final String TOPIC = "topic";

@ -114,7 +114,7 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
contentTransformer.setRequired(false); contentTransformer.setRequired(false);
contentTransformer.setHint( contentTransformer.setHint(
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_TRANSFORMER_TYPE_HINT)); resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_TRANSFORMER_TYPE_HINT));
contentTransformer.setDefaultValue(MQTTEventAdapterConstants.DEFAULT); contentTransformer.setDefaultValue(MQTTEventAdapterConstants.EMPTY);
propertyList.add(contentTransformer); propertyList.add(contentTransformer);
// set clientId // set clientId

@ -271,8 +271,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
ContentInfo contentInfo; ContentInfo contentInfo;
Map<String, Object> dynamicProperties = new HashMap<>(); Map<String, Object> dynamicProperties = new HashMap<>();
dynamicProperties.put(MQTTEventAdapterConstants.TOPIC, topic); dynamicProperties.put(MQTTEventAdapterConstants.TOPIC, topic);
msgText = (String) contentTransformer.transform(msgText, dynamicProperties); Object transformedMessage = contentTransformer.transform(msgText, dynamicProperties);
contentInfo = contentValidator.validate(msgText, dynamicProperties); contentInfo = contentValidator.validate(transformedMessage, dynamicProperties);
if (contentInfo != null && contentInfo.isValidContent()) { if (contentInfo != null && contentInfo.isValidContent()) {
inputEventAdapterListener.onEvent(contentInfo.getMessage()); inputEventAdapterListener.onEvent(contentInfo.getMessage());
} }

@ -106,9 +106,17 @@ public class MQTTBrokerConnectionConfiguration {
this.dcrUrl = PropertyUtils this.dcrUrl = PropertyUtils
.replaceMqttProperty(globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL)); .replaceMqttProperty(globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL));
this.contentValidatorType = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE); this.contentValidatorType = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE);
String contentValidatorTypeLocal = eventAdapterConfiguration.getProperties()
.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE);
if (contentValidatorType == null || contentValidatorType.isEmpty()) { if (contentValidatorType == null || contentValidatorType.isEmpty()) {
this.contentValidatorType = eventAdapterConfiguration.getProperties() this.contentValidatorType = eventAdapterConfiguration.getProperties()
.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE); .get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE);
} else if (contentValidatorTypeLocal != null && !contentValidatorTypeLocal.equals(MQTTEventAdapterConstants.EMPTY)) {
this.contentValidatorType = eventAdapterConfiguration.getProperties()
.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_TYPE);
}
if (this.contentValidatorType.equals(MQTTEventAdapterConstants.EMPTY)) {
this.contentValidatorType = MQTTEventAdapterConstants.DEFAULT;
} }
String cleanSession = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION); String cleanSession = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION);
if (cleanSession == null || cleanSession.isEmpty()) { if (cleanSession == null || cleanSession.isEmpty()) {

@ -61,6 +61,7 @@ public class MQTTEventAdapterConstants {
public static final String CLIENT_SECRET = "clientSecret"; public static final String CLIENT_SECRET = "clientSecret";
public static final String CLIENT_NAME = "client_name"; public static final String CLIENT_NAME = "client_name";
public static final String DEFAULT = "default"; public static final String DEFAULT = "default";
public static final String EMPTY = "";
public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = ""; public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = "";
public static final String TOPIC = "topic"; public static final String TOPIC = "topic";
public static final String PAYLOAD = "payload"; public static final String PAYLOAD = "payload";

@ -32,7 +32,7 @@ import java.util.Map;
*/ */
public class PullNotificationMqttContentTransformer implements ContentTransformer { public class PullNotificationMqttContentTransformer implements ContentTransformer {
public static final String MQTT_NOTIFICATION_MESSAGE_TRANSFORMER = "mqtt-notification-transformer"; public static final String MQTT_NOTIFICATION_MESSAGE_TRANSFORMER = "mqtt-operation-transformer";
@Override @Override
public String getType() { public String getType() {
@ -53,8 +53,6 @@ public class PullNotificationMqttContentTransformer implements ContentTransforme
} catch (Exception e) { } catch (Exception e) {
//Avoid notification listener to fail. //Avoid notification listener to fail.
return new Object(); return new Object();
} finally {
PrivilegedCarbonContext.endTenantFlow();
} }
} }

@ -60,7 +60,7 @@ public class PullNotificationListenerServiceComponent {
BundleContext bundleContext = componentContext.getBundleContext(); BundleContext bundleContext = componentContext.getBundleContext();
bundleContext.registerService(ServerStartupObserver.class.getName(), new PullNotificationStartupListener(), bundleContext.registerService(ServerStartupObserver.class.getName(), new PullNotificationStartupListener(),
null); null);
bundleContext.registerService(ContentTransformer.class.getName(), new PullNotificationMqttContentTransformer(), null); bundleContext.registerService(ContentTransformer.class, new PullNotificationMqttContentTransformer(), null);
} catch (Throwable e) { } catch (Throwable e) {
log.error("Error occurred while initializing pull notification provider implementation bundle", e); log.error("Error occurred while initializing pull notification provider implementation bundle", e);
} }

@ -43,6 +43,8 @@ public class MqttNotificationListener {
private static final String JSON = "json"; private static final String JSON = "json";
private static final String NAME = "iot_core_server_adapter"; private static final String NAME = "iot_core_server_adapter";
private static final String CONTENT_TRANSFORMER_TYPE = "contentTransformer"; private static final String CONTENT_TRANSFORMER_TYPE = "contentTransformer";
private static final String MQTT_CONTENT_VALIDATOR_TYPE = "contentValidator";
private static final String MQTT_CONTENT_VALIDATOR = "default";
public static void setupMqttInputAdapter() { public static void setupMqttInputAdapter() {
@ -54,6 +56,7 @@ public class MqttNotificationListener {
mqttAdapterProperties.put(TOPIC, SUBSCRIBED_TOPIC); mqttAdapterProperties.put(TOPIC, SUBSCRIBED_TOPIC);
mqttAdapterProperties.put(CONTENT_TRANSFORMER_TYPE, mqttAdapterProperties.put(CONTENT_TRANSFORMER_TYPE,
PullNotificationMqttContentTransformer.MQTT_NOTIFICATION_MESSAGE_TRANSFORMER); PullNotificationMqttContentTransformer.MQTT_NOTIFICATION_MESSAGE_TRANSFORMER);
mqttAdapterProperties.put(MQTT_CONTENT_VALIDATOR_TYPE, MQTT_CONTENT_VALIDATOR);
inputEventAdapterConfiguration.setProperties(mqttAdapterProperties); inputEventAdapterConfiguration.setProperties(mqttAdapterProperties);
try { try {
PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.startTenantFlow();

Loading…
Cancel
Save