Prevent duplicate alerts & added owner field

revert-70aa11f8
charitha 6 years ago
parent 919fedee20
commit 2a995deeb4

@ -113,6 +113,8 @@ public final class DeviceManagementConstants {
public static final String FLUCTUATION_RADIUS = "fluctuationRadius"; public static final String FLUCTUATION_RADIUS = "fluctuationRadius";
public static final String QUERY_NAME = "queryName"; public static final String QUERY_NAME = "queryName";
public static final String AREA_NAME = "areaName"; public static final String AREA_NAME = "areaName";
public static final String EXECUTION_PLAN_NAME = "executionPlanName";
public static final String DEVICE_OWNER = "owner";
public static final String GEO_FENCE_GEO_JSON = "geoFenceGeoJSON"; public static final String GEO_FENCE_GEO_JSON = "geoFenceGeoJSON";
public static final String SPEED_ALERT_VALUE = "speedAlertValue"; public static final String SPEED_ALERT_VALUE = "speedAlertValue";

@ -198,9 +198,8 @@ public class DeviceInformationManagerImpl implements DeviceInformationManager {
deviceDAO.updateDevice(device, CarbonContext.getThreadLocalCarbonContext().getTenantId()); deviceDAO.updateDevice(device, CarbonContext.getThreadLocalCarbonContext().getTenantId());
deviceDetailsDAO.deleteDeviceLocation(deviceLocation.getDeviceId(), device.getEnrolmentInfo().getId()); deviceDetailsDAO.deleteDeviceLocation(deviceLocation.getDeviceId(), device.getEnrolmentInfo().getId());
deviceDetailsDAO.addDeviceLocation(deviceLocation, device.getEnrolmentInfo().getId()); deviceDetailsDAO.addDeviceLocation(deviceLocation, device.getEnrolmentInfo().getId());
//TODO: This has to be fixed with enrollment id or username should include in the stream def.
if (DeviceManagerUtil.isPublishLocationResponseEnabled()) { if (DeviceManagerUtil.isPublishLocationResponseEnabled()) {
Object[] metaData = {device.getDeviceIdentifier(), device.getType()}; Object[] metaData = {device.getDeviceIdentifier(), device.getEnrolmentInfo().getOwner(), device.getType()};
Object[] payload = new Object[]{ Object[] payload = new Object[]{
deviceLocation.getUpdatedTime().getTime(), deviceLocation.getUpdatedTime().getTime(),
deviceLocation.getLatitude(), deviceLocation.getLatitude(),

@ -376,6 +376,7 @@ public class GeoLocationProviderServiceImpl implements GeoLocationProviderServic
ExecutionPlanConfigurationDto[] allActiveExecutionPlanConfigs = null; ExecutionPlanConfigurationDto[] allActiveExecutionPlanConfigs = null;
String activeExecutionPlan = null; String activeExecutionPlan = null;
String executionPlanName = getExecutionPlanName(alertType, alert.getQueryName()); String executionPlanName = getExecutionPlanName(alertType, alert.getQueryName());
parseMap.put(GeoServices.EXECUTION_PLAN_NAME, executionPlanName);
eventprocessorStub = getEventProcessorAdminServiceStub(); eventprocessorStub = getEventProcessorAdminServiceStub();
String parsedTemplate = parseTemplateForGeoClusters(alertType, parseMap); String parsedTemplate = parseTemplateForGeoClusters(alertType, parseMap);
String validationResponse = eventprocessorStub.validateExecutionPlan(parsedTemplate); String validationResponse = eventprocessorStub.validateExecutionPlan(parsedTemplate);
@ -483,6 +484,8 @@ public class GeoLocationProviderServiceImpl implements GeoLocationProviderServic
ExecutionPlanConfigurationDto[] allActiveExecutionPlanConfigs = null; ExecutionPlanConfigurationDto[] allActiveExecutionPlanConfigs = null;
String activeExecutionPlan = null; String activeExecutionPlan = null;
String executionPlanName = getExecutionPlanName(alertType, alert.getQueryName(), identifier.getId(), owner); String executionPlanName = getExecutionPlanName(alertType, alert.getQueryName(), identifier.getId(), owner);
parseMap.put(GeoServices.EXECUTION_PLAN_NAME, executionPlanName);
parseMap.put(GeoServices.DEVICE_OWNER, owner);
eventprocessorStub = getEventProcessorAdminServiceStub(); eventprocessorStub = getEventProcessorAdminServiceStub();
String parsedTemplate = parseTemplate(alertType, parseMap); String parsedTemplate = parseTemplate(alertType, parseMap);
String validationResponse = eventprocessorStub.validateExecutionPlan(parsedTemplate); String validationResponse = eventprocessorStub.validateExecutionPlan(parsedTemplate);
@ -600,8 +603,10 @@ public class GeoLocationProviderServiceImpl implements GeoLocationProviderServic
} }
private String getExecutionPlanName(String alertType, String queryName, String deviceId, String owner) { private String getExecutionPlanName(String alertType, String queryName, String deviceId, String owner) {
if ("Traffic".equals(alertType)) { if (GeoServices.ALERT_TYPE_TRAFFIC.equals(alertType)) {
return "Geo-ExecutionPlan-Traffic_" + queryName + "_alert"; return "Geo-ExecutionPlan-Traffic_" + queryName + "_alert";
} else if (GeoServices.ALERT_TYPE_SPEED.equals(alertType)) {
return "Geo-ExecutionPlan-" + alertType + "---_" + owner + "_" + deviceId + "_alert";
} else { } else {
return "Geo-ExecutionPlan-" + alertType + "_" + queryName + "---_" + owner + "_" + deviceId + "_alert"; return "Geo-ExecutionPlan-" + alertType + "_" + queryName + "---_" + owner + "_" + deviceId + "_alert";
} }

@ -7,14 +7,31 @@
/* define streams/tables and write queries here ... */ /* define streams/tables and write queries here ... */
@Import('org.wso2.geo.StandardSpatialEvents:1.0.0') @Import('org.wso2.geo.StandardSpatialEvents:1.0.0')
define stream dataIn (id string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string); define stream dataIn (id string, owner string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string);
@Export('org.wso2.geo.ProcessedSpatialEvents:1.0.0') @Export('iot.per.device.stream.geo.FusedSpatialEvent:1.0.0')
define stream dataOut (id string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string, state string, information string); define stream dataOut (id string, owner string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, state string, information string, notify bool);
from dataIn[geo:within(longitude,latitude,"$geoFenceGeoJSON")==false and id == "$deviceId"] @Export('iot.per.device.stream.geo.AlertNotifications:1.0.0')
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "ALERTED" as state, str:concat(str:concat(str:concat(type," device "),id), " is outside $areaName area!!!") as information define stream alertsOut (id string, owner string, state string, information string, timeStamp long, latitude double, longitude double, type string);
/* Check if the device is within the geo fence. */
from dataIn[id == "$deviceId" and owner == "$owner"]
select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId, geo:within(longitude,latitude,"$geoFenceGeoJSON") as isWithin
insert into withinStream;
from withinStream[isWithin == false]
select id, owner, latitude, longitude,timeStamp, type, speed, heading, "ALERTED" as state, str:concat(str:concat(str:concat(type," device "),id), " is outside $areaName area!!!") as information, true as notify
insert into dataOut; insert into dataOut;
from dataIn[geo:within(longitude,latitude,"$geoFenceGeoJSON")!=false and id == "$deviceId"]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "NORMAL" as state, "" as information from withinStream[isWithin == true]
select id, owner, latitude, longitude,timeStamp, type, speed, heading, "NORMAL" as state, "" as information, false as notify
insert into dataOut; insert into dataOut;
from every fs1=withinStream, fs2=withinStream[fs1.isWithin != isWithin]
select fs2.id, fs2.owner, fs2.latitude, fs2.longitude, fs2.timeStamp, fs2.type, fs2.speed, fs2.heading, fs2.eventId, fs2.isWithin
insert into crossedStream;
from crossedStream[isWithin == false]
select id, owner, "ALERTED" as state, str:concat(str:concat(str:concat(type," device "),id), " is outside $areaName area!!!") as information, timeStamp, latitude, longitude, type
insert into alertsOut;

@ -1,5 +1,5 @@
/* Enter a unique ExecutionPlan */ /* Enter a unique ExecutionPlan */
@Plan:name('Geo-ExecutionPlan-Proximity_alert') @Plan:name('$executionPlanName')
/* Enter a unique description for ExecutionPlan */ /* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan') -- @Plan:description('ExecutionPlan')
@ -7,10 +7,10 @@
/* define streams/tables and write queries here ... */ /* define streams/tables and write queries here ... */
@Import('org.wso2.geo.StandardSpatialEvents:1.0.0') @Import('org.wso2.geo.StandardSpatialEvents:1.0.0')
define stream dataIn (id string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, eventId string ); define stream dataIn (id string, owner string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, eventId string );
@Export('org.wso2.geo.ProcessedSpatialEvents:1.0.0') @Export('iot.per.device.stream.geo.FusedSpatialEvent:1.0.0')
define stream dataOut ( id string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, eventId string, state string, information string ); define stream dataOut (id string, owner string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, state string, information string, notify bool);
@IndexBy('id') @IndexBy('id')
define table ProximityTable(id string, timeStamp long); define table ProximityTable(id string, timeStamp long);
@ -19,11 +19,11 @@ define table ProximityTable(id string, timeStamp long);
define table AlertsTable(id string , proximityWith string, eventId string); define table AlertsTable(id string , proximityWith string, eventId string);
from dataIn from dataIn
select id, latitude, longitude, timeStamp, type, speed, heading, eventId select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId
insert into initialStream; insert into initialStream;
from initialStream[type == 'STOP'] from initialStream[type == 'STOP']
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity
insert into dataOutStream; insert into dataOutStream;
from initialStream[type != 'STOP'] from initialStream[type != 'STOP']
@ -31,51 +31,51 @@ select *
insert into objectInitialStream; insert into objectInitialStream;
from objectInitialStream#geo:proximity(id,longitude,latitude, $proximityDistance) from objectInitialStream#geo:proximity(id,longitude,latitude, $proximityDistance)
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith
insert into proxymityStream; insert into proxymityStream;
from proxymityStream[AlertsTable.id == proxymityStream.id in AlertsTable] from proxymityStream[AlertsTable.id == proxymityStream.id in AlertsTable]
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,true as inAlertTable select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,true as inAlertTable
insert into innerStreamOne; insert into innerStreamOne;
from proxymityStream[not(AlertsTable.id == proxymityStream.id in AlertsTable)] from proxymityStream[not(AlertsTable.id == proxymityStream.id in AlertsTable)]
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,false as inAlertTable select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,false as inAlertTable
insert into innerStreamOne; insert into innerStreamOne;
from proxymityStream[AlertsTable.id == proxymityStream.proximityWith in AlertsTable] from proxymityStream[AlertsTable.id == proxymityStream.proximityWith in AlertsTable]
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,true as inAlertTable select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId, inCloseProximity, proximityWith, true as inAlertTable
insert into innerStreamSeven; insert into innerStreamSeven;
from proxymityStream[not(AlertsTable.id == proxymityStream.proximityWith in AlertsTable)] from proxymityStream[not(AlertsTable.id == proxymityStream.proximityWith in AlertsTable)]
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,false as inAlertTable select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,false as inAlertTable
insert into innerStreamSeven; insert into innerStreamSeven;
from innerStreamOne[inCloseProximity == true AND not(inAlertTable)] from innerStreamOne[inCloseProximity == true AND not(inAlertTable)]
select id,str:concat(",",proximityWith) as proximityWith , eventId select id, owner, str:concat(",",proximityWith) as proximityWith , eventId
insert into AlertsTable; insert into AlertsTable;
from innerStreamSeven[inCloseProximity == true AND not(inAlertTable)] from innerStreamSeven[inCloseProximity == true AND not(inAlertTable)]
select proximityWith as id,str:concat(",",id) as proximityWith , eventId select proximityWith as id, owner, str:concat(",",id) as proximityWith , eventId
insert into AlertsTable; insert into AlertsTable;
from innerStreamOne[innerStreamOne.inCloseProximity == true AND inAlertTable]#window.length(0) join AlertsTable from innerStreamOne[innerStreamOne.inCloseProximity == true AND inAlertTable]#window.length(0) join AlertsTable
on innerStreamOne.id == AlertsTable.id on innerStreamOne.id == AlertsTable.id
select innerStreamOne.id as id, str:concat(",", innerStreamOne.proximityWith, AlertsTable.proximityWith) as proximityWith, innerStreamOne.eventId as eventId select innerStreamOne.id as id, innerStreamOne.owner as owner, str:concat(",", innerStreamOne.proximityWith, AlertsTable.proximityWith) as proximityWith, innerStreamOne.eventId as eventId
insert into updateStream; insert into updateStream;
from innerStreamSeven[innerStreamSeven.inCloseProximity == true AND inAlertTable]#window.length(0) join AlertsTable from innerStreamSeven[innerStreamSeven.inCloseProximity == true AND inAlertTable]#window.length(0) join AlertsTable
on innerStreamSeven.proximityWith == AlertsTable.id on innerStreamSeven.proximityWith == AlertsTable.id
select innerStreamSeven.proximityWith as id, str:concat(",", innerStreamSeven.id, AlertsTable.proximityWith) as proximityWith, innerStreamSeven.eventId as eventId select innerStreamSeven.proximityWith as id, innerStreamSeven.owner as owner, str:concat(",", innerStreamSeven.id, AlertsTable.proximityWith) as proximityWith, innerStreamSeven.eventId as eventId
insert into updateStream; insert into updateStream;
from innerStreamOne[innerStreamOne.inCloseProximity == false AND inAlertTable]#window.length(0) join AlertsTable from innerStreamOne[innerStreamOne.inCloseProximity == false AND inAlertTable]#window.length(0) join AlertsTable
on innerStreamOne.id == AlertsTable.id on innerStreamOne.id == AlertsTable.id
select innerStreamOne.id as id, str:replaceAll(AlertsTable.proximityWith, str:concat(",", innerStreamOne.proximityWith), "") as proximityWith, innerStreamOne.eventId as eventId select innerStreamOne.id as id, innerStreamOne.owner as owner, str:replaceAll(AlertsTable.proximityWith, str:concat(",", innerStreamOne.proximityWith), "") as proximityWith, innerStreamOne.eventId as eventId
insert into updateStream; insert into updateStream;
from innerStreamSeven[innerStreamSeven.inCloseProximity == false AND inAlertTable]#window.length(0) join AlertsTable from innerStreamSeven[innerStreamSeven.inCloseProximity == false AND inAlertTable]#window.length(0) join AlertsTable
on innerStreamSeven.proximityWith == AlertsTable.id on innerStreamSeven.proximityWith == AlertsTable.id
select innerStreamSeven.proximityWith as id, str:replaceAll(AlertsTable.proximityWith, str:concat(",", innerStreamSeven.id), "") as proximityWith, innerStreamSeven.eventId as eventId select innerStreamSeven.proximityWith as id, innerStreamSeven.owner as owner, str:replaceAll(AlertsTable.proximityWith, str:concat(",", innerStreamSeven.id), "") as proximityWith, innerStreamSeven.eventId as eventId
insert into updateStream; insert into updateStream;
from updateStream from updateStream
@ -88,23 +88,23 @@ delete AlertsTable
on id== AlertsTable.id; on id== AlertsTable.id;
from objectInitialStream[AlertsTable.id == objectInitialStream.id in AlertsTable] from objectInitialStream[AlertsTable.id == objectInitialStream.id in AlertsTable]
select id, latitude, longitude, timeStamp, type, speed, heading, eventId, true as inAlertTable select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId, true as inAlertTable
insert into publishStream; insert into publishStream;
from objectInitialStream[not(AlertsTable.id == objectInitialStream.id in AlertsTable)] from objectInitialStream[not(AlertsTable.id == objectInitialStream.id in AlertsTable)]
select id, latitude, longitude, timeStamp, type, speed, heading, eventId, false as inAlertTable select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId, false as inAlertTable
insert into publishStream; insert into publishStream;
from publishStream[inAlertTable == true]#window.length(0) join AlertsTable from publishStream[inAlertTable == true]#window.length(0) join AlertsTable
on publishStream.id== AlertsTable.id on publishStream.id== AlertsTable.id
select publishStream.id as id, publishStream.latitude as latitude, publishStream.longitude as longitude, publishStream.timeStamp as timeStamp, publishStream.type as type, publishStream.speed as speed, publishStream.heading as heading, publishStream.eventId as eventId, AlertsTable.proximityWith as proximityInfo select publishStream.id as id, publishStream.owner as owner, publishStream.latitude as latitude, publishStream.longitude as longitude, publishStream.timeStamp as timeStamp, publishStream.type as type, publishStream.speed as speed, publishStream.heading as heading, publishStream.eventId as eventId, AlertsTable.proximityWith as proximityInfo
insert into innerStreamTwo; insert into innerStreamTwo;
from publishStream[inAlertTable == false] from publishStream[inAlertTable == false]
delete ProximityTable on ProximityTable.id==id; delete ProximityTable on ProximityTable.id==id;
from publishStream[inAlertTable == false] from publishStream[inAlertTable == false]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity
insert into dataOutStream; insert into dataOutStream;
from innerStreamTwo[ProximityTable.id == innerStreamTwo.id in ProximityTable] from innerStreamTwo[ProximityTable.id == innerStreamTwo.id in ProximityTable]
@ -112,29 +112,29 @@ insert into innerStreamThree;
from innerStreamThree#window.length(0) join ProximityTable from innerStreamThree#window.length(0) join ProximityTable
on innerStreamThree.id == ProximityTable.id on innerStreamThree.id == ProximityTable.id
select innerStreamThree.id , innerStreamThree.latitude, innerStreamThree.longitude,innerStreamThree.timeStamp, innerStreamThree.type, innerStreamThree.speed, innerStreamThree.heading ,innerStreamThree.eventId, ProximityTable.timeStamp as storedTime, innerStreamThree.proximityInfo as proximityInfo select innerStreamThree.id, innerStreamThree.owner, innerStreamThree.latitude, innerStreamThree.longitude,innerStreamThree.timeStamp, innerStreamThree.type, innerStreamThree.speed, innerStreamThree.heading ,innerStreamThree.eventId, ProximityTable.timeStamp as storedTime, innerStreamThree.proximityInfo as proximityInfo
insert into innerStreamFour; insert into innerStreamFour;
from innerStreamFour[(timeStamp - storedTime) >= $proximityTime] from innerStreamFour[(timeStamp - storedTime) >= $proximityTime]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,proximityInfo,"true" as isProximity select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId ,proximityInfo, "true" as isProximity
insert into dataOutStream; insert into dataOutStream;
from innerStreamFour[(timeStamp - storedTime) < $proximityTime] from innerStreamFour[(timeStamp - storedTime) < $proximityTime]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , proximityInfo ,"false" as isProximity select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId, proximityInfo, "false" as isProximity
insert into dataOutStream; insert into dataOutStream;
from innerStreamTwo[not(ProximityTable.id == innerStreamTwo.id in ProximityTable)] from innerStreamTwo[not(ProximityTable.id == innerStreamTwo.id in ProximityTable)]
select innerStreamTwo.id, innerStreamTwo.timeStamp select innerStreamTwo.id, innerStreamTwo.owner, innerStreamTwo.timeStamp
insert into ProximityTable; insert into ProximityTable;
from innerStreamTwo[not(ProximityTable.id == innerStreamTwo.id in ProximityTable)] from innerStreamTwo[not(ProximityTable.id == innerStreamTwo.id in ProximityTable)]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity
insert into dataOutStream; insert into dataOutStream;
from dataOutStream[isProximity == 'true'] from dataOutStream[isProximity == 'true']
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,"WARNING" as state,str:concat("Proximity with "," ",proximityInfo) as information select id, owner, latitude, longitude, timeStamp, type, speed, heading, "WARNING" as state,str:concat("Proximity with "," ",proximityInfo) as information, true as notify
insert into dataOut; insert into dataOut;
from dataOutStream[isProximity == 'false'] from dataOutStream[isProximity == 'false']
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,"NORMAL" as state,"" as information select id, owner, latitude, longitude,timeStamp, type, speed, heading, "NORMAL" as state,"" as information, false as notify
insert into dataOut; insert into dataOut;

@ -1,5 +1,5 @@
/* Enter a unique ExecutionPlan */ /* Enter a unique ExecutionPlan */
@Plan:name('Geo-ExecutionPlan-Proximity_alert') @Plan:name('$executionPlanName')
/* Enter a unique description for ExecutionPlan */ /* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan') -- @Plan:description('ExecutionPlan')

@ -1,5 +1,5 @@
/* Enter a unique ExecutionPlan */ /* Enter a unique ExecutionPlan */
@Plan:name('Geo-ExecutionPlan-Speed---$deviceId_alert') @Plan:name('$executionPlanName')
/* Enter a unique description for ExecutionPlan */ /* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan') -- @Plan:description('ExecutionPlan')
@ -7,14 +7,22 @@
/* define streams/tables and write queries here ... */ /* define streams/tables and write queries here ... */
@Import('org.wso2.geo.StandardSpatialEvents:1.0.0') @Import('org.wso2.geo.StandardSpatialEvents:1.0.0')
define stream dataIn (id string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, eventId string); define stream dataIn (id string, owner string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, eventId string);
@Export('org.wso2.geo.ProcessedSpatialEvents:1.0.0') @Export('iot.per.device.stream.geo.FusedSpatialEvent:1.0.0')
define stream dataOut (id string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, eventId string, state string, information string); define stream dataOut (id string, owner string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, state string, information string, notify bool );
from dataIn[speed >= $speedAlertValue and id == "$deviceId"] @Export('iot.per.device.stream.geo.AlertNotifications:1.0.0')
select id , latitude, longitude,timeStamp, type ,speed, heading ,eventId , "ALERTED" as state, str:concat(str:concat(str:concat(str:concat("Movement of ",type), " device "), id), " is not normal!!") as information define stream alertsOut (id string, owner string, state string, information string, timeStamp long, latitude double, longitude double, type string);
from dataIn[speed >= $speedAlertValue and id == "$deviceId" and owner == "$owner"]
select id, owner, latitude, longitude, timeStamp, type, speed, heading, "ALERTED" as state, str:concat(str:concat(str:concat(str:concat("Speed of ",type), " device "), id), " is not normal!!") as information, true as notify
insert into dataOut; insert into dataOut;
from dataIn[speed < $speedAlertValue and id == "$deviceId"]
select id , latitude, longitude,timeStamp, type ,speed, heading ,eventId , "NORMAL" as state, str:concat(str:concat(str:concat(str:concat("Movement of ",type), " device "), id), " is normal") as information from dataIn[speed < $speedAlertValue and id == "$deviceId" and owner == "$owner"]
select id, owner, latitude, longitude, timeStamp, type, speed, heading, "NORMAL" as state, str:concat(str:concat(str:concat(str:concat("Speed of ",type), " device "), id), " is normal") as information, false as notify
insert into dataOut; insert into dataOut;
from dataOut[notify == true]
select id, owner, state, information, timeStamp, latitude, longitude, type
insert into alertsOut;

@ -1,5 +1,5 @@
/* Enter a unique ExecutionPlan */ /* Enter a unique ExecutionPlan */
@Plan:name('Geo-ExecutionPlan-Speed---_alert') @Plan:name('$executionPlanName')
/* Enter a unique description for ExecutionPlan */ /* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan') -- @Plan:description('ExecutionPlan')

@ -7,11 +7,13 @@
/* define streams/tables and write queries here ... */ /* define streams/tables and write queries here ... */
@Import('org.wso2.geo.StandardSpatialEvents:1.0.0') @Import('org.wso2.geo.StandardSpatialEvents:1.0.0')
define stream dataIn (id string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string); define stream dataIn (id string, owner string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string);
@Export('iot.per.device.stream.geo.FusedSpatialEvent:1.0.0')
define stream dataOut (id string, owner string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, state string, information string, notify bool );
@Export('org.wso2.geo.ProcessedSpatialEvents:1.0.0') @Export('iot.per.device.stream.geo.AlertNotifications:1.0.0')
define stream dataOut (id string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string, state string, information string); define stream alertsOut (id string, owner string, state string, information string, timeStamp long, latitude double, longitude double, type string);
@IndexBy('id') @IndexBy('id')
define table StationeryTable(id string, timeStamp long); define table StationeryTable(id string, timeStamp long);
@ -20,18 +22,18 @@ define table StationeryTable(id string, timeStamp long);
define table AlertsTable(id string, stationary bool); define table AlertsTable(id string, stationary bool);
from dataIn from dataIn
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,geo:within(longitude,latitude,"$geoFenceGeoJSON") as isWithin select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId,geo:within(longitude,latitude,"$geoFenceGeoJSON") as isWithin
insert into innerStreamOne; insert into innerStreamOne;
from innerStreamOne[isWithin == false] from innerStreamOne[isWithin == false]
delete StationeryTable on StationeryTable.id==id; delete StationeryTable on StationeryTable.id==id;
from innerStreamOne[isWithin == false] from innerStreamOne[isWithin == false]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary
insert into dataOutStream; insert into dataOutStream;
from innerStreamOne[isWithin == true]#geo:stationary(id,longitude,latitude, $fluctuationRadius) from innerStreamOne[isWithin == true]#geo:stationary(id,longitude,latitude, $fluctuationRadius)
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,stationary select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId,stationary
insert into innerStreamTwo; insert into innerStreamTwo;
from innerStreamTwo[innerStreamTwo.stationary == true] from innerStreamTwo[innerStreamTwo.stationary == true]
@ -45,7 +47,7 @@ from innerStreamTwo[innerStreamTwo.stationary == false]
delete StationeryTable on StationeryTable.id==id; delete StationeryTable on StationeryTable.id==id;
from innerStreamOne[isWithin == true AND not(AlertsTable.id == innerStreamOne.id in AlertsTable)] from innerStreamOne[isWithin == true AND not(AlertsTable.id == innerStreamOne.id in AlertsTable)]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary
insert into dataOutStream; insert into dataOutStream;
from innerStreamOne[isWithin == true AND AlertsTable.id == innerStreamOne.id in AlertsTable] from innerStreamOne[isWithin == true AND AlertsTable.id == innerStreamOne.id in AlertsTable]
@ -53,7 +55,7 @@ insert into innerStreamThree;
from innerStreamThree#window.length(0) join AlertsTable from innerStreamThree#window.length(0) join AlertsTable
on innerStreamThree.id == AlertsTable.id on innerStreamThree.id == AlertsTable.id
select innerStreamThree.id , innerStreamThree.latitude, innerStreamThree.longitude,innerStreamThree.timeStamp, innerStreamThree.type, innerStreamThree.speed, innerStreamThree.heading ,innerStreamThree.eventId select innerStreamThree.id, innerStreamThree.owner, innerStreamThree.latitude, innerStreamThree.longitude,innerStreamThree.timeStamp, innerStreamThree.type, innerStreamThree.speed, innerStreamThree.heading ,innerStreamThree.eventId
insert into innerStreamFour; insert into innerStreamFour;
from innerStreamFour[not(StationeryTable.id == innerStreamFour.id in StationeryTable)] from innerStreamFour[not(StationeryTable.id == innerStreamFour.id in StationeryTable)]
@ -61,7 +63,7 @@ select innerStreamFour.id, innerStreamFour.timeStamp
insert into StationeryTable; insert into StationeryTable;
from innerStreamOne[isWithin == true AND not(StationeryTable.id == innerStreamOne.id in StationeryTable)] from innerStreamOne[isWithin == true AND not(StationeryTable.id == innerStreamOne.id in StationeryTable)]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary
insert into dataOutStream; insert into dataOutStream;
from innerStreamOne[isWithin == true AND StationeryTable.id == innerStreamOne.id in StationeryTable] from innerStreamOne[isWithin == true AND StationeryTable.id == innerStreamOne.id in StationeryTable]
@ -69,21 +71,25 @@ insert into innerStreamFive;
from innerStreamFive#window.length(0) join StationeryTable from innerStreamFive#window.length(0) join StationeryTable
on innerStreamFive.id == StationeryTable.id on innerStreamFive.id == StationeryTable.id
select innerStreamFive.id , innerStreamFive.latitude, innerStreamFive.longitude,innerStreamFive.timeStamp, innerStreamFive.type, innerStreamFive.speed, innerStreamFive.heading ,innerStreamFive.eventId, StationeryTable.timeStamp as storedTime select innerStreamFive.id, innerStreamFive.owner, innerStreamFive.latitude, innerStreamFive.longitude,innerStreamFive.timeStamp, innerStreamFive.type, innerStreamFive.speed, innerStreamFive.heading ,innerStreamFive.eventId, StationeryTable.timeStamp as storedTime
insert into innerStreamSix; insert into innerStreamSix;
from innerStreamSix[(timeStamp - storedTime) >= $stationeryTime] from innerStreamSix[(timeStamp - storedTime) >= $stationeryTime]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,"true" as isStationary select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId ,"true" as isStationary
insert into dataOutStream; insert into dataOutStream;
from innerStreamSix[(timeStamp - storedTime) < $stationeryTime] from innerStreamSix[(timeStamp - storedTime) < $stationeryTime]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,"false" as isStationary select id, owner, latitude, longitude,timeStamp, type, speed, heading ,eventId ,"false" as isStationary
insert into dataOutStream; insert into dataOutStream;
from dataOutStream[isStationary == 'true'] from dataOutStream[isStationary == 'true']
select id ,latitude, longitude,timeStamp, type, speed, heading ,eventId ,"ALERTED" as state, str:concat(str:concat(str:concat(type," device "),id),"is in $stationeryName area!!!") as information select id, owner, latitude, longitude,timeStamp, type, speed, heading, "ALERTED" as state, str:concat(str:concat(str:concat(type," device "),id),"is in $stationeryName area!!!") as information, true as notify
insert into dataOut; insert into dataOut;
from dataOutStream[isStationary == 'false'] from dataOutStream[isStationary == 'false']
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,"NORMAL" as state,"" as information select id, owner, latitude, longitude,timeStamp, type, speed, heading, "NORMAL" as state,"" as information, false as notify
insert into dataOut; insert into dataOut;
from dataOut[notify == true]
select id, owner, state, information, timeStamp, latitude, longitude, type
insert into alertsOut;

@ -7,11 +7,11 @@
/* define streams/tables and write queries here ... */ /* define streams/tables and write queries here ... */
@Import('rawGeoStream:1.0.0') @Import('rawGeoStream:1.0.0')
define stream dataIn (id string, timeStamp long, geometry string, state string, information string); define stream dataIn (id string, owner string, timeStamp long, geometry string, state string, information string);
@Export('AlertsNotifications:1.0.0') @Export('AlertsNotifications:1.0.0')
define stream dataOut (id string, state string, information string, timeStamp long, latitude double, longitude double); define stream dataOut (id string, owner string, state string, information string, timeStamp long, latitude double, longitude double);
from dataIn[geo:intersects(geometry, "$geoFenceGeoJSON")==true and geodashboard:needToNotify(id, str:concat(information, state), "sendFirst") == true and id == $deviceId] from dataIn[geo:intersects(geometry, "$geoFenceGeoJSON")==true and geodashboard:needToNotify(id, str:concat(information, state), "sendFirst") == true and id == "$deviceId" and owner == "$owner"]
select id, state, str:concat("Traffic alert in $areaName. State: ", state, " ", information) as information, timeStamp, 0.0 as latitude, 0.0 as longitude select id, owner, state, str:concat("Traffic alert in $areaName. State: ", state, " ", information) as information, timeStamp, 0.0 as latitude, 0.0 as longitude
insert into dataOut insert into dataOut

@ -7,14 +7,31 @@
/* define streams/tables and write queries here ... */ /* define streams/tables and write queries here ... */
@Import('org.wso2.geo.StandardSpatialEvents:1.0.0') @Import('org.wso2.geo.StandardSpatialEvents:1.0.0')
define stream dataIn (id string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string); define stream dataIn (id string, owner string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string);
@Export('org.wso2.geo.ProcessedSpatialEvents:1.0.0') @Export('iot.per.device.stream.geo.FusedSpatialEvent:1.0.0')
define stream dataOut (id string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string, state string, information string); define stream dataOut (id string, owner string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, state string, information string, notify bool );
from dataIn[geo:within(longitude,latitude,"$geoFenceGeoJSON")==true and id == "$deviceId"] @Export('iot.per.device.stream.geo.AlertNotifications:1.0.0')
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "ALERTED" as state, str:concat(str:concat(str:concat(type," device "), id), " is in $areaName restricted area!!!") as information define stream alertsOut (id string, owner string, state string, information string, timeStamp long, latitude double, longitude double, type string);
/* Check if the device is within the geo fence. */
from dataIn[id == "$deviceId" and owner == "$owner"]
select id, owner, latitude, longitude, timeStamp, type, speed, heading, eventId, geo:within(longitude,latitude,"$geoFenceGeoJSON") as isWithin
insert into withinStream;
from withinStream[isWithin == true]
select id, owner, latitude, longitude,timeStamp, type, speed, heading, "ALERTED" as state, str:concat(str:concat(str:concat(type," device "),id), " is in $areaName restricted area!!!") as information, true as notify
insert into dataOut; insert into dataOut;
from dataIn[geo:within(longitude,latitude,"$geoFenceGeoJSON")!=true and id == "$deviceId"]
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "NORMAL" as state, "" as information from withinStream[isWithin == false]
select id, owner, latitude, longitude,timeStamp, type, speed, heading, "NORMAL" as state, "" as information, false as notify
insert into dataOut; insert into dataOut;
from every fs1=withinStream, fs2=withinStream[fs1.isWithin != isWithin]
select fs2.id, fs2.owner, fs2.latitude, fs2.longitude, fs2.timeStamp, fs2.type, fs2.speed, fs2.heading, fs2.eventId, fs2.isWithin
insert into crossedStream;
from crossedStream[isWithin == true]
select id, owner, "ALERTED" as state, str:concat(str:concat(str:concat(type," device "),id), " is outside $areaName area!!!") as information, timeStamp, latitude, longitude, type
insert into alertsOut;

@ -248,7 +248,6 @@ function setWithinAlert(leafletId) {
var data = { var data = {
'parseData': JSON.stringify({ 'parseData': JSON.stringify({
'geoFenceGeoJSON': selectedAreaGeoJson, 'geoFenceGeoJSON': selectedAreaGeoJson,
'executionPlanName': createExecutionPlanName(queryName, "WithIn", deviceId),
'areaName': areaName, 'areaName': areaName,
'deviceId': deviceId 'deviceId': deviceId
}), }),
@ -307,7 +306,6 @@ function setExitAlert(leafletId) {
var data = { var data = {
'parseData': JSON.stringify({ 'parseData': JSON.stringify({
'geoFenceGeoJSON': selectedAreaGeoJson, 'geoFenceGeoJSON': selectedAreaGeoJson,
'executionPlanName': createExecutionPlanName(queryName, "Exit", deviceId),
'areaName': areaName, 'areaName': areaName,
'deviceId': deviceId 'deviceId': deviceId
}), }),
@ -379,7 +377,6 @@ function setStationeryAlert(leafletId) {
var data = { var data = {
'parseData': JSON.stringify({ 'parseData': JSON.stringify({
'geoFenceGeoJSON': selectedProcessedAreaGeoJson, 'geoFenceGeoJSON': selectedProcessedAreaGeoJson,
'executionPlanName': createExecutionPlanName(queryName, "Stationery", deviceId),
'stationeryName': stationeryName, 'stationeryName': stationeryName,
'stationeryTime': time, 'stationeryTime': time,
'fluctuationRadius': fluctuationRadius 'fluctuationRadius': fluctuationRadius
@ -492,7 +489,6 @@ function setTrafficAlert(leafletId) {
var data = { var data = {
'parseData': JSON.stringify({ 'parseData': JSON.stringify({
'geoFenceGeoJSON': selectedProcessedAreaGeoJson, 'geoFenceGeoJSON': selectedProcessedAreaGeoJson,
'executionPlanName': createExecutionPlanName(queryName, "Traffic", deviceId),
'areaName': areaName 'areaName': areaName
}), }),
'executionPlan': 'Traffic', 'executionPlan': 'Traffic',
@ -634,21 +630,6 @@ function setProximityAlert() {
} }
} }
// TODO:this is not a remote call , move this to application.js
function createExecutionPlanName(queryName, id, deviceId) {
if (id == "WithIn") {
return 'Geo-ExecutionPlan-Within' + (queryName ? '_' + queryName : '') + "---" + (deviceId ? '_' + deviceId : '') + '_alert'; // TODO: value of the `queryName` can't be empty, because it will cause name conflicts in CEP, have to do validation(check not empty String)
} else if (id == "Exit") {
return 'Geo-ExecutionPlan-Exit' + (queryName ? '_' + queryName : '') + "---" + (deviceId ? '_' + deviceId : '') + '_alert'; // TODO: value of the `queryName` can't be empty, because it will cause name conflicts in CEP, have to do validation(check not empty String)
} else if (id == "Stationery") {
return 'Geo-ExecutionPlan-Stationery' + (queryName ? '_' + queryName : '') + "---" + (deviceId ? '_' + deviceId : '') + '_alert'; // TODO: value of the `queryName` can't be empty, because it will cause name conflicts in CEP, have to do validation(check not empty String)
} else if (id == "Traffic") {
return 'Geo-ExecutionPlan-Traffic' + (queryName ? '_' + queryName : '') + '_alert'; // TODO: value of the `queryName` can't be empty, because it will cause name conflicts in CEP, have to do validation(check not empty String)
}
}
// TODO:this is not a remote call , move this to application.js // TODO:this is not a remote call , move this to application.js
function closeAll() { function closeAll() {
$('.modal').modal('hide'); $('.modal').modal('hide');

@ -165,7 +165,9 @@ function setSpeedAlert() {
noty({text: message, type: 'error'}); noty({text: message, type: 'error'});
} else { } else {
data = { data = {
'parseData': JSON.stringify({'speedAlertValue': speedAlertValue, 'deviceId': deviceId}), // parseKey : parseValue pair , this key pair is replace with the key in the template file 'parseData': JSON.stringify({
'speedAlertValue': speedAlertValue,
'deviceId': deviceId}), // parseKey : parseValue pair , this key pair is replace with the key in the template file
'executionPlan': 'Speed', 'executionPlan': 'Speed',
'customName': null, 'customName': null,
'cepAction': 'edit', 'cepAction': 'edit',
@ -221,7 +223,6 @@ function setWithinAlert(leafletId) {
var data = { var data = {
'parseData': JSON.stringify({ 'parseData': JSON.stringify({
'geoFenceGeoJSON': selectedAreaGeoJson, 'geoFenceGeoJSON': selectedAreaGeoJson,
'executionPlanName': createExecutionPlanName(queryName, "WithIn", deviceId),
'areaName': areaName, 'areaName': areaName,
'deviceId': deviceId 'deviceId': deviceId
}), }),
@ -280,7 +281,6 @@ function setExitAlert(leafletId) {
var data = { var data = {
'parseData': JSON.stringify({ 'parseData': JSON.stringify({
'geoFenceGeoJSON': selectedAreaGeoJson, 'geoFenceGeoJSON': selectedAreaGeoJson,
'executionPlanName': createExecutionPlanName(queryName, "Exit", deviceId),
'areaName': areaName, 'areaName': areaName,
'deviceId': deviceId 'deviceId': deviceId
}), }),
@ -352,7 +352,6 @@ function setStationeryAlert(leafletId) {
var data = { var data = {
'parseData': JSON.stringify({ 'parseData': JSON.stringify({
'geoFenceGeoJSON': selectedProcessedAreaGeoJson, 'geoFenceGeoJSON': selectedProcessedAreaGeoJson,
'executionPlanName': createExecutionPlanName(queryName, "Stationery", deviceId),
'stationeryName': stationeryName, 'stationeryName': stationeryName,
'stationeryTime': time, 'stationeryTime': time,
'fluctuationRadius': fluctuationRadius 'fluctuationRadius': fluctuationRadius
@ -427,7 +426,6 @@ function setTrafficAlert(leafletId) {
var data = { var data = {
'parseData': JSON.stringify({ 'parseData': JSON.stringify({
'geoFenceGeoJSON': selectedProcessedAreaGeoJson, 'geoFenceGeoJSON': selectedProcessedAreaGeoJson,
'executionPlanName': createExecutionPlanName(queryName, "Traffic", deviceId),
'areaName': areaName 'areaName': areaName
}), }),
'executionPlan': 'Traffic', 'executionPlan': 'Traffic',
@ -569,21 +567,6 @@ function setProximityAlert() {
} }
} }
// TODO:this is not a remote call , move this to application.js
function createExecutionPlanName(queryName, id) {
if (id == "WithIn") {
return 'Geo-ExecutionPlan-Within' + (queryName ? '_' + queryName : '') + "---" + '_alert'; // TODO: value of the `queryName` can't be empty, because it will cause name conflicts in CEP, have to do validation(check not empty String)
} else if (id == "Exit") {
return 'Geo-ExecutionPlan-Exit' + (queryName ? '_' + queryName : '') + "---" + '_alert'; // TODO: value of the `queryName` can't be empty, because it will cause name conflicts in CEP, have to do validation(check not empty String)
} else if (id == "Stationery") {
return 'Geo-ExecutionPlan-Stationery' + (queryName ? '_' + queryName : '') + "---" + '_alert'; // TODO: value of the `queryName` can't be empty, because it will cause name conflicts in CEP, have to do validation(check not empty String)
} else if (id == "Traffic") {
return 'Geo-ExecutionPlan-Traffic' + (queryName ? '_' + queryName : '') + '_alert'; // TODO: value of the `queryName` can't be empty, because it will cause name conflicts in CEP, have to do validation(check not empty String)
}
}
// TODO:this is not a remote call , move this to application.js // TODO:this is not a remote call , move this to application.js
function closeAll() { function closeAll() {
$('.modal').modal('hide'); $('.modal').modal('hide');

Loading…
Cancel
Save