Fixing geo script issue

revert-dabc3590
Rasika Perera 8 years ago
parent 9c23a39709
commit 575e4932e6

@ -0,0 +1,115 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
public abstract class AbstractGeoOperationExecutor extends FunctionExecutor {
GeoOperation geoOperation;
/**
* The initialization method for FunctionExecutor, this method will be called before the other methods
*
* @param attributeExpressionExecutors are the executors of each function parameters
* @param executionPlanContext the context of the execution plan
*/
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length);
}
/**
* The main executions method which will be called upon event arrival
*
* @param data the runtime values of the attributeExpressionExecutors
* @return
*/
@Override
protected Object execute(Object[] data) {
return geoOperation.process(data);
}
/**
* The main execution method which will be called upon event arrival
* which has zero or one function parameter
*
* @param data null if the function parameter count is zero or
* runtime data value of the function parameter
* @return the function result
*/
@Override
protected Object execute(Object data) {
throw new IllegalStateException(this.getClass().getCanonicalName() + " cannot execute data " + data);
}
/**
* This will be called only once, to acquire required resources
* after initializing the system and before processing the events.
*/
@Override
public void start() {
}
/**
* This will be called only once, to release the acquired resources
* before shutting down the system.
*/
@Override
public void stop() {
}
/**
* The serializable state of the element, that need to be
* persisted for the reconstructing the element to the same state
* on a different point of time
*
* @return stateful objects of the element as an array
*/
@Override
public Object[] currentState() {
return new Object[0];
}
/**
* The serialized state of the element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the element as an array on
* the same order provided by currentState().
*/
@Override
public void restoreState(Object[] state) {
}
//TODO: look into cloning
public Type getReturnType() {
return geoOperation.getReturnType();
}
}

@ -0,0 +1,124 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
/**
* geo:distance(location1Latitude, location1Longitude, location2Latitude, location2Longitude)
*
* This method gives the distance between two geo locations in meters
*
* location1Latitude - latitude value of 1st location
* location1Longitude - longitude value of 1st location
* location2Latitude - latitude value of 2nd location
* location2Longitude - longitude value of 2nd location
*
* Accept Type(s) for geo:distance(location1Latitude, location1Longitude, location2Latitude, location2Longitude);
* location1Latitude : DOUBLE
* location1Longitude : DOUBLE
* location2Latitude : DOUBLE
* location2Longitude : DOUBLE
*
* Return Type(s): DOUBLE
*/
public class GeoDistanceFunctionExecutor extends FunctionExecutor {
Attribute.Type returnType = Attribute.Type.DOUBLE;
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public Object[] currentState() {
return new Object[0];
}
@Override
public void restoreState(Object[] state) {
}
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
if (attributeExpressionExecutors.length != 4) {
throw new ExecutionPlanValidationException("Invalid no of arguments passed to " +
"geo:distance() function, " +
"requires 4, but found " + attributeExpressionExecutors.length);
}
}
@Override
protected Object execute(Object[] data) {
if (data[0] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " +
"function. First argument should be double");
}
if (data[1] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " +
"function. Second argument should be double");
}
if (data[2] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " +
"function. Third argument should be double");
}
if (data[3] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " +
"function. Fourth argument should be double");
}
double latitude = (Double) data[0];
double longitude = (Double) data[1];
double prevLatitude = (Double) data[2];
double prevLongitude = (Double) data[3];
int R = 6371000; // Radius of the earth in m
latitude = latitude * (Math.PI / 180);
prevLatitude = prevLatitude * (Math.PI / 180);
longitude = longitude * (Math.PI / 180);
prevLongitude = prevLongitude * (Math.PI / 180);
double dlon = prevLongitude - longitude;
double dlat = prevLatitude - latitude;
double a = Math.sin(dlat / 2) * Math.sin(dlat / 2) + Math.cos(latitude) * Math.cos(prevLatitude) *
Math.sin(dlon / 2) * Math.sin(dlon / 2);
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
return R * c;
}
@Override
protected Object execute(Object data) {
return null;
}
@Override
public Attribute.Type getReturnType() {
return returnType;
}
}

@ -0,0 +1,28 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.IntersectsOperation;
public class GeoIntersectsFunctionExecutor extends AbstractGeoOperationExecutor {
public GeoIntersectsFunctionExecutor(){
this.geoOperation = new IntersectsOperation();
}
}

@ -0,0 +1,44 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation;
import org.wso2.siddhi.query.api.definition.Attribute;
public class GeoWithinDistanceFunctionExecutor extends AbstractGeoOperationExecutor {
public GeoWithinDistanceFunctionExecutor() {
this.geoOperation = new WithinDistanceOperation();
}
/**
* The initialization method for FunctionExecutor, this method will be called before the other methods
*
* @param attributeExpressionExecutors are the executors of each function parameters
* @param executionPlanContext the context of the execution plan
*/
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length);
if (attributeExpressionExecutors[attributeExpressionExecutors.length - 1].getReturnType() != Attribute.Type.DOUBLE) {
throw new ExecutionPlanCreationException("Last argument should be a double");
}
}
}

@ -0,0 +1,27 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinOperation;
public class GeoWithinFunctionExecutor extends AbstractGeoOperationExecutor {
public GeoWithinFunctionExecutor() {
this.geoOperation = new WithinOperation();
}
}

@ -0,0 +1,43 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import com.vividsolutions.jts.operation.distance.DistanceOp;
import org.wso2.siddhi.query.api.definition.Attribute;
public class ClosestOperation extends org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation {
@Override
public Object operation(Geometry a, Geometry b, Object[] data) {
DistanceOp distOp = new DistanceOp(a, b);
return distOp.nearestPoints();
}
@Override
public Object operation(Geometry a, PreparedGeometry b, Object[] data) {
DistanceOp distOp = new DistanceOp(a, b.getGeometry());
return distOp.nearestPoints();
}
@Override
public Attribute.Type getReturnType() {
return null;
}
}

