fixed tenant related issue for mqtt push notification and stream artifact publishing

revert-70aa11f8
ayyoob 8 years ago
parent 1c3a89f57c
commit d2d7be6fd6

@ -46,6 +46,8 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
private String mqttAdapterName; private String mqttAdapterName;
private static final Log log = LogFactory.getLog(MQTTNotificationStrategy.class); private static final Log log = LogFactory.getLog(MQTTNotificationStrategy.class);
private final PushNotificationConfig config; private final PushNotificationConfig config;
private final String providerTenantDomain;
private static final Object lockObj = new Object();
public MQTTNotificationStrategy(PushNotificationConfig config) { public MQTTNotificationStrategy(PushNotificationConfig config) {
this.config = config; this.config = config;
@ -77,7 +79,7 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
adapterConfig.setName(mqttAdapterName); adapterConfig.setName(mqttAdapterName);
adapterConfig.setStaticProperties(configProperties); adapterConfig.setStaticProperties(configProperties);
try { try {
synchronized (MQTTNotificationStrategy.class) { synchronized (lockObj) {
try { try {
MQTTDataHolder.getInstance().getOutputEventAdapterService().isPolled(mqttAdapterName); MQTTDataHolder.getInstance().getOutputEventAdapterService().isPolled(mqttAdapterName);
} catch (OutputEventAdapterException e) { } catch (OutputEventAdapterException e) {
@ -88,6 +90,8 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
} catch (OutputEventAdapterException e) { } catch (OutputEventAdapterException e) {
throw new InvalidConfigurationException("Error occurred while initializing MQTT output event adapter", e); throw new InvalidConfigurationException("Error occurred while initializing MQTT output event adapter", e);
} }
providerTenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain()
.toLowerCase();
} }
@Override @Override
@ -97,19 +101,47 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
@Override @Override
public void execute(NotificationContext ctx) throws PushNotificationExecutionFailedException { public void execute(NotificationContext ctx) throws PushNotificationExecutionFailedException {
String adapterName = mqttAdapterName;
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
if (!providerTenantDomain.equals(tenantDomain)) {
//this is to handle the device type shared with all tenant mode.
adapterName = "mqtt.adapter." + PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain()
.toLowerCase();
try {
MQTTDataHolder.getInstance().getOutputEventAdapterService().isPolled(adapterName);
} catch (OutputEventAdapterException e) {
//event adapter not created
synchronized (lockObj) {
OutputEventAdapterConfiguration adapterConfig = new OutputEventAdapterConfiguration();
adapterConfig.setType(MQTTAdapterConstants.MQTT_ADAPTER_TYPE);
adapterConfig.setMessageFormat(MessageType.TEXT);
adapterConfig.setName(adapterName);
Map<String, String> configProperties = new HashMap<String, String>();
adapterConfig.setStaticProperties(configProperties);
try {
MQTTDataHolder.getInstance().getOutputEventAdapterService().create(adapterConfig);
} catch (OutputEventAdapterException e1) {
throw new PushNotificationExecutionFailedException
("Error occurred while initializing MQTT output event adapter for shared tenant: "
+ tenantDomain, e);
}
}
}
}
Operation operation = ctx.getOperation(); Operation operation = ctx.getOperation();
Properties properties = operation.getProperties(); Properties properties = operation.getProperties();
if (properties != null && properties.get(MQTT_ADAPTER_TOPIC) != null) { if (properties != null && properties.get(MQTT_ADAPTER_TOPIC) != null) {
Map<String, String> dynamicProperties = new HashMap<>(); Map<String, String> dynamicProperties = new HashMap<>();
dynamicProperties.put("topic", (String) properties.get(MQTT_ADAPTER_TOPIC)); dynamicProperties.put("topic", (String) properties.get(MQTT_ADAPTER_TOPIC));
MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(mqttAdapterName, dynamicProperties, MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(adapterName, dynamicProperties,
operation.getPayLoad()); operation.getPayLoad());
} else { } else {
if (PolicyOperation.POLICY_OPERATION_CODE.equals(operation.getCode())) { if (PolicyOperation.POLICY_OPERATION_CODE.equals(operation.getCode())) {
PolicyOperation policyOperation = (PolicyOperation) operation; PolicyOperation policyOperation = (PolicyOperation) operation;
List<ProfileOperation> profileOperations = policyOperation.getProfileOperations(); List<ProfileOperation> profileOperations = policyOperation.getProfileOperations();
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
String deviceType = ctx.getDeviceId().getType(); String deviceType = ctx.getDeviceId().getType();
String deviceId = ctx.getDeviceId().getId(); String deviceId = ctx.getDeviceId().getId();
for (ProfileOperation profileOperation : profileOperations) { for (ProfileOperation profileOperation : profileOperations) {
@ -118,7 +150,7 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
+ deviceType + "/" + deviceId + "/" + profileOperation.getType() + deviceType + "/" + deviceId + "/" + profileOperation.getType()
.toString().toLowerCase() + "/" + profileOperation.getCode().toLowerCase(); .toString().toLowerCase() + "/" + profileOperation.getCode().toLowerCase();
dynamicProperties.put("topic", topic); dynamicProperties.put("topic", topic);
MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(mqttAdapterName, dynamicProperties, MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(adapterName, dynamicProperties,
profileOperation.getPayLoad()); profileOperation.getPayLoad());
} }
@ -131,7 +163,7 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
if (operation.getPayLoad() == null) { if (operation.getPayLoad() == null) {
operation.setPayLoad(""); operation.setPayLoad("");
} }
MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(mqttAdapterName, dynamicProperties, MQTTDataHolder.getInstance().getOutputEventAdapterService().publish(adapterName, dynamicProperties,
operation.getPayLoad()); operation.getPayLoad());
} }

@ -174,6 +174,9 @@ public class DeviceAnalyticsArtifactUploaderAdminServiceImpl implements DeviceAn
publishDynamicEventReceivers(type, MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, receiverFileList); publishDynamicEventReceivers(type, MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, receiverFileList);
} }
} }
if (streamFileList != null) {
publishDynamicEventStream(type, tenantDomain, streamFileList);
}
if (deployAnalyticsCapp(type, list)){ if (deployAnalyticsCapp(type, list)){
return Response.status(Response.Status.BAD_REQUEST) return Response.status(Response.Status.BAD_REQUEST)
.entity("\"Error, Artifact does not exist.\"").build(); .entity("\"Error, Artifact does not exist.\"").build();

Loading…
Cancel
Save