Implemented Pull Notification Support

revert-dabc3590
ayyoob 8 years ago
parent ef274b0b3e
commit 8381070bf6

@ -91,6 +91,10 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
this.topic = PropertyUtils.replaceTenantDomainProperty(topic); this.topic = PropertyUtils.replaceTenantDomainProperty(topic);
this.eventAdapterListener = inputEventAdapterListener; this.eventAdapterListener = inputEventAdapterListener;
this.tenantDomain = this.topic.split("/")[0]; this.tenantDomain = this.topic.split("/")[0];
//this is to allow server listener from IoT Core to connect.
if (this.tenantDomain.equals("+")) {
this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
}
//SORTING messages until the server fetches them //SORTING messages until the server fetches them
String temp_directory = System.getProperty("java.io.tmpdir"); String temp_directory = System.getProperty("java.io.tmpdir");
@ -126,8 +130,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
.getContentTransformer(contentTransformerType); .getContentTransformer(contentTransformerType);
} }
} catch (MqttException e) { } catch (MqttException e) {
log.error("Exception occurred while subscribing to MQTT broker at " log.error("Exception occurred while creating an mqtt client to "
+ mqttBrokerConnectionConfiguration.getBrokerUrl()); + mqttBrokerConnectionConfiguration.getBrokerUrl() + " reason code:" + e.getReasonCode());
throw new InputEventAdapterRuntimeException(e); throw new InputEventAdapterRuntimeException(e);
} }
} }

