From f8e7837efc4579b2a42199550f48cfb9847a2bcc Mon Sep 17 00:00:00 2001 From: Amalka Subasinghe Date: Mon, 27 Mar 2023 15:05:21 +0530 Subject: [PATCH] fixed adding operation and operation response paths with emqx integration --- .../mqtt/util/MQTTAdapterListener.java | 1 - .../pom.xml | 9 +- .../mqtt/util/MQTTAdapterPublisher.java | 102 +++++++----------- .../mgt/plugins/emqx/exhook/ExServer.java | 69 +++++++----- 4 files changed, 94 insertions(+), 87 deletions(-) diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java index 395f07aecc..5ae262b9a3 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java @@ -285,7 +285,6 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { try { String scopes = mqttBrokerConnectionConfiguration.getBrokerScopes(); scopes += " perm:topic:sub:" + this.topic.replace("/",":"); - scopes += " perm:topic:pub:" + this.topic.replace("/",":"); TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, null, scopes.toString(), "client_credentials", null, diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/pom.xml b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/pom.xml index bbe29c682a..788b273159 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/pom.xml +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/pom.xml @@ -73,6 +73,10 @@ org.wso2.carbon org.wso2.carbon.user.api + + org.wso2.carbon.devicemgt + org.wso2.carbon.apimgt.keymgt.extension + @@ -135,7 +139,10 @@ org.apache.http.message, org.apache.commons.ssl, org.wso2.carbon.identity.jwt.client.extension.*, - org.wso2.carbon.user.api + org.wso2.carbon.user.api, + org.wso2.carbon.apimgt.keymgt.extension;version="[5.0,6)", + org.wso2.carbon.apimgt.keymgt.extension.exception;version="[5.0,6)", + org.wso2.carbon.apimgt.keymgt.extension.service;version="[5.0,6)" diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/output/adapter/mqtt/util/MQTTAdapterPublisher.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/output/adapter/mqtt/util/MQTTAdapterPublisher.java index 38f078ff7c..9a2777c81a 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/output/adapter/mqtt/util/MQTTAdapterPublisher.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/output/adapter/mqtt/util/MQTTAdapterPublisher.java @@ -34,6 +34,13 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import org.wso2.carbon.apimgt.keymgt.extension.DCRResponse; +import org.wso2.carbon.apimgt.keymgt.extension.TokenRequest; +import org.wso2.carbon.apimgt.keymgt.extension.TokenResponse; +import org.wso2.carbon.apimgt.keymgt.extension.exception.BadRequestException; +import org.wso2.carbon.apimgt.keymgt.extension.exception.KeyMgtException; +import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtService; +import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtServiceImpl; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.device.mgt.output.adapter.mqtt.internal.OutputAdapterServiceDataHolder; import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException; @@ -62,9 +69,12 @@ public class MQTTAdapterPublisher { String clientId; int tenantId; + private String tenantDomain; + public MQTTAdapterPublisher(MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration, String clientId , int tenantId) { this.tenantId = tenantId; + this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); if (clientId == null || clientId.trim().isEmpty()) { this.clientId = MqttClient.generateClientId(); } @@ -85,8 +95,9 @@ public class MQTTAdapterPublisher { connectionOptions.setCleanSession(cleanSession); connectionOptions.setKeepAliveInterval(keepAlive); if (mqttBrokerConnectionConfiguration.getUsername() != null) { - connectionOptions.setUserName(getToken()); - connectionOptions.setPassword(MQTTEventAdapterConstants.DEFAULT_PASSWORD.toCharArray()); + String accessToken = getToken(); + connectionOptions.setUserName(accessToken.substring(0, 18)); + connectionOptions.setPassword(accessToken.substring(19).toCharArray()); } // Construct an MQTT blocking mode client mqttClient = new MqttClient(mqttBrokerConnectionConfiguration.getBrokerUrl(), clientId, dataStore); @@ -151,53 +162,22 @@ public class MQTTAdapterPublisher { if (dcrUrlString != null && !dcrUrlString.isEmpty()) { try { - URL dcrUrl = new URL(dcrUrlString); - HttpClient httpClient = MQTTUtil.getHttpClient(dcrUrl.getProtocol()); - HttpPost postMethod = new HttpPost(dcrUrlString); - RegistrationProfile registrationProfile = new RegistrationProfile(); - registrationProfile.setCallbackUrl(MQTTEventAdapterConstants.EMPTY_STRING); - registrationProfile.setGrantType(MQTTEventAdapterConstants.GRANT_TYPE); - registrationProfile.setOwner(username); - registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE); - if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { - registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName() + - "_" + tenantId); - registrationProfile.setIsSaasApp(false); - } else { - registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName()); - registrationProfile.setIsSaasApp(true); - } - String jsonString = registrationProfile.toJSON(); - StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON); - postMethod.setEntity(requestEntity); - String basicAuth = getBase64Encode(username, password); - postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME, - MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX + - basicAuth)); - HttpResponse httpResponse = httpClient.execute(postMethod); - if (httpResponse != null) { - String response = MQTTUtil.getResponseString(httpResponse); - try { - if (response != null) { - JSONParser jsonParser = new JSONParser(); - JSONObject jsonPayload = (JSONObject) jsonParser.parse(response); - String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID); - String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET); - return getToken(clientId, clientSecret); - } - } catch (ParseException e) { - String msg = "error occurred while parsing generating token for the adapter"; - log.error(msg, e); - } - } - } catch (MalformedURLException e) { - throw new OutputEventAdapterRuntimeException("Invalid dcrUrl : " + dcrUrlString); - } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) { - throw new OutputEventAdapterRuntimeException("Failed to create an https connection.", e); - } catch (JWTClientException | UserStoreException e) { - log.error("Failed to create an oauth token with jwt grant type.", e); + KeyMgtService keyMgtService = new KeyMgtServiceImpl(); + String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX + + mqttBrokerConnectionConfiguration.getAdapterName(); + DCRResponse dcrResponse = keyMgtService.dynamicClientRegistration(applicationName, username, + "client_credentials", null, new String[]{"device_management"}, false, Integer.MAX_VALUE); + return getToken(dcrResponse.getClientId(), dcrResponse.getClientSecret()); +// connectionOptions.setUserName(accessToken.substring(0, 18)); +// connectionOptions.setPassword(accessToken.substring(19).toCharArray()); + + + } catch (JWTClientException | UserStoreException e) { + log.error("Failed to create an oauth token with client_credentials grant type.", e); + throw new OutputEventAdapterRuntimeException("Failed to create an oauth token with client_credentials grant type.", e); + } catch (KeyMgtException e) { + log.error("Failed to create an application.", e); + throw new OutputEventAdapterRuntimeException("Failed to create an application.", e); } } throw new OutputEventAdapterRuntimeException("Invalid configuration for mqtt publisher"); @@ -206,24 +186,24 @@ public class MQTTAdapterPublisher { private String getToken(String clientId, String clientSecret) throws UserStoreException, JWTClientException { PrivilegedCarbonContext.startTenantFlow(); - PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); try { String scopes = mqttBrokerConnectionConfiguration.getScopes(); - String username = mqttBrokerConnectionConfiguration.getUsername(); - if (mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { - username = PrivilegedCarbonContext.getThreadLocalCarbonContext() - .getUserRealm().getRealmConfiguration().getAdminUserName() + "@" + PrivilegedCarbonContext - .getThreadLocalCarbonContext().getTenantDomain(true); - } + scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation"; + + TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, + null, scopes.toString(), "client_credentials", null, + null, null, null, Integer.MAX_VALUE); + KeyMgtService keyMgtService = new KeyMgtServiceImpl(); + TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest); - JWTClientManagerService jwtClientManagerService = - OutputAdapterServiceDataHolder.getJwtClientManagerService(); - AccessTokenInfo accessTokenInfo = jwtClientManagerService.getJWTClient().getAccessToken( - clientId, clientSecret, username, scopes); - return accessTokenInfo.getAccessToken(); + return tokenResponse.getAccessToken(); + } catch (KeyMgtException | BadRequestException e) { + log.error("Error while generating access token", e); } finally { PrivilegedCarbonContext.endTenantFlow(); } + return null; } private String getBase64Encode(String key, String value) { diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java index 9f4f593495..51da55a647 100644 --- a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java @@ -324,41 +324,62 @@ public class ExServer { boolean isFound = false; String tempScope = null; - if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE) || - request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) { + String requestTopic = request.getTopic(); - String requestTopic = request.getTopic(); - - if (requestTopic.endsWith("/#")) { - requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#")); - } - - // replace / with : + if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) { requestTopic = requestTopic.replace("/", ":"); - if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) { - tempScope = "perm:topic:sub:" + requestTopic; - } - if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) { + String[] requestTopicParts = requestTopic.split(":"); + + if (requestTopicParts.length >= 4 && "operation".equals(requestTopicParts[3])) { + // publishing operation from iot server to emqx + tempScope = "perm:topic:pub:" + requestTopicParts[0] + ":+:+:operation"; + } else { + // publishing operation response from device to emqx + // publishing events from device to emqx tempScope = "perm:topic:pub:" + requestTopic; } - if (scopeList.contains(tempScope)) { - isFound = true; + for (String scope : scopeList) { + if (scope.startsWith(tempScope)) { + isFound = true; + break; + } } + } - if (isFound) { - ValuedResponse reply = ValuedResponse.newBuilder() - .setBoolResult(true) - .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) - .build(); + if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) { + if (requestTopic.endsWith("/#")) { + requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#")); + } - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } else { - responseObserver.onError(new Exception("not authorized")); + requestTopic = requestTopic.replace("/", ":"); + // subscribing for events from iotserver to emqx + // subscribing for operation from device to emqx + // subscribing for operation response from iotserver to emqx + tempScope = "perm:topic:sub:" + requestTopic; + + for (String scope : scopeList) { + if (scope.startsWith(tempScope)) { + isFound = true; + break; + } } } + + if (isFound) { + ValuedResponse reply = ValuedResponse.newBuilder() + .setBoolResult(true) + .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) + .build(); + + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } else { + logger.error("not authorized"); + responseObserver.onError(new Exception("not authorized")); + } + } else { //default ValuedResponse reply = ValuedResponse.newBuilder()