|
|
|
@ -18,6 +18,7 @@
|
|
|
|
|
package org.wso2.carbon.device.mgt.jaxrs.service.impl;
|
|
|
|
|
|
|
|
|
|
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Attribute;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterMappingConfiguration;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MappingProperty;
|
|
|
|
@ -25,8 +26,7 @@ import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterConfiguration;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterProperty;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MessageFormat;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.SiddhiExecutionPlan;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventPublisher;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventReceiver;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.exception.BadRequestException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.exception.ErrorDTO;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.exception.InvalidExecutionPlanException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.service.api.AnalyticsArtifactsManagementService;
|
|
|
|
@ -58,7 +58,6 @@ import javax.ws.rs.Path;
|
|
|
|
|
import javax.ws.rs.PathParam;
|
|
|
|
|
import javax.ws.rs.QueryParam;
|
|
|
|
|
import javax.ws.rs.core.Response;
|
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
|
import java.rmi.RemoteException;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
|
@ -67,14 +66,10 @@ import java.util.List;
|
|
|
|
|
* siddhi scripts to the Analytics server as Artifacts
|
|
|
|
|
*/
|
|
|
|
|
@Path("/analytics/artifacts")
|
|
|
|
|
public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifactsManagementService {
|
|
|
|
|
public class AnalyticsArtifactsManagementServiceImpl
|
|
|
|
|
implements AnalyticsArtifactsManagementService {
|
|
|
|
|
private static final Log log = LogFactory.getLog(AnalyticsArtifactsManagementServiceImpl.class);
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param stream EventStream object with the properties of the stream
|
|
|
|
|
* @return A status code depending on the code result
|
|
|
|
|
* Function - Used to deploy stream as an artifact using a String
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/stream/{id}")
|
|
|
|
@ -84,90 +79,70 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = null;
|
|
|
|
|
try {
|
|
|
|
|
String streamDefinition = new String(stream.getDefinition().getBytes(), StandardCharsets.UTF_8);
|
|
|
|
|
String streamDefinition = stream.getDefinition();
|
|
|
|
|
eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
|
|
|
|
|
if (!isEdited) {
|
|
|
|
|
eventStreamAdminServiceStub.addEventStreamDefinitionAsString(streamDefinition);
|
|
|
|
|
} else {
|
|
|
|
|
// Find and edit stream
|
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDetailsForStreamId(id) != null) {
|
|
|
|
|
eventStreamAdminServiceStub.editEventStreamDefinitionAsString(streamDefinition, id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} finally {
|
|
|
|
|
cleanup(eventStreamAdminServiceStub);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param stream EventStream object with the properties of the stream
|
|
|
|
|
* @return A status code depending on the code result
|
|
|
|
|
* Function - Used to deploy stream as an artifact using a DTO
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/stream")
|
|
|
|
|
public Response deployEventDefinitionAsDto(@Valid EventStream stream) {
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
// Categorize attributes to three lists depending on their type
|
|
|
|
|
List<Attribute> metaData = stream.getMetaData();
|
|
|
|
|
List<Attribute> payloadData = stream.getPayloadData();
|
|
|
|
|
List<Attribute> correlationData = stream.getCorrelationData();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
/* Conditions
|
|
|
|
|
* - At least one list should always be not null
|
|
|
|
|
*/
|
|
|
|
|
if (metaData == null && correlationData == null && payloadData == null) {
|
|
|
|
|
log.error("Invalid payload: event attributes");
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
// Publish the event stream
|
|
|
|
|
publishStream(stream, metaData, correlationData, payloadData);
|
|
|
|
|
deployStream(stream);
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
}
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to create event definitions for tenant " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the remote services for tenant " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to generate jwt token for tenant " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the user store for tenant " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param name Receiver name
|
|
|
|
|
* @param isEdited If receiver is created or edited
|
|
|
|
|
* @param receiver Receiver object with the properties of the receiver
|
|
|
|
|
* @return A status code depending on the code result
|
|
|
|
|
* Function - Used to deploy receiver as an artifact using a String
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/receiver/{name}")
|
|
|
|
|
public Response deployEventReceiverAsString(@PathParam("name") String name,
|
|
|
|
|
@QueryParam("isEdited") boolean isEdited,
|
|
|
|
|
@Valid EventReceiver receiver) {
|
|
|
|
|
@Valid Adapter receiver) {
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
EventReceiverAdminServiceStub eventReceiverAdminServiceStub;
|
|
|
|
|
try {
|
|
|
|
@ -178,202 +153,128 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
} else {
|
|
|
|
|
eventReceiverAdminServiceStub.editActiveEventReceiverConfiguration(receiverDefinition, name);
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().entity(name).build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param receiver Receiver object with the properties of the receiver
|
|
|
|
|
* @return A status code depending on the code result
|
|
|
|
|
* Function - Used to deploy receiver as an artifact using a DTO
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/receiver")
|
|
|
|
|
public Response deployEventReceiverAsDto(@Valid Adapter receiver) {
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String receiverName = receiver.getAdapterName();
|
|
|
|
|
String adapterType = receiver.getAdapterType().toStringFormatted();
|
|
|
|
|
AdapterConfiguration adapterConfiguration = receiver.getAdapterConfiguration();
|
|
|
|
|
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
|
|
|
|
|
if (adapterProperties == null) {
|
|
|
|
|
log.error("Invalid attribute payload");
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
AdapterConfiguration adapterConfiguration = receiver.getAdapterConfiguration();
|
|
|
|
|
boolean customMapping = adapterConfiguration.isCustomMappingEnabled();
|
|
|
|
|
List<MappingProperty> inputMappingProperties = adapterMappingConfiguration.getInputMappingProperties();
|
|
|
|
|
List<MappingProperty> namespaceMappingProperties = adapterMappingConfiguration.getNamespaceMappingProperties();
|
|
|
|
|
List<MappingProperty> correlationMappingProperties = adapterMappingConfiguration.getCorrelationMappingProperties();
|
|
|
|
|
List<MappingProperty> payloadMappingProperties = adapterMappingConfiguration.getPayloadMappingProperties();
|
|
|
|
|
List<MappingProperty> metaMappingProperties = adapterMappingConfiguration.getMetaMappingProperties();
|
|
|
|
|
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
|
|
|
|
|
/*
|
|
|
|
|
* Conditions
|
|
|
|
|
* - if CustomMappingEnabled check validity of property lists
|
|
|
|
|
* - if both inputMappingProperties and namespaceMappingProperties null check remaining property lists
|
|
|
|
|
* - if all correlationMappingProperties, payloadMappingProperties, metaMappingProperties null log error
|
|
|
|
|
* - if message format is null change the final result to TRUE
|
|
|
|
|
* - else continue
|
|
|
|
|
* */
|
|
|
|
|
if ((customMapping &&
|
|
|
|
|
(inputMappingProperties == null && namespaceMappingProperties == null) &&
|
|
|
|
|
(correlationMappingProperties == null && payloadMappingProperties == null &&
|
|
|
|
|
metaMappingProperties == null)) || messageFormat == null) {
|
|
|
|
|
String errMsg = "Invalid mapping payload";
|
|
|
|
|
log.error(errMsg);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(errMsg).build();
|
|
|
|
|
validateAdapterProperties(adapterConfiguration.getAdapterProperties());
|
|
|
|
|
if (customMapping) {
|
|
|
|
|
validateAdapterMapping(adapterConfiguration.getAdapterMappingConfiguration());
|
|
|
|
|
}
|
|
|
|
|
String eventStreamWithVersion = receiver.getEventStreamWithVersion();
|
|
|
|
|
|
|
|
|
|
publishReceiver(receiverName, adapterType, adapterProperties, customMapping, inputMappingProperties,
|
|
|
|
|
namespaceMappingProperties, correlationMappingProperties, payloadMappingProperties,
|
|
|
|
|
metaMappingProperties, messageFormat, eventStreamWithVersion);
|
|
|
|
|
deployReceiver(receiver, customMapping, adapterConfiguration);
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param name Publisher name
|
|
|
|
|
* @param isEdited If receiver is created or edited
|
|
|
|
|
* @param publisher Publisher object with the properties of the publisher
|
|
|
|
|
* @return A status code depending on the code result
|
|
|
|
|
* Function - Used to deploy publisher as an artifact using a String
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/publisher/{name}")
|
|
|
|
|
public Response deployEventPublisherAsString(@PathParam("name") String name,
|
|
|
|
|
@QueryParam("isEdited") boolean isEdited,
|
|
|
|
|
@Valid EventPublisher publisher) {
|
|
|
|
|
@Valid Adapter publisher) {
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
String publisherDefinition = publisher.getDefinition();
|
|
|
|
|
|
|
|
|
|
eventPublisherAdminServiceStub = DeviceMgtAPIUtils.getEventPublisherAdminServiceStub();
|
|
|
|
|
if (!isEdited) {
|
|
|
|
|
eventPublisherAdminServiceStub.deployEventPublisherConfiguration(publisherDefinition);
|
|
|
|
|
} else {
|
|
|
|
|
eventPublisherAdminServiceStub.editActiveEventPublisherConfiguration(publisherDefinition, name);
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().entity(publisher).build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param publisher Publisher object with the properties of the publisher
|
|
|
|
|
* @return A status code depending on the code result
|
|
|
|
|
* Function - Used to deploy publisher as an artifact using a DTO
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/publisher")
|
|
|
|
|
public Response deployEventPublisherAsDto(@Valid Adapter publisher) {
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String publisherName = publisher.getAdapterName();
|
|
|
|
|
String adapterType = publisher.getAdapterType().toStringFormatted();
|
|
|
|
|
AdapterConfiguration adapterConfiguration = publisher.getAdapterConfiguration();
|
|
|
|
|
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
|
|
|
|
|
if (adapterProperties == null) {
|
|
|
|
|
log.error("Invalid attribute payload");
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
validateAdapterProperties(adapterConfiguration.getAdapterProperties());
|
|
|
|
|
boolean customMapping = adapterConfiguration.isCustomMappingEnabled();
|
|
|
|
|
String inputMappingString = adapterMappingConfiguration.getInputMappingString();
|
|
|
|
|
List<MappingProperty> inputMappingProperties = adapterMappingConfiguration.getInputMappingProperties();
|
|
|
|
|
List<MappingProperty> correlationMappingProperties = adapterMappingConfiguration.getCorrelationMappingProperties();
|
|
|
|
|
List<MappingProperty> payloadMappingProperties = adapterMappingConfiguration.getPayloadMappingProperties();
|
|
|
|
|
List<MappingProperty> metaMappingProperties = adapterMappingConfiguration.getMetaMappingProperties();
|
|
|
|
|
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
|
|
|
|
|
/*
|
|
|
|
|
* Conditions
|
|
|
|
|
* - if CustomMappingEnabled check validity of property lists
|
|
|
|
|
* - if all correlationMappingProperties, payloadMappingProperties, metaMappingProperties null log error
|
|
|
|
|
* - if message format is null change the final result to TRUE
|
|
|
|
|
* - else continue
|
|
|
|
|
*/
|
|
|
|
|
if ((customMapping &&
|
|
|
|
|
(inputMappingProperties == null && inputMappingString == null) &&
|
|
|
|
|
(correlationMappingProperties == null && payloadMappingProperties == null &&
|
|
|
|
|
metaMappingProperties == null)) || messageFormat == null) {
|
|
|
|
|
String errMsg = "Invalid mapping payload";
|
|
|
|
|
log.error(errMsg);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(errMsg).build();
|
|
|
|
|
if (customMapping) {
|
|
|
|
|
validateAdapterMapping(adapterConfiguration.getAdapterMappingConfiguration());
|
|
|
|
|
}
|
|
|
|
|
String eventStreamWithVersion = publisher.getEventStreamWithVersion();
|
|
|
|
|
|
|
|
|
|
publishPublisher(publisherName, adapterType, adapterProperties, customMapping
|
|
|
|
|
, inputMappingString, inputMappingProperties, correlationMappingProperties
|
|
|
|
|
, payloadMappingProperties, metaMappingProperties, messageFormat
|
|
|
|
|
, eventStreamWithVersion);
|
|
|
|
|
deployPublisher(publisher, customMapping, adapterConfiguration);
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param name Siddhi plan name
|
|
|
|
|
* @param isEdited If receiver is created or edited
|
|
|
|
|
* @param plan Siddhi plan definition
|
|
|
|
|
* @return a status code depending on the code execution
|
|
|
|
|
* Function - Used to deploy Siddhi script as an artifact using a String
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/siddhi-script/{name}")
|
|
|
|
@ -385,38 +286,46 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
publishSiddhiExecutionPlan(name, isEdited, plan.getDefinition());
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("Failed to create event definitions for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (InvalidExecutionPlanException e) {
|
|
|
|
|
log.error("Invalid Execution plan: " + tenantDomain, e);
|
|
|
|
|
return e.getResponse();
|
|
|
|
|
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store for tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain;
|
|
|
|
|
log.error(errMsg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Set data to a Stream dto and deploy dto through a stub
|
|
|
|
|
*
|
|
|
|
|
* @param stream Stream definition
|
|
|
|
|
* @param metaData Meta attributes of the stream
|
|
|
|
|
* @param correlationData Correlation attributes of the stream
|
|
|
|
|
* @param payloadData Payload attributes of the stream
|
|
|
|
|
* @throws RemoteException exception that may occur during a remote method call
|
|
|
|
|
* @throws UserStoreException exception that may occur during JWT token generation
|
|
|
|
|
* @throws JWTClientException exception that may occur during connecting to client store
|
|
|
|
|
*/
|
|
|
|
|
private void publishStream(EventStream stream, List<Attribute> metaData,
|
|
|
|
|
List<Attribute> correlationData, List<Attribute> payloadData)
|
|
|
|
|
private void deployStream(EventStream stream)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub =
|
|
|
|
|
DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
|
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = null;
|
|
|
|
|
List<Attribute> metaData = stream.getMetaData();
|
|
|
|
|
List<Attribute> payloadData = stream.getPayloadData();
|
|
|
|
|
List<Attribute> correlationData = stream.getCorrelationData();
|
|
|
|
|
if (metaData.isEmpty() && correlationData.isEmpty() && payloadData.isEmpty()) {
|
|
|
|
|
String errMsg = String.format("Failed to validate Stream property attributes of %s:%s",
|
|
|
|
|
stream.getName(), stream.getVersion());
|
|
|
|
|
ErrorResponse errorResponse = new ErrorResponse();
|
|
|
|
|
errorResponse.setMessage(errMsg);
|
|
|
|
|
log.error(errMsg);
|
|
|
|
|
throw new BadRequestException(errorResponse);
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
|
|
|
|
|
|
|
|
|
|
EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto();
|
|
|
|
|
eventStreamDefinitionDto.setName(stream.getName());
|
|
|
|
|
eventStreamDefinitionDto.setVersion(stream.getVersion());
|
|
|
|
@ -425,6 +334,7 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
eventStreamDefinitionDto.setMetaData(addEventAttributesToDto(metaData));
|
|
|
|
|
eventStreamDefinitionDto.setPayloadData(addEventAttributesToDto(payloadData));
|
|
|
|
|
eventStreamDefinitionDto.setCorrelationData(addEventAttributesToDto(correlationData));
|
|
|
|
|
|
|
|
|
|
String streamId = stream.getName() + ":" + stream.getVersion();
|
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) {
|
|
|
|
|
eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId);
|
|
|
|
@ -438,55 +348,40 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param receiverName Receiver name
|
|
|
|
|
* @param adapterType Receiver type
|
|
|
|
|
* @param adapterProperties Receiver properties
|
|
|
|
|
* @param customMapping Is receiver mapped
|
|
|
|
|
* @param inputMappingProperties Receiver input attribute mapping
|
|
|
|
|
* @param namespaceMappingProperties Receiver name-scape attribute mapping
|
|
|
|
|
* @param correlationMappingProperties Receiver correlation attribute mapping
|
|
|
|
|
* @param payloadMappingProperties Receiver payload attribute mapping
|
|
|
|
|
* @param metaMappingProperties Receiver meta attribute mapping
|
|
|
|
|
* @param messageFormat Receiver mapping format
|
|
|
|
|
* @param eventStreamWithVersion Attached stream
|
|
|
|
|
* @throws RemoteException exception that may occur during a remote method call
|
|
|
|
|
* @throws UserStoreException exception that may occur during JWT token generation
|
|
|
|
|
* @throws JWTClientException exception that may occur during connecting to client store
|
|
|
|
|
* Set data to a receiver dto and deploy dto through a stub
|
|
|
|
|
*
|
|
|
|
|
* @param receiver Event Receiver adapter
|
|
|
|
|
* @param customMapping Is Receiver mapped
|
|
|
|
|
* @param adapterConfiguration Adapter property and mapping configuration
|
|
|
|
|
* @throws RemoteException Exception that may occur during a remote method call
|
|
|
|
|
* @throws UserStoreException Exception that may occur during JWT token generation
|
|
|
|
|
* @throws JWTClientException Exception that may occur during connecting to client store
|
|
|
|
|
*/
|
|
|
|
|
private void publishReceiver(String receiverName, String adapterType,
|
|
|
|
|
List<AdapterProperty> adapterProperties, boolean customMapping,
|
|
|
|
|
List<MappingProperty> inputMappingProperties,
|
|
|
|
|
List<MappingProperty> namespaceMappingProperties,
|
|
|
|
|
List<MappingProperty> correlationMappingProperties,
|
|
|
|
|
List<MappingProperty> payloadMappingProperties,
|
|
|
|
|
List<MappingProperty> metaMappingProperties,
|
|
|
|
|
MessageFormat messageFormat,
|
|
|
|
|
String eventStreamWithVersion)
|
|
|
|
|
private void deployReceiver(Adapter receiver, boolean customMapping,
|
|
|
|
|
AdapterConfiguration adapterConfiguration)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
String receiverName = receiver.getAdapterName();
|
|
|
|
|
String adapterType = receiver.getAdapterType().toStringFormatted();
|
|
|
|
|
String eventStreamWithVersion = receiver.getEventStreamWithVersion();
|
|
|
|
|
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
|
|
|
|
|
EventReceiverConfigurationDto eventReceiverConfigurationDto = eventReceiverAdminServiceStub
|
|
|
|
|
.getActiveEventReceiverConfiguration(receiverName);
|
|
|
|
|
|
|
|
|
|
// Check if adapter already exists, if so un-deploy it
|
|
|
|
|
if (eventReceiverConfigurationDto != null) {
|
|
|
|
|
eventReceiverAdminServiceStub.undeployActiveEventReceiverConfiguration(receiverName);
|
|
|
|
|
}
|
|
|
|
|
BasicInputAdapterPropertyDto[] basicInputAdapterPropertyDtos = addReceiverConfigToDto(adapterProperties);
|
|
|
|
|
|
|
|
|
|
// Adding attribute properties to DTOs
|
|
|
|
|
BasicInputAdapterPropertyDto[] basicInputAdapterPropertyDtos =
|
|
|
|
|
addReceiverConfigToDto(adapterProperties);
|
|
|
|
|
|
|
|
|
|
if (eventReceiverAdminServiceStub.getActiveEventReceiverConfiguration(receiverName) == null) {
|
|
|
|
|
// Call stub deploy methods according to the message format
|
|
|
|
|
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
|
|
|
|
|
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
|
|
|
|
|
if (!messageFormat.toString().equals("wso2event")) {
|
|
|
|
|
EventMappingPropertyDto[] inputMappingPropertyDtos =
|
|
|
|
|
addReceiverMappingToDto(inputMappingProperties);
|
|
|
|
|
addReceiverMappingToDto(adapterMappingConfiguration.getInputMappingProperties());
|
|
|
|
|
if (messageFormat.toString().equals("xml")) {
|
|
|
|
|
EventMappingPropertyDto[] namespaceMappingPropertyDtos =
|
|
|
|
|
addReceiverMappingToDto(namespaceMappingProperties);
|
|
|
|
|
|
|
|
|
|
addReceiverMappingToDto(adapterMappingConfiguration.getNamespaceMappingProperties());
|
|
|
|
|
eventReceiverAdminServiceStub.deployXmlEventReceiverConfiguration(receiverName
|
|
|
|
|
, eventStreamWithVersion, adapterType, null
|
|
|
|
|
, namespaceMappingPropertyDtos, inputMappingPropertyDtos
|
|
|
|
@ -507,9 +402,15 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
EventMappingPropertyDto[] correlationMappingPropertyDtos = addReceiverMappingToDto(correlationMappingProperties);
|
|
|
|
|
EventMappingPropertyDto[] metaMappingPropertyDtos = addReceiverMappingToDto(metaMappingProperties);
|
|
|
|
|
EventMappingPropertyDto[] payloadMappingPropertyDtos = addReceiverMappingToDto(payloadMappingProperties);
|
|
|
|
|
EventMappingPropertyDto[] correlationMappingPropertyDtos = addReceiverMappingToDto(
|
|
|
|
|
adapterMappingConfiguration.getCorrelationMappingProperties()
|
|
|
|
|
);
|
|
|
|
|
EventMappingPropertyDto[] metaMappingPropertyDtos = addReceiverMappingToDto(
|
|
|
|
|
adapterMappingConfiguration.getInputMappingProperties()
|
|
|
|
|
);
|
|
|
|
|
EventMappingPropertyDto[] payloadMappingPropertyDtos = addReceiverMappingToDto(
|
|
|
|
|
adapterMappingConfiguration.getPayloadMappingProperties()
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
eventReceiverAdminServiceStub.deployWso2EventReceiverConfiguration(receiverName
|
|
|
|
|
, eventStreamWithVersion, adapterType, metaMappingPropertyDtos
|
|
|
|
@ -517,86 +418,81 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
, basicInputAdapterPropertyDtos, customMapping
|
|
|
|
|
, eventStreamWithVersion);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
cleanup(eventReceiverAdminServiceStub);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param publisherName Publisher name
|
|
|
|
|
* @param adapterType Publisher type
|
|
|
|
|
* @param adapterProperties Publisher properties
|
|
|
|
|
* @param customMapping Is publisher mapped
|
|
|
|
|
* @param correlationMappingProperties Publisher correlation attribute mapping
|
|
|
|
|
* @param payloadMappingProperties Publisher payload attribute mapping
|
|
|
|
|
* @param metaMappingProperties Publisher meta attribute mapping
|
|
|
|
|
* @param messageFormat Publisher mapping format
|
|
|
|
|
* @param eventStreamWithVersion Attached stream
|
|
|
|
|
* @throws RemoteException exception that may occur during a remote method call
|
|
|
|
|
* @throws UserStoreException exception that may occur during JWT token generation
|
|
|
|
|
* @throws JWTClientException exception that may occur during connecting to client store
|
|
|
|
|
* Set data to a publisher dto and deploy dto through a stub
|
|
|
|
|
*
|
|
|
|
|
* @param publisher Event Publisher adapter
|
|
|
|
|
* @param customMapping Is Publisher mapped
|
|
|
|
|
* @param adapterConfiguration Publisher property and mapping configuration
|
|
|
|
|
* @throws RemoteException Exception that may occur during a remote method call
|
|
|
|
|
* @throws UserStoreException Exception that may occur during JWT token generation
|
|
|
|
|
* @throws JWTClientException Exception that may occur during connecting to client store
|
|
|
|
|
*/
|
|
|
|
|
private void publishPublisher(String publisherName, String adapterType,
|
|
|
|
|
List<AdapterProperty> adapterProperties,
|
|
|
|
|
boolean customMapping,
|
|
|
|
|
String inputMappingString,
|
|
|
|
|
List<MappingProperty> inputMappingProperties,
|
|
|
|
|
List<MappingProperty> correlationMappingProperties,
|
|
|
|
|
List<MappingProperty> payloadMappingProperties,
|
|
|
|
|
List<MappingProperty> metaMappingProperties,
|
|
|
|
|
MessageFormat messageFormat,
|
|
|
|
|
String eventStreamWithVersion)
|
|
|
|
|
private void deployPublisher(Adapter publisher, boolean customMapping,
|
|
|
|
|
AdapterConfiguration adapterConfiguration)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = DeviceMgtAPIUtils.getEventPublisherAdminServiceStub();
|
|
|
|
|
// Check if adapter already exists, if so un-deploy it
|
|
|
|
|
try {
|
|
|
|
|
String publisherName = publisher.getAdapterName();
|
|
|
|
|
String adapterType = publisher.getAdapterType().toStringFormatted();
|
|
|
|
|
String eventStreamWithVersion = publisher.getEventStreamWithVersion();
|
|
|
|
|
List<AdapterProperty> adapterProperties = adapterConfiguration.getAdapterProperties();
|
|
|
|
|
EventPublisherConfigurationDto eventPublisherConfigurationDto = eventPublisherAdminServiceStub
|
|
|
|
|
.getActiveEventPublisherConfiguration(publisherName);
|
|
|
|
|
if (eventPublisherConfigurationDto != null) {
|
|
|
|
|
eventPublisherAdminServiceStub.undeployActiveEventPublisherConfiguration(publisherName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Adding attribute properties to DTOs
|
|
|
|
|
BasicOutputAdapterPropertyDto[] basicOutputAdapterPropertyDtos =
|
|
|
|
|
addPublisherConfigToDto(adapterProperties);
|
|
|
|
|
|
|
|
|
|
if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(publisherName) == null) {
|
|
|
|
|
// Call stub deploy methods according to the message format
|
|
|
|
|
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration.getAdapterMappingConfiguration();
|
|
|
|
|
MessageFormat messageFormat = adapterMappingConfiguration.getMessageFormat();
|
|
|
|
|
if (!messageFormat.toString().equals("wso2event")) {
|
|
|
|
|
if (!messageFormat.toString().equals("map")) {
|
|
|
|
|
if (messageFormat.toString().equals("xml")) {
|
|
|
|
|
eventPublisherAdminServiceStub.deployXmlEventPublisherConfiguration(publisherName
|
|
|
|
|
, eventStreamWithVersion, adapterType, inputMappingString
|
|
|
|
|
, eventStreamWithVersion, adapterType, adapterMappingConfiguration.getInputMappingString()
|
|
|
|
|
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
|
|
|
|
|
, customMapping);
|
|
|
|
|
} else if (messageFormat.toString().equals("text")) {
|
|
|
|
|
eventPublisherAdminServiceStub.deployTextEventPublisherConfiguration(publisherName
|
|
|
|
|
, eventStreamWithVersion, adapterType, inputMappingString
|
|
|
|
|
, eventStreamWithVersion, adapterType, adapterMappingConfiguration.getInputMappingString()
|
|
|
|
|
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
|
|
|
|
|
, customMapping);
|
|
|
|
|
} else {
|
|
|
|
|
eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(publisherName
|
|
|
|
|
, eventStreamWithVersion, adapterType, inputMappingString
|
|
|
|
|
, eventStreamWithVersion, adapterType, adapterMappingConfiguration.getInputMappingString()
|
|
|
|
|
, basicOutputAdapterPropertyDtos, eventStreamWithVersion
|
|
|
|
|
, customMapping);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] inputMappingPropertyDtos =
|
|
|
|
|
addPublisherMappingToDto(inputMappingProperties);
|
|
|
|
|
addPublisherMappingToDto(
|
|
|
|
|
adapterMappingConfiguration.getInputMappingProperties()
|
|
|
|
|
);
|
|
|
|
|
eventPublisherAdminServiceStub.deployMapEventPublisherConfiguration(publisherName
|
|
|
|
|
, eventStreamWithVersion, adapterType, inputMappingPropertyDtos
|
|
|
|
|
, basicOutputAdapterPropertyDtos, customMapping);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] correlationMappingPropertyDtos =
|
|
|
|
|
addPublisherMappingToDto(correlationMappingProperties);
|
|
|
|
|
addPublisherMappingToDto(
|
|
|
|
|
adapterMappingConfiguration.getCorrelationMappingProperties()
|
|
|
|
|
);
|
|
|
|
|
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] metaMappingPropertyDtos =
|
|
|
|
|
addPublisherMappingToDto(metaMappingProperties);
|
|
|
|
|
addPublisherMappingToDto(
|
|
|
|
|
adapterMappingConfiguration.getMetaMappingProperties()
|
|
|
|
|
);
|
|
|
|
|
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] payloadMappingPropertyDtos =
|
|
|
|
|
addPublisherMappingToDto(payloadMappingProperties);
|
|
|
|
|
|
|
|
|
|
addPublisherMappingToDto(
|
|
|
|
|
adapterMappingConfiguration.getPayloadMappingProperties()
|
|
|
|
|
);
|
|
|
|
|
eventPublisherAdminServiceStub.deployWSO2EventPublisherConfiguration(publisherName
|
|
|
|
|
, eventStreamWithVersion, adapterType, metaMappingPropertyDtos
|
|
|
|
|
, correlationMappingPropertyDtos, payloadMappingPropertyDtos
|
|
|
|
@ -604,41 +500,39 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
, eventStreamWithVersion);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
cleanup(eventPublisherAdminServiceStub);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @param name plan name
|
|
|
|
|
* @param isEdited is plan edited
|
|
|
|
|
* @param plan plan data
|
|
|
|
|
* @throws RemoteException exception that may occur during a remote method call
|
|
|
|
|
* @throws UserStoreException exception that may occur during JWT token generation
|
|
|
|
|
* @throws JWTClientException exception that may occur during connecting to client store
|
|
|
|
|
* @throws InvalidExecutionPlanException exception that may occur if execution plan validation fails
|
|
|
|
|
* Publish a siddhi execution plan using a stub
|
|
|
|
|
*
|
|
|
|
|
* @param name Plan name
|
|
|
|
|
* @param isEdited Is plan edited
|
|
|
|
|
* @param plan Plan data
|
|
|
|
|
* @throws RemoteException Exception that may occur during a remote method call
|
|
|
|
|
* @throws UserStoreException Exception that may occur during JWT token generation
|
|
|
|
|
* @throws JWTClientException Exception that may occur during connecting to client store
|
|
|
|
|
* @throws InvalidExecutionPlanException Exception that may occur if execution plan validation fails
|
|
|
|
|
*/
|
|
|
|
|
private void publishSiddhiExecutionPlan(String name, boolean isEdited,
|
|
|
|
|
String plan)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException,
|
|
|
|
|
InvalidExecutionPlanException {
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventProcessorAdminServiceStub eventProcessorAdminServiceStub = null;
|
|
|
|
|
try {
|
|
|
|
|
eventProcessorAdminServiceStub = DeviceMgtAPIUtils.getEventProcessorAdminServiceStub();
|
|
|
|
|
// Validate the plan code
|
|
|
|
|
String validationResponse = eventProcessorAdminServiceStub.validateExecutionPlan(plan);
|
|
|
|
|
if (validationResponse.equals("success")) {
|
|
|
|
|
if (!isEdited) {
|
|
|
|
|
// Create a new plan
|
|
|
|
|
eventProcessorAdminServiceStub.deployExecutionPlan(plan);
|
|
|
|
|
} else {
|
|
|
|
|
// Edit plan
|
|
|
|
|
eventProcessorAdminServiceStub.editActiveExecutionPlan(plan, name);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
ErrorDTO errorDTO = new ErrorDTO();
|
|
|
|
|
errorDTO.setMessage(validationResponse);
|
|
|
|
|
log.error(validationResponse);
|
|
|
|
|
throw new InvalidExecutionPlanException(errorDTO);
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
@ -646,18 +540,74 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This will set payload of event attribute's mapping to the DTO
|
|
|
|
|
*
|
|
|
|
|
* @param attributes list of event attributes
|
|
|
|
|
* @return DTO with all the event attributes
|
|
|
|
|
*/
|
|
|
|
|
private EventStreamAttributeDto[] addEventAttributesToDto(List<Attribute> attributes) {
|
|
|
|
|
EventStreamAttributeDto[] eventStreamAttributeDtos = new EventStreamAttributeDto[attributes.size()];
|
|
|
|
|
for (int i = 0; i < attributes.size(); i++) {
|
|
|
|
|
for (Attribute attribute : attributes) {
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
|
eventStreamAttributeDto.setAttributeName(attributes.get(i).getName());
|
|
|
|
|
eventStreamAttributeDto.setAttributeType(attributes.get(i).getType().toString());
|
|
|
|
|
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
|
|
|
|
|
eventStreamAttributeDto.setAttributeName(attribute.getName());
|
|
|
|
|
eventStreamAttributeDto.setAttributeType(attribute.getType().toString());
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
return eventStreamAttributeDtos;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Validate adapter payload attributes
|
|
|
|
|
*
|
|
|
|
|
* @param adapterProperties Adapter payload attributes
|
|
|
|
|
*/
|
|
|
|
|
private void validateAdapterProperties(List<AdapterProperty> adapterProperties) {
|
|
|
|
|
if (adapterProperties.isEmpty()) {
|
|
|
|
|
String errMsg = "Invalid payload: event property attributes invalid!!!";
|
|
|
|
|
ErrorResponse errorResponse = new ErrorResponse();
|
|
|
|
|
errorResponse.setMessage(errMsg);
|
|
|
|
|
log.error(errMsg);
|
|
|
|
|
throw new BadRequestException(errorResponse);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Validate adapter mapping attributes
|
|
|
|
|
* <p>
|
|
|
|
|
* Conditions
|
|
|
|
|
* - if both inputMappingProperties and namespaceMappingProperties null check remaining property lists
|
|
|
|
|
* - if all correlationMappingProperties, payloadMappingProperties, metaMappingProperties null log error
|
|
|
|
|
* - if message format is null change the final result to TRUE
|
|
|
|
|
* - else continue
|
|
|
|
|
*
|
|
|
|
|
* @param adapterMappingConfiguration Adapter mapping attributes
|
|
|
|
|
*/
|
|
|
|
|
private void validateAdapterMapping(AdapterMappingConfiguration adapterMappingConfiguration) {
|
|
|
|
|
|
|
|
|
|
if (adapterMappingConfiguration.getInputMappingString() == null &&
|
|
|
|
|
(adapterMappingConfiguration.getInputMappingProperties().isEmpty() &&
|
|
|
|
|
adapterMappingConfiguration.getNamespaceMappingProperties().isEmpty()) &&
|
|
|
|
|
(
|
|
|
|
|
adapterMappingConfiguration.getCorrelationMappingProperties().isEmpty() &&
|
|
|
|
|
adapterMappingConfiguration.getPayloadMappingProperties().isEmpty() &&
|
|
|
|
|
adapterMappingConfiguration.getMetaMappingProperties().isEmpty()
|
|
|
|
|
)
|
|
|
|
|
|| adapterMappingConfiguration.getMessageFormat() == null) {
|
|
|
|
|
String errMsg = "Invalid payload: event mapping attributes invalid!!!";
|
|
|
|
|
ErrorResponse errorResponse = new ErrorResponse();
|
|
|
|
|
errorResponse.setMessage(errMsg);
|
|
|
|
|
log.error(errMsg);
|
|
|
|
|
throw new BadRequestException(errorResponse);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This will set payload of receiver attributes to the DTO
|
|
|
|
|
*
|
|
|
|
|
* @param adapterProperties List of receiver attributes
|
|
|
|
|
* @return DTO with all the receiver attributes
|
|
|
|
|
*/
|
|
|
|
|
private BasicInputAdapterPropertyDto[] addReceiverConfigToDto(
|
|
|
|
|
List<AdapterProperty> adapterProperties) {
|
|
|
|
|
BasicInputAdapterPropertyDto[] basicInputAdapterPropertyDtos
|
|
|
|
@ -671,7 +621,14 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
return basicInputAdapterPropertyDtos;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private EventMappingPropertyDto[] addReceiverMappingToDto(List<MappingProperty> mapProperties) {
|
|
|
|
|
/**
|
|
|
|
|
* This will set payload of receiver mapping attributes to the DTO
|
|
|
|
|
*
|
|
|
|
|
* @param mapProperties List of receiver mapping attributes
|
|
|
|
|
* @return DTO with all the receiver mapping attributes
|
|
|
|
|
*/
|
|
|
|
|
private EventMappingPropertyDto[] addReceiverMappingToDto
|
|
|
|
|
(List<MappingProperty> mapProperties) {
|
|
|
|
|
EventMappingPropertyDto[] eventMappingPropertyDtos = new EventMappingPropertyDto[mapProperties.size()];
|
|
|
|
|
for (int i = 0; i < mapProperties.size(); i++) {
|
|
|
|
|
EventMappingPropertyDto eventMappingPropertyDto = new EventMappingPropertyDto();
|
|
|
|
@ -683,6 +640,12 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
return eventMappingPropertyDtos;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This will set payload of publisher attributes to the DTO
|
|
|
|
|
*
|
|
|
|
|
* @param adapterProperties List of publisher attributes
|
|
|
|
|
* @return DTO with all the publisher attributes
|
|
|
|
|
*/
|
|
|
|
|
private BasicOutputAdapterPropertyDto[] addPublisherConfigToDto(
|
|
|
|
|
List<AdapterProperty> adapterProperties) {
|
|
|
|
|
BasicOutputAdapterPropertyDto[] basicOutputAdapterPropertyDtos =
|
|
|
|
@ -697,6 +660,12 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
return basicOutputAdapterPropertyDtos;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This will set payload of publisher mapping attributes to the DTO
|
|
|
|
|
*
|
|
|
|
|
* @param mapProperties List of publisher mapping attributes
|
|
|
|
|
* @return DTO with all the publisher mapping attributes
|
|
|
|
|
*/
|
|
|
|
|
private org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] addPublisherMappingToDto
|
|
|
|
|
(List<MappingProperty> mapProperties) {
|
|
|
|
|
org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto[] eventMappingPropertyDtos
|
|
|
|
@ -712,6 +681,11 @@ public class AnalyticsArtifactsManagementServiceImpl implements AnalyticsArtifac
|
|
|
|
|
return eventMappingPropertyDtos;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Clean Service client in the stub
|
|
|
|
|
*
|
|
|
|
|
* @param stub Stud that needs to be cleaned
|
|
|
|
|
*/
|
|
|
|
|
private void cleanup(Stub stub) {
|
|
|
|
|
if (stub != null) {
|
|
|
|
|
try {
|
|
|
|
|