@ -0,0 +1,92 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
public abstract class GeoOperation {
public boolean point = false;
protected Object data;
public PreparedGeometry geometry = null;
public void init(ExpressionExecutor[] attributeExpressionExecutors, int start, int end) {
int position = start;
if (attributeExpressionExecutors[position].getReturnType() == Attribute.Type.DOUBLE) {
point = true;
if (attributeExpressionExecutors[position + 1].getReturnType() != Attribute.Type.DOUBLE) {
throw new ExecutionPlanCreationException("Longitude and Latitude must be provided as double values");
}
++position;
} else if (attributeExpressionExecutors[position].getReturnType() == Attribute.Type.STRING) {
point = false;
} else {
throw new ExecutionPlanCreationException((position + 1) +
" parameter should be a string for a geometry or a double for a latitude");
}
++position;
if (position >= end) {
return;
}
if (attributeExpressionExecutors[position].getReturnType() != Attribute.Type.STRING) {
throw new ExecutionPlanCreationException((position + 1) + " parameter should be a GeoJSON geometry string");
}
if (attributeExpressionExecutors[position] instanceof ConstantExpressionExecutor) {
String strGeometry = attributeExpressionExecutors[position].execute(null).toString();
geometry = GeometryUtils.preparedGeometryFromJSON(strGeometry);
}
}
public Object process(Object[] data) {
Geometry currentGeometry;
if (point) {
double longitude = (Double) data[0];
double latitude = (Double) data[1];
currentGeometry = GeometryUtils.createPoint(longitude, latitude);
} else {
currentGeometry = GeometryUtils.geometryFromJSON(data[0].toString());
}
if (geometry != null) {
return operation(currentGeometry, geometry, data);
} else {
return operation(currentGeometry, GeometryUtils.geometryFromJSON(data[point ? 2 : 1].toString()),
data);
}
}
public Geometry getCurrentGeometry(Object[] data) {
if (point) {
double longitude = (Double) data[0];
double latitude = (Double) data[1];
return GeometryUtils.createPoint(longitude, latitude);
} else {
return GeometryUtils.createGeometry(data[0]);
}
}
public abstract Object operation(Geometry a, Geometry b, Object[] data);
public abstract Object operation(Geometry a, PreparedGeometry b, Object[] data);
public abstract Attribute.Type getReturnType();
}

@ -0,0 +1,87 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometryFactory;
import org.geotools.geojson.geom.GeometryJSON;
import org.geotools.geometry.jts.JTSFactoryFinder;
import java.io.IOException;
import java.io.StringWriter;
public class GeometryUtils {
public static final double TO_DEGREE = 110574.61087757687;
private static final String COORDINATES = "coordinates";
private static final String RADIUS = "radius";
private static GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
//JTSFactoryFinder.getGeometryFactory(new Hints(Hints.CRS, DefaultGeographicCRS.WGS84));
private static GeometryJSON geometryJSON = new GeometryJSON(10);
public static Geometry geometryFromJSON(String strGeometry) {
if (strGeometry.contains(RADIUS)) {
JsonObject jsonObject = new JsonParser().parse(strGeometry).getAsJsonObject();
JsonArray jLocCoordinatesArray = jsonObject.getAsJsonArray(COORDINATES);
Coordinate coords = new Coordinate(Double.parseDouble(jLocCoordinatesArray.get(0)
.toString()), Double.parseDouble(jLocCoordinatesArray.get(1).toString()));
Point point = geometryFactory.createPoint(coords); // create the points for GeoJSON file points
double radius = Double.parseDouble(jsonObject.get(RADIUS).toString()) / TO_DEGREE; //convert to degrees
return point.buffer(radius); //draw the buffer
} else {
try {
return geometryJSON.read(strGeometry.replace("'", "\""));
} catch (IOException e) {
throw new RuntimeException("Failed to create a geometry from given str " + strGeometry, e);
}
}
}
public static String geometrytoJSON(Geometry geometry) {
StringWriter sw = new StringWriter();
try {
geometryJSON.write(geometry, sw);
} catch (IOException e) {
throw new RuntimeException("Failed to create a json string from given geometry " + geometry, e);
}
return sw.toString();
}
public static PreparedGeometry preparedGeometryFromJSON(String strGeometry) {
return PreparedGeometryFactory.prepare(geometryFromJSON(strGeometry));
}
public static Point createPoint(double longitude, double latitude) {
return geometryFactory.createPoint(new Coordinate(longitude, latitude));
}
public static Geometry createGeometry(Object data){
if(data instanceof Geometry) {
return (Geometry) data;
}
else {
return geometryFromJSON(data.toString());
}
}
}

@ -0,0 +1,40 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import org.wso2.siddhi.query.api.definition.Attribute;
public class IntersectsOperation extends GeoOperation {
@Override
public Object operation(Geometry a, Geometry b, Object[] data) {
return a.intersects(b);
}
@Override
public Object operation(Geometry a, PreparedGeometry b, Object[] data) {
return b.intersects(a);
}
@Override
public Attribute.Type getReturnType() {
return Attribute.Type.BOOL;
}
}

@ -0,0 +1,40 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import org.wso2.siddhi.query.api.definition.Attribute;
public class WithinDistanceOperation extends GeoOperation {
@Override
public Object operation(Geometry a, Geometry b, Object[] data) {
return a.isWithinDistance(b, (Double) data[data.length - 1] / GeometryUtils.TO_DEGREE);
}
@Override
public Object operation(Geometry a, PreparedGeometry b, Object[] data) {
return a.isWithinDistance(b.getGeometry(), (Double) data[data.length - 1] / GeometryUtils.TO_DEGREE);
}
@Override
public Attribute.Type getReturnType() {
return Attribute.Type.BOOL;
}
}

@ -0,0 +1,40 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import org.wso2.siddhi.query.api.definition.Attribute;
public class WithinOperation extends GeoOperation {
@Override
public Object operation(Geometry a, Geometry b, Object[] data) {
return a.within(b);
}
@Override
public Object operation(Geometry a, PreparedGeometry b, Object[] data) {
return b.contains(a);
}
@Override
public Attribute.Type getReturnType() {
return Attribute.Type.BOOL;
}
}

@ -0,0 +1,138 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinOperation;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class GeoCrossesStreamProcessor extends StreamProcessor {
private GeoOperation geoOperation;
private Set<String> set = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
/**
* The init method of the StreamProcessor, this method will be called before other methods
*
* @param inputDefinition the incoming stream definition
* @param attributeExpressionExecutors the executors of each function parameters
* @param executionPlanContext the context of the execution plan
* @return the additional output attributes introduced by the function
*/
@Override
protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
geoOperation = new WithinOperation();
geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength);
List<Attribute> attributeList = new ArrayList<Attribute>(1);
attributeList.add(new Attribute("crosses", Type.BOOL));
return attributeList;
}
/**
* This will be called only once, to acquire required resources
* after initializing the system and before processing the events.
*/
@Override
public void start() {
}
/**
* This will be called only once, to release the acquired resources
* before shutting down the system.
*/
@Override
public void stop() {
}
/**
* The serializable state of the element, that need to be
* persisted for the reconstructing the element to the same state
* on a different point of time
*
* @return stateful objects of the element as an array
*/
@Override
public Object[] currentState() {
return new Object[0];
}
/**
* The serialized state of the element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the element as an array on
* the same order provided by currentState().
*/
@Override
public void restoreState(Object[] state) {
}
@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
while (streamEventChunk.hasNext()) {
ComplexEvent complexEvent = streamEventChunk.next();
Object[] data = new Object[attributeExpressionLength - 1];
for (int i = 1; i < attributeExpressionLength; i++) {
data[i-1] = attributeExpressionExecutors[i].execute(complexEvent);
}
boolean within = (Boolean)geoOperation.process(data);
String id = attributeExpressionExecutors[0].execute(complexEvent).toString();
if (set.contains(id)) {
if (!within) {
//alert out
complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{within});
set.remove(id);
} else {
streamEventChunk.remove();
}
} else {
if (within) {
//alert in
complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{within});
set.add(id);
} else {
streamEventChunk.remove();
}
}
}
nextProcessor.process(streamEventChunk);
}
}

