diff --git a/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/pom.xml b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/pom.xml
new file mode 100644
index 0000000000..fea9346764
--- /dev/null
+++ b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/pom.xml
@@ -0,0 +1,127 @@
+
+
+
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ siddhi-extensions
+ 4.1.21-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ org.wso2.extension.siddhi.execution.json
+ bundle
+ WSO2 Siddhi Execution Extension - Json
+ http://wso2.org
+
+
+
+ org.wso2.siddhi
+ siddhi-core
+
+
+ org.wso2.siddhi
+ siddhi-query-api
+
+
+ log4j
+ log4j
+
+
+ org.json.wso2
+ json
+
+
+ org.testng
+ testng
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ ${wso2.maven.compiler.target}
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ true
+
+
+ ${project.artifactId}
+ ${project.artifactId}
+
+ org.wso2.extension.siddhi.execution.json,
+ org.wso2.extension.siddhi.execution.json.*
+
+
+ org.json;version="${orbit.version.json.range}",
+ org.wso2.siddhi.core.*,
+ org.wso2.siddhi.query.api.*,
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ file:src/test/resources/log4j.properties
+
+
+ src/test/resources/testng.xml
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+ ${basedir}/target/coverage-reports/jacoco-unit.exec
+
+
+
+ jacoco-initialize
+
+ prepare-agent
+
+
+
+ jacoco-site
+ test
+
+ report
+
+
+ ${basedir}/target/coverage-reports/jacoco-unit.exec
+ ${basedir}/target/coverage-reports/site
+
+
+
+
+
+
+
diff --git a/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/java/org/wso2/extension/siddhi/execution/json/GetArrayFunctionExtension.java b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/java/org/wso2/extension/siddhi/execution/json/GetArrayFunctionExtension.java
new file mode 100644
index 0000000000..08712416eb
--- /dev/null
+++ b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/java/org/wso2/extension/siddhi/execution/json/GetArrayFunctionExtension.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.extension.siddhi.execution.json;
+
+import org.json.JSONArray;
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+/**
+ * getArray(elements..)
+ * Returns json array of elements as a string
+ * Accept Type(s): (STRING|INT|DOUBLE|FLOAT|OBJECT ..)
+ * Return Type(s): (STRING)
+ */
+public class GetArrayFunctionExtension extends FunctionExecutor {
+
+ private Attribute.Type returnType = Attribute.Type.STRING;
+
+ @Override
+ protected void init(ExpressionExecutor[] attributeExpressionExecutors,
+ ExecutionPlanContext executionPlanContext) {
+ if (attributeExpressionExecutors.length <= 0) {
+ throw new ExecutionPlanValidationException(
+ "Invalid no of arguments passed to json:getArray() function," + " required one or more, but found "
+ + attributeExpressionExecutors.length);
+ }
+ Attribute.Type inputType = attributeExpressionExecutors[0].getReturnType();
+ for (int i = 1; i < attributeExpressionExecutors.length; i++) {
+ if (attributeExpressionExecutors[0].getReturnType() != inputType) {
+ throw new ExecutionPlanValidationException(
+ "Parameter types are inconsistent. All parameters should be same");
+ }
+ }
+ }
+
+ @Override
+ protected Object execute(Object[] data) {
+
+ JSONArray jsonArray = new JSONArray();
+ for (Object obj : data) {
+ jsonArray.put(obj);
+ }
+ return jsonArray.toString();
+ }
+
+ @Override
+ protected Object execute(Object data) {
+ return execute(new Object[]{data});
+ }
+
+ @Override
+ public void start() {
+ //Nothing to start
+ }
+
+ @Override
+ public void stop() {
+ //Nothing to stop
+ }
+
+ @Override
+ public Attribute.Type getReturnType() {
+ return returnType;
+ }
+
+ @Override
+ public Object[] currentState() {
+ return null; //No need to maintain a state.
+ }
+
+ @Override
+ public void restoreState(Object[] state) {
+ //Since there's no need to maintain a state, nothing needs to be done here.
+ }
+}
+
+
diff --git a/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/java/org/wso2/extension/siddhi/execution/json/GetPropertyFunctionExtension.java b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/java/org/wso2/extension/siddhi/execution/json/GetPropertyFunctionExtension.java
new file mode 100644
index 0000000000..6bb8ddd721
--- /dev/null
+++ b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/java/org/wso2/extension/siddhi/execution/json/GetPropertyFunctionExtension.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.extension.siddhi.execution.json;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+/**
+ * getProperty(json , propertyName)
+ * Returns the vale of the property from the given json json
+ * Accept Type(s): (STRING, STRING)
+ * Return Type(s): (STRING|INT|DOUBLE|FLOAT|OBJECT)
+ */
+public class GetPropertyFunctionExtension extends FunctionExecutor {
+
+ private Attribute.Type returnType = Attribute.Type.STRING;
+
+ @Override
+ protected void init(ExpressionExecutor[] attributeExpressionExecutors,
+ ExecutionPlanContext executionPlanContext) {
+ if (attributeExpressionExecutors.length != 2) {
+ throw new ExecutionPlanValidationException(
+ "Invalid no of arguments passed to json:getProperty() function," + " required 2, but found "
+ + attributeExpressionExecutors.length);
+ }
+ if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
+ throw new ExecutionPlanValidationException(
+ "Invalid parameter type found for the first argument of json:getProperty() function, required "
+ + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType()
+ .toString());
+ }
+ if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
+ throw new ExecutionPlanValidationException(
+ "Invalid parameter type found for the second argument of json:getProperty() function, required "
+ + Attribute.Type.STRING + ", but found "
+ + attributeExpressionExecutors[1].getReturnType().toString());
+ }
+ }
+
+ @Override
+ protected Object execute(Object[] data) {
+ if (data[0] == null) {
+ throw new ExecutionPlanRuntimeException("Invalid input given to json:getProperty() function." +
+ " First argument cannot be null");
+ }
+ if (data[1] == null) {
+ throw new ExecutionPlanRuntimeException("Invalid input given to json:getProperty() function. " +
+ "Second argument cannot be null");
+ }
+ String jsonString = (String) data[0];
+ String property = (String) data[1];
+ Object jsonObject;
+ try {
+ jsonObject = new JSONObject(jsonString).get(property);
+ return jsonObject == null ? null : jsonObject.toString();
+ } catch (JSONException e) {
+ throw new ExecutionPlanRuntimeException("Cannot parse JSON String in json:getPeroperty() function. " + e);
+ }
+ }
+
+ @Override
+ protected Object execute(Object data) {
+ return null; //Since the getProperty function takes in 2 parameters, this method does not get called.
+ // Hence,not implemented.
+ }
+
+ @Override
+ public void start() {
+ //Nothing to start
+ }
+
+ @Override
+ public void stop() {
+ //Nothing to stop
+ }
+
+ @Override
+ public Attribute.Type getReturnType() {
+ return returnType;
+ }
+
+ @Override
+ public Object[] currentState() {
+ return null; //No need to maintain a state.
+ }
+
+ @Override
+ public void restoreState(Object[] state) {
+ //Since there's no need to maintain a state, nothing needs to be done here.
+ }
+}
+
+
diff --git a/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/resources/json.siddhiext b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/resources/json.siddhiext
new file mode 100644
index 0000000000..fa4336f4ac
--- /dev/null
+++ b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/main/resources/json.siddhiext
@@ -0,0 +1,20 @@
+#
+# Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+#
+# WSO2 Inc. licenses this file to you under the Apache License,
+# Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+getProperty=org.wso2.extension.siddhi.execution.json.GetPropertyFunctionExtension
+getArray=org.wso2.extension.siddhi.execution.json.GetArrayFunctionExtension
\ No newline at end of file
diff --git a/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/java/org/wso2/extension/siddhi/execution/json/ExtensionTestCase.java b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/java/org/wso2/extension/siddhi/execution/json/ExtensionTestCase.java
new file mode 100644
index 0000000000..761336a2da
--- /dev/null
+++ b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/java/org/wso2/extension/siddhi/execution/json/ExtensionTestCase.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.extension.siddhi.execution.json;
+
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.util.EventPrinter;
+import org.wso2.extension.siddhi.execution.json.test.util.SiddhiTestHelper;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExtensionTestCase {
+ static final Logger log = Logger.getLogger(ExtensionTestCase.class);
+ private AtomicInteger count = new AtomicInteger(0);
+ private volatile boolean eventArrived;
+
+ @BeforeClass
+ public void init() {
+ count.set(0);
+ eventArrived = false;
+ }
+
+ @Test
+ public void testGetPropertyFunctionExtension() throws InterruptedException {
+ log.info("GetPropertyFunctionExtension TestCase");
+ SiddhiManager siddhiManager = new SiddhiManager();
+
+ String inStreamDefinition = "define stream inputStream (payload string, id string, volume long);";
+ String query = ("@info(name = 'query1') from inputStream select id, json:getProperty(payload, 'latitude') "
+ + "as latitude insert into outputStream;");
+ ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query);
+
+ executionPlanRuntime.addCallback("query1", new QueryCallback() {
+ @Override
+ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+ EventPrinter.print(timeStamp, inEvents, removeEvents);
+ for (Event event : inEvents) {
+ count.incrementAndGet();
+ if (count.get() == 1) {
+ Assert.assertEquals("1.5", event.getData(1));
+ eventArrived = true;
+ }
+ if (count.get() == 2) {
+ Assert.assertEquals("67.5", event.getData(1));
+ eventArrived = true;
+ }
+ if (count.get() == 3) {
+ Assert.assertEquals("7.5", event.getData(1));
+ eventArrived = true;
+ }
+ }
+ }
+ });
+
+ InputHandler inputHandler = executionPlanRuntime.getInputHandler("inputStream");
+ executionPlanRuntime.start();
+ inputHandler.send(new Object[]{"{'latitude' : 1.5, 'longitude' : 78.5}","IBM",100l});
+ inputHandler.send(new Object[]{"{'latitude' : 67.5, 'longitude' : 34.9}","WSO2", 200l});
+ inputHandler.send(new Object[]{"{'latitude' : 7.5, 'longitude' : 44.9}", "XYZ", 200l});
+ SiddhiTestHelper.waitForEvents(100, 3, count, 60000);
+ Assert.assertEquals(3, count.get());
+ Assert.assertTrue(eventArrived);
+ executionPlanRuntime.shutdown();
+ }
+
+ @Test(dependsOnMethods = {"testGetPropertyFunctionExtension"})
+ public void testGetArrayFunctionExtension() throws InterruptedException {
+ count.set(0);
+ eventArrived = false;
+ log.info("GetArrayFunctionExtension TestCase");
+ SiddhiManager siddhiManager = new SiddhiManager();
+
+ String inStreamDefinition = "define stream inputStream (arg1 string, arg2 string);";
+ String query = ("@info(name = 'query1') from inputStream select json:getArray(arg1, arg2) "
+ + "as array insert into outputStream;");
+
+ ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query);
+
+ executionPlanRuntime.addCallback("query1", new QueryCallback() {
+ @Override
+ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+ EventPrinter.print(timeStamp, inEvents, removeEvents);
+ for (Event event : inEvents) {
+ count.incrementAndGet();
+ if (count.get() == 1) {
+ eventArrived = true;
+ }
+ }
+ }
+ });
+
+ InputHandler inputHandler = executionPlanRuntime.getInputHandler("inputStream");
+ executionPlanRuntime.start();
+ inputHandler.send(new Object[]{"Arg1","Arg2"});
+ SiddhiTestHelper.waitForEvents(100, 1, count, 60000);
+ Assert.assertEquals(1, count.get());
+ Assert.assertTrue(eventArrived);
+ executionPlanRuntime.shutdown();
+ }
+}
diff --git a/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/java/org/wso2/extension/siddhi/execution/json/test/util/SiddhiTestHelper.java b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/java/org/wso2/extension/siddhi/execution/json/test/util/SiddhiTestHelper.java
new file mode 100644
index 0000000000..9cfd6c307d
--- /dev/null
+++ b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/java/org/wso2/extension/siddhi/execution/json/test/util/SiddhiTestHelper.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * WSO2 Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.extension.siddhi.execution.json.test.util;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SiddhiTestHelper {
+ public static void waitForEvents(long sleepTime, int expectedCount, AtomicInteger actualCount, long timeout) throws InterruptedException {
+ long currentWaitTime = 0;
+ long startTime = System.currentTimeMillis();
+ while ((actualCount.get() < expectedCount) && (currentWaitTime <= timeout)) {
+ Thread.sleep(sleepTime);
+ currentWaitTime = System.currentTimeMillis() - startTime;
+ }
+ }
+}
diff --git a/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/resources/log4j.properties b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..96c79e9449
--- /dev/null
+++ b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+#
+# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+#
+# WSO2 Inc. licenses this file to you under the Apache License,
+# Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# For the general syntax of property based configuration files see the
+# documenation of org.apache.log4j.PropertyConfigurator.
+
+# The root category uses the appender called A1. Since no priority is
+# specified, the root category assumes the default priority for root
+# which is DEBUG in log4j. The root category is the only category that
+# has a default priority. All other categories need not be assigned a
+# priority in which case they inherit their priority from the
+# hierarchy.
+
+#log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%m%n
+#log4j.appender.stdout.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
diff --git a/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/resources/testng.xml b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/resources/testng.xml
new file mode 100644
index 0000000000..80918d2539
--- /dev/null
+++ b/components/extensions/siddhi-extensions/org.wso2.extension.siddhi.execution.json/src/test/resources/testng.xml
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+