forked from community/device-mgt-plugins
parent
4713448c4f
commit
965f85a71e
@ -1,139 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
~
|
||||
~ WSO2 Inc. licenses this file to you under the Apache License,
|
||||
~ Version 2.0 (the "License"); you may not use this file except
|
||||
~ in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>org.wso2.gpl.siddhi.extensions</groupId>
|
||||
<artifactId>siddhi-extension-gpl-geo-parent</artifactId>
|
||||
<version>3.1.0-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<packaging>bundle</packaging>
|
||||
<url>http://wso2.org</url>
|
||||
|
||||
<artifactId>org.wso2.gpl.siddhi.extension.geo.plugin</artifactId>
|
||||
<name>Siddhi Extension - Geo Component</name>
|
||||
|
||||
<description>
|
||||
FunctionExecutors
|
||||
1. GeoWithinFunctionExecutor
|
||||
Input : (longitude double, latitude double, geoJSONGeometryFence string)
|
||||
OR (geoJSONGeometry string, geoJSONGeometryFence string)
|
||||
Output : true if (longitude, latitude) or geoJSONGeometry is within the geoJSONGeometryFence
|
||||
|
||||
2. GeoIntersectsFunctionExecutor
|
||||
Input : (longitude double, latitude double, geoJSONGeometryFence string)
|
||||
OR (geoJSONGeometry string, geoJSONGeometryFence string)
|
||||
Output : true if (longitude, latitude) or geoJSONGeometry intersects the geoJSONGeometryFence
|
||||
|
||||
3. GeoWithinDistanceFunctionExecutor
|
||||
Input : (longitude double, latitude double, geoJSONGeometryFence string, distance double)
|
||||
OR (geoJSONGeometry string, geoJSONGeometryFence string, distance double)
|
||||
Output : true if (longitude, latitude) or geoJSONGeometry is within distance of the geoJSONGeometryFence
|
||||
|
||||
StreamProcessors
|
||||
1. GeoCrossesStreamProcessor
|
||||
Input : (id string, longitude double, latitude double, geoJSONGeometryFence string)
|
||||
OR (id string, geoJSONGeometry string, geoJSONGeometryFence string)
|
||||
Output : an event with `crosses` additional attribute set to true when the object ((longitude, latitude) or geoJSONGeometry)
|
||||
crosses into geoJSONGeometryFence and an event with `crosses` additional attribute set to false
|
||||
when the object crosses out of the geoJSONGeometryFence
|
||||
|
||||
2. GeoStationaryStreamProcessor
|
||||
Input : (id string, longitude double, latitude double, geoJSONGeometryFence string, radius double)
|
||||
OR (id string, geoJSONGeometry string, geoJSONGeometryFence string, radius double)
|
||||
Output : when the object ((longitude, latitude) or geoJSONGeometry) starts being stationary within the radius
|
||||
an event with `stationary` additional attribute set to true. When the object starts to move out of the radius
|
||||
an event with `stationary` additional attribute set to false.
|
||||
|
||||
3. GeoProximityStreamProcessor
|
||||
Input : (id string, longitude double, latitude double, geoJSONGeometryFence string, radius double)
|
||||
OR (id string, geoJSONGeometry string, geoJSONGeometryFence string, radius double)
|
||||
Output : when two objects ((longitude, latitude) or geoJSONGeometry) starts being in close proximity within the radius
|
||||
an event with `inCloseProximity` additional attribute set to true. When the object starts to move out of the radius
|
||||
an event with `inCloseProximity` additional. attribute set to false. On each event, additional attributes
|
||||
`proximityWith` gives the id of the object that this object is in close proximity and `proximityId` is an id unique to
|
||||
the pair of objects
|
||||
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.wso2.siddhi</groupId>
|
||||
<artifactId>siddhi-query-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.wso2.siddhi</groupId>
|
||||
<artifactId>siddhi-query-compiler</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.wso2.siddhi</groupId>
|
||||
<artifactId>siddhi-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.wso2.orbit.org.geotools</groupId>
|
||||
<artifactId>gt-geojson</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.felix</groupId>
|
||||
<artifactId>maven-bundle-plugin</artifactId>
|
||||
<version>2.3.7</version>
|
||||
<extensions>true</extensions>
|
||||
<configuration>
|
||||
<instructions>
|
||||
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
|
||||
<Bundle-Name>${project.artifactId}</Bundle-Name>
|
||||
<Bundle-Version>${project.version}</Bundle-Version>
|
||||
<Bundle-Description>Siddhi Extension for Geo Fencing</Bundle-Description>
|
||||
<Export-Package>
|
||||
org.wso2.gpl.siddhi.extensions.geo.*
|
||||
</Export-Package>
|
||||
<Private-Package>
|
||||
org.wso2.gpl.siddhi.extensions.geo.internal.*
|
||||
</Private-Package>
|
||||
<Import-Package>
|
||||
*;resolution:=optional
|
||||
</Import-Package>
|
||||
</instructions>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
@ -1,115 +0,0 @@
|
||||
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import org.wso2.siddhi.core.config.ExecutionPlanContext;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute.Type;
|
||||
|
||||
public abstract class AbstractGeoOperationExecutor extends FunctionExecutor {
|
||||
|
||||
GeoOperation geoOperation;
|
||||
|
||||
/**
|
||||
* The initialization method for FunctionExecutor, this method will be called before the other methods
|
||||
*
|
||||
* @param attributeExpressionExecutors are the executors of each function parameters
|
||||
* @param executionPlanContext the context of the execution plan
|
||||
*/
|
||||
@Override
|
||||
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
|
||||
this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length);
|
||||
}
|
||||
/**
|
||||
* The main executions method which will be called upon event arrival
|
||||
*
|
||||
* @param data the runtime values of the attributeExpressionExecutors
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
protected Object execute(Object[] data) {
|
||||
return geoOperation.process(data);
|
||||
}
|
||||
|
||||
/**
|
||||
* The main execution method which will be called upon event arrival
|
||||
* which has zero or one function parameter
|
||||
*
|
||||
* @param data null if the function parameter count is zero or
|
||||
* runtime data value of the function parameter
|
||||
* @return the function result
|
||||
*/
|
||||
@Override
|
||||
protected Object execute(Object data) {
|
||||
throw new IllegalStateException(this.getClass().getCanonicalName() + " cannot execute data " + data);
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called only once, to acquire required resources
|
||||
* after initializing the system and before processing the events.
|
||||
*/
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called only once, to release the acquired resources
|
||||
* before shutting down the system.
|
||||
*/
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The serializable state of the element, that need to be
|
||||
* persisted for the reconstructing the element to the same state
|
||||
* on a different point of time
|
||||
*
|
||||
* @return stateful objects of the element as an array
|
||||
*/
|
||||
@Override
|
||||
public Object[] currentState() {
|
||||
return new Object[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* The serialized state of the element, for reconstructing
|
||||
* the element to the same state as if was on a previous point of time.
|
||||
*
|
||||
* @param state the stateful objects of the element as an array on
|
||||
* the same order provided by currentState().
|
||||
*/
|
||||
@Override
|
||||
public void restoreState(Object[] state) {
|
||||
|
||||
}
|
||||
|
||||
//TODO: look into cloning
|
||||
|
||||
|
||||
public Type getReturnType() {
|
||||
return geoOperation.getReturnType();
|
||||
}
|
||||
|
||||
}
|
@ -1,124 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import org.wso2.siddhi.core.config.ExecutionPlanContext;
|
||||
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
|
||||
|
||||
/**
|
||||
* geo:distance(location1Latitude, location1Longitude, location2Latitude, location2Longitude)
|
||||
*
|
||||
* This method gives the distance between two geo locations in meters
|
||||
*
|
||||
* location1Latitude - latitude value of 1st location
|
||||
* location1Longitude - longitude value of 1st location
|
||||
* location2Latitude - latitude value of 2nd location
|
||||
* location2Longitude - longitude value of 2nd location
|
||||
*
|
||||
* Accept Type(s) for geo:distance(location1Latitude, location1Longitude, location2Latitude, location2Longitude);
|
||||
* location1Latitude : DOUBLE
|
||||
* location1Longitude : DOUBLE
|
||||
* location2Latitude : DOUBLE
|
||||
* location2Longitude : DOUBLE
|
||||
*
|
||||
* Return Type(s): DOUBLE
|
||||
*/
|
||||
public class GeoDistanceFunctionExecutor extends FunctionExecutor {
|
||||
Attribute.Type returnType = Attribute.Type.DOUBLE;
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] currentState() {
|
||||
return new Object[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreState(Object[] state) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
|
||||
if (attributeExpressionExecutors.length != 4) {
|
||||
throw new ExecutionPlanValidationException("Invalid no of arguments passed to " +
|
||||
"geo:distance() function, " +
|
||||
"requires 4, but found " + attributeExpressionExecutors.length);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object execute(Object[] data) {
|
||||
if (data[0] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " +
|
||||
"function. First argument should be double");
|
||||
}
|
||||
if (data[1] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " +
|
||||
"function. Second argument should be double");
|
||||
}
|
||||
if (data[2] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " +
|
||||
"function. Third argument should be double");
|
||||
}
|
||||
if (data[3] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:distance() " +
|
||||
"function. Fourth argument should be double");
|
||||
}
|
||||
|
||||
double latitude = (Double) data[0];
|
||||
double longitude = (Double) data[1];
|
||||
double prevLatitude = (Double) data[2];
|
||||
double prevLongitude = (Double) data[3];
|
||||
|
||||
int R = 6371000; // Radius of the earth in m
|
||||
latitude = latitude * (Math.PI / 180);
|
||||
prevLatitude = prevLatitude * (Math.PI / 180);
|
||||
longitude = longitude * (Math.PI / 180);
|
||||
prevLongitude = prevLongitude * (Math.PI / 180);
|
||||
double dlon = prevLongitude - longitude;
|
||||
double dlat = prevLatitude - latitude;
|
||||
double a = Math.sin(dlat / 2) * Math.sin(dlat / 2) + Math.cos(latitude) * Math.cos(prevLatitude) *
|
||||
Math.sin(dlon / 2) * Math.sin(dlon / 2);
|
||||
double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
|
||||
return R * c;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object execute(Object data) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Attribute.Type getReturnType() {
|
||||
return returnType;
|
||||
}
|
||||
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.IntersectsOperation;
|
||||
|
||||
public class GeoIntersectsFunctionExecutor extends AbstractGeoOperationExecutor {
|
||||
public GeoIntersectsFunctionExecutor(){
|
||||
this.geoOperation = new IntersectsOperation();
|
||||
}
|
||||
}
|
||||
|
@ -1,44 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import org.wso2.siddhi.core.config.ExecutionPlanContext;
|
||||
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
|
||||
public class GeoWithinDistanceFunctionExecutor extends AbstractGeoOperationExecutor {
|
||||
public GeoWithinDistanceFunctionExecutor() {
|
||||
this.geoOperation = new WithinDistanceOperation();
|
||||
}
|
||||
/**
|
||||
* The initialization method for FunctionExecutor, this method will be called before the other methods
|
||||
*
|
||||
* @param attributeExpressionExecutors are the executors of each function parameters
|
||||
* @param executionPlanContext the context of the execution plan
|
||||
*/
|
||||
@Override
|
||||
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
|
||||
this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length);
|
||||
if (attributeExpressionExecutors[attributeExpressionExecutors.length - 1].getReturnType() != Attribute.Type.DOUBLE) {
|
||||
throw new ExecutionPlanCreationException("Last argument should be a double");
|
||||
}
|
||||
}
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinOperation;
|
||||
|
||||
public class GeoWithinFunctionExecutor extends AbstractGeoOperationExecutor {
|
||||
public GeoWithinFunctionExecutor() {
|
||||
this.geoOperation = new WithinOperation();
|
||||
}
|
||||
}
|
@ -1,43 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
|
||||
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
|
||||
import com.vividsolutions.jts.operation.distance.DistanceOp;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
|
||||
public class ClosestOperation extends org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation {
|
||||
@Override
|
||||
public Object operation(Geometry a, Geometry b, Object[] data) {
|
||||
DistanceOp distOp = new DistanceOp(a, b);
|
||||
return distOp.nearestPoints();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object operation(Geometry a, PreparedGeometry b, Object[] data) {
|
||||
DistanceOp distOp = new DistanceOp(a, b.getGeometry());
|
||||
return distOp.nearestPoints();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Attribute.Type getReturnType() {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -1,92 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
|
||||
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
|
||||
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
|
||||
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
|
||||
public abstract class GeoOperation {
|
||||
public boolean point = false;
|
||||
protected Object data;
|
||||
public PreparedGeometry geometry = null;
|
||||
|
||||
public void init(ExpressionExecutor[] attributeExpressionExecutors, int start, int end) {
|
||||
int position = start;
|
||||
if (attributeExpressionExecutors[position].getReturnType() == Attribute.Type.DOUBLE) {
|
||||
point = true;
|
||||
if (attributeExpressionExecutors[position + 1].getReturnType() != Attribute.Type.DOUBLE) {
|
||||
throw new ExecutionPlanCreationException("Longitude and Latitude must be provided as double values");
|
||||
}
|
||||
++position;
|
||||
} else if (attributeExpressionExecutors[position].getReturnType() == Attribute.Type.STRING) {
|
||||
point = false;
|
||||
} else {
|
||||
throw new ExecutionPlanCreationException((position + 1) +
|
||||
" parameter should be a string for a geometry or a double for a latitude");
|
||||
}
|
||||
++position;
|
||||
if (position >= end) {
|
||||
return;
|
||||
}
|
||||
if (attributeExpressionExecutors[position].getReturnType() != Attribute.Type.STRING) {
|
||||
throw new ExecutionPlanCreationException((position + 1) + " parameter should be a GeoJSON geometry string");
|
||||
}
|
||||
if (attributeExpressionExecutors[position] instanceof ConstantExpressionExecutor) {
|
||||
String strGeometry = attributeExpressionExecutors[position].execute(null).toString();
|
||||
geometry = GeometryUtils.preparedGeometryFromJSON(strGeometry);
|
||||
}
|
||||
}
|
||||
|
||||
public Object process(Object[] data) {
|
||||
Geometry currentGeometry;
|
||||
if (point) {
|
||||
double longitude = (Double) data[0];
|
||||
double latitude = (Double) data[1];
|
||||
currentGeometry = GeometryUtils.createPoint(longitude, latitude);
|
||||
} else {
|
||||
currentGeometry = GeometryUtils.geometryFromJSON(data[0].toString());
|
||||
}
|
||||
if (geometry != null) {
|
||||
return operation(currentGeometry, geometry, data);
|
||||
} else {
|
||||
return operation(currentGeometry, GeometryUtils.geometryFromJSON(data[point ? 2 : 1].toString()),
|
||||
data);
|
||||
}
|
||||
}
|
||||
|
||||
public Geometry getCurrentGeometry(Object[] data) {
|
||||
if (point) {
|
||||
double longitude = (Double) data[0];
|
||||
double latitude = (Double) data[1];
|
||||
return GeometryUtils.createPoint(longitude, latitude);
|
||||
} else {
|
||||
return GeometryUtils.createGeometry(data[0]);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract Object operation(Geometry a, Geometry b, Object[] data);
|
||||
|
||||
public abstract Object operation(Geometry a, PreparedGeometry b, Object[] data);
|
||||
|
||||
public abstract Attribute.Type getReturnType();
|
||||
}
|
@ -1,87 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
|
||||
|
||||
import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
import com.vividsolutions.jts.geom.Coordinate;
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import com.vividsolutions.jts.geom.GeometryFactory;
|
||||
import com.vividsolutions.jts.geom.Point;
|
||||
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
|
||||
import com.vividsolutions.jts.geom.prep.PreparedGeometryFactory;
|
||||
import org.geotools.geojson.geom.GeometryJSON;
|
||||
import org.geotools.geometry.jts.JTSFactoryFinder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
|
||||
public class GeometryUtils {
|
||||
|
||||
public static final double TO_DEGREE = 110574.61087757687;
|
||||
private static final String COORDINATES = "coordinates";
|
||||
private static final String RADIUS = "radius";
|
||||
private static GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
|
||||
//JTSFactoryFinder.getGeometryFactory(new Hints(Hints.CRS, DefaultGeographicCRS.WGS84));
|
||||
private static GeometryJSON geometryJSON = new GeometryJSON(10);
|
||||
|
||||
public static Geometry geometryFromJSON(String strGeometry) {
|
||||
if (strGeometry.contains(RADIUS)) {
|
||||
JsonObject jsonObject = new JsonParser().parse(strGeometry).getAsJsonObject();
|
||||
JsonArray jLocCoordinatesArray = jsonObject.getAsJsonArray(COORDINATES);
|
||||
Coordinate coords = new Coordinate(Double.parseDouble(jLocCoordinatesArray.get(0)
|
||||
.toString()), Double.parseDouble(jLocCoordinatesArray.get(1).toString()));
|
||||
Point point = geometryFactory.createPoint(coords); // create the points for GeoJSON file points
|
||||
double radius = Double.parseDouble(jsonObject.get(RADIUS).toString()) / TO_DEGREE; //convert to degrees
|
||||
return point.buffer(radius); //draw the buffer
|
||||
} else {
|
||||
try {
|
||||
return geometryJSON.read(strGeometry.replace("'", "\""));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to create a geometry from given str " + strGeometry, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static String geometrytoJSON(Geometry geometry) {
|
||||
StringWriter sw = new StringWriter();
|
||||
try {
|
||||
geometryJSON.write(geometry, sw);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to create a json string from given geometry " + geometry, e);
|
||||
}
|
||||
return sw.toString();
|
||||
}
|
||||
|
||||
public static PreparedGeometry preparedGeometryFromJSON(String strGeometry) {
|
||||
return PreparedGeometryFactory.prepare(geometryFromJSON(strGeometry));
|
||||
}
|
||||
public static Point createPoint(double longitude, double latitude) {
|
||||
return geometryFactory.createPoint(new Coordinate(longitude, latitude));
|
||||
}
|
||||
public static Geometry createGeometry(Object data){
|
||||
if(data instanceof Geometry) {
|
||||
return (Geometry) data;
|
||||
}
|
||||
else {
|
||||
return geometryFromJSON(data.toString());
|
||||
}
|
||||
}
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
|
||||
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
|
||||
public class IntersectsOperation extends GeoOperation {
|
||||
@Override
|
||||
public Object operation(Geometry a, Geometry b, Object[] data) {
|
||||
return a.intersects(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object operation(Geometry a, PreparedGeometry b, Object[] data) {
|
||||
return b.intersects(a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Attribute.Type getReturnType() {
|
||||
return Attribute.Type.BOOL;
|
||||
}
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
|
||||
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
|
||||
public class WithinDistanceOperation extends GeoOperation {
|
||||
@Override
|
||||
public Object operation(Geometry a, Geometry b, Object[] data) {
|
||||
return a.isWithinDistance(b, (Double) data[data.length - 1] / GeometryUtils.TO_DEGREE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object operation(Geometry a, PreparedGeometry b, Object[] data) {
|
||||
return a.isWithinDistance(b.getGeometry(), (Double) data[data.length - 1] / GeometryUtils.TO_DEGREE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Attribute.Type getReturnType() {
|
||||
return Attribute.Type.BOOL;
|
||||
}
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.internal.util;
|
||||
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
|
||||
public class WithinOperation extends GeoOperation {
|
||||
@Override
|
||||
public Object operation(Geometry a, Geometry b, Object[] data) {
|
||||
return a.within(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object operation(Geometry a, PreparedGeometry b, Object[] data) {
|
||||
return b.contains(a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Attribute.Type getReturnType() {
|
||||
return Attribute.Type.BOOL;
|
||||
}
|
||||
}
|
@ -1,138 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream;
|
||||
|
||||
import org.wso2.siddhi.core.config.ExecutionPlanContext;
|
||||
import org.wso2.siddhi.core.event.ComplexEvent;
|
||||
import org.wso2.siddhi.core.event.ComplexEventChunk;
|
||||
import org.wso2.siddhi.core.event.stream.StreamEvent;
|
||||
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
|
||||
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.siddhi.core.query.processor.Processor;
|
||||
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinOperation;
|
||||
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute.Type;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class GeoCrossesStreamProcessor extends StreamProcessor {
|
||||
|
||||
private GeoOperation geoOperation;
|
||||
private Set<String> set = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
|
||||
|
||||
/**
|
||||
* The init method of the StreamProcessor, this method will be called before other methods
|
||||
*
|
||||
* @param inputDefinition the incoming stream definition
|
||||
* @param attributeExpressionExecutors the executors of each function parameters
|
||||
* @param executionPlanContext the context of the execution plan
|
||||
* @return the additional output attributes introduced by the function
|
||||
*/
|
||||
@Override
|
||||
protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
|
||||
geoOperation = new WithinOperation();
|
||||
geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength);
|
||||
List<Attribute> attributeList = new ArrayList<Attribute>(1);
|
||||
attributeList.add(new Attribute("crosses", Type.BOOL));
|
||||
return attributeList;
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called only once, to acquire required resources
|
||||
* after initializing the system and before processing the events.
|
||||
*/
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called only once, to release the acquired resources
|
||||
* before shutting down the system.
|
||||
*/
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The serializable state of the element, that need to be
|
||||
* persisted for the reconstructing the element to the same state
|
||||
* on a different point of time
|
||||
*
|
||||
* @return stateful objects of the element as an array
|
||||
*/
|
||||
@Override
|
||||
public Object[] currentState() {
|
||||
return new Object[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* The serialized state of the element, for reconstructing
|
||||
* the element to the same state as if was on a previous point of time.
|
||||
*
|
||||
* @param state the stateful objects of the element as an array on
|
||||
* the same order provided by currentState().
|
||||
*/
|
||||
@Override
|
||||
public void restoreState(Object[] state) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
|
||||
while (streamEventChunk.hasNext()) {
|
||||
ComplexEvent complexEvent = streamEventChunk.next();
|
||||
|
||||
Object[] data = new Object[attributeExpressionLength - 1];
|
||||
for (int i = 1; i < attributeExpressionLength; i++) {
|
||||
data[i-1] = attributeExpressionExecutors[i].execute(complexEvent);
|
||||
}
|
||||
boolean within = (Boolean)geoOperation.process(data);
|
||||
|
||||
String id = attributeExpressionExecutors[0].execute(complexEvent).toString();
|
||||
if (set.contains(id)) {
|
||||
if (!within) {
|
||||
//alert out
|
||||
complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{within});
|
||||
set.remove(id);
|
||||
} else {
|
||||
streamEventChunk.remove();
|
||||
}
|
||||
} else {
|
||||
if (within) {
|
||||
//alert in
|
||||
complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{within});
|
||||
set.add(id);
|
||||
} else {
|
||||
streamEventChunk.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
nextProcessor.process(streamEventChunk);
|
||||
}
|
||||
}
|
@ -1,246 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream;
|
||||
|
||||
import org.wso2.siddhi.core.config.ExecutionPlanContext;
|
||||
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
|
||||
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* geoLocationApproximate(locationRecorder, latitude, longitude, sensorProximity, sensorUUID, sensorWeight, timestamp)
|
||||
*
|
||||
* This method computed the average location of the locationRecorder using the collection iBeacons which the location
|
||||
* recorder resides.
|
||||
*
|
||||
* locationRecorder - unique id of the object or item
|
||||
* latitude - latitude value of the iBeacon
|
||||
* longitude - longitude value of the iBeacon
|
||||
* sensorProximity - proximity which will be given by the iBeacon (eg: ENTER, RANGE, EXIT)
|
||||
* sensorUUID - unique id of the iBeacon
|
||||
* sensorWeight - weight of the iBeacon which influence the averaging of the location (eg: approximate distance from
|
||||
* the beacon
|
||||
* timestamp - timestamp of the log which will be used to remove iBeacon from one's collection when there is no
|
||||
* new log for 5 minutes
|
||||
*
|
||||
* Accept Type(s) for geoLocationApproximate(locationRecorder, latitude, longitude, sensorProximity, sensorUUID,
|
||||
* sensorWeight, timestamp);
|
||||
* locationRecorder : STRING
|
||||
* latitude : DOUBLE
|
||||
* longitude : DOUBLE
|
||||
* sensorProximity : STRING
|
||||
* sensorUUID : STRING
|
||||
* sensorWeight : DOUBLE
|
||||
* timestamp : LONG
|
||||
*
|
||||
* Return Type(s): DOUBLE, DOUBLE, BOOL
|
||||
*
|
||||
*/
|
||||
public class GeoLocationApproximateStreamProcessor extends StreamFunctionProcessor {
|
||||
|
||||
//locationRecorder,uuid -> BeaconValueHolder
|
||||
private Map<String, Map<String, BeaconValueHolder>>
|
||||
personSpecificRecordLocatorMaps;
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] currentState() {
|
||||
return new Object[]{personSpecificRecordLocatorMaps};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreState(Object[] state) {
|
||||
personSpecificRecordLocatorMaps = (Map<String, Map<String, BeaconValueHolder>>) state[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object[] process(Object[] data) {
|
||||
if (data[0] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
|
||||
"function. First argument should be string");
|
||||
}
|
||||
if (data[1] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
|
||||
"function. Second argument should be double");
|
||||
}
|
||||
if (data[2] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
|
||||
"function. Third argument should be double");
|
||||
}
|
||||
if (data[3] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
|
||||
"function. Forth argument should be string");
|
||||
}
|
||||
if (data[4] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
|
||||
"function. Fifth argument should be string");
|
||||
}
|
||||
if (data[5] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
|
||||
"function. Sixth argument should be double");
|
||||
}
|
||||
if (data[6] == null) {
|
||||
throw new ExecutionPlanRuntimeException("Invalid input given to geo:locationApproximate() " +
|
||||
"function. Seventh argument should be long");
|
||||
}
|
||||
|
||||
String locationRecorder = (String) data[0];
|
||||
double latitude = (Double) data[1];
|
||||
double longitude = (Double) data[2];
|
||||
String beaconProximity = (String) data[3];
|
||||
String uuid = (String) data[4];
|
||||
double weight = (Double) data[5]; //is calculated previously eg: distance
|
||||
long timestamp = (Long) data[6];
|
||||
|
||||
if (personSpecificRecordLocatorMaps.get(locationRecorder) == null) {
|
||||
personSpecificRecordLocatorMaps.put(locationRecorder, new ConcurrentHashMap<String, BeaconValueHolder>());
|
||||
}
|
||||
//both "enter" and "range" attributes are there in the logs I retrieve when it comes to cleaning logs
|
||||
if ("ENTER".equalsIgnoreCase(beaconProximity) || "RANGE".equalsIgnoreCase(beaconProximity)) {
|
||||
if (personSpecificRecordLocatorMaps.get(locationRecorder).containsKey(uuid)) {
|
||||
BeaconValueHolder tempBeaconValue = personSpecificRecordLocatorMaps.get(locationRecorder).get(uuid);
|
||||
if (tempBeaconValue.getLastUpdatedTime() < timestamp) {
|
||||
BeaconValueHolder beaconValueHolder = new BeaconValueHolder(latitude, longitude, timestamp, weight);
|
||||
personSpecificRecordLocatorMaps.get(locationRecorder).put(uuid, beaconValueHolder);
|
||||
}
|
||||
} else {
|
||||
BeaconValueHolder beaconValueHolder = new BeaconValueHolder(latitude, longitude, timestamp, weight);
|
||||
personSpecificRecordLocatorMaps.get(locationRecorder).put(uuid, beaconValueHolder);
|
||||
}
|
||||
} else {
|
||||
if (personSpecificRecordLocatorMaps.get(locationRecorder).containsKey(uuid)) {
|
||||
BeaconValueHolder tempBeaconValue = personSpecificRecordLocatorMaps.get(locationRecorder).get(uuid);
|
||||
if (tempBeaconValue.getLastUpdatedTime() < timestamp) {
|
||||
personSpecificRecordLocatorMaps.get(locationRecorder).remove(uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int noOfSensors = personSpecificRecordLocatorMaps.get(locationRecorder).size();
|
||||
double sensorValues[][] = new double[noOfSensors][3];
|
||||
int actualNoOfSensors = 0;
|
||||
double totalWeight = 0;
|
||||
for (Map.Entry<String, BeaconValueHolder> beaconLocation : personSpecificRecordLocatorMaps.get(locationRecorder).entrySet()) {
|
||||
BeaconValueHolder beaconValueHolder = beaconLocation.getValue();
|
||||
long prevTimestamp = beaconValueHolder.getLastUpdatedTime();
|
||||
if ((timestamp - prevTimestamp) > 300000) {
|
||||
//if there is a beacon which has a log older than 5 minutes, removing the beacon assuming the
|
||||
//device has gone away from that beacon
|
||||
personSpecificRecordLocatorMaps.get(locationRecorder).remove(beaconLocation.getKey());
|
||||
} else {
|
||||
sensorValues[actualNoOfSensors][0] = beaconValueHolder.getLatitude();
|
||||
sensorValues[actualNoOfSensors][1] = beaconValueHolder.getLongitude();
|
||||
sensorValues[actualNoOfSensors][2] = beaconValueHolder.getBeaconDistance();
|
||||
totalWeight += beaconValueHolder.getBeaconDistance();
|
||||
actualNoOfSensors++;
|
||||
}
|
||||
}
|
||||
if (actualNoOfSensors == 0) {
|
||||
return new Object[]{latitude, longitude, false};
|
||||
}
|
||||
|
||||
double tempLatitude, tempLongitude;
|
||||
double x = 0;
|
||||
double y = 0;
|
||||
double z = 0;
|
||||
for (int i = 0; i < actualNoOfSensors; i++) {
|
||||
weight = sensorValues[i][2] / totalWeight;
|
||||
tempLatitude = sensorValues[i][0] * Math.PI / 180.0;
|
||||
tempLongitude = sensorValues[i][1] * Math.PI / 180.0;
|
||||
x += Math.cos(tempLatitude) * Math.cos(tempLongitude) * weight;
|
||||
y += Math.cos(tempLatitude) * Math.sin(tempLongitude) * weight;
|
||||
z += Math.sin(tempLatitude) * weight;
|
||||
}
|
||||
longitude = Math.atan2(y, x) * 180 / Math.PI;
|
||||
double hyp = Math.sqrt(x * x + y * y);
|
||||
latitude = Math.atan2(z, hyp) * 180 / Math.PI;
|
||||
|
||||
return new Object[]{latitude, longitude, true};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object[] process(Object data) {
|
||||
return new Object[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Attribute> init(AbstractDefinition inputDefinition,
|
||||
ExpressionExecutor[] attributeExpressionExecutors,
|
||||
ExecutionPlanContext executionPlanContext) {
|
||||
personSpecificRecordLocatorMaps = new ConcurrentHashMap<String, Map<String, BeaconValueHolder>>();
|
||||
if (attributeExpressionExecutors.length != 7) {
|
||||
throw new ExecutionPlanValidationException("Invalid no of arguments passed to " +
|
||||
"geo:locationApproximate() function, " +
|
||||
"requires 7, but found " + attributeExpressionExecutors.length);
|
||||
}
|
||||
ArrayList<Attribute> attributes = new ArrayList<Attribute>(3);
|
||||
attributes.add(new Attribute("averagedLatitude", Attribute.Type.DOUBLE));
|
||||
attributes.add(new Attribute("averagedLongitude", Attribute.Type.DOUBLE));
|
||||
attributes.add(new Attribute("averageExist", Attribute.Type.BOOL));
|
||||
return attributes;
|
||||
}
|
||||
|
||||
private class BeaconValueHolder {
|
||||
private double latitude;
|
||||
private double longitude;
|
||||
private long lastUpdatedTime;
|
||||
private double beaconDistance;
|
||||
|
||||
public BeaconValueHolder(double latitude, double longitude, long lastUpdatedTime, double
|
||||
beaconDistance) {
|
||||
this.latitude = latitude;
|
||||
this.longitude = longitude;
|
||||
this.lastUpdatedTime = lastUpdatedTime;
|
||||
this.beaconDistance = beaconDistance;
|
||||
}
|
||||
|
||||
public long getLastUpdatedTime() {
|
||||
return lastUpdatedTime;
|
||||
}
|
||||
|
||||
public double getBeaconDistance() {
|
||||
return beaconDistance;
|
||||
}
|
||||
|
||||
public double getLatitude() {
|
||||
return latitude;
|
||||
}
|
||||
|
||||
public double getLongitude() {
|
||||
return longitude;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,179 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream;
|
||||
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import org.wso2.siddhi.core.config.ExecutionPlanContext;
|
||||
import org.wso2.siddhi.core.event.ComplexEventChunk;
|
||||
import org.wso2.siddhi.core.event.stream.StreamEvent;
|
||||
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
|
||||
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
|
||||
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.siddhi.core.query.processor.Processor;
|
||||
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation;
|
||||
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute.Type;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class GeoProximityStreamProcessor extends StreamProcessor {
|
||||
|
||||
private GeoOperation geoOperation;
|
||||
private double radius;
|
||||
private ConcurrentHashMap<String, Geometry> map = new ConcurrentHashMap<String, Geometry>();
|
||||
private Set<String> set = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
|
||||
|
||||
/**
|
||||
* The init method of the StreamProcessor, this method will be called before other methods
|
||||
*
|
||||
* @param inputDefinition the incoming stream definition
|
||||
* @param attributeExpressionExecutors the executors of each function parameters
|
||||
* @param executionPlanContext the context of the execution plan
|
||||
* @return the additional output attributes introduced by the function
|
||||
*/
|
||||
@Override
|
||||
protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
|
||||
this.geoOperation = new WithinDistanceOperation();
|
||||
this.geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength - 1);
|
||||
if (attributeExpressionExecutors[attributeExpressionLength - 1].getReturnType() != Type.DOUBLE) {
|
||||
throw new ExecutionPlanCreationException("Last parameter should be a double");
|
||||
}
|
||||
radius = (Double) attributeExpressionExecutors[attributeExpressionLength - 1].execute(null);
|
||||
ArrayList<Attribute> attributeList = new ArrayList<Attribute>();
|
||||
attributeList.add(new Attribute("proximityWith", Type.STRING));
|
||||
attributeList.add(new Attribute("inCloseProximity", Type.BOOL));
|
||||
return attributeList;
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called only once, to acquire required resources
|
||||
* after initializing the system and before processing the events.
|
||||
*/
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called only once, to release the acquired resources
|
||||
* before shutting down the system.
|
||||
*/
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The serializable state of the element, that need to be
|
||||
* persisted for the reconstructing the element to the same state
|
||||
* on a different point of time
|
||||
*
|
||||
* @return stateful objects of the element as an array
|
||||
*/
|
||||
@Override
|
||||
public Object[] currentState() {
|
||||
return new Object[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* The serialized state of the element, for reconstructing
|
||||
* the element to the same state as if was on a previous point of time.
|
||||
*
|
||||
* @param state the stateful objects of the element as an array on
|
||||
* the same order provided by currentState().
|
||||
*/
|
||||
@Override
|
||||
public void restoreState(Object[] state) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The main processing method that will be called upon event arrival
|
||||
*
|
||||
* @param streamEventChunk the event chunk that need to be processed
|
||||
* @param nextProcessor the next processor to which the success events need to be passed
|
||||
* @param streamEventCloner helps to clone the incoming event for local storage or modification
|
||||
* @param complexEventPopulater helps to populate the events with the resultant attributes
|
||||
*/
|
||||
@Override
|
||||
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
|
||||
while (streamEventChunk.hasNext()) {
|
||||
StreamEvent streamEvent = streamEventChunk.next();
|
||||
Geometry currentGeometry, previousGeometry;
|
||||
Object[] data = new Object[attributeExpressionLength - 1];
|
||||
for (int i = 1; i < attributeExpressionLength; i++) {
|
||||
data[i - 1] = attributeExpressionExecutors[i].execute(streamEvent);
|
||||
}
|
||||
String currentId = attributeExpressionExecutors[0].execute(streamEvent).toString();
|
||||
String previousId;
|
||||
currentGeometry = geoOperation.getCurrentGeometry(data);
|
||||
if(!map.contains(currentId)) {
|
||||
map.put(currentId, currentGeometry);
|
||||
}
|
||||
for (Map.Entry<String, Geometry> entry : map.entrySet()) {
|
||||
previousId = entry.getKey();
|
||||
if (!previousId.equals(currentId)) {
|
||||
previousGeometry = entry.getValue();
|
||||
boolean within = (Boolean) geoOperation.operation(currentGeometry, previousGeometry, new Object[]{radius});
|
||||
String key = makeCompositeKey(currentId, previousId);
|
||||
boolean contains = set.contains(key);
|
||||
if (contains) {
|
||||
if (!within) {
|
||||
//alert out
|
||||
StreamEvent newStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
|
||||
complexEventPopulater.populateComplexEvent(newStreamEvent, new Object[]{previousId, within});
|
||||
streamEventChunk.insertBeforeCurrent(newStreamEvent);
|
||||
set.remove(key);
|
||||
}
|
||||
} else {
|
||||
if (within) {
|
||||
//alert in
|
||||
StreamEvent newStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
|
||||
complexEventPopulater.populateComplexEvent(newStreamEvent, new Object[]{previousId, within});
|
||||
streamEventChunk.insertBeforeCurrent(newStreamEvent);
|
||||
set.add(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
streamEventChunk.remove();
|
||||
}
|
||||
nextProcessor.process(streamEventChunk);
|
||||
}
|
||||
/*
|
||||
private Object[] toOutput(Geometry geometry, boolean within) {
|
||||
if (geoOperation.point) {
|
||||
return new Object[]{((Point) geometry).getX(), ((Point) geometry).getY(), within};
|
||||
} else {
|
||||
return new Object[]{GeometryUtils.geometrytoJSON(geometry), within};
|
||||
}
|
||||
}*/
|
||||
public String makeCompositeKey(String key1, String key2) {
|
||||
if (key1.compareToIgnoreCase(key2) < 0) {
|
||||
return key1 + "~" + key2;
|
||||
} else {
|
||||
return key2 + "~" + key1;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,154 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream;
|
||||
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import org.wso2.siddhi.core.config.ExecutionPlanContext;
|
||||
import org.wso2.siddhi.core.event.ComplexEvent;
|
||||
import org.wso2.siddhi.core.event.ComplexEventChunk;
|
||||
import org.wso2.siddhi.core.event.stream.StreamEvent;
|
||||
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
|
||||
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
|
||||
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.siddhi.core.query.processor.Processor;
|
||||
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.WithinDistanceOperation;
|
||||
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute.Type;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class GeoStationaryStreamProcessor extends StreamProcessor {
|
||||
|
||||
private GeoOperation geoOperation;
|
||||
private double radius;
|
||||
private ConcurrentHashMap<String, Geometry> map = new ConcurrentHashMap<String, Geometry>();
|
||||
|
||||
/**
|
||||
* The init method of the StreamProcessor, this method will be called before other methods
|
||||
*
|
||||
* @param inputDefinition the incoming stream definition
|
||||
* @param attributeExpressionExecutors the executors of each function parameters
|
||||
* @param executionPlanContext the context of the execution plan
|
||||
* @return the additional output attributes introduced by the function
|
||||
*/
|
||||
@Override
|
||||
protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
|
||||
geoOperation = new WithinDistanceOperation();
|
||||
geoOperation.init(attributeExpressionExecutors, 1, attributeExpressionLength - 1);
|
||||
if (attributeExpressionExecutors[attributeExpressionLength - 1].getReturnType() != Type.DOUBLE) {
|
||||
throw new ExecutionPlanCreationException("Last parameter should be a double");
|
||||
}
|
||||
radius = (Double) attributeExpressionExecutors[attributeExpressionLength - 1].execute(null);
|
||||
ArrayList<Attribute> attributeList = new ArrayList<Attribute>(1);
|
||||
attributeList.add(new Attribute("stationary", Type.BOOL));
|
||||
return attributeList;
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called only once, to acquire required resources
|
||||
* after initializing the system and before processing the events.
|
||||
*/
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This will be called only once, to release the acquired resources
|
||||
* before shutting down the system.
|
||||
*/
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The serializable state of the element, that need to be
|
||||
* persisted for the reconstructing the element to the same state
|
||||
* on a different point of time
|
||||
*
|
||||
* @return stateful objects of the element as an array
|
||||
*/
|
||||
@Override
|
||||
public Object[] currentState() {
|
||||
return new Object[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* The serialized state of the element, for reconstructing
|
||||
* the element to the same state as if was on a previous point of time.
|
||||
*
|
||||
* @param state the stateful objects of the element as an array on
|
||||
* the same order provided by currentState().
|
||||
*/
|
||||
@Override
|
||||
public void restoreState(Object[] state) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
|
||||
while (streamEventChunk.hasNext()) {
|
||||
ComplexEvent complexEvent = streamEventChunk.next();
|
||||
|
||||
Object[] data = new Object[attributeExpressionLength - 1];
|
||||
for (int i = 1; i < attributeExpressionLength; i++) {
|
||||
data[i - 1] = attributeExpressionExecutors[i].execute(complexEvent);
|
||||
}
|
||||
Geometry currentGeometry = geoOperation.getCurrentGeometry(data);
|
||||
String id = attributeExpressionExecutors[0].execute(complexEvent).toString();
|
||||
Geometry previousGeometry = map.get(id);
|
||||
if (previousGeometry == null) {
|
||||
currentGeometry.setUserData(false);
|
||||
map.put(id, currentGeometry);
|
||||
streamEventChunk.remove();
|
||||
continue;
|
||||
}
|
||||
boolean stationary = (Boolean) geoOperation.operation(currentGeometry, previousGeometry, new Object[]{radius});
|
||||
|
||||
if((Boolean)previousGeometry.getUserData()) {
|
||||
if(!stationary) {
|
||||
//alert out
|
||||
complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{stationary});
|
||||
currentGeometry.setUserData(stationary);
|
||||
map.put(id, currentGeometry);
|
||||
} else {
|
||||
streamEventChunk.remove();
|
||||
}
|
||||
} else {
|
||||
if (stationary) {
|
||||
//alert in
|
||||
previousGeometry.setUserData(stationary);
|
||||
complexEventPopulater.populateComplexEvent(complexEvent, new Object[]{stationary});
|
||||
} else {
|
||||
currentGeometry.setUserData(stationary);
|
||||
map.put(id, currentGeometry);
|
||||
streamEventChunk.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
nextProcessor.process(streamEventChunk);
|
||||
}
|
||||
}
|
@ -1,80 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream.function;
|
||||
|
||||
import com.vividsolutions.jts.geom.Coordinate;
|
||||
import org.wso2.siddhi.core.config.ExecutionPlanContext;
|
||||
import org.wso2.siddhi.core.executor.ExpressionExecutor;
|
||||
import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.ClosestOperation;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeoOperation;
|
||||
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
|
||||
import org.wso2.siddhi.query.api.definition.Attribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class GeoClosestPointsStreamFunctionProcessor extends StreamFunctionProcessor {
|
||||
|
||||
GeoOperation geoOperation;
|
||||
|
||||
@Override
|
||||
protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutors, ExecutionPlanContext executionPlanContext) {
|
||||
this.geoOperation = new ClosestOperation();
|
||||
this.geoOperation.init(attributeExpressionExecutors, 0, attributeExpressionExecutors.length);
|
||||
List<Attribute> attributeList = new ArrayList<Attribute>(4);
|
||||
attributeList.add(new Attribute("closestPointOf1From2Latitude", Attribute.Type.DOUBLE));
|
||||
attributeList.add(new Attribute("closestPointOf1From2Longitude", Attribute.Type.DOUBLE));
|
||||
attributeList.add(new Attribute("closestPointOf2From1Latitude", Attribute.Type.DOUBLE));
|
||||
attributeList.add(new Attribute("closestPointOf2From1Longitude", Attribute.Type.DOUBLE));
|
||||
return attributeList;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object[] process(Object[] data) {
|
||||
Coordinate[] coordinates = (Coordinate[]) geoOperation.process(data);
|
||||
|
||||
return new Object[]{coordinates[0].x, coordinates[0].y, coordinates[1].x, coordinates[1].y};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object[] process(Object o) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] currentState() {
|
||||
return new Object[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreState(Object[] objects) {
|
||||
|
||||
}
|
||||
}
|
@ -1,26 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2015-2016 WSO2 Inc. (http://wso2.com)
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
within=org.wso2.gpl.siddhi.extensions.geo.function.GeoWithinFunctionExecutor
|
||||
intersects=org.wso2.gpl.siddhi.extensions.geo.function.GeoIntersectsFunctionExecutor
|
||||
withinDistance=org.wso2.gpl.siddhi.extensions.geo.function.GeoWithinDistanceFunctionExecutor
|
||||
crosses=org.wso2.gpl.siddhi.extensions.geo.stream.GeoCrossesStreamProcessor
|
||||
proximity=org.wso2.gpl.siddhi.extensions.geo.stream.GeoProximityStreamProcessor
|
||||
stationary=org.wso2.gpl.siddhi.extensions.geo.stream.GeoStationaryStreamProcessor
|
||||
closestPoints=org.wso2.gpl.siddhi.extensions.geo.stream.function.GeoClosestPointsStreamFunctionProcessor
|
||||
locationApproximate=org.wso2.gpl.siddhi.extensions.geo.stream.GeoLocationApproximateStreamProcessor
|
||||
distance=org.wso2.gpl.siddhi.extensions.geo.function.GeoDistanceFunctionExecutor
|
@ -1,49 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.wso2.gpl.siddhi.extensions.geo;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.BeforeClass;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.SiddhiManager;
|
||||
import org.wso2.siddhi.core.stream.input.InputHandler;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
public abstract class GeoTestCase {
|
||||
protected static SiddhiManager siddhiManager;
|
||||
private static Logger logger = Logger.getLogger(GeoTestCase.class);
|
||||
protected static ArrayList<Object[]> data;
|
||||
protected static ArrayList<Boolean> expectedResult;
|
||||
protected static int eventCount;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
logger.info("Init Siddhi");// Create Siddhi Manager
|
||||
siddhiManager = new SiddhiManager();
|
||||
data = new ArrayList<Object[]>();
|
||||
expectedResult = new ArrayList<Boolean>();
|
||||
}
|
||||
|
||||
protected void generateEvents(ExecutionPlanRuntime executionPlanRuntime) throws Exception {
|
||||
InputHandler inputHandler = executionPlanRuntime.getInputHandler("dataIn");
|
||||
for (Object[] dataLine : data) {
|
||||
inputHandler.send(dataLine);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,88 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.SiddhiManager;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.siddhi.core.stream.input.InputHandler;
|
||||
import org.wso2.siddhi.core.util.EventPrinter;
|
||||
|
||||
|
||||
public class GeoDistanceTestCase {
|
||||
static final Logger log = Logger.getLogger(GeoDistanceTestCase.class);
|
||||
private volatile int count;
|
||||
private volatile boolean eventArrived;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
count = 0;
|
||||
eventArrived = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeoDistanceTestCase() throws InterruptedException {
|
||||
log.info("testGeoDistance TestCase");
|
||||
SiddhiManager siddhiManager = new SiddhiManager();
|
||||
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(
|
||||
"@config(async = 'true') " +
|
||||
"define stream cleanedStream (latitude double, longitude double, prevLatitude double, " +
|
||||
"prevLongitude double); " +
|
||||
"@info(name = 'query1') " +
|
||||
"from cleanedStream " +
|
||||
"select geo:distance(latitude, longitude, prevLatitude, prevLongitude) as distance " +
|
||||
"insert into dataOut;");
|
||||
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
EventPrinter.print(timeStamp, inEvents, removeEvents);
|
||||
for (Event event : inEvents) {
|
||||
count++;
|
||||
if (count == 1) {
|
||||
Assert.assertEquals(2322119.848252557, event.getData(0));
|
||||
eventArrived = true;
|
||||
} else if (count == 2) {
|
||||
Assert.assertEquals(871946.8734223971, event.getData(0));
|
||||
eventArrived = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream");
|
||||
executionPlanRuntime.start();
|
||||
//getting distance near equator
|
||||
inputHandler.send(new Object[]{8.116553, 77.523679, 9.850047, 98.597177});
|
||||
Thread.sleep(500);
|
||||
//getting distance away from equator
|
||||
inputHandler.send(new Object[]{54.432063, 19.669778, 59.971487, 29.958951});
|
||||
Thread.sleep(100);
|
||||
|
||||
Assert.assertEquals(2, count);
|
||||
Assert.assertTrue(eventArrived);
|
||||
executionPlanRuntime.shutdown();
|
||||
}
|
||||
}
|
@ -1,73 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
|
||||
|
||||
public class GeoIntersectsTestCase extends GeoTestCase {
|
||||
private static Logger logger = Logger.getLogger(GeoIntersectsTestCase.class);
|
||||
|
||||
@Test
|
||||
public void testGeometry() throws Exception {
|
||||
logger.info("TestGeometry");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
|
||||
data.add(new Object[]{"{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, 1.5],[1.5, 1.5],[1.5, 0.5],[0.5, 0.5]]]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"{'type':'Circle','coordinates':[-1, -1], 'radius':221148}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"{'type':'Point','coordinates':[2, 0]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"{'type':'Polygon','coordinates':[[[2, 2],[2, 1],[1, 1],[1, 2],[2, 2]]]}"});
|
||||
expectedResult.add(true);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (geometry string);"
|
||||
+ "@info(name = 'query1') from dataIn" +
|
||||
" select geo:intersects(geometry, \"{'type':'Polygon','coordinates':[[[0, 0],[0, 1],[1, 1],[1, 0],[0, 0]]]}\") as intersects \n" +
|
||||
" insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
for (Event event : inEvents) {
|
||||
logger.info(event);
|
||||
Boolean isWithin = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
}
|
@ -1,73 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
|
||||
|
||||
public class GeoWithinDistanceTestCase extends GeoTestCase {
|
||||
private static Logger logger = Logger.getLogger(GeoWithinDistanceTestCase.class);
|
||||
|
||||
@Test
|
||||
public void testGeometry() throws Exception {
|
||||
logger.info("TestGeometry");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
|
||||
data.add(new Object[]{"{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, 1.5],[1.5, 1.5],[1.5, 0.5],[0.5, 0.5]]]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"{'type':'Circle','coordinates':[-1, -1], 'radius':110575}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"{'type':'Point','coordinates':[3, 0]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"{'type':'Polygon','coordinates':[[[3, 3],[3, 2],[2, 2],[2, 3],[3, 3]]]}"});
|
||||
expectedResult.add(false);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (geometry string);"
|
||||
+ "@info(name = 'query1') from dataIn" +
|
||||
" select geo:withinDistance(geometry, \"{'type':'Polygon','coordinates':[[[0, 0],[0, 1],[1, 1],[1, 0],[0, 0]]]}\", 110574.61087757687) as intersects \n" +
|
||||
" insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
for (Event event : inEvents) {
|
||||
logger.info(event);
|
||||
Boolean isWithin = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
}
|
@ -1,268 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.wso2.gpl.siddhi.extensions.geo.function;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.siddhi.core.stream.input.InputHandler;
|
||||
import org.wso2.siddhi.core.util.EventPrinter;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
|
||||
|
||||
public class GeoWithinTestCase extends GeoTestCase {
|
||||
private static Logger logger = Logger.getLogger(GeoWithinTestCase.class);
|
||||
|
||||
private volatile int count;
|
||||
private volatile boolean eventArrived;
|
||||
|
||||
@Test
|
||||
public void testPoint() throws Exception {
|
||||
logger.info("TestPoint");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
data.add(new Object[]{"km-4354", 0.5d, 0.5d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 2d, 2d});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", -0.5d, 1.5d});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", 0.5d, 1.25d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 0.75d, 0.5d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 3.5d, 0.5d});
|
||||
expectedResult.add(false);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);"
|
||||
+ "@info(name = 'query1') from dataIn " +
|
||||
"select geo:within(longitude,latitude,\"{'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}\") as notify \n" +
|
||||
" \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
for (Event event : inEvents) {
|
||||
logger.info(event);
|
||||
Boolean isWithin = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPoint2() throws Exception {
|
||||
logger.info("TestPoint2");
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
data.add(new Object[]{"km-4354", 0.75d, 1d, "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", 1d, 1d, "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 3d, 3d, "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", 0.6d, 1.0d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 1.5d, 1.5d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", -0.5d, 1.5d, "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
|
||||
expectedResult.add(false);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double, geometry string);"
|
||||
+ "@info(name = 'query1') from dataIn " +
|
||||
"select geo:within(longitude, latitude, geometry) as notify \n" +
|
||||
" \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
for (Event event : inEvents) {
|
||||
logger.info(event);
|
||||
Boolean isWithin = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeometry() throws Exception {
|
||||
logger.info("TestGeometry");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-1,1]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1,1]}"});
|
||||
expectedResult.add(true);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string);"
|
||||
+ "@info(name = 'query1') from dataIn " +
|
||||
"select geo:within(geometry,\"{'type':'Polygon','coordinates':[[[0,0],[0,4],[3,4],[3,0],[0,0]]]}\") as notify \n" +
|
||||
" \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
for (Event event : inEvents) {
|
||||
logger.info(event);
|
||||
Boolean isWithin = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeometry2() throws Exception {
|
||||
logger.info("TestGeometry");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1.0, 1.5]}", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[1.5, 1.0]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 10, 'coordinates':[0.5, 1.5]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0.5, 1.5]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 20, 'coordinates':[0.5, 1.5]}", "{'type': 'Circle', 'radius': 10, 'coordinates':[0.5, 1.5]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-0.5, 1.0]}", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5, -0.5],[-0.5, -0.5],[-0.5, 0.5], [0.5, 0.5]]]}", "{'type': 'Circle', 'radius': 110575, 'coordinates':[0, 0]}"});
|
||||
expectedResult.add(true);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string, otherGeometry string);"
|
||||
+ "@info(name = 'query1') from dataIn " +
|
||||
"select geo:within(geometry,otherGeometry) as notify \n" +
|
||||
" \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
for (Event event : inEvents) {
|
||||
logger.info(event);
|
||||
Boolean isWithin = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeometry3() throws Exception {
|
||||
logger.info("TestGeometryJoin");
|
||||
|
||||
count = 0;
|
||||
eventArrived = false;
|
||||
String executionPlan = "" +
|
||||
"define stream dataIn (id string, latitude double, longitude double); " +
|
||||
"define stream dataToTable (id string, geometry string); " +
|
||||
"" +
|
||||
"define table dataTable (id string, geometry string); " +
|
||||
"" +
|
||||
"from dataToTable " +
|
||||
"insert into dataTable; " +
|
||||
"" +
|
||||
"@info(name = 'query1') " +
|
||||
"from dataIn join dataTable " +
|
||||
"on geo:within(dataIn.latitude, dataIn.longitude, dataTable.geometry) " +
|
||||
"select dataIn.id as dataInID, dataTable.id as dataTableID " +
|
||||
"insert into dataOut; ";
|
||||
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
|
||||
InputHandler dataIn = executionPlanRuntime.getInputHandler("dataIn");
|
||||
InputHandler dataToTable = executionPlanRuntime.getInputHandler("dataToTable");
|
||||
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
EventPrinter.print(timeStamp, inEvents, removeEvents);
|
||||
for (Event event : inEvents) {
|
||||
eventArrived = true;
|
||||
count++;
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
Thread.sleep(1000);
|
||||
|
||||
dataToTable.send(new Object[]{"1", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
|
||||
dataToTable.send(new Object[]{"2", "{'type': 'Circle', 'radius': 110575, 'coordinates':[12.5, 1.5]}"});
|
||||
Thread.sleep(1000);
|
||||
dataIn.send(new Object[]{"3", 1.5, 1.0});
|
||||
dataIn.send(new Object[]{"4", 7.5, 1.0});
|
||||
Thread.sleep(1000);
|
||||
executionPlanRuntime.shutdown();
|
||||
Assert.assertEquals(1, count);
|
||||
Assert.assertEquals(true, eventArrived);
|
||||
}
|
||||
}
|
@ -1,76 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
|
||||
|
||||
public class GeoCrossesTestCase extends GeoTestCase {
|
||||
private static Logger logger = Logger.getLogger(GeoCrossesTestCase.class);
|
||||
@Test
|
||||
public void testCrosses() throws Exception {
|
||||
logger.info("TestCrosses");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
|
||||
data.add(new Object[]{"km-4354", -0.5d, 0.5d});
|
||||
data.add(new Object[]{"km-4354", 1.5d, 0.5d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 1.5d, 0.25d});
|
||||
data.add(new Object[]{"tc-1729", 0.5d, 0.5d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"tc-1729", -0.5d, 0.5d});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", 1.3d, 0.1d});
|
||||
data.add(new Object[]{"km-4354", 3d, 0.25d});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", 4d, 0.25d});
|
||||
data.add(new Object[]{"km-4354", 1d, 0.5d});
|
||||
expectedResult.add(true);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);"
|
||||
+ "@info(name = 'query1') from dataIn#geo:crosses(id,longitude,latitude,\"{'type':'Polygon','coordinates':[[[0, 0],[2, 0],[2, 1],[0, 1],[0, 0]]]}\") " +
|
||||
"select crosses \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
for (Event event : inEvents) {
|
||||
Boolean isWithin = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
}
|
@ -1,91 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.siddhi.core.util.EventPrinter;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class GeoProximityTestCase extends GeoTestCase {
|
||||
private static Logger logger = Logger.getLogger(GeoProximityTestCase.class);
|
||||
ArrayList<String> expectedResultId;
|
||||
@Test
|
||||
public void testProximity() throws Exception {
|
||||
logger.info("TestProximity");
|
||||
|
||||
data.clear();
|
||||
expectedResultId = new ArrayList<String>();
|
||||
eventCount = 0;
|
||||
|
||||
data.add(new Object[]{"1", 0d, 0d});
|
||||
data.add(new Object[]{"2", 1d, 1d});
|
||||
data.add(new Object[]{"3", 2d, 2d});
|
||||
data.add(new Object[]{"1", 1.5d, 1.5d});
|
||||
expectedResultId.add("2");
|
||||
expectedResult.add(true);
|
||||
expectedResultId.add("3");
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"1", 1.6d, 1.6d});
|
||||
data.add(new Object[]{"2", 5d, 5d});
|
||||
expectedResultId.add("1");
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"1", 2d, 2d});
|
||||
data.add(new Object[]{"1", 5.5d, 5.5d});
|
||||
expectedResultId.add("2");
|
||||
expectedResult.add(true);
|
||||
expectedResultId.add("3");
|
||||
expectedResult.add(false);
|
||||
|
||||
String executionPlan = "@config(async = 'true')" +
|
||||
"define stream dataIn (id string, longitude double, latitude double);"
|
||||
+
|
||||
"@info(name = 'query1') " +
|
||||
"from dataIn#geo:proximity(id,longitude,latitude, 110574.61087757687) " +
|
||||
"select inCloseProximity, proximityWith \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
EventPrinter.print(timeStamp, inEvents, removeEvents);
|
||||
for (Event event : inEvents) {
|
||||
Boolean proximity = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount), proximity);
|
||||
String other = (String) event.getData(1);
|
||||
Assert.assertEquals(expectedResultId.get(eventCount), other);
|
||||
eventCount++;
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
}
|
@ -1,75 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
|
||||
|
||||
public class GeoStationaryTestCase extends GeoTestCase {
|
||||
private static Logger logger = Logger.getLogger(GeoStationaryTestCase.class);
|
||||
@Test
|
||||
public void testStationary() throws Exception {
|
||||
logger.info("TestStationary");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
|
||||
data.add(new Object[]{"km-4354", 0d, 0d});
|
||||
data.add(new Object[]{"km-4354", 1d, 1d});
|
||||
data.add(new Object[]{"km-4354", 1d, 1.5d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 1d, 1.75d});
|
||||
data.add(new Object[]{"km-4354", 1d, 2.5d});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", 1d, 2.3d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 1d, 2.2d});
|
||||
data.add(new Object[]{"km-4354", 1d, 2.6d});
|
||||
data.add(new Object[]{"km-4354", 1d, 3.6d});
|
||||
expectedResult.add(false);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);"
|
||||
+ "@info(name = 'query1') from dataIn#geo:stationary(id,longitude,latitude, 110574.61087757687) " +
|
||||
"select stationary \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
for (Event event : inEvents) {
|
||||
Boolean isWithin = (Boolean) event.getData(0);
|
||||
Assert.assertEquals(expectedResult.get(eventCount++), isWithin);
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
}
|
@ -1,161 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream.function;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.SiddhiManager;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.siddhi.core.stream.input.InputHandler;
|
||||
import org.wso2.siddhi.core.util.EventPrinter;
|
||||
|
||||
|
||||
public class AverageLocationTestCase {
|
||||
static final Logger log = Logger.getLogger(AverageLocationTestCase.class);
|
||||
private volatile int count;
|
||||
private volatile boolean eventArrived;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
count = 0;
|
||||
eventArrived = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAverageLocationWithSameWeightsTestCase() throws InterruptedException {
|
||||
log.info("testAverageLocation with same weight TestCase");
|
||||
SiddhiManager siddhiManager = new SiddhiManager();
|
||||
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(
|
||||
"@config(async = 'true') " +
|
||||
"define stream cleanedStream (locationRecorder string, latitude double, longitude double, " +
|
||||
"beaconProximity string, uuid string, weight double, timestamp long); " +
|
||||
"@info(name = 'query1') " +
|
||||
"from cleanedStream#geo:locationApproximate(locationRecorder, latitude, longitude, " +
|
||||
"beaconProximity, uuid, weight, timestamp) " +
|
||||
"select * " +
|
||||
"insert into dataOut;");
|
||||
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
EventPrinter.print(timeStamp, inEvents, removeEvents);
|
||||
for (Event event : inEvents) {
|
||||
count++;
|
||||
if (count == 1) {
|
||||
Assert.assertEquals(6.876657000000001, event.getData(7));
|
||||
Assert.assertEquals(79.897648, event.getData(8));
|
||||
eventArrived = true;
|
||||
} else if (count == 2) {
|
||||
Assert.assertEquals(6.797727042508542, event.getData(7));
|
||||
Assert.assertEquals(80.13557409252783, event.getData(8));
|
||||
eventArrived = true;
|
||||
} else if (count == 3) {
|
||||
Assert.assertEquals(6.853572272662002, event.getData(7));
|
||||
Assert.assertEquals(true, 80.34826512892124 == (Double) event.getData(8) || 80.34826512892126 == (Double) event.getData(8));
|
||||
eventArrived = true;
|
||||
} else if (count == 4) {
|
||||
Assert.assertEquals(true, 8.026326160526303 == (Double) event.getData(7) || 8.0263261605263 == (Double) event.getData(7));
|
||||
Assert.assertEquals(80.42794459517538, event.getData(8));
|
||||
eventArrived = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream");
|
||||
executionPlanRuntime.start();
|
||||
//nugegods, ratnapura, nuwara eliya, vavniya --> thbuttegama
|
||||
inputHandler.send(new Object[]{"person1", 6.876657, 79.897648, "ENTER", "uuid1", 20.0d, 1452583935l});
|
||||
Thread.sleep(500);
|
||||
inputHandler.send(new Object[]{"person1", 6.718681, 80.373422, "ENTER", "uuid2", 20.0d, 1452583937l});
|
||||
Thread.sleep(500);
|
||||
inputHandler.send(new Object[]{"person1", 6.964981, 80.773796, "ENTER", "uuid3", 20.0d, 1452583939l});
|
||||
Thread.sleep(500);
|
||||
inputHandler.send(new Object[]{"person1", 8.729925, 80.475966, "ENTER", "uuid4", 100.0d, 1452583941l});
|
||||
Thread.sleep(100);
|
||||
|
||||
Assert.assertEquals(4, count);
|
||||
Assert.assertTrue(eventArrived);
|
||||
executionPlanRuntime.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAverageLocationWithDifferentWeightsTestCase2() throws InterruptedException {
|
||||
log.info("testAverageLocation with different weights TestCase");
|
||||
SiddhiManager siddhiManager = new SiddhiManager();
|
||||
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(
|
||||
"@config(async = 'true') " +
|
||||
"define stream cleanedStream (locationRecorder string, latitude double, longitude double, " +
|
||||
"beaconProximity string, uuid string, weight double, timestamp long); " +
|
||||
"@info(name = 'query1') " +
|
||||
"from cleanedStream#geo:locationApproximate(locationRecorder, latitude, longitude, " +
|
||||
"beaconProximity, uuid, weight, timestamp) " +
|
||||
"select * " +
|
||||
"insert into dataOut;");
|
||||
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
EventPrinter.print(timeStamp, inEvents, removeEvents);
|
||||
for (Event event : inEvents) {
|
||||
count++;
|
||||
if (count == 1) {
|
||||
Assert.assertEquals(6.876657000000001, event.getData(7));
|
||||
Assert.assertEquals(79.897648, event.getData(8));
|
||||
eventArrived = true;
|
||||
} else if (count == 2) {
|
||||
Assert.assertEquals(6.797727042508542, event.getData(7));
|
||||
Assert.assertEquals(80.13557409252783, event.getData(8));
|
||||
eventArrived = true;
|
||||
} else if (count == 3) {
|
||||
Assert.assertEquals(6.853572272662002, event.getData(7));
|
||||
Assert.assertEquals(true, 80.34826512892124 == (Double) event.getData(8) || 80.34826512892126 == (Double) event.getData(8));
|
||||
eventArrived = true;
|
||||
} else if (count == 4) {
|
||||
Assert.assertEquals(7.322639705655454, event.getData(7));
|
||||
Assert.assertEquals(80.38008364787895, event.getData(8));
|
||||
eventArrived = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cleanedStream");
|
||||
executionPlanRuntime.start();
|
||||
//nugegods, ratnapura, nuwara eliya, vavniya --> kegalle
|
||||
inputHandler.send(new Object[]{"person1", 6.876657, 79.897648, "ENTER", "uuid1", 20.0d, 1452583935l});
|
||||
Thread.sleep(500);
|
||||
inputHandler.send(new Object[]{"person1", 6.718681, 80.373422, "ENTER", "uuid2", 20.0d, 1452583937l});
|
||||
Thread.sleep(500);
|
||||
inputHandler.send(new Object[]{"person1", 6.964981, 80.773796, "ENTER", "uuid3", 20.0d, 1452583939l});
|
||||
Thread.sleep(500);
|
||||
inputHandler.send(new Object[]{"person1", 8.729925, 80.475966, "ENTER", "uuid4", 20.0d, 1452583941l});
|
||||
Thread.sleep(100);
|
||||
|
||||
Assert.assertEquals(4, count);
|
||||
Assert.assertTrue(eventArrived);
|
||||
executionPlanRuntime.shutdown();
|
||||
}
|
||||
}
|
@ -1,183 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.wso2.gpl.siddhi.extensions.geo.stream.function;
|
||||
|
||||
import com.vividsolutions.jts.geom.Coordinate;
|
||||
import com.vividsolutions.jts.geom.Geometry;
|
||||
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
|
||||
import com.vividsolutions.jts.operation.distance.DistanceOp;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.wso2.siddhi.core.ExecutionPlanRuntime;
|
||||
import org.wso2.siddhi.core.event.Event;
|
||||
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
|
||||
import org.wso2.siddhi.core.util.EventPrinter;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.GeoTestCase;
|
||||
import org.wso2.gpl.siddhi.extensions.geo.internal.util.GeometryUtils;
|
||||
|
||||
public class GeoClosetPointTestCase extends GeoTestCase {
|
||||
private static Logger logger = Logger.getLogger(GeoClosetPointTestCase.class);
|
||||
|
||||
|
||||
// @Test
|
||||
// public void testClosestBasic() throws Exception {
|
||||
// logger.info("testClosest");
|
||||
//
|
||||
// PreparedGeometry geometry = GeometryUtils.preparedGeometryFromJSON(" {'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}");
|
||||
// Geometry point = GeometryUtils.createPoint(-1, -1);
|
||||
//
|
||||
// DistanceOp distOp = new DistanceOp(geometry.getGeometry(), point);
|
||||
//
|
||||
// Coordinate[] closestPt = distOp.nearestPoints();
|
||||
// System.out.println(closestPt[0]);
|
||||
//
|
||||
// }
|
||||
|
||||
@Test
|
||||
public void testClosestPoints() throws Exception {
|
||||
logger.info("testClosestPoints");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
data.add(new Object[]{"km-4354", 0.5d, 0.5d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 2d, 2d});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", -0.5d, 1.5d});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", 0.5d, 1.25d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", 0.0d, 0.0d});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", -1d, -1d});
|
||||
expectedResult.add(false);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (id string, longitude double, latitude double);"
|
||||
+ "@info(name = 'query1') from dataIn#geo:closestPoints(longitude,latitude,\"{'type':'Polygon','coordinates':[[[0,0],[0,2],[1,2],[1,0],[0,0]]]}\") " +
|
||||
"select closestPointOf1From2Latitude, closestPointOf1From2Longitude, closestPointOf2From1Latitude, closestPointOf2From1Longitude " +
|
||||
" \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
final long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
EventPrinter.print(timeStamp, inEvents, removeEvents);
|
||||
for (Event event : inEvents) {
|
||||
eventCount++;
|
||||
switch (eventCount){
|
||||
case 1:
|
||||
Assert.assertArrayEquals(new Object[]{0.5,0.5,0.5,0.5},event.getData());
|
||||
break;
|
||||
case 2:
|
||||
Assert.assertArrayEquals(new Object[]{2.0, 2.0, 1.0, 2.0},event.getData());
|
||||
break;
|
||||
case 3:
|
||||
Assert.assertArrayEquals(new Object[]{-0.5, 1.5, 0.0, 1.5},event.getData());
|
||||
break;
|
||||
case 4:
|
||||
Assert.assertArrayEquals(new Object[]{0.5, 1.25, 0.5, 1.25},event.getData());
|
||||
break;
|
||||
case 5:
|
||||
Assert.assertArrayEquals(new Object[]{0.0, 0.0, 0.0, 0.0},event.getData());
|
||||
break;
|
||||
case 6:
|
||||
Assert.assertArrayEquals(new Object[]{-1.0, -1.0, 0.0, 0.0},event.getData());
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testClosestPointsGeometry() throws Exception {
|
||||
logger.info("testClosestPointsGeometry");
|
||||
|
||||
data.clear();
|
||||
expectedResult.clear();
|
||||
eventCount = 0;
|
||||
data.add(new Object[]{"km-4354", "{'type':'Polygon','coordinates':[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[0.75,0.5],[0.5,0.5]]]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[1.5, 1.5]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", "{'type': 'Circle', 'radius': 110575, 'coordinates':[10, 1.5]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[-1,1]}"});
|
||||
expectedResult.add(false);
|
||||
data.add(new Object[]{"km-4354", "{'type':'MultiPolygon','coordinates':[[[[0.5, 0.5],[0.5,1.5],[0.75,1.5],[5,5],[0.5,0.5]]], [[[1, 1],[1,2],[2,2],[2,1],[1,1]]]]}"});
|
||||
expectedResult.add(true);
|
||||
data.add(new Object[]{"km-4354", "{'type':'Point', 'coordinates':[10,10]}"});
|
||||
expectedResult.add(true);
|
||||
|
||||
String executionPlan = "@config(async = 'true') define stream dataIn (id string, geometry string);"
|
||||
+ "@info(name = 'query1') from dataIn#geo:closestPoints(geometry,\"{'type':'Polygon','coordinates':[[[0,0],[0,4],[3,4],[3,0],[0,0]]]}\") " +
|
||||
"select closestPointOf1From2Latitude, closestPointOf1From2Longitude, closestPointOf2From1Latitude, closestPointOf2From1Longitude \n" +
|
||||
"insert into dataOut";
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(String.format("Time to create ExecutionPlanRunTime: [%f sec]", ((end - start) / 1000f)));
|
||||
executionPlanRuntime.addCallback("query1", new QueryCallback() {
|
||||
@Override
|
||||
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
|
||||
EventPrinter.print(timeStamp, inEvents, removeEvents);
|
||||
for (Event event : inEvents) {
|
||||
eventCount++;
|
||||
switch (eventCount){
|
||||
case 1:
|
||||
Assert.assertArrayEquals(new Object[]{0.5, 0.5, 0.5, 0.5},event.getData());
|
||||
break;
|
||||
case 2:
|
||||
Assert.assertArrayEquals(new Object[]{2.5000035190937595, 1.5, 2.5000035190937595, 1.5},event.getData());
|
||||
break;
|
||||
case 3:
|
||||
Assert.assertArrayEquals(new Object[]{8.99999648090624, 1.5000000000000007, 3.0, 1.5000000000000009},event.getData());
|
||||
break;
|
||||
case 4:
|
||||
Assert.assertArrayEquals(new Object[]{-1.0, 1.0, 0.0, 1.0},event.getData());
|
||||
break;
|
||||
case 5:
|
||||
Assert.assertArrayEquals(new Object[]{0.5, 0.5, 0.5, 0.5},event.getData());
|
||||
break;
|
||||
case 6:
|
||||
Assert.assertArrayEquals(new Object[]{10.0, 10.0, 3.0, 4.0},event.getData());
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
executionPlanRuntime.start();
|
||||
generateEvents(executionPlanRuntime);
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(expectedResult.size(), eventCount);
|
||||
}
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
#
|
||||
# WSO2 Inc. licenses this file to you under the Apache License,
|
||||
# Version 2.0 (the "License"); you may not use this file except
|
||||
# in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
# For the general syntax of property based configuration files see the
|
||||
# documenation of org.apache.log4j.PropertyConfigurator.
|
||||
|
||||
# The root category uses the appender called A1. Since no priority is
|
||||
# specified, the root category assumes the default priority for root
|
||||
# which is DEBUG in log4j. The root category is the only category that
|
||||
# has a default priority. All other categories need not be assigned a
|
||||
# priority in which case they inherit their priority from the
|
||||
# hierarchy.
|
||||
|
||||
#log4j.rootLogger=DEBUG, stdout
|
||||
log4j.rootLogger=INFO, stdout
|
||||
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
#log4j.appender.stdout.layout.ConversionPattern=%m%n
|
||||
log4j.appender.stdout.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
|
Loading…
Reference in new issue