few changes on content transformation logic

revert-dabc3590
ayyoob 8 years ago
parent 3a7536f1f6
commit 9dc836928c

@ -0,0 +1,89 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.input.adapter.extension.transformer;
import com.jayway.jsonpath.JsonPath;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer;
import java.util.Map;
/**
* This holds the default implementation of ContentTransformer
*/
public class MQTTContentTransformer implements ContentTransformer {
private static final String MQTT_CONTENT_TRANSFORMER = "iot-mqtt";
private static final String TOPIC = "topic";
private static String JSON_ARRAY_START_CHAR = "[";
private static final Log log = LogFactory.getLog(MQTTContentTransformer.class);
@Override
public String getType() {
return MQTT_CONTENT_TRANSFORMER;
}
@Override
public Object transform(Object messagePayload, Map<String, Object> dynamicProperties) {
String topic = (String) dynamicProperties.get(TOPIC);
String topics[] = topic.split("/");
String deviceId = topics[2];
String deviceType = topics[1];
String message = (String) messagePayload;
try {
if (message.startsWith(JSON_ARRAY_START_CHAR)) {
return processMultipleEvents(message, deviceId, deviceType);
} else {
return processSingleEvent(message, deviceId, deviceType);
}
} catch (ParseException e) {
log.error("Invalid input " + message, e);
return false;
}
}
private String processSingleEvent(String msg, String deviceIdFromTopic, String deviceIdJsonPath)
throws ParseException {
JSONParser parser = new JSONParser();
JSONObject jsonObject = new JSONObject();
jsonObject.put("deviceId", deviceIdFromTopic);
JSONObject eventObject = new JSONObject();
eventObject.put("payloadData", parser.parse(msg));
eventObject.put("metaData", jsonObject);
JSONObject event = new JSONObject();
event.put("event", eventObject);
return event.toJSONString();
}
private String processMultipleEvents(String msg, String deviceIdFromTopic, String deviceIdJsonPath)
throws ParseException {
JSONParser jsonParser = new JSONParser();
JSONArray jsonArray = (JSONArray) jsonParser.parse(msg);
JSONArray eventsArray = new JSONArray();
for (int i = 0; i < jsonArray.size(); i++) {
eventsArray.add(i, processSingleEvent(jsonArray.get(i).toString(), deviceIdFromTopic, deviceIdJsonPath));
}
return eventsArray.toJSONString();
}
}