@ -0,0 +1,246 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* geoLocationApproximate(locationRecorder, latitude, longitude, sensorProximity, sensorUUID, sensorWeight, timestamp)
*
* This method computed the average location of the locationRecorder using the collection iBeacons which the location
* recorder resides.
*
* locationRecorder - unique id of the object or item
* latitude - latitude value of the iBeacon
* longitude - longitude value of the iBeacon
* sensorProximity - proximity which will be given by the iBeacon (eg: ENTER, RANGE, EXIT)
* sensorUUID - unique id of the iBeacon
* sensorWeight - weight of the iBeacon which influence the averaging of the location (eg: approximate distance from
* the beacon
* timestamp - timestamp of the log which will be used to remove iBeacon from one's collection when there is no
* new log for 5 minutes
*
* Accept Type(s) for geoLocationApproximate(locationRecorder, latitude, longitude, sensorProximity, sensorUUID,
* sensorWeight, timestamp);
* locationRecorder : STRING
* latitude : DOUBLE
* longitude : DOUBLE
* sensorProximity : STRING
* sensorUUID : STRING
* sensorWeight : DOUBLE
* timestamp : LONG
*
* Return Type(s): DOUBLE, DOUBLE, BOOL
*
*/
public class GeoLocationApproximateStreamProcessor extends StreamFunctionProcessor {
//locationRecorder,uuid -> BeaconValueHolder
private Map<String, Map<String, BeaconValueHolder>>
personSpecificRecordLocatorMaps;
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public Object[] currentState() {
return new Object[]{personSpecificRecordLocatorMaps};
}
@Override
public void restoreState(Object[] state) {
personSpecificRecordLocatorMaps = (Map<String, Map<String, BeaconValueHolder>>) state[0];
}
@Override
protected Object[] process(Object[] data) {
if (data[0] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
"function. First argument should be string");
}
if (data[1] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
"function. Second argument should be double");
}
if (data[2] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
"function. Third argument should be double");
}
if (data[3] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
"function. Forth argument should be string");
}
if (data[4] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
"function. Fifth argument should be string");
}
if (data[5] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
"function. Sixth argument should be double");
}
if (data[6] == null) {
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
"function. Seventh argument should be long");
}
String locationRecorder = (String) data[0];
double latitude = (Double) data[1];
double longitude = (Double) data[2];
String beaconProximity = (String) data[3];
String uuid = (String) data[4];
double weight = (Double) data[5]; //is calculated previously eg: distance
long timestamp = (Long) data[6];
if (personSpecificRecordLocatorMaps.get(locationRecorder) == null) {
personSpecificRecordLocatorMaps.put(locationRecorder, new ConcurrentHashMap<String, BeaconValueHolder>());
}
//both "enter" and "range" attributes are there in the logs I retrieve when it comes to cleaning logs
if ("ENTER".equalsIgnoreCase(beaconProximity) || "RANGE".equalsIgnoreCase(beaconProximity)) {
if (personSpecificRecordLocatorMaps.get(locationRecorder).containsKey(uuid)) {
BeaconValueHolder tempBeaconValue = personSpecificRecordLocatorMaps.get(locationRecorder).get(uuid);
if (tempBeaconValue.getLastUpdatedTime() < timestamp) {
BeaconValueHolder beaconValueHolder = new BeaconValueHolder(latitude, longitude, timestamp, weight);
personSpecificRecordLocatorMaps.get(locationRecorder).put(uuid, beaconValueHolder);
}
} else {
BeaconValueHolder beaconValueHolder = new BeaconValueHolder(latitude, longitude, timestamp, weight);
personSpecificRecordLocatorMaps.get(locationRecorder).put(uuid, beaconValueHolder);
}
} else {
if (personSpecificRecordLocatorMaps.get(locationRecorder).containsKey(uuid)) {
BeaconValueHolder tempBeaconValue = personSpecificRecordLocatorMaps.get(locationRecorder).get(uuid);
if (tempBeaconValue.getLastUpdatedTime() < timestamp) {
personSpecificRecordLocatorMaps.get(locationRecorder).remove(uuid);
}
}
}
int noOfSensors = personSpecificRecordLocatorMaps.get(locationRecorder).size();
double sensorValues[][] = new double[noOfSensors][3];
int actualNoOfSensors = 0;
double totalWeight = 0;
for (Map.Entry<String, BeaconValueHolder> beaconLocation : personSpecificRecordLocatorMaps.get(locationRecorder).entrySet()) {
BeaconValueHolder beaconValueHolder = beaconLocation.getValue();
long prevTimestamp = beaconValueHolder.getLastUpdatedTime();
if ((timestamp - prevTimestamp) > 300000) {
//if there is a beacon which has a log older than 5 minutes, removing the beacon assuming the
//device has gone away from that beacon
personSpecificRecordLocatorMaps.get(locationRecorder).remove(beaconLocation.getKey());
} else {
sensorValues[actualNoOfSensors][0] = beaconValueHolder.getLatitude();
sensorValues[actualNoOfSensors][1] = beaconValueHolder.getLongitude();
sensorValues[actualNoOfSensors][2] = beaconValueHolder.getBeaconDistance();
totalWeight += beaconValueHolder.getBeaconDistance();
actualNoOfSensors++;
}
}
if (actualNoOfSensors == 0) {
return new Object[]{latitude, longitude, false};
}
double tempLatitude, tempLongitude;
double x = 0;
double y = 0;
double z = 0;
for (int i = 0; i < actualNoOfSensors; i++) {
weight = sensorValues[i][2] / totalWeight;
tempLatitude = sensorValues[i][0] * Math.PI / 180.0;
tempLongitude = sensorValues[i][1] * Math.PI / 180.0;
x += Math.cos(tempLatitude) * Math.cos(tempLongitude) * weight;
y += Math.cos(tempLatitude) * Math.sin(tempLongitude) * weight;
z += Math.sin(tempLatitude) * weight;
}
longitude = Math.atan2(y, x) * 180 / Math.PI;
double hyp = Math.sqrt(x * x + y * y);
latitude = Math.atan2(z, hyp) * 180 / Math.PI;
return new Object[]{latitude, longitude, true};
}
@Override
protected Object[] process(Object data) {
return new Object[0];
}
@Override
protected List<Attribute> init(AbstractDefinition inputDefinition,
ExpressionExecutor[] attributeExpressionExecutors,
ExecutionPlanContext executionPlanContext) {
personSpecificRecordLocatorMaps = new ConcurrentHashMap<String, Map<String, BeaconValueHolder>>();
if (attributeExpressionExecutors.length != 7) {
throw new ExecutionPlanValidationException("Invalid no of arguments passed to " +
"geo:locationApproximate() function, " +
"requires 7, but found " + attributeExpressionExecutors.length);
}
ArrayList<Attribute> attributes = new ArrayList<Attribute>(3);
attributes.add(new Attribute("averagedLatitude", Attribute.Type.DOUBLE));
attributes.add(new Attribute("averagedLongitude", Attribute.Type.DOUBLE));
attributes.add(new Attribute("averageExist", Attribute.Type.BOOL));
return attributes;
}
private class BeaconValueHolder {
private double latitude;
private double longitude;
private long lastUpdatedTime;
private double beaconDistance;
public BeaconValueHolder(double latitude, double longitude, long lastUpdatedTime, double
beaconDistance) {
this.latitude = latitude;
this.longitude = longitude;
this.lastUpdatedTime = lastUpdatedTime;
this.beaconDistance = beaconDistance;
}
public long getLastUpdatedTime() {
return lastUpdatedTime;
}
public double getBeaconDistance() {
return beaconDistance;
}
public double getLatitude() {
return latitude;
}
public double getLongitude() {
return longitude;
}
}
}

