diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java
index e6e9fe7001..4d83480569 100644
--- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java
+++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java
@@ -91,6 +91,10 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
this.topic = PropertyUtils.replaceTenantDomainProperty(topic);
this.eventAdapterListener = inputEventAdapterListener;
this.tenantDomain = this.topic.split("/")[0];
+ //this is to allow server listener from IoT Core to connect.
+ if (this.tenantDomain.equals("+")) {
+ this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
+ }
//SORTING messages until the server fetches them
String temp_directory = System.getProperty("java.io.tmpdir");
@@ -126,8 +130,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
.getContentTransformer(contentTransformerType);
}
} catch (MqttException e) {
- log.error("Exception occurred while subscribing to MQTT broker at "
- + mqttBrokerConnectionConfiguration.getBrokerUrl());
+ log.error("Exception occurred while creating an mqtt client to "
+ + mqttBrokerConnectionConfiguration.getBrokerUrl() + " reason code:" + e.getReasonCode());
throw new InputEventAdapterRuntimeException(e);
}
}
diff --git a/components/extensions/mb-extensions/org.wso2.carbon.andes.extensions.device.mgt.mqtt.authorization/src/main/java/org/wso2/carbon/andes/extensions/device/mgt/mqtt/authorization/DeviceAccessBasedMQTTAuthorizer.java b/components/extensions/mb-extensions/org.wso2.carbon.andes.extensions.device.mgt.mqtt.authorization/src/main/java/org/wso2/carbon/andes/extensions/device/mgt/mqtt/authorization/DeviceAccessBasedMQTTAuthorizer.java
index 568c567eca..9d4e609d8d 100644
--- a/components/extensions/mb-extensions/org.wso2.carbon.andes.extensions.device.mgt.mqtt.authorization/src/main/java/org/wso2/carbon/andes/extensions/device/mgt/mqtt/authorization/DeviceAccessBasedMQTTAuthorizer.java
+++ b/components/extensions/mb-extensions/org.wso2.carbon.andes.extensions.device.mgt.mqtt.authorization/src/main/java/org/wso2/carbon/andes/extensions/device/mgt/mqtt/authorization/DeviceAccessBasedMQTTAuthorizer.java
@@ -74,11 +74,13 @@ public class DeviceAccessBasedMQTTAuthorizer implements IAuthorizer {
private static Log log = LogFactory.getLog(DeviceAccessBasedMQTTAuthorizer.class);
private AuthorizationConfigurationManager MQTTAuthorizationConfiguration;
private static final String CDMF_SERVER_BASE_CONTEXT = "/api/device-mgt/v1.0";
+ private static final String DEFAULT_ADMIN_PERMISSION = "permission/admin/device-mgt";
private static final String CACHE_MANAGER_NAME = "mqttAuthorizationCacheManager";
private static final String CACHE_NAME = "mqttAuthorizationCache";
private static DeviceAccessAuthorizationAdminService deviceAccessAuthorizationAdminService;
private static OAuthRequestInterceptor oAuthRequestInterceptor;
private static final String GATEWAY_ERROR_CODE = "404";
+ private static final String ALL_TENANT_DOMAIN = "+";
public DeviceAccessBasedMQTTAuthorizer() {
oAuthRequestInterceptor = new OAuthRequestInterceptor();
@@ -102,6 +104,13 @@ public class DeviceAccessBasedMQTTAuthorizer implements IAuthorizer {
try {
String topics[] = topic.split("/");
String tenantDomainFromTopic = topics[0];
+ if ("+".equals(tenantDomainFromTopic)) {
+ if (MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(authorizationSubject.getTenantDomain())
+ && isUserAuthorized(authorizationSubject, DEFAULT_ADMIN_PERMISSION, UI_EXECUTE)) {
+ return true;
+ }
+ return false;
+ }
if (!tenantDomainFromTopic.equals(authorizationSubject.getTenantDomain())) {
return false;
}
diff --git a/components/extensions/pom.xml b/components/extensions/pom.xml
index 639275b5a2..0cef1ecda0 100644
--- a/components/extensions/pom.xml
+++ b/components/extensions/pom.xml
@@ -37,6 +37,7 @@
cdmf-transport-adapters
mb-extensions
siddhi-extensions
+ pull-notification-listeners
diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/pom.xml b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/pom.xml
new file mode 100644
index 0000000000..612800260f
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/pom.xml
@@ -0,0 +1,110 @@
+
+
+
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ pull-notification-listeners
+ 3.0.37-SNAPSHOT
+ ../pom.xml
+
+
+
+ 4.0.0
+ org.wso2.carbon.device.mgt.mqtt.notification.listener
+ bundle
+ WSO2 Carbon - MQTT Pull Notification Listener Implementation
+ WSO2 Carbon - MQTT Pull Notification Lister Implementation
+ http://wso2.org
+
+
+
+ org.wso2.carbon.devicemgt
+ org.wso2.carbon.device.mgt.common
+
+
+ org.wso2.carbon.devicemgt
+ org.wso2.carbon.device.mgt.core
+
+
+ org.wso2.carbon.analytics-common
+ org.wso2.carbon.event.input.adapter.core
+
+
+ org.eclipse.osgi
+ org.eclipse.osgi
+
+
+ org.eclipse.osgi
+ org.eclipse.osgi.services
+
+
+ org.eclipse.osgi
+ org.eclipse.osgi.services
+
+
+ org.wso2.carbon.devicemgt-plugins
+ org.wso2.carbon.device.mgt.input.adapter.extension
+
+
+
+
+
+
+ org.apache.felix
+ maven-scr-plugin
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ true
+
+
+ ${project.artifactId}
+ ${project.artifactId}
+ ${carbon.devicemgt.plugins.version}
+ MQTT Pull Notification Provider Bundle
+
+ !org.wso2.carbon.device.mgt.mqtt.notification.listener.internal,
+ org.wso2.carbon.device.mgt.mqtt.notification.listener.*
+
+
+ org.osgi.framework,
+ org.osgi.service.component,
+ org.apache.commons.logging,
+ org.wso2.carbon.device.mgt.common.*,
+ org.wso2.carbon.device.mgt.core.service,
+ com.google.gson,
+ org.wso2.carbon.context,
+ org.wso2.carbon.device.mgt.input.adapter.extension,
+ org.wso2.carbon.event.input.adapter.core,
+ org.wso2.carbon.event.input.adapter.core.exception,
+ org.wso2.carbon.user.api,
+ org.wso2.carbon.core,
+ org.wso2.carbon.device.mgt.core.config,
+ org.wso2.carbon.device.mgt.core.config.pull.notification
+
+
+
+
+
+
+
+
diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/DeviceTypeOperationAdapterSubscription.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/DeviceTypeOperationAdapterSubscription.java
new file mode 100644
index 0000000000..b2896991fc
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/DeviceTypeOperationAdapterSubscription.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.wso2.carbon.device.mgt.mqtt.notification.listener;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext;
+import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationExecutionFailedException;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder;
+import org.wso2.carbon.event.input.adapter.core.InputEventAdapterSubscription;
+import org.wso2.carbon.user.api.UserStoreException;
+
+/**
+ * Creates a event subscription for the input adapter.
+ */
+public class DeviceTypeOperationAdapterSubscription implements InputEventAdapterSubscription {
+ private static final Log log = LogFactory.getLog(DeviceTypeOperationAdapterSubscription.class);
+
+ @Override
+ public void onEvent(Object o) {
+
+ if (o == null || !(o instanceof NotificationMessage)) {
+ return;
+ }
+
+ NotificationMessage notificationMessage = (NotificationMessage) o;
+ PrivilegedCarbonContext.startTenantFlow();
+ PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(notificationMessage.getTenantDomain(),
+ true);
+ String deviceType = "";
+ try {
+ PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(PrivilegedCarbonContext.
+ getThreadLocalCarbonContext().getUserRealm().getRealmConfiguration().getAdminUserName());
+ NotificationContext notificationContext = notificationMessage.getNotificationContext();
+ deviceType = notificationContext.getDeviceId().getType();
+ MqttNotificationDataHolder.getInstance().getDeviceManagementProviderService()
+ .executePullNotification(deviceType, notificationContext);
+ } catch (UserStoreException e) {
+ log.error("Failed to retrieve tenant username", e);
+ } catch (PullNotificationExecutionFailedException e) {
+ log.error("Failed to execute device type pull notification subscriber execution for device type" + deviceType,
+ e);
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+
+ }
+}
diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/NotificationMessage.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/NotificationMessage.java
new file mode 100644
index 0000000000..3fdb6e7bc1
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/NotificationMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.wso2.carbon.device.mgt.mqtt.notification.listener;
+
+import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext;
+
+public class NotificationMessage {
+
+ private String tenantDomain;
+ private NotificationContext notificationContext;
+
+ public NotificationMessage(String tenantDomain, NotificationContext notificationContext) {
+ this.tenantDomain = tenantDomain;
+ this.notificationContext = notificationContext;
+ }
+
+ public String getTenantDomain() {
+ return tenantDomain;
+ }
+
+ public void setTenantDomain(String tenantDomain) {
+ this.tenantDomain = tenantDomain;
+ }
+
+ public NotificationContext getNotificationContext() {
+ return notificationContext;
+ }
+
+ public void setNotificationContext(
+ NotificationContext notificationContext) {
+ this.notificationContext = notificationContext;
+ }
+}
diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java
new file mode 100644
index 0000000000..18b9a331b0
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationMqttContentTransformer.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.wso2.carbon.device.mgt.mqtt.notification.listener;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
+import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext;
+import org.wso2.carbon.device.mgt.common.pull.notification.NotificationPayload;
+import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer;
+
+import java.util.Map;
+
+/**
+ * This transforms the incomming message payload to notification message, inorder to pass this
+ * information before its passed to the input adapter subscriber.
+ */
+public class PullNotificationMqttContentTransformer implements ContentTransformer {
+
+ public static final String MQTT_NOTIFICATION_MESSAGE_TRANSFORMER = "mqtt-notification-transformer";
+
+ @Override
+ public String getType() {
+ return MQTT_NOTIFICATION_MESSAGE_TRANSFORMER;
+ }
+
+ @Override
+ public Object transform(Object message, Map dynamicProperties) {
+ String topic = (String) dynamicProperties.get("topic");
+ String[] topicParams = topic.split("/");
+ String tenantDomain = topicParams[0];
+ String deviceType = topicParams[1];
+ String deviceId = topicParams[2];
+ Gson gson = new Gson();
+ try {
+ NotificationPayload notificationPayload = gson.fromJson((String) message, NotificationPayload.class);
+ NotificationContext notificationContext = new NotificationContext
+ (new DeviceIdentifier(deviceId, deviceType), notificationPayload);
+ return new NotificationMessage(tenantDomain, notificationContext);
+ } catch (Exception e) {
+ //Avoid notification listener to fail.
+ return new Object();
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+ }
+
+}
diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationStartupListener.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationStartupListener.java
new file mode 100644
index 0000000000..5db5acadfa
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/PullNotificationStartupListener.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.wso2.carbon.device.mgt.mqtt.notification.listener;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.core.ServerStartupObserver;
+import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.util.MqttNotificationListener;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+/**
+ * Startup listener is been used to make sure the reciever gets activated after the server start up to avoid
+ * Bundle not loading issues.
+ */
+public class PullNotificationStartupListener implements ServerStartupObserver {
+
+ private static final Log log = LogFactory.getLog(PullNotificationStartupListener.class);
+
+ @Override
+ public void completingServerStartup() {
+ }
+
+ @Override
+ public void completedServerStartup() {
+ PrivilegedCarbonContext.startTenantFlow();
+ PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
+ MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
+ try {
+ //TODO DeviceConfiguration Either need to be a osgi service or need to add those variable to system variables.
+ boolean isEnabled = DeviceConfigurationManager.getInstance().getDeviceManagementConfig()
+ .getPullNotificationConfiguration().isEnabled();
+ if (isEnabled) {
+ MqttNotificationListener.setupMqttInputAdapter();
+ MqttNotificationDataHolder.getInstance().getInputEventAdapterService().start();
+ log.info("Mqtt operation listener activated");
+ }
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+ }
+
+}
diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/MqttNotificationDataHolder.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/MqttNotificationDataHolder.java
new file mode 100644
index 0000000000..4a95c543fe
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/MqttNotificationDataHolder.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.wso2.carbon.device.mgt.mqtt.notification.listener.internal;
+
+import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
+import org.wso2.carbon.event.input.adapter.core.InputEventAdapterService;
+
+public class MqttNotificationDataHolder {
+
+ private DeviceManagementProviderService deviceManagementProviderService;
+ private InputEventAdapterService inputEventAdapterService;
+
+ private static MqttNotificationDataHolder thisInstance = new MqttNotificationDataHolder();
+
+ public static MqttNotificationDataHolder getInstance() {
+ return thisInstance;
+ }
+
+ public DeviceManagementProviderService getDeviceManagementProviderService() {
+ return deviceManagementProviderService;
+ }
+
+ public void setDeviceManagementProviderService(DeviceManagementProviderService deviceManagementProviderService) {
+ this.deviceManagementProviderService = deviceManagementProviderService;
+ }
+
+ public InputEventAdapterService getInputEventAdapterService() {
+ return inputEventAdapterService;
+ }
+
+ public void setInputEventAdapterService(
+ InputEventAdapterService inputEventAdapterService) {
+ this.inputEventAdapterService = inputEventAdapterService;
+ }
+}
diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/PullNotificationListenerServiceComponent.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/PullNotificationListenerServiceComponent.java
new file mode 100644
index 0000000000..df15fe2cf2
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/internal/PullNotificationListenerServiceComponent.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.wso2.carbon.device.mgt.mqtt.notification.listener.internal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.core.ServerStartupObserver;
+import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
+import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.PullNotificationMqttContentTransformer;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.PullNotificationStartupListener;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.util.MqttNotificationListener;
+import org.wso2.carbon.event.input.adapter.core.InputEventAdapterService;
+
+/**
+ * @scr.component name="org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.PullNotificationListenerServiceComponent" immediate="true"
+ * @scr.reference name="carbon.device.mgt.provider"
+ * interface="org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService"
+ * cardinality="1..1"
+ * policy="dynamic"
+ * bind="setDeviceManagementProviderService"
+ * unbind="unsetDeviceManagementProviderService"
+ * @scr.reference name="event.input.adapter.service"
+ * interface="org.wso2.carbon.event.input.adapter.core.InputEventAdapterService"
+ * cardinality="1..1"
+ * policy="dynamic"
+ * bind="setInputEventAdapterService"
+ * unbind="unsetInputEventAdapterService"
+ */
+public class PullNotificationListenerServiceComponent {
+
+ private static final Log log = LogFactory.getLog(PullNotificationListenerServiceComponent.class);
+
+ @SuppressWarnings("unused")
+ protected void activate(ComponentContext componentContext) {
+ try {
+ //Do nothing
+ if (log.isDebugEnabled()) {
+ log.debug("pull notification provider implementation bundle has been successfully " +
+ "initialized");
+ }
+ BundleContext bundleContext = componentContext.getBundleContext();
+ bundleContext.registerService(ServerStartupObserver.class.getName(), new PullNotificationStartupListener(),
+ null);
+ bundleContext.registerService(ContentTransformer.class.getName(), new PullNotificationMqttContentTransformer(), null);
+ } catch (Throwable e) {
+ log.error("Error occurred while initializing pull notification provider implementation bundle", e);
+ }
+ }
+
+ protected void deactivate(ComponentContext componentContext) {
+ //Do nothing
+ }
+
+ protected void setDeviceManagementProviderService(DeviceManagementProviderService deviceManagementProviderService) {
+ MqttNotificationDataHolder.getInstance().setDeviceManagementProviderService(deviceManagementProviderService);
+ }
+
+ protected void unsetDeviceManagementProviderService(DeviceManagementProviderService deviceManagementProviderService) {
+ MqttNotificationDataHolder.getInstance().setDeviceManagementProviderService(deviceManagementProviderService);
+ }
+
+ protected void setInputEventAdapterService(InputEventAdapterService inputEventAdapterService) {
+ MqttNotificationDataHolder.getInstance().setInputEventAdapterService(inputEventAdapterService);
+ }
+
+ protected void unsetInputEventAdapterService(InputEventAdapterService inputEventAdapterService) {
+ MqttNotificationDataHolder.getInstance().setInputEventAdapterService(null);
+ }
+
+}
diff --git a/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/util/MqttNotificationListener.java b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/util/MqttNotificationListener.java
new file mode 100644
index 0000000000..3f0c4048f5
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/org.wso2.carbon.device.mgt.mqtt.notification.listener/src/main/java/org/wso2/carbon/device/mgt/mqtt/notification/listener/util/MqttNotificationListener.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.wso2.carbon.device.mgt.mqtt.notification.listener.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.context.PrivilegedCarbonContext;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.DeviceTypeOperationAdapterSubscription;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.PullNotificationMqttContentTransformer;
+import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder;
+import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
+import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterException;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This creates a link between input adapter and the subscription of the input adpater.
+ */
+public class MqttNotificationListener {
+ private static final Log log = LogFactory.getLog(MqttNotificationListener.class);
+
+ private static final String TOPIC = "topic";
+ private static final String SUBSCRIBED_TOPIC = "+/+/+/update/operation";
+ private static final String TYPE = "oauth-mqtt";
+ private static final String JSON = "json";
+ private static final String NAME = "iot_core_server_adapter";
+ private static final String CONTENT_TRANSFORMER_TYPE = "contentTransformer";
+
+
+ public static void setupMqttInputAdapter() {
+ InputEventAdapterConfiguration inputEventAdapterConfiguration = new InputEventAdapterConfiguration();
+ inputEventAdapterConfiguration.setName(NAME);
+ inputEventAdapterConfiguration.setType(TYPE);
+ inputEventAdapterConfiguration.setMessageFormat(JSON);
+ Map mqttAdapterProperties = new HashMap<>();
+ mqttAdapterProperties.put(TOPIC, SUBSCRIBED_TOPIC);
+ mqttAdapterProperties.put(CONTENT_TRANSFORMER_TYPE,
+ PullNotificationMqttContentTransformer.MQTT_NOTIFICATION_MESSAGE_TRANSFORMER);
+ inputEventAdapterConfiguration.setProperties(mqttAdapterProperties);
+ try {
+ PrivilegedCarbonContext.startTenantFlow();
+ PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(MultitenantConstants
+ .SUPER_TENANT_DOMAIN_NAME, true);
+ MqttNotificationDataHolder.getInstance().getInputEventAdapterService()
+ .create(inputEventAdapterConfiguration, new DeviceTypeOperationAdapterSubscription());
+ } catch (InputEventAdapterException e) {
+ log.error("Unable to create Input Event Adapter for pull notification.", e);
+ } finally {
+ PrivilegedCarbonContext.endTenantFlow();
+ }
+ }
+
+}
diff --git a/components/extensions/pull-notification-listeners/pom.xml b/components/extensions/pull-notification-listeners/pom.xml
new file mode 100644
index 0000000000..54240211b2
--- /dev/null
+++ b/components/extensions/pull-notification-listeners/pom.xml
@@ -0,0 +1,58 @@
+
+
+
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ extensions
+ 3.0.37-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ pull-notification-listeners
+ pom
+ WSO2 Carbon - Pull Notification Extension
+ http://wso2.org
+
+
+ org.wso2.carbon.device.mgt.mqtt.notification.listener
+
+
+
+
+
+
+ org.apache.felix
+ maven-scr-plugin
+ 1.7.2
+
+
+ generate-scr-scrdescriptor
+
+ scr
+
+
+
+
+
+
+
+
diff --git a/components/mobile-plugins/android-plugin/org.wso2.carbon.device.mgt.mobile.android.api/src/main/java/org/wso2/carbon/mdm/services/android/services/impl/EventReceiverServiceImpl.java b/components/mobile-plugins/android-plugin/org.wso2.carbon.device.mgt.mobile.android.api/src/main/java/org/wso2/carbon/mdm/services/android/services/impl/EventReceiverServiceImpl.java
index 362289b2ca..f56fba52c6 100644
--- a/components/mobile-plugins/android-plugin/org.wso2.carbon.device.mgt.mobile.android.api/src/main/java/org/wso2/carbon/mdm/services/android/services/impl/EventReceiverServiceImpl.java
+++ b/components/mobile-plugins/android-plugin/org.wso2.carbon.device.mgt.mobile.android.api/src/main/java/org/wso2/carbon/mdm/services/android/services/impl/EventReceiverServiceImpl.java
@@ -53,12 +53,11 @@ public class EventReceiverServiceImpl implements EventReceiverService {
log.debug("Invoking Android device even logging.");
}
Message message = new Message();
- Object metaData[] = {eventBeanWrapper.getDeviceIdentifier()};
-
- Object payload[] = {eventBeanWrapper.getPayload(), eventBeanWrapper.getType()};
+ Object payload[] = {eventBeanWrapper.getDeviceIdentifier(),
+ eventBeanWrapper.getPayload(), eventBeanWrapper.getType()};
try {
if (AndroidAPIUtils.getEventPublisherService().publishEvent(
- EVENT_STREAM_DEFINITION, "1.0.0", metaData, new Object[0], payload)) {
+ EVENT_STREAM_DEFINITION, "1.0.0", new Object[0], new Object[0], payload)) {
message.setResponseCode("Event is published successfully.");
return Response.status(Response.Status.CREATED).entity(message).build();
} else {
diff --git a/components/mobile-plugins/android-plugin/org.wso2.carbon.device.mgt.mobile.android/src/main/java/org/wso2/carbon/device/mgt/mobile/android/impl/AndroidDeviceManagementService.java b/components/mobile-plugins/android-plugin/org.wso2.carbon.device.mgt.mobile.android/src/main/java/org/wso2/carbon/device/mgt/mobile/android/impl/AndroidDeviceManagementService.java
index fb8ef11775..5dc35ce60f 100644
--- a/components/mobile-plugins/android-plugin/org.wso2.carbon.device.mgt.mobile.android/src/main/java/org/wso2/carbon/device/mgt/mobile/android/impl/AndroidDeviceManagementService.java
+++ b/components/mobile-plugins/android-plugin/org.wso2.carbon.device.mgt.mobile.android/src/main/java/org/wso2/carbon/device/mgt/mobile/android/impl/AndroidDeviceManagementService.java
@@ -29,6 +29,7 @@ import org.wso2.carbon.device.mgt.common.app.mgt.ApplicationManager;
import org.wso2.carbon.device.mgt.common.configuration.mgt.ConfigurationEntry;
import org.wso2.carbon.device.mgt.common.configuration.mgt.PlatformConfiguration;
import org.wso2.carbon.device.mgt.common.policy.mgt.PolicyMonitoringManager;
+import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationSubscriber;
import org.wso2.carbon.device.mgt.common.push.notification.PushNotificationConfig;
import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService;
import org.wso2.carbon.device.mgt.mobile.android.impl.util.AndroidPluginConstants;
@@ -120,6 +121,11 @@ public class AndroidDeviceManagementService implements DeviceManagementService {
return null;
}
+ @Override
+ public PullNotificationSubscriber getPullNotificationSubscriber() {
+ return null;
+ }
+
private String getConfigProperty(List configs, String propertyName) {
for (ConfigurationEntry entry : configs) {
if (propertyName.equals(entry.getName())) {
diff --git a/components/mobile-plugins/windows-plugin/org.wso2.carbon.device.mgt.mobile.windows/src/main/java/org/wso2/carbon/device/mgt/mobile/windows/impl/WindowsDeviceManagementService.java b/components/mobile-plugins/windows-plugin/org.wso2.carbon.device.mgt.mobile.windows/src/main/java/org/wso2/carbon/device/mgt/mobile/windows/impl/WindowsDeviceManagementService.java
index f3cc7b364b..d6b635df39 100644
--- a/components/mobile-plugins/windows-plugin/org.wso2.carbon.device.mgt.mobile.windows/src/main/java/org/wso2/carbon/device/mgt/mobile/windows/impl/WindowsDeviceManagementService.java
+++ b/components/mobile-plugins/windows-plugin/org.wso2.carbon.device.mgt.mobile.windows/src/main/java/org/wso2/carbon/device/mgt/mobile/windows/impl/WindowsDeviceManagementService.java
@@ -21,6 +21,7 @@ package org.wso2.carbon.device.mgt.mobile.windows.impl;
import org.wso2.carbon.device.mgt.common.*;
import org.wso2.carbon.device.mgt.common.app.mgt.ApplicationManager;
import org.wso2.carbon.device.mgt.common.policy.mgt.PolicyMonitoringManager;
+import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationSubscriber;
import org.wso2.carbon.device.mgt.common.push.notification.PushNotificationConfig;
import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService;
@@ -80,4 +81,9 @@ public class WindowsDeviceManagementService implements DeviceManagementService {
return null;
}
+ @Override
+ public PullNotificationSubscriber getPullNotificationSubscriber() {
+ return null;
+ }
+
}
diff --git a/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/pom.xml b/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/pom.xml
new file mode 100644
index 0000000000..52e7817fee
--- /dev/null
+++ b/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ extensions-feature
+ 3.0.37-SNAPSHOT
+ ../pom.xml
+
+ 4.0.0
+
+ org.wso2.carbon.device.mgt.notification.listener.feature
+ pom
+ 3.0.37-SNAPSHOT
+ WSO2 Carbon - Notification Listener
+ http://wso2.org
+ This feature contains the core bundles required iot core listeners
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ org.wso2.carbon.device.mgt.mqtt.notification.listener
+
+
+
+
+
+
+ org.wso2.maven
+ carbon-p2-plugin
+ ${carbon.p2.plugin.version}
+
+
+ 4-p2-feature-generation
+ package
+
+ p2-feature-gen
+
+
+ org.wso2.carbon.device.mgt.notification.listener
+ ../etc/feature.properties
+
+
+ org.wso2.carbon.p2.category.type:server
+ org.eclipse.equinox.p2.type.group:false
+
+
+
+
+ org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.device.mgt.mqtt.notification.listener:${carbon.devicemgt.plugins.version}
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/src/main/resources/build.properties b/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/src/main/resources/build.properties
new file mode 100644
index 0000000000..33bb0980d3
--- /dev/null
+++ b/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/src/main/resources/build.properties
@@ -0,0 +1,19 @@
+#
+# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+#
+# WSO2 Inc. licenses this file to you under the Apache License,
+# Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+custom = true
diff --git a/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/src/main/resources/p2.inf b/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/src/main/resources/p2.inf
new file mode 100644
index 0000000000..7ab37b9d7d
--- /dev/null
+++ b/features/extensions-feature/org.wso2.carbon.device.mgt.notification.listener.feature/src/main/resources/p2.inf
@@ -0,0 +1 @@
+instructions.configure = \
\ No newline at end of file
diff --git a/features/extensions-feature/pom.xml b/features/extensions-feature/pom.xml
index 0f2b2d9964..dd157efd03 100644
--- a/features/extensions-feature/pom.xml
+++ b/features/extensions-feature/pom.xml
@@ -39,6 +39,7 @@
org.wso2.carbon.device.mgt.adapter.feature
org.wso2.carbon.andes.extensions.device.mgt.mqtt.authorization.feature
org.wso2.extension.siddhi.execution.json.feature
-
+ org.wso2.carbon.device.mgt.notification.listener.feature
+
diff --git a/pom.xml b/pom.xml
index b3d7b36fde..8bd3b46c52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -453,7 +453,11 @@
org.wso2.carbon.device.mgt.output.adapter.websocket
${carbon.devicemgt.plugins.version}
-
+
+ org.wso2.carbon.devicemgt-plugins
+ org.wso2.carbon.device.mgt.mqtt.notification.listener
+ ${carbon.devicemgt.plugins.version}
+
org.wso2.carbon.devicemgt-plugins
@@ -1153,7 +1157,7 @@
1.1.1
- 2.0.58
+ 2.0.69-SNAPSHOT
[2.0.0, 3.0.0)