fixed multi tenant issues after testing thrift endpoint with multitenant

revert-dabc3590
ayyoob 8 years ago
parent e4247a98ff
commit b32bf11c91

@ -19,7 +19,7 @@
<eventReceiver name="Geo-Receiver-WSO2Event-LocationStream" <eventReceiver name="Geo-Receiver-WSO2Event-LocationStream"
trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="wso2event"> <from eventAdapterType="iot-event">
<property name="events.duplicated.in.cluster">false</property> <property name="events.duplicated.in.cluster">false</property>
</from> </from>
<mapping customMapping="disable" type="wso2event"/> <mapping customMapping="disable" type="wso2event"/>

@ -18,9 +18,9 @@
--> -->
<eventReceiver name="arduino_receiver" statistics="disable" trace="disable" <eventReceiver name="arduino_receiver" statistics="disable" trace="disable"
xmlns="http://wso2.org/carbon/eventreceiver"> xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-http"> <from eventAdapterType="iot-event">
<property name="contentValidator">iot-http</property> <property name="events.duplicated.in.cluster">false</property>
</from> </from>
<mapping customMapping="disable" type="json"/> <mapping customMapping="disable" type="wso2event"/>
<to streamName="org.wso2.iot.arduino" version="1.0.0"/> <to streamName="org.wso2.iot.arduino" version="1.0.0"/>
</eventReceiver> </eventReceiver>

@ -120,7 +120,7 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
// set clientId // set clientId
Property clientId = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID); Property clientId = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID);
clientId.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID)); clientId.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID));
clientId.setRequired(true); clientId.setRequired(false);
clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT)); clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT));
propertyList.add(clientId); propertyList.add(clientId);