@ -0,0 +1,179 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream;
import com.vividsolutions.jts.geom.Geometry;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class GeoProximityStreamProcessor extends StreamProcessor {
private GeoOperation geoOperation;
private double radius;
private ConcurrentHashMap<String, Geometry> map = new ConcurrentHashMap<String, Geometry>();
private Set<String> set = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
/**
* The init method of the StreamProcessor, this method will be called before other methods
*
* @param inputDefinition the incoming stream definition
* @param attributeExpressionExecutors the executors of each function parameters
* @param executionPlanContext the context of the execution plan
* @return the additional output attributes introduced by the function
*/
@Override
protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
this.geoOperation = new WithinDistanceOperation();
this.geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength - 1);
if (attributeExpressionExecutors[attributeExpressionLength - 1].getReturnType() != Type.DOUBLE) {
throw new ExecutionPlanCreationException("Last parameter should be a double");
}
radius = (Double) attributeExpressionExecutors[attributeExpressionLength - 1].execute(null);
ArrayList<Attribute> attributeList = new ArrayList<Attribute>();
attributeList.add(new Attribute("proximityWith", Type.STRING));
attributeList.add(new Attribute("inCloseProximity", Type.BOOL));
return attributeList;
}
/**
* This will be called only once, to acquire required resources
* after initializing the system and before processing the events.
*/
@Override
public void start() {
}
/**
* This will be called only once, to release the acquired resources
* before shutting down the system.
*/
@Override
public void stop() {
}
/**
* The serializable state of the element, that need to be
* persisted for the reconstructing the element to the same state
* on a different point of time
*
* @return stateful objects of the element as an array
*/
@Override
public Object[] currentState() {
return new Object[0];
}
/**
* The serialized state of the element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the element as an array on
* the same order provided by currentState().
*/
@Override
public void restoreState(Object[] state) {
}
/**
* The main processing method that will be called upon event arrival
*
* @param streamEventChunk the event chunk that need to be processed
* @param nextProcessor the next processor to which the success events need to be passed
* @param streamEventCloner helps to clone the incoming event for local storage or modification
* @param complexEventPopulater helps to populate the events with the resultant attributes
*/
@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
while (streamEventChunk.hasNext()) {
StreamEvent streamEvent = streamEventChunk.next();
Geometry currentGeometry, previousGeometry;
Object[] data = new Object[attributeExpressionLength - 1];
for (int i = 1; i < attributeExpressionLength; i++) {
data[i - 1] = attributeExpressionExecutors[i].execute(streamEvent);
}
String currentId = attributeExpressionExecutors[0].execute(streamEvent).toString();
String previousId;
currentGeometry = geoOperation.getCurrentGeometry(data);
if(!map.contains(currentId)) {
map.put(currentId, currentGeometry);
}
for (Map.Entry<String, Geometry> entry : map.entrySet()) {
previousId = entry.getKey();
if (!previousId.equals(currentId)) {
previousGeometry = entry.getValue();
boolean within = (Boolean) geoOperation.operation(currentGeometry, previousGeometry, new Object[]{radius});
String key = makeCompositeKey(currentId, previousId);
boolean contains = set.contains(key);
if (contains) {
if (!within) {
//alert out
StreamEvent newStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
complexEventPopulater.populateComplexEvent(newStreamEvent, new Object[]{previousId, within});
streamEventChunk.insertBeforeCurrent(newStreamEvent);
set.remove(key);
}
} else {
if (within) {
//alert in
StreamEvent newStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
complexEventPopulater.populateComplexEvent(newStreamEvent, new Object[]{previousId, within});
streamEventChunk.insertBeforeCurrent(newStreamEvent);
set.add(key);
}
}
}
}
streamEventChunk.remove();
}
nextProcessor.process(streamEventChunk);
}
/*
private Object[] toOutput(Geometry geometry, boolean within) {
if (geoOperation.point) {
return new Object[]{((Point) geometry).getX(), ((Point) geometry).getY(), within};
} else {
return new Object[]{GeometryUtils.geometrytoJSON(geometry), within};
}
}*/
public String makeCompositeKey(String key1, String key2) {
if (key1.compareToIgnoreCase(key2) < 0) {
return key1 + "~" + key2;
} else {
return key2 + "~" + key1;
}
}
}

