diff --git a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java index dbea539235..6e63d81691 100644 --- a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java +++ b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java @@ -94,12 +94,18 @@ public class MQTTEventAdapterFactory extends OutputEventAdapterFactory { qos.setOptions(new String[]{"0", "1", "2"}); qos.setDefaultValue("2"); + // set topic + Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC); + topicProperty.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC)); + topicProperty.setRequired(false); + staticPropertyList.add(brokerUrl); staticPropertyList.add(userName); staticPropertyList.add(scopes); staticPropertyList.add(clearSession); staticPropertyList.add(qos); staticPropertyList.add(password); + staticPropertyList.add(topicProperty); return staticPropertyList; } diff --git a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTAdapterPublisher.java b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTAdapterPublisher.java index d352cf105c..17ea163bb0 100644 --- a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTAdapterPublisher.java +++ b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTAdapterPublisher.java @@ -17,37 +17,28 @@ */ package io.entgra.device.mgt.plugins.output.adapter.mqtt.util; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.DCRResponse; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenRequest; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenResponse; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.BadRequestException; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.KeyMgtException; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtService; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtServiceImpl; +import io.entgra.device.mgt.core.identity.jwt.client.extension.exception.JWTClientException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.ssl.Base64; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.message.BasicHeader; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.DCRResponse; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenRequest; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenResponse; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.BadRequestException; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.KeyMgtException; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtService; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtServiceImpl; +import org.jetbrains.annotations.NotNull; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException; import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException; import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterRuntimeException; -import io.entgra.device.mgt.core.identity.jwt.client.extension.dto.AccessTokenInfo; -import io.entgra.device.mgt.core.identity.jwt.client.extension.exception.JWTClientException; -import io.entgra.device.mgt.core.identity.jwt.client.extension.service.JWTClientManagerService; import org.wso2.carbon.user.api.UserStoreException; /** @@ -153,6 +144,8 @@ public class MQTTAdapterPublisher { String dcrUrlString = this.mqttBrokerConnectionConfiguration.getDcrUrl(); if (dcrUrlString != null && !dcrUrlString.isEmpty()) { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); try { KeyMgtService keyMgtService = new KeyMgtServiceImpl(); String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX @@ -170,6 +163,8 @@ public class MQTTAdapterPublisher { } catch (KeyMgtException e) { log.error("Failed to create an application.", e); throw new OutputEventAdapterRuntimeException("Failed to create an application.", e); + } finally { + PrivilegedCarbonContext.endTenantFlow(); } } throw new OutputEventAdapterRuntimeException("Invalid configuration for mqtt publisher"); @@ -177,27 +172,33 @@ public class MQTTAdapterPublisher { private String getToken(String clientId, String clientSecret) throws UserStoreException, JWTClientException { - PrivilegedCarbonContext.startTenantFlow(); - PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); try { - String scopes = mqttBrokerConnectionConfiguration.getScopes(); - scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation"; - - TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, - null, scopes.toString(), "client_credentials", null, - null, null, null, Integer.MAX_VALUE); + TokenRequest tokenRequest = getTokenRequest(clientId, clientSecret); KeyMgtService keyMgtService = new KeyMgtServiceImpl(); TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest); return tokenResponse.getAccessToken(); } catch (KeyMgtException | BadRequestException e) { log.error("Error while generating access token", e); - } finally { - PrivilegedCarbonContext.endTenantFlow(); } return null; } + @NotNull + private TokenRequest getTokenRequest(String clientId, String clientSecret) { + String scopes = mqttBrokerConnectionConfiguration.getScopes(); + scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation"; + + if (!StringUtils.isEmpty(mqttBrokerConnectionConfiguration.getTopic())) { + scopes += " perm:topic:pub:" + mqttBrokerConnectionConfiguration.getTopic().replace("/",":"); + } + + TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, + null, scopes.toString(), "client_credentials", null, + null, null, null, Integer.MAX_VALUE); + return tokenRequest; + } + private String getBase64Encode(String key, String value) { return new String(Base64.encodeBase64((key + ":" + value).getBytes())); } diff --git a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java index 4796483ac9..576f111dd3 100644 --- a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java +++ b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java @@ -17,8 +17,8 @@ */ package io.entgra.device.mgt.plugins.output.adapter.mqtt.util; +import org.apache.commons.lang3.StringUtils; import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration; -import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException; import java.util.Map; @@ -36,6 +36,8 @@ public class MQTTBrokerConnectionConfiguration { private boolean globalCredentailSet; private int qos; + private String topic; + public String getTokenUrl() { return tokenUrl; } @@ -79,6 +81,11 @@ public class MQTTBrokerConnectionConfiguration { public int getQos() { return qos; } + + public String getTopic() { + return topic; + } + public MQTTBrokerConnectionConfiguration(OutputEventAdapterConfiguration eventAdapterConfiguration, Map globalProperties) { adapterName = eventAdapterConfiguration.getName(); @@ -123,7 +130,10 @@ public class MQTTBrokerConnectionConfiguration { this.qos = Integer.parseInt(qosVal); } - + String topic = eventAdapterConfiguration.getStaticProperties().get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC); + if (!StringUtils.isEmpty(topic)) { + this.topic = topic; + } } }