@ -33,10 +33,10 @@ public class MQTTContentValidator implements ContentValidator {
private static final String JSON_ARRAY_START_CHAR = "["; private static final String JSON_ARRAY_START_CHAR = "[";
private static final Log log = LogFactory.getLog(MQTTContentValidator.class); private static final Log log = LogFactory.getLog(MQTTContentValidator.class);
private static final String CDMF_MQTT_CONTENT_VALIDATOR = "iot-mqtt"; private static final String CDMF_MQTT_CONTENT_VALIDATOR = "iot-mqtt";
public static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId";
public static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId";
public static final String TOPIC = "topic"; private static final String TOPIC = "topic";
public static final int DEVICE_ID_TOPIC_HIERARCHY_INDEX = 2; private static final int DEVICE_ID_TOPIC_HIERARCHY_INDEX = 2;
@Override @Override
public String getType() { public String getType() {
@ -47,15 +47,14 @@ public class MQTTContentValidator implements ContentValidator {
public ContentInfo validate(Object msgPayload, Map<String, Object> dynamicParams) { public ContentInfo validate(Object msgPayload, Map<String, Object> dynamicParams) {
String topic = (String) dynamicParams.get(TOPIC); String topic = (String) dynamicParams.get(TOPIC);
String topics[] = topic.split("/"); String topics[] = topic.split("/");
String deviceIdJsonPath = DEVICE_ID_JSON_PATH;
int deviceIdInTopicHierarchyLevelIndex = DEVICE_ID_TOPIC_HIERARCHY_INDEX; int deviceIdInTopicHierarchyLevelIndex = DEVICE_ID_TOPIC_HIERARCHY_INDEX;
String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex]; String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex];
boolean status; boolean status;
String message = (String) msgPayload; String message = (String) msgPayload;
if (message.startsWith(JSON_ARRAY_START_CHAR)) { if (message.startsWith(JSON_ARRAY_START_CHAR)) {
status = processMultipleEvents(message, deviceIdFromTopic, deviceIdJsonPath); status = processMultipleEvents(message, deviceIdFromTopic, DEVICE_ID_JSON_PATH);
} else { } else {
status = processSingleEvent(message, deviceIdFromTopic, deviceIdJsonPath); status = processSingleEvent(message, deviceIdFromTopic, DEVICE_ID_JSON_PATH);
} }
return new ContentInfo(status, msgPayload); return new ContentInfo(status, msgPayload);
} }

@ -21,7 +21,6 @@ package org.wso2.carbon.device.mgt.mqtt.notification.listener;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext;
import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationExecutionFailedException; import org.wso2.carbon.device.mgt.common.pull.notification.PullNotificationExecutionFailedException;
import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder; import org.wso2.carbon.device.mgt.mqtt.notification.listener.internal.MqttNotificationDataHolder;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterSubscription; import org.wso2.carbon.event.input.adapter.core.InputEventAdapterSubscription;
@ -48,10 +47,10 @@ public class DeviceTypeOperationAdapterSubscription implements InputEventAdapter
try { try {
PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(PrivilegedCarbonContext. PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(PrivilegedCarbonContext.
getThreadLocalCarbonContext().getUserRealm().getRealmConfiguration().getAdminUserName()); getThreadLocalCarbonContext().getUserRealm().getRealmConfiguration().getAdminUserName());
NotificationContext notificationContext = notificationMessage.getNotificationContext(); deviceType = notificationMessage.getDeviceIdentifier().getType();
deviceType = notificationContext.getDeviceId().getType(); MqttNotificationDataHolder.getInstance().getDeviceManagementProviderService().
MqttNotificationDataHolder.getInstance().getDeviceManagementProviderService() updatePullNotificationOperation(notificationMessage.getDeviceIdentifier(),
.executePullNotification(deviceType, notificationContext); notificationMessage.getOperation());
} catch (UserStoreException e) { } catch (UserStoreException e) {
log.error("Failed to retrieve tenant username", e); log.error("Failed to retrieve tenant username", e);
} catch (PullNotificationExecutionFailedException e) { } catch (PullNotificationExecutionFailedException e) {

@ -18,16 +18,19 @@
*/ */
package org.wso2.carbon.device.mgt.mqtt.notification.listener; package org.wso2.carbon.device.mgt.mqtt.notification.listener;
import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext; import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.operation.mgt.Operation;
public class NotificationMessage { public class NotificationMessage {
private String tenantDomain; private String tenantDomain;
private NotificationContext notificationContext; private DeviceIdentifier deviceIdentifier;
private Operation operation;
public NotificationMessage(String tenantDomain, NotificationContext notificationContext) { public NotificationMessage(String tenantDomain, DeviceIdentifier deviceIdentifier,Operation operation) {
this.tenantDomain = tenantDomain; this.tenantDomain = tenantDomain;
this.notificationContext = notificationContext; this.operation = operation;
this.deviceIdentifier = deviceIdentifier;
} }
public String getTenantDomain() { public String getTenantDomain() {
@ -38,12 +41,20 @@ public class NotificationMessage {
this.tenantDomain = tenantDomain; this.tenantDomain = tenantDomain;
} }
public NotificationContext getNotificationContext() { public Operation getOperation() {
return notificationContext; return operation;
} }
public void setNotificationContext( public void setNotificationContext(
NotificationContext notificationContext) { Operation operation) {
this.notificationContext = notificationContext; this.operation = operation;
}
public DeviceIdentifier getDeviceIdentifier() {
return deviceIdentifier;
}
public void setDeviceIdentifier(DeviceIdentifier deviceIdentifier) {
this.deviceIdentifier = deviceIdentifier;
} }
} }

@ -19,11 +19,9 @@
package org.wso2.carbon.device.mgt.mqtt.notification.listener; package org.wso2.carbon.device.mgt.mqtt.notification.listener;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier; import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.pull.notification.NotificationContext; import org.wso2.carbon.device.mgt.common.operation.mgt.Operation;
import org.wso2.carbon.device.mgt.common.pull.notification.NotificationPayload;
import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer; import org.wso2.carbon.device.mgt.input.adapter.extension.ContentTransformer;
import java.util.Map; import java.util.Map;
@ -50,10 +48,8 @@ public class PullNotificationMqttContentTransformer implements ContentTransforme
String deviceId = topicParams[2]; String deviceId = topicParams[2];
Gson gson = new Gson(); Gson gson = new Gson();
try { try {
NotificationPayload notificationPayload = gson.fromJson((String) message, NotificationPayload.class); Operation operation = gson.fromJson((String) message, Operation.class);
NotificationContext notificationContext = new NotificationContext return new NotificationMessage(tenantDomain, new DeviceIdentifier(deviceId, deviceType),operation);
(new DeviceIdentifier(deviceId, deviceType), notificationPayload);
return new NotificationMessage(tenantDomain, notificationContext);
} catch (Exception e) { } catch (Exception e) {
//Avoid notification listener to fail. //Avoid notification listener to fail.
return new Object(); return new Object();

@ -12,7 +12,7 @@
<artifactId>org.wso2.carbon.device.mgt.notification.listener.feature</artifactId> <artifactId>org.wso2.carbon.device.mgt.notification.listener.feature</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>3.0.37-SNAPSHOT</version> <version>3.0.38-SNAPSHOT</version>
<name>WSO2 Carbon - Notification Listener</name> <name>WSO2 Carbon - Notification Listener</name>
<url>http://wso2.org</url> <url>http://wso2.org</url>
<description>This feature contains the core bundles required iot core listeners</description> <description>This feature contains the core bundles required iot core listeners</description>

@ -1157,7 +1157,7 @@
<javax.ws.rs.version>1.1.1</javax.ws.rs.version> <javax.ws.rs.version>1.1.1</javax.ws.rs.version>
<!-- Carbon Device Management --> <!-- Carbon Device Management -->
<carbon.devicemgt.version>2.0.69-SNAPSHOT</carbon.devicemgt.version> <carbon.devicemgt.version>2.0.71-SNAPSHOT</carbon.devicemgt.version>
<carbon.devicemgt.version.range>[2.0.0, 3.0.0)</carbon.devicemgt.version.range> <carbon.devicemgt.version.range>[2.0.0, 3.0.0)</carbon.devicemgt.version.range>

Loading…
Cancel
Save