@ -0,0 +1,154 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream;
import com.vividsolutions.jts.geom.Geometry;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class GeoStationaryStreamProcessor extends StreamProcessor {
private GeoOperation geoOperation;
private double radius;
private ConcurrentHashMap<String, Geometry> map = new ConcurrentHashMap<String, Geometry>();
/**
* The init method of the StreamProcessor, this method will be called before other methods
*
* @param inputDefinition the incoming stream definition
* @param attributeExpressionExecutors the executors of each function parameters
* @param executionPlanContext the context of the execution plan
* @return the additional output attributes introduced by the function
*/
@Override
protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
geoOperation = new WithinDistanceOperation();
geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength - 1);
if (attributeExpressionExecutors[attributeExpressionLength - 1].getReturnType() != Type.DOUBLE) {
throw new ExecutionPlanCreationException("Last parameter should be a double");
}
radius = (Double) attributeExpressionExecutors[attributeExpressionLength - 1].execute(null);
ArrayList<Attribute> attributeList = new ArrayList<Attribute>(1);
attributeList.add(new Attribute("stationary", Type.BOOL));
return attributeList;
}
/**
* This will be called only once, to acquire required resources
* after initializing the system and before processing the events.
*/
@Override
public void start() {
}
/**
* This will be called only once, to release the acquired resources
* before shutting down the system.
*/
@Override
public void stop() {
}
/**
* The serializable state of the element, that need to be
* persisted for the reconstructing the element to the same state
* on a different point of time
*
* @return stateful objects of the element as an array
*/
@Override
public Object[] currentState() {
return new Object[0];
}
/**
* The serialized state of the element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the element as an array on
* the same order provided by currentState().
*/
@Override
public void restoreState(Object[] state) {
}
@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
while (streamEventChunk.hasNext()) {
ComplexEvent complexEvent = streamEventChunk.next();
Object[] data = new Object[attributeExpressionLength - 1];
for (int i = 1; i < attributeExpressionLength; i++) {
data[i - 1] = attributeExpressionExecutors[i].execute(complexEvent);
}
Geometry currentGeometry = geoOperation.getCurrentGeometry(data);
String id = attributeExpressionExecutors[0].execute(complexEvent).toString();
Geometry previousGeometry = map.get(id);
if (previousGeometry == null) {
currentGeometry.setUserData(false);
map.put(id, currentGeometry);
streamEventChunk.remove();
continue;
}
boolean stationary = (Boolean) geoOperation.operation(currentGeometry, previousGeometry, new Object[]{radius});
if((Boolean)previousGeometry.getUserData()) {
if(!stationary) {
//alert out
complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{stationary});
currentGeometry.setUserData(stationary);
map.put(id, currentGeometry);
} else {
streamEventChunk.remove();
}
} else {
if (stationary) {
//alert in
previousGeometry.setUserData(stationary);
complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{stationary});
} else {
currentGeometry.setUserData(stationary);
map.put(id, currentGeometry);
streamEventChunk.remove();
}
}
}
nextProcessor.process(streamEventChunk);
}
}

@ -0,0 +1,80 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream.function;
import com.vividsolutions.jts.geom.Coordinate;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.ClosestOperation;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import java.util.ArrayList;
import java.util.List;
public class GeoClosestPointsStreamFunctionProcessor extends StreamFunctionProcessor {
GeoOperation geoOperation;
@Override
protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutors, ExecutionPlanContext executionPlanContext) {
this.geoOperation = new ClosestOperation();
this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length);
List<Attribute> attributeList = new ArrayList<Attribute>(4);
attributeList.add(new Attribute("closestPointOf1From2Latitude", Attribute.Type.DOUBLE));
attributeList.add(new Attribute("closestPointOf1From2Longitude", Attribute.Type.DOUBLE));
attributeList.add(new Attribute("closestPointOf2From1Latitude", Attribute.Type.DOUBLE));
attributeList.add(new Attribute("closestPointOf2From1Longitude", Attribute.Type.DOUBLE));
return attributeList;
}
@Override
protected Object[] process(Object[] data) {
Coordinate[] coordinates = (Coordinate[]) geoOperation.process(data);
return new Object[]{coordinates[0].x, coordinates[0].y, coordinates[1].x, coordinates[1].y};
}
@Override
protected Object[] process(Object o) {
return null;
}
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public Object[] currentState() {
return new Object[0];
}
@Override
public void restoreState(Object[] objects) {
}
}

@ -0,0 +1,26 @@
#
# Copyright (C) 2015-2016 WSO2 Inc. (http://wso2.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
within=org.wso2.gpl.siddhi.extensions.geo.function.GeoWithinFunctionExecutor
intersects=org.wso2.gpl.siddhi.extensions.geo.function.GeoIntersectsFunctionExecutor
withinDistance=org.wso2.gpl.siddhi.extensions.geo.function.GeoWithinDistanceFunctionExecutor
crosses=org.wso2.gpl.siddhi.extensions.geo.stream.GeoCrossesStreamProcessor
proximity=org.wso2.gpl.siddhi.extensions.geo.stream.GeoProximityStreamProcessor
stationary=org.wso2.gpl.siddhi.extensions.geo.stream.GeoStationaryStreamProcessor
closestPoints=org.wso2.gpl.siddhi.extensions.geo.stream.function.GeoClosestPointsStreamFunctionProcessor
locationApproximate=org.wso2.gpl.siddhi.extensions.geo.stream.GeoLocationApproximateStreamProcessor
distance=org.wso2.gpl.siddhi.extensions.geo.function.GeoDistanceFunctionExecutor

@ -0,0 +1,49 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo;
import org.apache.log4j.Logger;
import org.junit.BeforeClass;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import java.util.ArrayList;
public abstract class GeoTestCase {
protected static SiddhiManager siddhiManager;
private static Logger logger = Logger.getLogger(GeoTestCase.class);
protected static ArrayList<Object[]> data;
protected static ArrayList<Boolean> expectedResult;
protected static int eventCount;
@BeforeClass
public static void setUp() throws Exception {
logger.info("Init Siddhi");// Create Siddhi Manager
siddhiManager = new SiddhiManager();
data = new ArrayList<Object[]>();
expectedResult = new ArrayList<Boolean>();
}
protected void generateEvents(ExecutionPlanRuntime executionPlanRuntime) throws Exception {
InputHandler inputHandler = executionPlanRuntime.getInputHandler("dataIn");
for (Object[] dataLine : data) {
inputHandler.send(dataLine);
}
}
}

@ -0,0 +1,88 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import junit.framework.Assert;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.util.EventPrinter;
public class GeoDistanceTestCase {
static final Logger log = Logger.getLogger(GeoDistanceTestCase.class);
private volatile int count;
private volatile boolean eventArrived;
@Before
public void init() {
count = 0;
eventArrived = false;
}
@Test
public void testGeoDistanceTestCase() throws InterruptedException {
log.info("testGeoDistance TestCase");
SiddhiManager siddhiManager = new SiddhiManager();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(
"@config(async = 'true') " +
"define stream cleanedStream (latitude double, longitude double, prevLatitude double, " +
"prevLongitude double); " +
"@info(name = 'query1') " +
"from cleanedStream " +
"select geo:distance(latitude, longitude, prevLatitude, prevLongitude) as distance " +
"insert into dataOut;");
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
count++;
if (count == 1) {
Assert.assertEquals(2322119.848252557, event.getData(0));
eventArrived = true;
} else if (count == 2) {
Assert.assertEquals(871946.8734223971, event.getData(0));
eventArrived = true;
}
}
}
});
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream");
executionPlanRuntime.start();
//getting distance near equator
inputHandler.send(new Object[]{8.116553, 77.523679, 9.850047, 98.597177});
Thread.sleep(500);
//getting distance away from equator
inputHandler.send(new Object[]{54.432063, 19.669778, 59.971487, 29.958951});
Thread.sleep(100);
Assert.assertEquals(2, count);
Assert.assertTrue(eventArrived);
executionPlanRuntime.shutdown();
}
}