@ -74,11 +74,13 @@ public class DeviceAccessBasedMQTTAuthorizer implements IAuthorizer {
private static Log log = LogFactory.getLog(DeviceAccessBasedMQTTAuthorizer.class); private static Log log = LogFactory.getLog(DeviceAccessBasedMQTTAuthorizer.class);
private AuthorizationConfigurationManager MQTTAuthorizationConfiguration; private AuthorizationConfigurationManager MQTTAuthorizationConfiguration;
private static final String CDMF_SERVER_BASE_CONTEXT = "/api/device-mgt/v1.0"; private static final String CDMF_SERVER_BASE_CONTEXT = "/api/device-mgt/v1.0";
private static final String DEFAULT_ADMIN_PERMISSION = "permission/admin/device-mgt";
private static final String CACHE_MANAGER_NAME = "mqttAuthorizationCacheManager"; private static final String CACHE_MANAGER_NAME = "mqttAuthorizationCacheManager";
private static final String CACHE_NAME = "mqttAuthorizationCache"; private static final String CACHE_NAME = "mqttAuthorizationCache";
private static DeviceAccessAuthorizationAdminService deviceAccessAuthorizationAdminService; private static DeviceAccessAuthorizationAdminService deviceAccessAuthorizationAdminService;
private static OAuthRequestInterceptor oAuthRequestInterceptor; private static OAuthRequestInterceptor oAuthRequestInterceptor;
private static final String GATEWAY_ERROR_CODE = "<am:code>404</am:code>"; private static final String GATEWAY_ERROR_CODE = "<am:code>404</am:code>";
private static final String ALL_TENANT_DOMAIN = "+";
public DeviceAccessBasedMQTTAuthorizer() { public DeviceAccessBasedMQTTAuthorizer() {
oAuthRequestInterceptor = new OAuthRequestInterceptor(); oAuthRequestInterceptor = new OAuthRequestInterceptor();
@ -102,6 +104,13 @@ public class DeviceAccessBasedMQTTAuthorizer implements IAuthorizer {
try { try {
String topics[] = topic.split("/"); String topics[] = topic.split("/");
String tenantDomainFromTopic = topics[0]; String tenantDomainFromTopic = topics[0];
if ("+".equals(tenantDomainFromTopic)) {
if (MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(authorizationSubject.getTenantDomain())
&& isUserAuthorized(authorizationSubject, DEFAULT_ADMIN_PERMISSION, UI_EXECUTE)) {
return true;
}
return false;
}
if (!tenantDomainFromTopic.equals(authorizationSubject.getTenantDomain())) { if (!tenantDomainFromTopic.equals(authorizationSubject.getTenantDomain())) {
return false; return false;
} }

@ -37,6 +37,7 @@
<module>cdmf-transport-adapters</module> <module>cdmf-transport-adapters</module>
<module>mb-extensions</module> <module>mb-extensions</module>
<module>siddhi-extensions</module> <module>siddhi-extensions</module>
<module>pull-notification-listeners</module>
</modules> </modules>
<build> <build>

@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~
~ WSO2 Inc. licenses this file to you under the Apache License,
~ Version 2.0 (the "License"); you may not use this file except
~ in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>pull-notification-listeners</artifactId>
<version>3.0.37-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>org.wso2.carbon.device.mgt.mqtt.notification.listener</artifactId>
<packaging>bundle</packaging>
<name>WSO2 Carbon - MQTT Pull Notification Listener Implementation</name>
<description>WSO2 Carbon - MQTT Pull Notification Lister Implementation</description>
<url>http://wso2.org</url>
<dependencies>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.device.mgt.common</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.device.mgt.core</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.input.adapter.core</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.osgi</groupId>
<artifactId>org.eclipse.osgi</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.osgi</groupId>
<artifactId>org.eclipse.osgi.services</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.osgi</groupId>
<artifactId>org.eclipse.osgi.services</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.input.adapter.extension</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Name>${project.artifactId}</Bundle-Name>
<Bundle-Version>${carbon.devicemgt.plugins.version}</Bundle-Version>
<Bundle-Description>MQTT Pull Notification Provider Bundle</Bundle-Description>
<Export-Package>
!org.wso2.carbon.device.mgt.mqtt.notification.listener.internal,
org.wso2.carbon.device.mgt.mqtt.notification.listener.*
</Export-Package>
<Import-Package>
org.osgi.framework,
org.osgi.service.component,
org.apache.commons.logging,
org.wso2.carbon.device.mgt.common.*,
org.wso2.carbon.device.mgt.core.service,
com.google.gson,
org.wso2.carbon.context,
org.wso2.carbon.device.mgt.input.adapter.extension,
org.wso2.carbon.event.input.adapter.core,
org.wso2.carbon.event.input.adapter.core.exception,
org.wso2.carbon.user.api,
org.wso2.carbon.core,
org.wso2.carbon.device.mgt.core.config,
org.wso2.carbon.device.mgt.core.config.pull.notification
</Import-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,65 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.mqtt.notification.listener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext;
import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationExecutionFailedException;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterSubscription;
import org.wso2.carbon.user.api.UserStoreException;
/**
* Creates a event subscription for the input adapter.
*/
public class DeviceTypeOperationAdapterSubscription implements InputEventAdapterSubscription {
private static final Log log = LogFactory.getLog(DeviceTypeOperationAdapterSubscription.class);
@Override
public void onEvent(Object o) {
if (o == null || !(o instanceof NotificationMessage)) {
return;
}
NotificationMessage notificationMessage = (NotificationMessage) o;
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(notificationMessage.getTenantDomain(),
true);
String deviceType = "";
try {
PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(PrivilegedCarbonContext.
getThreadLocalCarbonContext().getUserRealm().getRealmConfiguration().getAdminUserName());
NotificationContext notificationContext = notificationMessage.getNotificationContext();
deviceType = notificationContext.getDeviceId().getType();
MqttNotificationDataHolder.getInstance().getDeviceManagementProviderService()
.executePullNotification(deviceType, notificationContext);
} catch (UserStoreException e) {
log.error("Failed to retrieve tenant username", e);
} catch (PullNotificationExecutionFailedException e) {
log.error("Failed to execute device type pull notification subscriber execution for device type" + deviceType,
e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
}

@ -0,0 +1,49 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.mqtt.notification.listener;
import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext;
public class NotificationMessage {
private String tenantDomain;
private NotificationContext notificationContext;
public NotificationMessage(String tenantDomain, NotificationContext notificationContext) {
this.tenantDomain = tenantDomain;
this.notificationContext = notificationContext;
}
public String getTenantDomain() {
return tenantDomain;
}
public void setTenantDomain(String tenantDomain) {
this.tenantDomain = tenantDomain;
}
public NotificationContext getNotificationContext() {
return notificationContext;
}
public void setNotificationContext(
NotificationContext notificationContext) {
this.notificationContext = notificationContext;
}
}

@ -0,0 +1,65 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.mqtt.notification.listener;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext;
import org.wso2.carbon.device.mgt.common.pull.notification.NotificationPayload;
import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer;
import java.util.Map;
/**
* This transforms the incomming message payload to notification message, inorder to pass this
* information before its passed to the input adapter subscriber.
*/
public class PullNotificationMqttContentTransformer implements ContentTransformer {
public static final String MQTT_NOTIFICATION_MESSAGE_TRANSFORMER = "mqtt-notification-transformer";
@Override
public String getType() {
return MQTT_NOTIFICATION_MESSAGE_TRANSFORMER;
}
@Override
public Object transform(Object message, Map<String, Object> dynamicProperties) {
String topic = (String) dynamicProperties.get("topic");
String[] topicParams = topic.split("/");
String tenantDomain = topicParams[0];
String deviceType = topicParams[1];
String deviceId = topicParams[2];
Gson gson = new Gson();
try {
NotificationPayload notificationPayload = gson.fromJson((String) message, NotificationPayload.class);
NotificationContext notificationContext = new NotificationContext
(new DeviceIdentifier(deviceId, deviceType), notificationPayload);
return new NotificationMessage(tenantDomain, notificationContext);
} catch (Exception e) {
//Avoid notification listener to fail.
return new Object();
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
}

@ -0,0 +1,61 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.mqtt.notification.listener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.util.MqttNotificationListener;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
/**
* Startup listener is been used to make sure the reciever gets activated after the server start up to avoid
* Bundle not loading issues.
*/
public class PullNotificationStartupListener implements ServerStartupObserver {
private static final Log log = LogFactory.getLog(PullNotificationStartupListener.class);
@Override
public void completingServerStartup() {
}
@Override
public void completedServerStartup() {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
try {
//TODO DeviceConfiguration Either need to be a osgi service or need to add those variable to system variables.
boolean isEnabled = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
.getPullNotificationConfiguration().isEnabled();
if (isEnabled) {
MqttNotificationListener.setupMqttInputAdapter();
MqttNotificationDataHolder.getInstance().getInputEventAdapterService().start();
log.info("Mqtt operation listener activated");
}
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
}

@ -0,0 +1,51 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.mqtt.notification.listener.internal;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterService;
public class MqttNotificationDataHolder {
private DeviceManagementProviderService deviceManagementProviderService;
private InputEventAdapterService inputEventAdapterService;
private static MqttNotificationDataHolder thisInstance = new MqttNotificationDataHolder();
public static MqttNotificationDataHolder getInstance() {
return thisInstance;
}
public DeviceManagementProviderService getDeviceManagementProviderService() {
return deviceManagementProviderService;
}
public void setDeviceManagementProviderService(DeviceManagementProviderService deviceManagementProviderService) {
this.deviceManagementProviderService = deviceManagementProviderService;
}
public InputEventAdapterService getInputEventAdapterService() {
return inputEventAdapterService;
}
public void setInputEventAdapterService(
InputEventAdapterService inputEventAdapterService) {
this.inputEventAdapterService = inputEventAdapterService;
}
}

@ -0,0 +1,89 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.mqtt.notification.listener.internal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.PullNotificationMqttContentTransformer;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.PullNotificationStartupListener;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.util.MqttNotificationListener;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterService;
/**
* @scr.component name="org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.PullNotificationListenerServiceComponent" immediate="true"
* @scr.reference name="carbon.device.mgt.provider"
* interface="org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService"
* cardinality="1..1"
* policy="dynamic"
* bind="setDeviceManagementProviderService"
* unbind="unsetDeviceManagementProviderService"
* @scr.reference name="event.input.adapter.service"
* interface="org.wso2.carbon.event.input.adapter.core.InputEventAdapterService"
* cardinality="1..1"
* policy="dynamic"
* bind="setInputEventAdapterService"
* unbind="unsetInputEventAdapterService"
*/
public class PullNotificationListenerServiceComponent {
private static final Log log = LogFactory.getLog(PullNotificationListenerServiceComponent.class);
@SuppressWarnings("unused")
protected void activate(ComponentContext componentContext) {
try {
//Do nothing
if (log.isDebugEnabled()) {
log.debug("pull notification provider implementation bundle has been successfully " +
"initialized");
}
BundleContext bundleContext = componentContext.getBundleContext();
bundleContext.registerService(ServerStartupObserver.class.getName(), new PullNotificationStartupListener(),
null);
bundleContext.registerService(ContentTransformer.class.getName(), new PullNotificationMqttContentTransformer(), null);
} catch (Throwable e) {
log.error("Error occurred while initializing pull notification provider implementation bundle", e);
}
}
protected void deactivate(ComponentContext componentContext) {
//Do nothing
}
protected void setDeviceManagementProviderService(DeviceManagementProviderService deviceManagementProviderService) {
MqttNotificationDataHolder.getInstance().setDeviceManagementProviderService(deviceManagementProviderService);
}
protected void unsetDeviceManagementProviderService(DeviceManagementProviderService deviceManagementProviderService) {
MqttNotificationDataHolder.getInstance().setDeviceManagementProviderService(deviceManagementProviderService);
}
protected void setInputEventAdapterService(InputEventAdapterService inputEventAdapterService) {
MqttNotificationDataHolder.getInstance().setInputEventAdapterService(inputEventAdapterService);
}
protected void unsetInputEventAdapterService(InputEventAdapterService inputEventAdapterService) {
MqttNotificationDataHolder.getInstance().setInputEventAdapterService(null);
}
}

@ -0,0 +1,71 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.mqtt.notification.listener.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.DeviceTypeOperationAdapterSubscription;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.PullNotificationMqttContentTransformer;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterException;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.util.HashMap;
import java.util.Map;
/**
* This creates a link between input adapter and the subscription of the input adpater.
*/
public class MqttNotificationListener {
private static final Log log = LogFactory.getLog(MqttNotificationListener.class);
private static final String TOPIC = "topic";
private static final String SUBSCRIBED_TOPIC = "+/+/+/update/operation";
private static final String TYPE = "oauth-mqtt";
private static final String JSON = "json";
private static final String NAME = "iot_core_server_adapter";
private static final String CONTENT_TRANSFORMER_TYPE = "contentTransformer";
public static void setupMqttInputAdapter() {
InputEventAdapterConfiguration inputEventAdapterConfiguration = new InputEventAdapterConfiguration();
inputEventAdapterConfiguration.setName(NAME);
inputEventAdapterConfiguration.setType(TYPE);
inputEventAdapterConfiguration.setMessageFormat(JSON);
Map<String, String> mqttAdapterProperties = new HashMap<>();
mqttAdapterProperties.put(TOPIC, SUBSCRIBED_TOPIC);
mqttAdapterProperties.put(CONTENT_TRANSFORMER_TYPE,
PullNotificationMqttContentTransformer.MQTT_NOTIFICATION_MESSAGE_TRANSFORMER);
inputEventAdapterConfiguration.setProperties(mqttAdapterProperties);
try {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(MultitenantConstants
.SUPER_TENANT_DOMAIN_NAME, true);
MqttNotificationDataHolder.getInstance().getInputEventAdapterService()
.create(inputEventAdapterConfiguration, new DeviceTypeOperationAdapterSubscription());
} catch (InputEventAdapterException e) {
log.error("Unable to create Input Event Adapter for pull notification.", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
}

@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2014, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~
~ WSO2 Inc. licenses this file to you under the Apache License,
~ Version 2.0 (the "License"); you may not use this file except
~ in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>extensions</artifactId>
<version>3.0.37-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pull-notification-listeners</artifactId>
<packaging>pom</packaging>
<name>WSO2 Carbon - Pull Notification Extension</name>
<url>http://wso2.org</url>
<modules>
<module>org.wso2.carbon.device.mgt.mqtt.notification.listener</module>
</modules>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<version>1.7.2</version>
<executions>
<execution>
<id>generate-scr-scrdescriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

@ -53,12 +53,11 @@ public class EventReceiverServiceImpl implements EventReceiverService {
log.debug("Invoking Android device even logging."); log.debug("Invoking Android device even logging.");
} }
Message message = new Message(); Message message = new Message();
Object metaData[] = {eventBeanWrapper.getDeviceIdentifier()}; Object payload[] = {eventBeanWrapper.getDeviceIdentifier(),
eventBeanWrapper.getPayload(), eventBeanWrapper.getType()};
Object payload[] = {eventBeanWrapper.getPayload(), eventBeanWrapper.getType()};
try { try {
if (AndroidAPIUtils.getEventPublisherService().publishEvent( if (AndroidAPIUtils.getEventPublisherService().publishEvent(
EVENT_STREAM_DEFINITION, "1.0.0", metaData, new Object[0], payload)) { EVENT_STREAM_DEFINITION, "1.0.0", new Object[0], new Object[0], payload)) {
message.setResponseCode("Event is published successfully."); message.setResponseCode("Event is published successfully.");
return Response.status(Response.Status.CREATED).entity(message).build(); return Response.status(Response.Status.CREATED).entity(message).build();
} else { } else {

@ -29,6 +29,7 @@ import org.wso2.carbon.device.mgt.common.app.mgt.ApplicationManager;
import org.wso2.carbon.device.mgt.common.configuration.mgt.ConfigurationEntry; import org.wso2.carbon.device.mgt.common.configuration.mgt.ConfigurationEntry;
import org.wso2.carbon.device.mgt.common.configuration.mgt.PlatformConfiguration; import org.wso2.carbon.device.mgt.common.configuration.mgt.PlatformConfiguration;
import org.wso2.carbon.device.mgt.common.policy.mgt.PolicyMonitoringManager; import org.wso2.carbon.device.mgt.common.policy.mgt.PolicyMonitoringManager;
import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationSubscriber;
import org.wso2.carbon.device.mgt.common.push.notification.PushNotificationConfig; import org.wso2.carbon.device.mgt.common.push.notification.PushNotificationConfig;
import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService; import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService;
import org.wso2.carbon.device.mgt.mobile.android.impl.util.AndroidPluginConstants; import org.wso2.carbon.device.mgt.mobile.android.impl.util.AndroidPluginConstants;
@ -120,6 +121,11 @@ public class AndroidDeviceManagementService implements DeviceManagementService {
return null; return null;
} }
@Override
public PullNotificationSubscriber getPullNotificationSubscriber() {
return null;
}
private String getConfigProperty(List<ConfigurationEntry> configs, String propertyName) { private String getConfigProperty(List<ConfigurationEntry> configs, String propertyName) {
for (ConfigurationEntry entry : configs) { for (ConfigurationEntry entry : configs) {
if (propertyName.equals(entry.getName())) { if (propertyName.equals(entry.getName())) {

@ -21,6 +21,7 @@ package org.wso2.carbon.device.mgt.mobile.windows.impl;
import org.wso2.carbon.device.mgt.common.*; import org.wso2.carbon.device.mgt.common.*;
import org.wso2.carbon.device.mgt.common.app.mgt.ApplicationManager; import org.wso2.carbon.device.mgt.common.app.mgt.ApplicationManager;
import org.wso2.carbon.device.mgt.common.policy.mgt.PolicyMonitoringManager; import org.wso2.carbon.device.mgt.common.policy.mgt.PolicyMonitoringManager;
import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationSubscriber;
import org.wso2.carbon.device.mgt.common.push.notification.PushNotificationConfig; import org.wso2.carbon.device.mgt.common.push.notification.PushNotificationConfig;
import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService; import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService;
@ -80,4 +81,9 @@ public class WindowsDeviceManagementService implements DeviceManagementService {
return null; return null;
} }
@Override
public PullNotificationSubscriber getPullNotificationSubscriber() {
return null;
}
} }

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>extensions-feature</artifactId>
<version>3.0.37-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>org.wso2.carbon.device.mgt.notification.listener.feature</artifactId>
<packaging>pom</packaging>
<version>3.0.37-SNAPSHOT</version>
<name>WSO2 Carbon - Notification Listener</name>
<url>http://wso2.org</url>
<description>This feature contains the core bundles required iot core listeners</description>
<dependencies>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.mqtt.notification.listener</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.wso2.maven</groupId>
<artifactId>carbon-p2-plugin</artifactId>
<version>${carbon.p2.plugin.version}</version>
<executions>
<execution>
<id>4-p2-feature-generation</id>
<phase>package</phase>
<goals>
<goal>p2-feature-gen</goal>
</goals>
<configuration>
<id>org.wso2.carbon.device.mgt.notification.listener</id>
<propertiesFile>../etc/feature.properties</propertiesFile>
<adviceFile>
<properties>
<propertyDef>org.wso2.carbon.p2.category.type:server</propertyDef>
<propertyDef>org.eclipse.equinox.p2.type.group:false</propertyDef>
</properties>
</adviceFile>
<bundles>
<bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.mqtt.notification.listener:${carbon.devicemgt.plugins.version}
</bundleDef>
</bundles>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,19 @@
#
# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
custom = true

@ -39,6 +39,7 @@
<module>org.wso2.carbon.device.mgt.adapter.feature</module> <module>org.wso2.carbon.device.mgt.adapter.feature</module>
<module>org.wso2.carbon.andes.extensions.device.mgt.mqtt.authorization.feature</module> <module>org.wso2.carbon.andes.extensions.device.mgt.mqtt.authorization.feature</module>
<module>org.wso2.extension.siddhi.execution.json.feature</module> <module>org.wso2.extension.siddhi.execution.json.feature</module>
<module>org.wso2.carbon.device.mgt.notification.listener.feature</module>
</modules> </modules>
</project> </project>

@ -453,7 +453,11 @@
<artifactId>org.wso2.carbon.device.mgt.output.adapter.websocket</artifactId> <artifactId>org.wso2.carbon.device.mgt.output.adapter.websocket</artifactId>
<version>${carbon.devicemgt.plugins.version}</version> <version>${carbon.devicemgt.plugins.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.mqtt.notification.listener</artifactId>
<version>${carbon.devicemgt.plugins.version}</version>
</dependency>
<!--Android sense DeviceType Impl, API and Agent--> <!--Android sense DeviceType Impl, API and Agent-->
<dependency> <dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId> <groupId>org.wso2.carbon.devicemgt-plugins</groupId>
@ -1153,7 +1157,7 @@
<javax.ws.rs.version>1.1.1</javax.ws.rs.version> <javax.ws.rs.version>1.1.1</javax.ws.rs.version>
<!-- Carbon Device Management --> <!-- Carbon Device Management -->
<carbon.devicemgt.version>2.0.58</carbon.devicemgt.version> <carbon.devicemgt.version>2.0.69-SNAPSHOT</carbon.devicemgt.version>
<carbon.devicemgt.version.range>[2.0.0, 3.0.0)</carbon.devicemgt.version.range> <carbon.devicemgt.version.range>[2.0.0, 3.0.0)</carbon.devicemgt.version.range>
<!-- Carbon App Management --> <!-- Carbon App Management -->

Loading…
Cancel
Save