From bbc8c8efceb1ef7697e8221401b9677b6c42c686 Mon Sep 17 00:00:00 2001 From: sinthuja Date: Thu, 4 May 2017 22:15:36 +0530 Subject: [PATCH] Fixing the issue in consuming the events for carbon.super from mqtt. --- .../mqtt/util/MQTTAdapterListener.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java index f7ea4d2547..7635155d37 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java @@ -46,6 +46,7 @@ import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo; import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException; import org.wso2.carbon.identity.jwt.client.extension.service.JWTClientManagerService; import org.wso2.carbon.user.api.UserStoreException; +import org.wso2.carbon.utils.multitenancy.MultitenantConstants; import java.io.IOException; import java.net.MalformedURLException; @@ -79,7 +80,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { InputEventAdapterListener inputEventAdapterListener) { String mqttClientId = inputEventAdapterConfiguration.getProperties() .get(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID); - if(mqttClientId == null || mqttClientId.trim().isEmpty()){ + if (mqttClientId == null || mqttClientId.trim().isEmpty()) { mqttClientId = MqttClient.generateClientId(); } this.inputEventAdapterConfiguration = inputEventAdapterConfiguration; @@ -110,7 +111,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { if (contentValidatorType == null || contentValidatorType.equals(MQTTEventAdapterConstants.DEFAULT)) { contentValidator = InputAdapterServiceDataHolder.getInputAdapterExtensionService() .getDefaultContentValidator(); - } else { + } else { contentValidator = InputAdapterServiceDataHolder.getInputAdapterExtensionService() .getContentValidator(contentValidatorType); } @@ -150,12 +151,12 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE); if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName() + - "_" + tenantDomain); + + mqttBrokerConnectionConfiguration.getAdapterName() + + "_" + tenantDomain); registrationProfile.setIsSaasApp(false); } else { registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName()); + + mqttBrokerConnectionConfiguration.getAdapterName()); registrationProfile.setIsSaasApp(true); } String jsonString = registrationProfile.toJSON(); @@ -163,8 +164,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { postMethod.setEntity(requestEntity); String basicAuth = getBase64Encode(username, password); postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME, - MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX + - basicAuth)); + MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX + + basicAuth)); HttpResponse httpResponse = httpClient.execute(postMethod); if (httpResponse != null) { String response = MQTTUtil.getResponseString(httpResponse); @@ -183,9 +184,9 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { } } catch (MalformedURLException e) { log.error("Invalid dcrUrl : " + dcrUrlString); - } catch (JWTClientException | UserStoreException e) { + } catch (JWTClientException | UserStoreException e) { log.error("Failed to create an oauth token with jwt grant type.", e); - } catch (NoSuchAlgorithmException |KeyManagementException |KeyStoreException | IOException e) { + } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) { log.error("Failed to create a http connection.", e); } } @@ -204,7 +205,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { mqttClient.disconnect(3000); } catch (MqttException e) { log.error("Can not unsubscribe from the destination " + topic + - " with the event adapter " + adapterName, e); + " with the event adapter " + adapterName, e); } } connectionSucceeded = true; @@ -226,11 +227,13 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { } PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); - TenantAxisUtils.getTenantConfigurationContext(tenantDomain,InputAdapterServiceDataHolder.getMainServerConfigContext()); + if (!tenantDomain.equalsIgnoreCase(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) { + TenantAxisUtils.getTenantConfigurationContext(tenantDomain, InputAdapterServiceDataHolder.getMainServerConfigContext()); + } InputEventAdapterListener inputEventAdapterListener = InputAdapterServiceDataHolder - .getInputEventAdapterService().getInputAdapterRuntime(PrivilegedCarbonContext.getThreadLocalCarbonContext() - .getTenantId(), inputEventAdapterConfiguration.getName()); + .getInputEventAdapterService().getInputAdapterRuntime(PrivilegedCarbonContext.getThreadLocalCarbonContext() + .getTenantId(), inputEventAdapterConfiguration.getName()); if (log.isDebugEnabled()) { log.debug("Event received in MQTT Event Adapter - " + msgText);