Set MQTT and XMPP clients to connect after server start up

charithag 9 years ago
parent 7d82f1ba32
commit b8d768d29e

@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -24,10 +24,11 @@ import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.annotations.api.API;
import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.iot.arduino.plugin.constants.ArduinoConstants;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.dto.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.transport.ArduinoMQTTConnector;
import org.wso2.carbon.device.mgt.iot.arduino.controller.service.impl.util.ArduinoServiceUtils;
import org.wso2.carbon.device.mgt.iot.arduino.plugin.constants.ArduinoConstants;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
@ -35,7 +36,14 @@ import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*;
import javax.ws.rs.Consumes;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -50,34 +58,41 @@ import java.util.concurrent.ConcurrentHashMap;
@DeviceType( value = "arduino")
public class ArduinoControllerService {
private static Log log = LogFactory.getLog(ArduinoControllerService.class);
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
//TODO; replace this tenant domain
private static final String SUPER_TENANT = "carbon.super";
private static Log log = LogFactory.getLog(ArduinoControllerService.class);
private static Map<String, LinkedList<String>> replyMsgQueue = new HashMap<>();
private static Map<String, LinkedList<String>> internalControlsQueue = new HashMap<>();
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
private ArduinoMQTTConnector arduinoMQTTConnector;
private static Map<String, LinkedList<String>> replyMsgQueue = new HashMap<>();
private static Map<String, LinkedList<String>> internalControlsQueue = new HashMap<>();
private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>();
/**
* @param arduinoMQTTConnector an object of type "ArduinoMQTTConnector" specific for this ArduinoControllerService
* @return the queue containing all the MQTT reply messages from all Arduinos communicating to this service
*/
@SuppressWarnings("unused")
public void setArduinoMQTTConnector(
final ArduinoMQTTConnector arduinoMQTTConnector) {
this.arduinoMQTTConnector = arduinoMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
arduinoMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, ArduinoMQTTConnector not started.");
public static Map<String, LinkedList<String>> getReplyMsgQueue() {
return replyMsgQueue;
}
/**
* @return the queue containing all the MQTT controls received to be sent to any Arduinos connected to this server
*/
public static Map<String, LinkedList<String>> getInternalControlsQueue() {
return internalControlsQueue;
}
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
/**
@ -89,17 +104,27 @@ public class ArduinoControllerService {
}
/**
* @return the queue containing all the MQTT reply messages from all Arduinos communicating to this service
*/
public static Map<String, LinkedList<String>> getReplyMsgQueue() {
return replyMsgQueue;
}
/**
* @return the queue containing all the MQTT controls received to be sent to any Arduinos connected to this server
* @param arduinoMQTTConnector an object of type "ArduinoMQTTConnector" specific for this ArduinoControllerService
*/
public static Map<String, LinkedList<String>> getInternalControlsQueue() {
return internalControlsQueue;
@SuppressWarnings("unused")
public void setArduinoMQTTConnector(
final ArduinoMQTTConnector arduinoMQTTConnector) {
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
ArduinoControllerService.this.arduinoMQTTConnector = arduinoMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
arduinoMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, ArduinoMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
/* ---------------------------------------------------------------------------------------

@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -24,11 +24,12 @@ import org.wso2.carbon.apimgt.annotations.api.API;
import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.controller.api.exception.DigitalDisplayException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.controller.api.util.DigitalDisplayMQTTConnector;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.digitaldisplay.plugin.constants.DigitalDisplayConstants;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.FormParam;
@ -48,19 +49,40 @@ public class DigitalDisplayControllerService {
private static DigitalDisplayMQTTConnector digitalDisplayMQTTConnector;
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
public DigitalDisplayMQTTConnector getDigitalDisplayMQTTConnector() {
return DigitalDisplayControllerService.digitalDisplayMQTTConnector;
}
public void setDigitalDisplayMQTTConnector(
public void setDigitalDisplayMQTTConnector(final
DigitalDisplayMQTTConnector digitalDisplayMQTTConnector) {
DigitalDisplayControllerService.digitalDisplayMQTTConnector = digitalDisplayMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
digitalDisplayMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. " +
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
DigitalDisplayControllerService.digitalDisplayMQTTConnector = digitalDisplayMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
digitalDisplayMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. " +
"Hence, DigitalDisplayMQTTConnector not started.");
}
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
/**

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -19,12 +19,17 @@ package org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.plugin.constants.DroneConstants;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl.transport.DroneAnalyzerXMPPConnector;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.controller.api.impl.trasformer.MessageTransformer;
import org.wso2.carbon.device.mgt.iot.droneanalyzer.plugin.constants.DroneConstants;
import javax.websocket.*;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
@ -36,14 +41,35 @@ public class DroneRealTimeService {
private DroneAnalyzerXMPPConnector xmppConnector;
public DroneRealTimeService() {
messageController = new MessageTransformer();
xmppConnector = new DroneAnalyzerXMPPConnector(messageController);
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
messageController = new MessageTransformer();
xmppConnector = new DroneAnalyzerXMPPConnector(messageController);
if (XmppConfig.getInstance().isEnabled()) {
xmppConnector.connect();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, DroneAnalyzerXMPPConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
if (XmppConfig.getInstance().isEnabled()){
xmppConnector.connect();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, DroneAnalyzerXMPPConnector not started.");
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
@OnOpen

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -25,19 +25,27 @@ import org.wso2.carbon.apimgt.annotations.device.DeviceType;
import org.wso2.carbon.apimgt.annotations.device.feature.Feature;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.DeviceValidator;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.transport.RaspberryPiMQTTConnector;
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.dto.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.transport.RaspberryPiMQTTConnector;
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.util.RaspberrypiServiceUtils;
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*;
import javax.ws.rs.Consumes;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -48,30 +56,25 @@ import java.util.concurrent.ConcurrentHashMap;
@DeviceType(value = "raspberrypi")
public class RaspberryPiControllerService {
private static Log log = LogFactory.getLog(RaspberryPiControllerService.class);
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
//TODO; replace this tenant domain
private static final String SUPER_TENANT = "carbon.super";
private static Log log = LogFactory.getLog(RaspberryPiControllerService.class);
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
public static final String HTTP_PROTOCOL = "HTTP";
public static final String MQTT_PROTOCOL = "MQTT";
private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>();
private RaspberryPiMQTTConnector raspberryPiMQTTConnector;
/**
* @param raspberryPiMQTTConnector
*/
public void setRaspberryPiMQTTConnector(final RaspberryPiMQTTConnector raspberryPiMQTTConnector) {
this.raspberryPiMQTTConnector = raspberryPiMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
raspberryPiMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, RaspberryPiMQTTConnector not started.");
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return true;
}
}
return false;
}
/**
@ -81,6 +84,29 @@ public class RaspberryPiControllerService {
return raspberryPiMQTTConnector;
}
/**
* @param raspberryPiMQTTConnector
*/
public void setRaspberryPiMQTTConnector(
final RaspberryPiMQTTConnector raspberryPiMQTTConnector) {
Runnable connector = new Runnable() {
public void run() {
if (waitForServerStartup()) {
return;
}
RaspberryPiControllerService.this.raspberryPiMQTTConnector = raspberryPiMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
raspberryPiMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, RaspberryPiMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
}
/* ---------------------------------------------------------------------------------------
Device specific APIs - Control APIs + Data-Publishing APIs

@ -36,7 +36,6 @@ import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.dto.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.exception.VirtualFireAlarmException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.transport.VirtualFireAlarmMQTTConnector;
@ -45,6 +44,7 @@ import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.u
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.util.VirtualFireAlarmServiceUtils;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.util.scep.ContentType;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.controller.service.impl.util.scep.SCEPOperation;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -97,7 +97,6 @@ public class VirtualFireAlarmControllerService {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
return true;
}
}
@ -197,13 +196,8 @@ public class VirtualFireAlarmControllerService {
final VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector) {
Runnable connector = new Runnable() {
public void run() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
if (waitForServerStartup()) {
return;
}
VirtualFireAlarmControllerService.this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {

@ -30,8 +30,8 @@ import org.wso2.carbon.device.mgt.core.dao.DeviceManagementDAOFactory;
import org.wso2.carbon.device.mgt.core.dto.DeviceType;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.device.mgt.iot.util.DeviceTypes;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.util.IotDeviceManagementUtil;
import org.wso2.carbon.device.mgt.iot.util.ZipArchive;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.util.IotDeviceManagementUtil;
import java.io.IOException;
import java.util.ArrayList;
@ -41,7 +41,7 @@ import java.util.Map;
public class DeviceManagement {
private static Log log = LogFactory.getLog(DeviceManagement.class);
private static boolean serverReady = false;
private static volatile boolean serverReady = false;
private PrivilegedCarbonContext ctx;
private String tenantDomain;

Loading…
Cancel
Save