Update the Analytics Artifacts Management API

merge-requests/313/head
Yohan Avishke 5 years ago committed by Dharmakeerthi Lasantha
parent 032746c1aa
commit 4557c5fc5c

@ -29,10 +29,13 @@ public class Adapter {
@ApiModelProperty(value = "Attached stream name:version")
private String eventStreamWithVersion;
@ApiModelProperty(value = "Adapter type")
private AdapterType adapterType;
private TransportType adapterType;
@ApiModelProperty(value = "Adapter main configurations")
private AdapterConfiguration adapterConfiguration;
@ApiModelProperty(value = "Adapter definition", notes = "use only when creating adapter as a String")
private String definition;
public String getAdapterName() {
return adapterName;
}
@ -49,11 +52,11 @@ public class Adapter {
this.eventStreamWithVersion = eventStreamWithVersion;
}
public AdapterType getAdapterType() {
public TransportType getAdapterType() {
return adapterType;
}
public void setAdapterType(AdapterType adapterType) {
public void setAdapterType(TransportType adapterType) {
this.adapterType = adapterType;
}
@ -65,4 +68,12 @@ public class Adapter {
AdapterConfiguration adapterConfiguration) {
this.adapterConfiguration = adapterConfiguration;
}
public String getDefinition() {
return definition;
}
public void setDefinition(String definition) {
this.definition = definition;
}
}

@ -18,6 +18,8 @@
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
import java.util.ArrayList;
import java.util.List;
/**
@ -30,15 +32,15 @@ public class AdapterMappingConfiguration {
@ApiModelProperty(value = "Input mapping for json,text and xml mappings")
private String inputMappingString;
@ApiModelProperty(value = "Input mapping for json,map and xml mappings")
private List<MappingProperty> inputMappingProperties;
private List<MappingProperty> inputMappingProperties = new ArrayList<>();
@ApiModelProperty(value = "Name-scape mapping for xml mapping")
private List<MappingProperty> namespaceMappingProperties;
private List<MappingProperty> namespaceMappingProperties = new ArrayList<>();
@ApiModelProperty(value = "Correlation mapping for wso2 mapping")
private List<MappingProperty> correlationMappingProperties;
private List<MappingProperty> correlationMappingProperties = new ArrayList<>();
@ApiModelProperty(value = "Payload mapping for wso2 mapping")
private List<MappingProperty> payloadMappingProperties;
private List<MappingProperty> payloadMappingProperties = new ArrayList<>();
@ApiModelProperty(value = "Meta mapping for wso2 mapping")
private List<MappingProperty> metaMappingProperties;
private List<MappingProperty> metaMappingProperties = new ArrayList<>();
public MessageFormat getMessageFormat() {
return messageFormat;

@ -1,30 +0,0 @@
/*
* Copyright (c) 2019, Entgra (pvt) Ltd. (http://entgra.io) All Rights Reserved.
*
* Entgra (pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
/**
* Available adapter types
*/
public enum AdapterType {
EMAIL, FILE_TAIL, HTTP, IOT_EVENT, JMS, KAFKA, MQTT, OAUTH_MQTT, SOAP, WEBSOCKET, WEBSOCKET_LOCAL,
WSO2_EVENT, XMPP, UI, RDBMS, SECURED_WEBSOCKET, CASSANDRA, LOGGER;
public String toStringFormatted() {
return super.toString().toLowerCase().replace("_", "-");
}
}

@ -1,34 +0,0 @@
/*
* Copyright (c) 2019, Entgra (pvt) Ltd. (http://entgra.io) All Rights Reserved.
*
* Entgra (pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
public class EventPublisher {
@ApiModelProperty(value = "Publisher definition")
private String definition;
public String getDefinition() {
return definition;
}
public void setDefinition(String definition) {
this.definition = definition;
}
}

@ -1,34 +0,0 @@
/*
* Copyright (c) 2019, Entgra (pvt) Ltd. (http://entgra.io) All Rights Reserved.
*
* Entgra (pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import io.swagger.annotations.ApiModelProperty;
public class EventReceiver {
@ApiModelProperty(value = "Stream definition")
private String definition;
public String getDefinition() {
return definition;
}
public void setDefinition(String definition) {
this.definition = definition;
}
}

@ -17,10 +17,9 @@
*/
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import com.sun.istack.NotNull;
import io.swagger.annotations.ApiModelProperty;
import org.hibernate.validator.constraints.NotEmpty;
import java.util.ArrayList;
import java.util.List;
/**
@ -37,12 +36,13 @@ public class EventStream {
@ApiModelProperty(value = "Stream description")
private String description;
@ApiModelProperty(value = "Meta attribute list")
private List<Attribute> metaData;
private List<Attribute> metaData = new ArrayList<>();
@ApiModelProperty(value = "Correlation attribute list")
private List<Attribute> correlationData;
private List<Attribute> correlationData = new ArrayList<>();
@ApiModelProperty(value = "Payload attribute list")
private List<Attribute> payloadData;
@ApiModelProperty(value = "Stream definition") @NotNull @NotEmpty
private List<Attribute> payloadData = new ArrayList<>();
@ApiModelProperty(value = "Stream definition" , notes = "use only when creating stream as a String")
private String definition;
public String getName() {

@ -21,6 +21,11 @@ package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
* This hold the default transport types support by the server.
*/
public enum TransportType {
HTTP, MQTT;
EMAIL, FILE_TAIL, HTTP, IOT_EVENT, JMS, KAFKA, MQTT, OAUTH_MQTT, SOAP, WEBSOCKET, WEBSOCKET_LOCAL,
WSO2_EVENT, XMPP, UI, RDBMS, SECURED_WEBSOCKET, CASSANDRA, LOGGER;
public String toStringFormatted() {
return super.toString().toLowerCase().replace("_", "-");
}
}

@ -28,11 +28,11 @@ public class InvalidExecutionPlanException extends WebApplicationException {
private static final long serialVersionUID = 7583096344745990515L;
public InvalidExecutionPlanException(ErrorResponse error) {
super(Response.status(Response.Status.NOT_FOUND).entity(error).build());
super(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(error).build());
}
public InvalidExecutionPlanException(ErrorDTO errorDTO) {
super(Response.status(Response.Status.NOT_FOUND)
super(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(errorDTO)
.header(Constants.DeviceConstants.HEADER_CONTENT_TYPE, Constants.DeviceConstants.APPLICATION_JSON)
.build());

@ -17,14 +17,13 @@
*/
package org.wso2.carbon.device.mgt.jaxrs.service.api;
import io.swagger.annotations.ApiParam;
import org.wso2.carbon.apimgt.annotations.api.Scope;
import org.wso2.carbon.apimgt.annotations.api.Scopes;
import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Adapter;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventStream;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.SiddhiExecutionPlan;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventPublisher;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventReceiver;
import org.wso2.carbon.device.mgt.jaxrs.util.Constants;
import io.swagger.annotations.Api;
@ -91,17 +90,17 @@ import javax.ws.rs.core.Response;
}
)
@Path("/analytics/artifacts")
@Api(value = "Analytics Artifacts Management", description = "This API corresponds to services" +
" related to Analytics Artifacts management")
@Path("/analytics/artifacts")
@Consumes(MediaType.APPLICATION_JSON)
public interface AnalyticsArtifactsManagementService {
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/stream/{id}")
@ApiOperation(
httpMethod = "POST",
value = "Create Event Stream Artifact as String",
value = "Create Event Stream Artifact through a String argument.",
notes = "Deploy a Json Stream Artifact in Analytics server.",
tags = "Analytics Artifacts Management",
extensions = {
@ -143,16 +142,20 @@ public interface AnalyticsArtifactsManagementService {
response = ErrorResponse.class)
}
)
Response deployEventDefinitionAsString(@PathParam("id") String id,
Response deployEventDefinitionAsString(
@ApiParam(name = "id", value = "Stream id(name:version).")
@PathParam("id") String id,
@ApiParam(name = "isEdited", value = "This stream is being edited or created.")
@QueryParam("isEdited") boolean isEdited,
@ApiParam(name = "stream", value = "Add the data to complete the EventStream object.",
required = true)
@Valid EventStream stream);
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/stream")
@ApiOperation(
httpMethod = "POST",
value = "Create Event Stream Artifact as DTO",
value = "Create Event Stream Artifact through a DTO class.",
notes = "Deploy a Json Stream Artifact in Analytics server.",
tags = "Analytics Artifacts Management",
extensions = {
@ -194,14 +197,16 @@ public interface AnalyticsArtifactsManagementService {
response = ErrorResponse.class)
}
)
Response deployEventDefinitionAsDto(@Valid EventStream stream);
Response deployEventDefinitionAsDto(
@ApiParam(name = "stream", value = "Add the data to complete the EventStream object.",
required = true)
@Valid EventStream stream);
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/receiver/{name}")
@ApiOperation(
httpMethod = "POST",
value = "Create Event Receiver Artifact as String",
value = "Create Event Receiver Artifact through a String argument.",
notes = "Deploy a XML Event Receiver Artifact in Analytics server.",
tags = "Analytics Artifacts Management",
extensions = {
@ -244,16 +249,20 @@ public interface AnalyticsArtifactsManagementService {
response = ErrorResponse.class)
}
)
Response deployEventReceiverAsString(@PathParam("name") String name,
Response deployEventReceiverAsString(
@ApiParam(name = "name", value = "Receiver name.")
@PathParam("name") String name,
@ApiParam(name = "isEdited", value = "This stream is being edited or created.")
@QueryParam("isEdited") boolean isEdited,
@Valid EventReceiver receiver);
@ApiParam(name = "receiver", value = "Add the data to complete the EventReceiver object.",
required = true)
@Valid Adapter receiver);
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/receiver")
@ApiOperation(
httpMethod = "POST",
value = "Create Event Receiver Artifact as DTO",
value = "Create Event Receiver Artifact through a DTO class.",
notes = "Deploy a JSON Event Receiver Artifact in Analytics server.",
tags = "Analytics Artifacts Management",
extensions = {
@ -296,14 +305,16 @@ public interface AnalyticsArtifactsManagementService {
response = ErrorResponse.class)
}
)
Response deployEventReceiverAsDto(@Valid Adapter receiver);
Response deployEventReceiverAsDto(
@ApiParam(name = "receiver", value = "Add the data to complete the Adapter object.",
required = true)
@Valid Adapter receiver);
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/publisher/{name}")
@ApiOperation(
httpMethod = "POST",
value = "Create Event Publisher Artifact as String",
value = "Create Event Publisher Artifact through a String argument.",
notes = "Deploy a XML Event Publisher Artifact in Analytics server.",
tags = "Analytics Artifacts Management",
extensions = {
@ -346,16 +357,20 @@ public interface AnalyticsArtifactsManagementService {
response = ErrorResponse.class)
}
)
Response deployEventPublisherAsString(@PathParam("name") String name,
Response deployEventPublisherAsString(
@ApiParam(name = "name", value = "Publisher name.")
@PathParam("name") String name,
@ApiParam(name = "isEdited", value = "This stream is being edited or created.")
@QueryParam("isEdited") boolean isEdited,
@Valid EventPublisher publisher);
@ApiParam(name = "publisher", value = "Add the data to complete the EventPublisher object.",
required = true)
@Valid Adapter publisher);
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/publisher")
@ApiOperation(
httpMethod = "POST",
value = "Create Event Publisher Artifact as DTO",
value = "Create Event Publisher Artifact through a DTO class.",
notes = "Deploy a JSON Event Publisher Artifact in Analytics server.",
tags = "Analytics Artifacts Management",
extensions = {
@ -398,15 +413,17 @@ public interface AnalyticsArtifactsManagementService {
response = ErrorResponse.class)
}
)
Response deployEventPublisherAsDto(@Valid Adapter publisher);
Response deployEventPublisherAsDto(
@ApiParam(name = "publisher", value = "Add the data to complete the Adapter object.",
required = true)
@Valid Adapter publisher);
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/siddhi-script/{name}")
@ApiOperation(
httpMethod = "POST",
value = "Create Siddhi Script Artifact as String",
notes = "Deploy a SiddhiQL Siddhi script Artifact in Analytics server.",
value = "Create Siddhi Script Artifact through a String argument.",
notes = "Deploy a SiddhiQL script Artifact in Analytics server.",
tags = "Analytics Artifacts Management",
extensions = {
@Extension(properties = {
@ -449,6 +466,11 @@ public interface AnalyticsArtifactsManagementService {
}
)
Response deploySiddhiExecutableScript(
@PathParam("name") String name, @QueryParam("isEdited") boolean isEdited,
@ApiParam(name = "name", value = "Siddhi Executable Script name.")
@PathParam("name") String name,
@ApiParam(name = "isEdited", value = "This stream is being edited or created.")
@QueryParam("isEdited") boolean isEdited,
@ApiParam(name = "plan", value = "Add the data to complete the SiddhiExecutionPlan object.",
required = true)
@Valid SiddhiExecutionPlan plan);
}

@ -18,6 +18,7 @@
package org.wso2.carbon.device.mgt.jaxrs.service.impl;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Attribute;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterMappingConfiguration;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MappingProperty;
@ -25,8 +26,7 @@ import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterConfiguration;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterProperty;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MessageFormat;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.SiddhiExecutionPlan;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventPublisher;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventReceiver;
import org.wso2.carbon.device.mgt.jaxrs.exception.BadRequestException;
import org.wso2.carbon.device.mgt.jaxrs.exception.ErrorDTO;
import org.wso2.carbon.device.mgt.jaxrs.exception.InvalidExecutionPlanException;
import org.wso2.carbon.device.mgt.jaxrs.service.api.AnalyticsArtifactsManagementService;
@ -58,7 +58,6 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import java.nio.charset.StandardCharsets;
import java.rmi.RemoteException;
import java.util.List;
@ -67,14 +66,10 @@ import java.util.List;
* siddhi scripts to the Analytics server as Artifacts
*/
@Path("/analytics/artifacts")
public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifactsManagementService {
public class AnalyticsArtifactsManagementServiceImpl
implements AnalyticsArtifactsManagementService {
private static final Log log = LogFactory.getLog(AnalyticsArtifactsManagementServiceImpl.class);
/**
* @param stream EventStream object with the properties of the stream
* @return A status code depending on the code result
* Function - Used to deploy stream as an artifact using a String
*/
@Override
@POST
@Path("/stream/{id}")
@ -84,90 +79,70 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
EventStreamAdminServiceStub eventStreamAdminServiceStub = null;
try {
String streamDefinition = new String(stream.getDefinition().getBytes(), StandardCharsets.UTF_8);
String streamDefinition = stream.getDefinition();
eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
if (!isEdited) {
eventStreamAdminServiceStub.addEventStreamDefinitionAsString(streamDefinition);
} else {
// Find and edit stream
if (eventStreamAdminServiceStub.getStreamDetailsForStreamId(id) != null) {
eventStreamAdminServiceStub.editEventStreamDefinitionAsString(streamDefinition, id);
}
}
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} finally {
cleanup(eventStreamAdminServiceStub);
}
}
/**
* @param stream EventStream object with the properties of the stream
* @return A status code depending on the code result
* Function - Used to deploy stream as an artifact using a DTO
*/
@Override
@POST
@Path("/stream")
public Response deployEventDefinitionAsDto(@Valid EventStream stream) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
// Categorize attributes to three lists depending on their type
List<Attribute> metaData = stream.getMetaData();
List<Attribute> payloadData = stream.getPayloadData();
List<Attribute> correlationData = stream.getCorrelationData();
try {
/* Conditions
* - At least one list should always be not null
*/
if (metaData == null && correlationData == null && payloadData == null) {
log.error("Invalid payload: event attributes");
return Response.status(Response.Status.BAD_REQUEST).build();
} else {
// Publish the event stream
publishStream(stream, metaData, correlationData, payloadData);
deployStream(stream);
return Response.ok().build();
}
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to create event definitions for tenant " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the remote services for tenant " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to generate jwt token for tenant " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the user store for tenant " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
}
/**
* @param name Receiver name
* @param isEdited If receiver is created or edited
* @param receiver Receiver object with the properties of the receiver
* @return A status code depending on the code result
* Function - Used to deploy receiver as an artifact using a String
*/
@Override
@POST
@Path("/receiver/{name}")
public Response deployEventReceiverAsString(@PathParam("name") String name,
@QueryParam("isEdited") boolean isEdited,
@Valid EventReceiver receiver) {
@Valid Adapter receiver) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
EventReceiverAdminServiceStub eventReceiverAdminServiceStub;
try {
@ -178,202 +153,128 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
} else {
eventReceiverAdminServiceStub.editActiveEventReceiverConfiguration(receiverDefinition, name);
}
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
return Response.ok().entity(name).build();
}
/**
* @param receiver Receiver object with the properties of the receiver
* @return A status code depending on the code result
* Function - Used to deploy receiver as an artifact using a DTO
*/
@Override
@POST
@Path("/receiver")
public Response deployEventReceiverAsDto(@Valid Adapter receiver) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
String receiverName = receiver.getAdapterName();
String adapterType = receiver.getAdapterType().toStringFormatted();
AdapterConfiguration adapterConfiguration = receiver.getAdapterConfiguration();
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
try {
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
if (adapterProperties == null) {
log.error("Invalid attribute payload");
return Response.status(Response.Status.BAD_REQUEST).build();
}
AdapterConfiguration adapterConfiguration = receiver.getAdapterConfiguration();
boolean customMapping = adapterConfiguration.isCustomMappingEnabled();
List<MappingProperty> inputMappingProperties = adapterMappingConfiguration.getInputMappingProperties();
List<MappingProperty> namespaceMappingProperties = adapterMappingConfiguration.getNamespaceMappingProperties();
List<MappingProperty> correlationMappingProperties = adapterMappingConfiguration.getCorrelationMappingProperties();
List<MappingProperty> payloadMappingProperties = adapterMappingConfiguration.getPayloadMappingProperties();
List<MappingProperty> metaMappingProperties = adapterMappingConfiguration.getMetaMappingProperties();
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
/*
* Conditions
* - if CustomMappingEnabled check validity of property lists
* - if both inputMappingProperties and namespaceMappingProperties null check remaining property lists
* - if all correlationMappingProperties, payloadMappingProperties, metaMappingProperties null log error
* - if message format is null change the final result to TRUE
* - else continue
* */
if ((customMapping &&
(inputMappingProperties == null && namespaceMappingProperties == null) &&
(correlationMappingProperties == null && payloadMappingProperties == null &&
metaMappingProperties == null)) || messageFormat == null) {
String errMsg = "Invalid mapping payload";
log.error(errMsg);
return Response.status(Response.Status.BAD_REQUEST).entity(errMsg).build();
validateAdapterProperties(adapterConfiguration.getAdapterProperties());
if (customMapping) {
validateAdapterMapping(adapterConfiguration.getAdapterMappingConfiguration());
}
String eventStreamWithVersion = receiver.getEventStreamWithVersion();
publishReceiver(receiverName, adapterType, adapterProperties, customMapping, inputMappingProperties,
namespaceMappingProperties, correlationMappingProperties, payloadMappingProperties,
metaMappingProperties, messageFormat, eventStreamWithVersion);
deployReceiver(receiver, customMapping, adapterConfiguration);
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
}
/**
* @param name Publisher name
* @param isEdited If receiver is created or edited
* @param publisher Publisher object with the properties of the publisher
* @return A status code depending on the code result
* Function - Used to deploy publisher as an artifact using a String
*/
@Override
@POST
@Path("/publisher/{name}")
public Response deployEventPublisherAsString(@PathParam("name") String name,
@QueryParam("isEdited") boolean isEdited,
@Valid EventPublisher publisher) {
@Valid Adapter publisher) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
EventPublisherAdminServiceStub eventPublisherAdminServiceStub;
try {
String publisherDefinition = publisher.getDefinition();
eventPublisherAdminServiceStub = DeviceMgtAPIUtils.getEventPublisherAdminServiceStub();
if (!isEdited) {
eventPublisherAdminServiceStub.deployEventPublisherConfiguration(publisherDefinition);
} else {
eventPublisherAdminServiceStub.editActiveEventPublisherConfiguration(publisherDefinition, name);
}
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
return Response.ok().entity(publisher).build();
}
/**
* @param publisher Publisher object with the properties of the publisher
* @return A status code depending on the code result
* Function - Used to deploy publisher as an artifact using a DTO
*/
@Override
@POST
@Path("/publisher")
public Response deployEventPublisherAsDto(@Valid Adapter publisher) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
String publisherName = publisher.getAdapterName();
String adapterType = publisher.getAdapterType().toStringFormatted();
AdapterConfiguration adapterConfiguration = publisher.getAdapterConfiguration();
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
try {
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
if (adapterProperties == null) {
log.error("Invalid attribute payload");
return Response.status(Response.Status.BAD_REQUEST).build();
}
validateAdapterProperties(adapterConfiguration.getAdapterProperties());
boolean customMapping = adapterConfiguration.isCustomMappingEnabled();
String inputMappingString = adapterMappingConfiguration.getInputMappingString();
List<MappingProperty> inputMappingProperties = adapterMappingConfiguration.getInputMappingProperties();
List<MappingProperty> correlationMappingProperties = adapterMappingConfiguration.getCorrelationMappingProperties();
List<MappingProperty> payloadMappingProperties = adapterMappingConfiguration.getPayloadMappingProperties();
List<MappingProperty> metaMappingProperties = adapterMappingConfiguration.getMetaMappingProperties();
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
/*
* Conditions
* - if CustomMappingEnabled check validity of property lists
* - if all correlationMappingProperties, payloadMappingProperties, metaMappingProperties null log error
* - if message format is null change the final result to TRUE
* - else continue
*/
if ((customMapping &&
(inputMappingProperties == null && inputMappingString == null) &&
(correlationMappingProperties == null && payloadMappingProperties == null &&
metaMappingProperties == null)) || messageFormat == null) {
String errMsg = "Invalid mapping payload";
log.error(errMsg);
return Response.status(Response.Status.BAD_REQUEST).entity(errMsg).build();
if (customMapping) {
validateAdapterMapping(adapterConfiguration.getAdapterMappingConfiguration());
}
String eventStreamWithVersion = publisher.getEventStreamWithVersion();
publishPublisher(publisherName, adapterType, adapterProperties, customMapping
, inputMappingString, inputMappingProperties, correlationMappingProperties
, payloadMappingProperties, metaMappingProperties, messageFormat
, eventStreamWithVersion);
deployPublisher(publisher, customMapping, adapterConfiguration);
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
}
/**
* @param name Siddhi plan name
* @param isEdited If receiver is created or edited
* @param plan Siddhi plan definition
* @return a status code depending on the code execution
* Function - Used to deploy Siddhi script as an artifact using a String
*/
@Override
@POST
@Path("/siddhi-script/{name}")
@ -385,38 +286,46 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
publishSiddhiExecutionPlan(name, isEdited, plan.getDefinition());
return Response.ok().build();
} catch (AxisFault e) {
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (InvalidExecutionPlanException e) {
log.error("Invalid Execution plan: " + tenantDomain, e);
return e.getResponse();
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
log.error(errMsg, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
}
/**
* Set data to a Stream dto and deploy dto through a stub
*
* @param stream Stream definition
* @param metaData Meta attributes of the stream
* @param correlationData Correlation attributes of the stream
* @param payloadData Payload attributes of the stream
* @throws RemoteException exception that may occur during a remote method call
* @throws UserStoreException exception that may occur during JWT token generation
* @throws JWTClientException exception that may occur during connecting to client store
*/
private void publishStream(EventStream stream, List<Attribute> metaData,
List<Attribute> correlationData, List<Attribute> payloadData)
private void deployStream(EventStream stream)
throws RemoteException, UserStoreException, JWTClientException {
EventStreamAdminServiceStub eventStreamAdminServiceStub =
DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
EventStreamAdminServiceStub eventStreamAdminServiceStub = null;
List<Attribute> metaData = stream.getMetaData();
List<Attribute> payloadData = stream.getPayloadData();
List<Attribute> correlationData = stream.getCorrelationData();
if (metaData.isEmpty() && correlationData.isEmpty() && payloadData.isEmpty()) {
String errMsg = String.format("Failed to validate Stream property attributes of %s:%s",
stream.getName(), stream.getVersion());
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setMessage(errMsg);
log.error(errMsg);
throw new BadRequestException(errorResponse);
}
try {
eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto();
eventStreamDefinitionDto.setName(stream.getName());
eventStreamDefinitionDto.setVersion(stream.getVersion());
@ -425,6 +334,7 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
eventStreamDefinitionDto.setMetaData(addEventAttributesToDto(metaData));
eventStreamDefinitionDto.setPayloadData(addEventAttributesToDto(payloadData));
eventStreamDefinitionDto.setCorrelationData(addEventAttributesToDto(correlationData));
String streamId = stream.getName() + ":" + stream.getVersion();
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) {
eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId);
@ -438,55 +348,40 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
}
/**
* @param receiverName Receiver name
* @param adapterType Receiver type
* @param adapterProperties Receiver properties
* @param customMapping Is receiver mapped
* @param inputMappingProperties Receiver input attribute mapping
* @param namespaceMappingProperties Receiver name-scape attribute mapping
* @param correlationMappingProperties Receiver correlation attribute mapping
* @param payloadMappingProperties Receiver payload attribute mapping
* @param metaMappingProperties Receiver meta attribute mapping
* @param messageFormat Receiver mapping format
* @param eventStreamWithVersion Attached stream
* @throws RemoteException exception that may occur during a remote method call
* @throws UserStoreException exception that may occur during JWT token generation
* @throws JWTClientException exception that may occur during connecting to client store
* Set data to a receiver dto and deploy dto through a stub
*
* @param receiver Event Receiver adapter
* @param customMapping Is Receiver mapped
* @param adapterConfiguration Adapter property and mapping configuration
* @throws RemoteException Exception that may occur during a remote method call
* @throws UserStoreException Exception that may occur during JWT token generation
* @throws JWTClientException Exception that may occur during connecting to client store
*/
private void publishReceiver(String receiverName, String adapterType,
List<AdapterProperty> adapterProperties, boolean customMapping,
List<MappingProperty> inputMappingProperties,
List<MappingProperty> namespaceMappingProperties,
List<MappingProperty> correlationMappingProperties,
List<MappingProperty> payloadMappingProperties,
List<MappingProperty> metaMappingProperties,
MessageFormat messageFormat,
String eventStreamWithVersion)
private void deployReceiver(Adapter receiver, boolean customMapping,
AdapterConfiguration adapterConfiguration)
throws RemoteException, UserStoreException, JWTClientException {
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
try {
String receiverName = receiver.getAdapterName();
String adapterType = receiver.getAdapterType().toStringFormatted();
String eventStreamWithVersion = receiver.getEventStreamWithVersion();
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
EventReceiverConfigurationDto eventReceiverConfigurationDto = eventReceiverAdminServiceStub
.getActiveEventReceiverConfiguration(receiverName);
// Check if adapter already exists, if so un-deploy it
if (eventReceiverConfigurationDto != null) {
eventReceiverAdminServiceStub.undeployActiveEventReceiverConfiguration(receiverName);
}
BasicInputAdapterPropertyDto[] basicInputAdapterPropertyDtos = addReceiverConfigToDto(adapterProperties);
// Adding attribute properties to DTOs
BasicInputAdapterPropertyDto[] basicInputAdapterPropertyDtos =
addReceiverConfigToDto(adapterProperties);
if (eventReceiverAdminServiceStub.getActiveEventReceiverConfiguration(receiverName) == null) {
// Call stub deploy methods according to the message format
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
if (!messageFormat.toString().equals("wso2event")) {
EventMappingPropertyDto[] inputMappingPropertyDtos =
addReceiverMappingToDto(inputMappingProperties);
addReceiverMappingToDto(adapterMappingConfiguration.getInputMappingProperties());
if (messageFormat.toString().equals("xml")) {
EventMappingPropertyDto[] namespaceMappingPropertyDtos =
addReceiverMappingToDto(namespaceMappingProperties);
addReceiverMappingToDto(adapterMappingConfiguration.getNamespaceMappingProperties());
eventReceiverAdminServiceStub.deployXmlEventReceiverConfiguration(receiverName
, eventStreamWithVersion, adapterType, null
, namespaceMappingPropertyDtos, inputMappingPropertyDtos
@ -507,9 +402,15 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
}
}
} else {
EventMappingPropertyDto[] correlationMappingPropertyDtos = addReceiverMappingToDto(correlationMappingProperties);
EventMappingPropertyDto[] metaMappingPropertyDtos = addReceiverMappingToDto(metaMappingProperties);
EventMappingPropertyDto[] payloadMappingPropertyDtos = addReceiverMappingToDto(payloadMappingProperties);
EventMappingPropertyDto[] correlationMappingPropertyDtos = addReceiverMappingToDto(
adapterMappingConfiguration.getCorrelationMappingProperties()
);
EventMappingPropertyDto[] metaMappingPropertyDtos = addReceiverMappingToDto(
adapterMappingConfiguration.getInputMappingProperties()
);
EventMappingPropertyDto[] payloadMappingPropertyDtos = addReceiverMappingToDto(
adapterMappingConfiguration.getPayloadMappingProperties()
);
eventReceiverAdminServiceStub.deployWso2EventReceiverConfiguration(receiverName
, eventStreamWithVersion, adapterType, metaMappingPropertyDtos
@ -517,86 +418,81 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
, basicInputAdapterPropertyDtos, customMapping
, eventStreamWithVersion);
}
}
} finally {
cleanup(eventReceiverAdminServiceStub);
}
}
/**
* @param publisherName Publisher name
* @param adapterType Publisher type
* @param adapterProperties Publisher properties
* @param customMapping Is publisher mapped
* @param correlationMappingProperties Publisher correlation attribute mapping
* @param payloadMappingProperties Publisher payload attribute mapping
* @param metaMappingProperties Publisher meta attribute mapping
* @param messageFormat Publisher mapping format
* @param eventStreamWithVersion Attached stream
* @throws RemoteException exception that may occur during a remote method call
* @throws UserStoreException exception that may occur during JWT token generation
* @throws JWTClientException exception that may occur during connecting to client store
* Set data to a publisher dto and deploy dto through a stub
*
* @param publisher Event Publisher adapter
* @param customMapping Is Publisher mapped
* @param adapterConfiguration Publisher property and mapping configuration
* @throws RemoteException Exception that may occur during a remote method call
* @throws UserStoreException Exception that may occur during JWT token generation
* @throws JWTClientException Exception that may occur during connecting to client store
*/
private void publishPublisher(String publisherName, String adapterType,
List<AdapterProperty> adapterProperties,
boolean customMapping,
String inputMappingString,
List<MappingProperty> inputMappingProperties,
List<MappingProperty> correlationMappingProperties,
List<MappingProperty> payloadMappingProperties,
List<MappingProperty> metaMappingProperties,
MessageFormat messageFormat,
String eventStreamWithVersion)
private void deployPublisher(Adapter publisher, boolean customMapping,
AdapterConfiguration adapterConfiguration)
throws RemoteException, UserStoreException, JWTClientException {
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = DeviceMgtAPIUtils.getEventPublisherAdminServiceStub();
// Check if adapter already exists, if so un-deploy it
try {
String publisherName = publisher.getAdapterName();
String adapterType = publisher.getAdapterType().toStringFormatted();
String eventStreamWithVersion = publisher.getEventStreamWithVersion();
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
EventPublisherConfigurationDto eventPublisherConfigurationDto = eventPublisherAdminServiceStub
.getActiveEventPublisherConfiguration(publisherName);
if (eventPublisherConfigurationDto != null) {
eventPublisherAdminServiceStub.undeployActiveEventPublisherConfiguration(publisherName);
}
// Adding attribute properties to DTOs
BasicOutputAdapterPropertyDto[] basicOutputAdapterPropertyDtos =
addPublisherConfigToDto(adapterProperties);
if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(publisherName) == null) {
// Call stub deploy methods according to the message format
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
if (!messageFormat.toString().equals("wso2event")) {
if (!messageFormat.toString().equals("map")) {
if (messageFormat.toString().equals("xml")) {
eventPublisherAdminServiceStub.deployXmlEventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, inputMappingString
, eventStreamWithVersion, adapterType, adapterMappingConfiguration.getInputMappingString()
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
, customMapping);
} else if (messageFormat.toString().equals("text")) {
eventPublisherAdminServiceStub.deployTextEventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, inputMappingString
, eventStreamWithVersion, adapterType, adapterMappingConfiguration.getInputMappingString()
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
, customMapping);
} else {
eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, inputMappingString
, eventStreamWithVersion, adapterType, adapterMappingConfiguration.getInputMappingString()
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
, customMapping);
}
} else {
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] inputMappingPropertyDtos =
addPublisherMappingToDto(inputMappingProperties);
addPublisherMappingToDto(
adapterMappingConfiguration.getInputMappingProperties()
);
eventPublisherAdminServiceStub.deployMapEventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, inputMappingPropertyDtos
, basicOutputAdapterPropertyDtos, customMapping);
}
} else {
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] correlationMappingPropertyDtos =
addPublisherMappingToDto(correlationMappingProperties);
addPublisherMappingToDto(
adapterMappingConfiguration.getCorrelationMappingProperties()
);
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] metaMappingPropertyDtos =
addPublisherMappingToDto(metaMappingProperties);
addPublisherMappingToDto(
adapterMappingConfiguration.getMetaMappingProperties()
);
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] payloadMappingPropertyDtos =
addPublisherMappingToDto(payloadMappingProperties);
addPublisherMappingToDto(
adapterMappingConfiguration.getPayloadMappingProperties()
);
eventPublisherAdminServiceStub.deployWSO2EventPublisherConfiguration(publisherName
, eventStreamWithVersion, adapterType, metaMappingPropertyDtos
, correlationMappingPropertyDtos, payloadMappingPropertyDtos
@ -604,41 +500,39 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
, eventStreamWithVersion);
}
}
} finally {
cleanup(eventPublisherAdminServiceStub);
}
}
/**
* @param name plan name
* @param isEdited is plan edited
* @param plan plan data
* @throws RemoteException exception that may occur during a remote method call
* @throws UserStoreException exception that may occur during JWT token generation
* @throws JWTClientException exception that may occur during connecting to client store
* @throws InvalidExecutionPlanException exception that may occur if execution plan validation fails
* Publish a siddhi execution plan using a stub
*
* @param name Plan name
* @param isEdited Is plan edited
* @param plan Plan data
* @throws RemoteException Exception that may occur during a remote method call
* @throws UserStoreException Exception that may occur during JWT token generation
* @throws JWTClientException Exception that may occur during connecting to client store
* @throws InvalidExecutionPlanException Exception that may occur if execution plan validation fails
*/
private void publishSiddhiExecutionPlan(String name, boolean isEdited,
String plan)
throws RemoteException, UserStoreException, JWTClientException,
InvalidExecutionPlanException {
throws RemoteException, UserStoreException, JWTClientException {
EventProcessorAdminServiceStub eventProcessorAdminServiceStub = null;
try {
eventProcessorAdminServiceStub = DeviceMgtAPIUtils.getEventProcessorAdminServiceStub();
// Validate the plan code
String validationResponse = eventProcessorAdminServiceStub.validateExecutionPlan(plan);
if (validationResponse.equals("success")) {
if (!isEdited) {
// Create a new plan
eventProcessorAdminServiceStub.deployExecutionPlan(plan);
} else {
// Edit plan
eventProcessorAdminServiceStub.editActiveExecutionPlan(plan, name);
}
} else {
ErrorDTO errorDTO = new ErrorDTO();
errorDTO.setMessage(validationResponse);
log.error(validationResponse);
throw new InvalidExecutionPlanException(errorDTO);
}
} finally {
@ -646,18 +540,74 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
}
}
/**
* This will set payload of event attribute's mapping to the DTO
*
* @param attributes list of event attributes
* @return DTO with all the event attributes
*/
private EventStreamAttributeDto[] addEventAttributesToDto(List<Attribute> attributes) {
EventStreamAttributeDto[] eventStreamAttributeDtos = new EventStreamAttributeDto[attributes.size()];
for (int i = 0; i < attributes.size(); i++) {
for (Attribute attribute : attributes) {
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
eventStreamAttributeDto.setAttributeName(attributes.get(i).getName());
eventStreamAttributeDto.setAttributeType(attributes.get(i).getType().toString());
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
eventStreamAttributeDto.setAttributeName(attribute.getName());
eventStreamAttributeDto.setAttributeType(attribute.getType().toString());
}
return eventStreamAttributeDtos;
}
/**
* Validate adapter payload attributes
*
* @param adapterProperties Adapter payload attributes
*/
private void validateAdapterProperties(List<AdapterProperty> adapterProperties) {
if (adapterProperties.isEmpty()) {
String errMsg = "Invalid payload: event property attributes invalid!!!";
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setMessage(errMsg);
log.error(errMsg);
throw new BadRequestException(errorResponse);
}
}
/**
* Validate adapter mapping attributes
* <p>
* Conditions
* - if both inputMappingProperties and namespaceMappingProperties null check remaining property lists
* - if all correlationMappingProperties, payloadMappingProperties, metaMappingProperties null log error
* - if message format is null change the final result to TRUE
* - else continue
*
* @param adapterMappingConfiguration Adapter mapping attributes
*/
private void validateAdapterMapping(AdapterMappingConfiguration adapterMappingConfiguration) {
if (adapterMappingConfiguration.getInputMappingString() == null &&
(adapterMappingConfiguration.getInputMappingProperties().isEmpty() &&
adapterMappingConfiguration.getNamespaceMappingProperties().isEmpty()) &&
(
adapterMappingConfiguration.getCorrelationMappingProperties().isEmpty() &&
adapterMappingConfiguration.getPayloadMappingProperties().isEmpty() &&
adapterMappingConfiguration.getMetaMappingProperties().isEmpty()
)
|| adapterMappingConfiguration.getMessageFormat() == null) {
String errMsg = "Invalid payload: event mapping attributes invalid!!!";
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setMessage(errMsg);
log.error(errMsg);
throw new BadRequestException(errorResponse);
}
}
/**
* This will set payload of receiver attributes to the DTO
*
* @param adapterProperties List of receiver attributes
* @return DTO with all the receiver attributes
*/
private BasicInputAdapterPropertyDto[] addReceiverConfigToDto(
List<AdapterProperty> adapterProperties) {
BasicInputAdapterPropertyDto[] basicInputAdapterPropertyDtos
@ -671,7 +621,14 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
return basicInputAdapterPropertyDtos;
}
private EventMappingPropertyDto[] addReceiverMappingToDto(List<MappingProperty> mapProperties) {
/**
* This will set payload of receiver mapping attributes to the DTO
*
* @param mapProperties List of receiver mapping attributes
* @return DTO with all the receiver mapping attributes
*/
private EventMappingPropertyDto[] addReceiverMappingToDto
(List<MappingProperty> mapProperties) {
EventMappingPropertyDto[] eventMappingPropertyDtos = new EventMappingPropertyDto[mapProperties.size()];
for (int i = 0; i < mapProperties.size(); i++) {
EventMappingPropertyDto eventMappingPropertyDto = new EventMappingPropertyDto();
@ -683,6 +640,12 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
return eventMappingPropertyDtos;
}
/**
* This will set payload of publisher attributes to the DTO
*
* @param adapterProperties List of publisher attributes
* @return DTO with all the publisher attributes
*/
private BasicOutputAdapterPropertyDto[] addPublisherConfigToDto(
List<AdapterProperty> adapterProperties) {
BasicOutputAdapterPropertyDto[] basicOutputAdapterPropertyDtos =
@ -697,6 +660,12 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
return basicOutputAdapterPropertyDtos;
}
/**
* This will set payload of publisher mapping attributes to the DTO
*
* @param mapProperties List of publisher mapping attributes
* @return DTO with all the publisher mapping attributes
*/
private org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] addPublisherMappingToDto
(List<MappingProperty> mapProperties) {
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] eventMappingPropertyDtos
@ -712,6 +681,11 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
return eventMappingPropertyDtos;
}
/**
* Clean Service client in the stub
*
* @param stub Stud that needs to be cleaned
*/
private void cleanup(Stub stub) {
if (stub != null) {
try {

Loading…
Cancel
Save