diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/pom.xml b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/pom.xml new file mode 100644 index 000000000..449706e39 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/pom.xml @@ -0,0 +1,78 @@ + + + + + + + iot-base-plugin + org.wso2.carbon.devicemgt-plugins + 2.1.1-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint + war + WSO2 - Webapp for UI Output Event Adapter + http://wso2.org + + + + junit + junit + test + + + org.apache.tomcat + tomcat-websocket-api + + + org.wso2.carbon.devicemgt-plugins + org.wso2.carbon.device.mgt.iot.output.adapter.ui + provided + + + javax.ws.rs + javax.ws.rs-api + + + org.apache.cxf + cxf-rt-frontend-jaxrs + + + org.apache.httpcomponents.wso2 + httpcore + provided + + + org.wso2.orbit.org.apache.httpcomponents + httpclient + provided + + + org.wso2.carbon.identity + org.wso2.carbon.identity.oauth.stub + provided + + + + + secured-outputui + + diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/SubscriptionEndpoint.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/SubscriptionEndpoint.java new file mode 100644 index 000000000..9439b422c --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/SubscriptionEndpoint.java @@ -0,0 +1,73 @@ +/* + * + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import util.ServiceHolder; + +import javax.websocket.CloseReason; +import javax.websocket.Session; + +/** + * Interface for subscription and un-subscription for web socket + */ + +public class SubscriptionEndpoint { + + private static final Log log = LogFactory.getLog(SubscriptionEndpoint.class); + + public SubscriptionEndpoint() { + + } + + /** + * Web socket onClose - Remove the registered sessions + * + * @param session - Users registered session. + * @param reason - Status code for web-socket close. + * @param streamName - StreamName extracted from the ws url. + * @param version - Version extracted from the ws url. + */ + public void onClose(Session session, CloseReason reason, String streamName, String version) { + if (log.isDebugEnabled()) { + log.debug("Closing a WebSocket due to " + reason.getReasonPhrase() + ", for session ID:" + session.getId + () + + ", for request URI - " + session.getRequestURI()); + } + ServiceHolder.getInstance().getUiOutputCallbackControllerService().unsubscribeWebsocket(streamName, version, + session); + } + + /** + * Web socket onError - Remove the registered sessions + * + * @param session - Users registered session. + * @param throwable - Status code for web-socket close. + * @param streamName - StreamName extracted from the ws url. + * @param version - Version extracted from the ws url. + */ + public void onError(Session session, Throwable throwable, String streamName, String version) { + log.error( + "Error occurred in session ID: " + session.getId() + ", for request URI - " + session.getRequestURI() + + ", " + throwable.getMessage(), throwable); + ServiceHolder.getInstance().getUiOutputCallbackControllerService().unsubscribeWebsocket(streamName, version, + session); + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/SuperTenantSubscriptionEndpoint.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/SuperTenantSubscriptionEndpoint.java new file mode 100644 index 000000000..546473cd3 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/SuperTenantSubscriptionEndpoint.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import oauth.OAuthTokenValdiator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.context.PrivilegedCarbonContext; +import util.ServiceHolder; +import org.wso2.carbon.utils.multitenancy.MultitenantConstants; +import util.AuthenticationInfo; + +import javax.websocket.CloseReason; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; + +/** + * Connect to web socket with Super tenant + */ + +@ServerEndpoint(value = "/{streamname}/{version}") +public class SuperTenantSubscriptionEndpoint extends SubscriptionEndpoint { + + private static final Log log = LogFactory.getLog(SuperTenantSubscriptionEndpoint.class); + + /** + * Web socket onOpen - When client sends a message + * + * @param session - Users registered session. + * @param streamName - StreamName extracted from the ws url. + * @param version - Version extracted from the ws url. + */ + @OnOpen + public void onOpen(Session session, @PathParam("streamname") String streamName, + @PathParam("version") String version) { + if (log.isDebugEnabled()) { + log.debug("WebSocket opened, for Session id: " + session.getId() + ", for the Stream:" + streamName); + } + AuthenticationInfo authenticationInfo = OAuthTokenValdiator.getInstance().validateToken(session); + //TODO Authorization + if (authenticationInfo != null && authenticationInfo.isAuthenticated()) { + try { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(MultitenantConstants.SUPER_TENANT_ID); + ServiceHolder.getInstance().getUiOutputCallbackControllerService().subscribeWebsocket(streamName, + version, session); + } finally { + PrivilegedCarbonContext.endTenantFlow(); + } + } else { + try { + session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Unauthorized Access")); + } catch (IOException e) { + log.error("Failed to disconnect the unauthorized client."); + } + } + } + + /** + * Web socket onMessage - When client sens a message + * + * @param session - Users registered session. + * @param message - Status code for web-socket close. + * @param streamName - StreamName extracted from the ws url. + */ + @OnMessage + public void onMessage(Session session, String message, @PathParam("streamname") String streamName) { + if (log.isDebugEnabled()) { + log.debug("Received and dropped message from client. Message: " + message + ", " + + "for Session id: " + session.getId() + ", for the Stream:" + streamName); + } + } + + /** + * Web socket onClose - Remove the registered sessions + * + * @param session - Users registered session. + * @param reason - Status code for web-socket close. + * @param streamName - StreamName extracted from the ws url. + * @param version - Version extracted from the ws url. + */ + @OnClose + public void onClose(Session session, CloseReason reason, @PathParam("streamname") String streamName, + @PathParam("version") String version) { + try { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(MultitenantConstants.SUPER_TENANT_ID); + super.onClose(session, reason, streamName, version); + } finally { + PrivilegedCarbonContext.endTenantFlow(); + } + } + + /** + * Web socket onError - Remove the registered sessions + * + * @param session - Users registered session. + * @param throwable - Status code for web-socket close. + * @param streamName - StreamName extracted from the ws url. + * @param version - Version extracted from the ws url. + */ + @OnError + public void onError(Session session, Throwable throwable, @PathParam("streamname") String streamName, + @PathParam("version") String version) { + try { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(MultitenantConstants.SUPER_TENANT_ID); + super.onError(session, throwable, streamName, version); + } finally { + PrivilegedCarbonContext.endTenantFlow(); + } + } + +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/TenantSubscriptionEndpoint.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/TenantSubscriptionEndpoint.java new file mode 100644 index 000000000..837e75f00 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/TenantSubscriptionEndpoint.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import oauth.OAuthTokenValdiator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.context.PrivilegedCarbonContext; +import util.ServiceHolder; +import util.AuthenticationInfo; + +import javax.websocket.CloseReason; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; + +/** + * Connect to web socket with a tenant + */ + +@ServerEndpoint(value = "/t/{tdomain}/{streamname}/{version}") +public class TenantSubscriptionEndpoint extends SubscriptionEndpoint { + + private static final Log log = LogFactory.getLog(TenantSubscriptionEndpoint.class); + + /** + * Web socket onOpen - When client sends a message + * + * @param session - Users registered session. + * @param streamName - StreamName extracted from the ws url. + * @param version - Version extracted from the ws url. + * @param tdomain - Tenant domain extracted from ws url. + */ + @OnOpen + public void onOpen (Session session, @PathParam("streamname") String streamName , + @PathParam("version") String version, @PathParam("tdomain") String tdomain) { + if (log.isDebugEnabled()) { + log.debug("WebSocket opened, for Session id: "+session.getId()+", for the Stream:"+streamName); + } + AuthenticationInfo authenticationInfo = OAuthTokenValdiator.getInstance().validateToken(session); + //TODO Authorization + if (authenticationInfo != null && authenticationInfo.isAuthenticated()) { + try { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tdomain, true); + ServiceHolder.getInstance().getUiOutputCallbackControllerService().subscribeWebsocket(streamName, + version, session); + } finally { + PrivilegedCarbonContext.endTenantFlow(); + } + } else { + try { + session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Unauthorized Access")); + } catch (IOException e) { + log.error("Failed to disconnect the unauthorized client."); + } + } + } + + /** + * Web socket onMessage - When client sens a message + * + * @param session - Users registered session. + * @param message - Status code for web-socket close. + * @param streamName - StreamName extracted from the ws url. + */ + @OnMessage + public void onMessage (Session session, String message, @PathParam("streamname") String streamName, @PathParam("tdomain") String tdomain) { + if (log.isDebugEnabled()) { + log.debug("Received and dropped message from client. Message: " + message+", for Session id: "+session.getId()+", for tenant domain"+tdomain+", for the Adaptor:"+streamName); + } + } + + /** + * Web socket onClose - Remove the registered sessions + * + * @param session - Users registered session. + * @param reason - Status code for web-socket close. + * @param streamName - StreamName extracted from the ws url. + * @param version - Version extracted from the ws url. + */ + @OnClose + public void onClose (Session session, CloseReason reason, @PathParam("streamname") String streamName, + @PathParam("version") String version, @PathParam("tdomain") String tdomain) { + + try { + PrivilegedCarbonContext.getThreadLocalCarbonContext().startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tdomain,true); + super.onClose(session, reason, streamName, version); + } finally { + PrivilegedCarbonContext.getThreadLocalCarbonContext().endTenantFlow(); + } + } + + /** + * Web socket onError - Remove the registered sessions + * + * @param session - Users registered session. + * @param throwable - Status code for web-socket close. + * @param streamName - StreamName extracted from the ws url. + * @param version - Version extracted from the ws url. + */ + @OnError + public void onError (Session session, Throwable throwable, @PathParam("streamname") String streamName, + @PathParam("version") String version, @PathParam("tdomain") String tdomain) { + + try { + PrivilegedCarbonContext.getThreadLocalCarbonContext().startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tdomain, true); + super.onError(session, throwable, streamName, version); + } finally { + PrivilegedCarbonContext.getThreadLocalCarbonContext().endTenantFlow(); + } + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/OAuthTokenValdiator.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/OAuthTokenValdiator.java new file mode 100644 index 000000000..e33bb39fa --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/OAuthTokenValdiator.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package oauth; + +import org.apache.axis2.context.ServiceContext; +import org.apache.axis2.transport.http.HTTPConstants; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.pool.impl.GenericObjectPool; +import org.wso2.carbon.identity.oauth2.stub.OAuth2TokenValidationServiceStub; +import org.wso2.carbon.identity.oauth2.stub.dto.OAuth2TokenValidationRequestDTO; +import org.wso2.carbon.identity.oauth2.stub.dto.OAuth2TokenValidationRequestDTO_OAuth2AccessToken; +import org.wso2.carbon.identity.oauth2.stub.dto.OAuth2TokenValidationResponseDTO; +import org.wso2.carbon.user.api.UserStoreException; +import org.wso2.carbon.utils.CarbonUtils; +import org.wso2.carbon.utils.multitenancy.MultitenantUtils; +import util.AuthenticationInfo; + +import javax.websocket.Session; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.rmi.RemoteException; +import java.util.Properties; + +/** + * This acts as a contract point for OAuth token validation. + */ +public class OAuthTokenValdiator { + + private static String cookie; + private GenericObjectPool stubs; + private static Log log = LogFactory.getLog(OAuthTokenValdiator.class); + private static final String WEBSOCKET_CONFIG_LOCATION = + CarbonUtils.getEtcCarbonConfigDirPath() + File.separator + "websocket-validation.properties"; + private static final String QUERY_STRING_SEPERATOR = "&"; + private static final String QUERY_KEY_VALUE_SEPERATOR = "="; + private static final String TOKEN_TYPE = "bearer"; + private static final String TOKEN_IDENTIFIER = "token"; + private static OAuthTokenValdiator oAuthTokenValdiator; + + public static OAuthTokenValdiator getInstance() { + if (oAuthTokenValdiator == null) { + synchronized (OAuthTokenValdiator.class) { + if (oAuthTokenValdiator == null) { + oAuthTokenValdiator = new OAuthTokenValdiator(); + } + } + } + return oAuthTokenValdiator; + } + + private OAuthTokenValdiator() { + try { + Properties properties = getWebSocketConfig(); + this.stubs = new GenericObjectPool(new OAuthTokenValidaterStubFactory(properties)); + } catch (IOException e) { + log.error("Failed to parse the web socket config file " + WEBSOCKET_CONFIG_LOCATION); + } + } + + /** + * This method gets a string accessToken and validates it + * + * @param session which need to be validated. + * @return AuthenticationInfo with the validated results. + */ + public AuthenticationInfo validateToken(Session session) { + String token = getTokenFromSession(session); + if (token == null) { + AuthenticationInfo authenticationInfo = new AuthenticationInfo(); + authenticationInfo.setAuthenticated(false); + return authenticationInfo; + } + OAuth2TokenValidationServiceStub tokenValidationServiceStub = null; + try { + Object stub = this.stubs.borrowObject(); + if (stub != null) { + tokenValidationServiceStub = (OAuth2TokenValidationServiceStub) stub; + if (cookie != null) { + tokenValidationServiceStub._getServiceClient().getOptions().setProperty( + HTTPConstants.COOKIE_STRING, cookie); + } + return getAuthenticationInfo(token, tokenValidationServiceStub); + } else { + log.warn("Stub initialization failed."); + } + } catch (RemoteException e) { + log.error("Error on connecting with the validation endpoint.", e); + } catch (Exception e) { + log.error("Error occurred in borrowing an validation stub from the pool.", e); + + } finally { + try { + if (tokenValidationServiceStub != null) { + this.stubs.returnObject(tokenValidationServiceStub); + } + } catch (Exception e) { + log.warn("Error occurred while returning the object back to the oauth token validation service " + + "stub pool.", e); + } + } + AuthenticationInfo authenticationInfo = new AuthenticationInfo(); + authenticationInfo.setAuthenticated(false); + return authenticationInfo; + } + + /** + * This creates an AuthenticationInfo object that is used for authorization. This method will validate the token + * and + * sets the required parameters to the object. + * + * @param token that needs to be validated. + * @param tokenValidationServiceStub stub that is used to call the external service. + * @return AuthenticationInfo This contains the information related to authenticated client. + * @throws RemoteException that triggers when failing to call the external service.. + */ + private AuthenticationInfo getAuthenticationInfo(String token, + OAuth2TokenValidationServiceStub tokenValidationServiceStub) + throws RemoteException, UserStoreException { + AuthenticationInfo authenticationInfo = new AuthenticationInfo(); + OAuth2TokenValidationRequestDTO validationRequest = new OAuth2TokenValidationRequestDTO(); + OAuth2TokenValidationRequestDTO_OAuth2AccessToken accessToken = + new OAuth2TokenValidationRequestDTO_OAuth2AccessToken(); + accessToken.setTokenType(TOKEN_TYPE); + accessToken.setIdentifier(token); + validationRequest.setAccessToken(accessToken); + boolean authenticated; + OAuth2TokenValidationResponseDTO tokenValidationResponse; + tokenValidationResponse = tokenValidationServiceStub.validate(validationRequest); + if (tokenValidationResponse == null) { + authenticationInfo.setAuthenticated(false); + return authenticationInfo; + } + authenticated = tokenValidationResponse.getValid(); + if (authenticated) { + String authorizedUser = tokenValidationResponse.getAuthorizedUser(); + String username = MultitenantUtils.getTenantAwareUsername(authorizedUser); + String tenantDomain = MultitenantUtils.getTenantDomain(authorizedUser); + authenticationInfo.setUsername(username); + authenticationInfo.setTenantDomain(tenantDomain); + } else { + if (log.isDebugEnabled()) { + log.debug("Token validation failed for token: " + token); + } + } + ServiceContext serviceContext = tokenValidationServiceStub._getServiceClient() + .getLastOperationContext().getServiceContext(); + cookie = (String) serviceContext.getProperty(HTTPConstants.COOKIE_STRING); + authenticationInfo.setAuthenticated(authenticated); + return authenticationInfo; + } + + /** + * Retrieve JWT configs from registry. + */ + private Properties getWebSocketConfig() throws IOException { + Properties properties = new Properties(); + File configFile =new File(WEBSOCKET_CONFIG_LOCATION); + if (configFile.exists()) { + InputStream fileInputStream = new FileInputStream(configFile); + if (fileInputStream != null) { + properties.load(fileInputStream); + } + } + return properties; + } + + /** + * @param session of the user. + * @return retreive the token from the query string + */ + private String getTokenFromSession(Session session) { + String queryString = session.getQueryString(); + if (queryString != null) { + String[] allQueryParamPairs = queryString.split(QUERY_STRING_SEPERATOR); + + for (String keyValuePair : allQueryParamPairs) { + String[] queryParamPair = keyValuePair.split(QUERY_KEY_VALUE_SEPERATOR); + + if (queryParamPair.length != 2) { + log.warn("Invalid query string [" + queryString + "] passed in."); + break; + } + if (queryParamPair[0].equals(TOKEN_IDENTIFIER)) { + return queryParamPair[1]; + } + } + } + return null; + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/OAuthTokenValidaterStubFactory.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/OAuthTokenValidaterStubFactory.java new file mode 100644 index 000000000..a43f87472 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/OAuthTokenValidaterStubFactory.java @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package oauth; + +import oauth.exception.OAuthTokenValidationException; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.client.Options; +import org.apache.axis2.client.ServiceClient; +import org.apache.axis2.transport.http.HTTPConstants; +import org.apache.axis2.transport.http.HttpTransportProperties; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; +import org.apache.commons.httpclient.contrib.ssl.EasySSLProtocolSocketFactory; +import org.apache.commons.httpclient.params.HttpConnectionManagerParams; +import org.apache.commons.httpclient.protocol.Protocol; +import org.apache.commons.httpclient.protocol.ProtocolSocketFactory; +import org.apache.commons.pool.BasePoolableObjectFactory; +import org.apache.log4j.Logger; +import org.wso2.carbon.identity.oauth2.stub.OAuth2TokenValidationServiceStub; +import util.UIConstants; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.GeneralSecurityException; +import java.util.Properties; + +/** + * This follows object pool pattern to manage the stub for oauth validation service. + */ +public class OAuthTokenValidaterStubFactory extends BasePoolableObjectFactory { + private static final Logger log = Logger.getLogger(OAuthTokenValidaterStubFactory.class); + private HttpClient httpClient; + Properties tokenValidationProperties; + + + public OAuthTokenValidaterStubFactory(Properties tokenValidationProperties) { + this.tokenValidationProperties = tokenValidationProperties; + this.httpClient = createHttpClient(); + } + + /** + * This creates a OAuth2TokenValidationServiceStub object to the pool. + * + * @return an OAuthValidationStub object + * @throws Exception thrown when creating the object. + */ + @Override + public Object makeObject() throws Exception { + return this.generateStub(); + } + + /** + * This is used to clean up the OAuth validation stub and releases to the object pool. + * + * @param o object that needs to be released. + * @throws Exception throws when failed to release to the pool + */ + @Override + public void passivateObject(Object o) throws Exception { + if (o instanceof OAuth2TokenValidationServiceStub) { + OAuth2TokenValidationServiceStub stub = (OAuth2TokenValidationServiceStub) o; + stub._getServiceClient().cleanupTransport(); + } + } + + /** + * This is used to create a stub which will be triggered through object pool factory, which will create an + * instance of it. + * + * @return OAuth2TokenValidationServiceStub stub that is used to call an external service. + * @throws OAuthTokenValidationException will be thrown when initialization failed. + */ + private OAuth2TokenValidationServiceStub generateStub() throws OAuthTokenValidationException { + OAuth2TokenValidationServiceStub stub; + try { + URL hostURL = new URL(tokenValidationProperties.getProperty((UIConstants.TOKEN_VALIDATION_ENDPOINT_URL))); + if (hostURL != null) { + stub = new OAuth2TokenValidationServiceStub(hostURL.toString()); + if (stub != null) { + ServiceClient client = stub._getServiceClient(); + client.getServiceContext().getConfigurationContext().setProperty( + HTTPConstants.CACHED_HTTP_CLIENT, httpClient); + + HttpTransportProperties.Authenticator auth = + new HttpTransportProperties.Authenticator(); + auth.setPreemptiveAuthentication(true); + String username = tokenValidationProperties.getProperty(UIConstants.USERNAME); + String password = tokenValidationProperties.getProperty(UIConstants.PASSWORD); + auth.setPassword(username); + auth.setUsername(password); + Options options = client.getOptions(); + options.setProperty(HTTPConstants.AUTHENTICATE, auth); + options.setProperty(HTTPConstants.REUSE_HTTP_CLIENT, Constants.VALUE_TRUE); + client.setOptions(options); + if (hostURL.getProtocol().equals("https")) { + // set up ssl factory since axis2 https transport is used. + EasySSLProtocolSocketFactory sslProtocolSocketFactory = + createProtocolSocketFactory(); + Protocol authhttps = new Protocol(hostURL.getProtocol(), + (ProtocolSocketFactory) sslProtocolSocketFactory, + hostURL.getPort()); + Protocol.registerProtocol(hostURL.getProtocol(), authhttps); + options.setProperty(HTTPConstants.CUSTOM_PROTOCOL_HANDLER, authhttps); + } + } else { + String errorMsg = "OAuth Validation instanization failed."; + throw new OAuthTokenValidationException(errorMsg); + } + } else { + String errorMsg = "host url is invalid"; + throw new OAuthTokenValidationException(errorMsg); + } + } catch (AxisFault axisFault) { + throw new OAuthTokenValidationException( + "Error occurred while creating the OAuth2TokenValidationServiceStub.", axisFault); + } catch (MalformedURLException e) { + throw new OAuthTokenValidationException( + "Error occurred while parsing token endpoint URL", e); + } + + return stub; + } + + /** + * This is required to create a trusted connection with the external entity. + * Have to manually configure it since we use CommonHTTPTransport(axis2 transport) in axis2. + * + * @return an EasySSLProtocolSocketFactory for SSL communication. + */ + private EasySSLProtocolSocketFactory createProtocolSocketFactory() throws OAuthTokenValidationException { + try { + EasySSLProtocolSocketFactory easySSLPSFactory = new EasySSLProtocolSocketFactory(); + return easySSLPSFactory; + } catch (IOException e) { + String errorMsg = "Failed to initiate EasySSLProtocolSocketFactory."; + throw new OAuthTokenValidationException(errorMsg, e); + } catch (GeneralSecurityException e) { + String errorMsg = "Failed to set the key material in easy ssl factory."; + throw new OAuthTokenValidationException(errorMsg, e); + } + } + + /** + * This created httpclient pool that can be used to connect to external entity. This connection can be configured + * via broker.xml by setting up the required http connection parameters. + * + * @return an instance of HttpClient that is configured with MultiThreadedHttpConnectionManager + */ + private HttpClient createHttpClient() { + HttpConnectionManagerParams params = new HttpConnectionManagerParams(); + params.setDefaultMaxConnectionsPerHost(Integer.parseInt(tokenValidationProperties.getProperty( + UIConstants.MAXIMUM_HTTP_CONNECTION_PER_HOST))); + params.setMaxTotalConnections(Integer.parseInt(tokenValidationProperties.getProperty( + UIConstants.MAXIMUM_TOTAL_HTTP_CONNECTION))); + HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager(); + connectionManager.setParams(params); + return new HttpClient(connectionManager); + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/exception/OAuthTokenValidationException.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/exception/OAuthTokenValidationException.java new file mode 100644 index 000000000..3f54c5244 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/oauth/exception/OAuthTokenValidationException.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package oauth.exception; + +/** + * This Exception will be thrown, when there any interference with token validation flow. + */ +public class OAuthTokenValidationException extends Exception { + private String errMessage; + + public OAuthTokenValidationException(String msg, Exception nestedEx) { + super(msg, nestedEx); + setErrorMessage(msg); + } + + public OAuthTokenValidationException(String message, Throwable cause) { + super(message, cause); + setErrorMessage(message); + } + + public OAuthTokenValidationException(String msg) { + super(msg); + setErrorMessage(msg); + } + + public OAuthTokenValidationException() { + super(); + } + + public OAuthTokenValidationException(Throwable cause) { + super(cause); + } + + public String getErrorMessage() { + return errMessage; + } + + public void setErrorMessage(String errMessage) { + this.errMessage = errMessage; + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/AuthenticationInfo.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/AuthenticationInfo.java new file mode 100644 index 000000000..4e2115cf0 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/AuthenticationInfo.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package util; + +/** + * This is returned after authentication. + */ +public class AuthenticationInfo { + + /** + * this variable is used to check whether the client is authenticated. + */ + private boolean authenticated; + private String username; + private String tenantDomain; + /** + * returns whether the client is authenticated + */ + public boolean isAuthenticated() { + return authenticated; + } + + public void setAuthenticated(boolean authenticated) { + this.authenticated = authenticated; + } + + /** + * returns the authenticated client username + */ + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + /** + * return the authenticated client tenant domain + */ + public String getTenantDomain() { + return tenantDomain; + } + + public void setTenantDomain(String tenantDomain) { + this.tenantDomain = tenantDomain; + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/ServiceHolder.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/ServiceHolder.java new file mode 100644 index 000000000..1f35b27d0 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/ServiceHolder.java @@ -0,0 +1,27 @@ +package util; + + +import org.wso2.carbon.context.PrivilegedCarbonContext; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.UIOutputCallbackControllerService; + +public class ServiceHolder { + + private static ServiceHolder instance; + private UIOutputCallbackControllerService uiOutputCallbackControllerService; + + private ServiceHolder(){ + uiOutputCallbackControllerService = (UIOutputCallbackControllerService) PrivilegedCarbonContext + .getThreadLocalCarbonContext().getOSGiService(UIOutputCallbackControllerService.class, null); + } + + public synchronized static ServiceHolder getInstance(){ + if (instance==null){ + instance= new ServiceHolder(); + } + return instance; + } + + public UIOutputCallbackControllerService getUiOutputCallbackControllerService() { + return uiOutputCallbackControllerService; + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/UIConstants.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/UIConstants.java new file mode 100644 index 000000000..2db13ce81 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/java/util/UIConstants.java @@ -0,0 +1,38 @@ +/* + * + * * + * * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * * + * * WSO2 Inc. licenses this file to you under the Apache License, + * * Version 2.0 (the "License"); you may not use this file except + * * in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * * + * + */ + +package util; + +/** + * This class contains the constants related to ui client. + */ +public class UIConstants { + + private UIConstants() { + } + public static final String ADAPTER_UI_COLON = ":"; + public static final String MAXIMUM_TOTAL_HTTP_CONNECTION = "maximumTotalHttpConnection"; + public static final String MAXIMUM_HTTP_CONNECTION_PER_HOST = "maximumHttpConnectionPerHost"; + public static final String TOKEN_VALIDATION_ENDPOINT_URL = "tokenValidationEndpoint"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/webapp/WEB-INF/web.xml b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/webapp/WEB-INF/web.xml new file mode 100644 index 000000000..2ec7fc7e9 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint/src/main/webapp/WEB-INF/web.xml @@ -0,0 +1,24 @@ + + + + + Output WebSocket + diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/pom.xml b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/pom.xml new file mode 100644 index 000000000..0739a96d4 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/pom.xml @@ -0,0 +1,137 @@ + + + + + + iot-base-plugin + org.wso2.carbon.devicemgt-plugins + 2.1.1-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.wso2.carbon.device.mgt.iot.output.adapter.ui + bundle + WSO2 Carbon - Event Output UI Adapter Module + org.wso2.carbon.event.output.adapter.ui provides the back-end functionality of + ui event adapter + + http://wso2.org + + + + org.wso2.carbon.analytics-common + org.wso2.carbon.event.output.adapter.core + + + org.wso2.carbon + org.wso2.carbon.logging + + + org.wso2.carbon + org.wso2.carbon.core + + + javax.websocket + javax.websocket-api + + + org.wso2.carbon.analytics-common + org.wso2.carbon.databridge.commons + + + com.google.code.gson + gson + + + org.wso2.carbon.analytics-common + org.wso2.carbon.event.stream.core + + + + + + + org.apache.felix + maven-scr-plugin + + + generate-scr-descriptor + + scr + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + org.apache.felix + maven-bundle-plugin + true + + + ${project.artifactId} + ${project.artifactId} + + org.wso2.carbon.device.mgt.iot.output.adapter.ui.internal, + org.wso2.carbon.device.mgt.iot.output.adapter.ui.internal.* + + + !org.wso2.carbon.device.mgt.iot.output.adapter.ui.internal, + !org.wso2.carbon.device.mgt.iot.output.adapter.ui.internal.*, + org.wso2.carbon.device.mgt.iot.output.adapter.ui.* + + + org.wso2.carbon.event.output.adapter.core.*, + javax.xml.namespace; version=0.0.0, + org.apache.axis2, + org.apache.axis2.client, + org.apache.axis2.context, + org.apache.axis2.transport.http, + org.apache.commons.httpclient, + org.apache.commons.httpclient.contrib.ssl, + org.apache.commons.httpclient.params, + org.apache.commons.httpclient.protocol, + org.apache.commons.pool, + org.apache.commons.pool.impl, + org.apache.log4j, + com.google.gson, + javax.websocket, + org.apache.commons.logging, + org.osgi.framework, + org.osgi.service.component, + org.wso2.carbon.context, + org.wso2.carbon.databridge.commons, + org.wso2.carbon.event.stream.core, + org.wso2.carbon.event.stream.core.exception, + org.wso2.carbon.utils + + + + + + + + + + + diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIEventAdapter.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIEventAdapter.java new file mode 100644 index 000000000..4f2898795 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIEventAdapter.java @@ -0,0 +1,455 @@ +/* + * + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.wso2.carbon.device.mgt.iot.output.adapter.ui; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.context.CarbonContext; +import org.wso2.carbon.context.PrivilegedCarbonContext; +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.Event; +import org.wso2.carbon.databridge.commons.StreamDefinition; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.internal.UIEventAdaptorServiceDataHolder; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.util.UIEventAdapterConstants; +import org.wso2.carbon.event.output.adapter.core.EventAdapterUtil; +import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter; +import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration; +import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException; +import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterRuntimeException; +import org.wso2.carbon.event.output.adapter.core.exception.TestConnectionNotSupportedException; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.util.WebSocketSessionUtil; +import org.wso2.carbon.event.stream.core.EventStreamService; +import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Contains the life cycle of executions regarding the UI Adapter + */ + +public class UIEventAdapter implements OutputEventAdapter { + + private static final Log log = LogFactory.getLog(UIEventAdapter.class); + private OutputEventAdapterConfiguration eventAdapterConfiguration; + private Map globalProperties; + private int queueSize; + private LinkedBlockingDeque streamSpecificEvents; + private static ThreadPoolExecutor executorService; + private int tenantId; + private boolean doLogDroppedMessage; + + private String streamId; + private List streamMetaAttributes; + private List streamCorrelationAttributes; + private List streamPayloadAttributes; + + public UIEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map globalProperties) { + this.eventAdapterConfiguration = eventAdapterConfiguration; + this.globalProperties = globalProperties; + this.doLogDroppedMessage = true; + } + + @Override + public void init() throws OutputEventAdapterException { + + tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); + + //ExecutorService will be assigned if it is null + if (executorService == null) { + int minThread; + int maxThread; + long defaultKeepAliveTime; + int jobQueSize; + + //If global properties are available those will be assigned else constant values will be assigned + if (globalProperties.get(UIEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE_NAME) != null) { + minThread = Integer.parseInt(globalProperties.get( + UIEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE_NAME)); + } else { + minThread = UIEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE; + } + + if (globalProperties.get(UIEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE_NAME) != null) { + maxThread = Integer.parseInt(globalProperties.get( + UIEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE_NAME)); + } else { + maxThread = UIEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE; + } + + if (globalProperties.get(UIEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME) != null) { + defaultKeepAliveTime = Integer.parseInt(globalProperties.get( + UIEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME)); + } else { + defaultKeepAliveTime = UIEventAdapterConstants.DEFAULT_KEEP_ALIVE_TIME_IN_MILLIS; + } + + if (globalProperties.get(UIEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME) != null) { + jobQueSize = Integer.parseInt(globalProperties.get( + UIEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME)); + } else { + jobQueSize = UIEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE; + } + + executorService = new ThreadPoolExecutor(minThread, maxThread, defaultKeepAliveTime, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(jobQueSize)); + } + + streamId = eventAdapterConfiguration.getOutputStreamIdOfWso2eventMessageFormat(); + if (streamId == null || streamId.isEmpty()) { + throw new OutputEventAdapterRuntimeException("UI event adapter needs a output stream id"); + } + + // fetch the "streamDefinition" corresponding to the "streamId" and then fetch the different attribute types + // of the streamDefinition corresponding to the event's streamId. They are required when validating values in + // the events against the streamDef attributes. + StreamDefinition streamDefinition = getStreamDefinition(streamId); + streamMetaAttributes = streamDefinition.getMetaData(); + streamCorrelationAttributes = streamDefinition.getCorrelationData(); + streamPayloadAttributes = streamDefinition.getPayloadData(); + + ConcurrentHashMap> tenantSpecifcEventOutputAdapterMap = + UIEventAdaptorServiceDataHolder.getTenantSpecificOutputEventStreamAdapterMap(); + + ConcurrentHashMap streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId); + + if (streamSpecifAdapterMap == null) { + streamSpecifAdapterMap = new ConcurrentHashMap<>(); + if (null != tenantSpecifcEventOutputAdapterMap.putIfAbsent(tenantId, streamSpecifAdapterMap)) { + streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId); + } + } + + String adapterName = streamSpecifAdapterMap.get(streamId); + + if (adapterName != null) { + throw new OutputEventAdapterException(("An Output ui event adapter \"" + adapterName + "\" is already" + + " exist for stream id \"" + streamId + "\"")); + } else { + streamSpecifAdapterMap.put(streamId, eventAdapterConfiguration.getName()); + + ConcurrentHashMap>> tenantSpecificStreamMap = + UIEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap(); + ConcurrentHashMap> streamSpecificEventsMap = + tenantSpecificStreamMap.get(tenantId); + if (streamSpecificEventsMap == null) { + streamSpecificEventsMap = new ConcurrentHashMap<>(); + if (null != tenantSpecificStreamMap.putIfAbsent(tenantId, streamSpecificEventsMap)) { + streamSpecificEventsMap = tenantSpecificStreamMap.get(tenantId); + } + } + streamSpecificEvents = streamSpecificEventsMap.get(streamId); + if (streamSpecificEvents == null) { + streamSpecificEvents = new LinkedBlockingDeque<>(); + if (null != streamSpecificEventsMap.putIfAbsent(streamId, streamSpecificEvents)) { + streamSpecificEvents = streamSpecificEventsMap.get(streamId); + } + } + } + + if (globalProperties.get(UIEventAdapterConstants.ADAPTER_EVENT_QUEUE_SIZE_NAME) != null) { + try { + queueSize = Integer.parseInt( + globalProperties.get(UIEventAdapterConstants.ADAPTER_EVENT_QUEUE_SIZE_NAME)); + } catch (NumberFormatException e) { + log.error("String does not have the appropriate format for conversion." + e.getMessage()); + queueSize = UIEventAdapterConstants.EVENTS_QUEUE_SIZE; + } + } else { + queueSize = UIEventAdapterConstants.EVENTS_QUEUE_SIZE; + } + } + + @Override + public void testConnect() throws TestConnectionNotSupportedException { + throw new TestConnectionNotSupportedException("Test connection is not available"); + } + + @Override + public void connect() { + //Not needed + } + + @Override + public void publish(Object message, Map dynamicProperties) { + + Event event = (Event) message; + StringBuilder eventBuilder = new StringBuilder("["); + + if (streamSpecificEvents.size() == queueSize) { + streamSpecificEvents.removeFirst(); + } + + eventBuilder.append(event.getTimeStamp()); + + if (event.getMetaData() != null) { + eventBuilder.append(","); + Object[] metaData = event.getMetaData(); + for (int i = 0; i < metaData.length; i++) { + eventBuilder.append("\""); + eventBuilder.append(metaData[i]); + eventBuilder.append("\""); + if (i != (metaData.length - 1)) { + eventBuilder.append(","); + } + } + } + + if (event.getCorrelationData() != null) { + Object[] correlationData = event.getCorrelationData(); + + eventBuilder.append(","); + + for (int i = 0; i < correlationData.length; i++) { + eventBuilder.append("\""); + eventBuilder.append(correlationData[i]); + eventBuilder.append("\""); + if (i != (correlationData.length - 1)) { + eventBuilder.append(","); + } + } + } + + if (event.getPayloadData() != null) { + Object[] payloadData = event.getPayloadData(); + eventBuilder.append(","); + for (int i = 0; i < payloadData.length; i++) { + eventBuilder.append("\""); + eventBuilder.append(payloadData[i]); + eventBuilder.append("\""); + if (i != (payloadData.length - 1)) { + eventBuilder.append(","); + } + } + } + + eventBuilder.append("]"); + String eventString = eventBuilder.toString(); + Object[] eventValues = new Object[UIEventAdapterConstants.INDEX_TWO]; + eventValues[UIEventAdapterConstants.INDEX_ZERO] = eventString; + eventValues[UIEventAdapterConstants.INDEX_ONE] = System.currentTimeMillis(); + streamSpecificEvents.add(eventValues); + + // fetch all valid sessions checked against any queryParameters provided when subscribing. + CopyOnWriteArrayList validSessions = getValidSessions(event); + + try { + executorService.execute(new WebSocketSender(validSessions, eventString)); + } catch (RejectedExecutionException e) { + EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "Job queue is full", e, log, + tenantId); + } + + } + + @Override + public void disconnect() { + //Not needed + } + + @Override + public void destroy() { + + int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId(); + + ConcurrentHashMap tenantSpecificAdapterMap = UIEventAdaptorServiceDataHolder + .getTenantSpecificOutputEventStreamAdapterMap().get(tenantId); + if (tenantSpecificAdapterMap != null && streamId != null) { + tenantSpecificAdapterMap.remove(streamId); //Removing outputadapter and streamId + } + + ConcurrentHashMap> tenantSpecificStreamEventMap = + UIEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap().get(tenantId); + if (tenantSpecificStreamEventMap != null && streamId != null) { + //Removing the streamId and events registered for the output adapter + tenantSpecificStreamEventMap.remove(streamId); + } + } + + @Override + public boolean isPolled() { + return true; + } + + /** + * Fetch the StreamDefinition corresponding to the given StreamId from the EventStreamService. + * + * @param streamId the streamId of this UIEventAdaptor. + * @return the "StreamDefinition" object corresponding to the streamId of this EventAdaptor. + * @throws OutputEventAdapterException if the "EventStreamService" OSGI service is unavailable/unregistered or if + * the matching Steam-Definition for the given StreamId cannot be retrieved. + */ + private StreamDefinition getStreamDefinition(String streamId) throws OutputEventAdapterException { + EventStreamService eventStreamService = UIEventAdaptorServiceDataHolder.getEventStreamService(); + if (eventStreamService != null) { + try { + return eventStreamService.getStreamDefinition(streamId); + } catch (EventStreamConfigurationException e) { + String adaptorType = eventAdapterConfiguration.getType(); + String adaptorName = eventAdapterConfiguration.getName(); + String errorMsg = "Error while retrieving Stream-Definition for Stream with id [" + streamId + "] " + + "for Adaptor [" + adaptorName + "] of type [" + adaptorType + "]."; + log.error(errorMsg); + throw new OutputEventAdapterException(errorMsg, e); + } + } + throw new OutputEventAdapterException( + "Could not retrieve the EventStreamService whilst trying to fetch the Stream-Definition of Stream " + + "with Id [" + streamId + "]."); + } + + /** + * Fetches all valid web-socket sessions from the entire pool of subscribed sessions. The validity is checked + * against any queryString provided when subscribing to the web-socket endpoint. + * + * @param event the current event received and that which needs to be published to subscribed sessions. + * @return a list of all validated web-socket sessions against the queryString values. + */ + private CopyOnWriteArrayList getValidSessions(Event event) { + CopyOnWriteArrayList validSessions = new CopyOnWriteArrayList<>(); + UIOutputCallbackControllerServiceImpl uiOutputCallbackControllerServiceImpl = + UIEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl(); + // get all subscribed web-socket sessions. + CopyOnWriteArrayList webSocketSessionUtils = + uiOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId); + if (webSocketSessionUtils != null) { + for (WebSocketSessionUtil webSocketSessionUtil : webSocketSessionUtils) { + boolean isValidSession = validateEventAgainstSessionFilters(event, webSocketSessionUtil); + if (isValidSession) { + validSessions.add(webSocketSessionUtil); + } + } + } + return validSessions; + } + + + /** + * Processes the given session's validity to receive the current "event" against any queryParams that was used at + * the time when the web-socket-session is subscribed. This method can be extended to validate the event against + * any additional attribute of the given session too. + * + * @param event the current event received and that which needs to be published to subscribed + * sessions. + * @param webSocketSessionUtil the session which needs validated for its authenticity to receive this event. + * @return "true" if the session is valid to receive the event else "false". + */ + private boolean validateEventAgainstSessionFilters(Event event, WebSocketSessionUtil webSocketSessionUtil) { + + // fetch the queryString Key:Value pair map of the given session. + Map queryParamValuePairs = webSocketSessionUtil.getQueryParamValuePairs(); + if (queryParamValuePairs != null) { + // fetch the different attribute values received as part of the current event. + Object[] eventMetaData = event.getMetaData(); + Object[] eventCorrelationData = event.getCorrelationData(); + Object[] eventPayloadData = event.getPayloadData(); + + if (streamMetaAttributes != null) { + for (int i = 0; i < streamMetaAttributes.size(); i++) { + String attributeName = streamMetaAttributes.get(i).getName(); + String queryValue = queryParamValuePairs.get(attributeName); + + if (queryValue != null && + (eventMetaData == null || !eventMetaData[i].toString().equals(queryValue))) { + return false; + } + } + } + + if (streamCorrelationAttributes != null) { + for (int i = 0; i < streamCorrelationAttributes.size(); i++) { + String attributeName = streamCorrelationAttributes.get(i).getName(); + String queryValue = queryParamValuePairs.get(attributeName); + + if (queryValue != null && + (eventCorrelationData == null || !eventCorrelationData[i].toString().equals(queryValue))) { + return false; + } + } + } + + if (streamPayloadAttributes != null) { + for (int i = 0; i < streamPayloadAttributes.size(); i++) { + String attributeName = streamPayloadAttributes.get(i).getName(); + String queryValue = queryParamValuePairs.get(attributeName); + + if (queryValue != null && (eventPayloadData == null || !eventPayloadData[i].toString().equals( + queryValue))) { + return false; + } + } + } + } + return true; + } + + private class WebSocketSender implements Runnable { + + private String message; + private CopyOnWriteArrayList webSocketSessionUtils; + + public WebSocketSender(CopyOnWriteArrayList webSocketSessionUtils, String message) { + this.webSocketSessionUtils = webSocketSessionUtils; + this.message = message; + } + + /** + * When an object implementing interface Runnable is used + * to create a thread, starting the thread causes the object's + * run method to be called in that separately executing + * thread. + *

