Fixing the issue in consuming the events for carbon.super from mqtt.

revert-dabc3590
sinthuja 8 years ago
parent ff6b1e32b2
commit bbc8c8efce

@ -46,6 +46,7 @@ import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo;
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException; import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
import org.wso2.carbon.identity.jwt.client.extension.service.JWTClientManagerService; import org.wso2.carbon.identity.jwt.client.extension.service.JWTClientManagerService;
import org.wso2.carbon.user.api.UserStoreException; import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.io.IOException; import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -79,7 +80,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
InputEventAdapterListener inputEventAdapterListener) { InputEventAdapterListener inputEventAdapterListener) {
String mqttClientId = inputEventAdapterConfiguration.getProperties() String mqttClientId = inputEventAdapterConfiguration.getProperties()
.get(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID); .get(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID);
if(mqttClientId == null || mqttClientId.trim().isEmpty()){ if (mqttClientId == null || mqttClientId.trim().isEmpty()) {
mqttClientId = MqttClient.generateClientId(); mqttClientId = MqttClient.generateClientId();
} }
this.inputEventAdapterConfiguration = inputEventAdapterConfiguration; this.inputEventAdapterConfiguration = inputEventAdapterConfiguration;
@ -110,7 +111,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
if (contentValidatorType == null || contentValidatorType.equals(MQTTEventAdapterConstants.DEFAULT)) { if (contentValidatorType == null || contentValidatorType.equals(MQTTEventAdapterConstants.DEFAULT)) {
contentValidator = InputAdapterServiceDataHolder.getInputAdapterExtensionService() contentValidator = InputAdapterServiceDataHolder.getInputAdapterExtensionService()
.getDefaultContentValidator(); .getDefaultContentValidator();
} else { } else {
contentValidator = InputAdapterServiceDataHolder.getInputAdapterExtensionService() contentValidator = InputAdapterServiceDataHolder.getInputAdapterExtensionService()
.getContentValidator(contentValidatorType); .getContentValidator(contentValidatorType);
} }
@ -150,12 +151,12 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE); registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE);
if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) {
registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
+ mqttBrokerConnectionConfiguration.getAdapterName() + + mqttBrokerConnectionConfiguration.getAdapterName() +
"_" + tenantDomain); "_" + tenantDomain);
registrationProfile.setIsSaasApp(false); registrationProfile.setIsSaasApp(false);
} else { } else {
registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
+ mqttBrokerConnectionConfiguration.getAdapterName()); + mqttBrokerConnectionConfiguration.getAdapterName());
registrationProfile.setIsSaasApp(true); registrationProfile.setIsSaasApp(true);
} }
String jsonString = registrationProfile.toJSON(); String jsonString = registrationProfile.toJSON();
@ -163,8 +164,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
postMethod.setEntity(requestEntity); postMethod.setEntity(requestEntity);
String basicAuth = getBase64Encode(username, password); String basicAuth = getBase64Encode(username, password);
postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME, postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME,
MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX + MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX +
basicAuth)); basicAuth));
HttpResponse httpResponse = httpClient.execute(postMethod); HttpResponse httpResponse = httpClient.execute(postMethod);
if (httpResponse != null) { if (httpResponse != null) {
String response = MQTTUtil.getResponseString(httpResponse); String response = MQTTUtil.getResponseString(httpResponse);
@ -183,9 +184,9 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
} }
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
log.error("Invalid dcrUrl : " + dcrUrlString); log.error("Invalid dcrUrl : " + dcrUrlString);
} catch (JWTClientException | UserStoreException e) { } catch (JWTClientException | UserStoreException e) {
log.error("Failed to create an oauth token with jwt grant type.", e); log.error("Failed to create an oauth token with jwt grant type.", e);
} catch (NoSuchAlgorithmException |KeyManagementException |KeyStoreException | IOException e) { } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) {
log.error("Failed to create a http connection.", e); log.error("Failed to create a http connection.", e);
} }
} }
@ -204,7 +205,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
mqttClient.disconnect(3000); mqttClient.disconnect(3000);
} catch (MqttException e) { } catch (MqttException e) {
log.error("Can not unsubscribe from the destination " + topic + log.error("Can not unsubscribe from the destination " + topic +
" with the event adapter " + adapterName, e); " with the event adapter " + adapterName, e);
} }
} }
connectionSucceeded = true; connectionSucceeded = true;
@ -226,11 +227,13 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
} }
PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
TenantAxisUtils.getTenantConfigurationContext(tenantDomain,InputAdapterServiceDataHolder.getMainServerConfigContext()); if (!tenantDomain.equalsIgnoreCase(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
TenantAxisUtils.getTenantConfigurationContext(tenantDomain, InputAdapterServiceDataHolder.getMainServerConfigContext());
}
InputEventAdapterListener inputEventAdapterListener = InputAdapterServiceDataHolder InputEventAdapterListener inputEventAdapterListener = InputAdapterServiceDataHolder
.getInputEventAdapterService().getInputAdapterRuntime(PrivilegedCarbonContext.getThreadLocalCarbonContext() .getInputEventAdapterService().getInputAdapterRuntime(PrivilegedCarbonContext.getThreadLocalCarbonContext()
.getTenantId(), inputEventAdapterConfiguration.getName()); .getTenantId(), inputEventAdapterConfiguration.getName());
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Event received in MQTT Event Adapter - " + msgText); log.debug("Event received in MQTT Event Adapter - " + msgText);

Loading…
Cancel
Save