|
|
|
@ -20,17 +20,15 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket;
|
|
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
import org.json.JSONException;
|
|
|
|
|
import org.json.JSONObject;
|
|
|
|
|
import org.wso2.carbon.context.CarbonContext;
|
|
|
|
|
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
|
|
|
|
import org.wso2.carbon.databridge.commons.Attribute;
|
|
|
|
|
import org.wso2.carbon.databridge.commons.Event;
|
|
|
|
|
import org.wso2.carbon.databridge.commons.StreamDefinition;
|
|
|
|
|
import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants;
|
|
|
|
|
import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.WebsocketEventAdaptorServiceDataHolder;
|
|
|
|
|
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
|
|
|
|
|
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest;
|
|
|
|
|
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
|
|
|
|
|
import org.wso2.carbon.event.output.adapter.core.EventAdapterUtil;
|
|
|
|
|
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
|
|
|
|
|
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
|
|
|
|
@ -43,7 +41,6 @@ import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationExcep
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
|
|
import java.util.concurrent.LinkedBlockingDeque;
|
|
|
|
@ -68,9 +65,6 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
|
|
|
|
|
private boolean doLogDroppedMessage;
|
|
|
|
|
|
|
|
|
|
private String streamId;
|
|
|
|
|
private List<Attribute> streamMetaAttributes;
|
|
|
|
|
private List<Attribute> streamCorrelationAttributes;
|
|
|
|
|
private List<Attribute> streamPayloadAttributes;
|
|
|
|
|
|
|
|
|
|
public WebsocketEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map<String,
|
|
|
|
|
String> globalProperties) {
|
|
|
|
@ -128,23 +122,15 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
|
|
|
|
|
throw new OutputEventAdapterRuntimeException("UI event adapter needs a output stream id");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// fetch the "streamDefinition" corresponding to the "streamId" and then fetch the different attribute types
|
|
|
|
|
// of the streamDefinition corresponding to the event's streamId. They are required when validating values in
|
|
|
|
|
// the events against the streamDef attributes.
|
|
|
|
|
StreamDefinition streamDefinition = getStreamDefinition(streamId);
|
|
|
|
|
streamMetaAttributes = streamDefinition.getMetaData();
|
|
|
|
|
streamCorrelationAttributes = streamDefinition.getCorrelationData();
|
|
|
|
|
streamPayloadAttributes = streamDefinition.getPayloadData();
|
|
|
|
|
|
|
|
|
|
ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>> tenantSpecifcEventOutputAdapterMap =
|
|
|
|
|
ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>> tenantSpecificEventOutputAdapterMap =
|
|
|
|
|
WebsocketEventAdaptorServiceDataHolder.getTenantSpecificOutputEventStreamAdapterMap();
|
|
|
|
|
|
|
|
|
|
ConcurrentHashMap<String, String> streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId);
|
|
|
|
|
ConcurrentHashMap<String, String> streamSpecifAdapterMap = tenantSpecificEventOutputAdapterMap.get(tenantId);
|
|
|
|
|
|
|
|
|
|
if (streamSpecifAdapterMap == null) {
|
|
|
|
|
streamSpecifAdapterMap = new ConcurrentHashMap<>();
|
|
|
|
|
if (null != tenantSpecifcEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) {
|
|
|
|
|
streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId);
|
|
|
|
|
if (null != tenantSpecificEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) {
|
|
|
|
|
streamSpecifAdapterMap = tenantSpecificEventOutputAdapterMap.get(tenantId);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -200,61 +186,10 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void publish(Object message, Map<String, String> dynamicProperties) {
|
|
|
|
|
String eventString;
|
|
|
|
|
if (streamSpecificEvents.size() == queueSize) {
|
|
|
|
|
streamSpecificEvents.removeFirst();
|
|
|
|
|
}
|
|
|
|
|
if (message instanceof Event) {
|
|
|
|
|
Event event = (Event) message;
|
|
|
|
|
StringBuilder eventBuilder = new StringBuilder("[");
|
|
|
|
|
eventBuilder.append(event.getTimeStamp());
|
|
|
|
|
|
|
|
|
|
if (event.getMetaData() != null) {
|
|
|
|
|
eventBuilder.append(",");
|
|
|
|
|
Object[] metaData = event.getMetaData();
|
|
|
|
|
for (int i = 0; i < metaData.length; i++) {
|
|
|
|
|
eventBuilder.append("\"");
|
|
|
|
|
eventBuilder.append(metaData[i]);
|
|
|
|
|
eventBuilder.append("\"");
|
|
|
|
|
if (i != (metaData.length - 1)) {
|
|
|
|
|
eventBuilder.append(",");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (event.getCorrelationData() != null) {
|
|
|
|
|
Object[] correlationData = event.getCorrelationData();
|
|
|
|
|
|
|
|
|
|
eventBuilder.append(",");
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < correlationData.length; i++) {
|
|
|
|
|
eventBuilder.append("\"");
|
|
|
|
|
eventBuilder.append(correlationData[i]);
|
|
|
|
|
eventBuilder.append("\"");
|
|
|
|
|
if (i != (correlationData.length - 1)) {
|
|
|
|
|
eventBuilder.append(",");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (event.getPayloadData() != null) {
|
|
|
|
|
Object[] payloadData = event.getPayloadData();
|
|
|
|
|
eventBuilder.append(",");
|
|
|
|
|
for (int i = 0; i < payloadData.length; i++) {
|
|
|
|
|
eventBuilder.append("\"");
|
|
|
|
|
eventBuilder.append(payloadData[i]);
|
|
|
|
|
eventBuilder.append("\"");
|
|
|
|
|
if (i != (payloadData.length - 1)) {
|
|
|
|
|
eventBuilder.append(",");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
eventBuilder.append("]");
|
|
|
|
|
eventString = eventBuilder.toString();
|
|
|
|
|
} else {
|
|
|
|
|
eventString = message.toString();
|
|
|
|
|
}
|
|
|
|
|
String eventString = message.toString();
|
|
|
|
|
|
|
|
|
|
Object[] eventValues = new Object[WebsocketEventAdapterConstants.INDEX_TWO];
|
|
|
|
|
eventValues[WebsocketEventAdapterConstants.INDEX_ZERO] = eventString;
|
|
|
|
@ -262,7 +197,7 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
|
|
|
|
|
streamSpecificEvents.add(eventValues);
|
|
|
|
|
|
|
|
|
|
// fetch all valid sessions checked against any queryParameters provided when subscribing.
|
|
|
|
|
CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = getValidSessions(message);
|
|
|
|
|
CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = getValidSessions(eventString);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
executorService.execute(new WebSocketSender(validSessions, eventString));
|
|
|
|
@ -332,131 +267,36 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
|
|
|
|
|
* Fetches all valid web-socket sessions from the entire pool of subscribed sessions. The validity is checked
|
|
|
|
|
* against any queryString provided when subscribing to the web-socket endpoint.
|
|
|
|
|
*
|
|
|
|
|
* @param event the current event received and that which needs to be published to subscribed sessions.
|
|
|
|
|
* @param eventString the current event received and that which needs to be published to subscribed sessions.
|
|
|
|
|
* @return a list of all validated web-socket sessions against the queryString values.
|
|
|
|
|
*/
|
|
|
|
|
private CopyOnWriteArrayList<WebSocketSessionRequest> getValidSessions(Object event) {
|
|
|
|
|
private CopyOnWriteArrayList<WebSocketSessionRequest> getValidSessions(String eventString) {
|
|
|
|
|
CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = new CopyOnWriteArrayList<>();
|
|
|
|
|
WebsocketOutputCallbackControllerServiceImpl websocketOutputCallbackControllerServiceImpl =
|
|
|
|
|
WebsocketEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl();
|
|
|
|
|
// get all subscribed web-socket sessions.
|
|
|
|
|
CopyOnWriteArrayList<WebSocketSessionRequest> webSocketSessionUtils =
|
|
|
|
|
CopyOnWriteArrayList<WebSocketSessionRequest> webSocketSessionRequests =
|
|
|
|
|
websocketOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId);
|
|
|
|
|
if (webSocketSessionUtils != null) {
|
|
|
|
|
for (WebSocketSessionRequest webSocketSessionUtil : webSocketSessionUtils) {
|
|
|
|
|
boolean isValidSession;
|
|
|
|
|
if (event instanceof Event) {
|
|
|
|
|
isValidSession = validateEventAgainstSessionFilters((Event) event, webSocketSessionUtil);
|
|
|
|
|
} else {
|
|
|
|
|
isValidSession = validateJsonMessageAgainstEventFilters(event.toString(), webSocketSessionUtil);
|
|
|
|
|
}
|
|
|
|
|
if (isValidSession) {
|
|
|
|
|
validSessions.add(webSocketSessionUtil);
|
|
|
|
|
if (webSocketSessionRequests != null) {
|
|
|
|
|
for (WebSocketSessionRequest webSocketSessionRequest : webSocketSessionRequests) {
|
|
|
|
|
if (validateJsonMessageAgainstEventFilters(eventString, webSocketSessionRequest)) {
|
|
|
|
|
validSessions.add(webSocketSessionRequest);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return validSessions;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Processes the given session's validity to receive the current "event" against any queryParams that was used at
|
|
|
|
|
* the time when the web-socket-session is subscribed. This method can be extended to validate the event against
|
|
|
|
|
* any additional attribute of the given session too.
|
|
|
|
|
*
|
|
|
|
|
* @param event the current event received and that which needs to be published to subscribed
|
|
|
|
|
* sessions.
|
|
|
|
|
* @param webSocketSessionUtil the session which needs validated for its authenticity to receive this event.
|
|
|
|
|
* @return "true" if the session is valid to receive the event else "false".
|
|
|
|
|
*/
|
|
|
|
|
private boolean validateEventAgainstSessionFilters(Event event, WebSocketSessionRequest webSocketSessionUtil) {
|
|
|
|
|
|
|
|
|
|
// fetch the queryString Key:Value pair map of the given session.
|
|
|
|
|
Map<String, String> queryParamValuePairs = webSocketSessionUtil.getQueryParamValuePairs();
|
|
|
|
|
if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) {
|
|
|
|
|
// fetch the different attribute values received as part of the current event.
|
|
|
|
|
Object[] eventMetaData = event.getMetaData();
|
|
|
|
|
Object[] eventCorrelationData = event.getCorrelationData();
|
|
|
|
|
Object[] eventPayloadData = event.getPayloadData();
|
|
|
|
|
|
|
|
|
|
if (streamMetaAttributes != null) {
|
|
|
|
|
for (int i = 0; i < streamMetaAttributes.size(); i++) {
|
|
|
|
|
String attributeName = streamMetaAttributes.get(i).getName();
|
|
|
|
|
String queryValue = queryParamValuePairs.get(attributeName);
|
|
|
|
|
|
|
|
|
|
if (queryValue != null &&
|
|
|
|
|
(eventMetaData == null || !eventMetaData[i].toString().equals(queryValue))) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (streamCorrelationAttributes != null) {
|
|
|
|
|
for (int i = 0; i < streamCorrelationAttributes.size(); i++) {
|
|
|
|
|
String attributeName = streamCorrelationAttributes.get(i).getName();
|
|
|
|
|
String queryValue = queryParamValuePairs.get(attributeName);
|
|
|
|
|
|
|
|
|
|
if (queryValue != null &&
|
|
|
|
|
(eventCorrelationData == null || !eventCorrelationData[i].toString().equals(queryValue))) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (streamPayloadAttributes != null) {
|
|
|
|
|
for (int i = 0; i < streamPayloadAttributes.size(); i++) {
|
|
|
|
|
String attributeName = streamPayloadAttributes.get(i).getName();
|
|
|
|
|
String queryValue = queryParamValuePairs.get(attributeName);
|
|
|
|
|
|
|
|
|
|
if (queryValue != null && (eventPayloadData == null || !eventPayloadData[i].toString().equals(
|
|
|
|
|
queryValue))) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private boolean validateJsonMessageAgainstEventFilters(String jsonMessage, WebSocketSessionRequest webSocketSessionRequest) {
|
|
|
|
|
private boolean validateJsonMessageAgainstEventFilters(String eventString, WebSocketSessionRequest webSocketSessionRequest) {
|
|
|
|
|
Map<String, String> queryParamValuePairs = webSocketSessionRequest.getQueryParamValuePairs();
|
|
|
|
|
if (queryParamValuePairs != null && !queryParamValuePairs.isEmpty()) {
|
|
|
|
|
// fetch the different attribute values received as part of the current event.
|
|
|
|
|
Set<String> queryParams = queryParamValuePairs.keySet();
|
|
|
|
|
for (String aQueryParam : queryParams) {
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
|
|
String queryValue = queryParamValuePairs.get(aQueryParam);
|
|
|
|
|
if (queryValue != null && !queryValue.trim().isEmpty()) {
|
|
|
|
|
JSONObject jsonObject = new JSONObject(jsonMessage);
|
|
|
|
|
JSONObject event = jsonObject.getJSONObject(WebsocketConstants.EVENT);
|
|
|
|
|
JSONObject data;
|
|
|
|
|
if (!event.isNull(WebsocketConstants.META_DATA)) {
|
|
|
|
|
data = event.getJSONObject(WebsocketConstants.META_DATA);
|
|
|
|
|
if (!data.isNull(aQueryParam)) {
|
|
|
|
|
String eventValue = data.get(aQueryParam).toString();
|
|
|
|
|
if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!event.isNull(WebsocketConstants.PAYLOAD_DATA)) {
|
|
|
|
|
data = event.getJSONObject(WebsocketConstants.PAYLOAD_DATA);
|
|
|
|
|
if (!data.isNull(aQueryParam)) {
|
|
|
|
|
String eventValue = data.get(aQueryParam).toString();
|
|
|
|
|
if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (JSONException e) {
|
|
|
|
|
//do nothing - This exception is thrown when the event does not have query parameter.
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
String deviceId = queryParamValuePairs.get(WebsocketConstants.DEVICE_ID);
|
|
|
|
|
String deviceType = queryParamValuePairs.get(WebsocketConstants.DEVICE_TYPE);
|
|
|
|
|
JSONObject eventObj = new JSONObject(eventString);
|
|
|
|
|
if (deviceId != null && !deviceId.equals(eventObj.getString(WebsocketConstants.DEVICE_ID))) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
if (deviceType != null && !deviceType.equals(eventObj.getString(WebsocketConstants.DEVICE_TYPE))) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|