diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java
index 395f07aecc..5ae262b9a3 100644
--- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java
+++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java
@@ -285,7 +285,6 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
try {
String scopes = mqttBrokerConnectionConfiguration.getBrokerScopes();
scopes += " perm:topic:sub:" + this.topic.replace("/",":");
- scopes += " perm:topic:pub:" + this.topic.replace("/",":");
TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret,
null, scopes.toString(), "client_credentials", null,
diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/pom.xml b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/pom.xml
index bbe29c682a..788b273159 100644
--- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/pom.xml
+++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/pom.xml
@@ -73,6 +73,10 @@
org.wso2.carbon
org.wso2.carbon.user.api
+
+ org.wso2.carbon.devicemgt
+ org.wso2.carbon.apimgt.keymgt.extension
+
@@ -135,7 +139,10 @@
org.apache.http.message,
org.apache.commons.ssl,
org.wso2.carbon.identity.jwt.client.extension.*,
- org.wso2.carbon.user.api
+ org.wso2.carbon.user.api,
+ org.wso2.carbon.apimgt.keymgt.extension;version="[5.0,6)",
+ org.wso2.carbon.apimgt.keymgt.extension.exception;version="[5.0,6)",
+ org.wso2.carbon.apimgt.keymgt.extension.service;version="[5.0,6)"
diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/output/adapter/mqtt/util/MQTTAdapterPublisher.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/output/adapter/mqtt/util/MQTTAdapterPublisher.java
index 38f078ff7c..9a2777c81a 100644
--- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/output/adapter/mqtt/util/MQTTAdapterPublisher.java
+++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/output/adapter/mqtt/util/MQTTAdapterPublisher.java
@@ -34,6 +34,13 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
+import org.wso2.carbon.apimgt.keymgt.extension.DCRResponse;
+import org.wso2.carbon.apimgt.keymgt.extension.TokenRequest;
+import org.wso2.carbon.apimgt.keymgt.extension.TokenResponse;
+import org.wso2.carbon.apimgt.keymgt.extension.exception.BadRequestException;
+import org.wso2.carbon.apimgt.keymgt.extension.exception.KeyMgtException;
+import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtService;
+import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtServiceImpl;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.output.adapter.mqtt.internal.OutputAdapterServiceDataHolder;
import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException;
@@ -62,9 +69,12 @@ public class MQTTAdapterPublisher {
String clientId;
int tenantId;
+ private String tenantDomain;
+
public MQTTAdapterPublisher(MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration, String clientId
, int tenantId) {
this.tenantId = tenantId;
+ this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
if (clientId == null || clientId.trim().isEmpty()) {
this.clientId = MqttClient.generateClientId();
}
@@ -85,8 +95,9 @@ public class MQTTAdapterPublisher {
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
if (mqttBrokerConnectionConfiguration.getUsername() != null) {
- connectionOptions.setUserName(getToken());
- connectionOptions.setPassword(MQTTEventAdapterConstants.DEFAULT_PASSWORD.toCharArray());
+ String accessToken = getToken();
+ connectionOptions.setUserName(accessToken.substring(0, 18));
+ connectionOptions.setPassword(accessToken.substring(19).toCharArray());
}
// Construct an MQTT blocking mode client
mqttClient = new MqttClient(mqttBrokerConnectionConfiguration.getBrokerUrl(), clientId, dataStore);
@@ -151,53 +162,22 @@ public class MQTTAdapterPublisher {
if (dcrUrlString != null && !dcrUrlString.isEmpty()) {
try {
- URL dcrUrl = new URL(dcrUrlString);
- HttpClient httpClient = MQTTUtil.getHttpClient(dcrUrl.getProtocol());
- HttpPost postMethod = new HttpPost(dcrUrlString);
- RegistrationProfile registrationProfile = new RegistrationProfile();
- registrationProfile.setCallbackUrl(MQTTEventAdapterConstants.EMPTY_STRING);
- registrationProfile.setGrantType(MQTTEventAdapterConstants.GRANT_TYPE);
- registrationProfile.setOwner(username);
- registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE);
- if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) {
- registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
- + mqttBrokerConnectionConfiguration.getAdapterName() +
- "_" + tenantId);
- registrationProfile.setIsSaasApp(false);
- } else {
- registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
- + mqttBrokerConnectionConfiguration.getAdapterName());
- registrationProfile.setIsSaasApp(true);
- }
- String jsonString = registrationProfile.toJSON();
- StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON);
- postMethod.setEntity(requestEntity);
- String basicAuth = getBase64Encode(username, password);
- postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME,
- MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX +
- basicAuth));
- HttpResponse httpResponse = httpClient.execute(postMethod);
- if (httpResponse != null) {
- String response = MQTTUtil.getResponseString(httpResponse);
- try {
- if (response != null) {
- JSONParser jsonParser = new JSONParser();
- JSONObject jsonPayload = (JSONObject) jsonParser.parse(response);
- String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID);
- String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET);
- return getToken(clientId, clientSecret);
- }
- } catch (ParseException e) {
- String msg = "error occurred while parsing generating token for the adapter";
- log.error(msg, e);
- }
- }
- } catch (MalformedURLException e) {
- throw new OutputEventAdapterRuntimeException("Invalid dcrUrl : " + dcrUrlString);
- } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) {
- throw new OutputEventAdapterRuntimeException("Failed to create an https connection.", e);
- } catch (JWTClientException | UserStoreException e) {
- log.error("Failed to create an oauth token with jwt grant type.", e);
+ KeyMgtService keyMgtService = new KeyMgtServiceImpl();
+ String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
+ + mqttBrokerConnectionConfiguration.getAdapterName();
+ DCRResponse dcrResponse = keyMgtService.dynamicClientRegistration(applicationName, username,
+ "client_credentials", null, new String[]{"device_management"}, false, Integer.MAX_VALUE);
+ return getToken(dcrResponse.getClientId(), dcrResponse.getClientSecret());
+// connectionOptions.setUserName(accessToken.substring(0, 18));
+// connectionOptions.setPassword(accessToken.substring(19).toCharArray());
+
+
+ } catch (JWTClientException | UserStoreException e) {
+ log.error("Failed to create an oauth token with client_credentials grant type.", e);
+ throw new OutputEventAdapterRuntimeException("Failed to create an oauth token with client_credentials grant type.", e);
+ } catch (KeyMgtException e) {
+ log.error("Failed to create an application.", e);
+ throw new OutputEventAdapterRuntimeException("Failed to create an application.", e);
}
}
throw new OutputEventAdapterRuntimeException("Invalid configuration for mqtt publisher");
@@ -206,24 +186,24 @@ public class MQTTAdapterPublisher {
private String getToken(String clientId, String clientSecret)
throws UserStoreException, JWTClientException {
PrivilegedCarbonContext.startTenantFlow();
- PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true);
+ PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
try {
String scopes = mqttBrokerConnectionConfiguration.getScopes();
- String username = mqttBrokerConnectionConfiguration.getUsername();
- if (mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) {
- username = PrivilegedCarbonContext.getThreadLocalCarbonContext()
- .getUserRealm().getRealmConfiguration().getAdminUserName() + "@" + PrivilegedCarbonContext
- .getThreadLocalCarbonContext().getTenantDomain(true);
- }
+ scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation";
+
+ TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret,
+ null, scopes.toString(), "client_credentials", null,
+ null, null, null, Integer.MAX_VALUE);
+ KeyMgtService keyMgtService = new KeyMgtServiceImpl();
+ TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest);
- JWTClientManagerService jwtClientManagerService =
- OutputAdapterServiceDataHolder.getJwtClientManagerService();
- AccessTokenInfo accessTokenInfo = jwtClientManagerService.getJWTClient().getAccessToken(
- clientId, clientSecret, username, scopes);
- return accessTokenInfo.getAccessToken();
+ return tokenResponse.getAccessToken();
+ } catch (KeyMgtException | BadRequestException e) {
+ log.error("Error while generating access token", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
+ return null;
}
private String getBase64Encode(String key, String value) {
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java
index 9f4f593495..51da55a647 100644
--- a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java
@@ -324,41 +324,62 @@ public class ExServer {
boolean isFound = false;
String tempScope = null;
- if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE) ||
- request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
+ String requestTopic = request.getTopic();
- String requestTopic = request.getTopic();
-
- if (requestTopic.endsWith("/#")) {
- requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#"));
- }
-
- // replace / with :
+ if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
requestTopic = requestTopic.replace("/", ":");
- if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) {
- tempScope = "perm:topic:sub:" + requestTopic;
- }
- if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
+ String[] requestTopicParts = requestTopic.split(":");
+
+ if (requestTopicParts.length >= 4 && "operation".equals(requestTopicParts[3])) {
+ // publishing operation from iot server to emqx
+ tempScope = "perm:topic:pub:" + requestTopicParts[0] + ":+:+:operation";
+ } else {
+ // publishing operation response from device to emqx
+ // publishing events from device to emqx
tempScope = "perm:topic:pub:" + requestTopic;
}
- if (scopeList.contains(tempScope)) {
- isFound = true;
+ for (String scope : scopeList) {
+ if (scope.startsWith(tempScope)) {
+ isFound = true;
+ break;
+ }
}
+ }
- if (isFound) {
- ValuedResponse reply = ValuedResponse.newBuilder()
- .setBoolResult(true)
- .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
- .build();
+ if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) {
+ if (requestTopic.endsWith("/#")) {
+ requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#"));
+ }
- responseObserver.onNext(reply);
- responseObserver.onCompleted();
- } else {
- responseObserver.onError(new Exception("not authorized"));
+ requestTopic = requestTopic.replace("/", ":");
+ // subscribing for events from iotserver to emqx
+ // subscribing for operation from device to emqx
+ // subscribing for operation response from iotserver to emqx
+ tempScope = "perm:topic:sub:" + requestTopic;
+
+ for (String scope : scopeList) {
+ if (scope.startsWith(tempScope)) {
+ isFound = true;
+ break;
+ }
}
}
+
+ if (isFound) {
+ ValuedResponse reply = ValuedResponse.newBuilder()
+ .setBoolResult(true)
+ .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+ .build();
+
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ } else {
+ logger.error("not authorized");
+ responseObserver.onError(new Exception("not authorized"));
+ }
+
} else {
//default
ValuedResponse reply = ValuedResponse.newBuilder()