Fixed issue in showing multiple devices locations in single map UI (#924)

* Fixed issue in showing multiple devices locations in single map UI
revert-dabc3590
Charitha Goonetilleke 6 years ago committed by Geeth
parent 42dd0afe43
commit c59e8f2eab

@ -20,17 +20,15 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
import org.wso2.carbon.context.CarbonContext; import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Attribute; import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition; import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants; import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.WebsocketEventAdaptorServiceDataHolder; import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.WebsocketEventAdaptorServiceDataHolder;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
import org.wso2.carbon.event.output.adapter.core.EventAdapterUtil; import org.wso2.carbon.event.output.adapter.core.EventAdapterUtil;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter; import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration; import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
@ -43,7 +41,6 @@ import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationExcep
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
@ -68,9 +65,6 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
private boolean doLogDroppedMessage; private boolean doLogDroppedMessage;
private String streamId; private String streamId;
private List<Attribute> streamMetaAttributes;
private List<Attribute> streamCorrelationAttributes;
private List<Attribute> streamPayloadAttributes;
public WebsocketEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map<String, public WebsocketEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map<String,
String> globalProperties) { String> globalProperties) {
@ -128,23 +122,15 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
throw new OutputEventAdapterRuntimeException("UI event adapter needs a output stream id"); throw new OutputEventAdapterRuntimeException("UI event adapter needs a output stream id");
} }
// fetch the "streamDefinition" corresponding to the "streamId" and then fetch the different attribute types ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>> tenantSpecificEventOutputAdapterMap =
// of the streamDefinition corresponding to the event's streamId. They are required when validating values in
// the events against the streamDef attributes.
StreamDefinition streamDefinition = getStreamDefinition(streamId);
streamMetaAttributes = streamDefinition.getMetaData();
streamCorrelationAttributes = streamDefinition.getCorrelationData();
streamPayloadAttributes = streamDefinition.getPayloadData();
ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>> tenantSpecifcEventOutputAdapterMap =
WebsocketEventAdaptorServiceDataHolder.getTenantSpecificOutputEventStreamAdapterMap(); WebsocketEventAdaptorServiceDataHolder.getTenantSpecificOutputEventStreamAdapterMap();
ConcurrentHashMap<String, String> streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId); ConcurrentHashMap<String, String> streamSpecifAdapterMap = tenantSpecificEventOutputAdapterMap.get(tenantId);
if (streamSpecifAdapterMap == null) { if (streamSpecifAdapterMap == null) {
streamSpecifAdapterMap = new ConcurrentHashMap<>(); streamSpecifAdapterMap = new ConcurrentHashMap<>();
if (null != tenantSpecifcEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) { if (null != tenantSpecificEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) {
streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId); streamSpecifAdapterMap = tenantSpecificEventOutputAdapterMap.get(tenantId);
} }
} }
@ -200,61 +186,10 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
@Override @Override
public void publish(Object message, Map<String, String> dynamicProperties) { public void publish(Object message, Map<String, String> dynamicProperties) {
String eventString;
if (streamSpecificEvents.size() == queueSize) { if (streamSpecificEvents.size() == queueSize) {
streamSpecificEvents.removeFirst(); streamSpecificEvents.removeFirst();
} }
if (message instanceof Event) { String eventString = message.toString();
Event event = (Event) message;
StringBuilder eventBuilder = new StringBuilder("[");
eventBuilder.append(event.getTimeStamp());
if (event.getMetaData() != null) {
eventBuilder.append(",");
Object[] metaData = event.getMetaData();
for (int i = 0; i < metaData.length; i++) {
eventBuilder.append("\"");
eventBuilder.append(metaData[i]);
eventBuilder.append("\"");
if (i != (metaData.length - 1)) {
eventBuilder.append(",");
}
}
}
if (event.getCorrelationData() != null) {
Object[] correlationData = event.getCorrelationData();
eventBuilder.append(",");
for (int i = 0; i < correlationData.length; i++) {
eventBuilder.append("\"");
eventBuilder.append(correlationData[i]);
eventBuilder.append("\"");
if (i != (correlationData.length - 1)) {
eventBuilder.append(",");
}
}
}
if (event.getPayloadData() != null) {
Object[] payloadData = event.getPayloadData();
eventBuilder.append(",");
for (int i = 0; i < payloadData.length; i++) {
eventBuilder.append("\"");
eventBuilder.append(payloadData[i]);
eventBuilder.append("\"");
if (i != (payloadData.length - 1)) {
eventBuilder.append(",");
}
}
}
eventBuilder.append("]");
eventString = eventBuilder.toString();
} else {
eventString = message.toString();
}
Object[] eventValues = new Object[WebsocketEventAdapterConstants.INDEX_TWO]; Object[] eventValues = new Object[WebsocketEventAdapterConstants.INDEX_TWO];
eventValues[WebsocketEventAdapterConstants.INDEX_ZERO] = eventString; eventValues[WebsocketEventAdapterConstants.INDEX_ZERO] = eventString;
@ -262,7 +197,7 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
streamSpecificEvents.add(eventValues); streamSpecificEvents.add(eventValues);
// fetch all valid sessions checked against any queryParameters provided when subscribing. // fetch all valid sessions checked against any queryParameters provided when subscribing.
CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = getValidSessions(message); CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = getValidSessions(eventString);
try { try {
executorService.execute(new WebSocketSender(validSessions, eventString)); executorService.execute(new WebSocketSender(validSessions, eventString));
@ -332,131 +267,36 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
* Fetches all valid web-socket sessions from the entire pool of subscribed sessions. The validity is checked * Fetches all valid web-socket sessions from the entire pool of subscribed sessions. The validity is checked
* against any queryString provided when subscribing to the web-socket endpoint. * against any queryString provided when subscribing to the web-socket endpoint.
* *
* @param event the current event received and that which needs to be published to subscribed sessions. * @param eventString the current event received and that which needs to be published to subscribed sessions.
* @return a list of all validated web-socket sessions against the queryString values. * @return a list of all validated web-socket sessions against the queryString values.
*/ */
private CopyOnWriteArrayList<WebSocketSessionRequest> getValidSessions(Object event) { private CopyOnWriteArrayList<WebSocketSessionRequest> getValidSessions(String eventString) {
CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = new CopyOnWriteArrayList<>(); CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = new CopyOnWriteArrayList<>();
WebsocketOutputCallbackControllerServiceImpl websocketOutputCallbackControllerServiceImpl = WebsocketOutputCallbackControllerServiceImpl websocketOutputCallbackControllerServiceImpl =
WebsocketEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl(); WebsocketEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl();
// get all subscribed web-socket sessions. // get all subscribed web-socket sessions.
CopyOnWriteArrayList<WebSocketSessionRequest> webSocketSessionUtils = CopyOnWriteArrayList<WebSocketSessionRequest> webSocketSessionRequests =
websocketOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId); websocketOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId);
if (webSocketSessionUtils != null) { if (webSocketSessionRequests != null) {
for (WebSocketSessionRequest webSocketSessionUtil : webSocketSessionUtils) { for (WebSocketSessionRequest webSocketSessionRequest : webSocketSessionRequests) {
boolean isValidSession; if (validateJsonMessageAgainstEventFilters(eventString, webSocketSessionRequest)) {
if (event instanceof Event) { validSessions.add(webSocketSessionRequest);
isValidSession = validateEventAgainstSessionFilters((Event) event, webSocketSessionUtil);
} else {
isValidSession = validateJsonMessageAgainstEventFilters(event.toString(), webSocketSessionUtil);
}
if (isValidSession) {
validSessions.add(webSocketSessionUtil);
} }
} }
} }
return validSessions; return validSessions;
} }
private boolean validateJsonMessageAgainstEventFilters(String eventString, WebSocketSessionRequest webSocketSessionRequest) {
/**
* Processes the given session's validity to receive the current "event" against any queryParams that was used at
* the time when the web-socket-session is subscribed. This method can be extended to validate the event against
* any additional attribute of the given session too.
*
* @param event the current event received and that which needs to be published to subscribed
* sessions.
* @param webSocketSessionUtil the session which needs validated for its authenticity to receive this event.
* @return "true" if the session is valid to receive the event else "false".
*/
private boolean validateEventAgainstSessionFilters(Event event, WebSocketSessionRequest webSocketSessionUtil) {
// fetch the queryString Key:Value pair map of the given session.
Map<String, String> queryParamValuePairs = webSocketSessionUtil.getQueryParamValuePairs();
if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) {
// fetch the different attribute values received as part of the current event.
Object[] eventMetaData = event.getMetaData();
Object[] eventCorrelationData = event.getCorrelationData();
Object[] eventPayloadData = event.getPayloadData();
if (streamMetaAttributes != null) {
for (int i = 0; i < streamMetaAttributes.size(); i++) {
String attributeName = streamMetaAttributes.get(i).getName();
String queryValue = queryParamValuePairs.get(attributeName);
if (queryValue != null &&
(eventMetaData == null || !eventMetaData[i].toString().equals(queryValue))) {
return false;
}
}
}
if (streamCorrelationAttributes != null) {
for (int i = 0; i < streamCorrelationAttributes.size(); i++) {
String attributeName = streamCorrelationAttributes.get(i).getName();
String queryValue = queryParamValuePairs.get(attributeName);
if (queryValue != null &&
(eventCorrelationData == null || !eventCorrelationData[i].toString().equals(queryValue))) {
return false;
}
}
}
if (streamPayloadAttributes != null) {
for (int i = 0; i < streamPayloadAttributes.size(); i++) {
String attributeName = streamPayloadAttributes.get(i).getName();
String queryValue = queryParamValuePairs.get(attributeName);
if (queryValue != null && (eventPayloadData == null || !eventPayloadData[i].toString().equals(
queryValue))) {
return false;
}
}
}
}
return true;
}
private boolean validateJsonMessageAgainstEventFilters(String jsonMessage, WebSocketSessionRequest webSocketSessionRequest) {
Map<String, String> queryParamValuePairs = webSocketSessionRequest.getQueryParamValuePairs(); Map<String, String> queryParamValuePairs = webSocketSessionRequest.getQueryParamValuePairs();
if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) { String deviceId = queryParamValuePairs.get(WebsocketConstants.DEVICE_ID);
// fetch the different attribute values received as part of the current event. String deviceType = queryParamValuePairs.get(WebsocketConstants.DEVICE_TYPE);
Set<String> queryParams = queryParamValuePairs.keySet(); JSONObject eventObj = new JSONObject(eventString);
for (String aQueryParam : queryParams) { if (deviceId != null && !deviceId.equals(eventObj.getString(WebsocketConstants.DEVICE_ID))) {
try { return false;
}
String queryValue = queryParamValuePairs.get(aQueryParam); if (deviceType != null && !deviceType.equals(eventObj.getString(WebsocketConstants.DEVICE_TYPE))) {
if (queryValue != null && !queryValue.trim().isEmpty()) { return false;
JSONObject jsonObject = new JSONObject(jsonMessage);
JSONObject event = jsonObject.getJSONObject(WebsocketConstants.EVENT);
JSONObject data;
if (!event.isNull(WebsocketConstants.META_DATA)) {
data = event.getJSONObject(WebsocketConstants.META_DATA);
if (!data.isNull(aQueryParam)) {
String eventValue = data.get(aQueryParam).toString();
if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
return false;
}
}
}
if (!event.isNull(WebsocketConstants.PAYLOAD_DATA)) {
data = event.getJSONObject(WebsocketConstants.PAYLOAD_DATA);
if (!data.isNull(aQueryParam)) {
String eventValue = data.get(aQueryParam).toString();
if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
return false;
}
}
}
}
} catch (JSONException e) {
//do nothing - This exception is thrown when the event does not have query parameter.
}
}
} }
return true; return true;
} }

