Add siddhi extension to Get json array form given list of arguments

dev
charitha 7 years ago
parent 8ec3dd5f32
commit 1c90ad80ba

@ -48,8 +48,8 @@
<artifactId>json</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

@ -0,0 +1,96 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.extension.siddhi.execution.json;
import org.json.JSONArray;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
/**
* getArray(elements..)
* Returns json array of elements as a string
* Accept Type(s): (STRING|INT|DOUBLE|FLOAT|OBJECT ..)
* Return Type(s): (STRING)
*/
public class GetArrayFunctionExtension extends FunctionExecutor {
private Attribute.Type returnType = Attribute.Type.STRING;
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors,
ExecutionPlanContext executionPlanContext) {
if (attributeExpressionExecutors.length <= 0) {
throw new ExecutionPlanValidationException(
"Invalid no of arguments passed to json:getArray() function," + " required one or more, but found "
+ attributeExpressionExecutors.length);
}
Attribute.Type inputType = attributeExpressionExecutors[0].getReturnType();
for (int i = 1; i < attributeExpressionExecutors.length; i++) {
if (attributeExpressionExecutors[0].getReturnType() != inputType) {
throw new ExecutionPlanValidationException(
"Parameter types are inconsistent. All parameters should be same");
}
}
}
@Override
protected Object execute(Object[] data) {
JSONArray jsonArray = new JSONArray();
for (Object obj : data) {
jsonArray.put(obj);
}
return jsonArray.toString();
}
@Override
protected Object execute(Object data) {
return execute(new Object[]{data});
}
@Override
public void start() {
//Nothing to start
}
@Override
public void stop() {
//Nothing to stop
}
@Override
public Attribute.Type getReturnType() {
return returnType;
}
@Override
public Object[] currentState() {
return null; //No need to maintain a state.
}
@Override
public void restoreState(Object[] state) {
//Since there's no need to maintain a state, nothing needs to be done here.
}
}

@ -1,12 +1,12 @@
#
# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
# Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
@ -17,3 +17,4 @@
#
getProperty=org.wso2.extension.siddhi.execution.json.GetPropertyFunctionExtension
getArray=org.wso2.extension.siddhi.execution.json.GetArrayFunctionExtension

@ -1,12 +1,12 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@ -18,10 +18,10 @@
package org.wso2.extension.siddhi.execution.json;
import junit.framework.Assert;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
@ -32,12 +32,12 @@ import org.wso2.extension.siddhi.execution.json.test.util.SiddhiTestHelper;
import java.util.concurrent.atomic.AtomicInteger;
public class getPropertyFunctionTestCase {
static final Logger log = Logger.getLogger(getPropertyFunctionTestCase.class);
public class ExtensionTestCase {
static final Logger log = Logger.getLogger(ExtensionTestCase.class);
private AtomicInteger count = new AtomicInteger(0);
private volatile boolean eventArrived;
@Before
@BeforeClass
public void init() {
count.set(0);
eventArrived = false;
@ -85,4 +85,39 @@ public class getPropertyFunctionTestCase {
Assert.assertTrue(eventArrived);
executionPlanRuntime.shutdown();
}
@Test(dependsOnMethods = {"testGetPropertyFunctionExtension"})
public void testGetArrayFunctionExtension() throws InterruptedException {
count.set(0);
eventArrived = false;
log.info("GetArrayFunctionExtension TestCase");
SiddhiManager siddhiManager = new SiddhiManager();
String inStreamDefinition = "define stream inputStream (arg1 string, arg2 string);";
String query = ("@info(name = 'query1') from inputStream select json:getArray(arg1, arg2) "
+ "as array insert into outputStream;");
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query);
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
count.incrementAndGet();
if (count.get() == 1) {
eventArrived = true;
}
}
}
});
InputHandler inputHandler = executionPlanRuntime.getInputHandler("inputStream");
executionPlanRuntime.start();
inputHandler.send(new Object[]{"Arg1","Arg2"});
SiddhiTestHelper.waitForEvents(100, 1, count, 60000);
Assert.assertEquals(1, count.get());
Assert.assertTrue(eventArrived);
executionPlanRuntime.shutdown();
}
}
Loading…
Cancel
Save