revert-dabc3590
Rasika Perera 8 years ago
commit 7b88ae13ba

@ -19,7 +19,7 @@
<eventReceiver name="Geo-Receiver-WSO2Event-LocationStream"
trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="wso2event">
<from eventAdapterType="iot-event">
<property name="events.duplicated.in.cluster">false</property>
</from>
<mapping customMapping="disable" type="wso2event"/>

@ -18,9 +18,9 @@
-->
<eventReceiver name="arduino_receiver" statistics="disable" trace="disable"
xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-http">
<property name="contentValidator">iot-http</property>
</from>
<mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.arduino" version="1.0.0"/>
<from eventAdapterType="iot-event">
<property name="events.duplicated.in.cluster">false</property>
</from>
<mapping customMapping="disable" type="wso2event"/>
<to streamName="org.wso2.iot.arduino" version="1.0.0"/>
</eventReceiver>

@ -120,7 +120,7 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
// set clientId
Property clientId = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID);
clientId.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID));
clientId.setRequired(true);
clientId.setRequired(false);
clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT));
propertyList.add(clientId);

@ -206,8 +206,14 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
}
try {
mqttClient.subscribe(topic);
log.info("mqtt receiver subscribed to topic: " + topic);
} catch (MqttException e) {
log.error("Failed to subscribe to topic: " + topic + ", Retrying.....");
try {
mqttClient.disconnect();
} catch (MqttException ex) {
// do nothing.
}
return false;
}
return true;

