From f451529e6d6de58bc74456d223026e826a6f6815 Mon Sep 17 00:00:00 2001 From: Amalka Subasinghe Date: Sat, 18 Mar 2023 01:49:28 +0530 Subject: [PATCH] fixing mqtt lister with content transformation and validation --- .../transformer/MQTTContentTransformer.java | 13 +- .../validator/MQTTContentValidator.java | 9 +- .../pom.xml | 9 +- .../mqtt/util/MQTTAdapterListener.java | 133 ++++++------------ .../mqtt/util/MQTTEventAdapterConstants.java | 1 + .../adapter/mqtt/util/PropertyUtils.java | 10 +- pom.xml | 7 +- 7 files changed, 86 insertions(+), 96 deletions(-) diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java index fba934009..66f2b4ee8 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java @@ -35,6 +35,7 @@ import java.util.Map; public class MQTTContentTransformer implements ContentTransformer { private static final String MQTT_CONTENT_TRANSFORMER = "device-meta-transformer"; private static final String TOPIC = "topic"; + private static final String DEVICE_ID_INDEX = "deviceIdIndex"; private static String JSON_ARRAY_START_CHAR = "["; private static final Log log = LogFactory.getLog(MQTTContentTransformer.class); @@ -47,15 +48,19 @@ public class MQTTContentTransformer implements ContentTransformer { @Override public Object transform(Object messagePayload, Map dynamicProperties) { String topic = (String) dynamicProperties.get(TOPIC); + if (!dynamicProperties.containsKey(DEVICE_ID_INDEX)) { + log.error("device id not found in topic "); + return false; + } + int deviceIdIndex = (int)dynamicProperties.get(DEVICE_ID_INDEX); String topics[] = topic.split("/"); - String deviceId = topics[2]; - String deviceType = topics[1]; + String deviceId = topics[deviceIdIndex]; String message = (String) messagePayload; try { if (message.startsWith(JSON_ARRAY_START_CHAR)) { - return processMultipleEvents(message, deviceId, deviceType); + return processMultipleEvents(message, deviceId, deviceId); } else { - return processSingleEvent(message, deviceId, deviceType); + return processSingleEvent(message, deviceId, deviceId); } } catch (ParseException e) { log.error("Invalid input " + message, e); diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java index 20261ab6d..71d68d6ea 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java @@ -36,6 +36,7 @@ public class MQTTContentValidator implements ContentValidator { private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId"; private static final String TOPIC = "topic"; + private static final String DEVICE_ID_INDEX = "deviceIdIndex"; private static final int DEVICE_ID_TOPIC_HIERARCHY_INDEX = 2; @Override @@ -46,9 +47,13 @@ public class MQTTContentValidator implements ContentValidator { @Override public ContentInfo validate(Object msgPayload, Map dynamicParams) { String topic = (String) dynamicParams.get(TOPIC); + if (!dynamicParams.containsKey(DEVICE_ID_INDEX)) { + log.error("device id not found in topic "); + return null; + } + int deviceIdIndex = (int)dynamicParams.get(DEVICE_ID_INDEX); String topics[] = topic.split("/"); - int deviceIdInTopicHierarchyLevelIndex = DEVICE_ID_TOPIC_HIERARCHY_INDEX; - String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex]; + String deviceIdFromTopic = topics[deviceIdIndex]; boolean status; String message = (String) msgPayload; if (message.startsWith(JSON_ARRAY_START_CHAR)) { diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/pom.xml b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/pom.xml index ea044a723..e7ac5c933 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/pom.xml +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/pom.xml @@ -80,6 +80,10 @@ org.wso2.carbon.devicemgt org.wso2.carbon.identity.jwt.client.extension + + org.wso2.carbon.devicemgt + org.wso2.carbon.apimgt.keymgt.extension + org.wso2.carbon org.wso2.carbon.user.api @@ -151,7 +155,10 @@ org.wso2.carbon.utils.multitenancy, org.apache.axis2.context, org.wso2.carbon.core.multitenancy.utils, - org.wso2.carbon.utils + org.wso2.carbon.utils, + org.wso2.carbon.apimgt.keymgt.extension;version="[5.0,6)", + org.wso2.carbon.apimgt.keymgt.extension.exception;version="[5.0,6)", + org.wso2.carbon.apimgt.keymgt.extension.service;version="[5.0,6)" diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java index e41cfea50..395f07aec 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java @@ -17,22 +17,18 @@ */ package org.wso2.carbon.device.mgt.input.adapter.mqtt.util; -import org.apache.axis2.context.ConfigurationContext; import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.conn.HttpHostConnectException; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.message.BasicHeader; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; +import org.wso2.carbon.apimgt.keymgt.extension.DCRResponse; +import org.wso2.carbon.apimgt.keymgt.extension.TokenRequest; +import org.wso2.carbon.apimgt.keymgt.extension.TokenResponse; +import org.wso2.carbon.apimgt.keymgt.extension.exception.BadRequestException; +import org.wso2.carbon.apimgt.keymgt.extension.exception.KeyMgtException; +import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtService; +import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtServiceImpl; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.core.ServerStatus; import org.wso2.carbon.core.multitenancy.utils.TenantAxisUtils; @@ -43,18 +39,10 @@ import org.wso2.carbon.device.mgt.input.adapter.mqtt.internal.InputAdapterServic import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration; import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener; import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterRuntimeException; -import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo; import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException; -import org.wso2.carbon.identity.jwt.client.extension.service.JWTClientManagerService; import org.wso2.carbon.user.api.UserStoreException; import org.wso2.carbon.utils.multitenancy.MultitenantConstants; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; @@ -68,6 +56,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration; private String topic; + private String topicStructure; private String tenantDomain; private volatile boolean connectionSucceeded = false; private ContentValidator contentValidator; @@ -88,13 +77,10 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { this.mqttBrokerConnectionConfiguration = mqttBrokerConnectionConfiguration; this.cleanSession = mqttBrokerConnectionConfiguration.isCleanSession(); int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive(); - this.topic = PropertyUtils.replaceTenantDomainProperty(topic); + this.topicStructure = new String(topic); + this.topic = PropertyUtils.replacePlaceholders(topic); this.eventAdapterListener = inputEventAdapterListener; - this.tenantDomain = this.topic.split("/")[0]; - //this is to allow server listener from IoT Core to connect. - if (this.tenantDomain.equals("+")) { - this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); - } + this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); //SORTING messages until the server fetches them String temp_directory = System.getProperty("java.io.tmpdir"); @@ -146,58 +132,21 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { //getJWT Client Parameters. if (dcrUrlString != null && !dcrUrlString.isEmpty()) { try { - URL dcrUrl = new URL(dcrUrlString); - HttpClient httpClient = MQTTUtil.getHttpClient(dcrUrl.getProtocol()); - HttpPost postMethod = new HttpPost(dcrUrlString); - RegistrationProfile registrationProfile = new RegistrationProfile(); - registrationProfile.setCallbackUrl(MQTTEventAdapterConstants.EMPTY_STRING); - registrationProfile.setGrantType(MQTTEventAdapterConstants.GRANT_TYPE); - registrationProfile.setOwner(username); - registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE); - if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { - registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName() + - "_" + tenantDomain); - registrationProfile.setIsSaasApp(false); - } else { - registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName()); - registrationProfile.setIsSaasApp(true); - } - String jsonString = registrationProfile.toJSON(); - StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON); - postMethod.setEntity(requestEntity); - String basicAuth = getBase64Encode(username, password); - postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME, - MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX + - basicAuth)); - HttpResponse httpResponse = httpClient.execute(postMethod); - if (httpResponse != null) { - String response = MQTTUtil.getResponseString(httpResponse); - try { - if (response != null) { - JSONParser jsonParser = new JSONParser(); - JSONObject jsonPayload = (JSONObject) jsonParser.parse(response); - String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID); - String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET); - connectionOptions.setUserName(getToken(clientId, clientSecret)); - } - } catch (ParseException e) { - String msg = "error occurred while parsing generating token for the adapter"; - log.error(msg, e); - } - } - } catch (HttpHostConnectException e) { - log.error("Keymanager is unreachable, Waiting...."); - return false; - } catch (MalformedURLException e) { - log.error("Invalid dcrUrl : " + dcrUrlString); - return false; + KeyMgtService keyMgtService = new KeyMgtServiceImpl(); + String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX + + mqttBrokerConnectionConfiguration.getAdapterName(); + DCRResponse dcrResponse = keyMgtService.dynamicClientRegistration(applicationName, username, + "client_credentials", null, new String[]{"device_management"}, false, Integer.MAX_VALUE); + String accessToken = getToken(dcrResponse.getClientId(), dcrResponse.getClientSecret()); + connectionOptions.setUserName(accessToken.substring(0, 18)); + connectionOptions.setPassword(accessToken.substring(19).toCharArray()); + + } catch (JWTClientException | UserStoreException e) { - log.error("Failed to create an oauth token with jwt grant type.", e); + log.error("Failed to create an oauth token with client_credentials grant type.", e); return false; - } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) { - log.error("Failed to create a http connection.", e); + } catch (KeyMgtException e) { + log.error("Failed to create an application.", e); return false; } } @@ -249,7 +198,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { try { - String msgText = mqttMessage.toString(); + String mqttMsgString = mqttMessage.toString(); + String msgText = mqttMsgString.substring(mqttMsgString.indexOf("{"), mqttMsgString.indexOf("}") +1); if (log.isDebugEnabled()) { log.debug(msgText); } @@ -267,10 +217,18 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { log.debug("Event received in MQTT Event Adapter - " + msgText); } + if (contentValidator != null && contentTransformer != null) { ContentInfo contentInfo; Map dynamicProperties = new HashMap<>(); dynamicProperties.put(MQTTEventAdapterConstants.TOPIC, topic); + int deviceIdIndex = -1; + for (String topicStructurePart : topicStructure.split("/")) { + deviceIdIndex++; + if (topicStructurePart.contains("+")) { + dynamicProperties.put(MQTTEventAdapterConstants.DEVICE_ID_INDEX, deviceIdIndex); + } + } Object transformedMessage = contentTransformer.transform(msgText, dynamicProperties); contentInfo = contentValidator.validate(transformedMessage, dynamicProperties); if (contentInfo != null && contentInfo.isValidContent()) { @@ -326,21 +284,22 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); try { String scopes = mqttBrokerConnectionConfiguration.getBrokerScopes(); - String username = mqttBrokerConnectionConfiguration.getUsername(); - if (mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { - username = PrivilegedCarbonContext.getThreadLocalCarbonContext() - .getUserRealm().getRealmConfiguration().getAdminUserName() + "@" + PrivilegedCarbonContext - .getThreadLocalCarbonContext().getTenantDomain(true); - } + scopes += " perm:topic:sub:" + this.topic.replace("/",":"); + scopes += " perm:topic:pub:" + this.topic.replace("/",":"); + + TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, + null, scopes.toString(), "client_credentials", null, + null, null, null, Integer.MAX_VALUE); + KeyMgtService keyMgtService = new KeyMgtServiceImpl(); + TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest); - JWTClientManagerService jwtClientManagerService = - InputAdapterServiceDataHolder.getJwtClientManagerService(); - AccessTokenInfo accessTokenInfo = jwtClientManagerService.getJWTClient().getAccessToken( - clientId, clientSecret, username, scopes); - return accessTokenInfo.getAccessToken(); + return tokenResponse.getAccessToken(); + } catch (KeyMgtException | BadRequestException e) { + log.error("Error while generating access token", e); } finally { PrivilegedCarbonContext.endTenantFlow(); } + return null; } private String getBase64Encode(String key, String value) { diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java index 7c80d8a1d..663f25b72 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java @@ -64,6 +64,7 @@ public class MQTTEventAdapterConstants { public static final String EMPTY = ""; public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = ""; public static final String TOPIC = "topic"; + public static final String DEVICE_ID_INDEX = "deviceIdIndex"; public static final String PAYLOAD = "payload"; public static final String AUTHORIZATION_HEADER_NAME = "Authorization"; public static final String AUTHORIZATION_HEADER_VALUE_PREFIX = "Basic "; diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/PropertyUtils.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/PropertyUtils.java index 682a292f7..1415a1684 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/PropertyUtils.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/PropertyUtils.java @@ -25,7 +25,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; class PropertyUtils { - private static final String TENANT_DOMAIN_PROPERTY = "\\$\\{tenant-domain\\}"; + private static final String TENANT_DOMAIN_PROPERTY = "\\$\\{tenantDomain\\}"; + private static final String DEVICE_TYPE_PROPERTY = "\\$\\{deviceType\\}"; + private static final String DEVICE_ID_PROPERTY = "\\$\\{deviceId\\}"; //This method is only used if the mb features are within DAS. static String replaceMqttProperty(String urlWithPlaceholders) throws InputEventAdapterException { @@ -50,4 +52,10 @@ class PropertyUtils { .getThreadLocalCarbonContext().getTenantDomain(true)); return urlWithPlaceholders; } + + public static String replacePlaceholders(String urlWithPlaceholders) { + urlWithPlaceholders = urlWithPlaceholders.replaceAll(TENANT_DOMAIN_PROPERTY, "+") + .replaceAll(DEVICE_TYPE_PROPERTY, "+").replaceAll(DEVICE_ID_PROPERTY, "+"); + return urlWithPlaceholders; + } } diff --git a/pom.xml b/pom.xml index 264626acb..5ebba9e78 100644 --- a/pom.xml +++ b/pom.xml @@ -305,6 +305,11 @@ org.wso2.carbon.identity.jwt.client.extension ${carbon.devicemgt.version} + + org.wso2.carbon.devicemgt + org.wso2.carbon.apimgt.keymgt.extension + ${carbon.devicemgt.version} + org.wso2.carbon.devicemgt org.wso2.carbon.apimgt.application.extension @@ -1170,7 +1175,7 @@ - 5.0.20 + 5.0.21-SNAPSHOT [5.0.0, 6.0.0)