@ -18,7 +18,6 @@
package org.wso2.carbon.device.mgt.jaxrs.service.impl ;
package org.wso2.carbon.device.mgt.jaxrs.service.impl ;
import org.wso2.carbon.context.PrivilegedCarbonContext ;
import org.wso2.carbon.context.PrivilegedCarbonContext ;
import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Attribute ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Attribute ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterMappingConfiguration ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AdapterMappingConfiguration ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MappingProperty ;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.MappingProperty ;
@ -116,8 +115,13 @@ public class AnalyticsArtifactsManagementServiceImpl
public Response deployEventDefinitionAsDto ( @Valid EventStream stream ) {
public Response deployEventDefinitionAsDto ( @Valid EventStream stream ) {
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
try {
try {
validateStreamProperties ( stream ) ;
deployStream ( stream ) ;
deployStream ( stream ) ;
return Response . ok ( ) . build ( ) ;
return Response . ok ( ) . build ( ) ;
} catch ( BadRequestException e ) {
String errMsg = "Failed to deploy stream due to invalid payload" ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . BAD_REQUEST ) . entity ( errMsg ) . build ( ) ;
} catch ( AxisFault e ) {
} catch ( AxisFault e ) {
String errMsg = "Failed to create event definitions for tenant " + tenantDomain ;
String errMsg = "Failed to create event definitions for tenant " + tenantDomain ;
log . error ( errMsg , e ) ;
log . error ( errMsg , e ) ;
@ -187,6 +191,10 @@ public class AnalyticsArtifactsManagementServiceImpl
}
}
deployReceiver ( receiver , customMapping , adapterConfiguration ) ;
deployReceiver ( receiver , customMapping , adapterConfiguration ) ;
return Response . ok ( ) . build ( ) ;
return Response . ok ( ) . build ( ) ;
} catch ( BadRequestException e ) {
String errMsg = "Failed to deploy receiver due to invalid payload" ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . BAD_REQUEST ) . entity ( errMsg ) . build ( ) ;
} catch ( AxisFault e ) {
} catch ( AxisFault e ) {
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
log . error ( errMsg , e ) ;
@ -256,6 +264,10 @@ public class AnalyticsArtifactsManagementServiceImpl
}
}
deployPublisher ( publisher , customMapping , adapterConfiguration ) ;
deployPublisher ( publisher , customMapping , adapterConfiguration ) ;
return Response . ok ( ) . build ( ) ;
return Response . ok ( ) . build ( ) ;
} catch ( BadRequestException e ) {
String errMsg = "Failed to deploy publisher due to invalid payload" ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . BAD_REQUEST ) . entity ( errMsg ) . build ( ) ;
} catch ( AxisFault e ) {
} catch ( AxisFault e ) {
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
log . error ( errMsg , e ) ;
@ -283,8 +295,12 @@ public class AnalyticsArtifactsManagementServiceImpl
@Valid SiddhiExecutionPlan plan ) {
@Valid SiddhiExecutionPlan plan ) {
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
try {
try {
publish SiddhiExecutionPlan( name , isEdited , plan . getDefinition ( ) ) ;
deploy SiddhiExecutionPlan( name , isEdited , plan . getDefinition ( ) ) ;
return Response . ok ( ) . build ( ) ;
return Response . ok ( ) . build ( ) ;
} catch ( InvalidExecutionPlanException e ) {
String errMsg = "Failed to deploy siddhi script due to invalid payload" ;
log . error ( errMsg , e ) ;
return Response . status ( Response . Status . BAD_REQUEST ) . entity ( errMsg ) . build ( ) ;
} catch ( AxisFault e ) {
} catch ( AxisFault e ) {
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
String errMsg = "Failed to create event definitions for tenantDomain: " + tenantDomain ;
log . error ( errMsg , e ) ;
log . error ( errMsg , e ) ;
@ -315,14 +331,6 @@ public class AnalyticsArtifactsManagementServiceImpl
List < Attribute > metaData = stream . getMetaData ( ) ;
List < Attribute > metaData = stream . getMetaData ( ) ;
List < Attribute > payloadData = stream . getPayloadData ( ) ;
List < Attribute > payloadData = stream . getPayloadData ( ) ;
List < Attribute > correlationData = stream . getCorrelationData ( ) ;
List < Attribute > correlationData = stream . getCorrelationData ( ) ;
if ( metaData . isEmpty ( ) & & correlationData . isEmpty ( ) & & payloadData . isEmpty ( ) ) {
String errMsg = String . format ( "Failed to validate Stream property attributes of %s:%s" ,
stream . getName ( ) , stream . getVersion ( ) ) ;
ErrorResponse errorResponse = new ErrorResponse ( ) ;
errorResponse . setMessage ( errMsg ) ;
log . error ( errMsg ) ;
throw new BadRequestException ( errorResponse ) ;
}
try {
try {
eventStreamAdminServiceStub = DeviceMgtAPIUtils . getEventStreamAdminServiceStub ( ) ;
eventStreamAdminServiceStub = DeviceMgtAPIUtils . getEventStreamAdminServiceStub ( ) ;
@ -331,10 +339,15 @@ public class AnalyticsArtifactsManagementServiceImpl
eventStreamDefinitionDto . setVersion ( stream . getVersion ( ) ) ;
eventStreamDefinitionDto . setVersion ( stream . getVersion ( ) ) ;
eventStreamDefinitionDto . setNickName ( stream . getNickName ( ) ) ;
eventStreamDefinitionDto . setNickName ( stream . getNickName ( ) ) ;
eventStreamDefinitionDto . setDescription ( stream . getDescription ( ) ) ;
eventStreamDefinitionDto . setDescription ( stream . getDescription ( ) ) ;
eventStreamDefinitionDto . setMetaData ( addEventAttributesToDto ( metaData ) ) ;
if ( metaData ! = null ) {
eventStreamDefinitionDto . setPayloadData ( addEventAttributesToDto ( payloadData ) ) ;
eventStreamDefinitionDto . setMetaData ( addEventAttributesToDto ( metaData ) ) ;
eventStreamDefinitionDto . setCorrelationData ( addEventAttributesToDto ( correlationData ) ) ;
}
if ( payloadData ! = null ) {
eventStreamDefinitionDto . setPayloadData ( addEventAttributesToDto ( payloadData ) ) ;
}
if ( correlationData ! = null ) {
eventStreamDefinitionDto . setCorrelationData ( addEventAttributesToDto ( correlationData ) ) ;
}
String streamId = stream . getName ( ) + ":" + stream . getVersion ( ) ;
String streamId = stream . getName ( ) + ":" + stream . getVersion ( ) ;
if ( eventStreamAdminServiceStub . getStreamDefinitionDto ( streamId ) ! = null ) {
if ( eventStreamAdminServiceStub . getStreamDefinitionDto ( streamId ) ! = null ) {
eventStreamAdminServiceStub . editEventStreamDefinitionAsDto ( eventStreamDefinitionDto , streamId ) ;
eventStreamAdminServiceStub . editEventStreamDefinitionAsDto ( eventStreamDefinitionDto , streamId ) ;
@ -344,7 +357,6 @@ public class AnalyticsArtifactsManagementServiceImpl
} finally {
} finally {
cleanup ( eventStreamAdminServiceStub ) ;
cleanup ( eventStreamAdminServiceStub ) ;
}
}
}
}
/ * *
/ * *
@ -360,8 +372,8 @@ public class AnalyticsArtifactsManagementServiceImpl
private void deployReceiver ( Adapter receiver , boolean customMapping ,
private void deployReceiver ( Adapter receiver , boolean customMapping ,
AdapterConfiguration adapterConfiguration )
AdapterConfiguration adapterConfiguration )
throws RemoteException , UserStoreException , JWTClientException {
throws RemoteException , UserStoreException , JWTClientException {
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = DeviceMgtAPIUtils . getEventReceiverAdminServiceStub ( ) ;
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = DeviceMgtAPIUtils
. getEventReceiverAdminServiceStub ( ) ;
try {
try {
String receiverName = receiver . getAdapterName ( ) ;
String receiverName = receiver . getAdapterName ( ) ;
String adapterType = receiver . getAdapterType ( ) . toStringFormatted ( ) ;
String adapterType = receiver . getAdapterType ( ) . toStringFormatted ( ) ;
@ -374,55 +386,95 @@ public class AnalyticsArtifactsManagementServiceImpl
}
}
BasicInputAdapterPropertyDto [ ] basicInputAdapterPropertyDtos = addReceiverConfigToDto ( adapterProperties ) ;
BasicInputAdapterPropertyDto [ ] basicInputAdapterPropertyDtos = addReceiverConfigToDto ( adapterProperties ) ;
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration . getAdapterMappingConfiguration ( ) ;
if ( customMapping ) {
MessageFormat messageFormat = adapterMappingConfiguration . getMessageFormat ( ) ;
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration
if ( ! messageFormat . toString ( ) . equals ( "wso2event" ) ) {
. getAdapterMappingConfiguration ( ) ;
EventMappingPropertyDto [ ] inputMappingPropertyDtos =
MessageFormat messageFormat = adapterMappingConfiguration . getMessageFormat ( ) ;
addReceiverMappingToDto ( adapterMappingConfiguration . getInputMappingProperties ( ) ) ;
if ( ! messageFormat . toString ( ) . equals ( "wso2event" ) ) {
if ( messageFormat . toString ( ) . equals ( "xml" ) ) {
EventMappingPropertyDto [ ] inputMappingPropertyDtos =
EventMappingPropertyDto [ ] namespaceMappingPropertyDtos =
addReceiverMappingToDto ( adapterMappingConfiguration . getInputMappingProperties ( ) ) ;
addReceiverMappingToDto ( adapterMappingConfiguration . getNamespaceMappingProperties ( ) ) ;
if ( messageFormat . toString ( ) . equals ( "xml" ) ) {
eventReceiverAdminServiceStub . deployXmlEventReceiverConfiguration ( receiverName
EventMappingPropertyDto [ ] namespaceMappingPropertyDtos =
, eventStreamWithVersion , adapterType , null
addReceiverMappingToDto ( adapterMappingConfiguration . getNamespaceMappingProperties ( ) ) ;
, namespaceMappingPropertyDtos , inputMappingPropertyDtos
eventReceiverAdminServiceStub . deployXmlEventReceiverConfiguration ( receiverName
, basicInputAdapterPropertyDtos , customMapping ) ;
, eventStreamWithVersion , adapterType , null
} else {
, namespaceMappingPropertyDtos , inputMappingPropertyDtos
if ( messageFormat . toString ( ) . equals ( "map" ) ) {
, basicInputAdapterPropertyDtos , true ) ;
eventReceiverAdminServiceStub . deployMapEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
} else if ( messageFormat . toString ( ) . equals ( "text" ) ) {
eventReceiverAdminServiceStub . deployTextEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping ) ;
} else {
} else {
eventReceiverAdminServiceStub . deployJsonEventReceiverConfiguration ( receiverName
if ( messageFormat . toString ( ) . equals ( "map" ) ) {
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
eventReceiverAdminServiceStub . deployMapEventReceiverConfiguration ( receiverName
, basicInputAdapterPropertyDtos , customMapping ) ;
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , true ) ;
} else if ( messageFormat . toString ( ) . equals ( "text" ) ) {
eventReceiverAdminServiceStub . deployTextEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , true ) ;
} else {
eventReceiverAdminServiceStub . deployJsonEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicInputAdapterPropertyDtos , true ) ;
}
}
}
} else {
EventMappingPropertyDto [ ] correlationMappingPropertyDtos = addReceiverMappingToDto (
adapterMappingConfiguration . getCorrelationMappingProperties ( )
) ;
EventMappingPropertyDto [ ] metaMappingPropertyDtos = addReceiverMappingToDto (
adapterMappingConfiguration . getInputMappingProperties ( )
) ;
EventMappingPropertyDto [ ] payloadMappingPropertyDtos = addReceiverMappingToDto (
adapterMappingConfiguration . getPayloadMappingProperties ( )
) ;
eventReceiverAdminServiceStub . deployWso2EventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , metaMappingPropertyDtos
, correlationMappingPropertyDtos , payloadMappingPropertyDtos
, basicInputAdapterPropertyDtos , true
, eventStreamWithVersion ) ;
}
}
} else {
} else {
EventMappingPropertyDto [ ] correlationMappingPropertyDtos = addReceiverMappingToDto (
deployReceiverWithoutMapping ( receiverName , eventStreamWithVersion , adapterType ,
adapterMappingConfiguration . getCorrelationMappingProperties ( )
eventReceiverAdminServiceStub , basicInputAdapterPropertyDtos ) ;
) ;
EventMappingPropertyDto [ ] metaMappingPropertyDtos = addReceiverMappingToDto (
adapterMappingConfiguration . getInputMappingProperties ( )
) ;
EventMappingPropertyDto [ ] payloadMappingPropertyDtos = addReceiverMappingToDto (
adapterMappingConfiguration . getPayloadMappingProperties ( )
) ;
eventReceiverAdminServiceStub . deployWso2EventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , metaMappingPropertyDtos
, correlationMappingPropertyDtos , payloadMappingPropertyDtos
, basicInputAdapterPropertyDtos , customMapping
, eventStreamWithVersion ) ;
}
}
} finally {
} finally {
cleanup ( eventReceiverAdminServiceStub ) ;
cleanup ( eventReceiverAdminServiceStub ) ;
}
}
}
}
/ * *
* To deploy receiver if custom mapping is false
*
* @param receiverName Name of the receiver
* @param eventStreamWithVersion Attached event stream of the receiver
* @param adapterType Adapter type name
* @param eventReceiverAdminServiceStub Stub to deploy receiver
* @param basicInputAdapterPropertyDtos DTO to attach receiver data
* @throws RemoteException Exception that may occur during a remote method call
* /
private void deployReceiverWithoutMapping ( String receiverName , String eventStreamWithVersion
, String adapterType , EventReceiverAdminServiceStub eventReceiverAdminServiceStub
, BasicInputAdapterPropertyDto [ ] basicInputAdapterPropertyDtos )
throws RemoteException {
switch ( adapterType ) {
case "iot-event" :
case "wso2event" :
eventReceiverAdminServiceStub . deployWso2EventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , null , null
, null , basicInputAdapterPropertyDtos , false
, eventStreamWithVersion ) ;
break ;
case "soap" :
eventReceiverAdminServiceStub . deployXmlEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , null , null
, null , basicInputAdapterPropertyDtos , false ) ;
break ;
default :
eventReceiverAdminServiceStub . deployTextEventReceiverConfiguration ( receiverName
, eventStreamWithVersion , adapterType , null
, basicInputAdapterPropertyDtos , false ) ;
}
}
/ * *
/ * *
* Set data to a publisher dto and deploy dto through a stub
* Set data to a publisher dto and deploy dto through a stub
*
*
@ -436,7 +488,8 @@ public class AnalyticsArtifactsManagementServiceImpl
private void deployPublisher ( Adapter publisher , boolean customMapping ,
private void deployPublisher ( Adapter publisher , boolean customMapping ,
AdapterConfiguration adapterConfiguration )
AdapterConfiguration adapterConfiguration )
throws RemoteException , UserStoreException , JWTClientException {
throws RemoteException , UserStoreException , JWTClientException {
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = DeviceMgtAPIUtils . getEventPublisherAdminServiceStub ( ) ;
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = DeviceMgtAPIUtils
. getEventPublisherAdminServiceStub ( ) ;
try {
try {
String publisherName = publisher . getAdapterName ( ) ;
String publisherName = publisher . getAdapterName ( ) ;
String adapterType = publisher . getAdapterType ( ) . toStringFormatted ( ) ;
String adapterType = publisher . getAdapterType ( ) . toStringFormatted ( ) ;
@ -451,53 +504,65 @@ public class AnalyticsArtifactsManagementServiceImpl
BasicOutputAdapterPropertyDto [ ] basicOutputAdapterPropertyDtos =
BasicOutputAdapterPropertyDto [ ] basicOutputAdapterPropertyDtos =
addPublisherConfigToDto ( adapterProperties ) ;
addPublisherConfigToDto ( adapterProperties ) ;
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration . getAdapterMappingConfiguration ( ) ;
if ( customMapping ) {
MessageFormat messageFormat = adapterMappingConfiguration . getMessageFormat ( ) ;
AdapterMappingConfiguration adapterMappingConfiguration = adapterConfiguration
if ( ! messageFormat . toString ( ) . equals ( "wso2event" ) ) {
. getAdapterMappingConfiguration ( ) ;
if ( ! messageFormat . toString ( ) . equals ( "map" ) ) {
MessageFormat messageFormat = adapterMappingConfiguration . getMessageFormat ( ) ;
if ( messageFormat . toString ( ) . equals ( "xml" ) ) {
if ( ! messageFormat . toString ( ) . equals ( "wso2event" ) ) {
eventPublisherAdminServiceStub . deployXmlEventPublisherConfiguration ( publisherName
if ( ! messageFormat . toString ( ) . equals ( "map" ) ) {
, eventStreamWithVersion , adapterType , adapterMappingConfiguration . getInputMappingString ( )
if ( messageFormat . toString ( ) . equals ( "xml" ) ) {
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
eventPublisherAdminServiceStub . deployXmlEventPublisherConfiguration (
, customMapping ) ;
publisherName , eventStreamWithVersion , adapterType
} else if ( messageFormat . toString ( ) . equals ( "text" ) ) {
, adapterMappingConfiguration . getInputMappingString ( )
eventPublisherAdminServiceStub . deployTextEventPublisherConfiguration ( publisherName
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, eventStreamWithVersion , adapterType , adapterMappingConfiguration . getInputMappingString ( )
, true ) ;
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
} else if ( messageFormat . toString ( ) . equals ( "text" ) ) {
, customMapping ) ;
eventPublisherAdminServiceStub . deployTextEventPublisherConfiguration (
publisherName , eventStreamWithVersion , adapterType
, adapterMappingConfiguration . getInputMappingString ( )
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, true ) ;
} else {
eventPublisherAdminServiceStub . deployJsonEventPublisherConfiguration (
publisherName , eventStreamWithVersion , adapterType
, adapterMappingConfiguration . getInputMappingString ( )
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, true ) ;
}
} else {
} else {
eventPublisherAdminServiceStub . deployJsonEventPublisherConfiguration ( publisherName
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] inputMappingPropertyDtos =
, eventStreamWithVersion , adapterType , adapterMappingConfiguration . getInputMappingString ( )
addPublisherMappingToDto (
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
adapterMappingConfiguration . getInputMappingProperties ( )
, customMapping ) ;
) ;
eventPublisherAdminServiceStub . deployMapEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
, basicOutputAdapterPropertyDtos , true ) ;
}
}
} else {
} else {
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] inputMappingPropertyDtos =
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ]
addPublisherMappingToDto (
correlationMappingPropertyDtos = addPublisherMappingToDto
adapterMappingConfiguration . getInputMappingProperties ( )
(
adapterMappingConfiguration . getCorrelationMappingProperties ( )
) ;
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ]
metaMappingPropertyDtos = addPublisherMappingToDto
(
adapterMappingConfiguration . getMetaMappingProperties ( )
) ;
) ;
eventPublisherAdminServiceStub . deployMapEventPublisherConfiguration ( publisherName
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ]
, eventStreamWithVersion , adapterType , inputMappingPropertyDtos
payloadMappingPropertyDtos = addPublisherMappingToDto
, basicOutputAdapterPropertyDtos , customMapping ) ;
(
adapterMappingConfiguration . getPayloadMappingProperties ( )
) ;
eventPublisherAdminServiceStub . deployWSO2EventPublisherConfiguration (
publisherName , eventStreamWithVersion , adapterType , metaMappingPropertyDtos
, correlationMappingPropertyDtos , payloadMappingPropertyDtos
, basicOutputAdapterPropertyDtos , true
, eventStreamWithVersion ) ;
}
}
} else {
} else {
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] correlationMappingPropertyDtos =
deployPublisherWithoutMapping ( publisherName , eventStreamWithVersion , adapterType
addPublisherMappingToDto (
, eventPublisherAdminServiceStub , basicOutputAdapterPropertyDtos ) ;
adapterMappingConfiguration . getCorrelationMappingProperties ( )
) ;
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] metaMappingPropertyDtos =
addPublisherMappingToDto (
adapterMappingConfiguration . getMetaMappingProperties ( )
) ;
org . wso2 . carbon . event . publisher . stub . types . EventMappingPropertyDto [ ] payloadMappingPropertyDtos =
addPublisherMappingToDto (
adapterMappingConfiguration . getPayloadMappingProperties ( )
) ;
eventPublisherAdminServiceStub . deployWSO2EventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , metaMappingPropertyDtos
, correlationMappingPropertyDtos , payloadMappingPropertyDtos
, basicOutputAdapterPropertyDtos , customMapping
, eventStreamWithVersion ) ;
}
}
} finally {
} finally {
@ -505,6 +570,49 @@ public class AnalyticsArtifactsManagementServiceImpl
}
}
}
}
/ * *
* To deploy publisher if custom mapping is false
*
* @param publisherName Name of the publisher
* @param eventStreamWithVersion Attached event stream of the publisher
* @param adapterType Adapter type name
* @param eventPublisherAdminServiceStub Stub to deploy publisher
* @param basicOutputAdapterPropertyDtos DTO to attach publisher data
* @throws RemoteException Exception that may occur during a remote method call
* /
private void deployPublisherWithoutMapping ( String publisherName , String eventStreamWithVersion
, String adapterType , EventPublisherAdminServiceStub eventPublisherAdminServiceStub
, BasicOutputAdapterPropertyDto [ ] basicOutputAdapterPropertyDtos )
throws RemoteException {
switch ( adapterType ) {
case "wso2event" :
case "ui" :
case "secured-websocket" :
eventPublisherAdminServiceStub . deployWSO2EventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , null
, null , null , basicOutputAdapterPropertyDtos
, false , eventStreamWithVersion ) ;
break ;
case "soap" :
eventPublisherAdminServiceStub . deployXmlEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , null
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, false ) ;
break ;
case "cassandra" :
case "rdbms" :
eventPublisherAdminServiceStub . deployMapEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , null
, basicOutputAdapterPropertyDtos , false ) ;
break ;
default :
eventPublisherAdminServiceStub . deployTextEventPublisherConfiguration ( publisherName
, eventStreamWithVersion , adapterType , null
, basicOutputAdapterPropertyDtos , eventStreamWithVersion
, false ) ;
}
}
/ * *
/ * *
* Publish a siddhi execution plan using a stub
* Publish a siddhi execution plan using a stub
*
*
@ -516,9 +624,9 @@ public class AnalyticsArtifactsManagementServiceImpl
* @throws JWTClientException Exception that may occur during connecting to client store
* @throws JWTClientException Exception that may occur during connecting to client store
* @throws InvalidExecutionPlanException Exception that may occur if execution plan validation fails
* @throws InvalidExecutionPlanException Exception that may occur if execution plan validation fails
* /
* /
private void publish SiddhiExecutionPlan( String name , boolean isEdited ,
private void deploy SiddhiExecutionPlan( String name , boolean isEdited , String plan )
String plan )
throws RemoteException , UserStoreException , JWTClientException ,
throws RemoteException , UserStoreException , JWTClient Exception {
InvalidExecutionPlan Exception {
EventProcessorAdminServiceStub eventProcessorAdminServiceStub = null ;
EventProcessorAdminServiceStub eventProcessorAdminServiceStub = null ;
try {
try {
eventProcessorAdminServiceStub = DeviceMgtAPIUtils . getEventProcessorAdminServiceStub ( ) ;
eventProcessorAdminServiceStub = DeviceMgtAPIUtils . getEventProcessorAdminServiceStub ( ) ;
@ -530,10 +638,7 @@ public class AnalyticsArtifactsManagementServiceImpl
eventProcessorAdminServiceStub . editActiveExecutionPlan ( plan , name ) ;
eventProcessorAdminServiceStub . editActiveExecutionPlan ( plan , name ) ;
}
}
} else {
} else {
ErrorDTO errorDTO = new ErrorDTO ( ) ;
throw new InvalidExecutionPlanException ( validationResponse ) ;
errorDTO . setMessage ( validationResponse ) ;
log . error ( validationResponse ) ;
throw new InvalidExecutionPlanException ( errorDTO ) ;
}
}
} finally {
} finally {
cleanup ( eventProcessorAdminServiceStub ) ;
cleanup ( eventProcessorAdminServiceStub ) ;
@ -541,20 +646,19 @@ public class AnalyticsArtifactsManagementServiceImpl
}
}
/ * *
/ * *
* This will set payload of event attribute ' s mapping to the DTO
* Validate stream properties
*
*
* @param attributes list of event attributes
* @param stream EventStream object
* @return DTO with all the event attributes
* /
* /
private EventStreamAttributeDto [ ] addEventAttributesToDto ( List < Attribute > attributes ) {
private void validateStreamProperties ( EventStream stream ) throws BadRequestException {
EventStreamAttributeDto [ ] eventStreamAttributeDtos = new EventStreamAttributeDto [ attributes . size ( ) ] ;
if ( ( stream . getMetaData ( ) = = null | | stream . getMetaData ( ) . isEmpty ( ) ) & &
for ( Attribute attribute : attributes ) {
( stream . getCorrelationData ( ) = = null | | stream . getCorrelationData ( ) . isEmpty ( ) ) & &
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto ( ) ;
( stream . getPayloadData ( ) = = null | | stream . getPayloadData ( ) . isEmpty ( ) ) ) {
eventStreamAttributeDto . setAttributeName ( attribute . getName ( ) ) ;
String errMsg = String . format ( "Failed to validate Stream property attributes of %s:%s. " +
eventStreamAttributeDto . setAttributeType ( attribute . getType ( ) . toString ( ) ) ;
"Stream mapping can't be null or empty" ,
stream . getName ( ) , stream . getVersion ( ) ) ;
throw new BadRequestException ( errMsg ) ;
}
}
return eventStreamAttributeDtos ;
}
}
/ * *
/ * *
@ -562,13 +666,11 @@ public class AnalyticsArtifactsManagementServiceImpl
*
*
* @param adapterProperties Adapter payload attributes
* @param adapterProperties Adapter payload attributes
* /
* /
private void validateAdapterProperties ( List < AdapterProperty > adapterProperties ) {
private void validateAdapterProperties ( List < AdapterProperty > adapterProperties )
if ( adapterProperties . isEmpty ( ) ) {
throws BadRequestException {
String errMsg = "Invalid payload: event property attributes invalid!!!" ;
if ( adapterProperties = = null ) {
ErrorResponse errorResponse = new ErrorResponse ( ) ;
String errMsg = "Failed to validate adapter attributes. Adapter attributes can't be null" ;
errorResponse . setMessage ( errMsg ) ;
throw new BadRequestException ( errMsg ) ;
log . error ( errMsg ) ;
throw new BadRequestException ( errorResponse ) ;
}
}
}
}
@ -583,23 +685,49 @@ public class AnalyticsArtifactsManagementServiceImpl
*
*
* @param adapterMappingConfiguration Adapter mapping attributes
* @param adapterMappingConfiguration Adapter mapping attributes
* /
* /
private void validateAdapterMapping ( AdapterMappingConfiguration adapterMappingConfiguration ) {
private void validateAdapterMapping ( AdapterMappingConfiguration adapterMappingConfiguration )
throws BadRequestException {
if ( adapterMappingConfiguration . getInputMappingString ( ) = = null & &
if ( adapterMappingConfiguration = = null ) {
( adapterMappingConfiguration . getInputMappingProperties ( ) . isEmpty ( ) & &
String errMsg = "Failed to validate adapter mapping attributes. " +
adapterMappingConfiguration . getNamespaceMappingProperties ( ) . isEmpty ( ) ) & &
"Adapter mapping configuration can't be null" ;
(
throw new BadRequestException ( errMsg ) ;
adapterMappingConfiguration . getCorrelationMappingProperties ( ) . isEmpty ( ) & &
} else if ( adapterMappingConfiguration . getMessageFormat ( ) = = null | |
adapterMappingConfiguration . getPayloadMappingProperties ( ) . isEmpty ( ) & &
( ( adapterMappingConfiguration . getInputMappingString ( ) = = null )
adapterMappingConfiguration . getMetaMappingProperties ( ) . isEmpty ( )
& & ( adapterMappingConfiguration . getInputMappingProperties ( ) = = null | |
)
adapterMappingConfiguration . getInputMappingProperties ( ) . isEmpty ( ) )
| | adapterMappingConfiguration . getMessageFormat ( ) = = null ) {
& & ( adapterMappingConfiguration . getNamespaceMappingProperties ( ) = = null | |
String errMsg = "Invalid payload: event mapping attributes invalid!!!" ;
adapterMappingConfiguration . getNamespaceMappingProperties ( ) . isEmpty ( ) ) )
ErrorResponse errorResponse = new ErrorResponse ( ) ;
& &
errorResponse . setMessage ( errMsg ) ;
( ( adapterMappingConfiguration . getCorrelationMappingProperties ( ) = = null | |
log . error ( errMsg ) ;
adapterMappingConfiguration . getCorrelationMappingProperties ( ) . isEmpty ( ) )
throw new BadRequestException ( errorResponse ) ;
& & ( adapterMappingConfiguration . getPayloadMappingProperties ( ) = = null | |
adapterMappingConfiguration . getPayloadMappingProperties ( ) . isEmpty ( ) )
& & ( adapterMappingConfiguration . getMetaMappingProperties ( ) = = null | |
adapterMappingConfiguration . getMetaMappingProperties ( ) . isEmpty ( ) ) )
) {
String errMsg = "Failed to validate adapter mapping attributes. " +
"Adapter mapping configuration invalid" ;
ErrorDTO errorDTO = new ErrorDTO ( ) ;
errorDTO . setMessage ( errMsg ) ;
throw new BadRequestException ( errorDTO ) ;
}
}
/ * *
* This will set payload of event attribute ' s mapping to the DTO
*
* @param attributes list of event attributes
* @return DTO with all the event attributes
* /
private EventStreamAttributeDto [ ] addEventAttributesToDto ( List < Attribute > attributes ) {
EventStreamAttributeDto [ ] eventStreamAttributeDtos = new EventStreamAttributeDto [ attributes . size ( ) ] ;
for ( int i = 0 ; i < attributes . size ( ) ; i + + ) {
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto ( ) ;
eventStreamAttributeDto . setAttributeName ( attributes . get ( i ) . getName ( ) ) ;
eventStreamAttributeDto . setAttributeType ( attributes . get ( i ) . getType ( ) . toString ( ) ) ;
eventStreamAttributeDtos [ i ] = eventStreamAttributeDto ;
}
}
return eventStreamAttributeDtos ;
}
}
/ * *
/ * *