Adding JSON message format support for secure-websocket.

revert-dabc3590
sinthuja 8 years ago
parent 0f17b2e766
commit 4da1d5ec3a

@ -75,6 +75,8 @@ public class SuperTenantSubscriptionEndpoint extends SubscriptionEndpoint {
} finally { } finally {
PrivilegedCarbonContext.endTenantFlow(); PrivilegedCarbonContext.endTenantFlow();
} }
} else {
log.info("Failed to authorize the connection for the stream : "+ streamName+" , version : "+ version);
} }
} else { } else {
try { try {

@ -74,6 +74,12 @@ public class TenantSubscriptionEndpoint extends SubscriptionEndpoint {
} finally { } finally {
PrivilegedCarbonContext.endTenantFlow(); PrivilegedCarbonContext.endTenantFlow();
} }
} else {
try {
session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Unauthorized Access"));
} catch (IOException e) {
log.error("Failed to disconnect the unauthorized client.", e);
}
} }
} else { } else {
try { try {

@ -20,11 +20,14 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.json.JSONException;
import org.json.JSONObject;
import org.wso2.carbon.context.CarbonContext; import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Attribute; import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.Event; import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition; import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.UIEventAdaptorServiceDataHolder; import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.UIEventAdaptorServiceDataHolder;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.UIEventAdapterConstants; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.UIEventAdapterConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest; import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest;
@ -40,6 +43,7 @@ import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationExcep
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
@ -116,7 +120,7 @@ public class UIEventAdapter implements OutputEventAdapter {
} }
executorService = new ThreadPoolExecutor(minThread, maxThread, defaultKeepAliveTime, TimeUnit.MILLISECONDS, executorService = new ThreadPoolExecutor(minThread, maxThread, defaultKeepAliveTime, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(jobQueSize)); new LinkedBlockingQueue<Runnable>(jobQueSize));
} }
streamId = eventAdapterConfiguration.getOutputStreamIdOfWso2eventMessageFormat(); streamId = eventAdapterConfiguration.getOutputStreamIdOfWso2eventMessageFormat();
@ -196,71 +200,75 @@ public class UIEventAdapter implements OutputEventAdapter {
@Override @Override
public void publish(Object message, Map<String, String> dynamicProperties) { public void publish(Object message, Map<String, String> dynamicProperties) {
Event event = (Event) message; String eventString;
StringBuilder eventBuilder = new StringBuilder("[");
if (streamSpecificEvents.size() == queueSize) { if (streamSpecificEvents.size() == queueSize) {
streamSpecificEvents.removeFirst(); streamSpecificEvents.removeFirst();
} }
if (message instanceof Event) {
eventBuilder.append(event.getTimeStamp()); Event event = (Event) message;
StringBuilder eventBuilder = new StringBuilder("[");
if (event.getMetaData() != null) { eventBuilder.append(event.getTimeStamp());
eventBuilder.append(",");
Object[] metaData = event.getMetaData(); if (event.getMetaData() != null) {
for (int i = 0; i < metaData.length; i++) { eventBuilder.append(",");
eventBuilder.append("\""); Object[] metaData = event.getMetaData();
eventBuilder.append(metaData[i]); for (int i = 0; i < metaData.length; i++) {
eventBuilder.append("\""); eventBuilder.append("\"");
if (i != (metaData.length - 1)) { eventBuilder.append(metaData[i]);
eventBuilder.append(","); eventBuilder.append("\"");
if (i != (metaData.length - 1)) {
eventBuilder.append(",");
}
} }
} }
}
if (event.getCorrelationData() != null) { if (event.getCorrelationData() != null) {
Object[] correlationData = event.getCorrelationData(); Object[] correlationData = event.getCorrelationData();
eventBuilder.append(","); eventBuilder.append(",");
for (int i = 0; i < correlationData.length; i++) { for (int i = 0; i < correlationData.length; i++) {
eventBuilder.append("\""); eventBuilder.append("\"");
eventBuilder.append(correlationData[i]); eventBuilder.append(correlationData[i]);
eventBuilder.append("\""); eventBuilder.append("\"");
if (i != (correlationData.length - 1)) { if (i != (correlationData.length - 1)) {
eventBuilder.append(","); eventBuilder.append(",");
}
} }
} }
}
if (event.getPayloadData() != null) { if (event.getPayloadData() != null) {
Object[] payloadData = event.getPayloadData(); Object[] payloadData = event.getPayloadData();
eventBuilder.append(","); eventBuilder.append(",");
for (int i = 0; i < payloadData.length; i++) { for (int i = 0; i < payloadData.length; i++) {
eventBuilder.append("\""); eventBuilder.append("\"");
eventBuilder.append(payloadData[i]); eventBuilder.append(payloadData[i]);
eventBuilder.append("\""); eventBuilder.append("\"");
if (i != (payloadData.length - 1)) { if (i != (payloadData.length - 1)) {
eventBuilder.append(","); eventBuilder.append(",");
}
} }
} }
eventBuilder.append("]");
eventString = eventBuilder.toString();
} else {
eventString = message.toString();
} }
eventBuilder.append("]");
String eventString = eventBuilder.toString();
Object[] eventValues = new Object[UIEventAdapterConstants.INDEX_TWO]; Object[] eventValues = new Object[UIEventAdapterConstants.INDEX_TWO];
eventValues[UIEventAdapterConstants.INDEX_ZERO] = eventString; eventValues[UIEventAdapterConstants.INDEX_ZERO] = eventString;
eventValues[UIEventAdapterConstants.INDEX_ONE] = System.currentTimeMillis(); eventValues[UIEventAdapterConstants.INDEX_ONE] = System.currentTimeMillis();
streamSpecificEvents.add(eventValues); streamSpecificEvents.add(eventValues);
// fetch all valid sessions checked against any queryParameters provided when subscribing. // fetch all valid sessions checked against any queryParameters provided when subscribing.
CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = getValidSessions(event); CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = getValidSessions(message);
try { try {
executorService.execute(new WebSocketSender(validSessions, eventString)); executorService.execute(new WebSocketSender(validSessions, eventString));
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "Job queue is full", e, log, EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "Job queue is full", e, log,
tenantId); tenantId);
} }
} }
@ -327,7 +335,7 @@ public class UIEventAdapter implements OutputEventAdapter {
* @param event the current event received and that which needs to be published to subscribed sessions. * @param event the current event received and that which needs to be published to subscribed sessions.
* @return a list of all validated web-socket sessions against the queryString values. * @return a list of all validated web-socket sessions against the queryString values.
*/ */
private CopyOnWriteArrayList<WebSocketSessionRequest> getValidSessions(Event event) { private CopyOnWriteArrayList<WebSocketSessionRequest> getValidSessions(Object event) {
CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = new CopyOnWriteArrayList<>(); CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = new CopyOnWriteArrayList<>();
UIOutputCallbackControllerServiceImpl uiOutputCallbackControllerServiceImpl = UIOutputCallbackControllerServiceImpl uiOutputCallbackControllerServiceImpl =
UIEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl(); UIEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl();
@ -336,7 +344,12 @@ public class UIEventAdapter implements OutputEventAdapter {
uiOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId); uiOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId);
if (webSocketSessionUtils != null) { if (webSocketSessionUtils != null) {
for (WebSocketSessionRequest webSocketSessionUtil : webSocketSessionUtils) { for (WebSocketSessionRequest webSocketSessionUtil : webSocketSessionUtils) {
boolean isValidSession = validateEventAgainstSessionFilters(event, webSocketSessionUtil); boolean isValidSession;
if (event instanceof Event) {
isValidSession = validateEventAgainstSessionFilters((Event) event, webSocketSessionUtil);
} else {
isValidSession = validateJsonMessageAgainstEventFilters(event.toString(), webSocketSessionUtil);
}
if (isValidSession) { if (isValidSession) {
validSessions.add(webSocketSessionUtil); validSessions.add(webSocketSessionUtil);
} }
@ -351,8 +364,8 @@ public class UIEventAdapter implements OutputEventAdapter {
* the time when the web-socket-session is subscribed. This method can be extended to validate the event against * the time when the web-socket-session is subscribed. This method can be extended to validate the event against
* any additional attribute of the given session too. * any additional attribute of the given session too.
* *
* @param event the current event received and that which needs to be published to subscribed * @param event the current event received and that which needs to be published to subscribed
* sessions. * sessions.
* @param webSocketSessionUtil the session which needs validated for its authenticity to receive this event. * @param webSocketSessionUtil the session which needs validated for its authenticity to receive this event.
* @return "true" if the session is valid to receive the event else "false". * @return "true" if the session is valid to receive the event else "false".
*/ */
@ -405,6 +418,36 @@ public class UIEventAdapter implements OutputEventAdapter {
return true; return true;
} }
private boolean validateJsonMessageAgainstEventFilters(String jsonMessage, WebSocketSessionRequest webSocketSessionRequest) {
Map<String, String> queryParamValuePairs = webSocketSessionRequest.getQueryParamValuePairs();
if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) {
// fetch the different attribute values received as part of the current event.
Set<String> queryParams = queryParamValuePairs.keySet();
for (String aQueryParam : queryParams) {
if (!aQueryParam.equalsIgnoreCase(WebsocketConstants.TOKEN_PARAM)) {
try {
String queryValue = queryParamValuePairs.get(aQueryParam);
if (queryValue != null && !queryValue.trim().isEmpty()) {
JSONObject jsonObject = new JSONObject(jsonMessage);
String eventValue = jsonObject.getString(aQueryParam);
if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
return false;
}
}
} catch (JSONException e) {
if (log.isDebugEnabled()) {
log.debug("Unable validate the stream filter properties for event : " + jsonMessage
+ " ", e);
}
return false;
}
}
}
}
return true;
}
private class WebSocketSender implements Runnable { private class WebSocketSender implements Runnable {
private String message; private String message;
@ -436,13 +479,13 @@ public class UIEventAdapter implements OutputEventAdapter {
webSocketSessionUtil.getSession().getBasicRemote().sendText(message); webSocketSessionUtil.getSession().getBasicRemote().sendText(message);
} catch (IOException e) { } catch (IOException e) {
EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message,
"Cannot send to endpoint", e, log, tenantId); "Cannot send to endpoint", e, log, tenantId);
} }
} }
} }
} else if (doLogDroppedMessage) { } else if (doLogDroppedMessage) {
EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "No clients registered", log, EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "No clients registered", log,
tenantId); tenantId);
doLogDroppedMessage = false; doLogDroppedMessage = false;
} }
} }

@ -51,6 +51,7 @@ public class UIEventAdapterFactory extends OutputEventAdapterFactory {
public List<String> getSupportedMessageFormats() { public List<String> getSupportedMessageFormats() {
List<String> supportedMessageFormats = new ArrayList<String>(); List<String> supportedMessageFormats = new ArrayList<String>();
supportedMessageFormats.add(MessageType.WSO2EVENT); supportedMessageFormats.add(MessageType.WSO2EVENT);
supportedMessageFormats.add(MessageType.JSON);
return supportedMessageFormats; return supportedMessageFormats;
} }

@ -32,4 +32,5 @@ public class WebsocketConstants {
public static final String TOKEN_VALIDATION_ENDPOINT_URL = "tokenValidationEndpoint"; public static final String TOKEN_VALIDATION_ENDPOINT_URL = "tokenValidationEndpoint";
public static final String USERNAME = "username"; public static final String USERNAME = "username";
public static final String PASSWORD = "password"; public static final String PASSWORD = "password";
public static final String TOKEN_PARAM = "token";
} }

Loading…
Cancel
Save