diff --git a/components/iot-plugins/das-extensions/org.wso2.carbon.event.input.adapter.extensions/src/main/java/org/wso2/carbon/event/input/adapter/extensions/http/util/HTTPContentValidator.java b/components/iot-plugins/das-extensions/org.wso2.carbon.event.input.adapter.extensions/src/main/java/org/wso2/carbon/event/input/adapter/extensions/http/util/HTTPContentValidator.java index 989f0f282..1afc511a4 100644 --- a/components/iot-plugins/das-extensions/org.wso2.carbon.event.input.adapter.extensions/src/main/java/org/wso2/carbon/event/input/adapter/extensions/http/util/HTTPContentValidator.java +++ b/components/iot-plugins/das-extensions/org.wso2.carbon.event.input.adapter.extensions/src/main/java/org/wso2/carbon/event/input/adapter/extensions/http/util/HTTPContentValidator.java @@ -21,6 +21,9 @@ package org.wso2.carbon.event.input.adapter.extensions.http.util; import com.jayway.jsonpath.JsonPath; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.json.simple.JSONArray; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; import org.wso2.carbon.event.input.adapter.extensions.ContentInfo; import org.wso2.carbon.event.input.adapter.extensions.ContentValidator; @@ -28,17 +31,46 @@ import java.util.Map; public class HTTPContentValidator implements ContentValidator { private static final Log log = LogFactory.getLog(HTTPContentValidator.class); + private static String JSON_ARRAY_START_CHAR = "["; @Override public ContentInfo validate(Map paramMap) { String deviceId = paramMap.get("deviceId"); String msg = paramMap.get(HTTPEventAdapterConstants.PAYLOAD_TAG); String deviceIdJsonPath = paramMap.get(HTTPEventAdapterConstants.DEVICE_ID_JSON_PATH); + boolean status; + if (msg.startsWith(JSON_ARRAY_START_CHAR)) { + status = processMultipleEvents(msg, deviceId, deviceIdJsonPath); + } else { + status = processSingleEvent(msg, deviceId, deviceIdJsonPath); + } + return new ContentInfo(status, msg); + } + + private boolean processSingleEvent(String msg, String deviceIdFromTopic, String deviceIdJsonPath) { Object res = JsonPath.read(msg, deviceIdJsonPath); String deviceIdFromContent = (res != null) ? res.toString() : ""; - if (deviceIdFromContent.equals(deviceId)) { - return new ContentInfo(true, msg); + if (deviceIdFromContent.equals(deviceIdFromTopic)) { + return true; + } + return false; + } + + private boolean processMultipleEvents(String msg, String deviceIdFromTopic, String deviceIdJsonPath) { + try { + JSONParser jsonParser = new JSONParser(); + JSONArray jsonArray = (JSONArray) jsonParser.parse(msg); + boolean status = false; + for (int i = 0; i < jsonArray.size(); i++) { + status = processSingleEvent(jsonArray.get(i).toString(), deviceIdFromTopic, deviceIdJsonPath); + if (!status) { + return status; + } + } + return status; + } catch (ParseException e) { + log.error("Invalid input " + msg, e); + return false; } - return new ContentInfo(false, msg); } } diff --git a/components/iot-plugins/das-extensions/org.wso2.carbon.event.input.adapter.extensions/src/main/java/org/wso2/carbon/event/input/adapter/extensions/mqtt/util/MQTTContentValidator.java b/components/iot-plugins/das-extensions/org.wso2.carbon.event.input.adapter.extensions/src/main/java/org/wso2/carbon/event/input/adapter/extensions/mqtt/util/MQTTContentValidator.java index 96c1eee16..fb14e9ed7 100644 --- a/components/iot-plugins/das-extensions/org.wso2.carbon.event.input.adapter.extensions/src/main/java/org/wso2/carbon/event/input/adapter/extensions/mqtt/util/MQTTContentValidator.java +++ b/components/iot-plugins/das-extensions/org.wso2.carbon.event.input.adapter.extensions/src/main/java/org/wso2/carbon/event/input/adapter/extensions/mqtt/util/MQTTContentValidator.java @@ -21,6 +21,10 @@ package org.wso2.carbon.event.input.adapter.extensions.mqtt.util; import com.jayway.jsonpath.JsonPath; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; import org.wso2.carbon.event.input.adapter.extensions.ContentInfo; import org.wso2.carbon.event.input.adapter.extensions.ContentValidator; import org.wso2.carbon.event.input.adapter.extensions.mqtt.Constants; @@ -28,6 +32,7 @@ import org.wso2.carbon.event.input.adapter.extensions.mqtt.Constants; import java.util.Map; public class MQTTContentValidator implements ContentValidator { + private static String JSON_ARRAY_START_CHAR = "["; private static final Log log = LogFactory.getLog(MQTTContentValidator.class); @Override @@ -43,11 +48,39 @@ public class MQTTContentValidator implements ContentValidator { deviceIdInTopicHierarchyLevelIndex = Integer.parseInt(deviceIdInTopicHierarchyLevel); } String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex]; + boolean status; + if (msg.startsWith(JSON_ARRAY_START_CHAR)) { + status = processMultipleEvents(msg, deviceIdFromTopic, deviceIdJsonPath); + } else { + status = processSingleEvent(msg, deviceIdFromTopic, deviceIdJsonPath); + } + return new ContentInfo(status, msg); + } + + private boolean processSingleEvent(String msg, String deviceIdFromTopic, String deviceIdJsonPath) { Object res = JsonPath.read(msg, deviceIdJsonPath); String deviceIdFromContent = (res != null) ? res.toString() : ""; if (deviceIdFromContent.equals(deviceIdFromTopic)) { - return new ContentInfo(true, msg); + return true; + } + return false; + } + + private boolean processMultipleEvents(String msg, String deviceIdFromTopic, String deviceIdJsonPath) { + try { + JSONParser jsonParser = new JSONParser(); + JSONArray jsonArray = (JSONArray) jsonParser.parse(msg); + boolean status = false; + for (int i = 0; i < jsonArray.size(); i++) { + status = processSingleEvent(jsonArray.get(i).toString(), deviceIdFromTopic, deviceIdJsonPath); + if (!status) { + return status; + } + } + return status; + } catch (ParseException e) { + log.error("Invalid input " + msg, e); + return false; } - return new ContentInfo(false, msg); } }