From d2d7be6fd6829afa918404c2bc3a5bd94c3ebd58 Mon Sep 17 00:00:00 2001 From: ayyoob Date: Wed, 10 May 2017 13:50:06 +0530 Subject: [PATCH] fixed tenant related issue for mqtt push notification and stream artifact publishing --- .../mqtt/MQTTNotificationStrategy.java | 42 ++++++++++++++++--- ...yticsArtifactUploaderAdminServiceImpl.java | 3 ++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/components/device-mgt-extensions/org.wso2.carbon.device.mgt.extensions.push.notification.provider.mqtt/src/main/java/org/wso2/carbon/device/mgt/extensions/push/notification/provider/mqtt/MQTTNotificationStrategy.java b/components/device-mgt-extensions/org.wso2.carbon.device.mgt.extensions.push.notification.provider.mqtt/src/main/java/org/wso2/carbon/device/mgt/extensions/push/notification/provider/mqtt/MQTTNotificationStrategy.java index 288b902172..121599324a 100644 --- a/components/device-mgt-extensions/org.wso2.carbon.device.mgt.extensions.push.notification.provider.mqtt/src/main/java/org/wso2/carbon/device/mgt/extensions/push/notification/provider/mqtt/MQTTNotificationStrategy.java +++ b/components/device-mgt-extensions/org.wso2.carbon.device.mgt.extensions.push.notification.provider.mqtt/src/main/java/org/wso2/carbon/device/mgt/extensions/push/notification/provider/mqtt/MQTTNotificationStrategy.java @@ -46,6 +46,8 @@ public class MQTTNotificationStrategy implements NotificationStrategy { private String mqttAdapterName; private static final Log log = LogFactory.getLog(MQTTNotificationStrategy.class); private final PushNotificationConfig config; + private final String providerTenantDomain; + private static final Object lockObj = new Object(); public MQTTNotificationStrategy(PushNotificationConfig config) { this.config = config; @@ -77,7 +79,7 @@ public class MQTTNotificationStrategy implements NotificationStrategy { adapterConfig.setName(mqttAdapterName); adapterConfig.setStaticProperties(configProperties); try { - synchronized (MQTTNotificationStrategy.class) { + synchronized (lockObj) { try { MQTTDataHolder.getInstance().getOutputEventAdapterService().isPolled(mqttAdapterName); } catch (OutputEventAdapterException e) { @@ -88,6 +90,8 @@ public class MQTTNotificationStrategy implements NotificationStrategy { } catch (OutputEventAdapterException e) { throw new InvalidConfigurationException("Error occurred while initializing MQTT output event adapter", e); } + providerTenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain() + .toLowerCase(); } @Override @@ -97,19 +101,47 @@ public class MQTTNotificationStrategy implements NotificationStrategy { @Override public void execute(NotificationContext ctx) throws PushNotificationExecutionFailedException { + String adapterName = mqttAdapterName; + String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true); + if (!providerTenantDomain.equals(tenantDomain)) { + //this is to handle the device type shared with all tenant mode. + + adapterName = "mqtt.adapter." + PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain() + .toLowerCase(); + try { + MQTTDataHolder.getInstance().getOutputEventAdapterService().isPolled(adapterName); + } catch (OutputEventAdapterException e) { + //event adapter not created + synchronized (lockObj) { + OutputEventAdapterConfiguration adapterConfig = new OutputEventAdapterConfiguration(); + adapterConfig.setType(MQTTAdapterConstants.MQTT_ADAPTER_TYPE); + adapterConfig.setMessageFormat(MessageType.TEXT); + adapterConfig.setName(adapterName); + Map configProperties = new HashMap(); + adapterConfig.setStaticProperties(configProperties); + try { + MQTTDataHolder.getInstance().getOutputEventAdapterService().create(adapterConfig); + } catch (OutputEventAdapterException e1) { + throw new PushNotificationExecutionFailedException + ("Error occurred while initializing MQTT output event adapter for shared tenant: " + + tenantDomain, e); + } + } + } + + } Operation operation = ctx.getOperation(); Properties properties = operation.getProperties(); if (properties != null && properties.get(MQTT_ADAPTER_TOPIC) != null) { Map dynamicProperties = new HashMap<>(); dynamicProperties.put("topic", (String) properties.get(MQTT_ADAPTER_TOPIC)); - MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(mqttAdapterName, dynamicProperties, + MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(adapterName, dynamicProperties, operation.getPayLoad()); } else { if (PolicyOperation.POLICY_OPERATION_CODE.equals(operation.getCode())) { PolicyOperation policyOperation = (PolicyOperation) operation; List profileOperations = policyOperation.getProfileOperations(); - String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true); String deviceType = ctx.getDeviceId().getType(); String deviceId = ctx.getDeviceId().getId(); for (ProfileOperation profileOperation : profileOperations) { @@ -118,7 +150,7 @@ public class MQTTNotificationStrategy implements NotificationStrategy { + deviceType + "/" + deviceId + "/" + profileOperation.getType() .toString().toLowerCase() + "/" + profileOperation.getCode().toLowerCase(); dynamicProperties.put("topic", topic); - MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(mqttAdapterName, dynamicProperties, + MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(adapterName, dynamicProperties, profileOperation.getPayLoad()); } @@ -131,7 +163,7 @@ public class MQTTNotificationStrategy implements NotificationStrategy { if (operation.getPayLoad() == null) { operation.setPayLoad(""); } - MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(mqttAdapterName, dynamicProperties, + MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(adapterName, dynamicProperties, operation.getPayLoad()); } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/admin/DeviceAnalyticsArtifactUploaderAdminServiceImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/admin/DeviceAnalyticsArtifactUploaderAdminServiceImpl.java index 9a73a99913..27e74549b0 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/admin/DeviceAnalyticsArtifactUploaderAdminServiceImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/admin/DeviceAnalyticsArtifactUploaderAdminServiceImpl.java @@ -174,6 +174,9 @@ public class DeviceAnalyticsArtifactUploaderAdminServiceImpl implements DeviceAn publishDynamicEventReceivers(type, MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, receiverFileList); } } + if (streamFileList != null) { + publishDynamicEventStream(type, tenantDomain, streamFileList); + } if (deployAnalyticsCapp(type, list)){ return Response.status(Response.Status.BAD_REQUEST) .entity("\"Error, Artifact does not exist.\"").build();