API to integrate functionality of DAS

Four endpoints to publish stream, receiver, publisher and siddhi script were added to the device.mgt.api. This commit contains implementation of the above end points, jax-rs beans, custom exceptions and common methods used for the endpoints.
These changes were done in order to decouple the DAS ui from Entgra products.
These endpoints will function by consuming JSON payloads to, generate custom artifacts and deploying them depending on the payload.

BREAKING CHANGE: Four endpoints to publish stream, receiver, publisher and siddhi script were added to the device.mgt.api
3.x.x
Yohan Avishke 5 years ago
parent e42950dd76
commit bdf74ac041

@ -0,0 +1,69 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
/**
* Receiver definition including :
* Attributes : Name, type, config and attached stream.
*/
public class Adapter {
@ApiModelProperty(value = "Adapter name")
private String adapterName;
@ApiModelProperty(value = "Attached stream name:version")
private String eventStreamWithVersion;
@ApiModelProperty(value = "Adapter type")
private AdapterType adapterType;
@ApiModelProperty(value = "Adapter main configurations")
private AdapterConfiguration adapterConfiguration;
public String getAdapterName() {
return adapterName;
}
public void setAdapterName(String adapterName) {
this.adapterName = adapterName;
}
public String getEventStreamWithVersion() {
return eventStreamWithVersion;
}
public void setEventStreamWithVersion(String eventStreamWithVersion) {
this.eventStreamWithVersion = eventStreamWithVersion;
}
public AdapterType getAdapterType() {
return adapterType;
}
public void setAdapterType(AdapterType adapterType) {
this.adapterType = adapterType;
}
public AdapterConfiguration getAdapterConfiguration() {
return adapterConfiguration;
}
public void setAdapterConfiguration(
AdapterConfiguration adapterConfiguration) {
this.adapterConfiguration = adapterConfiguration;
}
}

@ -0,0 +1,62 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
import java.util.ArrayList;
import java.util.List;
/**
* Adapter main configurations
* Attributes : Adapter properties list, custom-mapping flag, mapping configurations
*/
public class AdapterConfiguration {
@ApiModelProperty(value = "Adapter properties list")
private List<AdapterProperty> adapterProperties = new ArrayList<>();
@ApiModelProperty(value = "Custom-mapping flag")
private boolean isCustomMappingEnabled;
@ApiModelProperty(value = "Mapping configurations")
private AdapterMappingConfiguration adapterMappingConfiguration;
public List<AdapterProperty> getAdapterProperties() {
return adapterProperties;
}
public void setAdapterProperties(
List<AdapterProperty> adapterProperties) {
this.adapterProperties = adapterProperties;
}
public boolean isCustomMappingEnabled() {
return isCustomMappingEnabled;
}
public void setCustomMappingEnabled(boolean customMappingEnabled) {
isCustomMappingEnabled = customMappingEnabled;
}
public AdapterMappingConfiguration getAdapterMappingConfiguration() {
return adapterMappingConfiguration;
}
public void setAdapterMappingConfiguration(
AdapterMappingConfiguration adapterMappingConfiguration) {
this.adapterMappingConfiguration = adapterMappingConfiguration;
}
}

@ -0,0 +1,104 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;
/**
* Adapter mapping configuration definition
* Attributes : Mapping format and lists for each type of property
*/
public class AdapterMappingConfiguration {
@ApiModelProperty(value = "Mapping format")
private MessageFormat messageFormat;
@ApiModelProperty(value = "Input mapping for json,text and xml mappings")
private String inputMappingString;
@ApiModelProperty(value = "Input mapping for json,map and xml mappings")
private List<MappingProperty> inputMappingProperties;
@ApiModelProperty(value = "Name-scape mapping for xml mapping")
private List<MappingProperty> namespaceMappingProperties;
@ApiModelProperty(value = "Correlation mapping for wso2 mapping")
private List<MappingProperty> correlationMappingProperties;
@ApiModelProperty(value = "Payload mapping for wso2 mapping")
private List<MappingProperty> payloadMappingProperties;
@ApiModelProperty(value = "Meta mapping for wso2 mapping")
private List<MappingProperty> metaMappingProperties;
public MessageFormat getMessageFormat() {
return messageFormat;
}
public void setMessageFormat(MessageFormat messageFormat) {
this.messageFormat = messageFormat;
}
public String getInputMappingString() {
return inputMappingString;
}
public void setInputMappingString(String inputMappingString) {
this.inputMappingString = inputMappingString;
}
public List<MappingProperty> getInputMappingProperties() {
return inputMappingProperties;
}
public void setInputMappingProperties(
List<MappingProperty> inputMappingProperties) {
this.inputMappingProperties = inputMappingProperties;
}
public List<MappingProperty> getNamespaceMappingProperties() {
return namespaceMappingProperties;
}
public void setNamespaceMappingProperties(
List<MappingProperty> namespaceMappingProperties) {
this.namespaceMappingProperties = namespaceMappingProperties;
}
public List<MappingProperty> getCorrelationMappingProperties() {
return correlationMappingProperties;
}
public void setCorrelationMappingProperties(
List<MappingProperty> correlationMappingProperties) {
this.correlationMappingProperties = correlationMappingProperties;
}
public List<MappingProperty> getPayloadMappingProperties() {
return payloadMappingProperties;
}
public void setPayloadMappingProperties(
List<MappingProperty> payloadMappingProperties) {
this.payloadMappingProperties = payloadMappingProperties;
}
public List<MappingProperty> getMetaMappingProperties() {
return metaMappingProperties;
}
public void setMetaMappingProperties(
List<MappingProperty> metaMappingProperties) {
this.metaMappingProperties = metaMappingProperties;
}
}

