Made device plugins to connect securely with the MQTT broker

revert-dabc3590
ayyoob 9 years ago
parent d3a769776b
commit cdb1f62ba9

@ -46,24 +46,14 @@
<artifactId>cxf-rt-frontend-jaxrs</artifactId> <artifactId>cxf-rt-frontend-jaxrs</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
<scope>provided</scope>
</dependency>
<!--MQTT --> <!--MQTT -->
<dependency> <dependency>
<groupId>org.eclipse.paho</groupId> <groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<scope>provided</scope>
</dependency> </dependency>
<!--IOT --> <!--IOT -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId> <groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>org.wso2.carbon.device.mgt.iot</artifactId> <artifactId>org.wso2.carbon.device.mgt.iot</artifactId>
@ -100,12 +90,6 @@
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>commons-httpclient.wso2</groupId>
<artifactId>commons-httpclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon</groupId> <groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.utils</artifactId> <artifactId>org.wso2.carbon.utils</artifactId>
@ -174,14 +158,19 @@
<artifactId>org.wso2.carbon.apimgt.annotations</artifactId> <artifactId>org.wso2.carbon.apimgt.annotations</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.analytics</groupId>
<artifactId>org.wso2.carbon.analytics.api</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.devicemgt</groupId> <groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.apimgt.webapp.publisher</artifactId> <artifactId>org.wso2.carbon.identity.jwt.client.extension</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.analytics</groupId> <groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.analytics.api</artifactId> <artifactId>org.wso2.carbon.apimgt.application.extension</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>

@ -74,9 +74,17 @@ public class AndroidSenseControllerServiceImpl implements AndroidSenseController
if (waitForServerStartup()) { if (waitForServerStartup()) {
return; return;
} }
//The delay is added till the server starts up.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
AndroidSenseControllerServiceImpl.androidSenseMQTTConnector = androidSenseMQTTConnector; AndroidSenseControllerServiceImpl.androidSenseMQTTConnector = androidSenseMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) { if (MqttConfig.getInstance().isEnabled()) {
androidSenseMQTTConnector.connect(); synchronized (androidSenseMQTTConnector) {
androidSenseMQTTConnector.connect();
}
} else { } else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmMQTTConnector not started."); log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmMQTTConnector not started.");
} }

