diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/WebsocketEventAdapter.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/WebsocketEventAdapter.java index ed50d9f21..ac76d66c8 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/WebsocketEventAdapter.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/WebsocketEventAdapter.java @@ -20,17 +20,15 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.json.JSONException; import org.json.JSONObject; import org.wso2.carbon.context.CarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.databridge.commons.Attribute; -import org.wso2.carbon.databridge.commons.Event; import org.wso2.carbon.databridge.commons.StreamDefinition; import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants; import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.WebsocketEventAdaptorServiceDataHolder; -import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest; +import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants; import org.wso2.carbon.event.output.adapter.core.EventAdapterUtil; import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter; import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration; @@ -43,7 +41,6 @@ import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationExcep import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; @@ -68,9 +65,6 @@ public class WebsocketEventAdapter implements OutputEventAdapter { private boolean doLogDroppedMessage; private String streamId; - private List streamMetaAttributes; - private List streamCorrelationAttributes; - private List streamPayloadAttributes; public WebsocketEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map globalProperties) { @@ -128,23 +122,15 @@ public class WebsocketEventAdapter implements OutputEventAdapter { throw new OutputEventAdapterRuntimeException("UI event adapter needs a output stream id"); } - // fetch the "streamDefinition" corresponding to the "streamId" and then fetch the different attribute types - // of the streamDefinition corresponding to the event's streamId. They are required when validating values in - // the events against the streamDef attributes. - StreamDefinition streamDefinition = getStreamDefinition(streamId); - streamMetaAttributes = streamDefinition.getMetaData(); - streamCorrelationAttributes = streamDefinition.getCorrelationData(); - streamPayloadAttributes = streamDefinition.getPayloadData(); - - ConcurrentHashMap> tenantSpecifcEventOutputAdapterMap = + ConcurrentHashMap> tenantSpecificEventOutputAdapterMap = WebsocketEventAdaptorServiceDataHolder.getTenantSpecificOutputEventStreamAdapterMap(); - ConcurrentHashMap streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId); + ConcurrentHashMap streamSpecifAdapterMap = tenantSpecificEventOutputAdapterMap.get(tenantId); if (streamSpecifAdapterMap == null) { streamSpecifAdapterMap = new ConcurrentHashMap<>(); - if (null != tenantSpecifcEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) { - streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId); + if (null != tenantSpecificEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) { + streamSpecifAdapterMap = tenantSpecificEventOutputAdapterMap.get(tenantId); } } @@ -200,61 +186,10 @@ public class WebsocketEventAdapter implements OutputEventAdapter { @Override public void publish(Object message, Map dynamicProperties) { - String eventString; if (streamSpecificEvents.size() == queueSize) { streamSpecificEvents.removeFirst(); } - if (message instanceof Event) { - Event event = (Event) message; - StringBuilder eventBuilder = new StringBuilder("["); - eventBuilder.append(event.getTimeStamp()); - - if (event.getMetaData() != null) { - eventBuilder.append(","); - Object[] metaData = event.getMetaData(); - for (int i = 0; i < metaData.length; i++) { - eventBuilder.append("\""); - eventBuilder.append(metaData[i]); - eventBuilder.append("\""); - if (i != (metaData.length - 1)) { - eventBuilder.append(","); - } - } - } - - if (event.getCorrelationData() != null) { - Object[] correlationData = event.getCorrelationData(); - - eventBuilder.append(","); - - for (int i = 0; i < correlationData.length; i++) { - eventBuilder.append("\""); - eventBuilder.append(correlationData[i]); - eventBuilder.append("\""); - if (i != (correlationData.length - 1)) { - eventBuilder.append(","); - } - } - } - - if (event.getPayloadData() != null) { - Object[] payloadData = event.getPayloadData(); - eventBuilder.append(","); - for (int i = 0; i < payloadData.length; i++) { - eventBuilder.append("\""); - eventBuilder.append(payloadData[i]); - eventBuilder.append("\""); - if (i != (payloadData.length - 1)) { - eventBuilder.append(","); - } - } - } - - eventBuilder.append("]"); - eventString = eventBuilder.toString(); - } else { - eventString = message.toString(); - } + String eventString = message.toString(); Object[] eventValues = new Object[WebsocketEventAdapterConstants.INDEX_TWO]; eventValues[WebsocketEventAdapterConstants.INDEX_ZERO] = eventString; @@ -262,7 +197,7 @@ public class WebsocketEventAdapter implements OutputEventAdapter { streamSpecificEvents.add(eventValues); // fetch all valid sessions checked against any queryParameters provided when subscribing. - CopyOnWriteArrayList validSessions = getValidSessions(message); + CopyOnWriteArrayList validSessions = getValidSessions(eventString); try { executorService.execute(new WebSocketSender(validSessions, eventString)); @@ -332,131 +267,36 @@ public class WebsocketEventAdapter implements OutputEventAdapter { * Fetches all valid web-socket sessions from the entire pool of subscribed sessions. The validity is checked * against any queryString provided when subscribing to the web-socket endpoint. * - * @param event the current event received and that which needs to be published to subscribed sessions. + * @param eventString the current event received and that which needs to be published to subscribed sessions. * @return a list of all validated web-socket sessions against the queryString values. */ - private CopyOnWriteArrayList getValidSessions(Object event) { + private CopyOnWriteArrayList getValidSessions(String eventString) { CopyOnWriteArrayList validSessions = new CopyOnWriteArrayList<>(); WebsocketOutputCallbackControllerServiceImpl websocketOutputCallbackControllerServiceImpl = WebsocketEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl(); // get all subscribed web-socket sessions. - CopyOnWriteArrayList webSocketSessionUtils = + CopyOnWriteArrayList webSocketSessionRequests = websocketOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId); - if (webSocketSessionUtils != null) { - for (WebSocketSessionRequest webSocketSessionUtil : webSocketSessionUtils) { - boolean isValidSession; - if (event instanceof Event) { - isValidSession = validateEventAgainstSessionFilters((Event) event, webSocketSessionUtil); - } else { - isValidSession = validateJsonMessageAgainstEventFilters(event.toString(), webSocketSessionUtil); - } - if (isValidSession) { - validSessions.add(webSocketSessionUtil); + if (webSocketSessionRequests != null) { + for (WebSocketSessionRequest webSocketSessionRequest : webSocketSessionRequests) { + if (validateJsonMessageAgainstEventFilters(eventString, webSocketSessionRequest)) { + validSessions.add(webSocketSessionRequest); } } } return validSessions; } - - /** - * Processes the given session's validity to receive the current "event" against any queryParams that was used at - * the time when the web-socket-session is subscribed. This method can be extended to validate the event against - * any additional attribute of the given session too. - * - * @param event the current event received and that which needs to be published to subscribed - * sessions. - * @param webSocketSessionUtil the session which needs validated for its authenticity to receive this event. - * @return "true" if the session is valid to receive the event else "false". - */ - private boolean validateEventAgainstSessionFilters(Event event, WebSocketSessionRequest webSocketSessionUtil) { - - // fetch the queryString Key:Value pair map of the given session. - Map queryParamValuePairs = webSocketSessionUtil.getQueryParamValuePairs(); - if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) { - // fetch the different attribute values received as part of the current event. - Object[] eventMetaData = event.getMetaData(); - Object[] eventCorrelationData = event.getCorrelationData(); - Object[] eventPayloadData = event.getPayloadData(); - - if (streamMetaAttributes != null) { - for (int i = 0; i < streamMetaAttributes.size(); i++) { - String attributeName = streamMetaAttributes.get(i).getName(); - String queryValue = queryParamValuePairs.get(attributeName); - - if (queryValue != null && - (eventMetaData == null || !eventMetaData[i].toString().equals(queryValue))) { - return false; - } - } - } - - if (streamCorrelationAttributes != null) { - for (int i = 0; i < streamCorrelationAttributes.size(); i++) { - String attributeName = streamCorrelationAttributes.get(i).getName(); - String queryValue = queryParamValuePairs.get(attributeName); - - if (queryValue != null && - (eventCorrelationData == null || !eventCorrelationData[i].toString().equals(queryValue))) { - return false; - } - } - } - - if (streamPayloadAttributes != null) { - for (int i = 0; i < streamPayloadAttributes.size(); i++) { - String attributeName = streamPayloadAttributes.get(i).getName(); - String queryValue = queryParamValuePairs.get(attributeName); - - if (queryValue != null && (eventPayloadData == null || !eventPayloadData[i].toString().equals( - queryValue))) { - return false; - } - } - } - } - return true; - } - - - private boolean validateJsonMessageAgainstEventFilters(String jsonMessage, WebSocketSessionRequest webSocketSessionRequest) { + private boolean validateJsonMessageAgainstEventFilters(String eventString, WebSocketSessionRequest webSocketSessionRequest) { Map queryParamValuePairs = webSocketSessionRequest.getQueryParamValuePairs(); - if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) { - // fetch the different attribute values received as part of the current event. - Set queryParams = queryParamValuePairs.keySet(); - for (String aQueryParam : queryParams) { - try { - - String queryValue = queryParamValuePairs.get(aQueryParam); - if (queryValue != null && !queryValue.trim().isEmpty()) { - JSONObject jsonObject = new JSONObject(jsonMessage); - JSONObject event = jsonObject.getJSONObject(WebsocketConstants.EVENT); - JSONObject data; - if (!event.isNull(WebsocketConstants.META_DATA)) { - data = event.getJSONObject(WebsocketConstants.META_DATA); - if (!data.isNull(aQueryParam)) { - String eventValue = data.get(aQueryParam).toString(); - if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) { - return false; - } - } - - } - - if (!event.isNull(WebsocketConstants.PAYLOAD_DATA)) { - data = event.getJSONObject(WebsocketConstants.PAYLOAD_DATA); - if (!data.isNull(aQueryParam)) { - String eventValue = data.get(aQueryParam).toString(); - if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) { - return false; - } - } - } - } - } catch (JSONException e) { - //do nothing - This exception is thrown when the event does not have query parameter. - } - } + String deviceId = queryParamValuePairs.get(WebsocketConstants.DEVICE_ID); + String deviceType = queryParamValuePairs.get(WebsocketConstants.DEVICE_TYPE); + JSONObject eventObj = new JSONObject(eventString); + if (deviceId != null && !deviceId.equals(eventObj.getString(WebsocketConstants.DEVICE_ID))) { + return false; + } + if (deviceType != null && !deviceType.equals(eventObj.getString(WebsocketConstants.DEVICE_TYPE))) { + return false; } return true; } diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/authorization/DeviceAuthorizer.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/authorization/DeviceAuthorizer.java index 12e42a518..bdf06f9d9 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/authorization/DeviceAuthorizer.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/authorization/DeviceAuthorizer.java @@ -21,8 +21,6 @@ import feign.Client; import feign.Feign; import feign.FeignException; import feign.Logger; -import feign.Request; -import feign.Response; import feign.gson.GsonDecoder; import feign.gson.GsonEncoder; import feign.jaxrs.JAXRSContract; @@ -37,6 +35,7 @@ import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client. .DeviceAccessAuthorizationAdminService; import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.DeviceAuthorizationResult; import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.DeviceIdentifier; +import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.PropertyUtils; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest; import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException; @@ -49,7 +48,6 @@ import java.io.InputStream; import java.security.*; import java.security.cert.CertificateException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -61,8 +59,6 @@ public class DeviceAuthorizer implements Authorizer { private static DeviceAccessAuthorizationAdminService deviceAccessAuthorizationAdminService; private static final String CDMF_SERVER_BASE_CONTEXT = "/api/device-mgt/v1.0"; private static final String DEVICE_MGT_SERVER_URL = "deviceMgtServerUrl"; - private static final String DEVICE_ID = "deviceId"; - private static final String DEVICE_TYPE = "deviceType"; private static Log log = LogFactory.getLog(DeviceAuthorizer.class); public DeviceAuthorizer() { } @@ -84,8 +80,8 @@ public class DeviceAuthorizer implements Authorizer { public synchronized boolean isAuthorized(AuthenticationInfo authenticationInfo, Session session, String stream) { WebSocketSessionRequest webSocketSessionRequest = new WebSocketSessionRequest(session); Map queryParams = webSocketSessionRequest.getQueryParamValuePairs(); - String deviceId = queryParams.get(DEVICE_ID); - String deviceType = queryParams.get(DEVICE_TYPE); + String deviceId = queryParams.get(WebsocketConstants.DEVICE_ID); + String deviceType = queryParams.get(WebsocketConstants.DEVICE_TYPE); if (deviceId != null && !deviceId.isEmpty() && deviceType != null && !deviceType.isEmpty()) { diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java index 743e51c21..3af5ca044 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java @@ -33,8 +33,6 @@ public class WebsocketConstants { public static final String TOKEN_VALIDATION_CONTEX = "/services/OAuth2TokenValidationService"; public static final String USERNAME = "username"; public static final String PASSWORD = "password"; - public static final String TOKEN_PARAM = "token"; - public static final String META_DATA = "metaData"; - public static final String PAYLOAD_DATA = "payloadData"; - public static final String EVENT = "event"; + public static final String DEVICE_ID = "deviceId"; + public static final String DEVICE_TYPE = "deviceType"; }