@ -46,7 +46,7 @@ public class MQTTEventAdapterConstants {
public static final String ADAPTER_CONF_CLEAN_SESSION = "cleanSession";
public static final String ADAPTER_CONF_CLEAN_SESSION_HINT = "cleanSession.hint";
public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive";
public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 60000;
public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 20000;
public static final int INITIAL_RECONNECTION_DURATION = 4000;
public static final int RECONNECTION_PROGRESS_FACTOR = 2;

@ -0,0 +1,136 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
~ Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~
~ WSO2 Inc. licenses this file to you under the Apache License,
~ Version 2.0 (the "License"); you may not use this file except
~ in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>cdmf-transport-adapters</artifactId>
<version>3.0.37-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.thrift</artifactId>
<packaging>bundle</packaging>
<name>WSO2 Carbon - Event Input Thrift Adapter Module</name>
<description>org.wso2.carbon.event.input.adapter.thrift provides the back-end
functionality of input wso2event adapter
</description>
<url>http://wso2.org</url>
<dependencies>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.core</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.wso2event</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.logging</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.core</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons.binary</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.core</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<executions>
<execution>
<id>generate-scr-descriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Name>${project.artifactId}</Bundle-Name>
<Private-Package>
org.wso2.carbon.device.mgt.input.adapter.thrift.internal,
org.wso2.carbon.device.mgt.input.adapter.thrift.internal.*,
</Private-Package>
<Export-Package>
!org.wso2.carbon.device.mgt.input.adapter.thrift.internal,
!org.wso2.carbon.device.mgt.input.adapter.thrift.internal.*,
org.wso2.carbon.device.mgt.input.adapter.thrift.*,
</Export-Package>
<Import-Package>
org.wso2.carbon.event.input.adapter.core,
org.wso2.carbon.event.input.adapter.core.*,
!javax.xml.namespace,
javax.xml.namespace; version=0.0.0,
org.apache.commons.logging,
org.osgi.framework,
org.osgi.service.component,
org.wso2.carbon.context,
org.wso2.carbon.databridge.commons,
org.wso2.carbon.databridge.core,
org.wso2.carbon.event.input.adapter.wso2event,
org.apache.axis2.context,
org.wso2.carbon.core.multitenancy.utils,
org.wso2.carbon.utils,
org.wso2.carbon.utils.multitenancy
</Import-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,90 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.thrift;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.input.adapter.thrift.internal.ThriftEventAdapterServiceHolder;
import org.wso2.carbon.event.input.adapter.core.EventAdapterConstants;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapter;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;
import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterException;
import org.wso2.carbon.event.input.adapter.core.exception.TestConnectionNotSupportedException;
import java.util.Map;
public final class ThriftAdapter implements InputEventAdapter {
private static final Log log = LogFactory.getLog(ThriftAdapter.class);
private final InputEventAdapterConfiguration eventAdapterConfiguration;
private final Map<String, String> globalProperties;
private InputEventAdapterListener eventAdaptorListener;
public ThriftAdapter(InputEventAdapterConfiguration eventAdapterConfiguration,
Map<String, String> globalProperties) {
this.eventAdapterConfiguration = eventAdapterConfiguration;
this.globalProperties = globalProperties;
}
@Override
public void init(InputEventAdapterListener eventAdaptorListener) throws InputEventAdapterException {
this.eventAdaptorListener = eventAdaptorListener;
}
@Override
public void testConnect() throws TestConnectionNotSupportedException {
throw new TestConnectionNotSupportedException("not-supported");
}
@Override
public void connect() {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
String streamId = eventAdapterConfiguration.getInputStreamIdOfWso2eventMessageFormat();
ThriftEventAdapterServiceHolder.registerAdapterService(tenantDomain, streamId, this);
}
@Override
public void disconnect() {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
String streamId = eventAdapterConfiguration.getInputStreamIdOfWso2eventMessageFormat();
ThriftEventAdapterServiceHolder.unregisterAdapterService(tenantDomain, streamId, this);
}
@Override
public void destroy() {
}
@Override
public boolean isEventDuplicatedInCluster() {
return Boolean.parseBoolean(eventAdapterConfiguration.getProperties().get(
EventAdapterConstants.EVENTS_DUPLICATED_IN_CLUSTER));
}
@Override
public boolean isPolling() {
return false;
}
public String getEventAdapterName() {
return eventAdapterConfiguration.getName();
}
public InputEventAdapterListener getEventAdaptorListener() {
return eventAdaptorListener;
}
}

@ -0,0 +1,41 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.thrift;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapter;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
import org.wso2.carbon.event.input.adapter.wso2event.WSO2EventEventAdapterFactory;
import java.util.Map;
/**
* The WSO2Event adapter factory class to create a WSO2Event input adapter
*/
public class ThriftEventAdapterFactory extends WSO2EventEventAdapterFactory {
private static final String ADAPTER_TYPE_THRIFT = "iot-event";
@Override
public String getType() {
return ADAPTER_TYPE_THRIFT;
}
@Override
public InputEventAdapter createEventAdapter(InputEventAdapterConfiguration eventAdapterConfiguration,
Map<String, String> globalProperties) {
return new ThriftAdapter(eventAdapterConfiguration, globalProperties);
}
}

@ -0,0 +1,167 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.thrift.internal;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.multitenancy.utils.TenantAxisUtils;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.DataBridgeSubscriberService;
import org.wso2.carbon.device.mgt.input.adapter.thrift.ThriftAdapter;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterFactory;
import org.wso2.carbon.device.mgt.input.adapter.thrift.ThriftEventAdapterFactory;
import org.wso2.carbon.utils.ConfigurationContextService;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* @scr.component name="input.wso2EventAdapterService.component" immediate="true"
* @scr.reference name="agentserverservice.service"
* interface="org.wso2.carbon.databridge.core.DataBridgeSubscriberService" cardinality="1..1"
* policy="dynamic" bind="setDataBridgeSubscriberService" unbind="unSetDataBridgeSubscriberService"
* @scr.reference name="config.context.service"
* interface="org.wso2.carbon.utils.ConfigurationContextService"
* cardinality="1..1" policy="dynamic" bind="setConfigurationContextService"
* unbind="unsetConfigurationContextService"
*/
public class ThriftEventAdapterServiceComponent {
private static final Log log = LogFactory.getLog(ThriftEventAdapterServiceComponent.class);
/**
* initialize the agent service here service here.
*
* @param context
*/
protected void activate(ComponentContext context) {
try {
InputEventAdapterFactory wso2EventEventAdapterFactory = new ThriftEventAdapterFactory();
context.getBundleContext().registerService(InputEventAdapterFactory.class.getName(), wso2EventEventAdapterFactory, null);
if (log.isDebugEnabled()) {
log.debug("Successfully deployed the input WSO2Event adapter service");
}
} catch (RuntimeException e) {
log.error("Can not create the input WSO2Event adapter service ", e);
}
}
protected void setDataBridgeSubscriberService(
DataBridgeSubscriberService dataBridgeSubscriberService) {
if (ThriftEventAdapterServiceHolder.getDataBridgeSubscriberService() == null) {
ThriftEventAdapterServiceHolder.registerDataBridgeSubscriberService(dataBridgeSubscriberService);
dataBridgeSubscriberService.subscribe(new AgentCallback() {
@Override
public void definedStream(StreamDefinition streamDefinition, int i) {
}
@Override
public void removeStream(StreamDefinition streamDefinition, int i) {
}
@Override
public void receive(List<Event> events, Credentials credentials) {
try {
PrivilegedCarbonContext.startTenantFlow();
String tenantDomain = getTenantDomain(events, credentials);
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
if (!tenantDomain.equalsIgnoreCase(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
TenantAxisUtils.getTenantConfigurationContext(tenantDomain, ThriftEventAdapterServiceHolder
.getConfigurationContext());
}
for (Event event : events) {
ConcurrentHashMap<String, ThriftAdapter> adapters = ThriftEventAdapterServiceHolder
.getAdapterService(tenantDomain, event.getStreamId());
if (adapters != null) {
event = getStrippedEvent(event, credentials);
for (ThriftAdapter adapter : adapters.values()) {
adapter.getEventAdaptorListener().onEvent(event);
}
}
if (log.isDebugEnabled()) {
log.debug("Event received in wso2Event Adapter - " + event);
}
}
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
});
}
}
protected void unSetDataBridgeSubscriberService(
DataBridgeSubscriberService dataBridgeSubscriberService) {
}
protected void setConfigurationContextService(ConfigurationContextService contextService) {
ConfigurationContext serverConfigContext = contextService.getServerConfigContext();
ThriftEventAdapterServiceHolder.setConfigurationContext(serverConfigContext);
}
protected void unsetConfigurationContextService(ConfigurationContextService contextService) {
ThriftEventAdapterServiceHolder.setConfigurationContext(null);
}
private String getTenantDomain(List<Event> events, Credentials credentials) {
Object[] objects = events.get(0).getMetaData();
String tenantDomain = credentials.getDomainName();
if (objects != null && objects.length > 0) {
if (tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
String[] splitValues = ((String) objects[0]).split("@");
if (splitValues.length > 1) {
tenantDomain = splitValues[0];
}
}
}
return tenantDomain;
}
private Event getStrippedEvent (Event event, Credentials credentials) {
Object[] objects = event.getMetaData();
String tenantDomain = credentials.getDomainName();
if (objects != null && objects.length > 0) {
if (tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
String[] splitValues = ((String) objects[0]).split("@");
if (splitValues.length > 1) {
event.getMetaData()[0] = splitValues[1];
}
}
}
return event;
}
}

@ -0,0 +1,99 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.thrift.internal;
import org.apache.axis2.context.ConfigurationContext;
import org.wso2.carbon.databridge.core.DataBridgeSubscriberService;
import org.wso2.carbon.device.mgt.input.adapter.thrift.ThriftAdapter;
import java.util.concurrent.ConcurrentHashMap;
/**
* common place to hold some OSGI bundle references.
*/
public final class ThriftEventAdapterServiceHolder {
private static DataBridgeSubscriberService dataBridgeSubscriberService;
private static ConfigurationContext configurationContext;
private static ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>>
inputEventAdapterListenerMap =
new ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>>();
private ThriftEventAdapterServiceHolder() {
}
public static void registerDataBridgeSubscriberService(
DataBridgeSubscriberService dataBridgeSubscriberService) {
ThriftEventAdapterServiceHolder.dataBridgeSubscriberService = dataBridgeSubscriberService;
}
public static DataBridgeSubscriberService getDataBridgeSubscriberService() {
return dataBridgeSubscriberService;
}
public static synchronized void registerAdapterService(String tenantDomain, String streamId,
ThriftAdapter thriftAdapter) {
ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>
tenantSpecificInputEventAdapterListenerMap = inputEventAdapterListenerMap.get(tenantDomain);
if (tenantSpecificInputEventAdapterListenerMap == null) {
tenantSpecificInputEventAdapterListenerMap =
new ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>();
inputEventAdapterListenerMap.put(tenantDomain, tenantSpecificInputEventAdapterListenerMap);
}
ConcurrentHashMap<String, ThriftAdapter> streamSpecificInputEventAdapterListenerMap =
tenantSpecificInputEventAdapterListenerMap.get(streamId);
if (streamSpecificInputEventAdapterListenerMap == null) {
streamSpecificInputEventAdapterListenerMap = new ConcurrentHashMap<String, ThriftAdapter>();
tenantSpecificInputEventAdapterListenerMap.put(streamId, streamSpecificInputEventAdapterListenerMap);
}
streamSpecificInputEventAdapterListenerMap.put(thriftAdapter.getEventAdapterName(), thriftAdapter);
}
public static void unregisterAdapterService(String tenantDomain, String streamId,
ThriftAdapter thriftAdapter) {
ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>
tenantSpecificInputEventAdapterListenerMap = inputEventAdapterListenerMap.get(tenantDomain);
if (tenantSpecificInputEventAdapterListenerMap != null) {
ConcurrentHashMap<String, ThriftAdapter> streamSpecificInputEventAdapterListenerMap =
tenantSpecificInputEventAdapterListenerMap.get(streamId);
if (streamSpecificInputEventAdapterListenerMap != null) {
streamSpecificInputEventAdapterListenerMap.remove(thriftAdapter.getEventAdapterName());
}
}
}
public static ConcurrentHashMap<String, ThriftAdapter> getAdapterService(String tenantDomain, String streamId) {
ConcurrentHashMap<String, ConcurrentHashMap<String, ThriftAdapter>>
tenantSpecificInputEventAdapterListenerMap = inputEventAdapterListenerMap.get(tenantDomain);
if (tenantSpecificInputEventAdapterListenerMap != null) {
return tenantSpecificInputEventAdapterListenerMap.get(streamId);
}
return null;
}
public static ConfigurationContext getConfigurationContext() {
return configurationContext;
}
public static void setConfigurationContext(ConfigurationContext configurationContext) {
ThriftEventAdapterServiceHolder.configurationContext = configurationContext;
}
}

@ -0,0 +1,25 @@
#
# Copyright (c) 2005-2014, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
stream=Stream Name
version=Stream Version
events.duplicated.in.cluster=Is events duplicated in cluster
events.duplicated.in.cluster.hint=This depends on how events are published to the server, 'true' only if multiple receiver URLs are defined in different receiver groups ({}).
wso2event.usage.tips_prefix=Following url formats are used to receive events<br/>For load-balancing: use "," to separate values for multiple endpoints<br/>&nbsp;&nbsp;&nbsp;Eg: <i>{tcp://&lt;hostname&gt;:&lt;port&gt;,tcp://&lt;hostname&gt;:&lt;port&gt;, ...}</i> <br/><br/>For failover: use "|" to separate multiple endpoints<br/>&nbsp;&nbsp;&nbsp;Eg: <i>{tcp://&lt;hostname&gt;:&lt;port>|tcp://&lt;hostname&gt;:&lt;port&gt;| ...}</i><br/><br/>For more than one cluster: use "{}" to separate multiple clusters<br/>&nbsp;&nbsp;&nbsp;Eg: <i>{tcp://&lt;hostname&gt;:&lt;port&gt;|tcp://&lt;hostname>:&lt;port&gt;| ...}</i>,<i>{tcp://&lt;hostname&gt;:&lt;port&gt;}</i><br/><br/>Ports available for Thrift protocol - TCP port:
wso2event.usage.tips_in_between=&nbsp;or SSL port:
wso2event.usage.tips_postfix=<br/>Ports available for Binary protocol - TCP port:

@ -77,7 +77,8 @@ public class TenantSubscriptionEndpoint extends SubscriptionEndpoint {
if (isAuthorized) {
try {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tdomain, true);
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(authenticationInfo.getTenantDomain()
, true);
ServiceHolder.getInstance().getWebsocketOutputCallbackControllerService().subscribeWebsocket(streamName
, version, session);
} finally {

@ -425,23 +425,36 @@ public class WebsocketEventAdapter implements OutputEventAdapter {
// fetch the different attribute values received as part of the current event.
Set<String> queryParams = queryParamValuePairs.keySet();
for (String aQueryParam : queryParams) {
if (!aQueryParam.equalsIgnoreCase(WebsocketConstants.TOKEN_PARAM)) {
try {
String queryValue = queryParamValuePairs.get(aQueryParam);
if (queryValue != null && !queryValue.trim().isEmpty()) {
JSONObject jsonObject = new JSONObject(jsonMessage);
String eventValue = jsonObject.getString(aQueryParam);
if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
return false;
try {
String queryValue = queryParamValuePairs.get(aQueryParam);
if (queryValue != null && !queryValue.trim().isEmpty()) {
JSONObject jsonObject = new JSONObject(jsonMessage);
JSONObject event = jsonObject.getJSONObject(WebsocketConstants.EVENT);
JSONObject data;
if (!event.isNull(WebsocketConstants.META_DATA)) {
data = event.getJSONObject(WebsocketConstants.META_DATA);
if (!data.isNull(aQueryParam)) {
String eventValue = data.get(aQueryParam).toString();
if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
return false;
}
}
}
} catch (JSONException e) {
if (log.isDebugEnabled()) {
log.debug("Unable validate the stream filter properties for event : " + jsonMessage
+ " ", e);
if (!event.isNull(WebsocketConstants.PAYLOAD_DATA)) {
data = event.getJSONObject(WebsocketConstants.PAYLOAD_DATA);
if (!data.isNull(aQueryParam)) {
String eventValue = data.get(aQueryParam).toString();
if (eventValue == null || !eventValue.equalsIgnoreCase(queryValue)) {
return false;
}
}
}
return false;
}
} catch (JSONException e) {
//do nothing - This exception is thrown when the event does not have query parameter.
}
}
}

@ -34,4 +34,7 @@ public class WebsocketConstants {
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String TOKEN_PARAM = "token";
public static final String META_DATA = "metaData";
public static final String PAYLOAD_DATA = "payloadData";
public static final String EVENT = "event";
}

@ -37,6 +37,7 @@
<module>input/org.wso2.carbon.device.mgt.input.adapter.http</module>
<module>input/org.wso2.carbon.device.mgt.input.adapter.mqtt</module>
<module>input/org.wso2.carbon.device.mgt.input.adapter.xmpp</module>
<module>input/org.wso2.carbon.device.mgt.input.adapter.thrift</module>
<module>output/org.wso2.carbon.device.mgt.output.adapter.mqtt</module>
<module>output/org.wso2.carbon.device.mgt.output.adapter.xmpp</module>
<module>output/org.wso2.carbon.device.mgt.output.adapter.websocket</module>

@ -72,7 +72,7 @@ public class DeviceAccessBasedMQTTAuthorizer implements IAuthorizer {
private static final String UI_EXECUTE = "ui.execute";
private static Log log = LogFactory.getLog(DeviceAccessBasedMQTTAuthorizer.class);
AuthorizationConfigurationManager MQTTAuthorizationConfiguration;
private AuthorizationConfigurationManager MQTTAuthorizationConfiguration;
private static final String CDMF_SERVER_BASE_CONTEXT = "/api/device-mgt/v1.0";
private static final String CACHE_MANAGER_NAME = "mqttAuthorizationCacheManager";
private static final String CACHE_NAME = "mqttAuthorizationCache";
@ -173,17 +173,15 @@ public class DeviceAccessBasedMQTTAuthorizer implements IAuthorizer {
} catch (FeignException e) {
oAuthRequestInterceptor.resetApiApplicationKey();
if (e.getMessage().contains(GATEWAY_ERROR_CODE)) {
log.error("Failed to connect to the device authorization service.");
log.error("Failed to connect to the device authorization service.", e);
} else {
log.error(e.getMessage(), e);
}
log.error(e.getMessage(), e);
}
return false;
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
return false;
}
/**

@ -18,7 +18,7 @@
-->
<eventReceiver name="Android-Agent-Event-Receiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="wso2event">
<from eventAdapterType="iot-event">
<property name="events.duplicated.in.cluster">false</property>
</from>
<mapping customMapping="disable" type="wso2event"/>

@ -9,12 +9,12 @@
@Plan:trace('false')
@Import('org.wso2.android.agent.Stream:1.0.0')
define stream dataIn (deviceId string, payload string, type string);
define stream dataIn (meta_deviceId string, payload string, type string);
@Export('org.wso2.geo.LocationStream:1.0.0')
define stream dataOut (id string, timeStamp long, latitude double, longitude double, type string, speed float, heading float );
from dataIn[type == 'location']
select deviceId as id, convert(json:getProperty(payload, 'timeStamp'), 'long') as timeStamp, convert(json:getProperty(payload,
select meta_deviceId as id, convert(json:getProperty(payload, 'timeStamp'), 'long') as timeStamp, convert(json:getProperty(payload,
'latitude'), 'double') as latitude, convert(json:getProperty(payload, 'longitude'), 'double') as longitude,
'android' as type, 0.0f as speed, 0.0f as heading insert into dataOut

@ -3,11 +3,13 @@
"version": "1.0.0",
"nickName": "",
"description": "Stream that receives various types of events from android agent",
"payloadData": [
"metaData": [
{
"name": "deviceId",
"type": "STRING"
},
}
],
"payloadData": [
{
"name": "payload",
"type": "STRING"

@ -21,6 +21,7 @@ package org.wso2.carbon.mdm.services.android.services.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.analytics.data.publisher.exception.DataPublisherConfigurationException;
import org.wso2.carbon.mdm.services.android.bean.DeviceState;
import org.wso2.carbon.mdm.services.android.bean.ErrorResponse;
@ -31,6 +32,7 @@ import org.wso2.carbon.mdm.services.android.exception.UnexpectedServerErrorExcep
import org.wso2.carbon.mdm.services.android.services.EventReceiverService;
import org.wso2.carbon.mdm.services.android.util.AndroidAPIUtils;
import org.wso2.carbon.mdm.services.android.util.Message;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import javax.validation.Valid;
import javax.validation.constraints.Size;
@ -51,12 +53,12 @@ public class EventReceiverServiceImpl implements EventReceiverService {
log.debug("Invoking Android device even logging.");
}
Message message = new Message();
Object metaData[] = {eventBeanWrapper.getDeviceIdentifier()};
Object payload[] = {eventBeanWrapper.getDeviceIdentifier(), eventBeanWrapper.getPayload(),
eventBeanWrapper.getType()};
Object payload[] = {eventBeanWrapper.getPayload(), eventBeanWrapper.getType()};
try {
if (AndroidAPIUtils.getEventPublisherService().publishEvent(
EVENT_STREAM_DEFINITION, "1.0.0", new Object[0], new Object[0], payload)) {
EVENT_STREAM_DEFINITION, "1.0.0", metaData, new Object[0], payload)) {
message.setResponseCode("Event is published successfully.");
return Response.status(Response.Status.CREATED).entity(message).build();
} else {

@ -20,7 +20,6 @@
<eventReceiver name="android_sense_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt">
<property name="topic">carbon.super/android_sense/+/data</property>
<property name="clientId">android_sense_receiver-carbon.super</property>
</from>
<mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.android.sense" version="1.0.0"/>

@ -20,7 +20,6 @@
<eventReceiver name="android_sense_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt">
<property name="topic">${tenant-domain}/android_sense/+/data</property>
<property name="clientId">android_sense_receiver-${tenant-domain}</property>
</from>
<mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.android.sense" version="1.0.0"/>

@ -19,7 +19,6 @@
<eventReceiver name="raspberrypi_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt">
<property name="topic">carbon.super/raspberrypi/+/temperature</property>
<property name="clientId">raspberrypi_receiver-carbon.super</property>
</from>
<mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.raspberrypi" version="1.0.0"/>

@ -19,7 +19,6 @@
<eventReceiver name="raspberrypi_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt">
<property name="topic">${tenant-domain}/raspberrypi/+/temperature</property>
<property name="clientId">raspberrypi_receiver-${tenant-domain}</property>
</from>
<mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.raspberrypi" version="1.0.0"/>

@ -19,7 +19,6 @@
<eventReceiver name="virtualfirealarm_receiver-carbon.super" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt">
<property name="topic">carbon.super/virtual_firealarm/+/temperature</property>
<property name="clientId">virtualfirealarm_receiver-carbon.super</property>
</from>
<mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.virtualfirealarm" version="1.0.0"/>

@ -19,7 +19,6 @@
<eventReceiver name="virtualfirealarm_receiver-${tenant-domain}" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt">
<property name="topic">${tenant-domain}/virtual_firealarm/+/temperature</property>
<property name="clientId">virtualfirealarm_receiver-${tenant-domain}</property>
</from>
<mapping customMapping="disable" type="json"/>
<to streamName="org.wso2.iot.virtualfirealarm" version="1.0.0"/>

@ -64,6 +64,14 @@
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.xmpp</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.thrift</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.wso2event</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
@ -174,12 +182,19 @@
<bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.input.adapter.mqtt:${carbon.devicemgt.plugins.version}
</bundleDef>
<bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.input.adapter.thrift:${carbon.devicemgt.plugins.version}
</bundleDef>
<bundleDef>
org.wso2.carbon.analytics-common:org.wso2.carbon.event.input.adapter.wso2event:${carbon.analytics.common.version}
</bundleDef>
<bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.input.adapter.xmpp:${carbon.devicemgt.plugins.version}
</bundleDef>
<bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.output.adapter.websocket:${carbon.devicemgt.plugins.version}
</bundleDef>
<bundleDef>
org.eclipse.paho:org.eclipse.paho.client.mqttv3:${eclipse.paho.version}
</bundleDef>

@ -355,6 +355,11 @@
<artifactId>org.wso2.carbon.event.output.adapter.core</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.wso2event</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.core</artifactId>
@ -365,6 +370,21 @@
<artifactId>org.wso2.carbon.databridge.commons</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.commons.binary</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.databridge.core</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics</groupId>
<artifactId>org.wso2.carbon.analytics.api</artifactId>
@ -387,6 +407,11 @@
<artifactId>org.wso2.carbon.device.mgt.output.adapter.mqtt</artifactId>
<version>${carbon.devicemgt.plugins.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.thrift</artifactId>
<version>${carbon.devicemgt.plugins.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.output.adapter.xmpp</artifactId>

Loading…
Cancel
Save