@ -24,6 +24,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.wso2.carbon.apimgt.application.extension.APIManagementProviderService;
import org.wso2.carbon.apimgt.application.extension.dto.ApiApplicationKey;
import org.wso2.carbon.apimgt.application.extension.exception.APIManagerException;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.analytics.data.publisher.exception.DataPublisherConfigurationException; import org.wso2.carbon.device.mgt.analytics.data.publisher.exception.DataPublisherConfigurationException;
import org.wso2.carbon.device.mgt.analytics.data.publisher.service.EventsPublisherService; import org.wso2.carbon.device.mgt.analytics.data.publisher.service.EventsPublisherService;
@ -31,6 +34,7 @@ import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier; import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService; import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.device.mgt.iot.androidsense.service.impl.util.APIUtil;
import org.wso2.carbon.device.mgt.iot.androidsense.service.impl.util.DeviceData; import org.wso2.carbon.device.mgt.iot.androidsense.service.impl.util.DeviceData;
import org.wso2.carbon.device.mgt.iot.androidsense.service.impl.util.SensorData; import org.wso2.carbon.device.mgt.iot.androidsense.service.impl.util.SensorData;
import org.wso2.carbon.device.mgt.iot.androidsense.plugin.constants.AndroidSenseConstants; import org.wso2.carbon.device.mgt.iot.androidsense.plugin.constants.AndroidSenseConstants;
@ -38,6 +42,10 @@ import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler; import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler;
import org.wso2.carbon.identity.jwt.client.extension.JWTClient;
import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo;
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.utils.multitenancy.MultitenantUtils; import org.wso2.carbon.utils.multitenancy.MultitenantUtils;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -48,6 +56,8 @@ public class AndroidSenseMQTTConnector extends MQTTTransportHandler {
private static Log log = LogFactory.getLog(AndroidSenseMQTTConnector.class); private static Log log = LogFactory.getLog(AndroidSenseMQTTConnector.class);
private static String subscribeTopic = AndroidSenseConstants.MQTT_SUBSCRIBE_WORDS_TOPIC; private static String subscribeTopic = AndroidSenseConstants.MQTT_SUBSCRIBE_WORDS_TOPIC;
private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
private static final String KEY_TYPE = "PRODUCTION";
private static final String EMPTY_STRING = "";
private AndroidSenseMQTTConnector() { private AndroidSenseMQTTConnector() {
super(iotServerSubscriber, AndroidSenseConstants.DEVICE_TYPE, super(iotServerSubscriber, AndroidSenseConstants.DEVICE_TYPE,
@ -59,7 +69,25 @@ public class AndroidSenseMQTTConnector extends MQTTTransportHandler {
Runnable connector = new Runnable() { Runnable connector = new Runnable() {
public void run() { public void run() {
while (!isConnected()) { while (!isConnected()) {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
AndroidSenseConstants.DEVICE_TYPE_PROVIDER_DOMAIN, true);
try { try {
String applicationUsername = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
.getRealmConfiguration().getAdminUserName();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(applicationUsername);
APIManagementProviderService apiManagementProviderService = APIUtil
.getAPIManagementProviderService();
String[] tags = {AndroidSenseConstants.DEVICE_TYPE};
ApiApplicationKey apiApplicationKey = apiManagementProviderService.generateAndRetrieveApplicationKeys(
AndroidSenseConstants.DEVICE_TYPE, tags, KEY_TYPE, applicationUsername, true);
JWTClient jwtClient = APIUtil.getJWTClientManagerService().getJWTClient();
String scopes = "device_type_" + AndroidSenseConstants.DEVICE_TYPE + " device_mqtt_connector";
AccessTokenInfo accessTokenInfo = jwtClient.getAccessToken(apiApplicationKey.getConsumerKey(),
apiApplicationKey.getConsumerSecret(), applicationUsername, scopes);
//create token
String accessToken = accessTokenInfo.getAccessToken();
setUsernameAndPassword(accessToken, EMPTY_STRING);
connectToQueue(); connectToQueue();
subscribeToQueue(); subscribeToQueue();
} catch (TransportHandlerException e) { } catch (TransportHandlerException e) {
@ -69,6 +97,14 @@ public class AndroidSenseMQTTConnector extends MQTTTransportHandler {
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
log.error("MQTT-Subscriber: Thread Sleep Interrupt Exception.", ex); log.error("MQTT-Subscriber: Thread Sleep Interrupt Exception.", ex);
} }
}catch (JWTClientException e) {
log.error("Failed to retrieve token from JWT Client.", e);
} catch (UserStoreException e) {
log.error("Failed to retrieve the user.", e);
} catch (APIManagerException e) {
log.error("Failed to create an application and generate keys.", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
} }
} }
} }

@ -10,9 +10,11 @@ import org.wso2.carbon.analytics.dataservice.commons.SortByField;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceUtils; import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceUtils;
import org.wso2.carbon.analytics.datasource.commons.Record; import org.wso2.carbon.analytics.datasource.commons.Record;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException; import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.apimgt.application.extension.APIManagementProviderService;
import org.wso2.carbon.context.CarbonContext; import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService; import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.identity.jwt.client.extension.service.JWTClientManagerService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -127,4 +129,28 @@ public class APIUtil {
recordBean.setValues(record.getValues()); recordBean.setValues(record.getValues());
return recordBean; return recordBean;
} }
public static APIManagementProviderService getAPIManagementProviderService() {
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
APIManagementProviderService apiManagementProviderService =
(APIManagementProviderService) ctx.getOSGiService(APIManagementProviderService.class, null);
if (apiManagementProviderService == null) {
String msg = "API management provider service has not initialized.";
log.error(msg);
throw new IllegalStateException(msg);
}
return apiManagementProviderService;
}
public static JWTClientManagerService getJWTClientManagerService() {
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
JWTClientManagerService jwtClientManagerService =
(JWTClientManagerService) ctx.getOSGiService(JWTClientManagerService.class, null);
if (jwtClientManagerService == null) {
String msg = "JWT Client manager service has not initialized.";
log.error(msg);
throw new IllegalStateException(msg);
}
return jwtClientManagerService;
}
} }

@ -49,4 +49,5 @@ public class AndroidSenseConstants {
//MQTT Subscribe topic //MQTT Subscribe topic
public static final String MQTT_SUBSCRIBE_WORDS_TOPIC = "wso2/android_sense/+/data"; public static final String MQTT_SUBSCRIBE_WORDS_TOPIC = "wso2/android_sense/+/data";
public static final String DATA_SOURCE_NAME = "jdbc/AndroidSenseDM_DB"; public static final String DATA_SOURCE_NAME = "jdbc/AndroidSenseDM_DB";
public final static String DEVICE_TYPE_PROVIDER_DOMAIN = "carbon.super";
} }

@ -30,7 +30,6 @@ import java.util.List;
public class AndroidSenseManagerService implements DeviceManagementService { public class AndroidSenseManagerService implements DeviceManagementService {
private DeviceManager deviceManager; private DeviceManager deviceManager;
private static final String SUPER_TENANT_DOMAIN = "carbon.super";
@Override @Override
public String getType() { public String getType() {
@ -39,7 +38,7 @@ public class AndroidSenseManagerService implements DeviceManagementService {
@Override @Override
public String getProviderTenantDomain() { public String getProviderTenantDomain() {
return SUPER_TENANT_DOMAIN; return AndroidSenseConstants.DEVICE_TYPE_PROVIDER_DOMAIN;
} }
@Override @Override

@ -65,7 +65,7 @@
<artifactId>org.wso2.carbon.identity.jwt.client.extension</artifactId> <artifactId>org.wso2.carbon.identity.jwt.client.extension</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.jayway.jsonpath.wso2</groupId> <groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId> <artifactId>json-path</artifactId>
</dependency> </dependency>
<dependency> <dependency>

@ -54,9 +54,7 @@ public class HTTPEventAdapterFactory extends InputEventAdapterFactory {
@Override @Override
public List<String> getSupportedMessageFormats() { public List<String> getSupportedMessageFormats() {
List<String> supportInputMessageTypes = new ArrayList<String>(); List<String> supportInputMessageTypes = new ArrayList<String>();
supportInputMessageTypes.add(MessageType.XML);
supportInputMessageTypes.add(MessageType.JSON); supportInputMessageTypes.add(MessageType.JSON);
supportInputMessageTypes.add(MessageType.TEXT);
return supportInputMessageTypes; return supportInputMessageTypes;
} }

@ -32,7 +32,6 @@ public class HTTPContentValidator implements ContentValidator {
@Override @Override
public ContentInfo validate(Map<String, String> paramMap) { public ContentInfo validate(Map<String, String> paramMap) {
String deviceId = paramMap.get("deviceId"); String deviceId = paramMap.get("deviceId");
String msg = paramMap.get(HTTPEventAdapterConstants.PAYLOAD_TAG); String msg = paramMap.get(HTTPEventAdapterConstants.PAYLOAD_TAG);
String deviceIdJsonPath = paramMap.get(HTTPEventAdapterConstants.DEVICE_ID_JSON_PATH); String deviceIdJsonPath = paramMap.get(HTTPEventAdapterConstants.DEVICE_ID_JSON_PATH);
Object res = JsonPath.read(msg, deviceIdJsonPath); Object res = JsonPath.read(msg, deviceIdJsonPath);

@ -30,7 +30,7 @@ public class Constants {
public static final String CLIENT_NAME = "client_name"; public static final String CLIENT_NAME = "client_name";
public static final String DEFAULT = "default"; public static final String DEFAULT = "default";
public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS =
"device_id_json_path:meta_deviceId,device_id_topic_hierarchy_index:2"; "device_id_json_path:event.metaData.deviceId,device_id_topic_hierarchy_index:2";
public static final String TOPIC = "topic"; public static final String TOPIC = "topic";
public static final String PAYLOAD = "payload"; public static final String PAYLOAD = "payload";
public static final String DEVICE_ID_JSON_PATH = "device_id_json_path"; public static final String DEVICE_ID_JSON_PATH = "device_id_json_path";

@ -66,7 +66,7 @@ public class MQTTEventAdapter implements InputEventAdapter {
String params[] = contentValidationParams.split(","); String params[] = contentValidationParams.split(",");
Map<String, String> paramsMap = new HashMap<>(); Map<String, String> paramsMap = new HashMap<>();
for (String param: params) { for (String param: params) {
String paramsKeyAndValue[] = param.split("/_(.+)?/"); String paramsKeyAndValue[] = splitOnFirst(param, ':');
if (paramsKeyAndValue.length != 2) { if (paramsKeyAndValue.length != 2) {
throw new InputEventAdapterException("Invalid parameters for content validation - " + param); throw new InputEventAdapterException("Invalid parameters for content validation - " + param);
} }
@ -95,6 +95,13 @@ public class MQTTEventAdapter implements InputEventAdapter {
} }
} }
private String[] splitOnFirst(String str, char c) {
int idx = str.indexOf(c);
String head = str.substring(0, idx);
String tail = str.substring(idx + 1);
return new String[] { head, tail} ;
}
@Override @Override
public void testConnect() throws TestConnectionNotSupportedException { public void testConnect() throws TestConnectionNotSupportedException {
throw new TestConnectionNotSupportedException("not-supported"); throw new TestConnectionNotSupportedException("not-supported");

@ -38,11 +38,7 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
@Override @Override
public List<String> getSupportedMessageFormats() { public List<String> getSupportedMessageFormats() {
List<String> supportInputMessageTypes = new ArrayList<String>(); List<String> supportInputMessageTypes = new ArrayList<String>();
supportInputMessageTypes.add(MessageType.TEXT);
supportInputMessageTypes.add(MessageType.JSON); supportInputMessageTypes.add(MessageType.JSON);
supportInputMessageTypes.add(MessageType.XML);
return supportInputMessageTypes; return supportInputMessageTypes;
} }
@ -52,28 +48,23 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
// set topic // set topic
Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC); Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC);
topicProperty.setDisplayName( topicProperty.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC));
topicProperty.setRequired(true); topicProperty.setRequired(true);
topicProperty.setHint( topicProperty.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC_HINT));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC_HINT));
propertyList.add(topicProperty); propertyList.add(topicProperty);
//Broker Url //Broker Url
Property brokerUrl = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_URL); Property brokerUrl = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_URL);
brokerUrl.setDisplayName( brokerUrl.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_URL));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_URL));
brokerUrl.setRequired(true); brokerUrl.setRequired(true);
brokerUrl.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_URL_HINT)); brokerUrl.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_URL_HINT));
propertyList.add(brokerUrl); propertyList.add(brokerUrl);
//DCR endpoint details //DCR endpoint details
Property dcrUrl = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL); Property dcrUrl = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL);
dcrUrl.setDisplayName( dcrUrl.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL));
dcrUrl.setRequired(false); dcrUrl.setRequired(false);
dcrUrl.setHint( dcrUrl.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL_HINT));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_DCR_URL_HINT));
propertyList.add(dcrUrl); propertyList.add(dcrUrl);
//Content Validator details //Content Validator details
@ -83,7 +74,6 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
contentValidator.setRequired(false); contentValidator.setRequired(false);
contentValidator.setHint( contentValidator.setHint(
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_CLASSNAME_HINT)); resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_CLASSNAME_HINT));
propertyList.add(contentValidator);
contentValidator.setDefaultValue(Constants.DEFAULT); contentValidator.setDefaultValue(Constants.DEFAULT);
propertyList.add(contentValidator); propertyList.add(contentValidator);
@ -94,7 +84,6 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
contentValidatorParams.setRequired(false); contentValidatorParams.setRequired(false);
contentValidatorParams.setHint( contentValidatorParams.setHint(
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_PARAMS_HINT)); resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_VALIDATOR_PARAMS_HINT));
propertyList.add(contentValidatorParams);
contentValidatorParams.setDefaultValue(Constants.MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS); contentValidatorParams.setDefaultValue(Constants.MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS);
propertyList.add(contentValidatorParams); propertyList.add(contentValidatorParams);
@ -103,9 +92,7 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
userName.setDisplayName( userName.setDisplayName(
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_USERNAME)); resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_USERNAME));
userName.setRequired(false); userName.setRequired(false);
userName.setHint( userName.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_USERNAME_HINT));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_USERNAME_HINT));
propertyList.add(userName);
propertyList.add(userName); propertyList.add(userName);
//Broker Required Scopes. //Broker Required Scopes.
@ -113,28 +100,23 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
scopes.setDisplayName( scopes.setDisplayName(
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_SCOPES)); resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_SCOPES));
scopes.setRequired(false); scopes.setRequired(false);
scopes.setHint( scopes.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_SCOPES_HINT));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_SCOPES_HINT));
propertyList.add(scopes); propertyList.add(scopes);
//Broker clear session //Broker clear session
Property clearSession = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION); Property clearSession = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION);
clearSession.setDisplayName( clearSession.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION));
clearSession.setRequired(false); clearSession.setRequired(false);
clearSession.setOptions(new String[]{"true", "false"}); clearSession.setOptions(new String[]{"true", "false"});
clearSession.setDefaultValue("true"); clearSession.setDefaultValue("true");
clearSession.setHint( clearSession.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION_HINT));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLEAN_SESSION_HINT));
propertyList.add(clearSession); propertyList.add(clearSession);
// set clientId // set clientId
Property clientId = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID); Property clientId = new Property(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID);
clientId.setDisplayName( clientId.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID));
clientId.setRequired(false); clientId.setRequired(false);
clientId.setHint( clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT));
resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT));
propertyList.add(clientId); propertyList.add(clientId);
return propertyList; return propertyList;

