From 575e4932e68f233b12f08999a1da89444b79575d Mon Sep 17 00:00:00 2001 From: Rasika Perera Date: Tue, 6 Jun 2017 05:36:36 +0530 Subject: [PATCH] Fixing geo script issue --- .../AbstractGeoOperationExecutor.java | 115 ++++++++ .../function/GeoDistanceFunctionExecutor.java | 124 ++++++++ .../GeoIntersectsFunctionExecutor.java | 28 ++ .../GeoWithinDistanceFunctionExecutor.java | 44 +++ .../function/GeoWithinFunctionExecutor.java | 27 ++ .../geo/internal/util/ClosestOperation.java | 43 +++ .../geo/internal/util/GeoOperation.java | 92 ++++++ .../geo/internal/util/GeometryUtils.java | 87 ++++++ .../internal/util/IntersectsOperation.java | 40 +++ .../util/WithinDistanceOperation.java | 40 +++ .../geo/internal/util/WithinOperation.java | 40 +++ .../geo/stream/GeoCrossesStreamProcessor.java | 138 +++++++++ ...GeoLocationApproximateStreamProcessor.java | 246 ++++++++++++++++ .../stream/GeoProximityStreamProcessor.java | 179 ++++++++++++ .../stream/GeoStationaryStreamProcessor.java | 154 ++++++++++ ...oClosestPointsStreamFunctionProcessor.java | 80 ++++++ .../src/main/resources/geo.siddhiext | 26 ++ .../siddhi/extensions/geo/GeoTestCase.java | 49 ++++ .../geo/function/GeoDistanceTestCase.java | 88 ++++++ .../geo/function/GeoIntersectsTestCase.java | 73 +++++ .../function/GeoWithinDistanceTestCase.java | 73 +++++ .../geo/function/GeoWithinTestCase.java | 268 ++++++++++++++++++ .../geo/stream/GeoCrossesTestCase.java | 76 +++++ .../geo/stream/GeoProximityTestCase.java | 91 ++++++ .../geo/stream/GeoStationaryTestCase.java | 75 +++++ .../function/AverageLocationTestCase.java | 161 +++++++++++ .../function/GeoClosetPointTestCase.java | 183 ++++++++++++ .../src/test/resources/log4j.properties | 35 +++ 28 files changed, 2675 insertions(+) create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/AbstractGeoOperationExecutor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoDistanceFunctionExecutor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoIntersectsFunctionExecutor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinDistanceFunctionExecutor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinFunctionExecutor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/ClosestOperation.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/GeoOperation.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/GeometryUtils.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/IntersectsOperation.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/WithinDistanceOperation.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/WithinOperation.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoCrossesStreamProcessor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoLocationApproximateStreamProcessor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoProximityStreamProcessor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoStationaryStreamProcessor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/GeoClosestPointsStreamFunctionProcessor.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/resources/geo.siddhiext create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/GeoTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoDistanceTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoIntersectsTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinDistanceTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoCrossesTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoProximityTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoStationaryTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/AverageLocationTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/GeoClosetPointTestCase.java create mode 100644 components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/resources/log4j.properties diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/AbstractGeoOperationExecutor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/AbstractGeoOperationExecutor.java new file mode 100644 index 0000000000..18e4c9041b --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/AbstractGeoOperationExecutor.java @@ -0,0 +1,115 @@ + +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.function; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation; +import org.wso2.siddhi.query.api.definition.Attribute.Type; + +public abstract class AbstractGeoOperationExecutor extends FunctionExecutor { + + GeoOperation geoOperation; + + /** + * The initialization method for FunctionExecutor, this method will be called before the other methods + * + * @param attributeExpressionExecutors are the executors of each function parameters + * @param executionPlanContext the context of the execution plan + */ + @Override + protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length); + } + /** + * The main executions method which will be called upon event arrival + * + * @param data the runtime values of the attributeExpressionExecutors + * @return + */ + @Override + protected Object execute(Object[] data) { + return geoOperation.process(data); + } + + /** + * The main execution method which will be called upon event arrival + * which has zero or one function parameter + * + * @param data null if the function parameter count is zero or + * runtime data value of the function parameter + * @return the function result + */ + @Override + protected Object execute(Object data) { + throw new IllegalStateException(this.getClass().getCanonicalName() + " cannot execute data " + data); + } + + /** + * This will be called only once, to acquire required resources + * after initializing the system and before processing the events. + */ + @Override + public void start() { + + } + + /** + * This will be called only once, to release the acquired resources + * before shutting down the system. + */ + @Override + public void stop() { + + } + + /** + * The serializable state of the element, that need to be + * persisted for the reconstructing the element to the same state + * on a different point of time + * + * @return stateful objects of the element as an array + */ + @Override + public Object[] currentState() { + return new Object[0]; + } + + /** + * The serialized state of the element, for reconstructing + * the element to the same state as if was on a previous point of time. + * + * @param state the stateful objects of the element as an array on + * the same order provided by currentState(). + */ + @Override + public void restoreState(Object[] state) { + + } + + //TODO: look into cloning + + + public Type getReturnType() { + return geoOperation.getReturnType(); + } + +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoDistanceFunctionExecutor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoDistanceFunctionExecutor.java new file mode 100644 index 0000000000..c7e45d5e58 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoDistanceFunctionExecutor.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.function; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; + +/** + * geo:distance(location1Latitude, location1Longitude, location2Latitude, location2Longitude) + * + * This method gives the distance between two geo locations in meters + * + * location1Latitude - latitude value of 1st location + * location1Longitude - longitude value of 1st location + * location2Latitude - latitude value of 2nd location + * location2Longitude - longitude value of 2nd location + * + * Accept Type(s) for geo:distance(location1Latitude, location1Longitude, location2Latitude, location2Longitude); + * location1Latitude : DOUBLE + * location1Longitude : DOUBLE + * location2Latitude : DOUBLE + * location2Longitude : DOUBLE + * + * Return Type(s): DOUBLE + */ +public class GeoDistanceFunctionExecutor extends FunctionExecutor { + Attribute.Type returnType = Attribute.Type.DOUBLE; + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public Object[] currentState() { + return new Object[0]; + } + + @Override + public void restoreState(Object[] state) { + + } + + @Override + protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + if (attributeExpressionExecutors.length != 4) { + throw new ExecutionPlanValidationException("Invalid no of arguments passed to " + + "geo:distance() function, " + + "requires 4, but found " + attributeExpressionExecutors.length); + } + } + + @Override + protected Object execute(Object[] data) { + if (data[0] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " + + "function. First argument should be double"); + } + if (data[1] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " + + "function. Second argument should be double"); + } + if (data[2] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " + + "function. Third argument should be double"); + } + if (data[3] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " + + "function. Fourth argument should be double"); + } + + double latitude = (Double) data[0]; + double longitude = (Double) data[1]; + double prevLatitude = (Double) data[2]; + double prevLongitude = (Double) data[3]; + + int R = 6371000; // Radius of the earth in m + latitude = latitude * (Math.PI / 180); + prevLatitude = prevLatitude * (Math.PI / 180); + longitude = longitude * (Math.PI / 180); + prevLongitude = prevLongitude * (Math.PI / 180); + double dlon = prevLongitude - longitude; + double dlat = prevLatitude - latitude; + double a = Math.sin(dlat / 2) * Math.sin(dlat / 2) + Math.cos(latitude) * Math.cos(prevLatitude) * + Math.sin(dlon / 2) * Math.sin(dlon / 2); + double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); + return R * c; + } + + @Override + protected Object execute(Object data) { + return null; + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } + +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoIntersectsFunctionExecutor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoIntersectsFunctionExecutor.java new file mode 100644 index 0000000000..01ec9a8993 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoIntersectsFunctionExecutor.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.function; + +import org.wso2.gpl.siddhi.extensions.geo.internal.util.IntersectsOperation; + +public class GeoIntersectsFunctionExecutor extends AbstractGeoOperationExecutor { + public GeoIntersectsFunctionExecutor(){ + this.geoOperation = new IntersectsOperation(); + } +} + diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinDistanceFunctionExecutor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinDistanceFunctionExecutor.java new file mode 100644 index 0000000000..7d39ec37ab --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinDistanceFunctionExecutor.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.function; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.exception.ExecutionPlanCreationException; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation; +import org.wso2.siddhi.query.api.definition.Attribute; + +public class GeoWithinDistanceFunctionExecutor extends AbstractGeoOperationExecutor { + public GeoWithinDistanceFunctionExecutor() { + this.geoOperation = new WithinDistanceOperation(); + } + /** + * The initialization method for FunctionExecutor, this method will be called before the other methods + * + * @param attributeExpressionExecutors are the executors of each function parameters + * @param executionPlanContext the context of the execution plan + */ + @Override + protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length); + if (attributeExpressionExecutors[attributeExpressionExecutors.length - 1].getReturnType() != Attribute.Type.DOUBLE) { + throw new ExecutionPlanCreationException("Last argument should be a double"); + } + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinFunctionExecutor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinFunctionExecutor.java new file mode 100644 index 0000000000..8ef2cb92ce --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinFunctionExecutor.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.function; + +import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinOperation; + +public class GeoWithinFunctionExecutor extends AbstractGeoOperationExecutor { + public GeoWithinFunctionExecutor() { + this.geoOperation = new WithinOperation(); + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/ClosestOperation.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/ClosestOperation.java new file mode 100644 index 0000000000..70f97c9cb4 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/ClosestOperation.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.internal.util; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.prep.PreparedGeometry; +import com.vividsolutions.jts.operation.distance.DistanceOp; +import org.wso2.siddhi.query.api.definition.Attribute; + +public class ClosestOperation extends org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation { + @Override + public Object operation(Geometry a, Geometry b, Object[] data) { + DistanceOp distOp = new DistanceOp(a, b); + return distOp.nearestPoints(); + } + + @Override + public Object operation(Geometry a, PreparedGeometry b, Object[] data) { + DistanceOp distOp = new DistanceOp(a, b.getGeometry()); + return distOp.nearestPoints(); + } + + @Override + public Attribute.Type getReturnType() { + return null; + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/GeoOperation.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/GeoOperation.java new file mode 100644 index 0000000000..544c05e15b --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/GeoOperation.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.internal.util; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.prep.PreparedGeometry; +import org.wso2.siddhi.core.exception.ExecutionPlanCreationException; +import org.wso2.siddhi.core.executor.ConstantExpressionExecutor; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; + +public abstract class GeoOperation { + public boolean point = false; + protected Object data; + public PreparedGeometry geometry = null; + + public void init(ExpressionExecutor[] attributeExpressionExecutors, int start, int end) { + int position = start; + if (attributeExpressionExecutors[position].getReturnType() == Attribute.Type.DOUBLE) { + point = true; + if (attributeExpressionExecutors[position + 1].getReturnType() != Attribute.Type.DOUBLE) { + throw new ExecutionPlanCreationException("Longitude and Latitude must be provided as double values"); + } + ++position; + } else if (attributeExpressionExecutors[position].getReturnType() == Attribute.Type.STRING) { + point = false; + } else { + throw new ExecutionPlanCreationException((position + 1) + + " parameter should be a string for a geometry or a double for a latitude"); + } + ++position; + if (position >= end) { + return; + } + if (attributeExpressionExecutors[position].getReturnType() != Attribute.Type.STRING) { + throw new ExecutionPlanCreationException((position + 1) + " parameter should be a GeoJSON geometry string"); + } + if (attributeExpressionExecutors[position] instanceof ConstantExpressionExecutor) { + String strGeometry = attributeExpressionExecutors[position].execute(null).toString(); + geometry = GeometryUtils.preparedGeometryFromJSON(strGeometry); + } + } + + public Object process(Object[] data) { + Geometry currentGeometry; + if (point) { + double longitude = (Double) data[0]; + double latitude = (Double) data[1]; + currentGeometry = GeometryUtils.createPoint(longitude, latitude); + } else { + currentGeometry = GeometryUtils.geometryFromJSON(data[0].toString()); + } + if (geometry != null) { + return operation(currentGeometry, geometry, data); + } else { + return operation(currentGeometry, GeometryUtils.geometryFromJSON(data[point ? 2 : 1].toString()), + data); + } + } + + public Geometry getCurrentGeometry(Object[] data) { + if (point) { + double longitude = (Double) data[0]; + double latitude = (Double) data[1]; + return GeometryUtils.createPoint(longitude, latitude); + } else { + return GeometryUtils.createGeometry(data[0]); + } + } + + public abstract Object operation(Geometry a, Geometry b, Object[] data); + + public abstract Object operation(Geometry a, PreparedGeometry b, Object[] data); + + public abstract Attribute.Type getReturnType(); +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/GeometryUtils.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/GeometryUtils.java new file mode 100644 index 0000000000..c6a1029993 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/GeometryUtils.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.internal.util; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.GeometryFactory; +import com.vividsolutions.jts.geom.Point; +import com.vividsolutions.jts.geom.prep.PreparedGeometry; +import com.vividsolutions.jts.geom.prep.PreparedGeometryFactory; +import org.geotools.geojson.geom.GeometryJSON; +import org.geotools.geometry.jts.JTSFactoryFinder; + +import java.io.IOException; +import java.io.StringWriter; + +public class GeometryUtils { + + public static final double TO_DEGREE = 110574.61087757687; + private static final String COORDINATES = "coordinates"; + private static final String RADIUS = "radius"; + private static GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory(); + //JTSFactoryFinder.getGeometryFactory(new Hints(Hints.CRS, DefaultGeographicCRS.WGS84)); + private static GeometryJSON geometryJSON = new GeometryJSON(10); + + public static Geometry geometryFromJSON(String strGeometry) { + if (strGeometry.contains(RADIUS)) { + JsonObject jsonObject = new JsonParser().parse(strGeometry).getAsJsonObject(); + JsonArray jLocCoordinatesArray = jsonObject.getAsJsonArray(COORDINATES); + Coordinate coords = new Coordinate(Double.parseDouble(jLocCoordinatesArray.get(0) + .toString()), Double.parseDouble(jLocCoordinatesArray.get(1).toString())); + Point point = geometryFactory.createPoint(coords); // create the points for GeoJSON file points + double radius = Double.parseDouble(jsonObject.get(RADIUS).toString()) / TO_DEGREE; //convert to degrees + return point.buffer(radius); //draw the buffer + } else { + try { + return geometryJSON.read(strGeometry.replace("'", "\"")); + } catch (IOException e) { + throw new RuntimeException("Failed to create a geometry from given str " + strGeometry, e); + } + } + } + + public static String geometrytoJSON(Geometry geometry) { + StringWriter sw = new StringWriter(); + try { + geometryJSON.write(geometry, sw); + } catch (IOException e) { + throw new RuntimeException("Failed to create a json string from given geometry " + geometry, e); + } + return sw.toString(); + } + + public static PreparedGeometry preparedGeometryFromJSON(String strGeometry) { + return PreparedGeometryFactory.prepare(geometryFromJSON(strGeometry)); + } + public static Point createPoint(double longitude, double latitude) { + return geometryFactory.createPoint(new Coordinate(longitude, latitude)); + } + public static Geometry createGeometry(Object data){ + if(data instanceof Geometry) { + return (Geometry) data; + } + else { + return geometryFromJSON(data.toString()); + } + } +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/IntersectsOperation.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/IntersectsOperation.java new file mode 100644 index 0000000000..5617573609 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/IntersectsOperation.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.internal.util; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.prep.PreparedGeometry; +import org.wso2.siddhi.query.api.definition.Attribute; + +public class IntersectsOperation extends GeoOperation { + @Override + public Object operation(Geometry a, Geometry b, Object[] data) { + return a.intersects(b); + } + + @Override + public Object operation(Geometry a, PreparedGeometry b, Object[] data) { + return b.intersects(a); + } + + @Override + public Attribute.Type getReturnType() { + return Attribute.Type.BOOL; + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/WithinDistanceOperation.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/WithinDistanceOperation.java new file mode 100644 index 0000000000..7aa988bdcd --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/WithinDistanceOperation.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.internal.util; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.prep.PreparedGeometry; +import org.wso2.siddhi.query.api.definition.Attribute; + +public class WithinDistanceOperation extends GeoOperation { + @Override + public Object operation(Geometry a, Geometry b, Object[] data) { + return a.isWithinDistance(b, (Double) data[data.length - 1] / GeometryUtils.TO_DEGREE); + } + + @Override + public Object operation(Geometry a, PreparedGeometry b, Object[] data) { + return a.isWithinDistance(b.getGeometry(), (Double) data[data.length - 1] / GeometryUtils.TO_DEGREE); + } + + @Override + public Attribute.Type getReturnType() { + return Attribute.Type.BOOL; + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/WithinOperation.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/WithinOperation.java new file mode 100644 index 0000000000..4ef44b183d --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/internal/util/WithinOperation.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.internal.util; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.prep.PreparedGeometry; +import org.wso2.siddhi.query.api.definition.Attribute; + +public class WithinOperation extends GeoOperation { + @Override + public Object operation(Geometry a, Geometry b, Object[] data) { + return a.within(b); + } + + @Override + public Object operation(Geometry a, PreparedGeometry b, Object[] data) { + return b.contains(a); + } + + @Override + public Attribute.Type getReturnType() { + return Attribute.Type.BOOL; + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoCrossesStreamProcessor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoCrossesStreamProcessor.java new file mode 100644 index 0000000000..ab429b36e3 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoCrossesStreamProcessor.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.stream; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.event.ComplexEvent; +import org.wso2.siddhi.core.event.ComplexEventChunk; +import org.wso2.siddhi.core.event.stream.StreamEvent; +import org.wso2.siddhi.core.event.stream.StreamEventCloner; +import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.query.processor.Processor; +import org.wso2.siddhi.core.query.processor.stream.StreamProcessor; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinOperation; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.Attribute.Type; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class GeoCrossesStreamProcessor extends StreamProcessor { + + private GeoOperation geoOperation; + private Set set = Collections.newSetFromMap(new ConcurrentHashMap()); + + /** + * The init method of the StreamProcessor, this method will be called before other methods + * + * @param inputDefinition the incoming stream definition + * @param attributeExpressionExecutors the executors of each function parameters + * @param executionPlanContext the context of the execution plan + * @return the additional output attributes introduced by the function + */ + @Override + protected List init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + geoOperation = new WithinOperation(); + geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength); + List attributeList = new ArrayList(1); + attributeList.add(new Attribute("crosses", Type.BOOL)); + return attributeList; + } + + /** + * This will be called only once, to acquire required resources + * after initializing the system and before processing the events. + */ + @Override + public void start() { + + } + + /** + * This will be called only once, to release the acquired resources + * before shutting down the system. + */ + @Override + public void stop() { + + } + + /** + * The serializable state of the element, that need to be + * persisted for the reconstructing the element to the same state + * on a different point of time + * + * @return stateful objects of the element as an array + */ + @Override + public Object[] currentState() { + return new Object[0]; + } + + /** + * The serialized state of the element, for reconstructing + * the element to the same state as if was on a previous point of time. + * + * @param state the stateful objects of the element as an array on + * the same order provided by currentState(). + */ + @Override + public void restoreState(Object[] state) { + + } + + @Override + protected void process(ComplexEventChunk streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) { + while (streamEventChunk.hasNext()) { + ComplexEvent complexEvent = streamEventChunk.next(); + + Object[] data = new Object[attributeExpressionLength - 1]; + for (int i = 1; i < attributeExpressionLength; i++) { + data[i-1] = attributeExpressionExecutors[i].execute(complexEvent); + } + boolean within = (Boolean)geoOperation.process(data); + + String id = attributeExpressionExecutors[0].execute(complexEvent).toString(); + if (set.contains(id)) { + if (!within) { + //alert out + complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{within}); + set.remove(id); + } else { + streamEventChunk.remove(); + } + } else { + if (within) { + //alert in + complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{within}); + set.add(id); + } else { + streamEventChunk.remove(); + } + } + } + nextProcessor.process(streamEventChunk); + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoLocationApproximateStreamProcessor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoLocationApproximateStreamProcessor.java new file mode 100644 index 0000000000..52321260d3 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoLocationApproximateStreamProcessor.java @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.stream; + +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * geoLocationApproximate(locationRecorder, latitude, longitude, sensorProximity, sensorUUID, sensorWeight, timestamp) + * + * This method computed the average location of the locationRecorder using the collection iBeacons which the location + * recorder resides. + * + * locationRecorder - unique id of the object or item + * latitude - latitude value of the iBeacon + * longitude - longitude value of the iBeacon + * sensorProximity - proximity which will be given by the iBeacon (eg: ENTER, RANGE, EXIT) + * sensorUUID - unique id of the iBeacon + * sensorWeight - weight of the iBeacon which influence the averaging of the location (eg: approximate distance from + * the beacon + * timestamp - timestamp of the log which will be used to remove iBeacon from one's collection when there is no + * new log for 5 minutes + * + * Accept Type(s) for geoLocationApproximate(locationRecorder, latitude, longitude, sensorProximity, sensorUUID, + * sensorWeight, timestamp); + * locationRecorder : STRING + * latitude : DOUBLE + * longitude : DOUBLE + * sensorProximity : STRING + * sensorUUID : STRING + * sensorWeight : DOUBLE + * timestamp : LONG + * + * Return Type(s): DOUBLE, DOUBLE, BOOL + * + */ +public class GeoLocationApproximateStreamProcessor extends StreamFunctionProcessor { + + //locationRecorder,uuid -> BeaconValueHolder + private Map> + personSpecificRecordLocatorMaps; + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public Object[] currentState() { + return new Object[]{personSpecificRecordLocatorMaps}; + } + + @Override + public void restoreState(Object[] state) { + personSpecificRecordLocatorMaps = (Map>) state[0]; + } + + @Override + protected Object[] process(Object[] data) { + if (data[0] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " + + "function. First argument should be string"); + } + if (data[1] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " + + "function. Second argument should be double"); + } + if (data[2] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " + + "function. Third argument should be double"); + } + if (data[3] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " + + "function. Forth argument should be string"); + } + if (data[4] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " + + "function. Fifth argument should be string"); + } + if (data[5] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " + + "function. Sixth argument should be double"); + } + if (data[6] == null) { + throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " + + "function. Seventh argument should be long"); + } + + String locationRecorder = (String) data[0]; + double latitude = (Double) data[1]; + double longitude = (Double) data[2]; + String beaconProximity = (String) data[3]; + String uuid = (String) data[4]; + double weight = (Double) data[5]; //is calculated previously eg: distance + long timestamp = (Long) data[6]; + + if (personSpecificRecordLocatorMaps.get(locationRecorder) == null) { + personSpecificRecordLocatorMaps.put(locationRecorder, new ConcurrentHashMap()); + } + //both "enter" and "range" attributes are there in the logs I retrieve when it comes to cleaning logs + if ("ENTER".equalsIgnoreCase(beaconProximity) || "RANGE".equalsIgnoreCase(beaconProximity)) { + if (personSpecificRecordLocatorMaps.get(locationRecorder).containsKey(uuid)) { + BeaconValueHolder tempBeaconValue = personSpecificRecordLocatorMaps.get(locationRecorder).get(uuid); + if (tempBeaconValue.getLastUpdatedTime() < timestamp) { + BeaconValueHolder beaconValueHolder = new BeaconValueHolder(latitude, longitude, timestamp, weight); + personSpecificRecordLocatorMaps.get(locationRecorder).put(uuid, beaconValueHolder); + } + } else { + BeaconValueHolder beaconValueHolder = new BeaconValueHolder(latitude, longitude, timestamp, weight); + personSpecificRecordLocatorMaps.get(locationRecorder).put(uuid, beaconValueHolder); + } + } else { + if (personSpecificRecordLocatorMaps.get(locationRecorder).containsKey(uuid)) { + BeaconValueHolder tempBeaconValue = personSpecificRecordLocatorMaps.get(locationRecorder).get(uuid); + if (tempBeaconValue.getLastUpdatedTime() < timestamp) { + personSpecificRecordLocatorMaps.get(locationRecorder).remove(uuid); + } + } + } + + int noOfSensors = personSpecificRecordLocatorMaps.get(locationRecorder).size(); + double sensorValues[][] = new double[noOfSensors][3]; + int actualNoOfSensors = 0; + double totalWeight = 0; + for (Map.Entry beaconLocation : personSpecificRecordLocatorMaps.get(locationRecorder).entrySet()) { + BeaconValueHolder beaconValueHolder = beaconLocation.getValue(); + long prevTimestamp = beaconValueHolder.getLastUpdatedTime(); + if ((timestamp - prevTimestamp) > 300000) { + //if there is a beacon which has a log older than 5 minutes, removing the beacon assuming the + //device has gone away from that beacon + personSpecificRecordLocatorMaps.get(locationRecorder).remove(beaconLocation.getKey()); + } else { + sensorValues[actualNoOfSensors][0] = beaconValueHolder.getLatitude(); + sensorValues[actualNoOfSensors][1] = beaconValueHolder.getLongitude(); + sensorValues[actualNoOfSensors][2] = beaconValueHolder.getBeaconDistance(); + totalWeight += beaconValueHolder.getBeaconDistance(); + actualNoOfSensors++; + } + } + if (actualNoOfSensors == 0) { + return new Object[]{latitude, longitude, false}; + } + + double tempLatitude, tempLongitude; + double x = 0; + double y = 0; + double z = 0; + for (int i = 0; i < actualNoOfSensors; i++) { + weight = sensorValues[i][2] / totalWeight; + tempLatitude = sensorValues[i][0] * Math.PI / 180.0; + tempLongitude = sensorValues[i][1] * Math.PI / 180.0; + x += Math.cos(tempLatitude) * Math.cos(tempLongitude) * weight; + y += Math.cos(tempLatitude) * Math.sin(tempLongitude) * weight; + z += Math.sin(tempLatitude) * weight; + } + longitude = Math.atan2(y, x) * 180 / Math.PI; + double hyp = Math.sqrt(x * x + y * y); + latitude = Math.atan2(z, hyp) * 180 / Math.PI; + + return new Object[]{latitude, longitude, true}; + } + + @Override + protected Object[] process(Object data) { + return new Object[0]; + } + + @Override + protected List init(AbstractDefinition inputDefinition, + ExpressionExecutor[] attributeExpressionExecutors, + ExecutionPlanContext executionPlanContext) { + personSpecificRecordLocatorMaps = new ConcurrentHashMap>(); + if (attributeExpressionExecutors.length != 7) { + throw new ExecutionPlanValidationException("Invalid no of arguments passed to " + + "geo:locationApproximate() function, " + + "requires 7, but found " + attributeExpressionExecutors.length); + } + ArrayList attributes = new ArrayList(3); + attributes.add(new Attribute("averagedLatitude", Attribute.Type.DOUBLE)); + attributes.add(new Attribute("averagedLongitude", Attribute.Type.DOUBLE)); + attributes.add(new Attribute("averageExist", Attribute.Type.BOOL)); + return attributes; + } + + private class BeaconValueHolder { + private double latitude; + private double longitude; + private long lastUpdatedTime; + private double beaconDistance; + + public BeaconValueHolder(double latitude, double longitude, long lastUpdatedTime, double + beaconDistance) { + this.latitude = latitude; + this.longitude = longitude; + this.lastUpdatedTime = lastUpdatedTime; + this.beaconDistance = beaconDistance; + } + + public long getLastUpdatedTime() { + return lastUpdatedTime; + } + + public double getBeaconDistance() { + return beaconDistance; + } + + public double getLatitude() { + return latitude; + } + + public double getLongitude() { + return longitude; + } + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoProximityStreamProcessor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoProximityStreamProcessor.java new file mode 100644 index 0000000000..60123f03e0 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoProximityStreamProcessor.java @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.stream; + +import com.vividsolutions.jts.geom.Geometry; +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.event.ComplexEventChunk; +import org.wso2.siddhi.core.event.stream.StreamEvent; +import org.wso2.siddhi.core.event.stream.StreamEventCloner; +import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater; +import org.wso2.siddhi.core.exception.ExecutionPlanCreationException; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.query.processor.Processor; +import org.wso2.siddhi.core.query.processor.stream.StreamProcessor; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.Attribute.Type; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class GeoProximityStreamProcessor extends StreamProcessor { + + private GeoOperation geoOperation; + private double radius; + private ConcurrentHashMap map = new ConcurrentHashMap(); + private Set set = Collections.newSetFromMap(new ConcurrentHashMap()); + + /** + * The init method of the StreamProcessor, this method will be called before other methods + * + * @param inputDefinition the incoming stream definition + * @param attributeExpressionExecutors the executors of each function parameters + * @param executionPlanContext the context of the execution plan + * @return the additional output attributes introduced by the function + */ + @Override + protected List init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + this.geoOperation = new WithinDistanceOperation(); + this.geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength - 1); + if (attributeExpressionExecutors[attributeExpressionLength - 1].getReturnType() != Type.DOUBLE) { + throw new ExecutionPlanCreationException("Last parameter should be a double"); + } + radius = (Double) attributeExpressionExecutors[attributeExpressionLength - 1].execute(null); + ArrayList attributeList = new ArrayList(); + attributeList.add(new Attribute("proximityWith", Type.STRING)); + attributeList.add(new Attribute("inCloseProximity", Type.BOOL)); + return attributeList; + } + + /** + * This will be called only once, to acquire required resources + * after initializing the system and before processing the events. + */ + @Override + public void start() { + + } + + /** + * This will be called only once, to release the acquired resources + * before shutting down the system. + */ + @Override + public void stop() { + + } + + /** + * The serializable state of the element, that need to be + * persisted for the reconstructing the element to the same state + * on a different point of time + * + * @return stateful objects of the element as an array + */ + @Override + public Object[] currentState() { + return new Object[0]; + } + + /** + * The serialized state of the element, for reconstructing + * the element to the same state as if was on a previous point of time. + * + * @param state the stateful objects of the element as an array on + * the same order provided by currentState(). + */ + @Override + public void restoreState(Object[] state) { + + } + + /** + * The main processing method that will be called upon event arrival + * + * @param streamEventChunk the event chunk that need to be processed + * @param nextProcessor the next processor to which the success events need to be passed + * @param streamEventCloner helps to clone the incoming event for local storage or modification + * @param complexEventPopulater helps to populate the events with the resultant attributes + */ + @Override + protected void process(ComplexEventChunk streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) { + while (streamEventChunk.hasNext()) { + StreamEvent streamEvent = streamEventChunk.next(); + Geometry currentGeometry, previousGeometry; + Object[] data = new Object[attributeExpressionLength - 1]; + for (int i = 1; i < attributeExpressionLength; i++) { + data[i - 1] = attributeExpressionExecutors[i].execute(streamEvent); + } + String currentId = attributeExpressionExecutors[0].execute(streamEvent).toString(); + String previousId; + currentGeometry = geoOperation.getCurrentGeometry(data); + if(!map.contains(currentId)) { + map.put(currentId, currentGeometry); + } + for (Map.Entry entry : map.entrySet()) { + previousId = entry.getKey(); + if (!previousId.equals(currentId)) { + previousGeometry = entry.getValue(); + boolean within = (Boolean) geoOperation.operation(currentGeometry, previousGeometry, new Object[]{radius}); + String key = makeCompositeKey(currentId, previousId); + boolean contains = set.contains(key); + if (contains) { + if (!within) { + //alert out + StreamEvent newStreamEvent = streamEventCloner.copyStreamEvent(streamEvent); + complexEventPopulater.populateComplexEvent(newStreamEvent, new Object[]{previousId, within}); + streamEventChunk.insertBeforeCurrent(newStreamEvent); + set.remove(key); + } + } else { + if (within) { + //alert in + StreamEvent newStreamEvent = streamEventCloner.copyStreamEvent(streamEvent); + complexEventPopulater.populateComplexEvent(newStreamEvent, new Object[]{previousId, within}); + streamEventChunk.insertBeforeCurrent(newStreamEvent); + set.add(key); + } + } + } + } + streamEventChunk.remove(); + } + nextProcessor.process(streamEventChunk); + } +/* + private Object[] toOutput(Geometry geometry, boolean within) { + if (geoOperation.point) { + return new Object[]{((Point) geometry).getX(), ((Point) geometry).getY(), within}; + } else { + return new Object[]{GeometryUtils.geometrytoJSON(geometry), within}; + } + }*/ + public String makeCompositeKey(String key1, String key2) { + if (key1.compareToIgnoreCase(key2) < 0) { + return key1 + "~" + key2; + } else { + return key2 + "~" + key1; + } + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoStationaryStreamProcessor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoStationaryStreamProcessor.java new file mode 100644 index 0000000000..7b2909d3af --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoStationaryStreamProcessor.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.stream; + +import com.vividsolutions.jts.geom.Geometry; +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.event.ComplexEvent; +import org.wso2.siddhi.core.event.ComplexEventChunk; +import org.wso2.siddhi.core.event.stream.StreamEvent; +import org.wso2.siddhi.core.event.stream.StreamEventCloner; +import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater; +import org.wso2.siddhi.core.exception.ExecutionPlanCreationException; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.query.processor.Processor; +import org.wso2.siddhi.core.query.processor.stream.StreamProcessor; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.Attribute.Type; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class GeoStationaryStreamProcessor extends StreamProcessor { + + private GeoOperation geoOperation; + private double radius; + private ConcurrentHashMap map = new ConcurrentHashMap(); + + /** + * The init method of the StreamProcessor, this method will be called before other methods + * + * @param inputDefinition the incoming stream definition + * @param attributeExpressionExecutors the executors of each function parameters + * @param executionPlanContext the context of the execution plan + * @return the additional output attributes introduced by the function + */ + @Override + protected List init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { + geoOperation = new WithinDistanceOperation(); + geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength - 1); + if (attributeExpressionExecutors[attributeExpressionLength - 1].getReturnType() != Type.DOUBLE) { + throw new ExecutionPlanCreationException("Last parameter should be a double"); + } + radius = (Double) attributeExpressionExecutors[attributeExpressionLength - 1].execute(null); + ArrayList attributeList = new ArrayList(1); + attributeList.add(new Attribute("stationary", Type.BOOL)); + return attributeList; + } + + /** + * This will be called only once, to acquire required resources + * after initializing the system and before processing the events. + */ + @Override + public void start() { + + } + + /** + * This will be called only once, to release the acquired resources + * before shutting down the system. + */ + @Override + public void stop() { + + } + + /** + * The serializable state of the element, that need to be + * persisted for the reconstructing the element to the same state + * on a different point of time + * + * @return stateful objects of the element as an array + */ + @Override + public Object[] currentState() { + return new Object[0]; + } + + /** + * The serialized state of the element, for reconstructing + * the element to the same state as if was on a previous point of time. + * + * @param state the stateful objects of the element as an array on + * the same order provided by currentState(). + */ + @Override + public void restoreState(Object[] state) { + + } + + @Override + protected void process(ComplexEventChunk streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) { + while (streamEventChunk.hasNext()) { + ComplexEvent complexEvent = streamEventChunk.next(); + + Object[] data = new Object[attributeExpressionLength - 1]; + for (int i = 1; i < attributeExpressionLength; i++) { + data[i - 1] = attributeExpressionExecutors[i].execute(complexEvent); + } + Geometry currentGeometry = geoOperation.getCurrentGeometry(data); + String id = attributeExpressionExecutors[0].execute(complexEvent).toString(); + Geometry previousGeometry = map.get(id); + if (previousGeometry == null) { + currentGeometry.setUserData(false); + map.put(id, currentGeometry); + streamEventChunk.remove(); + continue; + } + boolean stationary = (Boolean) geoOperation.operation(currentGeometry, previousGeometry, new Object[]{radius}); + + if((Boolean)previousGeometry.getUserData()) { + if(!stationary) { + //alert out + complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{stationary}); + currentGeometry.setUserData(stationary); + map.put(id, currentGeometry); + } else { + streamEventChunk.remove(); + } + } else { + if (stationary) { + //alert in + previousGeometry.setUserData(stationary); + complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{stationary}); + } else { + currentGeometry.setUserData(stationary); + map.put(id, currentGeometry); + streamEventChunk.remove(); + } + } + } + nextProcessor.process(streamEventChunk); + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/GeoClosestPointsStreamFunctionProcessor.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/GeoClosestPointsStreamFunctionProcessor.java new file mode 100644 index 0000000000..8dce79f73f --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/GeoClosestPointsStreamFunctionProcessor.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.stream.function; + +import com.vividsolutions.jts.geom.Coordinate; +import org.wso2.siddhi.core.config.ExecutionPlanContext; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.ClosestOperation; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; + +import java.util.ArrayList; +import java.util.List; + +public class GeoClosestPointsStreamFunctionProcessor extends StreamFunctionProcessor { + + GeoOperation geoOperation; + + @Override + protected List init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutors, ExecutionPlanContext executionPlanContext) { + this.geoOperation = new ClosestOperation(); + this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length); + List attributeList = new ArrayList(4); + attributeList.add(new Attribute("closestPointOf1From2Latitude", Attribute.Type.DOUBLE)); + attributeList.add(new Attribute("closestPointOf1From2Longitude", Attribute.Type.DOUBLE)); + attributeList.add(new Attribute("closestPointOf2From1Latitude", Attribute.Type.DOUBLE)); + attributeList.add(new Attribute("closestPointOf2From1Longitude", Attribute.Type.DOUBLE)); + return attributeList; + } + + @Override + protected Object[] process(Object[] data) { + Coordinate[] coordinates = (Coordinate[]) geoOperation.process(data); + + return new Object[]{coordinates[0].x, coordinates[0].y, coordinates[1].x, coordinates[1].y}; + } + + @Override + protected Object[] process(Object o) { + return null; + } + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public Object[] currentState() { + return new Object[0]; + } + + @Override + public void restoreState(Object[] objects) { + + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/resources/geo.siddhiext b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/resources/geo.siddhiext new file mode 100644 index 0000000000..679e72f105 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/main/resources/geo.siddhiext @@ -0,0 +1,26 @@ +# +# Copyright (C) 2015-2016 WSO2 Inc. (http://wso2.com) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +within=org.wso2.gpl.siddhi.extensions.geo.function.GeoWithinFunctionExecutor +intersects=org.wso2.gpl.siddhi.extensions.geo.function.GeoIntersectsFunctionExecutor +withinDistance=org.wso2.gpl.siddhi.extensions.geo.function.GeoWithinDistanceFunctionExecutor +crosses=org.wso2.gpl.siddhi.extensions.geo.stream.GeoCrossesStreamProcessor +proximity=org.wso2.gpl.siddhi.extensions.geo.stream.GeoProximityStreamProcessor +stationary=org.wso2.gpl.siddhi.extensions.geo.stream.GeoStationaryStreamProcessor +closestPoints=org.wso2.gpl.siddhi.extensions.geo.stream.function.GeoClosestPointsStreamFunctionProcessor +locationApproximate=org.wso2.gpl.siddhi.extensions.geo.stream.GeoLocationApproximateStreamProcessor +distance=org.wso2.gpl.siddhi.extensions.geo.function.GeoDistanceFunctionExecutor diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/GeoTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/GeoTestCase.java new file mode 100644 index 0000000000..b55ee235aa --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/GeoTestCase.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.gpl.siddhi.extensions.geo; + +import org.apache.log4j.Logger; +import org.junit.BeforeClass; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.stream.input.InputHandler; + +import java.util.ArrayList; + +public abstract class GeoTestCase { + protected static SiddhiManager siddhiManager; + private static Logger logger = Logger.getLogger(GeoTestCase.class); + protected static ArrayList data; + protected static ArrayList expectedResult; + protected static int eventCount; + + @BeforeClass + public static void setUp() throws Exception { + logger.info("Init Siddhi");// Create Siddhi Manager + siddhiManager = new SiddhiManager(); + data = new ArrayList(); + expectedResult = new ArrayList(); + } + + protected void generateEvents(ExecutionPlanRuntime executionPlanRuntime) throws Exception { + InputHandler inputHandler = executionPlanRuntime.getInputHandler("dataIn"); + for (Object[] dataLine : data) { + inputHandler.send(dataLine); + } + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoDistanceTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoDistanceTestCase.java new file mode 100644 index 0000000000..d8c3c905f2 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoDistanceTestCase.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.function; + +import junit.framework.Assert; +import org.apache.log4j.Logger; +import org.junit.Before; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.siddhi.core.stream.input.InputHandler; +import org.wso2.siddhi.core.util.EventPrinter; + + +public class GeoDistanceTestCase { + static final Logger log = Logger.getLogger(GeoDistanceTestCase.class); + private volatile int count; + private volatile boolean eventArrived; + + @Before + public void init() { + count = 0; + eventArrived = false; + } + + @Test + public void testGeoDistanceTestCase() throws InterruptedException { + log.info("testGeoDistance TestCase"); + SiddhiManager siddhiManager = new SiddhiManager(); + + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime( + "@config(async = 'true') " + + "define stream cleanedStream (latitude double, longitude double, prevLatitude double, " + + "prevLongitude double); " + + "@info(name = 'query1') " + + "from cleanedStream " + + "select geo:distance(latitude, longitude, prevLatitude, prevLongitude) as distance " + + "insert into dataOut;"); + + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + for (Event event : inEvents) { + count++; + if (count == 1) { + Assert.assertEquals(2322119.848252557, event.getData(0)); + eventArrived = true; + } else if (count == 2) { + Assert.assertEquals(871946.8734223971, event.getData(0)); + eventArrived = true; + } + } + } + }); + + InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream"); + executionPlanRuntime.start(); + //getting distance near equator + inputHandler.send(new Object[]{8.116553, 77.523679, 9.850047, 98.597177}); + Thread.sleep(500); + //getting distance away from equator + inputHandler.send(new Object[]{54.432063, 19.669778, 59.971487, 29.958951}); + Thread.sleep(100); + + Assert.assertEquals(2, count); + Assert.assertTrue(eventArrived); + executionPlanRuntime.shutdown(); + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoIntersectsTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoIntersectsTestCase.java new file mode 100644 index 0000000000..46e96d6fe5 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoIntersectsTestCase.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.function; + +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase; + +public class GeoIntersectsTestCase extends GeoTestCase { + private static Logger logger = Logger.getLogger(GeoIntersectsTestCase.class); + + @Test + public void testGeometry() throws Exception { + logger.info("TestGeometry"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + + data.add(new Object[]{"{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, 1.5],[1.5, 1.5],[1.5, 0.5],[0.5, 0.5]]]}"}); + expectedResult.add(true); + data.add(new Object[]{"{'type':'Circle','coordinates':[-1, -1], 'radius':221148}"}); + expectedResult.add(true); + data.add(new Object[]{"{'type':'Point','coordinates':[2, 0]}"}); + expectedResult.add(false); + data.add(new Object[]{"{'type':'Polygon','coordinates':[[[2, 2],[2, 1],[1, 1],[1, 2],[2, 2]]]}"}); + expectedResult.add(true); + + String executionPlan = "@config(async = 'true') define stream dataIn (geometry string);" + + "@info(name = 'query1') from dataIn" + + " select geo:intersects(geometry, \"{'type':'Polygon','coordinates':[[[0, 0],[0, 1],[1, 1],[1, 0],[0, 0]]]}\") as intersects \n" + + " insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + logger.info(event); + Boolean isWithin = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount++), isWithin); + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinDistanceTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinDistanceTestCase.java new file mode 100644 index 0000000000..65ad846298 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinDistanceTestCase.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.function; + +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase; + +public class GeoWithinDistanceTestCase extends GeoTestCase { + private static Logger logger = Logger.getLogger(GeoWithinDistanceTestCase.class); + + @Test + public void testGeometry() throws Exception { + logger.info("TestGeometry"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + + data.add(new Object[]{"{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, 1.5],[1.5, 1.5],[1.5, 0.5],[0.5, 0.5]]]}"}); + expectedResult.add(true); + data.add(new Object[]{"{'type':'Circle','coordinates':[-1, -1], 'radius':110575}"}); + expectedResult.add(true); + data.add(new Object[]{"{'type':'Point','coordinates':[3, 0]}"}); + expectedResult.add(false); + data.add(new Object[]{"{'type':'Polygon','coordinates':[[[3, 3],[3, 2],[2, 2],[2, 3],[3, 3]]]}"}); + expectedResult.add(false); + + String executionPlan = "@config(async = 'true') define stream dataIn (geometry string);" + + "@info(name = 'query1') from dataIn" + + " select geo:withinDistance(geometry, \"{'type':'Polygon','coordinates':[[[0, 0],[0, 1],[1, 1],[1, 0],[0, 0]]]}\", 110574.61087757687) as intersects \n" + + " insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + logger.info(event); + Boolean isWithin = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount++), isWithin); + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinTestCase.java new file mode 100644 index 0000000000..b91acc5529 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/function/GeoWithinTestCase.java @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.gpl.siddhi.extensions.geo.function; + +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.siddhi.core.stream.input.InputHandler; +import org.wso2.siddhi.core.util.EventPrinter; +import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase; + +public class GeoWithinTestCase extends GeoTestCase { + private static Logger logger = Logger.getLogger(GeoWithinTestCase.class); + + private volatile int count; + private volatile boolean eventArrived; + + @Test + public void testPoint() throws Exception { + logger.info("TestPoint"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + data.add(new Object[]{"km-4354", 0.5d, 0.5d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 2d, 2d}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", -0.5d, 1.5d}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", 0.5d, 1.25d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 0.75d, 0.5d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 3.5d, 0.5d}); + expectedResult.add(false); + + String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);" + + "@info(name = 'query1') from dataIn " + + "select geo:within(longitude,latitude,\"{'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}\") as notify \n" + + " \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + logger.info(event); + Boolean isWithin = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount++), isWithin); + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } + + @Test + public void testPoint2() throws Exception { + logger.info("TestPoint2"); + data.clear(); + expectedResult.clear(); + eventCount = 0; + data.add(new Object[]{"km-4354", 0.75d, 1d, "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", 1d, 1d, "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 3d, 3d, "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", 0.6d, 1.0d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 1.5d, 1.5d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", -0.5d, 1.5d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"}); + expectedResult.add(false); + + String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double, geometry string);" + + "@info(name = 'query1') from dataIn " + + "select geo:within(longitude, latitude, geometry) as notify \n" + + " \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + logger.info(event); + Boolean isWithin = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount++), isWithin); + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } + + @Test + public void testGeometry() throws Exception { + logger.info("TestGeometry"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-1,1]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1,1]}"}); + expectedResult.add(true); + + String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string);" + + "@info(name = 'query1') from dataIn " + + "select geo:within(geometry,\"{'type':'Polygon','coordinates':[[[0,0],[0,4],[3,4],[3,0],[0,0]]]}\") as notify \n" + + " \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + logger.info(event); + Boolean isWithin = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount++), isWithin); + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } + + @Test + public void testGeometry2() throws Exception { + logger.info("TestGeometry"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1.0, 1.5]}", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1.5, 1.0]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 10, 'coordinates':[0.5, 1.5]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 20, 'coordinates':[0.5, 1.5]}", "{'type': 'Circle', 'radius': 10, 'coordinates':[0.5, 1.5]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-0.5, 1.0]}", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, -0.5],[-0.5, -0.5],[-0.5, 0.5], [0.5, 0.5]]]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0, 0]}"}); + expectedResult.add(true); + + String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string, otherGeometry string);" + + "@info(name = 'query1') from dataIn " + + "select geo:within(geometry,otherGeometry) as notify \n" + + " \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + logger.info(event); + Boolean isWithin = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount++), isWithin); + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } + + @Test + public void testGeometry3() throws Exception { + logger.info("TestGeometryJoin"); + + count = 0; + eventArrived = false; + String executionPlan = "" + + "define stream dataIn (id string, latitude double, longitude double); " + + "define stream dataToTable (id string, geometry string); " + + "" + + "define table dataTable (id string, geometry string); " + + "" + + "from dataToTable " + + "insert into dataTable; " + + "" + + "@info(name = 'query1') " + + "from dataIn join dataTable " + + "on geo:within(dataIn.latitude, dataIn.longitude, dataTable.geometry) " + + "select dataIn.id as dataInID, dataTable.id as dataTableID " + + "insert into dataOut; "; + + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + + InputHandler dataIn = executionPlanRuntime.getInputHandler("dataIn"); + InputHandler dataToTable = executionPlanRuntime.getInputHandler("dataToTable"); + + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + for (Event event : inEvents) { + eventArrived = true; + count++; + } + } + }); + executionPlanRuntime.start(); + Thread.sleep(1000); + + dataToTable.send(new Object[]{"1", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"}); + dataToTable.send(new Object[]{"2", "{'type': 'Circle', 'radius': 110575, 'coordinates':[12.5, 1.5]}"}); + Thread.sleep(1000); + dataIn.send(new Object[]{"3", 1.5, 1.0}); + dataIn.send(new Object[]{"4", 7.5, 1.0}); + Thread.sleep(1000); + executionPlanRuntime.shutdown(); + Assert.assertEquals(1, count); + Assert.assertEquals(true, eventArrived); + } +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoCrossesTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoCrossesTestCase.java new file mode 100644 index 0000000000..a149bd5200 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoCrossesTestCase.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.gpl.siddhi.extensions.geo.stream; + +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase; + +public class GeoCrossesTestCase extends GeoTestCase { + private static Logger logger = Logger.getLogger(GeoCrossesTestCase.class); + @Test + public void testCrosses() throws Exception { + logger.info("TestCrosses"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + + data.add(new Object[]{"km-4354", -0.5d, 0.5d}); + data.add(new Object[]{"km-4354", 1.5d, 0.5d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 1.5d, 0.25d}); + data.add(new Object[]{"tc-1729", 0.5d, 0.5d}); + expectedResult.add(true); + data.add(new Object[]{"tc-1729", -0.5d, 0.5d}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", 1.3d, 0.1d}); + data.add(new Object[]{"km-4354", 3d, 0.25d}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", 4d, 0.25d}); + data.add(new Object[]{"km-4354", 1d, 0.5d}); + expectedResult.add(true); + + String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);" + + "@info(name = 'query1') from dataIn#geo:crosses(id,longitude,latitude,\"{'type':'Polygon','coordinates':[[[0, 0],[2, 0],[2, 1],[0, 1],[0, 0]]]}\") " + + "select crosses \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + Boolean isWithin = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount++), isWithin); + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoProximityTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoProximityTestCase.java new file mode 100644 index 0000000000..2a043a52b6 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoProximityTestCase.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.gpl.siddhi.extensions.geo.stream; + +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.siddhi.core.util.EventPrinter; +import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase; + +import java.util.ArrayList; + +public class GeoProximityTestCase extends GeoTestCase { + private static Logger logger = Logger.getLogger(GeoProximityTestCase.class); + ArrayList expectedResultId; + @Test + public void testProximity() throws Exception { + logger.info("TestProximity"); + + data.clear(); + expectedResultId = new ArrayList(); + eventCount = 0; + + data.add(new Object[]{"1", 0d, 0d}); + data.add(new Object[]{"2", 1d, 1d}); + data.add(new Object[]{"3", 2d, 2d}); + data.add(new Object[]{"1", 1.5d, 1.5d}); + expectedResultId.add("2"); + expectedResult.add(true); + expectedResultId.add("3"); + expectedResult.add(true); + data.add(new Object[]{"1", 1.6d, 1.6d}); + data.add(new Object[]{"2", 5d, 5d}); + expectedResultId.add("1"); + expectedResult.add(false); + data.add(new Object[]{"1", 2d, 2d}); + data.add(new Object[]{"1", 5.5d, 5.5d}); + expectedResultId.add("2"); + expectedResult.add(true); + expectedResultId.add("3"); + expectedResult.add(false); + + String executionPlan = "@config(async = 'true')" + + "define stream dataIn (id string, longitude double, latitude double);" + + + "@info(name = 'query1') " + + "from dataIn#geo:proximity(id,longitude,latitude, 110574.61087757687) " + + "select inCloseProximity, proximityWith \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + for (Event event : inEvents) { + Boolean proximity = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount), proximity); + String other = (String) event.getData(1); + Assert.assertEquals(expectedResultId.get(eventCount), other); + eventCount++; + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoStationaryTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoStationaryTestCase.java new file mode 100644 index 0000000000..bf78594036 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/GeoStationaryTestCase.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.gpl.siddhi.extensions.geo.stream; + +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase; + +public class GeoStationaryTestCase extends GeoTestCase { + private static Logger logger = Logger.getLogger(GeoStationaryTestCase.class); + @Test + public void testStationary() throws Exception { + logger.info("TestStationary"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + + data.add(new Object[]{"km-4354", 0d, 0d}); + data.add(new Object[]{"km-4354", 1d, 1d}); + data.add(new Object[]{"km-4354", 1d, 1.5d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 1d, 1.75d}); + data.add(new Object[]{"km-4354", 1d, 2.5d}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", 1d, 2.3d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 1d, 2.2d}); + data.add(new Object[]{"km-4354", 1d, 2.6d}); + data.add(new Object[]{"km-4354", 1d, 3.6d}); + expectedResult.add(false); + + String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);" + + "@info(name = 'query1') from dataIn#geo:stationary(id,longitude,latitude, 110574.61087757687) " + + "select stationary \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + for (Event event : inEvents) { + Boolean isWithin = (Boolean) event.getData(0); + Assert.assertEquals(expectedResult.get(eventCount++), isWithin); + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/AverageLocationTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/AverageLocationTestCase.java new file mode 100644 index 0000000000..bc60ffc112 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/AverageLocationTestCase.java @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.gpl.siddhi.extensions.geo.stream.function; + +import junit.framework.Assert; +import org.apache.log4j.Logger; +import org.junit.Before; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.siddhi.core.stream.input.InputHandler; +import org.wso2.siddhi.core.util.EventPrinter; + + +public class AverageLocationTestCase { + static final Logger log = Logger.getLogger(AverageLocationTestCase.class); + private volatile int count; + private volatile boolean eventArrived; + + @Before + public void init() { + count = 0; + eventArrived = false; + } + + @Test + public void testAverageLocationWithSameWeightsTestCase() throws InterruptedException { + log.info("testAverageLocation with same weight TestCase"); + SiddhiManager siddhiManager = new SiddhiManager(); + + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime( + "@config(async = 'true') " + + "define stream cleanedStream (locationRecorder string, latitude double, longitude double, " + + "beaconProximity string, uuid string, weight double, timestamp long); " + + "@info(name = 'query1') " + + "from cleanedStream#geo:locationApproximate(locationRecorder, latitude, longitude, " + + "beaconProximity, uuid, weight, timestamp) " + + "select * " + + "insert into dataOut;"); + + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + for (Event event : inEvents) { + count++; + if (count == 1) { + Assert.assertEquals(6.876657000000001, event.getData(7)); + Assert.assertEquals(79.897648, event.getData(8)); + eventArrived = true; + } else if (count == 2) { + Assert.assertEquals(6.797727042508542, event.getData(7)); + Assert.assertEquals(80.13557409252783, event.getData(8)); + eventArrived = true; + } else if (count == 3) { + Assert.assertEquals(6.853572272662002, event.getData(7)); + Assert.assertEquals(true, 80.34826512892124 == (Double) event.getData(8) || 80.34826512892126 == (Double) event.getData(8)); + eventArrived = true; + } else if (count == 4) { + Assert.assertEquals(true, 8.026326160526303 == (Double) event.getData(7) || 8.0263261605263 == (Double) event.getData(7)); + Assert.assertEquals(80.42794459517538, event.getData(8)); + eventArrived = true; + } + } + } + }); + + InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream"); + executionPlanRuntime.start(); + //nugegods, ratnapura, nuwara eliya, vavniya --> thbuttegama + inputHandler.send(new Object[]{"person1", 6.876657, 79.897648, "ENTER", "uuid1", 20.0d, 1452583935l}); + Thread.sleep(500); + inputHandler.send(new Object[]{"person1", 6.718681, 80.373422, "ENTER", "uuid2", 20.0d, 1452583937l}); + Thread.sleep(500); + inputHandler.send(new Object[]{"person1", 6.964981, 80.773796, "ENTER", "uuid3", 20.0d, 1452583939l}); + Thread.sleep(500); + inputHandler.send(new Object[]{"person1", 8.729925, 80.475966, "ENTER", "uuid4", 100.0d, 1452583941l}); + Thread.sleep(100); + + Assert.assertEquals(4, count); + Assert.assertTrue(eventArrived); + executionPlanRuntime.shutdown(); + } + + @Test + public void testAverageLocationWithDifferentWeightsTestCase2() throws InterruptedException { + log.info("testAverageLocation with different weights TestCase"); + SiddhiManager siddhiManager = new SiddhiManager(); + + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime( + "@config(async = 'true') " + + "define stream cleanedStream (locationRecorder string, latitude double, longitude double, " + + "beaconProximity string, uuid string, weight double, timestamp long); " + + "@info(name = 'query1') " + + "from cleanedStream#geo:locationApproximate(locationRecorder, latitude, longitude, " + + "beaconProximity, uuid, weight, timestamp) " + + "select * " + + "insert into dataOut;"); + + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + for (Event event : inEvents) { + count++; + if (count == 1) { + Assert.assertEquals(6.876657000000001, event.getData(7)); + Assert.assertEquals(79.897648, event.getData(8)); + eventArrived = true; + } else if (count == 2) { + Assert.assertEquals(6.797727042508542, event.getData(7)); + Assert.assertEquals(80.13557409252783, event.getData(8)); + eventArrived = true; + } else if (count == 3) { + Assert.assertEquals(6.853572272662002, event.getData(7)); + Assert.assertEquals(true, 80.34826512892124 == (Double) event.getData(8) || 80.34826512892126 == (Double) event.getData(8)); + eventArrived = true; + } else if (count == 4) { + Assert.assertEquals(7.322639705655454, event.getData(7)); + Assert.assertEquals(80.38008364787895, event.getData(8)); + eventArrived = true; + } + } + } + }); + + InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream"); + executionPlanRuntime.start(); + //nugegods, ratnapura, nuwara eliya, vavniya --> kegalle + inputHandler.send(new Object[]{"person1", 6.876657, 79.897648, "ENTER", "uuid1", 20.0d, 1452583935l}); + Thread.sleep(500); + inputHandler.send(new Object[]{"person1", 6.718681, 80.373422, "ENTER", "uuid2", 20.0d, 1452583937l}); + Thread.sleep(500); + inputHandler.send(new Object[]{"person1", 6.964981, 80.773796, "ENTER", "uuid3", 20.0d, 1452583939l}); + Thread.sleep(500); + inputHandler.send(new Object[]{"person1", 8.729925, 80.475966, "ENTER", "uuid4", 20.0d, 1452583941l}); + Thread.sleep(100); + + Assert.assertEquals(4, count); + Assert.assertTrue(eventArrived); + executionPlanRuntime.shutdown(); + } +} diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/GeoClosetPointTestCase.java b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/GeoClosetPointTestCase.java new file mode 100644 index 0000000000..ae8023b853 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/java/org/wso2/gpl/siddhi/extensions/geo/stream/function/GeoClosetPointTestCase.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.gpl.siddhi.extensions.geo.stream.function; + +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.prep.PreparedGeometry; +import com.vividsolutions.jts.operation.distance.DistanceOp; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.query.output.callback.QueryCallback; +import org.wso2.siddhi.core.util.EventPrinter; +import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase; +import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeometryUtils; + +public class GeoClosetPointTestCase extends GeoTestCase { + private static Logger logger = Logger.getLogger(GeoClosetPointTestCase.class); + + +// @Test +// public void testClosestBasic() throws Exception { +// logger.info("testClosest"); +// +// PreparedGeometry geometry = GeometryUtils.preparedGeometryFromJSON(" {'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}"); +// Geometry point = GeometryUtils.createPoint(-1, -1); +// +// DistanceOp distOp = new DistanceOp(geometry.getGeometry(), point); +// +// Coordinate[] closestPt = distOp.nearestPoints(); +// System.out.println(closestPt[0]); +// +// } + + @Test + public void testClosestPoints() throws Exception { + logger.info("testClosestPoints"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + data.add(new Object[]{"km-4354", 0.5d, 0.5d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 2d, 2d}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", -0.5d, 1.5d}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", 0.5d, 1.25d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", 0.0d, 0.0d}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", -1d, -1d}); + expectedResult.add(false); + + String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);" + + "@info(name = 'query1') from dataIn#geo:closestPoints(longitude,latitude,\"{'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}\") " + + "select closestPointOf1From2Latitude, closestPointOf1From2Longitude, closestPointOf2From1Latitude, closestPointOf2From1Longitude " + + " \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + final long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + for (Event event : inEvents) { + eventCount++; + switch (eventCount){ + case 1: + Assert.assertArrayEquals(new Object[]{0.5,0.5,0.5,0.5},event.getData()); + break; + case 2: + Assert.assertArrayEquals(new Object[]{2.0, 2.0, 1.0, 2.0},event.getData()); + break; + case 3: + Assert.assertArrayEquals(new Object[]{-0.5, 1.5, 0.0, 1.5},event.getData()); + break; + case 4: + Assert.assertArrayEquals(new Object[]{0.5, 1.25, 0.5, 1.25},event.getData()); + break; + case 5: + Assert.assertArrayEquals(new Object[]{0.0, 0.0, 0.0, 0.0},event.getData()); + break; + case 6: + Assert.assertArrayEquals(new Object[]{-1.0, -1.0, 0.0, 0.0},event.getData()); + break; + + } + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } + + + @Test + public void testClosestPointsGeometry() throws Exception { + logger.info("testClosestPointsGeometry"); + + data.clear(); + expectedResult.clear(); + eventCount = 0; + data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[10, 1.5]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-1,1]}"}); + expectedResult.add(false); + data.add(new Object[]{"km-4354", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[5,5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"}); + expectedResult.add(true); + data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[10,10]}"}); + expectedResult.add(true); + + String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string);" + + "@info(name = 'query1') from dataIn#geo:closestPoints(geometry,\"{'type':'Polygon','coordinates':[[[0,0],[0,4],[3,4],[3,0],[0,0]]]}\") " + + "select closestPointOf1From2Latitude, closestPointOf1From2Longitude, closestPointOf2From1Latitude, closestPointOf2From1Longitude \n" + + "insert into dataOut"; + + long start = System.currentTimeMillis(); + ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); + long end = System.currentTimeMillis(); + logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f))); + executionPlanRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { + EventPrinter.print(timeStamp, inEvents, removeEvents); + for (Event event : inEvents) { + eventCount++; + switch (eventCount){ + case 1: + Assert.assertArrayEquals(new Object[]{0.5, 0.5, 0.5, 0.5},event.getData()); + break; + case 2: + Assert.assertArrayEquals(new Object[]{2.5000035190937595, 1.5, 2.5000035190937595, 1.5},event.getData()); + break; + case 3: + Assert.assertArrayEquals(new Object[]{8.99999648090624, 1.5000000000000007, 3.0, 1.5000000000000009},event.getData()); + break; + case 4: + Assert.assertArrayEquals(new Object[]{-1.0, 1.0, 0.0, 1.0},event.getData()); + break; + case 5: + Assert.assertArrayEquals(new Object[]{0.5, 0.5, 0.5, 0.5},event.getData()); + break; + case 6: + Assert.assertArrayEquals(new Object[]{10.0, 10.0, 3.0, 4.0},event.getData()); + break; + + } + } + } + }); + executionPlanRuntime.start(); + generateEvents(executionPlanRuntime); + Thread.sleep(1000); + Assert.assertEquals(expectedResult.size(), eventCount); + } +} \ No newline at end of file diff --git a/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/resources/log4j.properties b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/resources/log4j.properties new file mode 100644 index 0000000000..f42abcae28 --- /dev/null +++ b/components/extensions/siddhi-extensions/org.wso2.gpl.siddhi.extension.geo.script/src/main/resources/scripts/gpl-siddhi-geo-extention/org.wso2.gpl.siddhi.extension.geo.plugin/src/test/resources/log4j.properties @@ -0,0 +1,35 @@ +# +# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +# +# WSO2 Inc. licenses this file to you under the Apache License, +# Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# For the general syntax of property based configuration files see the +# documenation of org.apache.log4j.PropertyConfigurator. + +# The root category uses the appender called A1. Since no priority is +# specified, the root category assumes the default priority for root +# which is DEBUG in log4j. The root category is the only category that +# has a default priority. All other categories need not be assigned a +# priority in which case they inherit their priority from the +# hierarchy. + +#log4j.rootLogger=DEBUG, stdout +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +#log4j.appender.stdout.layout.ConversionPattern=%m%n +log4j.appender.stdout.layout.ConversionPattern=[%t] %-5p %c %x - %m%n