diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/endpoint/SuperTenantSubscriptionEndpoint.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/endpoint/SuperTenantSubscriptionEndpoint.java index ee01f9dc2..53b40d1ba 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/endpoint/SuperTenantSubscriptionEndpoint.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/endpoint/SuperTenantSubscriptionEndpoint.java @@ -75,6 +75,8 @@ public class SuperTenantSubscriptionEndpoint extends SubscriptionEndpoint { } finally { PrivilegedCarbonContext.endTenantFlow(); } + } else { + log.info("Failed to authorize the connection for the stream : "+ streamName+" , version : "+ version); } } else { try { diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/endpoint/TenantSubscriptionEndpoint.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/endpoint/TenantSubscriptionEndpoint.java index c08504903..553cdcbea 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/endpoint/TenantSubscriptionEndpoint.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/endpoint/TenantSubscriptionEndpoint.java @@ -74,6 +74,12 @@ public class TenantSubscriptionEndpoint extends SubscriptionEndpoint { } finally { PrivilegedCarbonContext.endTenantFlow(); } + } else { + try { + session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Unauthorized Access")); + } catch (IOException e) { + log.error("Failed to disconnect the unauthorized client.", e); + } } } else { try { diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/UIEventAdapter.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/UIEventAdapter.java index 15d5e24e0..4d622d8e7 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/UIEventAdapter.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/UIEventAdapter.java @@ -20,11 +20,14 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.json.JSONException; +import org.json.JSONObject; import org.wso2.carbon.context.CarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.databridge.commons.Attribute; import org.wso2.carbon.databridge.commons.Event; import org.wso2.carbon.databridge.commons.StreamDefinition; +import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants; import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.UIEventAdaptorServiceDataHolder; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.UIEventAdapterConstants; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest; @@ -40,6 +43,7 @@ import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationExcep import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; @@ -116,7 +120,7 @@ public class UIEventAdapter implements OutputEventAdapter { } executorService = new ThreadPoolExecutor(minThread, maxThread, defaultKeepAliveTime, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(jobQueSize)); + new LinkedBlockingQueue(jobQueSize)); } streamId = eventAdapterConfiguration.getOutputStreamIdOfWso2eventMessageFormat(); @@ -196,71 +200,75 @@ public class UIEventAdapter implements OutputEventAdapter { @Override public void publish(Object message, Map dynamicProperties) { - Event event = (Event) message; - StringBuilder eventBuilder = new StringBuilder("["); - + String eventString; if (streamSpecificEvents.size() == queueSize) { streamSpecificEvents.removeFirst(); } - - eventBuilder.append(event.getTimeStamp()); - - if (event.getMetaData() != null) { - eventBuilder.append(","); - Object[] metaData = event.getMetaData(); - for (int i = 0; i < metaData.length; i++) { - eventBuilder.append("\""); - eventBuilder.append(metaData[i]); - eventBuilder.append("\""); - if (i != (metaData.length - 1)) { - eventBuilder.append(","); + if (message instanceof Event) { + Event event = (Event) message; + StringBuilder eventBuilder = new StringBuilder("["); + eventBuilder.append(event.getTimeStamp()); + + if (event.getMetaData() != null) { + eventBuilder.append(","); + Object[] metaData = event.getMetaData(); + for (int i = 0; i < metaData.length; i++) { + eventBuilder.append("\""); + eventBuilder.append(metaData[i]); + eventBuilder.append("\""); + if (i != (metaData.length - 1)) { + eventBuilder.append(","); + } } } - } - if (event.getCorrelationData() != null) { - Object[] correlationData = event.getCorrelationData(); + if (event.getCorrelationData() != null) { + Object[] correlationData = event.getCorrelationData(); - eventBuilder.append(","); + eventBuilder.append(","); - for (int i = 0; i < correlationData.length; i++) { - eventBuilder.append("\""); - eventBuilder.append(correlationData[i]); - eventBuilder.append("\""); - if (i != (correlationData.length - 1)) { - eventBuilder.append(","); + for (int i = 0; i < correlationData.length; i++) { + eventBuilder.append("\""); + eventBuilder.append(correlationData[i]); + eventBuilder.append("\""); + if (i != (correlationData.length - 1)) { + eventBuilder.append(","); + } } } - } - if (event.getPayloadData() != null) { - Object[] payloadData = event.getPayloadData(); - eventBuilder.append(","); - for (int i = 0; i < payloadData.length; i++) { - eventBuilder.append("\""); - eventBuilder.append(payloadData[i]); - eventBuilder.append("\""); - if (i != (payloadData.length - 1)) { - eventBuilder.append(","); + if (event.getPayloadData() != null) { + Object[] payloadData = event.getPayloadData(); + eventBuilder.append(","); + for (int i = 0; i < payloadData.length; i++) { + eventBuilder.append("\""); + eventBuilder.append(payloadData[i]); + eventBuilder.append("\""); + if (i != (payloadData.length - 1)) { + eventBuilder.append(","); + } } } + + eventBuilder.append("]"); + eventString = eventBuilder.toString(); + } else { + eventString = message.toString(); } - eventBuilder.append("]"); - String eventString = eventBuilder.toString(); Object[] eventValues = new Object[UIEventAdapterConstants.INDEX_TWO]; eventValues[UIEventAdapterConstants.INDEX_ZERO] = eventString; eventValues[UIEventAdapterConstants.INDEX_ONE] = System.currentTimeMillis(); streamSpecificEvents.add(eventValues); // fetch all valid sessions checked against any queryParameters provided when subscribing. - CopyOnWriteArrayList validSessions = getValidSessions(event); + CopyOnWriteArrayList validSessions = getValidSessions(message); try { executorService.execute(new WebSocketSender(validSessions, eventString)); } catch (RejectedExecutionException e) { EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "Job queue is full", e, log, - tenantId); + tenantId); } } @@ -327,7 +335,7 @@ public class UIEventAdapter implements OutputEventAdapter { * @param event the current event received and that which needs to be published to subscribed sessions. * @return a list of all validated web-socket sessions against the queryString values. */ - private CopyOnWriteArrayList getValidSessions(Event event) { + private CopyOnWriteArrayList getValidSessions(Object event) { CopyOnWriteArrayList validSessions = new CopyOnWriteArrayList<>(); UIOutputCallbackControllerServiceImpl uiOutputCallbackControllerServiceImpl = UIEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl(); @@ -336,7 +344,12 @@ public class UIEventAdapter implements OutputEventAdapter { uiOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId); if (webSocketSessionUtils != null) { for (WebSocketSessionRequest webSocketSessionUtil : webSocketSessionUtils) { - boolean isValidSession = validateEventAgainstSessionFilters(event, webSocketSessionUtil); + boolean isValidSession; + if (event instanceof Event) { + isValidSession = validateEventAgainstSessionFilters((Event) event, webSocketSessionUtil); + } else { + isValidSession = validateJsonMessageAgainstEventFilters(event.toString(), webSocketSessionUtil); + } if (isValidSession) { validSessions.add(webSocketSessionUtil); } @@ -351,8 +364,8 @@ public class UIEventAdapter implements OutputEventAdapter { * the time when the web-socket-session is subscribed. This method can be extended to validate the event against * any additional attribute of the given session too. * - * @param event the current event received and that which needs to be published to subscribed - * sessions. + * @param event the current event received and that which needs to be published to subscribed + * sessions. * @param webSocketSessionUtil the session which needs validated for its authenticity to receive this event. * @return "true" if the session is valid to receive the event else "false". */ @@ -405,6 +418,36 @@ public class UIEventAdapter implements OutputEventAdapter { return true; } + + private boolean validateJsonMessageAgainstEventFilters(String jsonMessage, WebSocketSessionRequest webSocketSessionRequest) { + Map queryParamValuePairs = webSocketSessionRequest.getQueryParamValuePairs(); + if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) { + // fetch the different attribute values received as part of the current event. + Set queryParams = queryParamValuePairs.keySet(); + for (String aQueryParam : queryParams) { + if (!aQueryParam.equalsIgnoreCase(WebsocketConstants.TOKEN_PARAM)) { + try { + String queryValue = queryParamValuePairs.get(aQueryParam); + if (queryValue != null && !queryValue.trim().isEmpty()) { + JSONObject jsonObject = new JSONObject(jsonMessage); + String eventValue = jsonObject.getString(aQueryParam); + if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) { + return false; + } + } + } catch (JSONException e) { + if (log.isDebugEnabled()) { + log.debug("Unable validate the stream filter properties for event : " + jsonMessage + + " ", e); + } + return false; + } + } + } + } + return true; + } + private class WebSocketSender implements Runnable { private String message; @@ -436,13 +479,13 @@ public class UIEventAdapter implements OutputEventAdapter { webSocketSessionUtil.getSession().getBasicRemote().sendText(message); } catch (IOException e) { EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, - "Cannot send to endpoint", e, log, tenantId); + "Cannot send to endpoint", e, log, tenantId); } } } } else if (doLogDroppedMessage) { EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "No clients registered", log, - tenantId); + tenantId); doLogDroppedMessage = false; } } diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/UIEventAdapterFactory.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/UIEventAdapterFactory.java index 3348711d1..6233f6eaa 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/UIEventAdapterFactory.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/UIEventAdapterFactory.java @@ -51,6 +51,7 @@ public class UIEventAdapterFactory extends OutputEventAdapterFactory { public List getSupportedMessageFormats() { List supportedMessageFormats = new ArrayList(); supportedMessageFormats.add(MessageType.WSO2EVENT); + supportedMessageFormats.add(MessageType.JSON); return supportedMessageFormats; } diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java index 8aea2637f..5d459cd2e 100644 --- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java +++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java @@ -32,4 +32,5 @@ public class WebsocketConstants { public static final String TOKEN_VALIDATION_ENDPOINT_URL = "tokenValidationEndpoint"; public static final String USERNAME = "username"; public static final String PASSWORD = "password"; + public static final String TOKEN_PARAM = "token"; }