made content validator to process batch events

revert-dabc3590
ayyoob 9 years ago
parent cdb1f62ba9
commit 3e1b0c54e9

@ -21,6 +21,9 @@ package org.wso2.carbon.event.input.adapter.extensions.http.util;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.wso2.carbon.event.input.adapter.extensions.ContentInfo; import org.wso2.carbon.event.input.adapter.extensions.ContentInfo;
import org.wso2.carbon.event.input.adapter.extensions.ContentValidator; import org.wso2.carbon.event.input.adapter.extensions.ContentValidator;
@ -28,17 +31,46 @@ import java.util.Map;
public class HTTPContentValidator implements ContentValidator { public class HTTPContentValidator implements ContentValidator {
private static final Log log = LogFactory.getLog(HTTPContentValidator.class); private static final Log log = LogFactory.getLog(HTTPContentValidator.class);
private static String JSON_ARRAY_START_CHAR = "[";
@Override @Override
public ContentInfo validate(Map<String, String> paramMap) { public ContentInfo validate(Map<String, String> paramMap) {
String deviceId = paramMap.get("deviceId"); String deviceId = paramMap.get("deviceId");
String msg = paramMap.get(HTTPEventAdapterConstants.PAYLOAD_TAG); String msg = paramMap.get(HTTPEventAdapterConstants.PAYLOAD_TAG);
String deviceIdJsonPath = paramMap.get(HTTPEventAdapterConstants.DEVICE_ID_JSON_PATH); String deviceIdJsonPath = paramMap.get(HTTPEventAdapterConstants.DEVICE_ID_JSON_PATH);
boolean status;
if (msg.startsWith(JSON_ARRAY_START_CHAR)) {
status = processMultipleEvents(msg, deviceId, deviceIdJsonPath);
} else {
status = processSingleEvent(msg, deviceId, deviceIdJsonPath);
}
return new ContentInfo(status, msg);
}
private boolean processSingleEvent(String msg, String deviceIdFromTopic, String deviceIdJsonPath) {
Object res = JsonPath.read(msg, deviceIdJsonPath); Object res = JsonPath.read(msg, deviceIdJsonPath);
String deviceIdFromContent = (res != null) ? res.toString() : ""; String deviceIdFromContent = (res != null) ? res.toString() : "";
if (deviceIdFromContent.equals(deviceId)) { if (deviceIdFromContent.equals(deviceIdFromTopic)) {
return new ContentInfo(true, msg); return true;
}
return false;
}
private boolean processMultipleEvents(String msg, String deviceIdFromTopic, String deviceIdJsonPath) {
try {
JSONParser jsonParser = new JSONParser();
JSONArray jsonArray = (JSONArray) jsonParser.parse(msg);
boolean status = false;
for (int i = 0; i < jsonArray.size(); i++) {
status = processSingleEvent(jsonArray.get(i).toString(), deviceIdFromTopic, deviceIdJsonPath);
if (!status) {
return status;
}
}
return status;
} catch (ParseException e) {
log.error("Invalid input " + msg, e);
return false;
} }
return new ContentInfo(false, msg);
} }
} }

@ -21,6 +21,10 @@ package org.wso2.carbon.event.input.adapter.extensions.mqtt.util;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.wso2.carbon.event.input.adapter.extensions.ContentInfo; import org.wso2.carbon.event.input.adapter.extensions.ContentInfo;
import org.wso2.carbon.event.input.adapter.extensions.ContentValidator; import org.wso2.carbon.event.input.adapter.extensions.ContentValidator;
import org.wso2.carbon.event.input.adapter.extensions.mqtt.Constants; import org.wso2.carbon.event.input.adapter.extensions.mqtt.Constants;
@ -28,6 +32,7 @@ import org.wso2.carbon.event.input.adapter.extensions.mqtt.Constants;
import java.util.Map; import java.util.Map;
public class MQTTContentValidator implements ContentValidator { public class MQTTContentValidator implements ContentValidator {
private static String JSON_ARRAY_START_CHAR = "[";
private static final Log log = LogFactory.getLog(MQTTContentValidator.class); private static final Log log = LogFactory.getLog(MQTTContentValidator.class);
@Override @Override
@ -43,11 +48,39 @@ public class MQTTContentValidator implements ContentValidator {
deviceIdInTopicHierarchyLevelIndex = Integer.parseInt(deviceIdInTopicHierarchyLevel); deviceIdInTopicHierarchyLevelIndex = Integer.parseInt(deviceIdInTopicHierarchyLevel);
} }
String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex]; String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex];
boolean status;
if (msg.startsWith(JSON_ARRAY_START_CHAR)) {
status = processMultipleEvents(msg, deviceIdFromTopic, deviceIdJsonPath);
} else {
status = processSingleEvent(msg, deviceIdFromTopic, deviceIdJsonPath);
}
return new ContentInfo(status, msg);
}
private boolean processSingleEvent(String msg, String deviceIdFromTopic, String deviceIdJsonPath) {
Object res = JsonPath.read(msg, deviceIdJsonPath); Object res = JsonPath.read(msg, deviceIdJsonPath);
String deviceIdFromContent = (res != null) ? res.toString() : ""; String deviceIdFromContent = (res != null) ? res.toString() : "";
if (deviceIdFromContent.equals(deviceIdFromTopic)) { if (deviceIdFromContent.equals(deviceIdFromTopic)) {
return new ContentInfo(true, msg); return true;
}
return false;
}
private boolean processMultipleEvents(String msg, String deviceIdFromTopic, String deviceIdJsonPath) {
try {
JSONParser jsonParser = new JSONParser();
JSONArray jsonArray = (JSONArray) jsonParser.parse(msg);
boolean status = false;
for (int i = 0; i < jsonArray.size(); i++) {
status = processSingleEvent(jsonArray.get(i).toString(), deviceIdFromTopic, deviceIdJsonPath);
if (!status) {
return status;
}
}
return status;
} catch (ParseException e) {
log.error("Invalid input " + msg, e);
return false;
} }
return new ContentInfo(false, msg);
} }
} }

Loading…
Cancel
Save