moved common configuration to global properties for input and output adapters

revert-dabc3590
ayyoob 8 years ago
parent 4cb6aca5b0
commit 1df7d20a81

@ -156,6 +156,7 @@
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.apimgt.annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

@ -144,6 +144,7 @@
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.apimgt.annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

@ -21,7 +21,7 @@
<parent>
<artifactId>virtual-fire-alarm-plugin</artifactId>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<version>3.0.5-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@ -238,6 +238,7 @@
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.apimgt.annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>

@ -66,7 +66,7 @@ public class HTTPEventAdapterFactory extends InputEventAdapterFactory {
// Transport Exposed
Property exposedTransportsProperty = new Property(HTTPEventAdapterConstants.EXPOSED_TRANSPORTS);
exposedTransportsProperty.setRequired(true);
exposedTransportsProperty.setRequired(false);
exposedTransportsProperty.setDisplayName(
resourceBundle.getString(HTTPEventAdapterConstants.EXPOSED_TRANSPORTS));
exposedTransportsProperty.setOptions(

@ -56,8 +56,8 @@ public class MQTTEventAdapterConstants {
public static final String GRANT_TYPE = "password refresh_token";
public static final String TOKEN_SCOPE = "production";
public static final String APPLICATION_NAME_PREFIX = "InputAdapter_";
public static final String CLIENT_ID = "client_id";
public static final String CLIENT_SECRET = "client_secret";
public static final String CLIENT_ID = "clientId";
public static final String CLIENT_SECRET = "clientSecret";
public static final String CLIENT_NAME = "client_name";
public static final String DEFAULT = "default";
public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = "";

@ -175,7 +175,7 @@ public class MQTTAdapterPublisher {
JSONObject jsonPayload = (JSONObject) jsonParser.parse(response);
String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID);
String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET);
getToken(clientId, clientSecret);
return getToken(clientId, clientSecret);
}
} catch (ParseException e) {
String msg = "error occurred while parsing generating token for the adapter";

@ -30,7 +30,7 @@ public final class MQTTEventAdapterConstants {
public static final String ADAPTER_CONF_PASSWORD = "password";
public static final String ADAPTER_CONF_PASSWORD_HINT = "password.hint";
public static final String ADAPTER_CONF_DCR_URL = "dcrUrl";
public static final String ADAPTER_CONF_TOKEN_URL = "keymanagerUrl";
public static final String ADAPTER_CONF_TOKEN_URL = "tokenUrl";
public static final String ADAPTER_CONF_SCOPES = "scopes";
public static final String ADAPTER_CONF_SCOPES_HINT = "scopes.hint";
public static final String ADAPTER_CONF_URL_HINT = "url.hint";
@ -58,11 +58,11 @@ public final class MQTTEventAdapterConstants {
public static final String DEFAULT_CALLBACK = "";
public static final String DEFAULT_PASSWORD = "";
public static final String GRANT_TYPE = "grant_type";
public static final String GRANT_TYPE = "password";
public static final String TOKEN_SCOPE = "production";
public static final String APPLICATION_NAME_PREFIX = "OutputAdapter_";
public static final String CLIENT_ID = "client_id";
public static final String CLIENT_SECRET = "client_secret";
public static final String CLIENT_ID = "clientId";
public static final String CLIENT_SECRET = "clientSecret";
public static final String AUTHORIZATION_HEADER_NAME = "Authorization";

@ -52,7 +52,7 @@ public class SubscriptionEndpoint {
() +
", for request URI - " + session.getRequestURI());
}
ServiceHolder.getInstance().getUiOutputCallbackControllerService().unsubscribeWebsocket(streamName, version,
ServiceHolder.getInstance().getWebsocketOutputCallbackControllerService().unsubscribeWebsocket(streamName, version,
session);
}
@ -68,7 +68,7 @@ public class SubscriptionEndpoint {
log.error(
"Error occurred in session ID: " + session.getId() + ", for request URI - " + session.getRequestURI() +
", " + throwable.getMessage(), throwable);
ServiceHolder.getInstance().getUiOutputCallbackControllerService().unsubscribeWebsocket(streamName, version,
ServiceHolder.getInstance().getWebsocketOutputCallbackControllerService().unsubscribeWebsocket(streamName, version,
session);
}
}

@ -75,7 +75,7 @@ public class SuperTenantSubscriptionEndpoint extends SubscriptionEndpoint {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(
MultitenantConstants.SUPER_TENANT_ID);
ServiceHolder.getInstance().getUiOutputCallbackControllerService().subscribeWebsocket(streamName,
ServiceHolder.getInstance().getWebsocketOutputCallbackControllerService().subscribeWebsocket(streamName,
version,
session);
} finally {

@ -74,7 +74,7 @@ public class TenantSubscriptionEndpoint extends SubscriptionEndpoint {
try {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tdomain, true);
ServiceHolder.getInstance().getUiOutputCallbackControllerService().subscribeWebsocket(streamName
ServiceHolder.getInstance().getWebsocketOutputCallbackControllerService().subscribeWebsocket(streamName
, version, session);
} finally {
PrivilegedCarbonContext.endTenantFlow();

@ -21,18 +21,18 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket.endpoint.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.output.adapter.websocket.UIOutputCallbackControllerService;
import org.wso2.carbon.device.mgt.output.adapter.websocket.WebsocketOutputCallbackControllerService;
import org.wso2.carbon.device.mgt.output.adapter.websocket.service.WebsocketValidationService;
public class ServiceHolder {
private static ServiceHolder instance;
private UIOutputCallbackControllerService uiOutputCallbackControllerService;
private WebsocketOutputCallbackControllerService websocketOutputCallbackControllerService;
private static final Log log = LogFactory.getLog(ServiceHolder.class);
private ServiceHolder(){
uiOutputCallbackControllerService = (UIOutputCallbackControllerService) PrivilegedCarbonContext
.getThreadLocalCarbonContext().getOSGiService(UIOutputCallbackControllerService.class, null);
websocketOutputCallbackControllerService = (WebsocketOutputCallbackControllerService) PrivilegedCarbonContext
.getThreadLocalCarbonContext().getOSGiService(WebsocketOutputCallbackControllerService.class, null);
}
public synchronized static ServiceHolder getInstance() {
@ -42,8 +42,8 @@ public class ServiceHolder {
return instance;
}
public UIOutputCallbackControllerService getUiOutputCallbackControllerService() {
return uiOutputCallbackControllerService;
public WebsocketOutputCallbackControllerService getWebsocketOutputCallbackControllerService() {
return websocketOutputCallbackControllerService;
}
public static WebsocketValidationService getWebsocketValidationService() {

@ -28,8 +28,8 @@ import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.device.mgt.output.adapter.websocket.constants.WebsocketConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.UIEventAdaptorServiceDataHolder;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.UIEventAdapterConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.WebsocketEventAdaptorServiceDataHolder;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest;
import org.wso2.carbon.event.output.adapter.core.EventAdapterUtil;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
@ -56,9 +56,9 @@ import java.util.concurrent.TimeUnit;
* Contains the life cycle of executions regarding the UI Adapter
*/
public class UIEventAdapter implements OutputEventAdapter {
public class WebsocketEventAdapter implements OutputEventAdapter {
private static final Log log = LogFactory.getLog(UIEventAdapter.class);
private static final Log log = LogFactory.getLog(WebsocketEventAdapter.class);
private OutputEventAdapterConfiguration eventAdapterConfiguration;
private Map<String, String> globalProperties;
private int queueSize;
@ -72,7 +72,7 @@ public class UIEventAdapter implements OutputEventAdapter {
private List<Attribute> streamCorrelationAttributes;
private List<Attribute> streamPayloadAttributes;
public UIEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map<String,
public WebsocketEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map<String,
String> globalProperties) {
this.eventAdapterConfiguration = eventAdapterConfiguration;
this.globalProperties = globalProperties;
@ -91,32 +91,32 @@ public class UIEventAdapter implements OutputEventAdapter {
int jobQueSize;
//If global properties are available those will be assigned else constant values will be assigned
if (globalProperties.get(UIEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE_NAME) != null) {
if (globalProperties.get(WebsocketEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE_NAME) != null) {
minThread = Integer.parseInt(globalProperties.get(
UIEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE_NAME));
WebsocketEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE_NAME));
} else {
minThread = UIEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE;
minThread = WebsocketEventAdapterConstants.ADAPTER_MIN_THREAD_POOL_SIZE;
}
if (globalProperties.get(UIEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE_NAME) != null) {
if (globalProperties.get(WebsocketEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE_NAME) != null) {
maxThread = Integer.parseInt(globalProperties.get(
UIEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE_NAME));
WebsocketEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE_NAME));
} else {
maxThread = UIEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE;
maxThread = WebsocketEventAdapterConstants.ADAPTER_MAX_THREAD_POOL_SIZE;
}
if (globalProperties.get(UIEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME) != null) {
if (globalProperties.get(WebsocketEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME) != null) {
defaultKeepAliveTime = Integer.parseInt(globalProperties.get(
UIEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME));
WebsocketEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME));
} else {
defaultKeepAliveTime = UIEventAdapterConstants.DEFAULT_KEEP_ALIVE_TIME_IN_MILLIS;
defaultKeepAliveTime = WebsocketEventAdapterConstants.DEFAULT_KEEP_ALIVE_TIME_IN_MILLIS;
}
if (globalProperties.get(UIEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME) != null) {
if (globalProperties.get(WebsocketEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME) != null) {
jobQueSize = Integer.parseInt(globalProperties.get(
UIEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME));
WebsocketEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME));
} else {
jobQueSize = UIEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE;
jobQueSize = WebsocketEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE;
}
executorService = new ThreadPoolExecutor(minThread, maxThread, defaultKeepAliveTime, TimeUnit.MILLISECONDS,
@ -137,7 +137,7 @@ public class UIEventAdapter implements OutputEventAdapter {
streamPayloadAttributes = streamDefinition.getPayloadData();
ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>> tenantSpecifcEventOutputAdapterMap =
UIEventAdaptorServiceDataHolder.getTenantSpecificOutputEventStreamAdapterMap();
WebsocketEventAdaptorServiceDataHolder.getTenantSpecificOutputEventStreamAdapterMap();
ConcurrentHashMap<String, String> streamSpecifAdapterMap = tenantSpecifcEventOutputAdapterMap.get(tenantId);
@ -157,7 +157,7 @@ public class UIEventAdapter implements OutputEventAdapter {
streamSpecifAdapterMap.put(streamId, eventAdapterConfiguration.getName());
ConcurrentHashMap<Integer, ConcurrentHashMap<String, LinkedBlockingDeque<Object>>> tenantSpecificStreamMap =
UIEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap();
WebsocketEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap();
ConcurrentHashMap<String, LinkedBlockingDeque<Object>> streamSpecificEventsMap =
tenantSpecificStreamMap.get(tenantId);
if (streamSpecificEventsMap == null) {
@ -175,16 +175,16 @@ public class UIEventAdapter implements OutputEventAdapter {
}
}
if (globalProperties.get(UIEventAdapterConstants.ADAPTER_EVENT_QUEUE_SIZE_NAME) != null) {
if (globalProperties.get(WebsocketEventAdapterConstants.ADAPTER_EVENT_QUEUE_SIZE_NAME) != null) {
try {
queueSize = Integer.parseInt(
globalProperties.get(UIEventAdapterConstants.ADAPTER_EVENT_QUEUE_SIZE_NAME));
globalProperties.get(WebsocketEventAdapterConstants.ADAPTER_EVENT_QUEUE_SIZE_NAME));
} catch (NumberFormatException e) {
log.error("String does not have the appropriate format for conversion." + e.getMessage());
queueSize = UIEventAdapterConstants.EVENTS_QUEUE_SIZE;
queueSize = WebsocketEventAdapterConstants.EVENTS_QUEUE_SIZE;
}
} else {
queueSize = UIEventAdapterConstants.EVENTS_QUEUE_SIZE;
queueSize = WebsocketEventAdapterConstants.EVENTS_QUEUE_SIZE;
}
}
@ -256,9 +256,9 @@ public class UIEventAdapter implements OutputEventAdapter {
eventString = message.toString();
}
Object[] eventValues = new Object[UIEventAdapterConstants.INDEX_TWO];
eventValues[UIEventAdapterConstants.INDEX_ZERO] = eventString;
eventValues[UIEventAdapterConstants.INDEX_ONE] = System.currentTimeMillis();
Object[] eventValues = new Object[WebsocketEventAdapterConstants.INDEX_TWO];
eventValues[WebsocketEventAdapterConstants.INDEX_ZERO] = eventString;
eventValues[WebsocketEventAdapterConstants.INDEX_ONE] = System.currentTimeMillis();
streamSpecificEvents.add(eventValues);
// fetch all valid sessions checked against any queryParameters provided when subscribing.
@ -282,14 +282,14 @@ public class UIEventAdapter implements OutputEventAdapter {
public void destroy() {
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
ConcurrentHashMap<String, String> tenantSpecificAdapterMap = UIEventAdaptorServiceDataHolder
ConcurrentHashMap<String, String> tenantSpecificAdapterMap = WebsocketEventAdaptorServiceDataHolder
.getTenantSpecificOutputEventStreamAdapterMap().get(tenantId);
if (tenantSpecificAdapterMap != null && streamId != null) {
tenantSpecificAdapterMap.remove(streamId); //Removing outputadapter and streamId
}
ConcurrentHashMap<String, LinkedBlockingDeque<Object>> tenantSpecificStreamEventMap =
UIEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap().get(tenantId);
WebsocketEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap().get(tenantId);
if (tenantSpecificStreamEventMap != null && streamId != null) {
//Removing the streamId and events registered for the output adapter
tenantSpecificStreamEventMap.remove(streamId);
@ -310,7 +310,7 @@ public class UIEventAdapter implements OutputEventAdapter {
* the matching Steam-Definition for the given StreamId cannot be retrieved.
*/
private StreamDefinition getStreamDefinition(String streamId) throws OutputEventAdapterException {
EventStreamService eventStreamService = UIEventAdaptorServiceDataHolder.getEventStreamService();
EventStreamService eventStreamService = WebsocketEventAdaptorServiceDataHolder.getEventStreamService();
if (eventStreamService != null) {
try {
return eventStreamService.getStreamDefinition(streamId);
@ -337,11 +337,11 @@ public class UIEventAdapter implements OutputEventAdapter {
*/
private CopyOnWriteArrayList<WebSocketSessionRequest> getValidSessions(Object event) {
CopyOnWriteArrayList<WebSocketSessionRequest> validSessions = new CopyOnWriteArrayList<>();
UIOutputCallbackControllerServiceImpl uiOutputCallbackControllerServiceImpl =
UIEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl();
WebsocketOutputCallbackControllerServiceImpl websocketOutputCallbackControllerServiceImpl =
WebsocketEventAdaptorServiceDataHolder.getUIOutputCallbackRegisterServiceImpl();
// get all subscribed web-socket sessions.
CopyOnWriteArrayList<WebSocketSessionRequest> webSocketSessionUtils =
uiOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId);
websocketOutputCallbackControllerServiceImpl.getSessions(tenantId, streamId);
if (webSocketSessionUtils != null) {
for (WebSocketSessionRequest webSocketSessionUtil : webSocketSessionUtils) {
boolean isValidSession;

@ -25,7 +25,7 @@ import org.wso2.carbon.device.mgt.output.adapter.websocket.authentication.Authen
import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.Authorizer;
import org.wso2.carbon.device.mgt.output.adapter.websocket.service.WebsocketValidationService;
import org.wso2.carbon.device.mgt.output.adapter.websocket.service.WebsocketValidationServiceImpl;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.UIEventAdapterConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
import org.wso2.carbon.event.output.adapter.core.MessageType;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapter;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
@ -41,20 +41,20 @@ import java.util.ResourceBundle;
/**
* The UI event adapter factory class to create a UI output adapter
*/
public class UIEventAdapterFactory extends OutputEventAdapterFactory {
public class WebsocketEventAdapterFactory extends OutputEventAdapterFactory {
private ResourceBundle resourceBundle = ResourceBundle.getBundle("org.wso2.carbon.device.mgt.output.adapter.websocket.i18n" +
".Resources", Locale.getDefault());
private BundleContext bundleContext;
private boolean isAuthInitialized = false;
private static final Log log = LogFactory.getLog(UIEventAdapter.class);
private static final Log log = LogFactory.getLog(WebsocketEventAdapter.class);
public UIEventAdapterFactory() {
public WebsocketEventAdapterFactory() {
}
@Override
public String getType() {
return UIEventAdapterConstants.ADAPTER_TYPE_UI;
return WebsocketEventAdapterConstants.ADAPTER_TYPE_UI;
}
@Override
@ -77,8 +77,8 @@ public class UIEventAdapterFactory extends OutputEventAdapterFactory {
@Override
public String getUsageTips() {
return resourceBundle.getString(UIEventAdapterConstants.ADAPTER_USAGE_TIPS_PREFIX) + " "
+ resourceBundle.getString(UIEventAdapterConstants.ADAPTER_USAGE_TIPS_POSTFIX);
return resourceBundle.getString(WebsocketEventAdapterConstants.ADAPTER_USAGE_TIPS_PREFIX) + " "
+ resourceBundle.getString(WebsocketEventAdapterConstants.ADAPTER_USAGE_TIPS_POSTFIX);
}
@Override
@ -87,7 +87,7 @@ public class UIEventAdapterFactory extends OutputEventAdapterFactory {
if (!isAuthInitialized) {
initializeAuthenticatorAndAuthorizor(globalProperties);
}
return new UIEventAdapter(eventAdapterConfiguration, globalProperties);
return new WebsocketEventAdapter(eventAdapterConfiguration, globalProperties);
}
public BundleContext getBundleContext() {
@ -100,24 +100,26 @@ public class UIEventAdapterFactory extends OutputEventAdapterFactory {
private void initializeAuthenticatorAndAuthorizor (Map<String, String> globalProperties) {
if (!isAuthInitialized) {
synchronized (UIEventAdapterFactory.class) {
synchronized (WebsocketEventAdapterFactory.class) {
if (!isAuthInitialized) {
try {
WebsocketValidationServiceImpl websocketValidationService =
new WebsocketValidationServiceImpl();
String authenticatorClassName = globalProperties.get(
UIEventAdapterConstants.AUTHENTICATOR_CLASS);
String authorizerClassName = globalProperties.get(UIEventAdapterConstants.AUTHORIZER_CLASS);
WebsocketEventAdapterConstants.AUTHENTICATOR_CLASS);
String authorizerClassName = globalProperties.get(WebsocketEventAdapterConstants.AUTHORIZER_CLASS);
if (authenticatorClassName != null && !authenticatorClassName.isEmpty()) {
Class<? extends Authenticator> authenticatorClass = Class.forName(authenticatorClassName)
.asSubclass(Authenticator.class);
Authenticator authenticator = authenticatorClass.newInstance();
authenticator.init(globalProperties);
websocketValidationService.setAuthenticator(authenticator);
}
if (authorizerClassName != null && !authorizerClassName.isEmpty()) {
Class<? extends Authorizer> authorizerClass = Class.forName(authorizerClassName)
.asSubclass(Authorizer.class);
Authorizer authorizer = authorizerClass.newInstance();
authorizer.init(globalProperties);
websocketValidationService.setAuthorizer(authorizer);
}
bundleContext.registerService(

@ -24,7 +24,7 @@ import javax.websocket.Session;
/**
* This interface is exposed as an OSGI service, which will be invoked by the local websocket endpoint to inform new subscriptions; and do un-subscriptions..
*/
public interface UIOutputCallbackControllerService {
public interface WebsocketOutputCallbackControllerService {
/**
* Used to subscribe the session id and stream id for later web socket connectivity

@ -19,9 +19,9 @@
package org.wso2.carbon.device.mgt.output.adapter.websocket;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.UIEventAdaptorServiceDataHolder;
import org.wso2.carbon.device.mgt.output.adapter.websocket.internal.WebsocketEventAdaptorServiceDataHolder;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebSocketSessionRequest;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.UIEventAdapterConstants;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.WebsocketEventAdapterConstants;
import javax.websocket.Session;
import java.util.Iterator;
@ -32,12 +32,12 @@ import java.util.concurrent.LinkedBlockingDeque;
/**
* Service implementation class which exposes to front end
*/
public class UIOutputCallbackControllerServiceImpl implements UIOutputCallbackControllerService {
public class WebsocketOutputCallbackControllerServiceImpl implements WebsocketOutputCallbackControllerService {
private ConcurrentHashMap<Integer, ConcurrentHashMap<String, CopyOnWriteArrayList<WebSocketSessionRequest>>>
outputEventAdaptorSessionMap;
public UIOutputCallbackControllerServiceImpl() {
public WebsocketOutputCallbackControllerServiceImpl() {
outputEventAdaptorSessionMap = new ConcurrentHashMap<>();
}
@ -53,9 +53,9 @@ public class UIOutputCallbackControllerServiceImpl implements UIOutputCallbackCo
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
if (version == null || " ".equals(version)) {
version = UIEventAdapterConstants.ADAPTER_UI_DEFAULT_OUTPUT_STREAM_VERSION;
version = WebsocketEventAdapterConstants.ADAPTER_UI_DEFAULT_OUTPUT_STREAM_VERSION;
}
String streamId = streamName + UIEventAdapterConstants.ADAPTER_UI_COLON + version;
String streamId = streamName + WebsocketEventAdapterConstants.ADAPTER_UI_COLON + version;
ConcurrentHashMap<String, CopyOnWriteArrayList<WebSocketSessionRequest>> tenantSpecificAdaptorMap =
outputEventAdaptorSessionMap.get(tenantId);
if (tenantSpecificAdaptorMap == null) {
@ -102,9 +102,9 @@ public class UIOutputCallbackControllerServiceImpl implements UIOutputCallbackCo
*/
public LinkedBlockingDeque<Object> getEvents(int tenanId, String streamName, String version) {
ConcurrentHashMap<String, LinkedBlockingDeque<Object>> tenantSpecificStreamMap =
UIEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap().get(tenanId);
WebsocketEventAdaptorServiceDataHolder.getTenantSpecificStreamEventMap().get(tenanId);
if (tenantSpecificStreamMap != null) {
String streamId = streamName + UIEventAdapterConstants.ADAPTER_UI_COLON + version;
String streamId = streamName + WebsocketEventAdapterConstants.ADAPTER_UI_COLON + version;
return tenantSpecificStreamMap.get(streamId);
}
return null;
@ -120,9 +120,9 @@ public class UIOutputCallbackControllerServiceImpl implements UIOutputCallbackCo
public void unsubscribeWebsocket(String streamName, String version, Session session) {
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
if (version == null || " ".equals(version)) {
version = UIEventAdapterConstants.ADAPTER_UI_DEFAULT_OUTPUT_STREAM_VERSION;
version = WebsocketEventAdapterConstants.ADAPTER_UI_DEFAULT_OUTPUT_STREAM_VERSION;
}
String id = streamName + UIEventAdapterConstants.ADAPTER_UI_COLON + version;
String id = streamName + WebsocketEventAdapterConstants.ADAPTER_UI_COLON + version;
ConcurrentHashMap<String, CopyOnWriteArrayList<WebSocketSessionRequest>> tenantSpecificAdaptorMap
= outputEventAdaptorSessionMap.get(tenantId);
if (tenantSpecificAdaptorMap != null) {

@ -94,9 +94,9 @@ public class OAuthTokenValidaterStubFactory extends BasePoolableObjectFactory {
private OAuth2TokenValidationServiceStub generateStub() throws OAuthTokenValidationException {
OAuth2TokenValidationServiceStub stub;
try {
URL hostURL = new URL(PropertyUtils.replaceMqttProperty(tokenValidationProperties.get(
URL hostURL = new URL(PropertyUtils.replaceProperty(tokenValidationProperties.get(
(WebsocketConstants.TOKEN_VALIDATION_ENDPOINT_URL)))
+ WebsocketConstants.TOKEN_VALIDATION_CONTEXT);
+ WebsocketConstants.TOKEN_VALIDATION_CONTEX);
stub = new OAuth2TokenValidationServiceStub(hostURL.toString());
ServiceClient client = stub._getServiceClient();
client.getServiceContext().getConfigurationContext().setProperty(

@ -111,7 +111,7 @@ public class DeviceAuthorizer implements Authorizer {
}
private String getDeviceMgtServerUrl(Map<String, String> properties) throws OutputEventAdapterException {
String deviceMgtServerUrl = PropertyUtils.replaceMqttProperty(properties.get(DEVICE_MGT_SERVER_URL));
String deviceMgtServerUrl = PropertyUtils.replaceProperty(properties.get(DEVICE_MGT_SERVER_URL));
if (deviceMgtServerUrl == null || deviceMgtServerUrl.isEmpty()) {
logger.error("deviceMgtServerUrl can't be empty ");
}

@ -28,6 +28,8 @@ import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.
import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.ApiApplicationRegistrationService;
import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.ApiRegistrationProfile;
import org.wso2.carbon.device.mgt.output.adapter.websocket.authorization.client.dto.TokenIssuerService;
import org.wso2.carbon.device.mgt.output.adapter.websocket.util.PropertyUtils;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
import java.util.Map;
@ -50,14 +52,16 @@ public class OAuthRequestInterceptor implements RequestInterceptor {
private static final String CONNECTION_USERNAME = "username";
private static final String CONNECTION_PASSWORD = "password";
private static final String TOKEN_ENDPOINT = "keymanagerUrl";
private static final String TOKEN_ENDPOINT = "tokenUrl";
private static final String TOKEN_REFRESH_TIME_OFFSET = "tokenRefreshTimeOffset";
private static final String TOKEN_SCOPES = "scopes";
private static final String DEVICE_MGT_SERVER_URL = "deviceMgtServerUrl";
private static final String TOKEN_ENDPOINT_CONTEXT = "tokenEndpointContext";
private static final String TOKEN_ENDPOINT_CONTEXT = "tokenUrl";
private static String username;
private static String password;
private static String tokenEndpoint;
private static String deviceMgtServerUrl;
private static String scopes;
private static Map<String, String> globalProperties;
@ -66,16 +70,22 @@ public class OAuthRequestInterceptor implements RequestInterceptor {
*/
public OAuthRequestInterceptor(Map<String, String> globalProperties) {
this.globalProperties = globalProperties;
deviceMgtServerUrl = getDeviceMgtServerUrl(globalProperties);
refreshTimeOffset = getRefreshTimeOffset(globalProperties);
username = getUsername(globalProperties);
password = getPassword(globalProperties);
tokenEndpoint = getTokenEndpoint(globalProperties);
apiApplicationRegistrationService = Feign.builder().requestInterceptor(
new BasicAuthRequestInterceptor(username, password))
.contract(new JAXRSContract()).encoder(new GsonEncoder()).decoder(new GsonDecoder())
.target(ApiApplicationRegistrationService.class,
deviceMgtServerUrl + API_APPLICATION_REGISTRATION_CONTEXT);
try {
deviceMgtServerUrl = getDeviceMgtServerUrl(globalProperties);
refreshTimeOffset = getRefreshTimeOffset(globalProperties);
username = getUsername(globalProperties);
password = getPassword(globalProperties);
tokenEndpoint = getTokenEndpoint(globalProperties);
scopes = getScopes(globalProperties);
apiApplicationRegistrationService = Feign.builder().requestInterceptor(
new BasicAuthRequestInterceptor(username, password))
.contract(new JAXRSContract()).encoder(new GsonEncoder()).decoder(new GsonDecoder())
.target(ApiApplicationRegistrationService.class,
deviceMgtServerUrl + API_APPLICATION_REGISTRATION_CONTEXT);
} catch (OutputEventAdapterException e) {
logger.error("Invalid url: deviceMgtServerUrl" + deviceMgtServerUrl + " or tokenEndpoint:" + tokenEndpoint,
e);
}
}
@Override
@ -94,7 +104,11 @@ public class OAuthRequestInterceptor implements RequestInterceptor {
new BasicAuthRequestInterceptor(consumerKey, consumerSecret))
.contract(new JAXRSContract()).encoder(new GsonEncoder()).decoder(new GsonDecoder())
.target(TokenIssuerService.class, tokenEndpoint);
tokenInfo = tokenIssuerService.getToken(PASSWORD_GRANT_TYPE, username, password);
if (scopes == null || scopes.isEmpty()) {
tokenInfo = tokenIssuerService.getToken(PASSWORD_GRANT_TYPE, username, password);
} else {
tokenInfo = tokenIssuerService.getToken(PASSWORD_GRANT_TYPE, username, password, scopes);
}
tokenInfo.setExpires_in(System.currentTimeMillis() + tokenInfo.getExpires_in());
}
synchronized(this) {
@ -123,20 +137,20 @@ public class OAuthRequestInterceptor implements RequestInterceptor {
return password;
}
private String getDeviceMgtServerUrl(Map<String, String> globalProperties) {
private String getDeviceMgtServerUrl(Map<String, String> globalProperties) throws OutputEventAdapterException {
String deviceMgtServerUrl = globalProperties.get(DEVICE_MGT_SERVER_URL);
if (deviceMgtServerUrl == null || deviceMgtServerUrl.isEmpty()) {
logger.error("deviceMgtServerUrl can't be empty ");
}
return deviceMgtServerUrl;
return PropertyUtils.replaceProperty(deviceMgtServerUrl);
}
private String getTokenEndpoint(Map<String, String> globalProperties) {
String tokenEndpoint = globalProperties.get(TOKEN_ENDPOINT) + globalProperties.get(TOKEN_ENDPOINT_CONTEXT);
private String getTokenEndpoint(Map<String, String> globalProperties) throws OutputEventAdapterException {
String tokenEndpoint = globalProperties.get(TOKEN_ENDPOINT_CONTEXT);
if ( tokenEndpoint.isEmpty()) {
logger.error("tokenEndpoint can't be empty ");
}
return tokenEndpoint;
return PropertyUtils.replaceProperty(tokenEndpoint);
}
private long getRefreshTimeOffset(Map<String, String> globalProperties) {
@ -149,5 +163,9 @@ public class OAuthRequestInterceptor implements RequestInterceptor {
return refreshTimeOffset;
}
private String getScopes(Map<String, String> globalProperties) {
return globalProperties.get(TOKEN_SCOPES);
}
}

@ -36,7 +36,6 @@ import javax.ws.rs.core.MediaType;
/**
* This hold the api defintion that is used as a contract with netflix feign.
*/
@Path("/token")
public interface TokenIssuerService {
@POST
@ -45,6 +44,12 @@ public interface TokenIssuerService {
AccessTokenInfo getToken(@QueryParam("grant_type") String grant, @QueryParam("username") String username,
@QueryParam("password") String password);
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
AccessTokenInfo getToken(@QueryParam("grant_type") String grant, @QueryParam("username") String username,
@QueryParam("password") String password, @QueryParam("scopes") String scopes);
@POST
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)

@ -30,7 +30,7 @@ public class WebsocketConstants {
public static final String MAXIMUM_TOTAL_HTTP_CONNECTION = "maximumTotalHttpConnection";
public static final String MAXIMUM_HTTP_CONNECTION_PER_HOST = "maximumHttpConnectionPerHost";
public static final String TOKEN_VALIDATION_ENDPOINT_URL = "keymanagerUrl";
public static final String TOKEN_VALIDATION_CONTEXT = "/services/OAuth2TokenValidationService";
public static final String TOKEN_VALIDATION_CONTEX = "/services/OAuth2TokenValidationService";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String TOKEN_PARAM = "token";

@ -18,7 +18,7 @@
*/
package org.wso2.carbon.device.mgt.output.adapter.websocket.internal;
import org.wso2.carbon.device.mgt.output.adapter.websocket.UIOutputCallbackControllerServiceImpl;
import org.wso2.carbon.device.mgt.output.adapter.websocket.WebsocketOutputCallbackControllerServiceImpl;
import org.wso2.carbon.event.stream.core.EventStreamService;
import java.util.concurrent.ConcurrentHashMap;
@ -27,9 +27,9 @@ import java.util.concurrent.LinkedBlockingDeque;
/**
* Creates a holder of type UIOutputCallbackRegisterServiceImpl.
*/
public final class UIEventAdaptorServiceDataHolder {
public final class WebsocketEventAdaptorServiceDataHolder {
private static UIOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl;
private static WebsocketOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl;
private static ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>>
tenantSpecificOutputEventStreamAdapterMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<Integer, ConcurrentHashMap<String, LinkedBlockingDeque<Object>>>
@ -37,21 +37,21 @@ public final class UIEventAdaptorServiceDataHolder {
private static EventStreamService eventStreamService;
public static void registerEventStreamService(EventStreamService eventBuilderService) {
UIEventAdaptorServiceDataHolder.eventStreamService = eventBuilderService;
WebsocketEventAdaptorServiceDataHolder.eventStreamService = eventBuilderService;
}
public static EventStreamService getEventStreamService() {
return UIEventAdaptorServiceDataHolder.eventStreamService;
return WebsocketEventAdaptorServiceDataHolder.eventStreamService;
}
public static void registerUIOutputCallbackRegisterServiceInternal(
UIOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl) {
UIEventAdaptorServiceDataHolder.UIOutputCallbackRegisterServiceImpl =
WebsocketOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl) {
WebsocketEventAdaptorServiceDataHolder.UIOutputCallbackRegisterServiceImpl =
UIOutputCallbackRegisterServiceImpl;
}
public static UIOutputCallbackControllerServiceImpl getUIOutputCallbackRegisterServiceImpl() {
return UIEventAdaptorServiceDataHolder.UIOutputCallbackRegisterServiceImpl;
public static WebsocketOutputCallbackControllerServiceImpl getUIOutputCallbackRegisterServiceImpl() {
return WebsocketEventAdaptorServiceDataHolder.UIOutputCallbackRegisterServiceImpl;
}
public static ConcurrentHashMap<Integer,ConcurrentHashMap<String, String>>

@ -21,21 +21,21 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket.internal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.device.mgt.output.adapter.websocket.UIEventAdapterFactory;
import org.wso2.carbon.device.mgt.output.adapter.websocket.UIOutputCallbackControllerServiceImpl;
import org.wso2.carbon.device.mgt.output.adapter.websocket.WebsocketEventAdapterFactory;
import org.wso2.carbon.device.mgt.output.adapter.websocket.WebsocketOutputCallbackControllerServiceImpl;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterFactory;
import org.wso2.carbon.device.mgt.output.adapter.websocket.UIOutputCallbackControllerService;
import org.wso2.carbon.device.mgt.output.adapter.websocket.WebsocketOutputCallbackControllerService;
import org.wso2.carbon.event.stream.core.EventStreamService;
/**
* @scr.component component.name="output.extensions.Ui.AdapterService.component" immediate="true"
* @scr.component component.name="output.extensions.secured.websocket.AdapterService.component" immediate="true"
* @scr.reference name="eventStreamService.service"
* interface="org.wso2.carbon.event.stream.core.EventStreamService" cardinality="1..1"
* policy="dynamic" bind="setEventStreamService" unbind="unsetEventStreamService"
*/
public class UILocalEventAdapterServiceComponent {
public class WebsocketLocalEventAdapterServiceComponent {
private static final Log log = LogFactory.getLog(UILocalEventAdapterServiceComponent.class);
private static final Log log = LogFactory.getLog(WebsocketLocalEventAdapterServiceComponent.class);
/**
* initialize the websocket adapter service here service here.
@ -45,16 +45,17 @@ public class UILocalEventAdapterServiceComponent {
protected void activate(ComponentContext context) {
try {
UIEventAdapterFactory uiEventAdapterFactory = new UIEventAdapterFactory();
context.getBundleContext().registerService(OutputEventAdapterFactory.class.getName(), uiEventAdapterFactory, null);
UIOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl =
new UIOutputCallbackControllerServiceImpl();
context.getBundleContext().registerService(UIOutputCallbackControllerService.class.getName(),
WebsocketEventAdapterFactory websocketEventAdapterFactory = new WebsocketEventAdapterFactory();
context.getBundleContext().registerService(OutputEventAdapterFactory.class.getName()
, websocketEventAdapterFactory, null);
WebsocketOutputCallbackControllerServiceImpl UIOutputCallbackRegisterServiceImpl =
new WebsocketOutputCallbackControllerServiceImpl();
context.getBundleContext().registerService(WebsocketOutputCallbackControllerService.class.getName(),
UIOutputCallbackRegisterServiceImpl, null);
uiEventAdapterFactory.setBundleContext(context.getBundleContext());
websocketEventAdapterFactory.setBundleContext(context.getBundleContext());
UIEventAdaptorServiceDataHolder.registerUIOutputCallbackRegisterServiceInternal(
WebsocketEventAdaptorServiceDataHolder.registerUIOutputCallbackRegisterServiceInternal(
UIOutputCallbackRegisterServiceImpl);
if (log.isDebugEnabled()) {
log.debug("Successfully deployed the output websocket adapter service");
@ -70,13 +71,13 @@ public class UILocalEventAdapterServiceComponent {
if (log.isDebugEnabled()) {
log.debug("Setting the EventStreamService reference for the UILocalEventAdaptor Service");
}
UIEventAdaptorServiceDataHolder.registerEventStreamService(eventStreamService);
WebsocketEventAdaptorServiceDataHolder.registerEventStreamService(eventStreamService);
}
protected void unsetEventStreamService(EventStreamService eventStreamService) {
if (log.isDebugEnabled()) {
log.debug("Un-Setting the EventStreamService reference for the UILocalEventAdaptor Service");
}
UIEventAdaptorServiceDataHolder.registerEventStreamService(null);
WebsocketEventAdaptorServiceDataHolder.registerEventStreamService(null);
}
}

@ -26,7 +26,7 @@ import java.util.regex.Pattern;
public class PropertyUtils {
//This method is only used if the mb features are within DAS.
public static String replaceMqttProperty(String urlWithPlaceholders) throws OutputEventAdapterException {
public static String replaceProperty(String urlWithPlaceholders) throws OutputEventAdapterException {
String regex = "\\$\\{(.*?)\\}";
Pattern pattern = Pattern.compile(regex);
Matcher matchPattern = pattern.matcher(urlWithPlaceholders);

@ -21,9 +21,9 @@ package org.wso2.carbon.device.mgt.output.adapter.websocket.util;
/**
* This class contains the constants related to websocket Output Event Adaptor.
*/
public class UIEventAdapterConstants {
public class WebsocketEventAdapterConstants {
private UIEventAdapterConstants() {
private WebsocketEventAdapterConstants() {
}
public static final String ADAPTER_TYPE_UI = "secured-websocket";

@ -1244,7 +1244,7 @@
<javax.ws.rs.version>1.1.1</javax.ws.rs.version>
<!-- Carbon Device Management -->
<carbon.devicemgt.version>2.0.6</carbon.devicemgt.version>
<carbon.devicemgt.version>2.0.7-SNAPSHOT</carbon.devicemgt.version>
<carbon.devicemgt.version.range>[2.0.0, 3.0.0)</carbon.devicemgt.version.range>
<!-- Carbon App Management -->

Loading…
Cancel
Save