+ * The general contract of the method run is that it may + * take any action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() { + if (webSocketSessionUtils != null) { + doLogDroppedMessage = true; + for (WebSocketSessionUtil webSocketSessionUtil : webSocketSessionUtils) { + synchronized (WebSocketSessionUtil.class) { + try { + webSocketSessionUtil.getSession().getBasicRemote().sendText(message); + } catch (IOException e) { + EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, + "Cannot send to endpoint", e, log, tenantId); + } + } + } + } else if (doLogDroppedMessage) { + EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "No clients registered", log, + tenantId); + doLogDroppedMessage = false; + } + } + } +} + diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIEventAdapterFactory.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIEventAdapterFactory.java new file mode 100644 index 000000000..809518ce2 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIEventAdapterFactory.java @@ -0,0 +1,81 @@ +/* + * + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.wso2.carbon.device.mgt.iot.output.adapter.ui; + +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.util.UIEventAdapterConstants; +import org.wso2.carbon.event.output.adapter.core.MessageType; +import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter; +import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration; +import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterFactory; +import org.wso2.carbon.event.output.adapter.core.Property; +import org.wso2.carbon.utils.CarbonUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.ResourceBundle; + +/** + * The UI event adapter factory class to create a UI output adapter + */ +public class UIEventAdapterFactory extends OutputEventAdapterFactory { + + private ResourceBundle resourceBundle = ResourceBundle.getBundle("org.wso2.carbon.device.mgt.iot.output.adapter.ui.i18n" + + ".Resources", Locale.getDefault()); + + public UIEventAdapterFactory() { + } + + @Override + public String getType() { + return UIEventAdapterConstants.ADAPTER_TYPE_UI; + } + + @Override + public List getSupportedMessageFormats() { + List supportedMessageFormats = new ArrayList(); + supportedMessageFormats.add(MessageType.WSO2EVENT); + return supportedMessageFormats; + } + + @Override + public List getStaticPropertyList() { + return null; + } + + @Override + public List getDynamicPropertyList() { + return null; + } + + @Override + public String getUsageTips() { + return resourceBundle.getString(UIEventAdapterConstants.ADAPTER_USAGE_TIPS_PREFIX) + " " + + resourceBundle.getString(UIEventAdapterConstants.ADAPTER_USAGE_TIPS_POSTFIX); + } + + @Override + public OutputEventAdapter createEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, + Map globalProperties) { + return new UIEventAdapter(eventAdapterConfiguration, globalProperties); + } + +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIOutputCallbackControllerService.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIOutputCallbackControllerService.java new file mode 100644 index 000000000..74cb77ddf --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIOutputCallbackControllerService.java @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.wso2.carbon.device.mgt.iot.output.adapter.ui; + +import javax.websocket.Session; + +/** + * This interface is exposed as an OSGI service, which will be invoked by the local websocket endpoint to inform new subscriptions; and do un-subscriptions.. + */ +public interface UIOutputCallbackControllerService { + + /** + * Used to subscribe the session id and stream id for later web socket connectivity + * + * @param streamName - Stream name which user register to. + * @param version - Stream version which user uses. + * @param session - Session which user registered. + * @return + */ + void subscribeWebsocket(String streamName, String version, Session session); + + /** + * Used to return events per streamId + * + * @param streamName - Stream name which user register to. + * @param version - Stream version which user uses. + * @param session - Session which user subscribed to. + * @return the events list. + */ + void unsubscribeWebsocket(String streamName, String version, Session session); + +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIOutputCallbackControllerServiceImpl.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIOutputCallbackControllerServiceImpl.java new file mode 100644 index 000000000..0c9602033 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/UIOutputCallbackControllerServiceImpl.java @@ -0,0 +1,149 @@ +/* + * + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.wso2.carbon.device.mgt.iot.output.adapter.ui; + +import com.google.gson.JsonObject; +import org.wso2.carbon.context.PrivilegedCarbonContext; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.internal.UIEventAdaptorServiceDataHolder; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.util.WebSocketSessionUtil; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.util.UIEventAdapterConstants; + +import javax.websocket.Session; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * Service implementation class which exposes to front end + */ +public class UIOutputCallbackControllerServiceImpl implements UIOutputCallbackControllerService { + + private ConcurrentHashMap>> + outputEventAdaptorSessionMap; + + public UIOutputCallbackControllerServiceImpl() { + outputEventAdaptorSessionMap = new ConcurrentHashMap<>(); + } + + /** + * Used to subscribe the session id and stream id for later web socket connectivity + * + * @param streamName - Stream name which user register to. + * @param version - Stream version which user uses. + * @param session - Session which user registered. + */ + public void subscribeWebsocket(String streamName, String version, Session session) { + + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); + + if (version == null || " ".equals(version)) { + version = UIEventAdapterConstants.ADAPTER_UI_DEFAULT_OUTPUT_STREAM_VERSION; + } + String streamId = streamName + UIEventAdapterConstants.ADAPTER_UI_COLON + version; + ConcurrentHashMap> tenantSpecificAdaptorMap = + outputEventAdaptorSessionMap.get(tenantId); + if (tenantSpecificAdaptorMap == null) { + tenantSpecificAdaptorMap = new ConcurrentHashMap<>(); + if (null != outputEventAdaptorSessionMap.putIfAbsent(tenantId, tenantSpecificAdaptorMap)) { + tenantSpecificAdaptorMap = outputEventAdaptorSessionMap.get(tenantId); + } + } + CopyOnWriteArrayList adapterSpecificSessions = tenantSpecificAdaptorMap.get(streamId); + if (adapterSpecificSessions == null) { + adapterSpecificSessions = new CopyOnWriteArrayList<>(); + if (null != tenantSpecificAdaptorMap.putIfAbsent(streamId, adapterSpecificSessions)) { + adapterSpecificSessions = tenantSpecificAdaptorMap.get(streamId); + } + } + + WebSocketSessionUtil webSocketSessionUtil = new WebSocketSessionUtil(session); + adapterSpecificSessions.add(webSocketSessionUtil); + } + + /** + * Used to return registered sessions per streamId + * + * @param tenantId - Tenant id of the user. + * @param streamId - Stream name and version which user register to. + * @return the sessions list. + */ + public CopyOnWriteArrayList getSessions(int tenantId, String streamId) { + ConcurrentHashMap> tenantSpecificAdaptorMap + = outputEventAdaptorSessionMap.get(tenantId); + if (tenantSpecificAdaptorMap != null) { + return tenantSpecificAdaptorMap.get(streamId); + } + return null; + } + + /** + * Used to return events per streamId + * + * @param tenanId - Tenant id of the user. + * @param streamName - Stream name which user register to. + * @param version - Stream version which user uses. + * @return the events list. + */ + public LinkedBlockingDeque getEvents(int tenanId, String streamName, String version) { + ConcurrentHashMap> tenantSpecificStreamMap = + UIEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap().get(tenanId); + if (tenantSpecificStreamMap != null) { + String streamId = streamName + UIEventAdapterConstants.ADAPTER_UI_COLON + version; + return tenantSpecificStreamMap.get(streamId); + } + return null; + } + + /** + * Used to return events per streamId + * + * @param streamName - Stream name which user register to. + * @param version - Stream version which user uses. + * @param session - Session which user subscribed to. + */ + public void unsubscribeWebsocket(String streamName, String version, Session session) { + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); + if (version == null || " ".equals(version)) { + version = UIEventAdapterConstants.ADAPTER_UI_DEFAULT_OUTPUT_STREAM_VERSION; + } + String id = streamName + UIEventAdapterConstants.ADAPTER_UI_COLON + version; + ConcurrentHashMap> tenantSpecificAdaptorMap + = outputEventAdaptorSessionMap.get(tenantId); + if (tenantSpecificAdaptorMap != null) { + CopyOnWriteArrayList adapterSpecificSessions = tenantSpecificAdaptorMap.get(id); + if (adapterSpecificSessions != null) { + WebSocketSessionUtil sessionToRemove = null; + Iterator iterator = adapterSpecificSessions.iterator(); + while (iterator.hasNext()) { + WebSocketSessionUtil webSocketSessionUtil = iterator.next(); + if (session.getId().equals(webSocketSessionUtil.getSession().getId())) { + sessionToRemove = webSocketSessionUtil; + break; + } + } + if (sessionToRemove != null) { + adapterSpecificSessions.remove(sessionToRemove); + } + } + } + } + +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/internal/UIEventAdaptorServiceDataHolder.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/internal/UIEventAdaptorServiceDataHolder.java new file mode 100644 index 000000000..dd0894985 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/internal/UIEventAdaptorServiceDataHolder.java @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.wso2.carbon.device.mgt.iot.output.adapter.ui.internal; + +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.UIOutputCallbackControllerServiceImpl; +import org.wso2.carbon.event.stream.core.EventStreamService; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * Creates a holder of type UIOutputCallbackRegisterServiceImpl. + */ +public final class UIEventAdaptorServiceDataHolder { + + private static UIOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl; + private static ConcurrentHashMap> + tenantSpecificOutputEventStreamAdapterMap = new ConcurrentHashMap>(); + private static ConcurrentHashMap>> + tenantSpecificStreamEventMap = new ConcurrentHashMap>>(); + private static EventStreamService eventStreamService; + + public static void registerEventStreamService(EventStreamService eventBuilderService) { + UIEventAdaptorServiceDataHolder.eventStreamService = eventBuilderService; + } + + public static EventStreamService getEventStreamService() { + return UIEventAdaptorServiceDataHolder.eventStreamService; + } + + public static void registerUIOutputCallbackRegisterServiceInternal( + UIOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl) { + UIEventAdaptorServiceDataHolder.UIOutputCallbackRegisterServiceImpl = + UIOutputCallbackRegisterServiceImpl; + } + + public static UIOutputCallbackControllerServiceImpl getUIOutputCallbackRegisterServiceImpl() { + return UIEventAdaptorServiceDataHolder.UIOutputCallbackRegisterServiceImpl; + } + + public static ConcurrentHashMap> + getTenantSpecificOutputEventStreamAdapterMap() { + return tenantSpecificOutputEventStreamAdapterMap; + } + + public static ConcurrentHashMap>> + getTenantSpecificStreamEventMap() { + return tenantSpecificStreamEventMap; + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/internal/UILocalEventAdapterServiceComponent.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/internal/UILocalEventAdapterServiceComponent.java new file mode 100644 index 000000000..341d4b377 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/internal/UILocalEventAdapterServiceComponent.java @@ -0,0 +1,79 @@ +/* + * + * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.wso2.carbon.device.mgt.iot.output.adapter.ui.internal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.osgi.service.component.ComponentContext; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.UIEventAdapterFactory; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.UIOutputCallbackControllerServiceImpl; +import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterFactory; +import org.wso2.carbon.device.mgt.iot.output.adapter.ui.UIOutputCallbackControllerService; +import org.wso2.carbon.event.stream.core.EventStreamService; + +/** + * @scr.component component.name="output.extensions.Ui.AdapterService.component" immediate="true" + * @scr.reference name="eventStreamService.service" + * interface="org.wso2.carbon.event.stream.core.EventStreamService" cardinality="1..1" + * policy="dynamic" bind="setEventStreamService" unbind="unsetEventStreamService" + */ +public class UILocalEventAdapterServiceComponent { + + private static final Log log = LogFactory.getLog(UILocalEventAdapterServiceComponent.class); + + /** + * initialize the ui adapter service here service here. + * + * @param context + */ + protected void activate(ComponentContext context) { + + try { + OutputEventAdapterFactory uiEventAdapterFactory = new UIEventAdapterFactory(); + context.getBundleContext().registerService(OutputEventAdapterFactory.class.getName(), uiEventAdapterFactory, null); + UIOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl = + new UIOutputCallbackControllerServiceImpl(); + context.getBundleContext().registerService(UIOutputCallbackControllerService.class.getName(), + UIOutputCallbackRegisterServiceImpl, null); + + UIEventAdaptorServiceDataHolder.registerUIOutputCallbackRegisterServiceInternal( + UIOutputCallbackRegisterServiceImpl); + if (log.isDebugEnabled()) { + log.debug("Successfully deployed the output ui adapter service"); + } + } catch (RuntimeException e) { + log.error("Can not create the output ui adapter service ", e); + } + } + + protected void setEventStreamService(EventStreamService eventStreamService) { + if (log.isDebugEnabled()) { + log.debug("Setting the EventStreamService reference for the UILocalEventAdaptor Service"); + } + UIEventAdaptorServiceDataHolder.registerEventStreamService(eventStreamService); + } + + protected void unsetEventStreamService(EventStreamService eventStreamService) { + if (log.isDebugEnabled()) { + log.debug("Un-Setting the EventStreamService reference for the UILocalEventAdaptor Service"); + } + UIEventAdaptorServiceDataHolder.registerEventStreamService(null); + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/util/UIEventAdapterConstants.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/util/UIEventAdapterConstants.java new file mode 100644 index 000000000..8fdea199c --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/util/UIEventAdapterConstants.java @@ -0,0 +1,51 @@ +/* + * + * Copyright (c) 2014-2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.wso2.carbon.device.mgt.iot.output.adapter.ui.util; + +/** + * This class contains the constants related to ui Output Event Adaptor. + */ +public class UIEventAdapterConstants { + + private UIEventAdapterConstants() { + } + + public static final String ADAPTER_TYPE_UI = "iot-ui"; + public static final String ADAPTER_USAGE_TIPS_PREFIX = "ui.usage.tips_prefix"; + public static final String ADAPTER_USAGE_TIPS_POSTFIX = "ui.usage.tips_postfix"; + public static final String ADAPTER_UI_DEFAULT_OUTPUT_STREAM_VERSION = "1.0.0"; + public static final String ADAPTER_UI_COLON = ":"; + public static final int INDEX_ZERO = 0; + public static final int INDEX_ONE = 1; + public static final int INDEX_TWO = 2; + + public static final int ADAPTER_MIN_THREAD_POOL_SIZE = 8; + public static final int ADAPTER_MAX_THREAD_POOL_SIZE = 100; + public static final int ADAPTER_EXECUTOR_JOB_QUEUE_SIZE = 2000; + public static final long DEFAULT_KEEP_ALIVE_TIME_IN_MILLIS = 20000; + public static final String ADAPTER_MIN_THREAD_POOL_SIZE_NAME = "minThread"; + public static final String ADAPTER_MAX_THREAD_POOL_SIZE_NAME = "maxThread"; + public static final String ADAPTER_KEEP_ALIVE_TIME_NAME = "keepAliveTimeInMillis"; + public static final String ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME = "jobQueueSize"; + + public static final String ADAPTER_EVENT_QUEUE_SIZE_NAME = "eventQueueSize"; + public static final int EVENTS_QUEUE_SIZE = 30; + +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/util/WebSocketSessionUtil.java b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/util/WebSocketSessionUtil.java new file mode 100644 index 000000000..7c083f8f0 --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/java/org/wso2/carbon/device/mgt/iot/output/adapter/ui/util/WebSocketSessionUtil.java @@ -0,0 +1,62 @@ +package org.wso2.carbon.device.mgt.iot.output.adapter.ui.util; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.websocket.Session; +import java.util.HashMap; +import java.util.Map; + +/** + * This is wrapper class over the javax.websocket.Session implementation. This class contains additional attributes + * of the Session object derived from processing some of the (default) existing attributes. + * Ex: Query-String's [Key:Value] Map derived from the queryString attribute of the original class. + */ +public class WebSocketSessionUtil { + private static final Log log = LogFactory.getLog(WebSocketSessionUtil.class); + + private static final String QUERY_STRING_SEPERATOR = "&"; + private static final String QUERY_KEY_VALUE_SEPERATOR = "="; + private Map queryParamValuePairs = null; + private Session session; + + public WebSocketSessionUtil(Session session) { + this.session = session; + setQueryParamValuePairs(); + } + + public Map getQueryParamValuePairs() { + return queryParamValuePairs; + } + + public Session getSession() { + return session; + } + + /** + * Processes the queryString from the current instance's Session attribute and constructs a map of Query + * Key:Value pair. + */ + private void setQueryParamValuePairs() { + if (session.getQueryString() != null) { + String queryString = session.getQueryString(); + String[] allQueryParamPairs = queryString.split(QUERY_STRING_SEPERATOR); + + for (String keyValuePair : allQueryParamPairs) { + String[] thisQueryParamPair = keyValuePair.split(QUERY_KEY_VALUE_SEPERATOR); + + if (thisQueryParamPair.length != 2) { + log.warn("Invalid query string [" + queryString + "] passed in."); + break; + } + + if (queryParamValuePairs == null) { + queryParamValuePairs = new HashMap<>(); + } + + queryParamValuePairs.put(thisQueryParamPair[0], thisQueryParamPair[1]); + } + } + } +} diff --git a/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/resources/org/wso2/carbon/device/mgt/iot/output/adapter/ui/i18n/Resources.properties b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/resources/org/wso2/carbon/device/mgt/iot/output/adapter/ui/i18n/Resources.properties new file mode 100644 index 000000000..4a3dcba8e --- /dev/null +++ b/components/iot-plugins/iot-base-plugin/org.wso2.carbon.device.mgt.iot.output.adapter.ui/src/main/resources/org/wso2/carbon/device/mgt/iot/output/adapter/ui/i18n/Resources.properties @@ -0,0 +1,22 @@ +# +# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +# +# WSO2 Inc. licenses this file to you under the Apache License, +# Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +output.event.stream.name=Output Stream Name +output.event.stream.version=Output Stream Version +ui.usage.tips_prefix=There must be an UI output adaptor for each stream to be visualized +ui.usage.tips_postfix= via Analytics Dashboard. diff --git a/components/iot-plugins/iot-base-plugin/pom.xml b/components/iot-plugins/iot-base-plugin/pom.xml index 6db848b96..e4f313c65 100644 --- a/components/iot-plugins/iot-base-plugin/pom.xml +++ b/components/iot-plugins/iot-base-plugin/pom.xml @@ -29,7 +29,7 @@ 4.0.0 iot-base-plugin pom - WSO2 Carbon - Arduino Plugin + WSO2 Carbon - IoT Base Plugin http://wso2.org @@ -37,6 +37,8 @@ org.wso2.carbon.device.mgt.iot.ui org.wso2.carbon.device.mgt.iot.output.adapter.mqtt org.wso2.carbon.device.mgt.iot.output.adapter.xmpp + org.wso2.carbon.device.mgt.iot.output.adapter.ui + org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint org.wso2.carbon.device.mgt.iot.input.adapter.extension org.wso2.carbon.device.mgt.iot.input.adapter.http org.wso2.carbon.device.mgt.iot.input.adapter.mqtt diff --git a/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/pom.xml b/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/pom.xml index c35442bf8..79a61d850 100644 --- a/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/pom.xml +++ b/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/pom.xml @@ -44,6 +44,10 @@ org.wso2.carbon.devicemgt-plugins org.wso2.carbon.device.mgt.iot.output.adapter.xmpp + + org.wso2.carbon.devicemgt-plugins + org.wso2.carbon.device.mgt.iot.output.adapter.ui + org.wso2.carbon.devicemgt-plugins org.wso2.carbon.device.mgt.iot.input.adapter.extension @@ -64,6 +68,32 @@ + + org.apache.maven.plugins + maven-dependency-plugin + + + copy + package + + copy + + + + + org.wso2.carbon.devicemgt-plugins + org.wso2.carbon.device.mgt.iot.output.adapter.ui.endpoint + ${carbon.devicemgt.plugins.version} + war + true + ${project.build.directory}/maven-shared-archive-resources/webapps/ + secured-outputui.war + + + + + + maven-resources-plugin @@ -127,6 +157,9 @@ org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.iot.input.adapter.xmpp:${carbon.devicemgt.plugins.version} + + org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.iot.output.adapter.ui:${carbon.devicemgt.plugins.version} + diff --git a/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/src/main/resources/p2.inf b/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/src/main/resources/p2.inf index 7ab37b9d7..c1df69e78 100644 --- a/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/src/main/resources/p2.inf +++ b/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/src/main/resources/p2.inf @@ -1 +1,4 @@ -instructions.configure = \ \ No newline at end of file +instructions.configure = \ +org.eclipse.equinox.p2.touchpoint.natives.mkdir(path:${installFolder}/../../deployment/server/webapps/);\ +org.eclipse.equinox.p2.touchpoint.natives.copy(source:${installFolder}/../features/org.wso2.carbon.device.mgt.iot.adapter_${feature.version}/webapps/,target:${installFolder}/../../deployment/server/webapps/,overwrite:true);\ +org.eclipse.equinox.p2.touchpoint.natives.copy(source:${installFolder}/../features/org.wso2.carbon.device.mgt.iot.adapter_${feature.version}/websocket-validation.properties,target:${installFolder}/../../conf/etc/websocket-validation.properties,overwrite:true);\ diff --git a/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/src/main/resources/websocket-validation.properties b/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/src/main/resources/websocket-validation.properties new file mode 100644 index 000000000..f4b75e2a1 --- /dev/null +++ b/features/iot-plugins-feature/iot-base-plugin-feature/org.wso2.carbon.device.mgt.iot.adapter.feature/src/main/resources/websocket-validation.properties @@ -0,0 +1,25 @@ +# +# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +# +# WSO2 Inc. licenses this file to you under the Apache License, +# Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#This hold the properties that is used for token validation for the the websocket + +tokenValidationEndpoint=https://localhost:9443/services/OAuth2TokenValidationService +username=admin +password=admin +maximumHttpConnectionPerHost=2 +maximumTotalHttpConnection=100 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1c51c1fb1..458950867 100644 --- a/pom.xml +++ b/pom.xml @@ -329,6 +329,11 @@ org.wso2.carbon.analytics.api ${carbon.analytics.version} + + org.wso2.carbon.analytics-common + org.wso2.carbon.event.stream.core + ${carbon.analytics.common.version} + @@ -372,6 +377,11 @@ ${carbon.devicemgt.plugins.version} war + + org.wso2.carbon.devicemgt-plugins + org.wso2.carbon.device.mgt.iot.output.adapter.ui + ${carbon.devicemgt.plugins.version} + @@ -1013,6 +1023,23 @@ commons-pool ${commons.pool.wso2.version} + + + javax.websocket + javax.websocket-api + ${javax.websocket.version} + + + org.apache.tomcat + tomcat-websocket-api + ${tomcat.websocket.version} + provided + + + javax.ws.rs + javax.ws.rs-api + ${javax.version} + @@ -1146,6 +1173,11 @@ 1.5.6.wso2v1 + + 7.0.54 + 1.0 + 2.0 + github-scm