@ -0,0 +1,73 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
public class GeoIntersectsTestCase extends GeoTestCase {
private static Logger logger = Logger.getLogger(GeoIntersectsTestCase.class);
@Test
public void testGeometry() throws Exception {
logger.info("TestGeometry");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, 1.5],[1.5, 1.5],[1.5, 0.5],[0.5, 0.5]]]}"});
expectedResult.add(true);
data.add(new Object[]{"{'type':'Circle','coordinates':[-1, -1], 'radius':221148}"});
expectedResult.add(true);
data.add(new Object[]{"{'type':'Point','coordinates':[2, 0]}"});
expectedResult.add(false);
data.add(new Object[]{"{'type':'Polygon','coordinates':[[[2, 2],[2, 1],[1, 1],[1, 2],[2, 2]]]}"});
expectedResult.add(true);
String executionPlan = "@config(async = 'true') define stream dataIn (geometry string);"
+ "@info(name = 'query1') from dataIn" +
" select geo:intersects(geometry, \"{'type':'Polygon','coordinates':[[[0, 0],[0, 1],[1, 1],[1, 0],[0, 0]]]}\") as intersects \n" +
" insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
logger.info(event);
Boolean isWithin = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
}

@ -0,0 +1,73 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
public class GeoWithinDistanceTestCase extends GeoTestCase {
private static Logger logger = Logger.getLogger(GeoWithinDistanceTestCase.class);
@Test
public void testGeometry() throws Exception {
logger.info("TestGeometry");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, 1.5],[1.5, 1.5],[1.5, 0.5],[0.5, 0.5]]]}"});
expectedResult.add(true);
data.add(new Object[]{"{'type':'Circle','coordinates':[-1, -1], 'radius':110575}"});
expectedResult.add(true);
data.add(new Object[]{"{'type':'Point','coordinates':[3, 0]}"});
expectedResult.add(false);
data.add(new Object[]{"{'type':'Polygon','coordinates':[[[3, 3],[3, 2],[2, 2],[2, 3],[3, 3]]]}"});
expectedResult.add(false);
String executionPlan = "@config(async = 'true') define stream dataIn (geometry string);"
+ "@info(name = 'query1') from dataIn" +
" select geo:withinDistance(geometry, \"{'type':'Polygon','coordinates':[[[0, 0],[0, 1],[1, 1],[1, 0],[0, 0]]]}\", 110574.61087757687) as intersects \n" +
" insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
logger.info(event);
Boolean isWithin = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
}

@ -0,0 +1,268 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.function;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.util.EventPrinter;
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
public class GeoWithinTestCase extends GeoTestCase {
private static Logger logger = Logger.getLogger(GeoWithinTestCase.class);
private volatile int count;
private volatile boolean eventArrived;
@Test
public void testPoint() throws Exception {
logger.info("TestPoint");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"km-4354", 0.5d, 0.5d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 2d, 2d});
expectedResult.add(false);
data.add(new Object[]{"km-4354", -0.5d, 1.5d});
expectedResult.add(false);
data.add(new Object[]{"km-4354", 0.5d, 1.25d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 0.75d, 0.5d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 3.5d, 0.5d});
expectedResult.add(false);
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);"
+ "@info(name = 'query1') from dataIn " +
"select geo:within(longitude,latitude,\"{'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}\") as notify \n" +
" \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
logger.info(event);
Boolean isWithin = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
@Test
public void testPoint2() throws Exception {
logger.info("TestPoint2");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"km-4354", 0.75d, 1d, "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", 1d, 1d, "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 3d, 3d, "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", 0.6d, 1.0d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 1.5d, 1.5d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", -0.5d, 1.5d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
expectedResult.add(false);
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double, geometry string);"
+ "@info(name = 'query1') from dataIn " +
"select geo:within(longitude, latitude, geometry) as notify \n" +
" \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
logger.info(event);
Boolean isWithin = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
@Test
public void testGeometry() throws Exception {
logger.info("TestGeometry");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-1,1]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1,1]}"});
expectedResult.add(true);
String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string);"
+ "@info(name = 'query1') from dataIn " +
"select geo:within(geometry,\"{'type':'Polygon','coordinates':[[[0,0],[0,4],[3,4],[3,0],[0,0]]]}\") as notify \n" +
" \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
logger.info(event);
Boolean isWithin = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
@Test
public void testGeometry2() throws Exception {
logger.info("TestGeometry");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1.0, 1.5]}", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1.5, 1.0]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 10, 'coordinates':[0.5, 1.5]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 20, 'coordinates':[0.5, 1.5]}", "{'type': 'Circle', 'radius': 10, 'coordinates':[0.5, 1.5]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-0.5, 1.0]}", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, -0.5],[-0.5, -0.5],[-0.5, 0.5], [0.5, 0.5]]]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0, 0]}"});
expectedResult.add(true);
String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string, otherGeometry string);"
+ "@info(name = 'query1') from dataIn " +
"select geo:within(geometry,otherGeometry) as notify \n" +
" \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
logger.info(event);
Boolean isWithin = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
@Test
public void testGeometry3() throws Exception {
logger.info("TestGeometryJoin");
count = 0;
eventArrived = false;
String executionPlan = "" +
"define stream dataIn (id string, latitude double, longitude double); " +
"define stream dataToTable (id string, geometry string); " +
"" +
"define table dataTable (id string, geometry string); " +
"" +
"from dataToTable " +
"insert into dataTable; " +
"" +
"@info(name = 'query1') " +
"from dataIn join dataTable " +
"on geo:within(dataIn.latitude, dataIn.longitude, dataTable.geometry) " +
"select dataIn.id as dataInID, dataTable.id as dataTableID " +
"insert into dataOut; ";
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
InputHandler dataIn = executionPlanRuntime.getInputHandler("dataIn");
InputHandler dataToTable = executionPlanRuntime.getInputHandler("dataToTable");
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
eventArrived = true;
count++;
}
}
});
executionPlanRuntime.start();
Thread.sleep(1000);
dataToTable.send(new Object[]{"1", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
dataToTable.send(new Object[]{"2", "{'type': 'Circle', 'radius': 110575, 'coordinates':[12.5, 1.5]}"});
Thread.sleep(1000);
dataIn.send(new Object[]{"3", 1.5, 1.0});
dataIn.send(new Object[]{"4", 7.5, 1.0});
Thread.sleep(1000);
executionPlanRuntime.shutdown();
Assert.assertEquals(1, count);
Assert.assertEquals(true, eventArrived);
}
}