@ -0,0 +1,44 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
/**
* Receiver properties with it's display name(key) and user input(value)
*/
public class AdapterProperty {
private String key;
private String value;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

@ -0,0 +1,31 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
/**
* Available adapter types
*/
public enum AdapterType {
EMAIL, FILE_TAIL, HTTP, IOT_EVENT, JMS, KAFKA, MQTT, OAUTH_MQTT, SOAP, WEBSOCKET, WEBSOCKET_LOCAL,
WSO2_EVENT, XMPP, UI, RDBMS, SECURED_WEBSOCKET, CASSANDRA, LOGGER;
public String toStringFormatted() {
return super.toString().toLowerCase().replace("_", "-");
}
}

@ -22,24 +22,24 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModelProperty;
/**
* This hold the attribute definition.
* Each attribute definition
* Attributes : name, type
*/
public class Attribute {
@ApiModelProperty(value = "Event Attribute Name")
@ApiModelProperty(value = "Event attribute name")
@JsonProperty("name")
private String name;
@ApiModelProperty(value = "Event Attribute Type")
@ApiModelProperty(value = "Event attribute data type")
@JsonProperty("type")
private AttributeType type;
private AttributeDataType type;
public Attribute() {
}
public Attribute(String name, AttributeType attributeType) {
public Attribute(String name, AttributeDataType attributeDataType) {
this.name = name;
this.type = attributeType;
this.type = attributeDataType;
}
public String getName() {
return name;
@ -49,11 +49,11 @@ public class Attribute {
this.name = name;
}
public AttributeType getType() {
public AttributeDataType getType() {
return type;
}
public void setType(AttributeType type) {
public void setType(AttributeDataType type) {
this.type = type;
}
}

@ -19,9 +19,9 @@
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
/**
* This hold the definition of the attribute type for the attributes.
* Attribute data types
*/
public enum AttributeType {
public enum AttributeDataType {
STRING, LONG, BOOL, INT, FLOAT, DOUBLE;
@Override

@ -0,0 +1,62 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;
/**
* Stream event attribute data
* Attributes : three lists for each attribute type
*/
public class EventAttributeLists {
@ApiModelProperty(value = "Meta attribute list")
private List<Attribute> metaAttributes;
@ApiModelProperty(value = "Correlation attribute list")
private List<Attribute> correlationAttributes;
@ApiModelProperty(value = "Payload attribute list")
private List<Attribute> payloadAttributes;
public List<Attribute> getMetaAttributes() {
return metaAttributes;
}
public void setMetaAttributes(
List<Attribute> metaAttributes) {
this.metaAttributes = metaAttributes;
}
public List<Attribute> getCorrelationAttributes() {
return correlationAttributes;
}
public void setCorrelationAttributes(
List<Attribute> correlationAttributes) {
this.correlationAttributes = correlationAttributes;
}
public List<Attribute> getPayloadAttributes() {
return payloadAttributes;
}
public void setPayloadAttributes(
List<Attribute> payloadAttributes) {
this.payloadAttributes = payloadAttributes;
}
}

@ -0,0 +1,69 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
/**
* Stream definition
* Attributes : name, version, definition and attribute data
*/
public class EventStream {
@ApiModelProperty(value = "Stream name")
private String streamName;
@ApiModelProperty(value = "Stream version")
private String streamVersion;
@ApiModelProperty(value = "Stream definition")
private String streamDefinition;
@ApiModelProperty(value = "Stream property attribute lists")
private EventAttributeLists eventAttributeLists;
public String getStreamName() {
return streamName;
}
public void setStreamName(String streamName) {
this.streamName = streamName;
}
public String getStreamVersion() {
return streamVersion;
}
public void setStreamVersion(String streamVersion) {
this.streamVersion = streamVersion;
}
public String getStreamDefinition() {
return streamDefinition;
}
public void setStreamDefinition(String streamDefinition) {
this.streamDefinition = streamDefinition;
}
public EventAttributeLists getEventAttributeLists() {
return eventAttributeLists;
}
public void setEventAttributeLists(
EventAttributeLists eventAttributeLists) {
this.eventAttributeLists = eventAttributeLists;
}
}

@ -0,0 +1,59 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
/**
* Mapping property data
* Attributes : name, data type and mapping value name
*/
public class MappingProperty {
@ApiModelProperty(value = "Property name")
private String name;
@ApiModelProperty(value = "Property data type")
private String type;
@ApiModelProperty(value = "Property mapping value name")
private String valueOf;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getValueOf() {
return valueOf;
}
public void setValueOf(String valueOf) {
this.valueOf = valueOf;
}
}

@ -0,0 +1,31 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
/**
* Message protocol types
*/
public enum MessageFormat {
JSON, MAP, XML, WSO2EVENT, TEXT;
@Override
public String toString() {
return super.toString().toLowerCase();
}
}

@ -0,0 +1,48 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
/**
* Execution plan definition including :
* Attributes : Name and Plan data.
*/
public class SiddhiExecutionPlan {
@ApiModelProperty(value = "Execution plan name")
private String executionPlanName;
@ApiModelProperty(value = "Execution plan")
private String executionPlanData;
public String getExecutionPlanName() {
return executionPlanName;
}
public void setExecutionPlanName(String executionPlanName) {
this.executionPlanName = executionPlanName;
}
public String getExecutionPlanData() {
return executionPlanData;
}
public void setExecutionPlanData(String executionPlanData) {
this.executionPlanData = executionPlanData;
}
}

@ -0,0 +1,36 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.exception;
public class InvalidExecutionPlanException extends Exception{
private String errorMessage;
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public InvalidExecutionPlanException(String msg) {
super(msg);
setErrorMessage(msg);
}
}

@ -0,0 +1,290 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.service.api;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Extension;
import io.swagger.annotations.ExtensionProperty;
import io.swagger.annotations.Info;
import io.swagger.annotations.ResponseHeader;
import io.swagger.annotations.SwaggerDefinition;
import io.swagger.annotations.Tag;
import org.wso2.carbon.apimgt.annotations.api.Scope;
import org.wso2.carbon.apimgt.annotations.api.Scopes;
import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Adapter;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventStream;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.SiddhiExecutionPlan;
import org.wso2.carbon.device.mgt.jaxrs.util.Constants;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@SwaggerDefinition(
info = @Info(
version = "1.0.0",
title = "",
extensions = {
@Extension(properties = {
@ExtensionProperty(name = "name", value = "AnalyticsManagement"),
@ExtensionProperty(name = "context", value = "/api/device-mgt/v1.0/analytics"),
})
}
),
tags = {
@Tag(name = "analytics_management", description = "")
}
)
@Scopes(
scopes = {
@Scope(
name = "Add Event stream",
description = "Add definition of a Event stream",
key = "perm:analytics:events",
permissions = {"/device-mgt/analytics/event/add"}
),
@Scope(
name = "Add Publisher",
description = "Add definition of a Publisher",
key = "perm:analytics:publisher",
permissions = {"/device-mgt/analytics/publisher/add"}
),
@Scope(
name = "Add Receiver",
description = "Add definition of a Receiver",
key = "perm:analytics:receiver",
permissions = {"/device-mgt/analytics/receiver/add"}
),
@Scope(
name = "Add Siddhi script",
description = "Add definition of a Siddhi script",
key = "perm:analytics:script",
permissions = {"/device-mgt/analytics/script/add"}
)
}
)
@Path("/analytics")
@Api(value = "Analytics Management", description = "This API corresponds to all tasks related analytics server")
@Consumes(MediaType.APPLICATION_JSON)
public interface AnalyticsManagementService {
@POST
@Path("/event")
@ApiOperation(
httpMethod = "POST",
value = "Adding the Event Type Definition",
notes = "Add the event definition to analytics.",
tags = "Analytics Management",
extensions = {
@Extension(properties = {
@ExtensionProperty(name = Constants.SCOPE, value = "perm:analytics:events")
})
}
)
@ApiResponses(
value = {
@ApiResponse(
code = 200,
message = "OK. \n Successfully added the event defintion.",
responseHeaders = {
@ResponseHeader(
name = "Content-Type",
description = "The content type of the body"),
@ResponseHeader(
name = "ETag",
description = "Entity Tag of the response resource.\n" +
"Used by caches, or in conditional requests."),
@ResponseHeader(
name = "Last-Modified",
description =
"Date and time the resource was last modified.\n" +
"Used by caches, or in conditional requests."),
}
),
@ApiResponse(
code = 400,
message =
"Bad Request."),
@ApiResponse(
code = 406,
message = "Not Acceptable.\n The requested media type is not supported."),
@ApiResponse(
code = 500,
message = "Internal Server Error. \n Server error occurred while publishing the " +
"Event stream.",
response = ErrorResponse.class)
}
)
Response deployEventDefinition(@Valid EventStream stream);
@POST
@Path("/receiver")
@ApiOperation(
httpMethod = "POST",
value = "Adding an Event Receiver",
notes = "Add the receiver for an event",
tags = "Analytics Management",
extensions = {
@Extension(properties = {
@ExtensionProperty(name = Constants.SCOPE, value = "perm:analytics:receiver")
})
}
)
@ApiResponses(
value = {
@ApiResponse(
code = 200,
message = "OK. \n Successfully added the receiver.",
responseHeaders = {
@ResponseHeader(
name = "Content-Type",
description = "The content type of the body."),
@ResponseHeader(
name = "ETag",
description = "Entity Tag of the response resource.\n" +
"Used by caches, or in conditional requests."),
@ResponseHeader(
name = "Last-Modified",
description =
"Date and time the resource was last modified.\n" +
"Used by caches, or in conditional requests."),
}
),
@ApiResponse(
code = 400,
message =
"Bad Request."),
@ApiResponse(
code = 406,
message = "Not Acceptable.\n The requested media type is not supported"),
@ApiResponse(
code = 500,
message = "Internal Server Error. \n Server error occurred while publishing the " +
"Event receiver.",
response = ErrorResponse.class)
}
)
Response deployEventReceiver(Adapter receiver);
@POST
@Path("/publisher")
@ApiOperation(
httpMethod = "POST",
value = "Adding an Event Publisher",
notes = "Add the publisher for an event",
tags = "Analytics Management",
extensions = {
@Extension(properties = {
@ExtensionProperty(name = Constants.SCOPE, value = "perm:analytics:publisher")
})
}
)
@ApiResponses(
value = {
@ApiResponse(
code = 200,
message = "OK. \n Successfully added the publisher.",
responseHeaders = {
@ResponseHeader(
name = "Content-Type",
description = "The content type of the body."),
@ResponseHeader(
name = "ETag",
description = "Entity Tag of the response resource.\n" +
"Used by caches, or in conditional requests."),
@ResponseHeader(
name = "Last-Modified",
description =
"Date and time the resource was last modified.\n" +
"Used by caches, or in conditional requests."),
}
),
@ApiResponse(
code = 400,
message =
"Bad Request."),
@ApiResponse(
code = 406,
message = "Not Acceptable.\n The requested media type is not supported."),
@ApiResponse(
code = 500,
message = "Internal Server Error. \n Server error occurred while publishing the " +
"Event publisher.",
response = ErrorResponse.class)
}
)
Response deployEventPublisher(@Valid Adapter publisher);
@POST
@Path("/executable")
@ApiOperation(
httpMethod = "POST",
value = "Adding a Siddhi executable",
notes = "Add a Siddhi executable for a event",
tags = "Analytics Management",
extensions = {
@Extension(properties = {
@ExtensionProperty(name = Constants.SCOPE, value = "perm:analytics:script")
})
}
)
@ApiResponses(
value = {
@ApiResponse(
code = 200,
message = "OK. \n Successfully added the execution plan.",
responseHeaders = {
@ResponseHeader(
name = "Content-Type",
description = "The content type of the body."),
@ResponseHeader(
name = "ETag",
description = "Entity Tag of the response resource.\n" +
"Used by caches, or in conditional requests."),
@ResponseHeader(
name = "Last-Modified",
description =
"Date and time the resource was last modified.\n" +
"Used by caches, or in conditional requests."),
}
),
@ApiResponse(
code = 400,
message =
"Bad Request."),
@ApiResponse(
code = 406,
message = "Not Acceptable.\n The requested media type is not supported."),
@ApiResponse(
code = 500,
message = "Internal Server Error. \n Server error occurred while " +
"publishing the Execution plan.",
response = ErrorResponse.class)
}
)
Response deploySiddhiExecutableScript(@Valid SiddhiExecutionPlan plan);
}

@ -0,0 +1,684 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.wso2.carbon.device.mgt.jaxrs.service.impl;
import org.apache.axis2.AxisFault;
import org.apache.axis2.client.Stub;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.base.MultitenantConstants;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Attribute;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterMappingConfiguration;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MappingProperty;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterConfiguration;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterProperty;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MessageFormat;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.SiddhiExecutionPlan;
import org.wso2.carbon.device.mgt.jaxrs.exception.InvalidExecutionPlanException;
import org.wso2.carbon.device.mgt.jaxrs.service.api.AnalyticsManagementService;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Adapter;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventAttributeLists;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventStream;
import org.wso2.carbon.device.mgt.jaxrs.util.DeviceMgtAPIUtils;
import org.wso2.carbon.event.processor.stub.EventProcessorAdminServiceStub;
import org.wso2.carbon.event.processor.stub.types.ExecutionPlanConfigurationDto;
import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceStub;
import org.wso2.carbon.event.publisher.stub.types.BasicOutputAdapterPropertyDto;
import org.wso2.carbon.event.publisher.stub.types.EventPublisherConfigurationDto;
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceStub;
import org.wso2.carbon.event.receiver.stub.types.BasicInputAdapterPropertyDto;
import org.wso2.carbon.event.receiver.stub.types.EventMappingPropertyDto;
import org.wso2.carbon.event.receiver.stub.types.EventReceiverConfigurationDto;
import org.wso2.carbon.event.stream.stub.EventStreamAdminServiceStub;
import org.wso2.carbon.event.stream.stub.types.EventStreamAttributeDto;
import org.wso2.carbon.event.stream.stub.types.EventStreamDefinitionDto;
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
import org.wso2.carbon.user.api.UserStoreException;
import javax.validation.Valid;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import java.rmi.RemoteException;
import java.util.List;
/**
* This class is used to create endpoints to serve the deployment of streams, adapters, siddhi
* execution plans to the DAS
*/
@Path("/analytics")
public class AnalyticsManagementServiceImpl implements AnalyticsManagementService {
private static final Log log = LogFactory.getLog(AnalyticsManagementServiceImpl.class);
/**
* @param eventStream EventStream object with the properties of the stream
* @return A status code depending on the code result
* Function - Deploy a event stream
*/
@Override
@POST
@Path("/event")
public Response deployEventDefinition(@Valid EventStream eventStream) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
EventAttributeLists eventAttributes = eventStream.getEventAttributeLists();
// Categorize attributes to three lists depending on their type
List<Attribute> eventMetaAttributes = eventAttributes.getMetaAttributes();
List<Attribute> eventPayloadAttributes = eventAttributes.getPayloadAttributes();
List<Attribute> eventCorrelationAttributes = eventAttributes.getCorrelationAttributes();
try {
/* Conditions
* - At least one list should always be not null
*/
if (eventStream.getEventAttributeLists() == null ||
(eventMetaAttributes == null && eventCorrelationAttributes == null &&
eventPayloadAttributes == null)) {
log.error("Invalid payload: eventAttributeLists");
return Response.status(Response.Status.BAD_REQUEST).build();
} else {
String streamName = eventStream.getStreamName();
String streamVersion = eventStream.getStreamVersion();
String streamDefinition = eventStream.getStreamDefinition();
// Publish the event stream
publishStream(streamName, streamVersion, streamDefinition, eventMetaAttributes,
eventPayloadAttributes, eventCorrelationAttributes);
try {
/*
* Create a new tenant and if that tenant is not the super tenant execute the publish
* function for the new tenant
*/
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
publishStream(streamName, streamVersion, streamDefinition, eventMetaAttributes,
eventPayloadAttributes, eventCorrelationAttributes);
}
} finally {
// End the new tenant flow
PrivilegedCarbonContext.endTenantFlow();
}
return Response.ok().build();
}
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
/**
* @param receiver Receiver definition data
* @return a status code depending on the code execution
* * Function - Deploy a event receiver
*/
@Override
@POST
@Path("/receiver")
public Response deployEventReceiver(Adapter receiver) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
String receiverName = receiver.getAdapterName();
String adapterType = receiver.getAdapterType().toStringFormatted();
AdapterConfiguration adapterConfiguration = receiver.getAdapterConfiguration();
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
try {
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
if (adapterProperties == null) {
log.error("Invalid attribute payload");
return Response.status(Response.Status.BAD_REQUEST).build();
}
boolean customMapping = adapterConfiguration.isCustomMappingEnabled();
List<MappingProperty> inputMappingProperties = adapterMappingConfiguration.getInputMappingProperties();
List<MappingProperty> namespaceMappingProperties = adapterMappingConfiguration.getNamespaceMappingProperties();
List<MappingProperty> correlationMappingProperties = adapterMappingConfiguration.getCorrelationMappingProperties();
List<MappingProperty> payloadMappingProperties = adapterMappingConfiguration.getPayloadMappingProperties();
List<MappingProperty> metaMappingProperties = adapterMappingConfiguration.getMetaMappingProperties();
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
/*
* Conditions
* - if CustomMappingEnabled check validity of property lists
* - if both inputMappingProperties and namespaceMappingProperties null check remaining property lists
* - if all correlationMappingProperties, payloadMappingProperties, metaMappingProperties null log error
* - if message format is null change the final result to TRUE
* - else continue
* */
if ((customMapping &&
(inputMappingProperties == null && namespaceMappingProperties == null) &&
(correlationMappingProperties == null && payloadMappingProperties == null &&
metaMappingProperties == null)) ||
messageFormat == null) {
log.error("Invalid mapping payload");
return Response.status(Response.Status.BAD_REQUEST).build();
}
String eventStreamWithVersion = receiver.getEventStreamWithVersion();
publishReceiver(receiverName, adapterType, adapterProperties, customMapping, inputMappingProperties,
namespaceMappingProperties, correlationMappingProperties, payloadMappingProperties,
metaMappingProperties, messageFormat, eventStreamWithVersion);
try {
/*
* Create a new tenant and if that tenant is not the super tenant execute the publish
* function for the new tenant
*/
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
publishReceiver(receiverName, adapterType, adapterProperties, customMapping, inputMappingProperties,
namespaceMappingProperties, correlationMappingProperties, payloadMappingProperties,
metaMappingProperties, messageFormat, eventStreamWithVersion);
}
} finally {
// End the new tenant flow
PrivilegedCarbonContext.endTenantFlow();
}
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
/**
* @param publisher Publisher definition
* @return a status code depending on the code execution
*/
@Override
@POST
@Path("/publisher")
public Response deployEventPublisher(@Valid Adapter publisher) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
String publisherName = publisher.getAdapterName();
String adapterType = publisher.getAdapterType().toStringFormatted();
AdapterConfiguration adapterConfiguration = publisher.getAdapterConfiguration();
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
try {
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
if (adapterProperties == null) {
log.error("Invalid attribute payload");
return Response.status(Response.Status.BAD_REQUEST).build();
}
boolean customMapping = adapterConfiguration.isCustomMappingEnabled();
String inputMappingString = adapterMappingConfiguration.getInputMappingString();
List<MappingProperty> inputMappingProperties = adapterMappingConfiguration.getInputMappingProperties();
List<MappingProperty> correlationMappingProperties = adapterMappingConfiguration.getCorrelationMappingProperties();
List<MappingProperty> payloadMappingProperties = adapterMappingConfiguration.getPayloadMappingProperties();
List<MappingProperty> metaMappingProperties = adapterMappingConfiguration.getMetaMappingProperties();
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
/*
* Conditions
* - if CustomMappingEnabled check validity of property lists
* - if all correlationMappingProperties, payloadMappingProperties, metaMappingProperties null log error
* - if message format is null change the final result to TRUE
* - else continue
*/
if ((customMapping &&
(inputMappingProperties == null && inputMappingString == null) &&
(correlationMappingProperties == null && payloadMappingProperties == null &&
metaMappingProperties == null)) ||
messageFormat == null) {
log.error("Invalid mapping payload");
return Response.status(Response.Status.BAD_REQUEST).build();
}
String eventStreamWithVersion = publisher.getEventStreamWithVersion();
publishPublisher(publisherName, adapterType, adapterProperties, customMapping
, inputMappingString, inputMappingProperties, correlationMappingProperties
, payloadMappingProperties, metaMappingProperties, messageFormat
, eventStreamWithVersion);
try {
/*
* Create a new tenant and if that tenant is not the super tenant execute the publish
* function for the new tenant
*/
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
publishPublisher(publisherName, adapterType, adapterProperties, customMapping
, inputMappingString, inputMappingProperties, correlationMappingProperties
, payloadMappingProperties, metaMappingProperties, messageFormat
, eventStreamWithVersion);
}
} finally {
// End the new tenant flow
PrivilegedCarbonContext.endTenantFlow();
}
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
/**
* @param executionPlan Siddhi execution plan definition
* @return a status code depending on the code execution
*/
@Override
@POST
@Path("/executable")
public Response deploySiddhiExecutableScript(@Valid SiddhiExecutionPlan executionPlan) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
try {
publishSiddhiExecutionPlan(executionPlan.getExecutionPlanName(),
executionPlan.getExecutionPlanData());
try {
/*
* Create a new tenant and if that tenant is not the super tenant execute the publish
* function for the new tenant
*/
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
publishSiddhiExecutionPlan(executionPlan.getExecutionPlanName(),
executionPlan.getExecutionPlanData());
}
} finally {
// End the new tenant flow
PrivilegedCarbonContext.endTenantFlow();
}
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (InvalidExecutionPlanException e) {
log.error("Invalid Execution plan: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
/**
* @param streamName Name of the stream
* @param streamVersion Version of the stream
* @param streamDefinition Definition of the stream
* @param eventMetaAttributes Meta attributes of the stream
* @param eventPayloadAttributes Payload attributes of the stream
* @param eventCorrelationAttributes Correlation attributes of the stream
* @throws RemoteException exception that may occur during a remote method call
* @throws UserStoreException exception that may occur during JWT token generation
* @throws JWTClientException exception that may occur during connecting to client store
*/
private void publishStream(String streamName, String streamVersion, String streamDefinition,
List<Attribute> eventMetaAttributes,
List<Attribute> eventPayloadAttributes,
List<Attribute> eventCorrelationAttributes)
throws RemoteException, UserStoreException, JWTClientException {
EventStreamAdminServiceStub eventStreamAdminServiceStub =
DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
try {
EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto();
eventStreamDefinitionDto.setName(streamName);
eventStreamDefinitionDto.setVersion(streamVersion);
eventStreamDefinitionDto.setDescription(streamDefinition);
eventStreamDefinitionDto.setMetaData(addEventAttributesToDto(eventMetaAttributes));
eventStreamDefinitionDto.setPayloadData(addEventAttributesToDto(eventPayloadAttributes));
eventStreamDefinitionDto.setCorrelationData(addEventAttributesToDto(eventCorrelationAttributes));
String streamId = streamName + ":" + streamVersion;
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) {
eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId);
} else {
eventStreamAdminServiceStub.addEventStreamDefinitionAsDto(eventStreamDefinitionDto);
}
} finally {
cleanup(eventStreamAdminServiceStub);
}
}
/**
* @param receiverName Receiver name
* @param adapterType Receiver type
* @param adapterProperties Receiver properties
* @param customMapping Is receiver mapped
* @param inputMappingProperties Receiver input attribute mapping
* @param namespaceMappingProperties Receiver name-scape attribute mapping
* @param correlationMappingProperties Receiver correlation attribute mapping
* @param payloadMappingProperties Receiver payload attribute mapping
* @param metaMappingProperties Receiver meta attribute mapping
* @param messageFormat Receiver mapping format
* @param eventStreamWithVersion Attached stream
* @throws RemoteException exception that may occur during a remote method call
* @throws UserStoreException exception that may occur during JWT token generation
* @throws JWTClientException exception that may occur during connecting to client store
*/
private void publishReceiver(String receiverName, String adapterType,
List<AdapterProperty> adapterProperties, boolean customMapping,
List<MappingProperty> inputMappingProperties,
List<MappingProperty> namespaceMappingProperties,
List<MappingProperty> correlationMappingProperties,
List<MappingProperty> payloadMappingProperties,
List<MappingProperty> metaMappingProperties,
MessageFormat messageFormat,
String eventStreamWithVersion)
throws RemoteException, UserStoreException, JWTClientException {
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
try {
EventReceiverConfigurationDto eventReceiverConfigurationDto = eventReceiverAdminServiceStub
.getActiveEventReceiverConfiguration(receiverName);
// Check if adapter already exists, if so un-deploy it
if (eventReceiverConfigurationDto != null) {
eventReceiverAdminServiceStub.undeployActiveEventReceiverConfiguration(receiverName);
}
// Adding attribute properties to DTOs
BasicInputAdapterPropertyDto[] basicInputAdapterPropertyDtos =
addReceiverConfigToDto(adapterProperties);
if (eventReceiverAdminServiceStub.getActiveEventReceiverConfiguration(receiverName) == null) {
// Call stub deploy methods according to the message format
if (!messageFormat.toString().equals("wso2event")) {
EventMappingPropertyDto[] inputMappingPropertyDtos =
addReceiverMappingToDto(inputMappingProperties);
if (messageFormat.toString().equals("xml")) {
EventMappingPropertyDto[] namespaceMappingPropertyDtos =
addReceiverMappingToDto(namespaceMappingProperties);
eventReceiverAdminServiceStub.deployXmlEventReceiverConfiguration(receiverName
, eventStreamWithVersion, adapterType, null
, namespaceMappingPropertyDtos, inputMappingPropertyDtos
, basicInputAdapterPropertyDtos, customMapping);
} else {
if (messageFormat.toString().equals("map")) {
eventReceiverAdminServiceStub.deployMapEventReceiverConfiguration(receiverName
, eventStreamWithVersion, adapterType, inputMappingPropertyDtos
, basicInputAdapterPropertyDtos, customMapping);
} else if (messageFormat.toString().equals("text")) {
eventReceiverAdminServiceStub.deployTextEventReceiverConfiguration(receiverName
, eventStreamWithVersion, adapterType, inputMappingPropertyDtos
, basicInputAdapterPropertyDtos, customMapping);
} else {
eventReceiverAdminServiceStub.deployJsonEventReceiverConfiguration(receiverName
, eventStreamWithVersion, adapterType, inputMappingPropertyDtos
, basicInputAdapterPropertyDtos, customMapping);
}
}
} else {
EventMappingPropertyDto[] correlationMappingPropertyDtos = addReceiverMappingToDto(correlationMappingProperties);
EventMappingPropertyDto[] metaMappingPropertyDtos = addReceiverMappingToDto(metaMappingProperties);
EventMappingPropertyDto[] payloadMappingPropertyDtos = addReceiverMappingToDto(payloadMappingProperties);
eventReceiverAdminServiceStub.deployWso2EventReceiverConfiguration(receiverName
, eventStreamWithVersion, adapterType, metaMappingPropertyDtos
, correlationMappingPropertyDtos, payloadMappingPropertyDtos
, basicInputAdapterPropertyDtos, customMapping
, eventStreamWithVersion);
}
}
} finally {
cleanup(eventReceiverAdminServiceStub);
}
}
/**
* @param publisherName Publisher name
* @param adapterType Publisher type
* @param adapterProperties Publisher properties
* @param customMapping Is publisher mapped
* @param correlationMappingProperties Publisher correlation attribute mapping
* @param payloadMappingProperties Publisher payload attribute mapping
* @param metaMappingProperties Publisher meta attribute mapping
* @param messageFormat Publisher mapping format
* @param eventStreamWithVersion Attached stream
* @throws RemoteException exception that may occur during a remote method call
* @throws UserStoreException exception that may occur during JWT token generation
* @throws JWTClientException exception that may occur during connecting to client store
*/
private void publishPublisher(String publisherName, String adapterType,
List<AdapterProperty> adapterProperties,
boolean customMapping,
String inputMappingString,
List<MappingProperty> inputMappingProperties,
List<MappingProperty> correlationMappingProperties,
List<MappingProperty> payloadMappingProperties,
List<MappingProperty> metaMappingProperties,
MessageFormat messageFormat,
String eventStreamWithVersion)
throws RemoteException, UserStoreException, JWTClientException {
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = DeviceMgtAPIUtils.getEventPublisherAdminServiceStub();
// Check if adapter already exists, if so un-deploy it
try {
EventPublisherConfigurationDto eventPublisherConfigurationDto = eventPublisherAdminServiceStub
.getActiveEventPublisherConfiguration(publisherName);
if (eventPublisherConfigurationDto != null) {
eventPublisherAdminServiceStub.undeployActiveEventPublisherConfiguration(publisherName);
}
// Adding attribute properties to DTOs
BasicOutputAdapterPropertyDto[] basicOutputAdapterPropertyDtos =
addPublisherConfigToDto(adapterProperties);
if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(publisherName) == null) {
// Call stub deploy methods according to the message format
if (!messageFormat.toString().equals("wso2event")) {
if (!messageFormat.toString().equals("map")) {
if (messageFormat.toString().equals("xml")) {
eventPublisherAdminServiceStub.deployXmlEventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, inputMappingString
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
, customMapping);
} else if (messageFormat.toString().equals("text")) {
eventPublisherAdminServiceStub.deployTextEventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, inputMappingString
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
, customMapping);
} else {
eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, inputMappingString
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
, customMapping);
}
} else {
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] inputMappingPropertyDtos =
addPublisherMappingToDto(inputMappingProperties);
eventPublisherAdminServiceStub.deployMapEventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, inputMappingPropertyDtos
, basicOutputAdapterPropertyDtos, customMapping);
}
} else {
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] correlationMappingPropertyDtos =
addPublisherMappingToDto(correlationMappingProperties);
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] metaMappingPropertyDtos =
addPublisherMappingToDto(metaMappingProperties);
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] payloadMappingPropertyDtos =
addPublisherMappingToDto(payloadMappingProperties);
eventPublisherAdminServiceStub.deployWSO2EventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, metaMappingPropertyDtos
, correlationMappingPropertyDtos, payloadMappingPropertyDtos
, basicOutputAdapterPropertyDtos, customMapping
, eventStreamWithVersion);
}
}
} finally {
cleanup(eventPublisherAdminServiceStub);
}
}
/**
* @param executionPlanName Siddhi execution plan name
* @param executionPlanData Siddhi execution plan
* @throws RemoteException exception that may occur during a remote method call
* @throws UserStoreException exception that may occur during JWT token generation
* @throws JWTClientException exception that may occur during connecting to client store
* @throws InvalidExecutionPlanException exception that may occur if execution plan validation fails
*/
private void publishSiddhiExecutionPlan(String executionPlanName, String executionPlanData)
throws RemoteException, UserStoreException, JWTClientException,
InvalidExecutionPlanException {
EventProcessorAdminServiceStub eventProcessorAdminServiceStub = null;
// Flag to check the existence of a plan
boolean isAvailable = false;
try {
eventProcessorAdminServiceStub = DeviceMgtAPIUtils.getEventProcessorAdminServiceStub();
// Validate the plan code
String validationResponse = eventProcessorAdminServiceStub.validateExecutionPlan(executionPlanData);
if (validationResponse.equals("success")) {
String activeExecutionPlanName = null;
// Get active plans
ExecutionPlanConfigurationDto[] allActiveExecutionPlanConfigs = eventProcessorAdminServiceStub.getAllActiveExecutionPlanConfigurations();
if (allActiveExecutionPlanConfigs != null) {
for (ExecutionPlanConfigurationDto activeExecutionPlanConfig : allActiveExecutionPlanConfigs) {
activeExecutionPlanName = activeExecutionPlanConfig.getName();
if (activeExecutionPlanName.equals(executionPlanName)) {
isAvailable = true;
}
}
}
// Edit existing plan
if (isAvailable) {
eventProcessorAdminServiceStub.editActiveExecutionPlan(executionPlanData,
executionPlanName);
}
// Create a new plan
else {
eventProcessorAdminServiceStub.deployExecutionPlan(executionPlanData);
}
} else {
String errMsg = "Execution plan validation failed: " + validationResponse;
log.error(errMsg);
throw new InvalidExecutionPlanException(errMsg);
}
} finally {
cleanup(eventProcessorAdminServiceStub);
}
}
private EventStreamAttributeDto[] addEventAttributesToDto(List<Attribute> attributes) {
EventStreamAttributeDto[] eventStreamAttributeDtos = new EventStreamAttributeDto[attributes.size()];
for (int i = 0; i < attributes.size(); i++) {
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
eventStreamAttributeDto.setAttributeName(attributes.get(i).getName());
eventStreamAttributeDto.setAttributeType(attributes.get(i).getType().toString());
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
}
return eventStreamAttributeDtos;
}
private BasicInputAdapterPropertyDto[] addReceiverConfigToDto(
List<AdapterProperty> adapterProperties) {
BasicInputAdapterPropertyDto[] basicInputAdapterPropertyDtos
= new BasicInputAdapterPropertyDto[adapterProperties.size()];
for (int i = 0; i < adapterProperties.size(); i++) {
BasicInputAdapterPropertyDto basicInputAdapterPropertyDto = new BasicInputAdapterPropertyDto();
basicInputAdapterPropertyDto.setKey(adapterProperties.get(i).getKey());
basicInputAdapterPropertyDto.setValue(adapterProperties.get(i).getValue());
basicInputAdapterPropertyDtos[i] = basicInputAdapterPropertyDto;
}
return basicInputAdapterPropertyDtos;
}
private EventMappingPropertyDto[] addReceiverMappingToDto(List<MappingProperty> mapProperties) {
EventMappingPropertyDto[] eventMappingPropertyDtos = new EventMappingPropertyDto[mapProperties.size()];
for (int i = 0; i < mapProperties.size(); i++) {
EventMappingPropertyDto eventMappingPropertyDto = new EventMappingPropertyDto();
eventMappingPropertyDto.setName(mapProperties.get(i).getName());
eventMappingPropertyDto.setType(mapProperties.get(i).getType());
eventMappingPropertyDto.setValueOf(mapProperties.get(i).getValueOf());
eventMappingPropertyDtos[i] = eventMappingPropertyDto;
}
return eventMappingPropertyDtos;
}
private BasicOutputAdapterPropertyDto[] addPublisherConfigToDto(
List<AdapterProperty> adapterProperties) {
BasicOutputAdapterPropertyDto[] basicOutputAdapterPropertyDtos =
new BasicOutputAdapterPropertyDto[adapterProperties.size()];
for (int i = 0; i < adapterProperties.size(); i++) {
BasicOutputAdapterPropertyDto basicOutputAdapterPropertyDto =
new BasicOutputAdapterPropertyDto();
basicOutputAdapterPropertyDto.setKey(adapterProperties.get(i).getKey());
basicOutputAdapterPropertyDto.setValue(adapterProperties.get(i).getValue());
basicOutputAdapterPropertyDtos[i] = basicOutputAdapterPropertyDto;
}
return basicOutputAdapterPropertyDtos;
}
private org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] addPublisherMappingToDto
(List<MappingProperty> mapProperties) {
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] eventMappingPropertyDtos
= new org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[mapProperties.size()];
for (int i = 0; i < mapProperties.size(); i++) {
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto eventMappingPropertyDto
= new org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto();
eventMappingPropertyDto.setName(mapProperties.get(i).getName());
eventMappingPropertyDto.setType(mapProperties.get(i).getType());
eventMappingPropertyDto.setValueOf(mapProperties.get(i).getValueOf());
eventMappingPropertyDtos[i] = eventMappingPropertyDto;
}
return eventMappingPropertyDtos;
}
private void cleanup(Stub stub) {
if (stub != null) {
try {
stub.cleanup();
} catch (AxisFault axisFault) {
log.warn("Failed to clean the stub " + stub.getClass());
}
}
}
}

