dm-improvements
Lasantha Dharmakeerthi 8 months ago
commit 7876d87c51

@ -44,13 +44,14 @@ import org.wso2.carbon.context.PrivilegedCarbonContext;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class ExServer {
private static final Log logger = LogFactory.getLog(ExServer.class.getName());
private static Map<String, String> accessTokenMap = new HashMap<>();
private static Map<String, String> authorizedScopeMap = new HashMap<>();
private static Map<String, String> accessTokenMap = new ConcurrentHashMap<>();
private static Map<String, String> authorizedScopeMap = new ConcurrentHashMap<>();
private Server server;
public ExServer() {
@ -177,25 +178,27 @@ public class ExServer {
if (request.getResultCode().equals("success")) {
String accessToken = accessTokenMap.get(request.getConninfo().getClientid());
String scopeString = authorizedScopeMap.get(accessToken);
String[] scopeArray = scopeString.split(" ");
String deviceType = null;
String deviceId = null;
for (String scope : scopeArray) {
if (scope.startsWith("device_")) {
String[] scopeParts = scope.split("_");
deviceType = scopeParts[1];
deviceId = scopeParts[2];
break;
if (!StringUtils.isEmpty(scopeString)) {
String[] scopeArray = scopeString.split(" ");
String deviceType = null;
String deviceId = null;
for (String scope : scopeArray) {
if (scope.startsWith("device_")) {
String[] scopeParts = scope.split("_");
deviceType = scopeParts[1];
deviceId = scopeParts[2];
break;
}
}
}
if (!StringUtils.isEmpty(deviceType) && !StringUtils.isEmpty(deviceId)) {
try {
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain("carbon.super");
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(-1234);
DeviceManagementProviderService deviceManagementProviderService = getDeviceManagementService();
deviceManagementProviderService.changeDeviceStatus(new DeviceIdentifier(deviceId, deviceType), EnrolmentInfo.Status.ACTIVE);
} catch (DeviceManagementException e) {
logger.error("onClientConnack: Error while setting device status");
if (!StringUtils.isEmpty(deviceType) && !StringUtils.isEmpty(deviceId)) {
try {
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain("carbon.super");
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(-1234);
DeviceManagementProviderService deviceManagementProviderService = getDeviceManagementService();
deviceManagementProviderService.changeDeviceStatus(new DeviceIdentifier(deviceId, deviceType), EnrolmentInfo.Status.ACTIVE);
} catch (DeviceManagementException e) {
logger.error("onClientConnack: Error while setting device status");
}
}
}
}
@ -315,53 +318,57 @@ public class ExServer {
if (StringUtils.isEmpty(accessToken) || !accessToken.startsWith(request.getClientinfo().getUsername())) {
logger.info("Valid access token not found");
responseObserver.onError(new Exception("not authorized"));
return;
}
String authorizedScopeList = authorizedScopeMap.get(accessToken);
String[] scopeArray = authorizedScopeList.split(" ");
List<String> scopeList = Arrays.asList(scopeArray);
boolean isFound = false;
if (!StringUtils.isEmpty(authorizedScopeList)) {
String[] scopeArray = authorizedScopeList.split(" ");
List<String> scopeList = Arrays.asList(scopeArray);
String tempScope = null;
String requestTopic = request.getTopic();
if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
requestTopic = requestTopic.replace("/", ":");
String tempScope = null;
String requestTopic = request.getTopic();
String[] requestTopicParts = requestTopic.split(":");
if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
requestTopic = requestTopic.replace("/", ":");
if (requestTopicParts.length >= 4 && "operation".equals(requestTopicParts[3])) {
// publishing operation from iot server to emqx
tempScope = "perm:topic:pub:" + requestTopicParts[0] + ":+:+:operation";
} else {
// publishing operation response from device to emqx
// publishing events from device to emqx
tempScope = "perm:topic:pub:" + requestTopic;
}
String[] requestTopicParts = requestTopic.split(":");
for (String scope : scopeList) {
if (scope.startsWith(tempScope)) {
isFound = true;
break;
if (requestTopicParts.length >= 4 && "operation".equals(requestTopicParts[3])) {
// publishing operation from iot server to emqx
tempScope = "perm:topic:pub:" + requestTopicParts[0] + ":+:+:operation";
} else {
// publishing operation response from device to emqx
// publishing events from device to emqx
tempScope = "perm:topic:pub:" + requestTopic;
}
}
}
if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) {
if (requestTopic.endsWith("/#")) {
requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#"));
for (String scope : scopeList) {
if (scope.startsWith(tempScope)) {
isFound = true;
break;
}
}
}
requestTopic = requestTopic.replace("/", ":");
// subscribing for events from iotserver to emqx
// subscribing for operation from device to emqx
// subscribing for operation response from iotserver to emqx
tempScope = "perm:topic:sub:" + requestTopic;
if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) {
if (requestTopic.endsWith("/#")) {
requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#"));
}
for (String scope : scopeList) {
if (scope.startsWith(tempScope)) {
isFound = true;
break;
requestTopic = requestTopic.replace("/", ":");
// subscribing for events from iotserver to emqx
// subscribing for operation from device to emqx
// subscribing for operation response from iotserver to emqx
tempScope = "perm:topic:sub:" + requestTopic;
for (String scope : scopeList) {
if (scope.startsWith(tempScope)) {
isFound = true;
break;
}
}
}
}

Loading…
Cancel
Save