upstream changes

NuwanSameera 9 years ago
commit 41de719f15

@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -24,10 +24,11 @@ import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.annotations.api.API;
import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.iot.arduino.plugin.constants.ArduinoConstants;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.dto.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.transport.ArduinoMQTTConnector;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.util.ArduinoServiceUtils;
import org.wso2.carbon.device.mgt.iot.arduino.plugin.constants.ArduinoConstants;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
@ -35,7 +36,14 @@ import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*;
import javax.ws.rs.Consumes;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -50,34 +58,41 @@ import java.util.concurrent.ConcurrentHashMap;
@DeviceType( value = "arduino")
public class ArduinoControllerService {
private static Log log = LogFactory.getLog(ArduinoControllerService.class);
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
//TODO; replace this tenant domain
private static final String SUPER_TENANT = "carbon.super";
private static Log log = LogFactory.getLog(ArduinoControllerService.class);
private static Map<String, LinkedList<String>> replyMsgQueue = new HashMap<>();
private static Map<String, LinkedList<String>> internalControlsQueue = new HashMap<>();
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
private ArduinoMQTTConnector arduinoMQTTConnector;
private static Map<String, LinkedList<String>> replyMsgQueue = new HashMap<>();
private static Map<String, LinkedList<String>> internalControlsQueue = new HashMap<>();
private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>();
/**
* @param arduinoMQTTConnector an object of type "ArduinoMQTTConnector" specific for this ArduinoControllerService
* @return the queue containing all the MQTT reply messages from all Arduinos communicating to this service
*/
@SuppressWarnings("unused")
public void setArduinoMQTTConnector(
final ArduinoMQTTConnector arduinoMQTTConnector) {
this.arduinoMQTTConnector = arduinoMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
arduinoMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, ArduinoMQTTConnector not started.");
public static Map<String, LinkedList<String>> getReplyMsgQueue() {
return replyMsgQueue;
}
/**
* @return the queue containing all the MQTT controls received to be sent to any Arduinos connected to this server
*/
public static Map<String, LinkedList<String>> getInternalControlsQueue() {
return internalControlsQueue;
}
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
/**
@ -89,17 +104,27 @@ public class ArduinoControllerService {
}
/**
* @return the queue containing all the MQTT reply messages from all Arduinos communicating to this service
*/
public static Map<String, LinkedList<String>> getReplyMsgQueue() {
return replyMsgQueue;
}
/**
* @return the queue containing all the MQTT controls received to be sent to any Arduinos connected to this server
* @param arduinoMQTTConnector an object of type "ArduinoMQTTConnector" specific for this ArduinoControllerService
*/
public static Map<String, LinkedList<String>> getInternalControlsQueue() {
return internalControlsQueue;
@SuppressWarnings("unused")
public void setArduinoMQTTConnector(
final ArduinoMQTTConnector arduinoMQTTConnector) {
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
ArduinoControllerService.this.arduinoMQTTConnector = arduinoMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
arduinoMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, ArduinoMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
/* ---------------------------------------------------------------------------------------

@ -88,58 +88,60 @@ public class ArduinoMQTTConnector extends MQTTTransportHandler {
@Override
public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException {
String topic = messageParams[0];
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + ArduinoConstants.DEVICE_TYPE + File.separator, ":");
if(messageParams.length != 0) {
String topic = messageParams[0];
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + ArduinoConstants.DEVICE_TYPE + File.separator, ":");
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
}
if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
}
int lastIndex = message.toString().lastIndexOf(":");
String msgContext = message.toString().substring(lastIndex + 1);
int lastIndex = message.toString().lastIndexOf(":");
String msgContext = message.toString().substring(lastIndex + 1);
LinkedList<String> deviceControlList;
LinkedList<String> replyMessageList;
LinkedList<String> deviceControlList;
LinkedList<String> replyMessageList;
if (msgContext.equals(MESSAGE_TO_SEND) || msgContext.equals(ArduinoConstants.STATE_ON) || msgContext.equals(
ArduinoConstants.STATE_OFF)) {
if (msgContext.equals(MESSAGE_TO_SEND) || msgContext.equals(ArduinoConstants.STATE_ON) || msgContext.equals(
ArduinoConstants.STATE_OFF)) {
if (log.isDebugEnabled()) {
log.debug("Received a control message: ");
log.debug("Control message topic: " + topic);
log.debug("Control message: " + message.toString());
}
if (log.isDebugEnabled()) {
log.debug("Received a control message: ");
log.debug("Control message topic: " + topic);
log.debug("Control message: " + message.toString());
}
synchronized (ArduinoControllerService.getInternalControlsQueue()) {
deviceControlList = ArduinoControllerService.getInternalControlsQueue().get(deviceId);
if (deviceControlList == null) {
ArduinoControllerService.getInternalControlsQueue()
.put(deviceId, deviceControlList = new LinkedList<String>());
synchronized (ArduinoControllerService.getInternalControlsQueue()) {
deviceControlList = ArduinoControllerService.getInternalControlsQueue().get(deviceId);
if (deviceControlList == null) {
ArduinoControllerService.getInternalControlsQueue()
.put(deviceId, deviceControlList = new LinkedList<String>());
}
}
}
deviceControlList.add(message.toString());
deviceControlList.add(message.toString());
} else if (msgContext.equals(MESSAGE_RECEIVED)) {
} else if (msgContext.equals(MESSAGE_RECEIVED)) {
if (log.isDebugEnabled()) {
log.debug("Received reply from a device: ");
log.debug("Reply message topic: " + topic);
log.debug("Reply message: " + message.toString().substring(0, lastIndex));
}
if (log.isDebugEnabled()) {
log.debug("Received reply from a device: ");
log.debug("Reply message topic: " + topic);
log.debug("Reply message: " + message.toString().substring(0, lastIndex));
}
synchronized (ArduinoControllerService.getReplyMsgQueue()) {
replyMessageList = ArduinoControllerService.getReplyMsgQueue().get(deviceId);
if (replyMessageList == null) {
ArduinoControllerService.getReplyMsgQueue()
.put(deviceId, replyMessageList = new LinkedList<>());
synchronized (ArduinoControllerService.getReplyMsgQueue()) {
replyMessageList = ArduinoControllerService.getReplyMsgQueue().get(deviceId);
if (replyMessageList == null) {
ArduinoControllerService.getReplyMsgQueue()
.put(deviceId, replyMessageList = new LinkedList<>());
}
}
replyMessageList.add(message.toString());
}
replyMessageList.add(message.toString());
}
}

@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -24,11 +24,12 @@ import org.wso2.carbon.apimgt.annotations.api.API;
import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.controller.api.exception.DigitalDisplayException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.controller.api.util.DigitalDisplayMQTTConnector;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.plugin.constants.DigitalDisplayConstants;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.FormParam;
@ -48,19 +49,41 @@ public class DigitalDisplayControllerService {
private static DigitalDisplayMQTTConnector digitalDisplayMQTTConnector;
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
public DigitalDisplayMQTTConnector getDigitalDisplayMQTTConnector() {
return DigitalDisplayControllerService.digitalDisplayMQTTConnector;
}
public void setDigitalDisplayMQTTConnector(
public void setDigitalDisplayMQTTConnector(final
DigitalDisplayMQTTConnector digitalDisplayMQTTConnector) {
DigitalDisplayControllerService.digitalDisplayMQTTConnector = digitalDisplayMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
digitalDisplayMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. " +
"Hence, DigitalDisplayMQTTConnector not started.");
}
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
DigitalDisplayControllerService.digitalDisplayMQTTConnector = digitalDisplayMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
digitalDisplayMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. " +
"Hence, DigitalDisplayMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
/**

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -19,12 +19,17 @@ package org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.plugin.constants.DroneConstants;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl.transport.DroneAnalyzerXMPPConnector;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl.trasformer.MessageTransformer;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.plugin.constants.DroneConstants;
import javax.websocket.*;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
@ -36,14 +41,35 @@ public class DroneRealTimeService {
private DroneAnalyzerXMPPConnector xmppConnector;
public DroneRealTimeService() {
messageController = new MessageTransformer();
xmppConnector = new DroneAnalyzerXMPPConnector(messageController);
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
messageController = new MessageTransformer();
xmppConnector = new DroneAnalyzerXMPPConnector(messageController);
if (XmppConfig.getInstance().isEnabled()) {
xmppConnector.connect();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, DroneAnalyzerXMPPConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
if (XmppConfig.getInstance().isEnabled()){
xmppConnector.connect();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, DroneAnalyzerXMPPConnector not started.");
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
@OnOpen

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -25,19 +25,27 @@ import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.DeviceValidator;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.transport.RaspberryPiMQTTConnector;
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.dto.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.transport.RaspberryPiMQTTConnector;
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.util.RaspberrypiServiceUtils;
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*;
import javax.ws.rs.Consumes;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -48,30 +56,25 @@ import java.util.concurrent.ConcurrentHashMap;
@DeviceType(value = "raspberrypi")
public class RaspberryPiControllerService {
private static Log log = LogFactory.getLog(RaspberryPiControllerService.class);
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
//TODO; replace this tenant domain
private static final String SUPER_TENANT = "carbon.super";
private static Log log = LogFactory.getLog(RaspberryPiControllerService.class);
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>();
private RaspberryPiMQTTConnector raspberryPiMQTTConnector;
/**
* @param raspberryPiMQTTConnector
*/
public void setRaspberryPiMQTTConnector(final RaspberryPiMQTTConnector raspberryPiMQTTConnector) {
this.raspberryPiMQTTConnector = raspberryPiMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
raspberryPiMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, RaspberryPiMQTTConnector not started.");
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
/**
@ -81,6 +84,29 @@ public class RaspberryPiControllerService {
return raspberryPiMQTTConnector;
}
/**
* @param raspberryPiMQTTConnector
*/
public void setRaspberryPiMQTTConnector(
final RaspberryPiMQTTConnector raspberryPiMQTTConnector) {
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
RaspberryPiControllerService.this.raspberryPiMQTTConnector = raspberryPiMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
raspberryPiMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, RaspberryPiMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
/* ---------------------------------------------------------------------------------------
Device specific APIs - Control APIs + Data-Publishing APIs

@ -86,37 +86,39 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
@Override
public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException {
String topic = messageParams[0];
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + RaspberrypiConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
String receivedMessage = message.toString();
if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
log.debug("Message [" + receivedMessage + "] topic: [" + topic + "]");
}
if (receivedMessage.contains("PUBLISHER")) {
float temperature = Float.parseFloat(receivedMessage.split(":")[2]);
if(messageParams.length != 0) {
String topic = messageParams[0];
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + RaspberrypiConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
if (!RaspberrypiServiceUtils.publishToDAS(owner, deviceId, temperature)) {
log.error("MQTT Subscriber: Publishing data to DAS failed.");
}
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
String receivedMessage = message.toString();
if (log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully.");
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
log.debug("Message [" + receivedMessage + "] topic: [" + topic + "]");
}
} else if (receivedMessage.contains("TEMPERATURE")) {
String temperatureValue = receivedMessage.split(":")[1];
SensorDataManager.getInstance().setSensorRecord(deviceId, RaspberrypiConstants.SENSOR_TEMPERATURE,
temperatureValue,
Calendar.getInstance().getTimeInMillis());
if (receivedMessage.contains("PUBLISHER")) {
float temperature = Float.parseFloat(receivedMessage.split(":")[2]);
if (!RaspberrypiServiceUtils.publishToDAS(owner, deviceId, temperature)) {
log.error("MQTT Subscriber: Publishing data to DAS failed.");
}
if (log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully.");
}
} else if (receivedMessage.contains("TEMPERATURE")) {
String temperatureValue = receivedMessage.split(":")[1];
SensorDataManager.getInstance().setSensorRecord(deviceId, RaspberrypiConstants.SENSOR_TEMPERATURE,
temperatureValue,
Calendar.getInstance().getTimeInMillis());
}
}
}

@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -28,6 +28,7 @@ import org.wso2.carbon.certificate.mgt.core.exception.KeystoreException;
import org.wso2.carbon.certificate.mgt.core.service.CertificateManagementService;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.DeviceValidator;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig;
@ -35,7 +36,6 @@ import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.dto.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.exception.VirtualFireAlarmException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.transport.VirtualFireAlarmMQTTConnector;
@ -44,6 +44,7 @@ import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.u
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.util.VirtualFireAlarmServiceUtils;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.util.scep.ContentType;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.util.scep.SCEPOperation;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -74,17 +75,14 @@ import java.util.concurrent.ConcurrentHashMap;
@DeviceType(value = "virtual_firealarm")
@SuppressWarnings("Non-Annoted WebService")
public class VirtualFireAlarmControllerService {
private static Log log = LogFactory.getLog(VirtualFireAlarmControllerService.class);
//TODO; replace this tenant domain
private static final String SUPER_TENANT = "carbon.super";
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
private static final String XMPP_PROTOCOL = "XMPP";
private static final String HTTP_PROTOCOL = "HTTP";
private static final String MQTT_PROTOCOL = "MQTT";
private static Log log = LogFactory.getLog(VirtualFireAlarmControllerService.class);
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
// consists of utility methods related to encrypting and decrypting messages
private SecurityManager securityManager;
// connects to the given MQTT broker and handles MQTT communication
@ -94,6 +92,27 @@ public class VirtualFireAlarmControllerService {
// holds a mapping of the IP addresses to Device-IDs for HTTP communication
private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>();
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
/**
* Fetches the `SecurityManager` specific to this VirtualFirealarm controller service.
*
* @return the 'SecurityManager' instance bound to the 'securityManager' variable of this service.
*/
@SuppressWarnings("Unused")
public SecurityManager getSecurityManager() {
return securityManager;
}
/**
* Sets the `securityManager` variable of this VirtualFirealarm controller service.
*
@ -107,79 +126,90 @@ public class VirtualFireAlarmControllerService {
}
/**
* Sets the `virtualFireAlarmXMPPConnector` variable of this VirtualFirealarm controller service.
* Fetches the `VirtualFireAlarmXMPPConnector` specific to this VirtualFirealarm controller service.
*
* @param virtualFireAlarmXMPPConnector a 'VirtualFireAlarmXMPPConnector' object that handles all XMPP related
* communications of any connected VirtualFirealarm device-type
* @return the 'VirtualFireAlarmXMPPConnector' instance bound to the 'virtualFireAlarmXMPPConnector' variable of
* this service.
*/
@SuppressWarnings("Unused")
public void setVirtualFireAlarmXMPPConnector(final VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) {
this.virtualFireAlarmXMPPConnector = virtualFireAlarmXMPPConnector;
if (XmppConfig.getInstance().isEnabled()) {
Runnable xmppStarter = new Runnable() {
@Override
public void run() {
virtualFireAlarmXMPPConnector.initConnector();
virtualFireAlarmXMPPConnector.connect();
}
};
Thread xmppStarterThread = new Thread(xmppStarter);
xmppStarterThread.setDaemon(true);
xmppStarterThread.start();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmXMPPConnector not started.");
}
public VirtualFireAlarmXMPPConnector getVirtualFireAlarmXMPPConnector() {
return virtualFireAlarmXMPPConnector;
}
/**
* Sets the `virtualFireAlarmMQTTConnector` variable of this VirtualFirealarm controller service.
* Sets the `virtualFireAlarmXMPPConnector` variable of this VirtualFirealarm controller service.
*
* @param virtualFireAlarmMQTTConnector a 'VirtualFireAlarmMQTTConnector' object that handles all MQTT related
* @param virtualFireAlarmXMPPConnector a 'VirtualFireAlarmXMPPConnector' object that handles all XMPP related
* communications of any connected VirtualFirealarm device-type
*/
@SuppressWarnings("Unused")
public void setVirtualFireAlarmMQTTConnector(
final VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector) {
this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
virtualFireAlarmMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmMQTTConnector not started.");
}
}
public void setVirtualFireAlarmXMPPConnector(
final VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) {
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
VirtualFireAlarmControllerService.this.virtualFireAlarmXMPPConnector = virtualFireAlarmXMPPConnector;
if (XmppConfig.getInstance().isEnabled()) {
Runnable xmppStarter = new Runnable() {
@Override
public void run() {
virtualFireAlarmXMPPConnector.initConnector();
virtualFireAlarmXMPPConnector.connect();
}
};
/**
* Fetches the `SecurityManager` specific to this VirtualFirealarm controller service.
*
* @return the 'SecurityManager' instance bound to the 'securityManager' variable of this service.
*/
@SuppressWarnings("Unused")
public SecurityManager getSecurityManager() {
return securityManager;
Thread xmppStarterThread = new Thread(xmppStarter);
xmppStarterThread.setDaemon(true);
xmppStarterThread.start();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmXMPPConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
/**
* Fetches the `VirtualFireAlarmXMPPConnector` specific to this VirtualFirealarm controller service.
* Fetches the `VirtualFireAlarmMQTTConnector` specific to this VirtualFirealarm controller service.
*
* @return the 'VirtualFireAlarmXMPPConnector' instance bound to the 'virtualFireAlarmXMPPConnector' variable of
* @return the 'VirtualFireAlarmMQTTConnector' instance bound to the 'virtualFireAlarmMQTTConnector' variable of
* this service.
*/
@SuppressWarnings("Unused")
public VirtualFireAlarmXMPPConnector getVirtualFireAlarmXMPPConnector() {
return virtualFireAlarmXMPPConnector;
public VirtualFireAlarmMQTTConnector getVirtualFireAlarmMQTTConnector() {
return virtualFireAlarmMQTTConnector;
}
/**
* Fetches the `VirtualFireAlarmMQTTConnector` specific to this VirtualFirealarm controller service.
* Sets the `virtualFireAlarmMQTTConnector` variable of this VirtualFirealarm controller service.
*
* @return the 'VirtualFireAlarmMQTTConnector' instance bound to the 'virtualFireAlarmMQTTConnector' variable of
* this service.
* @param virtualFireAlarmMQTTConnector a 'VirtualFireAlarmMQTTConnector' object that handles all MQTT related
* communications of any connected VirtualFirealarm device-type
*/
@SuppressWarnings("Unused")
public VirtualFireAlarmMQTTConnector getVirtualFireAlarmMQTTConnector() {
return virtualFireAlarmMQTTConnector;
public void setVirtualFireAlarmMQTTConnector(
final VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector) {
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
VirtualFireAlarmControllerService.this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
virtualFireAlarmMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
/**

@ -122,54 +122,56 @@ public class VirtualFireAlarmMQTTConnector extends MQTTTransportHandler {
*/
@Override
public void processIncomingMessage(MqttMessage mqttMessage, String... messageParams) {
String topic = messageParams[0];
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
if(messageParams.length != 0) {
String topic = messageParams[0];
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
String ownerAndId = topic.replace(serverName + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
if (log.isDebugEnabled()) {
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
}
String actualMessage;
try {
// the hash-code of the deviceId is used as the alias for device certificates during SCEP enrollment.
// hence, the same is used here to fetch the device-specific-certificate from the key store.
PublicKey clientPublicKey = VirtualFireAlarmServiceUtils.getDevicePublicKey(deviceId);
PrivateKey serverPrivateKey = SecurityManager.getServerPrivateKey();
// the MQTT-messages from VirtualFireAlarm devices are in the form {"Msg":<MESSAGE>, "Sig":<SIGNATURE>}
actualMessage = VirtualFireAlarmServiceUtils.extractMessageFromPayload(mqttMessage.toString(),
serverPrivateKey, clientPublicKey);
if (log.isDebugEnabled()) {
log.debug("MQTT: Received Message [" + actualMessage + "] topic: [" + topic + "]");
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
}
if (actualMessage.contains("PUBLISHER")) {
float temperature = Float.parseFloat(actualMessage.split(":")[2]);
if (!VirtualFireAlarmServiceUtils.publishToDAS(owner, deviceId, temperature)) {
log.error("MQTT Subscriber: Publishing data to DAS failed.");
}
String actualMessage;
try {
// the hash-code of the deviceId is used as the alias for device certificates during SCEP enrollment.
// hence, the same is used here to fetch the device-specific-certificate from the key store.
PublicKey clientPublicKey = VirtualFireAlarmServiceUtils.getDevicePublicKey(deviceId);
PrivateKey serverPrivateKey = SecurityManager.getServerPrivateKey();
// the MQTT-messages from VirtualFireAlarm devices are in the form {"Msg":<MESSAGE>, "Sig":<SIGNATURE>}
actualMessage = VirtualFireAlarmServiceUtils.extractMessageFromPayload(mqttMessage.toString(),
serverPrivateKey, clientPublicKey);
if (log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully.");
log.debug("MQTT: Received Message [" + actualMessage + "] topic: [" + topic + "]");
}
} else if (actualMessage.contains("TEMPERATURE")) {
String temperatureValue = actualMessage.split(":")[1];
SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMP,
temperatureValue,
Calendar.getInstance().getTimeInMillis());
if (actualMessage.contains("PUBLISHER")) {
float temperature = Float.parseFloat(actualMessage.split(":")[2]);
if (!VirtualFireAlarmServiceUtils.publishToDAS(owner, deviceId, temperature)) {
log.error("MQTT Subscriber: Publishing data to DAS failed.");
}
if (log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully.");
}
} else if (actualMessage.contains("TEMPERATURE")) {
String temperatureValue = actualMessage.split(":")[1];
SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMP,
temperatureValue,
Calendar.getInstance().getTimeInMillis());
}
} catch (VirtualFireAlarmException e) {
String errorMsg =
"CertificateManagementService failure oo Signature-Verification/Decryption was unsuccessful.";
log.error(errorMsg, e);
}
} catch (VirtualFireAlarmException e) {
String errorMsg =
"CertificateManagementService failure oo Signature-Verification/Decryption was unsuccessful.";
log.error(errorMsg, e);
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -30,8 +30,8 @@ import org.wso2.carbon.device.mgt.core.dao.DeviceManagementDAOFactory;
import org.wso2.carbon.device.mgt.core.dto.DeviceType;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.device.mgt.iot.util.DeviceTypes;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.util.IotDeviceManagementUtil;
import org.wso2.carbon.device.mgt.iot.util.ZipArchive;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.util.IotDeviceManagementUtil;
import java.io.IOException;
import java.util.ArrayList;
@ -41,8 +41,10 @@ import java.util.Map;
public class DeviceManagement {
private static Log log = LogFactory.getLog(DeviceManagement.class);
private PrivilegedCarbonContext ctx;
private static volatile boolean serverReady = false;
private PrivilegedCarbonContext ctx;
private String tenantDomain;
public DeviceManagement(String tenantDomain){
this.tenantDomain=tenantDomain;
PrivilegedCarbonContext.startTenantFlow();
@ -50,6 +52,13 @@ public class DeviceManagement {
ctx.setTenantDomain(tenantDomain, true);
}
public static boolean isServerReady() {
return serverReady;
}
public static void setServerReady(boolean serverReady) {
DeviceManagement.serverReady = serverReady;
}
public boolean isExist(String owner, DeviceIdentifier deviceIdentifier)
throws DeviceManagementException {

@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.databridge.core.DataBridgeReceiverService;
import org.wso2.carbon.device.mgt.iot.UserManagement;
import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTEventsStatisticsClient;
@ -36,6 +37,7 @@ import org.wso2.carbon.device.mgt.iot.service.ConfigurationService;
import org.wso2.carbon.device.mgt.iot.service.ConfigurationServiceImpl;
import org.wso2.carbon.device.mgt.iot.service.DeviceTypeService;
import org.wso2.carbon.device.mgt.iot.service.DeviceTypeServiceImpl;
import org.wso2.carbon.device.mgt.iot.service.StartupListener;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.IotDeviceManagementDAOFactory;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.util.IotDeviceManagementDAOUtil;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.exception.IotDeviceMgtPluginException;
@ -83,17 +85,15 @@ public class IotDeviceManagementServiceComponent {
log.debug("Activating Iot Device Management Service Component");
}
try {
BundleContext bundleContext = ctx.getBundleContext();
/* Initialize the data source configuration */
DeviceManagementConfigurationManager.getInstance().initConfig();
IotDeviceTypeConfigurationManager.getInstance().initConfig();
Map<String, IotDeviceTypeConfig> dsConfigMap =
IotDeviceTypeConfigurationManager.getInstance().getIotDeviceTypeConfigMap();
IotDeviceManagementDAOFactory.init(dsConfigMap);
bundleContext.registerService(ServerStartupObserver.class.getName(), new StartupListener(), null);
String setupOption = System.getProperty("setup");
if (setupOption != null) {
@ -123,7 +123,6 @@ public class IotDeviceManagementServiceComponent {
IoTEventsStatisticsClient.initializeDataSource();
UserManagement.registerApiAccessRoles();
bundleContext.registerService(DeviceTypeService.class.getName(),
new DeviceTypeServiceImpl(), null);

@ -0,0 +1,35 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.iot.service;
import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
public class StartupListener implements ServerStartupObserver {
@Override
public void completingServerStartup() {
}
@Override
public void completedServerStartup() {
DeviceManagement.setServerReady(true);
}
}
Loading…
Cancel
Save