@ -34,6 +34,13 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.json.simple.JSONObject ;
import org.json.simple.parser.JSONParser ;
import org.json.simple.parser.ParseException ;
import org.wso2.carbon.apimgt.keymgt.extension.DCRResponse ;
import org.wso2.carbon.apimgt.keymgt.extension.TokenRequest ;
import org.wso2.carbon.apimgt.keymgt.extension.TokenResponse ;
import org.wso2.carbon.apimgt.keymgt.extension.exception.BadRequestException ;
import org.wso2.carbon.apimgt.keymgt.extension.exception.KeyMgtException ;
import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtService ;
import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtServiceImpl ;
import org.wso2.carbon.context.PrivilegedCarbonContext ;
import org.wso2.carbon.device.mgt.output.adapter.mqtt.internal.OutputAdapterServiceDataHolder ;
import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException ;
@ -62,9 +69,12 @@ public class MQTTAdapterPublisher {
String clientId ;
int tenantId ;
private String tenantDomain ;
public MQTTAdapterPublisher ( MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration , String clientId
, int tenantId ) {
this . tenantId = tenantId ;
this . tenantDomain = PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . getTenantDomain ( ) ;
if ( clientId = = null | | clientId . trim ( ) . isEmpty ( ) ) {
this . clientId = MqttClient . generateClientId ( ) ;
}
@ -85,8 +95,9 @@ public class MQTTAdapterPublisher {
connectionOptions . setCleanSession ( cleanSession ) ;
connectionOptions . setKeepAliveInterval ( keepAlive ) ;
if ( mqttBrokerConnectionConfiguration . getUsername ( ) ! = null ) {
connectionOptions . setUserName ( getToken ( ) ) ;
connectionOptions . setPassword ( MQTTEventAdapterConstants . DEFAULT_PASSWORD . toCharArray ( ) ) ;
String accessToken = getToken ( ) ;
connectionOptions . setUserName ( accessToken . substring ( 0 , 18 ) ) ;
connectionOptions . setPassword ( accessToken . substring ( 19 ) . toCharArray ( ) ) ;
}
// Construct an MQTT blocking mode client
mqttClient = new MqttClient ( mqttBrokerConnectionConfiguration . getBrokerUrl ( ) , clientId , dataStore ) ;
@ -151,53 +162,22 @@ public class MQTTAdapterPublisher {
if ( dcrUrlString ! = null & & ! dcrUrlString . isEmpty ( ) ) {
try {
URL dcrUrl = new URL ( dcrUrlString ) ;
HttpClient httpClient = MQTTUtil . getHttpClient ( dcrUrl . getProtocol ( ) ) ;
HttpPost postMethod = new HttpPost ( dcrUrlString ) ;
RegistrationProfile registrationProfile = new RegistrationProfile ( ) ;
registrationProfile . setCallbackUrl ( MQTTEventAdapterConstants . EMPTY_STRING ) ;
registrationProfile . setGrantType ( MQTTEventAdapterConstants . GRANT_TYPE ) ;
registrationProfile . setOwner ( username ) ;
registrationProfile . setTokenScope ( MQTTEventAdapterConstants . TOKEN_SCOPE ) ;
if ( ! mqttBrokerConnectionConfiguration . isGlobalCredentailSet ( ) ) {
registrationProfile . setClientName ( MQTTEventAdapterConstants . APPLICATION_NAME_PREFIX
+ mqttBrokerConnectionConfiguration . getAdapterName ( ) +
"_" + tenantId ) ;
registrationProfile . setIsSaasApp ( false ) ;
} else {
registrationProfile . setClientName ( MQTTEventAdapterConstants . APPLICATION_NAME_PREFIX
+ mqttBrokerConnectionConfiguration . getAdapterName ( ) ) ;
registrationProfile . setIsSaasApp ( true ) ;
}
String jsonString = registrationProfile . toJSON ( ) ;
StringEntity requestEntity = new StringEntity ( jsonString , ContentType . APPLICATION_JSON ) ;
postMethod . setEntity ( requestEntity ) ;
String basicAuth = getBase64Encode ( username , password ) ;
postMethod . setHeader ( new BasicHeader ( MQTTEventAdapterConstants . AUTHORIZATION_HEADER_NAME ,
MQTTEventAdapterConstants . AUTHORIZATION_HEADER_VALUE_PREFIX +
basicAuth ) ) ;
HttpResponse httpResponse = httpClient . execute ( postMethod ) ;
if ( httpResponse ! = null ) {
String response = MQTTUtil . getResponseString ( httpResponse ) ;
try {
if ( response ! = null ) {
JSONParser jsonParser = new JSONParser ( ) ;
JSONObject jsonPayload = ( JSONObject ) jsonParser . parse ( response ) ;
String clientId = ( String ) jsonPayload . get ( MQTTEventAdapterConstants . CLIENT_ID ) ;
String clientSecret = ( String ) jsonPayload . get ( MQTTEventAdapterConstants . CLIENT_SECRET ) ;
return getToken ( clientId , clientSecret ) ;
}
} catch ( ParseException e ) {
String msg = "error occurred while parsing generating token for the adapter" ;
log . error ( msg , e ) ;
}
}
} catch ( MalformedURLException e ) {
throw new OutputEventAdapterRuntimeException ( "Invalid dcrUrl : " + dcrUrlString ) ;
} catch ( KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e ) {
throw new OutputEventAdapterRuntimeException ( "Failed to create an https connection." , e ) ;
KeyMgtService keyMgtService = new KeyMgtServiceImpl ( ) ;
String applicationName = MQTTEventAdapterConstants . APPLICATION_NAME_PREFIX
+ mqttBrokerConnectionConfiguration . getAdapterName ( ) ;
DCRResponse dcrResponse = keyMgtService . dynamicClientRegistration ( applicationName , username ,
"client_credentials" , null , new String [ ] { "device_management" } , false , Integer . MAX_VALUE ) ;
return getToken ( dcrResponse . getClientId ( ) , dcrResponse . getClientSecret ( ) ) ;
// connectionOptions.setUserName(accessToken.substring(0, 18));
// connectionOptions.setPassword(accessToken.substring(19).toCharArray());
} catch ( JWTClientException | UserStoreException e ) {
log . error ( "Failed to create an oauth token with jwt grant type." , e ) ;
log . error ( "Failed to create an oauth token with client_credentials grant type." , e ) ;
throw new OutputEventAdapterRuntimeException ( "Failed to create an oauth token with client_credentials grant type." , e ) ;
} catch ( KeyMgtException e ) {
log . error ( "Failed to create an application." , e ) ;
throw new OutputEventAdapterRuntimeException ( "Failed to create an application." , e ) ;
}
}
throw new OutputEventAdapterRuntimeException ( "Invalid configuration for mqtt publisher" ) ;
@ -206,24 +186,24 @@ public class MQTTAdapterPublisher {
private String getToken ( String clientId , String clientSecret )
throws UserStoreException , JWTClientException {
PrivilegedCarbonContext . startTenantFlow ( ) ;
PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . setTenant Id( tenantId , true ) ;
PrivilegedCarbonContext . getThreadLocalCarbonContext ( ) . setTenant Domain( tenantDomain , true ) ;
try {
String scopes = mqttBrokerConnectionConfiguration . getScopes ( ) ;
String username = mqttBrokerConnectionConfiguration . getUsername ( ) ;
if ( mqttBrokerConnectionConfiguration . isGlobalCredentailSet ( ) ) {
username = PrivilegedCarbonContext . getThreadLocalCarbonContext ( )
. getUserRealm ( ) . getRealmConfiguration ( ) . getAdminUserName ( ) + "@" + PrivilegedCarbonContext
. getThreadLocalCarbonContext ( ) . getTenantDomain ( true ) ;
}
JWTClientManagerService jwtClientManagerService =
OutputAdapterServiceDataHolder . getJwtClientManagerService ( ) ;
AccessTokenInfo accessTokenInfo = jwtClientManagerService . getJWTClient ( ) . getAccessToken (
clientId , clientSecret , username , scopes ) ;
return accessTokenInfo . getAccessToken ( ) ;
scopes + = " perm:topic:pub:" + tenantDomain + ":+:+:operation" ;
TokenRequest tokenRequest = new TokenRequest ( clientId , clientSecret ,
null , scopes . toString ( ) , "client_credentials" , null ,
null , null , null , Integer . MAX_VALUE ) ;
KeyMgtService keyMgtService = new KeyMgtServiceImpl ( ) ;
TokenResponse tokenResponse = keyMgtService . generateAccessToken ( tokenRequest ) ;
return tokenResponse . getAccessToken ( ) ;
} catch ( KeyMgtException | BadRequestException e ) {
log . error ( "Error while generating access token" , e ) ;
} finally {
PrivilegedCarbonContext . endTenantFlow ( ) ;
}
return null ;
}
private String getBase64Encode ( String key , String value ) {