@ -26,8 +26,6 @@ import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterConfiguration;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterProperty ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MessageFormat ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.SiddhiExecutionPlan ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventPublisher ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventReceiver ;
import org.wso2.carbon.device.mgt.jaxrs.exception.BadRequestException ;
import org.wso2.carbon.device.mgt.jaxrs.exception.ErrorDTO ;
import org.wso2.carbon.device.mgt.jaxrs.exception.InvalidExecutionPlanException ;
@ -118,22 +116,22 @@ public class AnalyticsArtifactsManagementServiceImpl
public Response deployEventDefinitionAsDto ( @Valid EventStream stream ) {
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
try {
publish Stream( stream ) ;
deploy Stream( stream ) ;
return Response . ok ( ) . build ( ) ;
} catch ( AxisFault e ) {
String errMsg = "Failed to create event definitions for tenant Domain: " + tenantDomain ;
String errMsg = "Failed to create event definitions for tenant " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( RemoteException e ) {
String errMsg = "Failed to connect with the remote services for tenant Domain: " + tenantDomain ;
String errMsg = "Failed to connect with the remote services for tenant " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( JWTClientException e ) {
String errMsg = "Failed to generate jwt token for tenant Domain: " + tenantDomain ;
String errMsg = "Failed to generate jwt token for tenant " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( UserStoreException e ) {
String errMsg = "Failed to connect with the user store for tenant Domain: " + tenantDomain ;
String errMsg = "Failed to connect with the user store for tenant " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
}
@ -144,7 +142,7 @@ public class AnalyticsArtifactsManagementServiceImpl
@Path ( "/receiver/{name}" )
public Response deployEventReceiverAsString ( @PathParam ( "name" ) String name ,
@QueryParam ( "isEdited" ) boolean isEdited ,
@Valid EventReceiv er receiver ) {
@Valid Adapt er receiver ) {
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
EventReceiverAdminServiceStub eventReceiverAdminServiceStub ;
try {
@ -155,20 +153,24 @@ public class AnalyticsArtifactsManagementServiceImpl
} else {
eventReceiverAdminServiceStub . editActiveEventReceiverConfiguration ( receiverDefinition , name ) ;
}
return Response . ok ( ) . build ( ) ;
} catch ( AxisFault e ) {
log . error ( "Failed to create event definitions for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( RemoteException e ) {
log . error ( "Failed to connect with the remote services for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( JWTClientException e ) {
log . error ( "Failed to generate jwt token for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( UserStoreException e ) {
log . error ( "Failed to connect with the user store for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
}
return Response . ok ( ) . entity ( name ) . build ( ) ;
}
@Override
@ -176,59 +178,31 @@ public class AnalyticsArtifactsManagementServiceImpl
@Path ( "/receiver" )
public Response deployEventReceiverAsDto ( @Valid Adapter receiver ) {
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
String receiverName = receiver . getAdapterName ( ) ;
String adapterType = receiver . getAdapterType ( ) . toStringFormatted ( ) ;
AdapterConfiguration adapterConfiguration = receiver . getAdapterConfiguration ( ) ;
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration . getAdapterMappingConfiguration ( ) ;
try {
List < AdapterProperty > adapterProperties = adapterConfiguration . getAdapterProperties ( ) ;
if ( adapterProperties = = null ) {
log . error ( "Invalid attribute payload" ) ;
return Response . status ( Response . Status . BAD_REQUEST ) . build ( ) ;
}
AdapterConfiguration adapterConfiguration = receiver . getAdapterConfiguration ( ) ;
boolean customMapping = adapterConfiguration . isCustomMappingEnabled ( ) ;
List < MappingProperty > inputMappingProperties = adapterMappingConfiguration . getInputMappingProperties ( ) ;
List < MappingProperty > namespaceMappingProperties = adapterMappingConfiguration . getNamespaceMappingProperties ( ) ;
List < MappingProperty > correlationMappingProperties = adapterMappingConfiguration . getCorrelationMappingProperties ( ) ;
List < MappingProperty > payloadMappingProperties = adapterMappingConfiguration . getPayloadMappingProperties ( ) ;
List < MappingProperty > metaMappingProperties = adapterMappingConfiguration . getMetaMappingProperties ( ) ;
MessageFormat messageFormat = adapterMappingConfiguration . getMessageFormat ( ) ;
/ *
* Conditions
* - if CustomMappingEnabled check validity of property lists
* - if both inputMappingProperties and namespaceMappingProperties null check remaining property lists
* - if all correlationMappingProperties , payloadMappingProperties , metaMappingProperties null log error
* - if message format is null change the final result to TRUE
* - else continue
* * /
if ( ( customMapping & &
( inputMappingProperties = = null & & namespaceMappingProperties = = null ) & &
( correlationMappingProperties = = null & & payloadMappingProperties = = null & &
metaMappingProperties = = null ) ) | | messageFormat = = null ) {
String errMsg = "Invalid mapping payload" ;
log . error ( errMsg ) ;
return Response . status ( Response . Status . BAD_REQUEST ) . entity ( errMsg ) . build ( ) ;
validateAdapterProperties ( adapterConfiguration . getAdapterProperties ( ) ) ;
if ( customMapping ) {
validateAdapterMapping ( adapterConfiguration . getAdapterMappingConfiguration ( ) ) ;
}
String eventStreamWithVersion = receiver . getEventStreamWithVersion ( ) ;
publishReceiver ( receiverName , adapterType , adapterProperties , customMapping , inputMappingProperties ,
namespaceMappingProperties , correlationMappingProperties , payloadMappingProperties ,
metaMappingProperties , messageFormat , eventStreamWithVersion ) ;
deployReceiver ( receiver , customMapping , adapterConfiguration ) ;
return Response . ok ( ) . build ( ) ;
} catch ( AxisFault e ) {
log . error ( "Failed to create event definitions for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( RemoteException e ) {
log . error ( "Failed to connect with the remote services for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( JWTClientException e ) {
log . error ( "Failed to generate jwt token for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( UserStoreException e ) {
log . error ( "Failed to connect with the user store for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
}
}
@ -237,33 +211,35 @@ public class AnalyticsArtifactsManagementServiceImpl
@Path ( "/publisher/{name}" )
public Response deployEventPublisherAsString ( @PathParam ( "name" ) String name ,
@QueryParam ( "isEdited" ) boolean isEdited ,
@Valid EventPublish er publisher ) {
@Valid Adapt er publisher ) {
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
EventPublisherAdminServiceStub eventPublisherAdminServiceStub ;
try {
String publisherDefinition = publisher . getDefinition ( ) ;
eventPublisherAdminServiceStub = DeviceMgtAPIUtils . getEventPublisherAdminServiceStub ( ) ;
if ( ! isEdited ) {
eventPublisherAdminServiceStub . deployEventPublisherConfiguration ( publisherDefinition ) ;
} else {
eventPublisherAdminServiceStub . editActiveEventPublisherConfiguration ( publisherDefinition , name ) ;
}
return Response . ok ( ) . build ( ) ;
} catch ( AxisFault e ) {
log . error ( "Failed to create event definitions for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( RemoteException e ) {
log . error ( "Failed to connect with the remote services for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( JWTClientException e ) {
log . error ( "Failed to generate jwt token for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( UserStoreException e ) {
log . error ( "Failed to connect with the user store for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
}
return Response . ok ( ) . entity ( publisher ) . build ( ) ;
}
@Override
@ -271,59 +247,31 @@ public class AnalyticsArtifactsManagementServiceImpl
@Path ( "/publisher" )
public Response deployEventPublisherAsDto ( @Valid Adapter publisher ) {
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
String publisherName = publisher . getAdapterName ( ) ;
String adapterType = publisher . getAdapterType ( ) . toStringFormatted ( ) ;
AdapterConfiguration adapterConfiguration = publisher . getAdapterConfiguration ( ) ;
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration . getAdapterMappingConfiguration ( ) ;
try {
List < AdapterProperty > adapterProperties = adapterConfiguration . getAdapterProperties ( ) ;
if ( adapterProperties = = null ) {
log . error ( "Invalid attribute payload" ) ;
return Response . status ( Response . Status . BAD_REQUEST ) . build ( ) ;
}
validateAdapterProperties ( adapterConfiguration . getAdapterProperties ( ) ) ;
boolean customMapping = adapterConfiguration . isCustomMappingEnabled ( ) ;
String inputMappingString = adapterMappingConfiguration . getInputMappingString ( ) ;
List < MappingProperty > inputMappingProperties = adapterMappingConfiguration . getInputMappingProperties ( ) ;
List < MappingProperty > correlationMappingProperties = adapterMappingConfiguration . getCorrelationMappingProperties ( ) ;
List < MappingProperty > payloadMappingProperties = adapterMappingConfiguration . getPayloadMappingProperties ( ) ;
List < MappingProperty > metaMappingProperties = adapterMappingConfiguration . getMetaMappingProperties ( ) ;
MessageFormat messageFormat = adapterMappingConfiguration . getMessageFormat ( ) ;
/ *
* Conditions
* - if CustomMappingEnabled check validity of property lists
* - if all correlationMappingProperties , payloadMappingProperties , metaMappingProperties null log error
* - if message format is null change the final result to TRUE
* - else continue
* /
if ( ( customMapping & &
( inputMappingProperties = = null & & inputMappingString = = null ) & &
( correlationMappingProperties = = null & & payloadMappingProperties = = null & &
metaMappingProperties = = null ) ) | | messageFormat = = null ) {
String errMsg = "Invalid mapping payload" ;
log . error ( errMsg ) ;
return Response . status ( Response . Status . BAD_REQUEST ) . entity ( errMsg ) . build ( ) ;
if ( customMapping ) {
validateAdapterMapping ( adapterConfiguration . getAdapterMappingConfiguration ( ) ) ;
}
String eventStreamWithVersion = publisher . getEventStreamWithVersion ( ) ;
publishPublisher ( publisherName , adapterType , adapterProperties , customMapping
, inputMappingString , inputMappingProperties , correlationMappingProperties
, payloadMappingProperties , metaMappingProperties , messageFormat
, eventStreamWithVersion ) ;
deployPublisher ( publisher , customMapping , adapterConfiguration ) ;
return Response . ok ( ) . build ( ) ;
} catch ( AxisFault e ) {
log . error ( "Failed to create event definitions for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( RemoteException e ) {
log . error ( "Failed to connect with the remote services for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( JWTClientException e ) {
log . error ( "Failed to generate jwt token for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( UserStoreException e ) {
log . error ( "Failed to connect with the user store for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
}
}
@ -338,46 +286,46 @@ public class AnalyticsArtifactsManagementServiceImpl
publishSiddhiExecutionPlan ( name , isEdited , plan . getDefinition ( ) ) ;
return Response . ok ( ) . build ( ) ;
} catch ( AxisFault e ) {
log . error ( "Failed to create event definitions for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( RemoteException e ) {
log . error ( "Failed to connect with the remote services for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
} catch ( InvalidExecutionPlanException e ) {
log . error ( "Invalid Execution plan: " + tenantDomain , e ) ;
return e . getResponse ( ) ;
String errMsg = "Failed to connect with the remote services for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( JWTClientException e ) {
log . error ( "Failed to generate jwt token for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to generate jwt token for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
} catch ( UserStoreException e ) {
log . error ( "Failed to connect with the user store for tenantDomain: " + tenantDomain , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . build ( ) ;
String errMsg = "Failed to connect with the user store for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . INTERNAL_SERVER_ERROR ) . entity ( errMsg ) . build ( ) ;
}
}
/ * *
* Set data to a stream dto and publish dto using a stub
* Set data to a Stream dto and deploy dto through a stub
*
* @param stream Stream definition
* @throws RemoteException Exception that may occur during a remote method call
* @throws UserStoreException Exception that may occur during JWT token generation
* @throws JWTClientException Exception that may occur during connecting to client store
* /
private void publish Stream( EventStream stream )
private void deployStream ( EventStream stream )
throws RemoteException , UserStoreException , JWTClientException {
EventStreamAdminServiceStub eventStreamAdminServiceStub = null ;
List < Attribute > metaData = stream . getMetaData ( ) ;
List < Attribute > payloadData = stream . getPayloadData ( ) ;
List < Attribute > correlationData = stream . getCorrelationData ( ) ;
if ( metaData . isEmpty ( ) & & correlationData . isEmpty ( ) & & payloadData . isEmpty ( ) ) {
String errMsg = "Invalid payload: event mapping attributes invalid!!!" ;
String errMsg = String . format ( "Failed to validate Stream property attributes of %s:%s" ,
stream . getName ( ) , stream . getVersion ( ) ) ;
ErrorResponse errorResponse = new ErrorResponse ( ) ;
errorResponse . setMessage ( errMsg ) ;
log . error ( errMsg ) ;
throw new BadRequestException ( errorResponse ) ;
}
EventStreamAdminServiceStub eventStreamAdminServiceStub =
DeviceMgtAPIUtils . getEventStreamAdminServiceStub ( ) ;
try {
eventStreamAdminServiceStub = DeviceMgtAPIUtils . getEventStreamAdminServiceStub ( ) ;
EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto ( ) ;
eventStreamDefinitionDto . setName ( stream . getName ( ) ) ;
eventStreamDefinitionDto . setVersion ( stream . getVersion ( ) ) ;
@ -386,6 +334,7 @@ public class AnalyticsArtifactsManagementServiceImpl
eventStreamDefinitionDto . setMetaData ( addEventAttributesToDto ( metaData ) ) ;
eventStreamDefinitionDto . setPayloadData ( addEventAttributesToDto ( payloadData ) ) ;
eventStreamDefinitionDto . setCorrelationData ( addEventAttributesToDto ( correlationData ) ) ;
String streamId = stream . getName ( ) + ":" + stream . getVersion ( ) ;
if ( eventStreamAdminServiceStub . getStreamDefinitionDto ( streamId ) ! = null ) {
eventStreamAdminServiceStub . editEventStreamDefinitionAsDto ( eventStreamDefinitionDto , streamId ) ;
@ -399,88 +348,75 @@ public class AnalyticsArtifactsManagementServiceImpl
}
/ * *
* Set data to a receiver dto and publish dto using a stub
* Set data to a receiver dto and deploy dto through a stub
*
* @param receiverName Receiver name
* @param adapterType Receiver type
* @param adapterProperties Receiver properties
* @param customMapping Is receiver mapped
* @param inputMappingProperties Receiver input attribute mapping
* @param namespaceMappingProperties Receiver name - scape attribute mapping
* @param correlationMappingProperties Receiver correlation attribute mapping
* @param payloadMappingProperties Receiver payload attribute mapping
* @param metaMappingProperties Receiver meta attribute mapping
* @param messageFormat Receiver mapping format
* @param eventStreamWithVersion Attached stream
* @param receiver Event Receiver adapter
* @param customMapping Is Receiver mapped
* @param adapterConfiguration Adapter property and mapping configuration
* @throws RemoteException Exception that may occur during a remote method call
* @throws UserStoreException Exception that may occur during JWT token generation
* @throws JWTClientException Exception that may occur during connecting to client store
* /
private void publishReceiver ( String receiverName , String adapterType ,
List < AdapterProperty > adapterProperties , boolean customMapping ,
List < MappingProperty > inputMappingProperties ,
List < MappingProperty > namespaceMappingProperties ,
List < MappingProperty > correlationMappingProperties ,
List < MappingProperty > payloadMappingProperties ,
List < MappingProperty > metaMappingProperties ,
MessageFormat messageFormat ,
String eventStreamWithVersion )
private void deployReceiver ( Adapter receiver , boolean customMapping ,
AdapterConfiguration adapterConfiguration )
throws RemoteException , UserStoreException , JWTClientException {
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = DeviceMgtAPIUtils . getEventReceiverAdminServiceStub ( ) ;
try {
String receiverName = receiver . getAdapterName ( ) ;
String adapterType = receiver . getAdapterType ( ) . toStringFormatted ( ) ;
String eventStreamWithVersion = receiver . getEventStreamWithVersion ( ) ;
List < AdapterProperty > adapterProperties = adapterConfiguration . getAdapterProperties ( ) ;
EventReceiverConfigurationDto eventReceiverConfigurationDto = eventReceiverAdminServiceStub
. getActiveEventReceiverConfiguration ( receiverName ) ;
// Check if adapter already exists, if so un-deploy it
if ( eventReceiverConfigurationDto ! = null ) {
eventReceiverAdminServiceStub . undeployActiveEventReceiverConfiguration ( receiverName ) ;
}
BasicInputAdapterPropertyDto [ ] basicInputAdapterPropertyDtos = addReceiverConfigToDto ( adapterProperties ) ;
// Adding attribute properties to DTOs
BasicInputAdapterPropertyDto [ ] basicInputAdapterPropertyDtos =
addReceiverConfigToDto ( adapterProperties ) ;
if ( eventReceiverAdminServiceStub . getActiveEventReceiverConfiguration ( receiverName ) = = null ) {
// Call stub deploy methods according to the message format
if ( ! messageFormat . toString ( ) . equals ( "wso2event" ) ) {
EventMappingPropertyDto [ ] inputMappingPropertyDtos =
addReceiverMappingToDto ( inputMappingProperties ) ;
if ( messageFormat . toString ( ) . equals ( "xml" ) ) {
EventMappingPropertyDto [ ] namespaceMappingPropertyDtos =
addReceiverMappingToDto ( namespaceMappingProperties ) ;
eventReceiverAdminServiceStub . deployXmlEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , null
, namespaceMappingPropertyDtos , inputMappingPropertyDtos
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration . getAdapterMappingConfiguration ( ) ;
MessageFormat messageFormat = adapterMappingConfiguration . getMessageFormat ( ) ;
if ( ! messageFormat . toString ( ) . equals ( "wso2event" ) ) {
EventMappingPropertyDto [ ] inputMappingPropertyDtos =
addReceiverMappingToDto ( adapterMappingConfiguration . getInputMappingProperties ( ) ) ;
if ( messageFormat . toString ( ) . equals ( "xml" ) ) {
EventMappingPropertyDto [ ] namespaceMappingPropertyDtos =
addReceiverMappingToDto ( adapterMappingConfiguration . getNamespaceMappingProperties ( ) ) ;
eventReceiverAdminServiceStub . deployXmlEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , null
, namespaceMappingPropertyDtos , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
} else {
if ( messageFormat . toString ( ) . equals ( "map" ) ) {
eventReceiverAdminServiceStub . deployMapEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
} else if ( messageFormat . toString ( ) . equals ( "text" ) ) {
eventReceiverAdminServiceStub . deployTextEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
} else {
if ( messageFormat . toString ( ) . equals ( "map" ) ) {
eventReceiverAdminServiceStub . deployMapEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
} else if ( messageFormat . toString ( ) . equals ( "text" ) ) {
eventReceiverAdminServiceStub . deployTextEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
} else {
eventReceiverAdminServiceStub . deployJsonEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
}
eventReceiverAdminServiceStub . deployJsonEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
}
} else {
EventMappingPropertyDto [ ] correlationMappingPropertyDtos = addReceiverMappingToDto ( correlationMappingProperties ) ;
EventMappingPropertyDto [ ] metaMappingPropertyDtos = addReceiverMappingToDto ( metaMappingProperties ) ;
EventMappingPropertyDto [ ] payloadMappingPropertyDtos = addReceiverMappingToDto ( payloadMappingProperties ) ;
eventReceiverAdminServiceStub . deployWso2EventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , metaMappingPropertyDtos
, correlationMappingPropertyDtos , payloadMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping
, eventStreamWithVersion ) ;
}
} else {
EventMappingPropertyDto [ ] correlationMappingPropertyDtos = addReceiverMappingToDto (
adapterMappingConfiguration . getCorrelationMappingProperties ( )
) ;
EventMappingPropertyDto [ ] metaMappingPropertyDtos = addReceiverMappingToDto (
adapterMappingConfiguration . getInputMappingProperties ( )
) ;
EventMappingPropertyDto [ ] payloadMappingPropertyDtos = addReceiverMappingToDto (
adapterMappingConfiguration . getPayloadMappingProperties ( )
) ;
eventReceiverAdminServiceStub . deployWso2EventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , metaMappingPropertyDtos
, correlationMappingPropertyDtos , payloadMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping
, eventStreamWithVersion ) ;
}
} finally {
cleanup ( eventReceiverAdminServiceStub ) ;
@ -488,88 +424,82 @@ public class AnalyticsArtifactsManagementServiceImpl
}
/ * *
* Set data to a publisher dto and publish dto using a stub
* Set data to a publisher dto and deploy dto through a stub
*
* @param publisherName Publisher name
* @param adapterType Publisher type
* @param adapterProperties Publisher properties
* @param customMapping Is publisher mapped
* @param correlationMappingProperties Publisher correlation attribute mapping
* @param payloadMappingProperties Publisher payload attribute mapping
* @param metaMappingProperties Publisher meta attribute mapping
* @param messageFormat Publisher mapping format
* @param eventStreamWithVersion Attached stream
* @param publisher Event Publisher adapter
* @param customMapping Is Publisher mapped
* @param adapterConfiguration Publisher property and mapping configuration
* @throws RemoteException Exception that may occur during a remote method call
* @throws UserStoreException Exception that may occur during JWT token generation
* @throws JWTClientException Exception that may occur during connecting to client store
* /
private void publishPublisher ( String publisherName , String adapterType ,
List < AdapterProperty > adapterProperties ,
boolean customMapping ,
String inputMappingString ,
List < MappingProperty > inputMappingProperties ,
List < MappingProperty > correlationMappingProperties ,
List < MappingProperty > payloadMappingProperties ,
List < MappingProperty > metaMappingProperties ,
MessageFormat messageFormat ,
String eventStreamWithVersion )
private void deployPublisher ( Adapter publisher , boolean customMapping ,
AdapterConfiguration adapterConfiguration )
throws RemoteException , UserStoreException , JWTClientException {
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = DeviceMgtAPIUtils . getEventPublisherAdminServiceStub ( ) ;
// Check if adapter already exists, if so un-deploy it
try {
String publisherName = publisher . getAdapterName ( ) ;
String adapterType = publisher . getAdapterType ( ) . toStringFormatted ( ) ;
String eventStreamWithVersion = publisher . getEventStreamWithVersion ( ) ;
List < AdapterProperty > adapterProperties = adapterConfiguration . getAdapterProperties ( ) ;
EventPublisherConfigurationDto eventPublisherConfigurationDto = eventPublisherAdminServiceStub
. getActiveEventPublisherConfiguration ( publisherName ) ;
if ( eventPublisherConfigurationDto ! = null ) {
eventPublisherAdminServiceStub . undeployActiveEventPublisherConfiguration ( publisherName ) ;
}
// Adding attribute properties to DTOs
BasicOutputAdapterPropertyDto [ ] basicOutputAdapterPropertyDtos =
addPublisherConfigToDto ( adapterProperties ) ;
if ( eventPublisherAdminServiceStub . getActiveEventPublisherConfiguration ( publisherName ) = = null ) {
// Call stub deploy methods according to the message format
if ( ! messageFormat . toString ( ) . equals ( "wso2event" ) ) {
if ( ! messageFormat . toString ( ) . equals ( "map" ) ) {
if ( messageFormat . toString ( ) . equals ( "xml" ) ) {
eventPublisherAdminServiceStub . deployXmlEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , inputMappingString
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, customMapping ) ;
} else if ( messageFormat . toString ( ) . equals ( "text" ) ) {
eventPublisherAdminServiceStub . deployTextEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , inputMappingString
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, customMapping ) ;
} else {
eventPublisherAdminServiceStub . deployJsonEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , inputMappingString
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, customMapping ) ;
}
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration . getAdapterMappingConfiguration ( ) ;
MessageFormat messageFormat = adapterMappingConfiguration . getMessageFormat ( ) ;
if ( ! messageFormat . toString ( ) . equals ( "wso2event" ) ) {
if ( ! messageFormat . toString ( ) . equals ( "map" ) ) {
if ( messageFormat . toString ( ) . equals ( "xml" ) ) {
eventPublisherAdminServiceStub . deployXmlEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , adapterMappingConfiguration . getInputMappingString ( )
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, customMapping ) ;
} else if ( messageFormat . toString ( ) . equals ( "text" ) ) {
eventPublisherAdminServiceStub . deployTextEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , adapterMappingConfiguration . getInputMappingString ( )
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, customMapping ) ;
} else {
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] inputMappingPropertyDtos =
addPublisherMappingToDto ( inputMappingProperties ) ;
eventPublisherAdminServiceStub . deployMapEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicOutputAdapterPropertyDtos , customMapping ) ;
eventPublisherAdminServiceStub . deployJsonEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , adapterMappingConfiguration . getInputMappingString ( )
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, customMapping ) ;
}
} else {
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] correlationMappingPropertyDtos =
addPublisherMappingToDto ( correlationMappingProperties ) ;
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] metaMappingPropertyDtos =
addPublisherMappingToDto ( metaMappingProperties ) ;
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] payloadMappingPropertyDtos =
addPublisherMappingToDto ( payloadMappingProperties ) ;
eventPublisherAdminServiceStub . deployWSO2EventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , metaMappingPropertyDtos
, correlationMappingPropertyDtos , payloadMappingPropertyDtos
, basicOutputAdapterPropertyDtos , customMapping
, eventStreamWithVersion ) ;
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] inputMappingPropertyDtos =
addPublisherMappingToDto (
adapterMappingConfiguration . getInputMappingProperties ( )
) ;
eventPublisherAdminServiceStub . deployMapEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicOutputAdapterPropertyDtos , customMapping ) ;
}
} else {
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] correlationMappingPropertyDtos =
addPublisherMappingToDto (
adapterMappingConfiguration . getCorrelationMappingProperties ( )
) ;
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] metaMappingPropertyDtos =
addPublisherMappingToDto (
adapterMappingConfiguration . getMetaMappingProperties ( )
) ;
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] payloadMappingPropertyDtos =
addPublisherMappingToDto (
adapterMappingConfiguration . getPayloadMappingProperties ( )
) ;
eventPublisherAdminServiceStub . deployWSO2EventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , metaMappingPropertyDtos
, correlationMappingPropertyDtos , payloadMappingPropertyDtos
, basicOutputAdapterPropertyDtos , customMapping
, eventStreamWithVersion ) ;
}
} finally {
cleanup ( eventPublisherAdminServiceStub ) ;
}
@ -588,24 +518,21 @@ public class AnalyticsArtifactsManagementServiceImpl
* /
private void publishSiddhiExecutionPlan ( String name , boolean isEdited ,
String plan )
throws RemoteException , UserStoreException , JWTClientException ,
InvalidExecutionPlanException {
throws RemoteException , UserStoreException , JWTClientException {
EventProcessorAdminServiceStub eventProcessorAdminServiceStub = null ;
try {
eventProcessorAdminServiceStub = DeviceMgtAPIUtils . getEventProcessorAdminServiceStub ( ) ;
// Validate the plan code
String validationResponse = eventProcessorAdminServiceStub . validateExecutionPlan ( plan ) ;
if ( validationResponse . equals ( "success" ) ) {
if ( ! isEdited ) {
// Create a new plan
eventProcessorAdminServiceStub . deployExecutionPlan ( plan ) ;
} else {
// Edit plan
eventProcessorAdminServiceStub . editActiveExecutionPlan ( plan , name ) ;
}
} else {
ErrorDTO errorDTO = new ErrorDTO ( ) ;
errorDTO . setMessage ( validationResponse ) ;
log . error ( validationResponse ) ;
throw new InvalidExecutionPlanException ( errorDTO ) ;
}
} finally {
@ -630,6 +557,51 @@ public class AnalyticsArtifactsManagementServiceImpl
return eventStreamAttributeDtos ;
}
/ * *
* Validate adapter payload attributes
*
* @param adapterProperties Adapter payload attributes
* /
private void validateAdapterProperties ( List < AdapterProperty > adapterProperties ) {
if ( adapterProperties . isEmpty ( ) ) {
String errMsg = "Invalid payload: event property attributes invalid!!!" ;
ErrorResponse errorResponse = new ErrorResponse ( ) ;
errorResponse . setMessage ( errMsg ) ;
log . error ( errMsg ) ;
throw new BadRequestException ( errorResponse ) ;
}
}
/ * *
* Validate adapter mapping attributes
* < p >
* Conditions
* - if both inputMappingProperties and namespaceMappingProperties null check remaining property lists
* - if all correlationMappingProperties , payloadMappingProperties , metaMappingProperties null log error
* - if message format is null change the final result to TRUE
* - else continue
*
* @param adapterMappingConfiguration Adapter mapping attributes
* /
private void validateAdapterMapping ( AdapterMappingConfiguration adapterMappingConfiguration ) {
if ( adapterMappingConfiguration . getInputMappingString ( ) = = null & &
( adapterMappingConfiguration . getInputMappingProperties ( ) . isEmpty ( ) & &
adapterMappingConfiguration . getNamespaceMappingProperties ( ) . isEmpty ( ) ) & &
(
adapterMappingConfiguration . getCorrelationMappingProperties ( ) . isEmpty ( ) & &
adapterMappingConfiguration . getPayloadMappingProperties ( ) . isEmpty ( ) & &
adapterMappingConfiguration . getMetaMappingProperties ( ) . isEmpty ( )
)
| | adapterMappingConfiguration . getMessageFormat ( ) = = null ) {
String errMsg = "Invalid payload: event mapping attributes invalid!!!" ;
ErrorResponse errorResponse = new ErrorResponse ( ) ;
errorResponse . setMessage ( errMsg ) ;
log . error ( errMsg ) ;
throw new BadRequestException ( errorResponse ) ;
}
}
/ * *
* This will set payload of receiver attributes to the DTO
*
@ -655,7 +627,8 @@ public class AnalyticsArtifactsManagementServiceImpl
* @param mapProperties List of receiver mapping attributes
* @return DTO with all the receiver mapping attributes
* /
private EventMappingPropertyDto [ ] addReceiverMappingToDto ( List < MappingProperty > mapProperties ) {
private EventMappingPropertyDto [ ] addReceiverMappingToDto
( List < MappingProperty > mapProperties ) {
EventMappingPropertyDto [ ] eventMappingPropertyDtos = new EventMappingPropertyDto [ mapProperties . size ( ) ] ;
for ( int i = 0 ; i < mapProperties . size ( ) ; i + + ) {
EventMappingPropertyDto eventMappingPropertyDto = new EventMappingPropertyDto ( ) ;