@ -143,6 +143,9 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
String scopes = this.mqttBrokerConnectionConfiguration.getBrokerScopes(); String scopes = this.mqttBrokerConnectionConfiguration.getBrokerScopes();
//getJWT Client Parameters. //getJWT Client Parameters.
if (dcrUrlString != null && !dcrUrlString.isEmpty()) { if (dcrUrlString != null && !dcrUrlString.isEmpty()) {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true);
PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(username);
try { try {
URL dcrUrl = new URL(dcrUrlString); URL dcrUrl = new URL(dcrUrlString);
HttpClient httpClient = MQTTUtil.getHttpClient(dcrUrl.getProtocol()); HttpClient httpClient = MQTTUtil.getHttpClient(dcrUrl.getProtocol());
@ -153,7 +156,6 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
registrationProfile.setOwner(username); registrationProfile.setOwner(username);
registrationProfile.setTokenScope(Constants.TOKEN_SCOPE); registrationProfile.setTokenScope(Constants.TOKEN_SCOPE);
registrationProfile.setApplicationType(Constants.APPLICATION_TYPE); registrationProfile.setApplicationType(Constants.APPLICATION_TYPE);
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(true);
registrationProfile.setClientName(username + "_" + tenantId); registrationProfile.setClientName(username + "_" + tenantId);
String jsonString = registrationProfile.toJSON(); String jsonString = registrationProfile.toJSON();
StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON); StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON);
@ -180,6 +182,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
log.error("Invalid dcrUrl : " + dcrUrlString); log.error("Invalid dcrUrl : " + dcrUrlString);
} catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) { } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) {
log.error("Failed to create an https connection.", e); log.error("Failed to create an https connection.", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
} }
} }
} }
@ -231,15 +235,16 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Event received in MQTT Event Adapter - " + msgText); log.debug("Event received in MQTT Event Adapter - " + msgText);
} }
ContentInfo contentInfo;
synchronized (contentValidationParams) {
contentValidationParams.put(Constants.TOPIC, topic);
contentValidationParams.put(Constants.PAYLOAD, msgText);
contentInfo = contentValidator.validate(contentValidationParams);
contentValidationParams.remove(Constants.TOPIC);
contentValidationParams.remove(Constants.PAYLOAD);
}
if (contentValidator != null) { if (contentValidator != null) {
ContentInfo contentInfo;
synchronized (contentValidationParams) {
contentValidationParams.put(Constants.TOPIC, topic);
contentValidationParams.put(Constants.PAYLOAD, msgText);
contentInfo = contentValidator.validate(contentValidationParams);
contentValidationParams.remove(Constants.TOPIC);
contentValidationParams.remove(Constants.PAYLOAD);
}
if (contentInfo != null && contentInfo.isValidContent()) { if (contentInfo != null && contentInfo.isValidContent()) {
eventAdapterListener.onEvent(contentInfo.getMsgText()); eventAdapterListener.onEvent(contentInfo.getMsgText());
} }

@ -32,6 +32,7 @@ import javax.websocket.OnOpen;
import javax.websocket.Session; import javax.websocket.Session;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
/** /**
* Connect to web socket with Super tenant * Connect to web socket with Super tenant
@ -61,13 +62,17 @@ public class SuperTenantSubscriptionEndpoint extends SubscriptionEndpoint {
try { try {
PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(MultitenantConstants.SUPER_TENANT_ID); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(MultitenantConstants.SUPER_TENANT_ID);
ServiceHolder.getInstance().getUiOutputCallbackControllerService().subscribeWebsocket(streamName, ServiceHolder.getInstance().getUiOutputCallbackControllerService().subscribeWebsocket(streamName,
version, version, session);
session);
} finally { } finally {
PrivilegedCarbonContext.endTenantFlow(); PrivilegedCarbonContext.endTenantFlow();
} }
} else {
try {
session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Unauthorized Access"));
} catch (IOException e) {
log.error("Failed to disconnect the unauthorized client.");
}
} }
} }

@ -31,6 +31,7 @@ import javax.websocket.OnOpen;
import javax.websocket.Session; import javax.websocket.Session;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
/** /**
* Connect to web socket with a tenant * Connect to web socket with a tenant
@ -66,6 +67,12 @@ public class TenantSubscriptionEndpoint extends SubscriptionEndpoint {
} finally { } finally {
PrivilegedCarbonContext.endTenantFlow(); PrivilegedCarbonContext.endTenantFlow();
} }
} else {
try {
session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Unauthorized Access"));
} catch (IOException e) {
log.error("Failed to disconnect the unauthorized client.");
}
} }
} }

@ -16,6 +16,7 @@ import util.AuthenticationInfo;
import javax.websocket.Session; import javax.websocket.Session;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.rmi.RemoteException; import java.rmi.RemoteException;
@ -32,16 +33,22 @@ public class OAuthTokenValdiator {
private static final String QUERY_KEY_VALUE_SEPERATOR = "="; private static final String QUERY_KEY_VALUE_SEPERATOR = "=";
private static final String TOKEN_TYPE = "bearer"; private static final String TOKEN_TYPE = "bearer";
private static final String TOKEN_IDENTIFIER = "token"; private static final String TOKEN_IDENTIFIER = "token";
private static OAuthTokenValdiator oAuthTokenValdiator = new OAuthTokenValdiator(); private static OAuthTokenValdiator oAuthTokenValdiator;
public static OAuthTokenValdiator getInstance() { public static OAuthTokenValdiator getInstance() {
return oAuthTokenValdiator; if (oAuthTokenValdiator == null) {
synchronized (OAuthTokenValdiator.class) {
if (oAuthTokenValdiator == null) {
oAuthTokenValdiator = new OAuthTokenValdiator();
}
}
}
return oAuthTokenValdiator;
} }
private OAuthTokenValdiator() { private OAuthTokenValdiator() {
Properties properties = null;
try { try {
properties = getWebSocketConfig(); Properties properties = getWebSocketConfig();
this.stubs = new GenericObjectPool(new OAuthTokenValidaterStubFactory(properties)); this.stubs = new GenericObjectPool(new OAuthTokenValidaterStubFactory(properties));
} catch (IOException e) { } catch (IOException e) {
log.error("Failed to parse the web socket config file " + WEBSOCKET_CONFIG_LOCATION); log.error("Failed to parse the web socket config file " + WEBSOCKET_CONFIG_LOCATION);
@ -145,9 +152,12 @@ public class OAuthTokenValdiator {
*/ */
private Properties getWebSocketConfig() throws IOException { private Properties getWebSocketConfig() throws IOException {
Properties properties = new Properties(); Properties properties = new Properties();
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(WEBSOCKET_CONFIG_LOCATION); File configFile =new File(WEBSOCKET_CONFIG_LOCATION);
if (inputStream != null) { if (configFile.exists()) {
properties.load(inputStream); InputStream fileInputStream = new FileInputStream(configFile);
if (fileInputStream != null) {
properties.load(fileInputStream);
}
} }
return properties; return properties;
} }

@ -32,7 +32,7 @@ public class UIConstants {
public static final String ADAPTER_UI_COLON = ":"; public static final String ADAPTER_UI_COLON = ":";
public static final String MAXIMUM_TOTAL_HTTP_CONNECTION = "maximumTotalHttpConnection"; public static final String MAXIMUM_TOTAL_HTTP_CONNECTION = "maximumTotalHttpConnection";
public static final String MAXIMUM_HTTP_CONNECTION_PER_HOST = "maximumHttpConnectionPerHost"; public static final String MAXIMUM_HTTP_CONNECTION_PER_HOST = "maximumHttpConnectionPerHost";
public static final String TOKEN_VALIDATION_ENDPOINT_URL = "tokenValidationEndpointUrl"; public static final String TOKEN_VALIDATION_ENDPOINT_URL = "tokenValidationEndpoint";
public static final String USERNAME = "username"; public static final String USERNAME = "username";
public static final String PASSWORD = "password"; public static final String PASSWORD = "password";
} }

@ -70,11 +70,18 @@ public class DigitalDisplayControllerServiceImpl implements DigitalDisplayContro
return; return;
} }
DigitalDisplayControllerServiceImpl.digitalDisplayMQTTConnector = digitalDisplayMQTTConnector; DigitalDisplayControllerServiceImpl.digitalDisplayMQTTConnector = digitalDisplayMQTTConnector;
//The delay is added for the server starts up.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (MqttConfig.getInstance().isEnabled()) { if (MqttConfig.getInstance().isEnabled()) {
digitalDisplayMQTTConnector.connect(); synchronized (digitalDisplayMQTTConnector) {
digitalDisplayMQTTConnector.connect();
}
} else { } else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. " + log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, not started.");
"Hence, DigitalDisplayMQTTConnector not started.");
} }
} }
}; };