@ -0,0 +1,76 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
public class GeoCrossesTestCase extends GeoTestCase {
private static Logger logger = Logger.getLogger(GeoCrossesTestCase.class);
@Test
public void testCrosses() throws Exception {
logger.info("TestCrosses");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"km-4354", -0.5d, 0.5d});
data.add(new Object[]{"km-4354", 1.5d, 0.5d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 1.5d, 0.25d});
data.add(new Object[]{"tc-1729", 0.5d, 0.5d});
expectedResult.add(true);
data.add(new Object[]{"tc-1729", -0.5d, 0.5d});
expectedResult.add(false);
data.add(new Object[]{"km-4354", 1.3d, 0.1d});
data.add(new Object[]{"km-4354", 3d, 0.25d});
expectedResult.add(false);
data.add(new Object[]{"km-4354", 4d, 0.25d});
data.add(new Object[]{"km-4354", 1d, 0.5d});
expectedResult.add(true);
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);"
+ "@info(name = 'query1') from dataIn#geo:crosses(id,longitude,latitude,\"{'type':'Polygon','coordinates':[[[0, 0],[2, 0],[2, 1],[0, 1],[0, 0]]]}\") " +
"select crosses \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
Boolean isWithin = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
}

@ -0,0 +1,91 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.util.EventPrinter;
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
import java.util.ArrayList;
public class GeoProximityTestCase extends GeoTestCase {
private static Logger logger = Logger.getLogger(GeoProximityTestCase.class);
ArrayList<String> expectedResultId;
@Test
public void testProximity() throws Exception {
logger.info("TestProximity");
data.clear();
expectedResultId = new ArrayList<String>();
eventCount = 0;
data.add(new Object[]{"1", 0d, 0d});
data.add(new Object[]{"2", 1d, 1d});
data.add(new Object[]{"3", 2d, 2d});
data.add(new Object[]{"1", 1.5d, 1.5d});
expectedResultId.add("2");
expectedResult.add(true);
expectedResultId.add("3");
expectedResult.add(true);
data.add(new Object[]{"1", 1.6d, 1.6d});
data.add(new Object[]{"2", 5d, 5d});
expectedResultId.add("1");
expectedResult.add(false);
data.add(new Object[]{"1", 2d, 2d});
data.add(new Object[]{"1", 5.5d, 5.5d});
expectedResultId.add("2");
expectedResult.add(true);
expectedResultId.add("3");
expectedResult.add(false);
String executionPlan = "@config(async = 'true')" +
"define stream dataIn (id string, longitude double, latitude double);"
+
"@info(name = 'query1') " +
"from dataIn#geo:proximity(id,longitude,latitude, 110574.61087757687) " +
"select inCloseProximity, proximityWith \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
Boolean proximity = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount), proximity);
String other = (String) event.getData(1);
Assert.assertEquals(expectedResultId.get(eventCount), other);
eventCount++;
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
}

@ -0,0 +1,75 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
public class GeoStationaryTestCase extends GeoTestCase {
private static Logger logger = Logger.getLogger(GeoStationaryTestCase.class);
@Test
public void testStationary() throws Exception {
logger.info("TestStationary");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"km-4354", 0d, 0d});
data.add(new Object[]{"km-4354", 1d, 1d});
data.add(new Object[]{"km-4354", 1d, 1.5d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 1d, 1.75d});
data.add(new Object[]{"km-4354", 1d, 2.5d});
expectedResult.add(false);
data.add(new Object[]{"km-4354", 1d, 2.3d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 1d, 2.2d});
data.add(new Object[]{"km-4354", 1d, 2.6d});
data.add(new Object[]{"km-4354", 1d, 3.6d});
expectedResult.add(false);
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);"
+ "@info(name = 'query1') from dataIn#geo:stationary(id,longitude,latitude, 110574.61087757687) " +
"select stationary \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
for (Event event : inEvents) {
Boolean isWithin = (Boolean) event.getData(0);
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
}

@ -0,0 +1,161 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream.function;
import junit.framework.Assert;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.util.EventPrinter;
public class AverageLocationTestCase {
static final Logger log = Logger.getLogger(AverageLocationTestCase.class);
private volatile int count;
private volatile boolean eventArrived;
@Before
public void init() {
count = 0;
eventArrived = false;
}
@Test
public void testAverageLocationWithSameWeightsTestCase() throws InterruptedException {
log.info("testAverageLocation with same weight TestCase");
SiddhiManager siddhiManager = new SiddhiManager();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(
"@config(async = 'true') " +
"define stream cleanedStream (locationRecorder string, latitude double, longitude double, " +
"beaconProximity string, uuid string, weight double, timestamp long); " +
"@info(name = 'query1') " +
"from cleanedStream#geo:locationApproximate(locationRecorder, latitude, longitude, " +
"beaconProximity, uuid, weight, timestamp) " +
"select * " +
"insert into dataOut;");
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
count++;
if (count == 1) {
Assert.assertEquals(6.876657000000001, event.getData(7));
Assert.assertEquals(79.897648, event.getData(8));
eventArrived = true;
} else if (count == 2) {
Assert.assertEquals(6.797727042508542, event.getData(7));
Assert.assertEquals(80.13557409252783, event.getData(8));
eventArrived = true;
} else if (count == 3) {
Assert.assertEquals(6.853572272662002, event.getData(7));
Assert.assertEquals(true, 80.34826512892124 == (Double) event.getData(8) || 80.34826512892126 == (Double) event.getData(8));
eventArrived = true;
} else if (count == 4) {
Assert.assertEquals(true, 8.026326160526303 == (Double) event.getData(7) || 8.0263261605263 == (Double) event.getData(7));
Assert.assertEquals(80.42794459517538, event.getData(8));
eventArrived = true;
}
}
}
});
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream");
executionPlanRuntime.start();
//nugegods, ratnapura, nuwara eliya, vavniya --> thbuttegama
inputHandler.send(new Object[]{"person1", 6.876657, 79.897648, "ENTER", "uuid1", 20.0d, 1452583935l});
Thread.sleep(500);
inputHandler.send(new Object[]{"person1", 6.718681, 80.373422, "ENTER", "uuid2", 20.0d, 1452583937l});
Thread.sleep(500);
inputHandler.send(new Object[]{"person1", 6.964981, 80.773796, "ENTER", "uuid3", 20.0d, 1452583939l});
Thread.sleep(500);
inputHandler.send(new Object[]{"person1", 8.729925, 80.475966, "ENTER", "uuid4", 100.0d, 1452583941l});
Thread.sleep(100);
Assert.assertEquals(4, count);
Assert.assertTrue(eventArrived);
executionPlanRuntime.shutdown();
}
@Test
public void testAverageLocationWithDifferentWeightsTestCase2() throws InterruptedException {
log.info("testAverageLocation with different weights TestCase");
SiddhiManager siddhiManager = new SiddhiManager();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(
"@config(async = 'true') " +
"define stream cleanedStream (locationRecorder string, latitude double, longitude double, " +
"beaconProximity string, uuid string, weight double, timestamp long); " +
"@info(name = 'query1') " +
"from cleanedStream#geo:locationApproximate(locationRecorder, latitude, longitude, " +
"beaconProximity, uuid, weight, timestamp) " +
"select * " +
"insert into dataOut;");
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
count++;
if (count == 1) {
Assert.assertEquals(6.876657000000001, event.getData(7));
Assert.assertEquals(79.897648, event.getData(8));
eventArrived = true;
} else if (count == 2) {
Assert.assertEquals(6.797727042508542, event.getData(7));
Assert.assertEquals(80.13557409252783, event.getData(8));
eventArrived = true;
} else if (count == 3) {
Assert.assertEquals(6.853572272662002, event.getData(7));
Assert.assertEquals(true, 80.34826512892124 == (Double) event.getData(8) || 80.34826512892126 == (Double) event.getData(8));
eventArrived = true;
} else if (count == 4) {
Assert.assertEquals(7.322639705655454, event.getData(7));
Assert.assertEquals(80.38008364787895, event.getData(8));
eventArrived = true;
}
}
}
});
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream");
executionPlanRuntime.start();
//nugegods, ratnapura, nuwara eliya, vavniya --> kegalle
inputHandler.send(new Object[]{"person1", 6.876657, 79.897648, "ENTER", "uuid1", 20.0d, 1452583935l});
Thread.sleep(500);
inputHandler.send(new Object[]{"person1", 6.718681, 80.373422, "ENTER", "uuid2", 20.0d, 1452583937l});
Thread.sleep(500);
inputHandler.send(new Object[]{"person1", 6.964981, 80.773796, "ENTER", "uuid3", 20.0d, 1452583939l});
Thread.sleep(500);
inputHandler.send(new Object[]{"person1", 8.729925, 80.475966, "ENTER", "uuid4", 20.0d, 1452583941l});
Thread.sleep(100);
Assert.assertEquals(4, count);
Assert.assertTrue(eventArrived);
executionPlanRuntime.shutdown();
}
}