@ -21,8 +21,6 @@ import feign.Client;
import feign.Feign; import feign.Feign;
import feign.FeignException; import feign.FeignException;
import feign.Logger; import feign.Logger;
import feign.Request;
import feign.Response;
import feign.gson.GsonDecoder; import feign.gson.GsonDecoder;
import feign.gson.GsonEncoder; import feign.gson.GsonEncoder;
import feign.jaxrs.JAXRSContract; import feign.jaxrs.JAXRSContract;
@ -37,6 +35,7 @@ import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.
.DeviceAccessAuthorizationAdminService; .DeviceAccessAuthorizationAdminService;
import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.DeviceAuthorizationResult; import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.DeviceAuthorizationResult;
import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.DeviceIdentifier; import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.DeviceIdentifier;
import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.PropertyUtils; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.PropertyUtils;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException; import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
@ -49,7 +48,6 @@ import java.io.InputStream;
import java.security.*; import java.security.*;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -61,8 +59,6 @@ public class DeviceAuthorizer implements Authorizer {
private static DeviceAccessAuthorizationAdminService deviceAccessAuthorizationAdminService; private static DeviceAccessAuthorizationAdminService deviceAccessAuthorizationAdminService;
private static final String CDMF_SERVER_BASE_CONTEXT = "/api/device-mgt/v1.0"; private static final String CDMF_SERVER_BASE_CONTEXT = "/api/device-mgt/v1.0";
private static final String DEVICE_MGT_SERVER_URL = "deviceMgtServerUrl"; private static final String DEVICE_MGT_SERVER_URL = "deviceMgtServerUrl";
private static final String DEVICE_ID = "deviceId";
private static final String DEVICE_TYPE = "deviceType";
private static Log log = LogFactory.getLog(DeviceAuthorizer.class); private static Log log = LogFactory.getLog(DeviceAuthorizer.class);
public DeviceAuthorizer() { public DeviceAuthorizer() {
} }
@ -84,8 +80,8 @@ public class DeviceAuthorizer implements Authorizer {
public synchronized boolean isAuthorized(AuthenticationInfo authenticationInfo, Session session, String stream) { public synchronized boolean isAuthorized(AuthenticationInfo authenticationInfo, Session session, String stream) {
WebSocketSessionRequest webSocketSessionRequest = new WebSocketSessionRequest(session); WebSocketSessionRequest webSocketSessionRequest = new WebSocketSessionRequest(session);
Map<String, String> queryParams = webSocketSessionRequest.getQueryParamValuePairs(); Map<String, String> queryParams = webSocketSessionRequest.getQueryParamValuePairs();
String deviceId = queryParams.get(DEVICE_ID); String deviceId = queryParams.get(WebsocketConstants.DEVICE_ID);
String deviceType = queryParams.get(DEVICE_TYPE); String deviceType = queryParams.get(WebsocketConstants.DEVICE_TYPE);
if (deviceId != null && !deviceId.isEmpty() && deviceType != null && !deviceType.isEmpty()) { if (deviceId != null && !deviceId.isEmpty() && deviceType != null && !deviceType.isEmpty()) {

@ -33,8 +33,6 @@ public class WebsocketConstants {
public static final String TOKEN_VALIDATION_CONTEX = "/services/OAuth2TokenValidationService"; public static final String TOKEN_VALIDATION_CONTEX = "/services/OAuth2TokenValidationService";
public static final String USERNAME = "username"; public static final String USERNAME = "username";
public static final String PASSWORD = "password"; public static final String PASSWORD = "password";
public static final String TOKEN_PARAM = "token"; public static final String DEVICE_ID = "deviceId";
public static final String META_DATA = "metaData"; public static final String DEVICE_TYPE = "deviceType";
public static final String PAYLOAD_DATA = "payloadData";
public static final String EVENT = "event";
} }

Loading…
Cancel
Save