@ -5,12 +5,21 @@ import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject; import org.json.JSONObject;
import org.wso2.carbon.apimgt.application.extension.APIManagementProviderService;
import org.wso2.carbon.apimgt.application.extension.dto.ApiApplicationKey;
import org.wso2.carbon.apimgt.application.extension.exception.APIManagerException;
import org.wso2.carbon.base.MultitenantConstants;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.service.impl.model.ScreenShotModel; import org.wso2.carbon.device.mgt.iot.digitaldisplay.service.impl.model.ScreenShotModel;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.service.impl.websocket.DigitalDisplayWebSocketServerEndPoint; import org.wso2.carbon.device.mgt.iot.digitaldisplay.service.impl.websocket.DigitalDisplayWebSocketServerEndPoint;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.plugin.constants.DigitalDisplayConstants; import org.wso2.carbon.device.mgt.iot.digitaldisplay.plugin.constants.DigitalDisplayConstants;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler; import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler;
import org.wso2.carbon.identity.jwt.client.extension.JWTClient;
import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo;
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
import org.wso2.carbon.user.api.UserStoreException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -24,6 +33,8 @@ public class DigitalDisplayMQTTConnector extends MQTTTransportHandler {
private static final String MQTT_TOPIC_APPENDER = "wso2/iot"; private static final String MQTT_TOPIC_APPENDER = "wso2/iot";
private static final String subscribeTopic = private static final String subscribeTopic =
MQTT_TOPIC_APPENDER + "/" + DigitalDisplayConstants.DEVICE_TYPE + "/+/digital_display_publisher"; MQTT_TOPIC_APPENDER + "/" + DigitalDisplayConstants.DEVICE_TYPE + "/+/digital_display_publisher";
private static final String KEY_TYPE = "PRODUCTION";
private static final String EMPTY_STRING = "";
private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
@ -41,24 +52,41 @@ public class DigitalDisplayMQTTConnector extends MQTTTransportHandler {
Runnable connector = new Runnable() { Runnable connector = new Runnable() {
public void run() { public void run() {
while (!isConnected()) { while (!isConnected()) {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
DigitalDisplayConstants.DEVICE_TYPE_PROVIDER_DOMAIN, true);
try { try {
String brokerUsername = MqttConfig.getInstance().getMqttQueueUsername(); String applicationUsername = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
String brokerPassword = MqttConfig.getInstance().getMqttQueuePassword(); .getRealmConfiguration().getAdminUserName();
setUsernameAndPassword(brokerUsername, brokerPassword); PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(applicationUsername);
APIManagementProviderService apiManagementProviderService = APIUtil.getAPIManagementProviderService();
String[] tags = {DigitalDisplayConstants.DEVICE_TYPE};
ApiApplicationKey apiApplicationKey = apiManagementProviderService.generateAndRetrieveApplicationKeys(
DigitalDisplayConstants.DEVICE_TYPE, tags, KEY_TYPE, applicationUsername, true);
JWTClient jwtClient = APIUtil.getJWTClientManagerService().getJWTClient();
String scopes = "device_type_" + DigitalDisplayConstants.DEVICE_TYPE + " device_mqtt_connector";
AccessTokenInfo accessTokenInfo = jwtClient.getAccessToken(apiApplicationKey.getConsumerKey(),
apiApplicationKey.getConsumerSecret(), applicationUsername, scopes);
//create token
String accessToken = accessTokenInfo.getAccessToken();
setUsernameAndPassword(accessToken, EMPTY_STRING);
connectToQueue(); connectToQueue();
subscribeToQueue();
} catch (TransportHandlerException e) { } catch (TransportHandlerException e) {
log.error("Connection to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e); log.error("Connection/Subscription to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e);
try { try {
Thread.sleep(timeoutInterval); Thread.sleep(timeoutInterval);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
log.error("MQTT-Connector: Thread Sleep Interrupt Exception.", ex); log.error("MQTT-Connector: Thread Sleep Interrupt Exception.", ex);
} }
} } catch (JWTClientException e) {
log.error("Failed to retrieve token from JWT Client.", e);
try { } catch (UserStoreException e) {
subscribeToQueue(); log.error("Failed to retrieve the user.", e);
} catch (TransportHandlerException e) { } catch (APIManagerException e) {
log.warn("Subscription to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e); log.error("Failed to create an application and generate keys.", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
} }
} }
} }

@ -34,4 +34,5 @@ public class DigitalDisplayConstants {
public final static String GET_DEVICE_STATUS_CONSTANT = "get_device_status"; public final static String GET_DEVICE_STATUS_CONSTANT = "get_device_status";
public final static String PUBLISH_TOPIC = "wso2/iot/digital_display/%s/digital_display_subscriber"; public final static String PUBLISH_TOPIC = "wso2/iot/digital_display/%s/digital_display_subscriber";
public static final String DATA_SOURCE_NAME = "jdbc/DigitalDisplayDM_DB"; public static final String DATA_SOURCE_NAME = "jdbc/DigitalDisplayDM_DB";
public final static String DEVICE_TYPE_PROVIDER_DOMAIN = "carbon.super";
} }

@ -30,7 +30,6 @@ import java.util.List;
public class DigitalDisplayManagerService implements DeviceManagementService{ public class DigitalDisplayManagerService implements DeviceManagementService{
private DeviceManager deviceManager; private DeviceManager deviceManager;
private final static String DEVICE_TYPE_PROVIDER_DOMAIN = "carbon.super";
@Override @Override
public String getType() { public String getType() {
@ -39,7 +38,7 @@ public class DigitalDisplayManagerService implements DeviceManagementService{
@Override @Override
public String getProviderTenantDomain() { public String getProviderTenantDomain() {
return DEVICE_TYPE_PROVIDER_DOMAIN; return DigitalDisplayConstants.DEVICE_TYPE_PROVIDER_DOMAIN;
} }
@Override @Override

@ -72,10 +72,18 @@ public class RaspberryPiControllerServiceImpl implements RaspberryPiControllerSe
return; return;
} }
RaspberryPiControllerServiceImpl.this.raspberryPiMQTTConnector = raspberryPiMQTTConnector; RaspberryPiControllerServiceImpl.this.raspberryPiMQTTConnector = raspberryPiMQTTConnector;
//The delay is added for the server starts up.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (MqttConfig.getInstance().isEnabled()) { if (MqttConfig.getInstance().isEnabled()) {
raspberryPiMQTTConnector.connect(); synchronized (raspberryPiMQTTConnector) {
raspberryPiMQTTConnector.connect();
}
} else { } else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, RaspberryPiMQTTConnector not started."); log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, not started.");
} }
} }
}; };

