diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/pom.xml b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/pom.xml
index 507876904e..299a9642bf 100644
--- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/pom.xml
+++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/pom.xml
@@ -21,7 +21,7 @@
org.wso2.carbon.devicemgt-plugins
cdmf-transport-adapters
- 4.1.19-SNAPSHOT
+ 4.1.18
../../pom.xml
diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/WebsocketEventAdapter.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/WebsocketEventAdapter.java
index ed50d9f21b..ac76d66c8b 100644
--- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/WebsocketEventAdapter.java
+++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/WebsocketEventAdapter.java
@@ -20,17 +20,15 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.json.JSONException;
import org.json.JSONObject;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.WebsocketEventAdaptorServiceDataHolder;
-import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest;
+import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
import org.wso2.carbon.event.output.adapter.core.EventAdapterUtil;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
@@ -43,7 +41,6 @@ import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationExcep
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
@@ -68,9 +65,6 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
private boolean doLogDroppedMessage;
private String streamId;
- private List streamMetaAttributes;
- private List streamCorrelationAttributes;
- private List streamPayloadAttributes;
public WebsocketEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map globalProperties) {
@@ -128,23 +122,15 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
throw new OutputEventAdapterRuntimeException("UI event adapter needs a output stream id");
}
- // fetch the "streamDefinition" corresponding to the "streamId" and then fetch the different attribute types
- // of the streamDefinition corresponding to the event's streamId. They are required when validating values in
- // the events against the streamDef attributes.
- StreamDefinition streamDefinition = getStreamDefinition(streamId);
- streamMetaAttributes = streamDefinition.getMetaData();
- streamCorrelationAttributes = streamDefinition.getCorrelationData();
- streamPayloadAttributes = streamDefinition.getPayloadData();
-
- ConcurrentHashMap> tenantSpecifcEventOutputAdapterMap =
+ ConcurrentHashMap> tenantSpecificEventOutputAdapterMap =
WebsocketEventAdaptorServiceDataHolder.getTenantSpecificOutputEventStreamAdapterMap();
- ConcurrentHashMap streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId);
+ ConcurrentHashMap streamSpecifAdapterMap = tenantSpecificEventOutputAdapterMap.get(tenantId);
if (streamSpecifAdapterMap == null) {
streamSpecifAdapterMap = new ConcurrentHashMap<>();
- if (null != tenantSpecifcEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) {
- streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId);
+ if (null != tenantSpecificEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) {
+ streamSpecifAdapterMap = tenantSpecificEventOutputAdapterMap.get(tenantId);
}
}
@@ -200,61 +186,10 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
@Override
public void publish(Object message, Map dynamicProperties) {
- String eventString;
if (streamSpecificEvents.size() == queueSize) {
streamSpecificEvents.removeFirst();
}
- if (message instanceof Event) {
- Event event = (Event) message;
- StringBuilder eventBuilder = new StringBuilder("[");
- eventBuilder.append(event.getTimeStamp());
-
- if (event.getMetaData() != null) {
- eventBuilder.append(",");
- Object[] metaData = event.getMetaData();
- for (int i = 0; i < metaData.length; i++) {
- eventBuilder.append("\"");
- eventBuilder.append(metaData[i]);
- eventBuilder.append("\"");
- if (i != (metaData.length - 1)) {
- eventBuilder.append(",");
- }
- }
- }
-
- if (event.getCorrelationData() != null) {
- Object[] correlationData = event.getCorrelationData();
-
- eventBuilder.append(",");
-
- for (int i = 0; i < correlationData.length; i++) {
- eventBuilder.append("\"");
- eventBuilder.append(correlationData[i]);
- eventBuilder.append("\"");
- if (i != (correlationData.length - 1)) {
- eventBuilder.append(",");
- }
- }
- }
-
- if (event.getPayloadData() != null) {
- Object[] payloadData = event.getPayloadData();
- eventBuilder.append(",");
- for (int i = 0; i < payloadData.length; i++) {
- eventBuilder.append("\"");
- eventBuilder.append(payloadData[i]);
- eventBuilder.append("\"");
- if (i != (payloadData.length - 1)) {
- eventBuilder.append(",");
- }
- }
- }
-
- eventBuilder.append("]");
- eventString = eventBuilder.toString();
- } else {
- eventString = message.toString();
- }
+ String eventString = message.toString();
Object[] eventValues = new Object[WebsocketEventAdapterConstants.INDEX_TWO];
eventValues[WebsocketEventAdapterConstants.INDEX_ZERO] = eventString;
@@ -262,7 +197,7 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
streamSpecificEvents.add(eventValues);
// fetch all valid sessions checked against any queryParameters provided when subscribing.
- CopyOnWriteArrayList validSessions = getValidSessions(message);
+ CopyOnWriteArrayList validSessions = getValidSessions(eventString);
try {
executorService.execute(new WebSocketSender(validSessions, eventString));
@@ -332,131 +267,36 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
* Fetches all valid web-socket sessions from the entire pool of subscribed sessions. The validity is checked
* against any queryString provided when subscribing to the web-socket endpoint.
*
- * @param event the current event received and that which needs to be published to subscribed sessions.
+ * @param eventString the current event received and that which needs to be published to subscribed sessions.
* @return a list of all validated web-socket sessions against the queryString values.
*/
- private CopyOnWriteArrayList getValidSessions(Object event) {
+ private CopyOnWriteArrayList getValidSessions(String eventString) {
CopyOnWriteArrayList validSessions = new CopyOnWriteArrayList<>();
WebsocketOutputCallbackControllerServiceImpl websocketOutputCallbackControllerServiceImpl =
WebsocketEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl();
// get all subscribed web-socket sessions.
- CopyOnWriteArrayList webSocketSessionUtils =
+ CopyOnWriteArrayList webSocketSessionRequests =
websocketOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId);
- if (webSocketSessionUtils != null) {
- for (WebSocketSessionRequest webSocketSessionUtil : webSocketSessionUtils) {
- boolean isValidSession;
- if (event instanceof Event) {
- isValidSession = validateEventAgainstSessionFilters((Event) event, webSocketSessionUtil);
- } else {
- isValidSession = validateJsonMessageAgainstEventFilters(event.toString(), webSocketSessionUtil);
- }
- if (isValidSession) {
- validSessions.add(webSocketSessionUtil);
+ if (webSocketSessionRequests != null) {
+ for (WebSocketSessionRequest webSocketSessionRequest : webSocketSessionRequests) {
+ if (validateJsonMessageAgainstEventFilters(eventString, webSocketSessionRequest)) {
+ validSessions.add(webSocketSessionRequest);
}
}
}
return validSessions;
}
-
- /**
- * Processes the given session's validity to receive the current "event" against any queryParams that was used at
- * the time when the web-socket-session is subscribed. This method can be extended to validate the event against
- * any additional attribute of the given session too.
- *
- * @param event the current event received and that which needs to be published to subscribed
- * sessions.
- * @param webSocketSessionUtil the session which needs validated for its authenticity to receive this event.
- * @return "true" if the session is valid to receive the event else "false".
- */
- private boolean validateEventAgainstSessionFilters(Event event, WebSocketSessionRequest webSocketSessionUtil) {
-
- // fetch the queryString Key:Value pair map of the given session.
- Map queryParamValuePairs = webSocketSessionUtil.getQueryParamValuePairs();
- if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) {
- // fetch the different attribute values received as part of the current event.
- Object[] eventMetaData = event.getMetaData();
- Object[] eventCorrelationData = event.getCorrelationData();
- Object[] eventPayloadData = event.getPayloadData();
-
- if (streamMetaAttributes != null) {
- for (int i = 0; i < streamMetaAttributes.size(); i++) {
- String attributeName = streamMetaAttributes.get(i).getName();
- String queryValue = queryParamValuePairs.get(attributeName);
-
- if (queryValue != null &&
- (eventMetaData == null || !eventMetaData[i].toString().equals(queryValue))) {
- return false;
- }
- }
- }
-
- if (streamCorrelationAttributes != null) {
- for (int i = 0; i < streamCorrelationAttributes.size(); i++) {
- String attributeName = streamCorrelationAttributes.get(i).getName();
- String queryValue = queryParamValuePairs.get(attributeName);
-
- if (queryValue != null &&
- (eventCorrelationData == null || !eventCorrelationData[i].toString().equals(queryValue))) {
- return false;
- }
- }
- }
-
- if (streamPayloadAttributes != null) {
- for (int i = 0; i < streamPayloadAttributes.size(); i++) {
- String attributeName = streamPayloadAttributes.get(i).getName();
- String queryValue = queryParamValuePairs.get(attributeName);
-
- if (queryValue != null && (eventPayloadData == null || !eventPayloadData[i].toString().equals(
- queryValue))) {
- return false;
- }
- }
- }
- }
- return true;
- }
-
-
- private boolean validateJsonMessageAgainstEventFilters(String jsonMessage, WebSocketSessionRequest webSocketSessionRequest) {
+ private boolean validateJsonMessageAgainstEventFilters(String eventString, WebSocketSessionRequest webSocketSessionRequest) {
Map queryParamValuePairs = webSocketSessionRequest.getQueryParamValuePairs();
- if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) {
- // fetch the different attribute values received as part of the current event.
- Set queryParams = queryParamValuePairs.keySet();
- for (String aQueryParam : queryParams) {
- try {
-
- String queryValue = queryParamValuePairs.get(aQueryParam);
- if (queryValue != null && !queryValue.trim().isEmpty()) {
- JSONObject jsonObject = new JSONObject(jsonMessage);
- JSONObject event = jsonObject.getJSONObject(WebsocketConstants.EVENT);
- JSONObject data;
- if (!event.isNull(WebsocketConstants.META_DATA)) {
- data = event.getJSONObject(WebsocketConstants.META_DATA);
- if (!data.isNull(aQueryParam)) {
- String eventValue = data.get(aQueryParam).toString();
- if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
- return false;
- }
- }
-
- }
-
- if (!event.isNull(WebsocketConstants.PAYLOAD_DATA)) {
- data = event.getJSONObject(WebsocketConstants.PAYLOAD_DATA);
- if (!data.isNull(aQueryParam)) {
- String eventValue = data.get(aQueryParam).toString();
- if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
- return false;
- }
- }
- }
- }
- } catch (JSONException e) {
- //do nothing - This exception is thrown when the event does not have query parameter.
- }
- }
+ String deviceId = queryParamValuePairs.get(WebsocketConstants.DEVICE_ID);
+ String deviceType = queryParamValuePairs.get(WebsocketConstants.DEVICE_TYPE);
+ JSONObject eventObj = new JSONObject(eventString);
+ if (deviceId != null && !deviceId.equals(eventObj.getString(WebsocketConstants.DEVICE_ID))) {
+ return false;
+ }
+ if (deviceType != null && !deviceType.equals(eventObj.getString(WebsocketConstants.DEVICE_TYPE))) {
+ return false;
}
return true;
}
diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/authorization/DeviceAuthorizer.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/authorization/DeviceAuthorizer.java
index 12e42a5185..bdf06f9d97 100644
--- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/authorization/DeviceAuthorizer.java
+++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/authorization/DeviceAuthorizer.java
@@ -21,8 +21,6 @@ import feign.Client;
import feign.Feign;
import feign.FeignException;
import feign.Logger;
-import feign.Request;
-import feign.Response;
import feign.gson.GsonDecoder;
import feign.gson.GsonEncoder;
import feign.jaxrs.JAXRSContract;
@@ -37,6 +35,7 @@ import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.
.DeviceAccessAuthorizationAdminService;
import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.DeviceAuthorizationResult;
import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.DeviceIdentifier;
+import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.PropertyUtils;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
@@ -49,7 +48,6 @@ import java.io.InputStream;
import java.security.*;
import java.security.cert.CertificateException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -61,8 +59,6 @@ public class DeviceAuthorizer implements Authorizer {
private static DeviceAccessAuthorizationAdminService deviceAccessAuthorizationAdminService;
private static final String CDMF_SERVER_BASE_CONTEXT = "/api/device-mgt/v1.0";
private static final String DEVICE_MGT_SERVER_URL = "deviceMgtServerUrl";
- private static final String DEVICE_ID = "deviceId";
- private static final String DEVICE_TYPE = "deviceType";
private static Log log = LogFactory.getLog(DeviceAuthorizer.class);
public DeviceAuthorizer() {
}
@@ -84,8 +80,8 @@ public class DeviceAuthorizer implements Authorizer {
public synchronized boolean isAuthorized(AuthenticationInfo authenticationInfo, Session session, String stream) {
WebSocketSessionRequest webSocketSessionRequest = new WebSocketSessionRequest(session);
Map queryParams = webSocketSessionRequest.getQueryParamValuePairs();
- String deviceId = queryParams.get(DEVICE_ID);
- String deviceType = queryParams.get(DEVICE_TYPE);
+ String deviceId = queryParams.get(WebsocketConstants.DEVICE_ID);
+ String deviceType = queryParams.get(WebsocketConstants.DEVICE_TYPE);
if (deviceId != null && !deviceId.isEmpty() && deviceType != null && !deviceType.isEmpty()) {
diff --git a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java
index 743e51c213..3af5ca044a 100644
--- a/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java
+++ b/components/extensions/cdmf-transport-adapters/output/org.wso2.carbon.device.mgt.output.adapter.websocket/src/main/java/org/wso2/carbon/device/mgt/output/adapter/websocket/constants/WebsocketConstants.java
@@ -33,8 +33,6 @@ public class WebsocketConstants {
public static final String TOKEN_VALIDATION_CONTEX = "/services/OAuth2TokenValidationService";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
- public static final String TOKEN_PARAM = "token";
- public static final String META_DATA = "metaData";
- public static final String PAYLOAD_DATA = "payloadData";
- public static final String EVENT = "event";
+ public static final String DEVICE_ID = "deviceId";
+ public static final String DEVICE_TYPE = "deviceType";
}