@ -46,36 +46,52 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
private String mqttAdapterName ;
private static final Log log = LogFactory . getLog ( MQTTNotificationStrategy . class ) ;
private final PushNotificationConfig config ;
private final String providerTenantDomain ;
private static final Object lockObj = new Object ( ) ;
public MQTTNotificationStrategy ( PushNotificationConfig config ) {
this . config = config ;
OutputEventAdapterConfiguration adapterConfig = new OutputEventAdapterConfiguration ( ) ;
adapterConfig . setType ( MQTTAdapterConstants . MQTT_ADAPTER_TYPE ) ;
mqttAdapterName = config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_NAME ) ;
adapterConfig . setName ( mqttAdapterName ) ;
adapterConfig . setMessageFormat ( MessageType . TEXT ) ;
Map < String , String > configProperties = new HashMap < String , String > ( ) ;
String brokerUrl = config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_BROKER_URL ) ;
if ( brokerUrl ! = null & & ! brokerUrl . isEmpty ( ) ) {
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_BROKER_URL , brokerUrl ) ;
if ( config . getProperties ( ) ! = null & & config . getProperties ( ) . size ( ) > 0 ) {
String brokerUrl = config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_BROKER_URL ) ;
if ( brokerUrl ! = null & & ! brokerUrl . isEmpty ( ) ) {
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_BROKER_URL , brokerUrl ) ;
}
mqttAdapterName = config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_NAME ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_USERNAME ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_USERNAME ) ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_PASSWORD ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_PASSWORD ) ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_CLEAR_SESSION ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_CLEAR_SESSION ) ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_SCOPES ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_SCOPES ) ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_MESSAGE_QOS ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_MESSAGE_QOS ) ) ;
} else {
mqttAdapterName = "mqtt.adapter." + PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( )
. toLowerCase ( ) ;
}
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_USERNAME ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_USERNAME ) ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_PASSWORD ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_PASSWORD ) ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_CLEAR_SESSION ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_CLEAR_SESSION ) ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_SCOPES ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_SCOPES ) ) ;
configProperties . put ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_MESSAGE_QOS ,
config . getProperty ( MQTTAdapterConstants . MQTT_ADAPTER_PROPERTY_MESSAGE_QOS ) ) ;
adapterConfig . setName ( mqttAdapterName ) ;
adapterConfig . setStaticProperties ( configProperties ) ;
try {
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . create ( adapterConfig ) ;
synchronized ( lockObj ) {
try {
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . isPolled ( mqttAdapterName ) ;
} catch ( OutputEventAdapterException e ) {
//event adapter not created
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . create ( adapterConfig ) ;
}
}
} catch ( OutputEventAdapterException e ) {
throw new InvalidConfigurationException ( "Error occurred while initializing MQTT output event adapter" , e ) ;
}
providerTenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( )
. toLowerCase ( ) ;
}
@Override
@ -85,19 +101,47 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
@Override
public void execute ( NotificationContext ctx ) throws PushNotificationExecutionFailedException {
String adapterName = mqttAdapterName ;
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( true ) ;
if ( ! providerTenantDomain . equals ( tenantDomain ) ) {
//this is to handle the device type shared with all tenant mode.
adapterName = "mqtt.adapter." + PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( )
. toLowerCase ( ) ;
try {
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . isPolled ( adapterName ) ;
} catch ( OutputEventAdapterException e ) {
//event adapter not created
synchronized ( lockObj ) {
OutputEventAdapterConfiguration adapterConfig = new OutputEventAdapterConfiguration ( ) ;
adapterConfig . setType ( MQTTAdapterConstants . MQTT_ADAPTER_TYPE ) ;
adapterConfig . setMessageFormat ( MessageType . TEXT ) ;
adapterConfig . setName ( adapterName ) ;
Map < String , String > configProperties = new HashMap < String , String > ( ) ;
adapterConfig . setStaticProperties ( configProperties ) ;
try {
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . create ( adapterConfig ) ;
} catch ( OutputEventAdapterException e1 ) {
throw new PushNotificationExecutionFailedException
( "Error occurred while initializing MQTT output event adapter for shared tenant: "
+ tenantDomain , e ) ;
}
}
}
}
Operation operation = ctx . getOperation ( ) ;
Properties properties = operation . getProperties ( ) ;
if ( properties ! = null & & properties . get ( MQTT_ADAPTER_TOPIC ) ! = null ) {
Map < String , String > dynamicProperties = new HashMap < > ( ) ;
dynamicProperties . put ( "topic" , ( String ) properties . get ( MQTT_ADAPTER_TOPIC ) ) ;
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . publish ( mqttAdapterName , dynamicProperties ,
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . publish ( a dapterName, dynamicProperties ,
operation . getPayLoad ( ) ) ;
} else {
if ( PolicyOperation . POLICY_OPERATION_CODE . equals ( operation . getCode ( ) ) ) {
PolicyOperation policyOperation = ( PolicyOperation ) operation ;
List < ProfileOperation > profileOperations = policyOperation . getProfileOperations ( ) ;
String tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( true ) ;
String deviceType = ctx . getDeviceId ( ) . getType ( ) ;
String deviceId = ctx . getDeviceId ( ) . getId ( ) ;
for ( ProfileOperation profileOperation : profileOperations ) {
@ -106,7 +150,7 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
+ deviceType + "/" + deviceId + "/" + profileOperation . getType ( )
. toString ( ) . toLowerCase ( ) + "/" + profileOperation . getCode ( ) . toLowerCase ( ) ;
dynamicProperties . put ( "topic" , topic ) ;
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . publish ( mqttA dapterName, dynamicProperties ,
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . publish ( a dapterName, dynamicProperties ,
profileOperation . getPayLoad ( ) ) ;
}
@ -119,7 +163,7 @@ public class MQTTNotificationStrategy implements NotificationStrategy {
if ( operation . getPayLoad ( ) = = null ) {
operation . setPayLoad ( "" ) ;
}
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . publish ( mqttA dapterName, dynamicProperties ,
MQTTDataHolder . getInstance ( ) . getOutputEventAdapterService ( ) . publish ( a dapterName, dynamicProperties ,
operation . getPayLoad ( ) ) ;
}