@ -22,17 +22,26 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.wso2.carbon.apimgt.application.extension.APIManagementProviderService;
import org.wso2.carbon.apimgt.application.extension.dto.ApiApplicationKey;
import org.wso2.carbon.apimgt.application.extension.exception.APIManagerException;
import org.wso2.carbon.base.MultitenantConstants;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.Device; import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier; import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService; import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.raspberrypi.service.impl.util.APIUtil;
import org.wso2.carbon.device.mgt.iot.raspberrypi.service.impl.util.RaspberrypiServiceUtils; import org.wso2.carbon.device.mgt.iot.raspberrypi.service.impl.util.RaspberrypiServiceUtils;
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants; import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler; import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler;
import org.wso2.carbon.identity.jwt.client.extension.JWTClient;
import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo;
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.utils.multitenancy.MultitenantUtils; import org.wso2.carbon.utils.multitenancy.MultitenantUtils;
import java.util.Calendar; import java.util.Calendar;
@ -41,6 +50,8 @@ import java.util.UUID;
public class RaspberryPiMQTTConnector extends MQTTTransportHandler { public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
private static Log log = LogFactory.getLog(RaspberryPiMQTTConnector.class); private static Log log = LogFactory.getLog(RaspberryPiMQTTConnector.class);
private static final String subscribeTopic = "wso2/" + RaspberrypiConstants.DEVICE_TYPE + "/+/publisher"; private static final String subscribeTopic = "wso2/" + RaspberrypiConstants.DEVICE_TYPE + "/+/publisher";
private static final String KEY_TYPE = "PRODUCTION";
private static final String EMPTY_STRING = "";
private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
@ -54,25 +65,42 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
Runnable connector = new Runnable() { Runnable connector = new Runnable() {
public void run() { public void run() {
while (!isConnected()) { while (!isConnected()) {
try { PrivilegedCarbonContext.startTenantFlow();
String brokerUsername = MqttConfig.getInstance().getMqttQueueUsername(); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
String brokerPassword = MqttConfig.getInstance().getMqttQueuePassword(); RaspberrypiConstants.DEVICE_TYPE_PROVIDER_DOMAIN, true);
setUsernameAndPassword(brokerUsername, brokerPassword); try {
String applicationUsername = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
.getRealmConfiguration().getAdminUserName();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(applicationUsername);
APIManagementProviderService apiManagementProviderService = APIUtil.getAPIManagementProviderService();
String[] tags = {RaspberrypiConstants.DEVICE_TYPE};
ApiApplicationKey apiApplicationKey = apiManagementProviderService.generateAndRetrieveApplicationKeys(
RaspberrypiConstants.DEVICE_TYPE, tags, KEY_TYPE, applicationUsername, true);
JWTClient jwtClient = APIUtil.getJWTClientManagerService().getJWTClient();
String scopes = "device_type_" + RaspberrypiConstants.DEVICE_TYPE + " device_mqtt_connector";
AccessTokenInfo accessTokenInfo = jwtClient.getAccessToken(apiApplicationKey.getConsumerKey(),
apiApplicationKey.getConsumerSecret(), applicationUsername, scopes);
//create token
String accessToken = accessTokenInfo.getAccessToken();
setUsernameAndPassword(accessToken, EMPTY_STRING);
connectToQueue(); connectToQueue();
} catch (TransportHandlerException e) {
log.error("Connection to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e);
try {
Thread.sleep(timeoutInterval);
} catch (InterruptedException ex) {
log.error("MQTT-Connector: Thread Sleep Interrupt Exception.", ex);
}
}
try {
subscribeToQueue(); subscribeToQueue();
} catch (TransportHandlerException e) { } catch (TransportHandlerException e) {
log.warn("Subscription to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e); log.error("Connection/Subscription to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e);
} try {
Thread.sleep(timeoutInterval);
} catch (InterruptedException ex) {
log.error("MQTT-Connector: Thread Sleep Interrupt Exception.", ex);
}
} catch (JWTClientException e) {
log.error("Failed to retrieve token from JWT Client.", e);
} catch (UserStoreException e) {
log.error("Failed to retrieve the user.", e);
} catch (APIManagerException e) {
log.error("Failed to create an application and generate keys.", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
} }
} }
}; };

@ -35,5 +35,6 @@ public class RaspberrypiConstants {
//sensor events summerized table name //sensor events summerized table name
public static final String TEMPERATURE_EVENT_TABLE = "ORG_WSO2_IOT_DEVICES_TEMPERATURE"; public static final String TEMPERATURE_EVENT_TABLE = "ORG_WSO2_IOT_DEVICES_TEMPERATURE";
public static final String DATA_SOURCE_NAME = "jdbc/RaspberryPiDM_DB"; public static final String DATA_SOURCE_NAME = "jdbc/RaspberryPiDM_DB";
public final static String DEVICE_TYPE_PROVIDER_DOMAIN = "carbon.super";
} }

@ -33,7 +33,6 @@ import java.util.List;
public class RaspberrypiManagerService implements DeviceManagementService { public class RaspberrypiManagerService implements DeviceManagementService {
private DeviceManager deviceManager; private DeviceManager deviceManager;
private final static String DEVICE_TYPE_PROVIDER_DOMAIN = "carbon.super";
@Override @Override
public String getType() { public String getType() {
@ -42,7 +41,7 @@ public class RaspberrypiManagerService implements DeviceManagementService {
@Override @Override
public String getProviderTenantDomain() { public String getProviderTenantDomain() {
return DEVICE_TYPE_PROVIDER_DOMAIN; return RaspberrypiConstants.DEVICE_TYPE_PROVIDER_DOMAIN;
} }
@Override @Override

@ -185,8 +185,16 @@ public class VirtualFireAlarmControllerServiceImpl implements VirtualFireAlarmCo
return; return;
} }
VirtualFireAlarmControllerServiceImpl.this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector; VirtualFireAlarmControllerServiceImpl.this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector;
//The delay is added for the server starts up.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (MqttConfig.getInstance().isEnabled()) { if (MqttConfig.getInstance().isEnabled()) {
virtualFireAlarmMQTTConnector.connect(); synchronized (virtualFireAlarmMQTTConnector) {
virtualFireAlarmMQTTConnector.connect();
}
} else { } else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmMQTTConnector not started."); log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmMQTTConnector not started.");
} }

@ -22,6 +22,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.wso2.carbon.apimgt.application.extension.APIManagementProviderService;
import org.wso2.carbon.apimgt.application.extension.dto.ApiApplicationKey;
import org.wso2.carbon.apimgt.application.extension.exception.APIManagerException;
import org.wso2.carbon.base.MultitenantConstants;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.Device; import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier; import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
@ -32,9 +36,14 @@ import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler; import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.exception.VirtualFireAlarmException; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.exception.VirtualFireAlarmException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.util.APIUtil;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.util.SecurityManager; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.util.SecurityManager;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.util.VirtualFireAlarmServiceUtils; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.impl.util.VirtualFireAlarmServiceUtils;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants;
import org.wso2.carbon.identity.jwt.client.extension.JWTClient;
import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo;
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.utils.multitenancy.MultitenantUtils; import org.wso2.carbon.utils.multitenancy.MultitenantUtils;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -64,6 +73,8 @@ public class VirtualFireAlarmMQTTConnector extends MQTTTransportHandler {
// wildcard (+) is in place for device_owner & device_id // wildcard (+) is in place for device_owner & device_id
private static String subscribeTopic = "wso2/" + VirtualFireAlarmConstants.DEVICE_TYPE + "/+/publisher"; private static String subscribeTopic = "wso2/" + VirtualFireAlarmConstants.DEVICE_TYPE + "/+/publisher";
private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
private static final String KEY_TYPE = "PRODUCTION";
private static final String EMPTY_STRING = "";
/** /**
* Default constructor for the VirtualFirealarmMQTTConnector. * Default constructor for the VirtualFirealarmMQTTConnector.
@ -83,24 +94,41 @@ public class VirtualFireAlarmMQTTConnector extends MQTTTransportHandler {
Runnable connector = new Runnable() { Runnable connector = new Runnable() {
public void run() { public void run() {
while (!isConnected()) { while (!isConnected()) {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
VirtualFireAlarmConstants.DEVICE_TYPE_PROVIDER_DOMAIN, true);
try { try {
String brokerUsername = MqttConfig.getInstance().getMqttQueueUsername(); String applicationUsername = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
String brokerPassword = MqttConfig.getInstance().getMqttQueuePassword(); .getRealmConfiguration().getAdminUserName();
setUsernameAndPassword(brokerUsername, brokerPassword); PrivilegedCarbonContext.getThreadLocalCarbonContext().setUsername(applicationUsername);
APIManagementProviderService apiManagementProviderService = APIUtil.getAPIManagementProviderService();
String[] tags = {VirtualFireAlarmConstants.DEVICE_TYPE};
ApiApplicationKey apiApplicationKey = apiManagementProviderService.generateAndRetrieveApplicationKeys(
VirtualFireAlarmConstants.DEVICE_TYPE, tags, KEY_TYPE, applicationUsername, true);
JWTClient jwtClient = APIUtil.getJWTClientManagerService().getJWTClient();
String scopes = "device_type_" + VirtualFireAlarmConstants.DEVICE_TYPE + " device_mqtt_connector";
AccessTokenInfo accessTokenInfo = jwtClient.getAccessToken(apiApplicationKey.getConsumerKey(),
apiApplicationKey.getConsumerSecret(), applicationUsername, scopes);
//create token
String accessToken = accessTokenInfo.getAccessToken();
setUsernameAndPassword(accessToken, EMPTY_STRING);
connectToQueue(); connectToQueue();
subscribeToQueue();
} catch (TransportHandlerException e) { } catch (TransportHandlerException e) {
log.error("Connection to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e); log.error("Connection/Subscription to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e);
try { try {
Thread.sleep(timeoutInterval); Thread.sleep(timeoutInterval);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
log.error("MQTT-Connector: Thread Sleep Interrupt Exception.", ex); log.error("MQTT-Connector: Thread Sleep Interrupt Exception.", ex);
} }
} } catch (JWTClientException e) {
log.error("Failed to retrieve token from JWT Client.", e);
try { } catch (UserStoreException e) {
subscribeToQueue(); log.error("Failed to retrieve the user.", e);
} catch (TransportHandlerException e) { } catch (APIManagerException e) {
log.warn("Subscription to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e); log.error("Failed to create an application and generate keys.", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
} }
} }
} }

@ -37,4 +37,5 @@ public class VirtualFireAlarmConstants {
//sensor events summerized table name for humidity //sensor events summerized table name for humidity
public static final String HUMIDITY_EVENT_TABLE = "DEVICE_HUMIDITY_SUMMARY"; public static final String HUMIDITY_EVENT_TABLE = "DEVICE_HUMIDITY_SUMMARY";
public static final String DATA_SOURCE_NAME = "jdbc/VirtualFireAlarmDM_DB"; public static final String DATA_SOURCE_NAME = "jdbc/VirtualFireAlarmDM_DB";
public final static String DEVICE_TYPE_PROVIDER_DOMAIN = "carbon.super";
} }

@ -32,7 +32,6 @@ import java.util.List;
public class VirtualFireAlarmManagerService implements DeviceManagementService{ public class VirtualFireAlarmManagerService implements DeviceManagementService{
private DeviceManager deviceManager; private DeviceManager deviceManager;
private final static String DEVICE_TYPE_PROVIDER_DOMAIN = "carbon.super";
@Override @Override
public String getType() { public String getType() {
@ -42,7 +41,7 @@ public class VirtualFireAlarmManagerService implements DeviceManagementService{
@Override @Override
public String getProviderTenantDomain() { public String getProviderTenantDomain() {
return DEVICE_TYPE_PROVIDER_DOMAIN; return VirtualFireAlarmConstants.DEVICE_TYPE_PROVIDER_DOMAIN;
} }
@Override @Override

@ -99,12 +99,6 @@
<bundleDef> <bundleDef>
org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.event.output.adapter.extensions.ui:${carbon.devicemgt.plugins.version} org.wso2.carbon.devicemgt-plugins:org.wso2.carbon.event.output.adapter.extensions.ui:${carbon.devicemgt.plugins.version}
</bundleDef> </bundleDef>
<bundleDef>
com.jayway.jsonpath.wso2:json-path:${json.path.version}
</bundleDef>
<bundleDef>
net.minidev.wso2:json-smart:${json.smart.version}
</bundleDef>
</bundles> </bundles>
<importFeatures> <importFeatures>
<importFeatureDef>org.wso2.carbon.core.server:${carbon.kernel.version} <importFeatureDef>org.wso2.carbon.core.server:${carbon.kernel.version}

@ -1,5 +1,5 @@
tokenValidationEndpoint=https://localhost:9443/services/OAuth2TokenValidationService tokenValidationEndpoint=https://localhost:9443/services/OAuth2TokenValidationService
username=admin username=admin
password=admin password=admin
maxConnectionsPerHost=2 maximumHttpConnectionPerHost=2
maxTotalConnections=100 maximumTotalHttpConnection=100

@ -1052,7 +1052,7 @@
<version>${json-simple.version}</version> <version>${json-simple.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.jayway.jsonpath.wso2</groupId> <groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId> <artifactId>json-path</artifactId>
<version>${json.path.version}</version> <version>${json.path.version}</version>
</dependency> </dependency>
@ -1202,8 +1202,7 @@
<httpcore.version>4.3.3.wso2v1</httpcore.version> <httpcore.version>4.3.3.wso2v1</httpcore.version>
<!--json version--> <!--json version-->
<json-simple.version>1.1.wso2v1</json-simple.version> <json-simple.version>1.1.wso2v1</json-simple.version>
<json.path.version>1.2.0.wso2v1</json.path.version> <json.path.version>0.9.1</json.path.version>
<json.smart.version>2.1.0.wso2v1</json.smart.version>
<!--websocket related lib versions--> <!--websocket related lib versions-->
<tomcat.websocket.version>7.0.54</tomcat.websocket.version> <tomcat.websocket.version>7.0.54</tomcat.websocket.version>
<javax.websocket.version>1.0</javax.websocket.version> <javax.websocket.version>1.0</javax.websocket.version>

Loading…
Cancel
Save