fixed adding operation and operation response paths with emqx integration

remotes/1696811969212645420/master
Amalka Subasinghe 2 years ago
parent fb03324837
commit f8e7837efc

@ -285,7 +285,6 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
try { try {
String scopes = mqttBrokerConnectionConfiguration.getBrokerScopes(); String scopes = mqttBrokerConnectionConfiguration.getBrokerScopes();
scopes += " perm:topic:sub:" + this.topic.replace("/",":"); scopes += " perm:topic:sub:" + this.topic.replace("/",":");
scopes += " perm:topic:pub:" + this.topic.replace("/",":");
TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret,
null, scopes.toString(), "client_credentials", null, null, scopes.toString(), "client_credentials", null,

@ -73,6 +73,10 @@
<groupId>org.wso2.carbon</groupId> <groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.user.api</artifactId> <artifactId>org.wso2.carbon.user.api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.apimgt.keymgt.extension</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
@ -135,7 +139,10 @@
org.apache.http.message, org.apache.http.message,
org.apache.commons.ssl, org.apache.commons.ssl,
org.wso2.carbon.identity.jwt.client.extension.*, org.wso2.carbon.identity.jwt.client.extension.*,
org.wso2.carbon.user.api org.wso2.carbon.user.api,
org.wso2.carbon.apimgt.keymgt.extension;version="[5.0,6)",
org.wso2.carbon.apimgt.keymgt.extension.exception;version="[5.0,6)",
org.wso2.carbon.apimgt.keymgt.extension.service;version="[5.0,6)"
</Import-Package> </Import-Package>
</instructions> </instructions>
</configuration> </configuration>

@ -34,6 +34,13 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.json.simple.JSONObject; import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser; import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException; import org.json.simple.parser.ParseException;
import org.wso2.carbon.apimgt.keymgt.extension.DCRResponse;
import org.wso2.carbon.apimgt.keymgt.extension.TokenRequest;
import org.wso2.carbon.apimgt.keymgt.extension.TokenResponse;
import org.wso2.carbon.apimgt.keymgt.extension.exception.BadRequestException;
import org.wso2.carbon.apimgt.keymgt.extension.exception.KeyMgtException;
import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtService;
import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtServiceImpl;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.output.adapter.mqtt.internal.OutputAdapterServiceDataHolder; import org.wso2.carbon.device.mgt.output.adapter.mqtt.internal.OutputAdapterServiceDataHolder;
import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException; import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException;
@ -62,9 +69,12 @@ public class MQTTAdapterPublisher {
String clientId; String clientId;
int tenantId; int tenantId;
private String tenantDomain;
public MQTTAdapterPublisher(MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration, String clientId public MQTTAdapterPublisher(MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration, String clientId
, int tenantId) { , int tenantId) {
this.tenantId = tenantId; this.tenantId = tenantId;
this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
if (clientId == null || clientId.trim().isEmpty()) { if (clientId == null || clientId.trim().isEmpty()) {
this.clientId = MqttClient.generateClientId(); this.clientId = MqttClient.generateClientId();
} }
@ -85,8 +95,9 @@ public class MQTTAdapterPublisher {
connectionOptions.setCleanSession(cleanSession); connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive); connectionOptions.setKeepAliveInterval(keepAlive);
if (mqttBrokerConnectionConfiguration.getUsername() != null) { if (mqttBrokerConnectionConfiguration.getUsername() != null) {
connectionOptions.setUserName(getToken()); String accessToken = getToken();
connectionOptions.setPassword(MQTTEventAdapterConstants.DEFAULT_PASSWORD.toCharArray()); connectionOptions.setUserName(accessToken.substring(0, 18));
connectionOptions.setPassword(accessToken.substring(19).toCharArray());
} }
// Construct an MQTT blocking mode client // Construct an MQTT blocking mode client
mqttClient = new MqttClient(mqttBrokerConnectionConfiguration.getBrokerUrl(), clientId, dataStore); mqttClient = new MqttClient(mqttBrokerConnectionConfiguration.getBrokerUrl(), clientId, dataStore);
@ -151,53 +162,22 @@ public class MQTTAdapterPublisher {
if (dcrUrlString != null && !dcrUrlString.isEmpty()) { if (dcrUrlString != null && !dcrUrlString.isEmpty()) {
try { try {
URL dcrUrl = new URL(dcrUrlString); KeyMgtService keyMgtService = new KeyMgtServiceImpl();
HttpClient httpClient = MQTTUtil.getHttpClient(dcrUrl.getProtocol()); String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
HttpPost postMethod = new HttpPost(dcrUrlString); + mqttBrokerConnectionConfiguration.getAdapterName();
RegistrationProfile registrationProfile = new RegistrationProfile(); DCRResponse dcrResponse = keyMgtService.dynamicClientRegistration(applicationName, username,
registrationProfile.setCallbackUrl(MQTTEventAdapterConstants.EMPTY_STRING); "client_credentials", null, new String[]{"device_management"}, false, Integer.MAX_VALUE);
registrationProfile.setGrantType(MQTTEventAdapterConstants.GRANT_TYPE); return getToken(dcrResponse.getClientId(), dcrResponse.getClientSecret());
registrationProfile.setOwner(username); // connectionOptions.setUserName(accessToken.substring(0, 18));
registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE); // connectionOptions.setPassword(accessToken.substring(19).toCharArray());
if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) {
registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
+ mqttBrokerConnectionConfiguration.getAdapterName() + } catch (JWTClientException | UserStoreException e) {
"_" + tenantId); log.error("Failed to create an oauth token with client_credentials grant type.", e);
registrationProfile.setIsSaasApp(false); throw new OutputEventAdapterRuntimeException("Failed to create an oauth token with client_credentials grant type.", e);
} else { } catch (KeyMgtException e) {
registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX log.error("Failed to create an application.", e);
+ mqttBrokerConnectionConfiguration.getAdapterName()); throw new OutputEventAdapterRuntimeException("Failed to create an application.", e);
registrationProfile.setIsSaasApp(true);
}
String jsonString = registrationProfile.toJSON();
StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON);
postMethod.setEntity(requestEntity);
String basicAuth = getBase64Encode(username, password);
postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME,
MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX +
basicAuth));
HttpResponse httpResponse = httpClient.execute(postMethod);
if (httpResponse != null) {
String response = MQTTUtil.getResponseString(httpResponse);
try {
if (response != null) {
JSONParser jsonParser = new JSONParser();
JSONObject jsonPayload = (JSONObject) jsonParser.parse(response);
String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID);
String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET);
return getToken(clientId, clientSecret);
}
} catch (ParseException e) {
String msg = "error occurred while parsing generating token for the adapter";
log.error(msg, e);
}
}
} catch (MalformedURLException e) {
throw new OutputEventAdapterRuntimeException("Invalid dcrUrl : " + dcrUrlString);
} catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) {
throw new OutputEventAdapterRuntimeException("Failed to create an https connection.", e);
} catch (JWTClientException | UserStoreException e) {
log.error("Failed to create an oauth token with jwt grant type.", e);
} }
} }
throw new OutputEventAdapterRuntimeException("Invalid configuration for mqtt publisher"); throw new OutputEventAdapterRuntimeException("Invalid configuration for mqtt publisher");
@ -206,24 +186,24 @@ public class MQTTAdapterPublisher {
private String getToken(String clientId, String clientSecret) private String getToken(String clientId, String clientSecret)
throws UserStoreException, JWTClientException { throws UserStoreException, JWTClientException {
PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
try { try {
String scopes = mqttBrokerConnectionConfiguration.getScopes(); String scopes = mqttBrokerConnectionConfiguration.getScopes();
String username = mqttBrokerConnectionConfiguration.getUsername(); scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation";
if (mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) {
username = PrivilegedCarbonContext.getThreadLocalCarbonContext() TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret,
.getUserRealm().getRealmConfiguration().getAdminUserName() + "@" + PrivilegedCarbonContext null, scopes.toString(), "client_credentials", null,
.getThreadLocalCarbonContext().getTenantDomain(true); null, null, null, Integer.MAX_VALUE);
} KeyMgtService keyMgtService = new KeyMgtServiceImpl();
TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest);
JWTClientManagerService jwtClientManagerService = return tokenResponse.getAccessToken();
OutputAdapterServiceDataHolder.getJwtClientManagerService(); } catch (KeyMgtException | BadRequestException e) {
AccessTokenInfo accessTokenInfo = jwtClientManagerService.getJWTClient().getAccessToken( log.error("Error while generating access token", e);
clientId, clientSecret, username, scopes);
return accessTokenInfo.getAccessToken();
} finally { } finally {
PrivilegedCarbonContext.endTenantFlow(); PrivilegedCarbonContext.endTenantFlow();
} }
return null;
} }
private String getBase64Encode(String key, String value) { private String getBase64Encode(String key, String value) {

@ -324,41 +324,62 @@ public class ExServer {
boolean isFound = false; boolean isFound = false;
String tempScope = null; String tempScope = null;
if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE) || String requestTopic = request.getTopic();
request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
String requestTopic = request.getTopic(); if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
if (requestTopic.endsWith("/#")) {
requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#"));
}
// replace / with :
requestTopic = requestTopic.replace("/", ":"); requestTopic = requestTopic.replace("/", ":");
if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) { String[] requestTopicParts = requestTopic.split(":");
tempScope = "perm:topic:sub:" + requestTopic;
} if (requestTopicParts.length >= 4 && "operation".equals(requestTopicParts[3])) {
if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) { // publishing operation from iot server to emqx
tempScope = "perm:topic:pub:" + requestTopicParts[0] + ":+:+:operation";
} else {
// publishing operation response from device to emqx
// publishing events from device to emqx
tempScope = "perm:topic:pub:" + requestTopic; tempScope = "perm:topic:pub:" + requestTopic;
} }
if (scopeList.contains(tempScope)) { for (String scope : scopeList) {
isFound = true; if (scope.startsWith(tempScope)) {
isFound = true;
break;
}
} }
}
if (isFound) { if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) {
ValuedResponse reply = ValuedResponse.newBuilder() if (requestTopic.endsWith("/#")) {
.setBoolResult(true) requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#"));
.setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) }
.build();
responseObserver.onNext(reply); requestTopic = requestTopic.replace("/", ":");
responseObserver.onCompleted(); // subscribing for events from iotserver to emqx
} else { // subscribing for operation from device to emqx
responseObserver.onError(new Exception("not authorized")); // subscribing for operation response from iotserver to emqx
tempScope = "perm:topic:sub:" + requestTopic;
for (String scope : scopeList) {
if (scope.startsWith(tempScope)) {
isFound = true;
break;
}
} }
} }
if (isFound) {
ValuedResponse reply = ValuedResponse.newBuilder()
.setBoolResult(true)
.setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} else {
logger.error("not authorized");
responseObserver.onError(new Exception("not authorized"));
}
} else { } else {
//default //default
ValuedResponse reply = ValuedResponse.newBuilder() ValuedResponse reply = ValuedResponse.newBuilder()

Loading…
Cancel
Save