@ -56,6 +56,7 @@ import org.wso2.carbon.device.mgt.core.service.GroupManagementProviderService;
import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventAttributeList;
import org.wso2.carbon.device.mgt.jaxrs.service.impl.util.InputValidationException;
import org.wso2.carbon.event.processor.stub.EventProcessorAdminServiceStub;
import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceStub;
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceStub;
import org.wso2.carbon.event.stream.stub.EventStreamAdminServiceStub;
@ -109,6 +110,7 @@ public class DeviceMgtAPIUtils {
private static final String EVENT_PUBLISHER_CONTEXT = "EventPublisherAdminService/";
private static final String EVENT_STREAM_CONTEXT = "EventStreamAdminService/";
private static final String EVENT_PERSISTENCE_CONTEXT = "EventStreamPersistenceAdminService/";
private static final String EVENT_PROCESSOR_CONTEXT = "EventProcessorAdminService";
private static final String AUTHORIZATION_HEADER = "Authorization";
private static final String AUTHORIZATION_HEADER_VALUE = "Bearer";
public static final String DAS_PORT = "${iot.analytics.https.port}";
@ -637,6 +639,49 @@ public class DeviceMgtAPIUtils {
return eventStreamPersistenceAdminServiceStub;
}
/**
*
* @return
* @throws AxisFault
* @throws UserStoreException
* @throws JWTClientException
*/
public static EventProcessorAdminServiceStub getEventProcessorAdminServiceStub()
throws AxisFault, UserStoreException, JWTClientException {
// Send alert to event-processing
EventProcessorAdminServiceStub eventProcessorAdminServiceStub = new EventProcessorAdminServiceStub(
Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_PROCESSOR_CONTEXT));
Options eventProcessorOption = eventProcessorAdminServiceStub._getServiceClient().getOptions();
if (eventProcessorOption == null) {
eventProcessorOption = new Options();
}
// Get the tenant Domain
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(true);
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
String username = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
.getRealmConfiguration().getAdminUserName() + "@" + tenantDomain;
// Create the SSL context with the loaded TrustStore/keystore.
JWTClient jwtClient = getJWTClientManagerService().getJWTClient();
String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64(
jwtClient.getJwtToken(username).getBytes()));
List<Header> list = new ArrayList<>();
Header httpHeader = new Header();
httpHeader.setName(AUTHORIZATION_HEADER);
httpHeader.setValue(authValue);
list.add(httpHeader);//"https"
eventProcessorOption.setProperty(HTTPConstants.HTTP_HEADERS, list);
eventProcessorOption.setProperty(HTTPConstants.CUSTOM_PROTOCOL_HANDLER
, new Protocol(DEFAULT_HTTP_PROTOCOL
, (ProtocolSocketFactory) new SSLProtocolSocketFactory(sslContext)
, Integer.parseInt(Utils.replaceSystemProperty(DAS_PORT))));
eventProcessorAdminServiceStub._getServiceClient().setOptions(eventProcessorOption);
return eventProcessorAdminServiceStub;
}
/**
* This method is used to create the Cache that holds the event definition of the device type..
*

@ -46,6 +46,7 @@
<ref bean="deviceEventManagementService"/>
<ref bean="deviceAgentService"/>
<ref bean="swaggerResource"/>
<ref bean="analyticsManagementService"/>
</jaxrs:serviceBeans>
<jaxrs:providers>
<ref bean="jsonProvider"/>
@ -93,6 +94,7 @@
<bean id="deviceEventManagementService" class="org.wso2.carbon.device.mgt.jaxrs.service.impl.DeviceEventManagementServiceImpl"/>
<bean id="deviceAgentService" class="org.wso2.carbon.device.mgt.jaxrs.service.impl.DeviceAgentServiceImpl"/>
<bean id="jsonProvider" class="org.wso2.carbon.device.mgt.jaxrs.common.GsonMessageBodyHandler"/>
<bean id="analyticsManagementService" class="org.wso2.carbon.device.mgt.jaxrs.service.impl.AnalyticsManagementServiceImpl"/>
<!--<bean id="errorHandler" class="org.wso2.carbon.device.mgt.jaxrs.common.ErrorHandler"/>-->
<cxf:bus>

Loading…
Cancel
Save