fixing mqtt lister with content transformation and validation

bump-versions-for-development
Amalka Subasinghe 2 years ago
parent 11785fa4bb
commit f451529e6d

@ -35,6 +35,7 @@ import java.util.Map;
public class MQTTContentTransformer implements ContentTransformer { public class MQTTContentTransformer implements ContentTransformer {
private static final String MQTT_CONTENT_TRANSFORMER = "device-meta-transformer"; private static final String MQTT_CONTENT_TRANSFORMER = "device-meta-transformer";
private static final String TOPIC = "topic"; private static final String TOPIC = "topic";
private static final String DEVICE_ID_INDEX = "deviceIdIndex";
private static String JSON_ARRAY_START_CHAR = "["; private static String JSON_ARRAY_START_CHAR = "[";
private static final Log log = LogFactory.getLog(MQTTContentTransformer.class); private static final Log log = LogFactory.getLog(MQTTContentTransformer.class);
@ -47,15 +48,19 @@ public class MQTTContentTransformer implements ContentTransformer {
@Override @Override
public Object transform(Object messagePayload, Map<String, Object> dynamicProperties) { public Object transform(Object messagePayload, Map<String, Object> dynamicProperties) {
String topic = (String) dynamicProperties.get(TOPIC); String topic = (String) dynamicProperties.get(TOPIC);
if (!dynamicProperties.containsKey(DEVICE_ID_INDEX)) {
log.error("device id not found in topic ");
return false;
}
int deviceIdIndex = (int)dynamicProperties.get(DEVICE_ID_INDEX);
String topics[] = topic.split("/"); String topics[] = topic.split("/");
String deviceId = topics[2]; String deviceId = topics[deviceIdIndex];
String deviceType = topics[1];
String message = (String) messagePayload; String message = (String) messagePayload;
try { try {
if (message.startsWith(JSON_ARRAY_START_CHAR)) { if (message.startsWith(JSON_ARRAY_START_CHAR)) {
return processMultipleEvents(message, deviceId, deviceType); return processMultipleEvents(message, deviceId, deviceId);
} else { } else {
return processSingleEvent(message, deviceId, deviceType); return processSingleEvent(message, deviceId, deviceId);
} }
} catch (ParseException e) { } catch (ParseException e) {
log.error("Invalid input " + message, e); log.error("Invalid input " + message, e);

@ -36,6 +36,7 @@ public class MQTTContentValidator implements ContentValidator {
private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId";
private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId";
private static final String TOPIC = "topic"; private static final String TOPIC = "topic";
private static final String DEVICE_ID_INDEX = "deviceIdIndex";
private static final int DEVICE_ID_TOPIC_HIERARCHY_INDEX = 2; private static final int DEVICE_ID_TOPIC_HIERARCHY_INDEX = 2;
@Override @Override
@ -46,9 +47,13 @@ public class MQTTContentValidator implements ContentValidator {
@Override @Override
public ContentInfo validate(Object msgPayload, Map<String, Object> dynamicParams) { public ContentInfo validate(Object msgPayload, Map<String, Object> dynamicParams) {
String topic = (String) dynamicParams.get(TOPIC); String topic = (String) dynamicParams.get(TOPIC);
if (!dynamicParams.containsKey(DEVICE_ID_INDEX)) {
log.error("device id not found in topic ");
return null;
}
int deviceIdIndex = (int)dynamicParams.get(DEVICE_ID_INDEX);
String topics[] = topic.split("/"); String topics[] = topic.split("/");
int deviceIdInTopicHierarchyLevelIndex = DEVICE_ID_TOPIC_HIERARCHY_INDEX; String deviceIdFromTopic = topics[deviceIdIndex];
String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex];
boolean status; boolean status;
String message = (String) msgPayload; String message = (String) msgPayload;
if (message.startsWith(JSON_ARRAY_START_CHAR)) { if (message.startsWith(JSON_ARRAY_START_CHAR)) {

@ -80,6 +80,10 @@
<groupId>org.wso2.carbon.devicemgt</groupId> <groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.identity.jwt.client.extension</artifactId> <artifactId>org.wso2.carbon.identity.jwt.client.extension</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.apimgt.keymgt.extension</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon</groupId> <groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.user.api</artifactId> <artifactId>org.wso2.carbon.user.api</artifactId>
@ -151,7 +155,10 @@
org.wso2.carbon.utils.multitenancy, org.wso2.carbon.utils.multitenancy,
org.apache.axis2.context, org.apache.axis2.context,
org.wso2.carbon.core.multitenancy.utils, org.wso2.carbon.core.multitenancy.utils,
org.wso2.carbon.utils org.wso2.carbon.utils,
org.wso2.carbon.apimgt.keymgt.extension;version="[5.0,6)",
org.wso2.carbon.apimgt.keymgt.extension.exception;version="[5.0,6)",
org.wso2.carbon.apimgt.keymgt.extension.service;version="[5.0,6)"
</Import-Package> </Import-Package>
</instructions> </instructions>
</configuration> </configuration>

@ -17,22 +17,18 @@
*/ */
package org.wso2.carbon.device.mgt.input.adapter.mqtt.util; package org.wso2.carbon.device.mgt.input.adapter.mqtt.util;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.json.simple.JSONObject; import org.wso2.carbon.apimgt.keymgt.extension.DCRResponse;
import org.json.simple.parser.JSONParser; import org.wso2.carbon.apimgt.keymgt.extension.TokenRequest;
import org.json.simple.parser.ParseException; import org.wso2.carbon.apimgt.keymgt.extension.TokenResponse;
import org.wso2.carbon.apimgt.keymgt.extension.exception.BadRequestException;
import org.wso2.carbon.apimgt.keymgt.extension.exception.KeyMgtException;
import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtService;
import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtServiceImpl;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.ServerStatus; import org.wso2.carbon.core.ServerStatus;
import org.wso2.carbon.core.multitenancy.utils.TenantAxisUtils; import org.wso2.carbon.core.multitenancy.utils.TenantAxisUtils;
@ -43,18 +39,10 @@ import org.wso2.carbon.device.mgt.input.adapter.mqtt.internal.InputAdapterServic
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration; import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener; import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;
import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterRuntimeException; import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterRuntimeException;
import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo;
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException; import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
import org.wso2.carbon.identity.jwt.client.extension.service.JWTClientManagerService;
import org.wso2.carbon.user.api.UserStoreException; import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants; import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -68,6 +56,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration; private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration;
private String topic; private String topic;
private String topicStructure;
private String tenantDomain; private String tenantDomain;
private volatile boolean connectionSucceeded = false; private volatile boolean connectionSucceeded = false;
private ContentValidator contentValidator; private ContentValidator contentValidator;
@ -88,13 +77,10 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
this.mqttBrokerConnectionConfiguration = mqttBrokerConnectionConfiguration; this.mqttBrokerConnectionConfiguration = mqttBrokerConnectionConfiguration;
this.cleanSession = mqttBrokerConnectionConfiguration.isCleanSession(); this.cleanSession = mqttBrokerConnectionConfiguration.isCleanSession();
int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive(); int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive();
this.topic = PropertyUtils.replaceTenantDomainProperty(topic); this.topicStructure = new String(topic);
this.topic = PropertyUtils.replacePlaceholders(topic);
this.eventAdapterListener = inputEventAdapterListener; this.eventAdapterListener = inputEventAdapterListener;
this.tenantDomain = this.topic.split("/")[0]; this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
//this is to allow server listener from IoT Core to connect.
if (this.tenantDomain.equals("+")) {
this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
}
//SORTING messages until the server fetches them //SORTING messages until the server fetches them
String temp_directory = System.getProperty("java.io.tmpdir"); String temp_directory = System.getProperty("java.io.tmpdir");
@ -146,58 +132,21 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
//getJWT Client Parameters. //getJWT Client Parameters.
if (dcrUrlString != null && !dcrUrlString.isEmpty()) { if (dcrUrlString != null && !dcrUrlString.isEmpty()) {
try { try {
URL dcrUrl = new URL(dcrUrlString); KeyMgtService keyMgtService = new KeyMgtServiceImpl();
HttpClient httpClient = MQTTUtil.getHttpClient(dcrUrl.getProtocol()); String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
HttpPost postMethod = new HttpPost(dcrUrlString); + mqttBrokerConnectionConfiguration.getAdapterName();
RegistrationProfile registrationProfile = new RegistrationProfile(); DCRResponse dcrResponse = keyMgtService.dynamicClientRegistration(applicationName, username,
registrationProfile.setCallbackUrl(MQTTEventAdapterConstants.EMPTY_STRING); "client_credentials", null, new String[]{"device_management"}, false, Integer.MAX_VALUE);
registrationProfile.setGrantType(MQTTEventAdapterConstants.GRANT_TYPE); String accessToken = getToken(dcrResponse.getClientId(), dcrResponse.getClientSecret());
registrationProfile.setOwner(username); connectionOptions.setUserName(accessToken.substring(0, 18));
registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE); connectionOptions.setPassword(accessToken.substring(19).toCharArray());
if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) {
registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
+ mqttBrokerConnectionConfiguration.getAdapterName() +
"_" + tenantDomain);
registrationProfile.setIsSaasApp(false);
} else {
registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
+ mqttBrokerConnectionConfiguration.getAdapterName());
registrationProfile.setIsSaasApp(true);
}
String jsonString = registrationProfile.toJSON();
StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON);
postMethod.setEntity(requestEntity);
String basicAuth = getBase64Encode(username, password);
postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME,
MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX +
basicAuth));
HttpResponse httpResponse = httpClient.execute(postMethod);
if (httpResponse != null) {
String response = MQTTUtil.getResponseString(httpResponse);
try {
if (response != null) {
JSONParser jsonParser = new JSONParser();
JSONObject jsonPayload = (JSONObject) jsonParser.parse(response);
String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID);
String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET);
connectionOptions.setUserName(getToken(clientId, clientSecret));
}
} catch (ParseException e) {
String msg = "error occurred while parsing generating token for the adapter";
log.error(msg, e);
}
}
} catch (HttpHostConnectException e) {
log.error("Keymanager is unreachable, Waiting....");
return false;
} catch (MalformedURLException e) {
log.error("Invalid dcrUrl : " + dcrUrlString);
return false;
} catch (JWTClientException | UserStoreException e) { } catch (JWTClientException | UserStoreException e) {
log.error("Failed to create an oauth token with jwt grant type.", e); log.error("Failed to create an oauth token with client_credentials grant type.", e);
return false; return false;
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) { } catch (KeyMgtException e) {
log.error("Failed to create a http connection.", e); log.error("Failed to create an application.", e);
return false; return false;
} }
} }
@ -249,7 +198,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
@Override @Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
try { try {
String msgText = mqttMessage.toString(); String mqttMsgString = mqttMessage.toString();
String msgText = mqttMsgString.substring(mqttMsgString.indexOf("{"), mqttMsgString.indexOf("}") +1);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug(msgText); log.debug(msgText);
} }
@ -267,10 +217,18 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
log.debug("Event received in MQTT Event Adapter - " + msgText); log.debug("Event received in MQTT Event Adapter - " + msgText);
} }
if (contentValidator != null && contentTransformer != null) { if (contentValidator != null && contentTransformer != null) {
ContentInfo contentInfo; ContentInfo contentInfo;
Map<String, Object> dynamicProperties = new HashMap<>(); Map<String, Object> dynamicProperties = new HashMap<>();
dynamicProperties.put(MQTTEventAdapterConstants.TOPIC, topic); dynamicProperties.put(MQTTEventAdapterConstants.TOPIC, topic);
int deviceIdIndex = -1;
for (String topicStructurePart : topicStructure.split("/")) {
deviceIdIndex++;
if (topicStructurePart.contains("+")) {
dynamicProperties.put(MQTTEventAdapterConstants.DEVICE_ID_INDEX, deviceIdIndex);
}
}
Object transformedMessage = contentTransformer.transform(msgText, dynamicProperties); Object transformedMessage = contentTransformer.transform(msgText, dynamicProperties);
contentInfo = contentValidator.validate(transformedMessage, dynamicProperties); contentInfo = contentValidator.validate(transformedMessage, dynamicProperties);
if (contentInfo != null && contentInfo.isValidContent()) { if (contentInfo != null && contentInfo.isValidContent()) {
@ -326,21 +284,22 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
try { try {
String scopes = mqttBrokerConnectionConfiguration.getBrokerScopes(); String scopes = mqttBrokerConnectionConfiguration.getBrokerScopes();
String username = mqttBrokerConnectionConfiguration.getUsername(); scopes += " perm:topic:sub:" + this.topic.replace("/",":");
if (mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { scopes += " perm:topic:pub:" + this.topic.replace("/",":");
username = PrivilegedCarbonContext.getThreadLocalCarbonContext()
.getUserRealm().getRealmConfiguration().getAdminUserName() + "@" + PrivilegedCarbonContext TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret,
.getThreadLocalCarbonContext().getTenantDomain(true); null, scopes.toString(), "client_credentials", null,
} null, null, null, Integer.MAX_VALUE);
KeyMgtService keyMgtService = new KeyMgtServiceImpl();
TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest);
JWTClientManagerService jwtClientManagerService = return tokenResponse.getAccessToken();
InputAdapterServiceDataHolder.getJwtClientManagerService(); } catch (KeyMgtException | BadRequestException e) {
AccessTokenInfo accessTokenInfo = jwtClientManagerService.getJWTClient().getAccessToken( log.error("Error while generating access token", e);
clientId, clientSecret, username, scopes);
return accessTokenInfo.getAccessToken();
} finally { } finally {
PrivilegedCarbonContext.endTenantFlow(); PrivilegedCarbonContext.endTenantFlow();
} }
return null;
} }
private String getBase64Encode(String key, String value) { private String getBase64Encode(String key, String value) {

@ -64,6 +64,7 @@ public class MQTTEventAdapterConstants {
public static final String EMPTY = ""; public static final String EMPTY = "";
public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = ""; public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = "";
public static final String TOPIC = "topic"; public static final String TOPIC = "topic";
public static final String DEVICE_ID_INDEX = "deviceIdIndex";
public static final String PAYLOAD = "payload"; public static final String PAYLOAD = "payload";
public static final String AUTHORIZATION_HEADER_NAME = "Authorization"; public static final String AUTHORIZATION_HEADER_NAME = "Authorization";
public static final String AUTHORIZATION_HEADER_VALUE_PREFIX = "Basic "; public static final String AUTHORIZATION_HEADER_VALUE_PREFIX = "Basic ";

@ -25,7 +25,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
class PropertyUtils { class PropertyUtils {
private static final String TENANT_DOMAIN_PROPERTY = "\\$\\{tenant-domain\\}"; private static final String TENANT_DOMAIN_PROPERTY = "\\$\\{tenantDomain\\}";
private static final String DEVICE_TYPE_PROPERTY = "\\$\\{deviceType\\}";
private static final String DEVICE_ID_PROPERTY = "\\$\\{deviceId\\}";
//This method is only used if the mb features are within DAS. //This method is only used if the mb features are within DAS.
static String replaceMqttProperty(String urlWithPlaceholders) throws InputEventAdapterException { static String replaceMqttProperty(String urlWithPlaceholders) throws InputEventAdapterException {
@ -50,4 +52,10 @@ class PropertyUtils {
.getThreadLocalCarbonContext().getTenantDomain(true)); .getThreadLocalCarbonContext().getTenantDomain(true));
return urlWithPlaceholders; return urlWithPlaceholders;
} }
public static String replacePlaceholders(String urlWithPlaceholders) {
urlWithPlaceholders = urlWithPlaceholders.replaceAll(TENANT_DOMAIN_PROPERTY, "+")
.replaceAll(DEVICE_TYPE_PROPERTY, "+").replaceAll(DEVICE_ID_PROPERTY, "+");
return urlWithPlaceholders;
}
} }

@ -305,6 +305,11 @@
<artifactId>org.wso2.carbon.identity.jwt.client.extension</artifactId> <artifactId>org.wso2.carbon.identity.jwt.client.extension</artifactId>
<version>${carbon.devicemgt.version}</version> <version>${carbon.devicemgt.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.apimgt.keymgt.extension</artifactId>
<version>${carbon.devicemgt.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.devicemgt</groupId> <groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.apimgt.application.extension</artifactId> <artifactId>org.wso2.carbon.apimgt.application.extension</artifactId>
@ -1170,7 +1175,7 @@
<properties> <properties>
<!-- Carbon Device Management --> <!-- Carbon Device Management -->
<carbon.devicemgt.version>5.0.20</carbon.devicemgt.version> <carbon.devicemgt.version>5.0.21-SNAPSHOT</carbon.devicemgt.version>
<carbon.devicemgt.version.range>[5.0.0, 6.0.0)</carbon.devicemgt.version.range> <carbon.devicemgt.version.range>[5.0.0, 6.0.0)</carbon.devicemgt.version.range>
<!-- Carbon Device Management Plugins --> <!-- Carbon Device Management Plugins -->

Loading…
Cancel
Save