parent
7583957e16
commit
7c88dfa0b3
@ -0,0 +1,140 @@
|
||||
/* Enter a unique ExecutionPlan */
|
||||
@Plan:name('Geo-ExecutionPlan-Proximity_alert')
|
||||
|
||||
/* Enter a unique description for ExecutionPlan */
|
||||
-- @Plan:description('ExecutionPlan')
|
||||
|
||||
/* define streams/tables and write queries here ... */
|
||||
|
||||
@Import('org.wso2.geo.StandardSpatialEvents:1.0.0')
|
||||
define stream dataIn (id string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, eventId string );
|
||||
|
||||
@Export('org.wso2.geo.ProcessedSpatialEvents:1.0.0')
|
||||
define stream dataOut ( id string, latitude double, longitude double, timeStamp long, type string, speed float, heading float, eventId string, state string, information string );
|
||||
|
||||
@IndexBy('id')
|
||||
define table ProximityTable(id string, timeStamp long);
|
||||
|
||||
@IndexBy('id')
|
||||
define table AlertsTable(id string , proximityWith string, eventId string);
|
||||
|
||||
from dataIn#geodashboard:subscribe()
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId
|
||||
insert into initialStream;
|
||||
|
||||
from initialStream[type == 'STOP']
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity
|
||||
insert into dataOutStream;
|
||||
|
||||
from initialStream[type != 'STOP']
|
||||
select *
|
||||
insert into objectInitialStream;
|
||||
|
||||
from objectInitialStream#geo:proximity(id,longitude,latitude, $proximityDistance)
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith
|
||||
insert into proxymityStream;
|
||||
|
||||
from proxymityStream[AlertsTable.id == proxymityStream.id in AlertsTable]
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,true as inAlertTable
|
||||
insert into innerStreamOne;
|
||||
|
||||
from proxymityStream[not(AlertsTable.id == proxymityStream.id in AlertsTable)]
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,false as inAlertTable
|
||||
insert into innerStreamOne;
|
||||
|
||||
from proxymityStream[AlertsTable.id == proxymityStream.proximityWith in AlertsTable]
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,true as inAlertTable
|
||||
insert into innerStreamSeven;
|
||||
|
||||
from proxymityStream[not(AlertsTable.id == proxymityStream.proximityWith in AlertsTable)]
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,inCloseProximity,proximityWith,false as inAlertTable
|
||||
insert into innerStreamSeven;
|
||||
|
||||
from innerStreamOne[inCloseProximity == true AND not(inAlertTable)]
|
||||
select id,str:concat(",",proximityWith) as proximityWith , eventId
|
||||
insert into AlertsTable;
|
||||
|
||||
from innerStreamSeven[inCloseProximity == true AND not(inAlertTable)]
|
||||
select proximityWith as id,str:concat(",",id) as proximityWith , eventId
|
||||
insert into AlertsTable;
|
||||
|
||||
from innerStreamOne[innerStreamOne.inCloseProximity == true AND inAlertTable]#window.length(0) join AlertsTable
|
||||
on innerStreamOne.id == AlertsTable.id
|
||||
select innerStreamOne.id as id, str:concat(",", innerStreamOne.proximityWith, AlertsTable.proximityWith) as proximityWith, innerStreamOne.eventId as eventId
|
||||
insert into updateStream;
|
||||
|
||||
from innerStreamSeven[innerStreamSeven.inCloseProximity == true AND inAlertTable]#window.length(0) join AlertsTable
|
||||
on innerStreamSeven.proximityWith == AlertsTable.id
|
||||
select innerStreamSeven.proximityWith as id, str:concat(",", innerStreamSeven.id, AlertsTable.proximityWith) as proximityWith, innerStreamSeven.eventId as eventId
|
||||
insert into updateStream;
|
||||
|
||||
from innerStreamOne[innerStreamOne.inCloseProximity == false AND inAlertTable]#window.length(0) join AlertsTable
|
||||
on innerStreamOne.id == AlertsTable.id
|
||||
select innerStreamOne.id as id, str:replaceAll(AlertsTable.proximityWith, str:concat(",", innerStreamOne.proximityWith), "") as proximityWith, innerStreamOne.eventId as eventId
|
||||
insert into updateStream;
|
||||
|
||||
from innerStreamSeven[innerStreamSeven.inCloseProximity == false AND inAlertTable]#window.length(0) join AlertsTable
|
||||
on innerStreamSeven.proximityWith == AlertsTable.id
|
||||
select innerStreamSeven.proximityWith as id, str:replaceAll(AlertsTable.proximityWith, str:concat(",", innerStreamSeven.id), "") as proximityWith, innerStreamSeven.eventId as eventId
|
||||
insert into updateStream;
|
||||
|
||||
from updateStream
|
||||
select *
|
||||
update AlertsTable
|
||||
on id== AlertsTable.id;
|
||||
|
||||
from updateStream[proximityWith == ""]
|
||||
delete AlertsTable
|
||||
on id== AlertsTable.id;
|
||||
|
||||
from objectInitialStream[AlertsTable.id == objectInitialStream.id in AlertsTable]
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId, true as inAlertTable
|
||||
insert into publishStream;
|
||||
|
||||
from objectInitialStream[not(AlertsTable.id == objectInitialStream.id in AlertsTable)]
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId, false as inAlertTable
|
||||
insert into publishStream;
|
||||
|
||||
from publishStream[inAlertTable == true]#window.length(0) join AlertsTable
|
||||
on publishStream.id== AlertsTable.id
|
||||
select publishStream.id as id, publishStream.latitude as latitude, publishStream.longitude as longitude, publishStream.timeStamp as timeStamp, publishStream.type as type, publishStream.speed as speed, publishStream.heading as heading, publishStream.eventId as eventId, AlertsTable.proximityWith as proximityInfo
|
||||
insert into innerStreamTwo;
|
||||
|
||||
from publishStream[inAlertTable == false]
|
||||
delete ProximityTable on ProximityTable.id==id;
|
||||
|
||||
from publishStream[inAlertTable == false]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity
|
||||
insert into dataOutStream;
|
||||
|
||||
from innerStreamTwo[ProximityTable.id == innerStreamTwo.id in ProximityTable]
|
||||
insert into innerStreamThree;
|
||||
|
||||
from innerStreamThree#window.length(0) join ProximityTable
|
||||
on innerStreamThree.id == ProximityTable.id
|
||||
select innerStreamThree.id , innerStreamThree.latitude, innerStreamThree.longitude,innerStreamThree.timeStamp, innerStreamThree.type, innerStreamThree.speed, innerStreamThree.heading ,innerStreamThree.eventId, ProximityTable.timeStamp as storedTime, innerStreamThree.proximityInfo as proximityInfo
|
||||
insert into innerStreamFour;
|
||||
|
||||
from innerStreamFour[(timeStamp - storedTime) >= $proximityTime]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,proximityInfo,"true" as isProximity
|
||||
insert into dataOutStream;
|
||||
|
||||
from innerStreamFour[(timeStamp - storedTime) < $proximityTime]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , proximityInfo ,"false" as isProximity
|
||||
insert into dataOutStream;
|
||||
|
||||
from innerStreamTwo[not(ProximityTable.id == innerStreamTwo.id in ProximityTable)]
|
||||
select innerStreamTwo.id, innerStreamTwo.timeStamp
|
||||
insert into ProximityTable;
|
||||
|
||||
from innerStreamTwo[not(ProximityTable.id == innerStreamTwo.id in ProximityTable)]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "" as proximityInfo ,"false" as isProximity
|
||||
insert into dataOutStream;
|
||||
|
||||
from dataOutStream[isProximity == 'true']
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,"WARNING" as state,str:concat("Proximity with "," ",proximityInfo) as information
|
||||
insert into dataOut;
|
||||
|
||||
from dataOutStream[isProximity == 'false']
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,"NORMAL" as state,"" as information
|
||||
insert into dataOut;
|
@ -0,0 +1,89 @@
|
||||
/* Enter a unique ExecutionPlan */
|
||||
@Plan:name('$executionPlanName')
|
||||
|
||||
/* Enter a unique description for ExecutionPlan */
|
||||
-- @Plan:description('ExecutionPlan')
|
||||
|
||||
/* define streams/tables and write queries here ... */
|
||||
|
||||
@Import('org.wso2.geo.StandardSpatialEvents:1.0.0')
|
||||
define stream dataIn (id string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string);
|
||||
|
||||
|
||||
@Export('org.wso2.geo.ProcessedSpatialEvents:1.0.0')
|
||||
define stream dataOut (id string, latitude double, longitude double, timeStamp long, type string ,speed float, heading float, eventId string, state string, information string);
|
||||
|
||||
@IndexBy('id')
|
||||
define table StationeryTable(id string, timeStamp long);
|
||||
|
||||
@IndexBy('id')
|
||||
define table AlertsTable(id string, stationary bool);
|
||||
|
||||
from dataIn#geodashboard:subscribe()
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,geo:within(longitude,latitude,"$geoFenceGeoJSON") as isWithin
|
||||
insert into innerStreamOne;
|
||||
|
||||
from innerStreamOne[isWithin == false]
|
||||
delete StationeryTable on StationeryTable.id==id;
|
||||
|
||||
from innerStreamOne[isWithin == false]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary
|
||||
insert into dataOutStream;
|
||||
|
||||
from innerStreamOne[isWithin == true]#geo:stationary(id,longitude,latitude, $fluctuationRadius)
|
||||
select id, latitude, longitude, timeStamp, type, speed, heading, eventId,stationary
|
||||
insert into innerStreamTwo;
|
||||
|
||||
from innerStreamTwo[innerStreamTwo.stationary == true]
|
||||
select innerStreamTwo.id, innerStreamTwo.stationary
|
||||
insert into AlertsTable;
|
||||
|
||||
from innerStreamTwo[innerStreamTwo.stationary == false]
|
||||
delete AlertsTable on AlertsTable.id==id;
|
||||
|
||||
from innerStreamTwo[innerStreamTwo.stationary == false]
|
||||
delete StationeryTable on StationeryTable.id==id;
|
||||
|
||||
from innerStreamOne[isWithin == true AND not(AlertsTable.id == innerStreamOne.id in AlertsTable)]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary
|
||||
insert into dataOutStream;
|
||||
|
||||
from innerStreamOne[isWithin == true AND AlertsTable.id == innerStreamOne.id in AlertsTable]
|
||||
insert into innerStreamThree;
|
||||
|
||||
from innerStreamThree#window.length(0) join AlertsTable
|
||||
on innerStreamThree.id == AlertsTable.id
|
||||
select innerStreamThree.id , innerStreamThree.latitude, innerStreamThree.longitude,innerStreamThree.timeStamp, innerStreamThree.type, innerStreamThree.speed, innerStreamThree.heading ,innerStreamThree.eventId
|
||||
insert into innerStreamFour;
|
||||
|
||||
from innerStreamFour[not(StationeryTable.id == innerStreamFour.id in StationeryTable)]
|
||||
select innerStreamFour.id, innerStreamFour.timeStamp
|
||||
insert into StationeryTable;
|
||||
|
||||
from innerStreamOne[isWithin == true AND not(StationeryTable.id == innerStreamOne.id in StationeryTable)]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId , "false" as isStationary
|
||||
insert into dataOutStream;
|
||||
|
||||
from innerStreamOne[isWithin == true AND StationeryTable.id == innerStreamOne.id in StationeryTable]
|
||||
insert into innerStreamFive;
|
||||
|
||||
from innerStreamFive#window.length(0) join StationeryTable
|
||||
on innerStreamFive.id == StationeryTable.id
|
||||
select innerStreamFive.id , innerStreamFive.latitude, innerStreamFive.longitude,innerStreamFive.timeStamp, innerStreamFive.type, innerStreamFive.speed, innerStreamFive.heading ,innerStreamFive.eventId, StationeryTable.timeStamp as storedTime
|
||||
insert into innerStreamSix;
|
||||
|
||||
from innerStreamSix[(timeStamp - storedTime) >= $stationeryTime]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,"true" as isStationary
|
||||
insert into dataOutStream;
|
||||
|
||||
from innerStreamSix[(timeStamp - storedTime) < $stationeryTime]
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,"false" as isStationary
|
||||
insert into dataOutStream;
|
||||
|
||||
from dataOutStream[isStationary == 'true']
|
||||
select id ,latitude, longitude,timeStamp, type, speed, heading ,eventId ,"ALERTED" as state, "This device is in $stationeryName area!!!" as information
|
||||
insert into dataOut;
|
||||
|
||||
from dataOutStream[isStationary == 'false']
|
||||
select id , latitude, longitude,timeStamp, type, speed, heading ,eventId ,"NORMAL" as state,"" as information
|
||||
insert into dataOut;
|
Loading…
Reference in new issue