@ -46,7 +46,7 @@ public class MQTTEventAdapterConstants {
public static final String ADAPTER_CONF_CLEAN_SESSION = "cleanSession"; public static final String ADAPTER_CONF_CLEAN_SESSION = "cleanSession";
public static final String ADAPTER_CONF_CLEAN_SESSION_HINT = "cleanSession.hint"; public static final String ADAPTER_CONF_CLEAN_SESSION_HINT = "cleanSession.hint";
public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive"; public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive";
public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 60000; public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 20000;
public static final int INITIAL_RECONNECTION_DURATION = 4000; public static final int INITIAL_RECONNECTION_DURATION = 4000;
public static final int RECONNECTION_PROGRESS_FACTOR = 2; public static final int RECONNECTION_PROGRESS_FACTOR = 2;

@ -0,0 +1,134 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
~ Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~
~ WSO2 Inc. licenses this file to you under the Apache License,
~ Version 2.0 (the "License"); you may not use this file except
~ in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>cdmf-transport-adapters</artifactId>
<version>3.0.37-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.thrift</artifactId>
<packaging>bundle</packaging>
<name>WSO2 Carbon - Event Input Thrift Adapter Module</name>
<description>org.wso2.carbon.event.input.adapter.thrift provides the back-end
functionality of input wso2event adapter
</description>
<url>http://wso2.org</url>
<dependencies>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.core</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.wso2event</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.logging</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.core</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons.binary</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.core</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<executions>
<execution>
<id>generate-scr-descriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Name>${project.artifactId}</Bundle-Name>
<Private-Package>
org.wso2.carbon.device.mgt.input.adapter.thrift.internal,
org.wso2.carbon.device.mgt.input.adapter.thrift.internal.*,
</Private-Package>
<Export-Package>
!org.wso2.carbon.device.mgt.input.adapter.thrift.internal,
!org.wso2.carbon.device.mgt.input.adapter.thrift.internal.*,
org.wso2.carbon.device.mgt.input.adapter.thrift.*,
</Export-Package>
<Import-Package>
org.wso2.carbon.event.input.adapter.core,
org.wso2.carbon.event.input.adapter.core.*,
!javax.xml.namespace,
javax.xml.namespace; version=0.0.0,
org.apache.commons.logging,
org.osgi.framework,
org.osgi.service.component,
org.wso2.carbon.context,
org.wso2.carbon.databridge.commons,
org.wso2.carbon.databridge.core,
org.wso2.carbon.event.input.adapter.wso2event,
org.apache.axis2.context,
org.wso2.carbon.core.multitenancy.utils,
org.wso2.carbon.utils
</Import-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,90 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.thrift;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.input.adapter.thrift.internal.ThriftEventAdapterServiceHolder;
import org.wso2.carbon.event.input.adapter.core.EventAdapterConstants;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapter;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;
import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterException;
import org.wso2.carbon.event.input.adapter.core.exception.TestConnectionNotSupportedException;
import java.util.Map;
public final class ThriftAdapter implements InputEventAdapter {
private static final Log log = LogFactory.getLog(ThriftAdapter.class);
private final InputEventAdapterConfiguration eventAdapterConfiguration;
private final Map<String, String> globalProperties;
private InputEventAdapterListener eventAdaptorListener;
public ThriftAdapter(InputEventAdapterConfiguration eventAdapterConfiguration,
Map<String, String> globalProperties) {
this.eventAdapterConfiguration = eventAdapterConfiguration;
this.globalProperties = globalProperties;
}
@Override
public void init(InputEventAdapterListener eventAdaptorListener) throws InputEventAdapterException {
this.eventAdaptorListener = eventAdaptorListener;
}
@Override
public void testConnect() throws TestConnectionNotSupportedException {
throw new TestConnectionNotSupportedException("not-supported");
}
@Override
public void connect() {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
String streamId = eventAdapterConfiguration.getInputStreamIdOfWso2eventMessageFormat();
ThriftEventAdapterServiceHolder.registerAdapterService(tenantDomain, streamId, this);
}
@Override
public void disconnect() {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
String streamId = eventAdapterConfiguration.getInputStreamIdOfWso2eventMessageFormat();
ThriftEventAdapterServiceHolder.unregisterAdapterService(tenantDomain, streamId, this);
}
@Override
public void destroy() {
}
@Override
public boolean isEventDuplicatedInCluster() {
return Boolean.parseBoolean(eventAdapterConfiguration.getProperties().get(
EventAdapterConstants.EVENTS_DUPLICATED_IN_CLUSTER));
}
@Override
public boolean isPolling() {
return false;
}
public String getEventAdapterName() {
return eventAdapterConfiguration.getName();
}
public InputEventAdapterListener getEventAdaptorListener() {
return eventAdaptorListener;
}
}

@ -0,0 +1,41 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.thrift;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapter;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
import org.wso2.carbon.event.input.adapter.wso2event.WSO2EventEventAdapterFactory;
import java.util.Map;
/**
* The WSO2Event adapter factory class to create a WSO2Event input adapter
*/
public class ThriftEventAdapterFactory extends WSO2EventEventAdapterFactory {
private static final String ADAPTER_TYPE_THRIFT = "iot-event";
@Override
public String getType() {
return ADAPTER_TYPE_THRIFT;
}
@Override
public InputEventAdapter createEventAdapter(InputEventAdapterConfiguration eventAdapterConfiguration,
Map<String, String> globalProperties) {
return new ThriftAdapter(eventAdapterConfiguration, globalProperties);
}
}

@ -0,0 +1,167 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.thrift.internal;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.multitenancy.utils.TenantAxisUtils;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.DataBridgeSubscriberService;
import org.wso2.carbon.device.mgt.input.adapter.thrift.ThriftAdapter;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterFactory;
import org.wso2.carbon.device.mgt.input.adapter.thrift.ThriftEventAdapterFactory;
import org.wso2.carbon.utils.ConfigurationContextService;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* @scr.component name="input.wso2EventAdapterService.component" immediate="true"
* @scr.reference name="agentserverservice.service"
* interface="org.wso2.carbon.databridge.core.DataBridgeSubscriberService" cardinality="1..1"
* policy="dynamic" bind="setDataBridgeSubscriberService" unbind="unSetDataBridgeSubscriberService"
* @scr.reference name="config.context.service"
* interface="org.wso2.carbon.utils.ConfigurationContextService"
* cardinality="1..1" policy="dynamic" bind="setConfigurationContextService"
* unbind="unsetConfigurationContextService"
*/
public class ThriftEventAdapterServiceComponent {
private static final Log log = LogFactory.getLog(ThriftEventAdapterServiceComponent.class);
/**
* initialize the agent service here service here.
*
* @param context
*/
protected void activate(ComponentContext context) {
try {
InputEventAdapterFactory wso2EventEventAdapterFactory = new ThriftEventAdapterFactory();
context.getBundleContext().registerService(InputEventAdapterFactory.class.getName(), wso2EventEventAdapterFactory, null);
if (log.isDebugEnabled()) {
log.debug("Successfully deployed the input WSO2Event adapter service");
}
} catch (RuntimeException e) {
log.error("Can not create the input WSO2Event adapter service ", e);
}
}
protected void setDataBridgeSubscriberService(
DataBridgeSubscriberService dataBridgeSubscriberService) {
if (ThriftEventAdapterServiceHolder.getDataBridgeSubscriberService() == null) {
ThriftEventAdapterServiceHolder.registerDataBridgeSubscriberService(dataBridgeSubscriberService);
dataBridgeSubscriberService.subscribe(new AgentCallback() {
@Override
public void definedStream(StreamDefinition streamDefinition, int i) {
}
@Override
public void removeStream(StreamDefinition streamDefinition, int i) {
}
@Override
public void receive(List<Event> events, Credentials credentials) {
try {
PrivilegedCarbonContext.startTenantFlow();
String tenantDomain = getTenantDomain(events, credentials);
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
if (!tenantDomain.equalsIgnoreCase(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
TenantAxisUtils.getTenantConfigurationContext(tenantDomain, ThriftEventAdapterServiceHolder
.getConfigurationContext());
}
for (Event event : events) {
ConcurrentHashMap<String, ThriftAdapter> adapters = ThriftEventAdapterServiceHolder
.getAdapterService(tenantDomain, event.getStreamId());
if (adapters != null) {
event = getStrippedEvent(event, credentials);
for (ThriftAdapter adapter : adapters.values()) {
adapter.getEventAdaptorListener().onEvent(event);
}
}
if (log.isDebugEnabled()) {
log.debug("Event received in wso2Event Adapter - " + event);
}
}
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
});
}
}
protected void unSetDataBridgeSubscriberService(
DataBridgeSubscriberService dataBridgeSubscriberService) {
}
protected void setConfigurationContextService(ConfigurationContextService contextService) {
ConfigurationContext serverConfigContext = contextService.getServerConfigContext();
ThriftEventAdapterServiceHolder.setConfigurationContext(serverConfigContext);
}
protected void unsetConfigurationContextService(ConfigurationContextService contextService) {
ThriftEventAdapterServiceHolder.setConfigurationContext(null);
}
private String getTenantDomain(List<Event> events, Credentials credentials) {
Object[] objects = events.get(0).getMetaData();
String tenantDomain = credentials.getDomainName();
if (objects != null && objects.length > 0) {
if (tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
String[] splitValues = ((String) objects[0]).split("@@");
if (splitValues.length > 1) {
tenantDomain = splitValues[0];
}
}
}
return tenantDomain;
}
private Event getStrippedEvent (Event event, Credentials credentials) {
Object[] objects = event.getMetaData();
String tenantDomain = credentials.getDomainName();
if (objects != null && objects.length > 0) {
if (tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
String[] splitValues = ((String) objects[0]).split("@");
if (splitValues.length > 1) {
event.getMetaData()[0] = splitValues[1];
}
}
}
return event;
}
}

@ -0,0 +1,99 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.thrift.internal;
import org.apache.axis2.context.ConfigurationContext;
import org.wso2.carbon.databridge.core.DataBridgeSubscriberService;
import org.wso2.carbon.device.mgt.input.adapter.thrift.ThriftAdapter;
import java.util.concurrent.ConcurrentHashMap;
/**
* common place to hold some OSGI bundle references.
*/
public final class ThriftEventAdapterServiceHolder {
private static DataBridgeSubscriberService dataBridgeSubscriberService;
private static ConfigurationContext configurationContext;
private static ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>>
inputEventAdapterListenerMap =
new ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>>();
private ThriftEventAdapterServiceHolder() {
}
public static void registerDataBridgeSubscriberService(
DataBridgeSubscriberService dataBridgeSubscriberService) {
ThriftEventAdapterServiceHolder.dataBridgeSubscriberService = dataBridgeSubscriberService;
}
public static DataBridgeSubscriberService getDataBridgeSubscriberService() {
return dataBridgeSubscriberService;
}
public static synchronized void registerAdapterService(String tenantDomain, String streamId,
ThriftAdapter thriftAdapter) {
ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>
tenantSpecificInputEventAdapterListenerMap = inputEventAdapterListenerMap.get(tenantDomain);
if (tenantSpecificInputEventAdapterListenerMap == null) {
tenantSpecificInputEventAdapterListenerMap =
new ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>();
inputEventAdapterListenerMap.put(tenantDomain, tenantSpecificInputEventAdapterListenerMap);
}
ConcurrentHashMap<String, ThriftAdapter> streamSpecificInputEventAdapterListenerMap =
tenantSpecificInputEventAdapterListenerMap.get(streamId);
if (streamSpecificInputEventAdapterListenerMap == null) {
streamSpecificInputEventAdapterListenerMap = new ConcurrentHashMap<String, ThriftAdapter>();
tenantSpecificInputEventAdapterListenerMap.put(streamId, streamSpecificInputEventAdapterListenerMap);
}
streamSpecificInputEventAdapterListenerMap.put(thriftAdapter.getEventAdapterName(), thriftAdapter);
}
public static void unregisterAdapterService(String tenantDomain, String streamId,
ThriftAdapter thriftAdapter) {
ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>
tenantSpecificInputEventAdapterListenerMap = inputEventAdapterListenerMap.get(tenantDomain);
if (tenantSpecificInputEventAdapterListenerMap != null) {
ConcurrentHashMap<String, ThriftAdapter> streamSpecificInputEventAdapterListenerMap =
tenantSpecificInputEventAdapterListenerMap.get(streamId);
if (streamSpecificInputEventAdapterListenerMap != null) {
streamSpecificInputEventAdapterListenerMap.remove(thriftAdapter.getEventAdapterName());
}
}
}
public static ConcurrentHashMap<String, ThriftAdapter> getAdapterService(String tenantDomain, String streamId) {
ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>
tenantSpecificInputEventAdapterListenerMap = inputEventAdapterListenerMap.get(tenantDomain);
if (tenantSpecificInputEventAdapterListenerMap != null) {
return tenantSpecificInputEventAdapterListenerMap.get(streamId);
}
return null;
}
public static ConfigurationContext getConfigurationContext() {
return configurationContext;
}
public static void setConfigurationContext(ConfigurationContext configurationContext) {
ThriftEventAdapterServiceHolder.configurationContext = configurationContext;
}
}

@ -0,0 +1,25 @@
#
# Copyright (c) 2005-2014, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
stream=Stream Name
version=Stream Version
events.duplicated.in.cluster=Is events duplicated in cluster
events.duplicated.in.cluster.hint=This depends on how events are published to the server, 'true' only if multiple receiver URLs are defined in different receiver groups ({}).
wso2event.usage.tips_prefix=Following url formats are used to receive events<br/>For load-balancing: use "," to separate values for multiple endpoints<br/>&nbsp;&nbsp;&nbsp;Eg: <i>{tcp://&lt;hostname&gt;:&lt;port&gt;,tcp://&lt;hostname&gt;:&lt;port&gt;, ...}</i> <br/><br/>For failover: use "|" to separate multiple endpoints<br/>&nbsp;&nbsp;&nbsp;Eg: <i>{tcp://&lt;hostname&gt;:&lt;port>|tcp://&lt;hostname&gt;:&lt;port&gt;| ...}</i><br/><br/>For more than one cluster: use "{}" to separate multiple clusters<br/>&nbsp;&nbsp;&nbsp;Eg: <i>{tcp://&lt;hostname&gt;:&lt;port&gt;|tcp://&lt;hostname>:&lt;port&gt;| ...}</i>,<i>{tcp://&lt;hostname&gt;:&lt;port&gt;}</i><br/><br/>Ports available for Thrift protocol - TCP port:
wso2event.usage.tips_in_between=&nbsp;or SSL port:
wso2event.usage.tips_postfix=<br/>Ports available for Binary protocol - TCP port:

@ -77,7 +77,8 @@ public class TenantSubscriptionEndpoint extends SubscriptionEndpoint {
if (isAuthorized) { if (isAuthorized) {
try { try {
PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tdomain, true); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(authenticationInfo.getTenantDomain()
, true);
ServiceHolder.getInstance().getWebsocketOutputCallbackControllerService().subscribeWebsocket(streamName ServiceHolder.getInstance().getWebsocketOutputCallbackControllerService().subscribeWebsocket(streamName
, version, session); , version, session);
} finally { } finally {

@ -37,6 +37,7 @@
<module>input/org.wso2.carbon.device.mgt.input.adapter.http</module> <module>input/org.wso2.carbon.device.mgt.input.adapter.http</module>
<module>input/org.wso2.carbon.device.mgt.input.adapter.mqtt</module> <module>input/org.wso2.carbon.device.mgt.input.adapter.mqtt</module>
<module>input/org.wso2.carbon.device.mgt.input.adapter.xmpp</module> <module>input/org.wso2.carbon.device.mgt.input.adapter.xmpp</module>
<module>input/org.wso2.carbon.device.mgt.input.adapter.thrift</module>
<module>output/org.wso2.carbon.device.mgt.output.adapter.mqtt</module> <module>output/org.wso2.carbon.device.mgt.output.adapter.mqtt</module>
<module>output/org.wso2.carbon.device.mgt.output.adapter.xmpp</module> <module>output/org.wso2.carbon.device.mgt.output.adapter.xmpp</module>
<module>output/org.wso2.carbon.device.mgt.output.adapter.websocket</module> <module>output/org.wso2.carbon.device.mgt.output.adapter.websocket</module>

@ -18,7 +18,7 @@
--> -->
<eventReceiver name="Android-Agent-Event-Receiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <eventReceiver name="Android-Agent-Event-Receiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="wso2event"> <from eventAdapterType="iot-event">
<property name="events.duplicated.in.cluster">false</property> <property name="events.duplicated.in.cluster">false</property>
</from> </from>
<mapping customMapping="disable" type="wso2event"/> <mapping customMapping="disable" type="wso2event"/>

@ -20,7 +20,6 @@
<eventReceiver name="android_sense_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <eventReceiver name="android_sense_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt"> <from eventAdapterType="oauth-mqtt">
<property name="topic">carbon.super/android_sense/+/data</property> <property name="topic">carbon.super/android_sense/+/data</property>
<property name="clientId">android_sense_receiver-carbon.super</property>
</from> </from>
<mapping customMapping="disable" type="json"/> <mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.android.sense" version="1.0.0"/> <to streamName="org.wso2.iot.android.sense" version="1.0.0"/>

@ -20,7 +20,6 @@
<eventReceiver name="android_sense_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <eventReceiver name="android_sense_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt"> <from eventAdapterType="oauth-mqtt">
<property name="topic">${tenant-domain}/android_sense/+/data</property> <property name="topic">${tenant-domain}/android_sense/+/data</property>
<property name="clientId">android_sense_receiver-${tenant-domain}</property>
</from> </from>
<mapping customMapping="disable" type="json"/> <mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.android.sense" version="1.0.0"/> <to streamName="org.wso2.iot.android.sense" version="1.0.0"/>

@ -19,7 +19,6 @@
<eventReceiver name="raspberrypi_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <eventReceiver name="raspberrypi_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt"> <from eventAdapterType="oauth-mqtt">
<property name="topic">carbon.super/raspberrypi/+/temperature</property> <property name="topic">carbon.super/raspberrypi/+/temperature</property>
<property name="clientId">raspberrypi_receiver-carbon.super</property>
</from> </from>
<mapping customMapping="disable" type="json"/> <mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.raspberrypi" version="1.0.0"/> <to streamName="org.wso2.iot.raspberrypi" version="1.0.0"/>

@ -19,7 +19,6 @@
<eventReceiver name="raspberrypi_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <eventReceiver name="raspberrypi_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt"> <from eventAdapterType="oauth-mqtt">
<property name="topic">${tenant-domain}/raspberrypi/+/temperature</property> <property name="topic">${tenant-domain}/raspberrypi/+/temperature</property>
<property name="clientId">raspberrypi_receiver-${tenant-domain}</property>
</from> </from>
<mapping customMapping="disable" type="json"/> <mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.raspberrypi" version="1.0.0"/> <to streamName="org.wso2.iot.raspberrypi" version="1.0.0"/>

@ -19,7 +19,6 @@
<eventReceiver name="virtualfirealarm_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <eventReceiver name="virtualfirealarm_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt"> <from eventAdapterType="oauth-mqtt">
<property name="topic">carbon.super/virtual_firealarm/+/temperature</property> <property name="topic">carbon.super/virtual_firealarm/+/temperature</property>
<property name="clientId">virtualfirealarm_receiver-carbon.super</property>
</from> </from>
<mapping customMapping="disable" type="json"/> <mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.virtualfirealarm" version="1.0.0"/> <to streamName="org.wso2.iot.virtualfirealarm" version="1.0.0"/>

@ -19,7 +19,6 @@
<eventReceiver name="virtualfirealarm_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <eventReceiver name="virtualfirealarm_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt"> <from eventAdapterType="oauth-mqtt">
<property name="topic">${tenant-domain}/virtual_firealarm/+/temperature</property> <property name="topic">${tenant-domain}/virtual_firealarm/+/temperature</property>
<property name="clientId">virtualfirealarm_receiver-${tenant-domain}</property>
</from> </from>
<mapping customMapping="disable" type="json"/> <mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.virtualfirealarm" version="1.0.0"/> <to streamName="org.wso2.iot.virtualfirealarm" version="1.0.0"/>

@ -64,6 +64,14 @@
<groupId>org.wso2.carbon.devicemgt-plugins</groupId> <groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.xmpp</artifactId> <artifactId>org.wso2.carbon.device.mgt.input.adapter.xmpp</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.thrift</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.wso2event</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.jayway.jsonpath</groupId> <groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId> <artifactId>json-path</artifactId>
@ -174,12 +182,19 @@
<bundleDef> <bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.input.adapter.mqtt:${carbon.devicemgt.plugins.version} org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.input.adapter.mqtt:${carbon.devicemgt.plugins.version}
</bundleDef> </bundleDef>
<bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.input.adapter.thrift:${carbon.devicemgt.plugins.version}
</bundleDef>
<bundleDef>
org.wso2.carbon.analytics-common:org.wso2.carbon.event.input.adapter.wso2event:${carbon.analytics.common.version}
</bundleDef>
<bundleDef> <bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.input.adapter.xmpp:${carbon.devicemgt.plugins.version} org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.input.adapter.xmpp:${carbon.devicemgt.plugins.version}
</bundleDef> </bundleDef>
<bundleDef> <bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.output.adapter.websocket:${carbon.devicemgt.plugins.version} org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.output.adapter.websocket:${carbon.devicemgt.plugins.version}
</bundleDef> </bundleDef>
<bundleDef> <bundleDef>
org.eclipse.paho:org.eclipse.paho.client.mqttv3:${eclipse.paho.version} org.eclipse.paho:org.eclipse.paho.client.mqttv3:${eclipse.paho.version}
</bundleDef> </bundleDef>

@ -355,6 +355,11 @@
<artifactId>org.wso2.carbon.event.output.adapter.core</artifactId> <artifactId>org.wso2.carbon.event.output.adapter.core</artifactId>
<version>${carbon.analytics.common.version}</version> <version>${carbon.analytics.common.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.wso2event</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.analytics-common</groupId> <groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.core</artifactId> <artifactId>org.wso2.carbon.event.input.adapter.core</artifactId>
@ -365,6 +370,21 @@
<artifactId>org.wso2.carbon.databridge.commons</artifactId> <artifactId>org.wso2.carbon.databridge.commons</artifactId>
<version>${carbon.analytics.common.version}</version> <version>${carbon.analytics.common.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons.binary</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.core</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.analytics</groupId> <groupId>org.wso2.carbon.analytics</groupId>
<artifactId>org.wso2.carbon.analytics.api</artifactId> <artifactId>org.wso2.carbon.analytics.api</artifactId>
@ -387,6 +407,11 @@
<artifactId>org.wso2.carbon.device.mgt.output.adapter.mqtt</artifactId> <artifactId>org.wso2.carbon.device.mgt.output.adapter.mqtt</artifactId>
<version>${carbon.devicemgt.plugins.version}</version> <version>${carbon.devicemgt.plugins.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.thrift</artifactId>
<version>${carbon.devicemgt.plugins.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId> <groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.output.adapter.xmpp</artifactId> <artifactId>org.wso2.carbon.device.mgt.output.adapter.xmpp</artifactId>

Loading…
Cancel
Save