@ -0,0 +1,183 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.gpl.siddhi.extensions.geo.stream.function;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import com.vividsolutions.jts.operation.distance.DistanceOp;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.util.EventPrinter;
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeometryUtils;
public class GeoClosetPointTestCase extends GeoTestCase {
private static Logger logger = Logger.getLogger(GeoClosetPointTestCase.class);
// @Test
// public void testClosestBasic() throws Exception {
// logger.info("testClosest");
//
// PreparedGeometry geometry = GeometryUtils.preparedGeometryFromJSON(" {'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}");
// Geometry point = GeometryUtils.createPoint(-1, -1);
//
// DistanceOp distOp = new DistanceOp(geometry.getGeometry(), point);
//
// Coordinate[] closestPt = distOp.nearestPoints();
// System.out.println(closestPt[0]);
//
// }
@Test
public void testClosestPoints() throws Exception {
logger.info("testClosestPoints");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"km-4354", 0.5d, 0.5d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 2d, 2d});
expectedResult.add(false);
data.add(new Object[]{"km-4354", -0.5d, 1.5d});
expectedResult.add(false);
data.add(new Object[]{"km-4354", 0.5d, 1.25d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", 0.0d, 0.0d});
expectedResult.add(true);
data.add(new Object[]{"km-4354", -1d, -1d});
expectedResult.add(false);
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);"
+ "@info(name = 'query1') from dataIn#geo:closestPoints(longitude,latitude,\"{'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}\") " +
"select closestPointOf1From2Latitude, closestPointOf1From2Longitude, closestPointOf2From1Latitude, closestPointOf2From1Longitude " +
" \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
final long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
eventCount++;
switch (eventCount){
case 1:
Assert.assertArrayEquals(new Object[]{0.5,0.5,0.5,0.5},event.getData());
break;
case 2:
Assert.assertArrayEquals(new Object[]{2.0, 2.0, 1.0, 2.0},event.getData());
break;
case 3:
Assert.assertArrayEquals(new Object[]{-0.5, 1.5, 0.0, 1.5},event.getData());
break;
case 4:
Assert.assertArrayEquals(new Object[]{0.5, 1.25, 0.5, 1.25},event.getData());
break;
case 5:
Assert.assertArrayEquals(new Object[]{0.0, 0.0, 0.0, 0.0},event.getData());
break;
case 6:
Assert.assertArrayEquals(new Object[]{-1.0, -1.0, 0.0, 0.0},event.getData());
break;
}
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
@Test
public void testClosestPointsGeometry() throws Exception {
logger.info("testClosestPointsGeometry");
data.clear();
expectedResult.clear();
eventCount = 0;
data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[10, 1.5]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-1,1]}"});
expectedResult.add(false);
data.add(new Object[]{"km-4354", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[5,5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
expectedResult.add(true);
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[10,10]}"});
expectedResult.add(true);
String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string);"
+ "@info(name = 'query1') from dataIn#geo:closestPoints(geometry,\"{'type':'Polygon','coordinates':[[[0,0],[0,4],[3,4],[3,0],[0,0]]]}\") " +
"select closestPointOf1From2Latitude, closestPointOf1From2Longitude, closestPointOf2From1Latitude, closestPointOf2From1Longitude \n" +
"insert into dataOut";
long start = System.currentTimeMillis();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
long end = System.currentTimeMillis();
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
eventCount++;
switch (eventCount){
case 1:
Assert.assertArrayEquals(new Object[]{0.5, 0.5, 0.5, 0.5},event.getData());
break;
case 2:
Assert.assertArrayEquals(new Object[]{2.5000035190937595, 1.5, 2.5000035190937595, 1.5},event.getData());
break;
case 3:
Assert.assertArrayEquals(new Object[]{8.99999648090624, 1.5000000000000007, 3.0, 1.5000000000000009},event.getData());
break;
case 4:
Assert.assertArrayEquals(new Object[]{-1.0, 1.0, 0.0, 1.0},event.getData());
break;
case 5:
Assert.assertArrayEquals(new Object[]{0.5, 0.5, 0.5, 0.5},event.getData());
break;
case 6:
Assert.assertArrayEquals(new Object[]{10.0, 10.0, 3.0, 4.0},event.getData());
break;
}
}
}
});
executionPlanRuntime.start();
generateEvents(executionPlanRuntime);
Thread.sleep(1000);
Assert.assertEquals(expectedResult.size(), eventCount);
}
}

@ -0,0 +1,35 @@
#
# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# For the general syntax of property based configuration files see the
# documenation of org.apache.log4j.PropertyConfigurator.
# The root category uses the appender called A1. Since no priority is
# specified, the root category assumes the default priority for root
# which is DEBUG in log4j. The root category is the only category that
# has a default priority. All other categories need not be assigned a
# priority in which case they inherit their priority from the
# hierarchy.
#log4j.rootLogger=DEBUG, stdout
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#log4j.appender.stdout.layout.ConversionPattern=%m%n
log4j.appender.stdout.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
Loading…
Cancel
Save