Set MQTT and XMPP clients to connect after server start up

charithag 9 years ago
parent 4eece3dec9
commit 7d82f1ba32

@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an * software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
@ -28,6 +28,7 @@ import org.wso2.carbon.certificate.mgt.core.exception.KeystoreException;
import org.wso2.carbon.certificate.mgt.core.service.CertificateManagementService; import org.wso2.carbon.certificate.mgt.core.service.CertificateManagementService;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier; import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
import org.wso2.carbon.device.mgt.iot.DeviceValidator; import org.wso2.carbon.device.mgt.iot.DeviceValidator;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig; import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig;
@ -74,17 +75,14 @@ import java.util.concurrent.ConcurrentHashMap;
@DeviceType(value = "virtual_firealarm") @DeviceType(value = "virtual_firealarm")
@SuppressWarnings("Non-Annoted WebService") @SuppressWarnings("Non-Annoted WebService")
public class VirtualFireAlarmControllerService { public class VirtualFireAlarmControllerService {
private static Log log = LogFactory.getLog(VirtualFireAlarmControllerService.class);
//TODO; replace this tenant domain //TODO; replace this tenant domain
private static final String SUPER_TENANT = "carbon.super"; private static final String SUPER_TENANT = "carbon.super";
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
private static final String XMPP_PROTOCOL = "XMPP"; private static final String XMPP_PROTOCOL = "XMPP";
private static final String HTTP_PROTOCOL = "HTTP"; private static final String HTTP_PROTOCOL = "HTTP";
private static final String MQTT_PROTOCOL = "MQTT"; private static final String MQTT_PROTOCOL = "MQTT";
private static Log log = LogFactory.getLog(VirtualFireAlarmControllerService.class);
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
// consists of utility methods related to encrypting and decrypting messages // consists of utility methods related to encrypting and decrypting messages
private SecurityManager securityManager; private SecurityManager securityManager;
// connects to the given MQTT broker and handles MQTT communication // connects to the given MQTT broker and handles MQTT communication
@ -94,6 +92,28 @@ public class VirtualFireAlarmControllerService {
// holds a mapping of the IP addresses to Device-IDs for HTTP communication // holds a mapping of the IP addresses to Device-IDs for HTTP communication
private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>();
private boolean waitForServerStartup() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
return true;
}
}
return false;
}
/**
* Fetches the `SecurityManager` specific to this VirtualFirealarm controller service.
*
* @return the 'SecurityManager' instance bound to the 'securityManager' variable of this service.
*/
@SuppressWarnings("Unused")
public SecurityManager getSecurityManager() {
return securityManager;
}
/** /**
* Sets the `securityManager` variable of this VirtualFirealarm controller service. * Sets the `securityManager` variable of this VirtualFirealarm controller service.
* *
@ -107,79 +127,95 @@ public class VirtualFireAlarmControllerService {
} }
/** /**
* Sets the `virtualFireAlarmXMPPConnector` variable of this VirtualFirealarm controller service. * Fetches the `VirtualFireAlarmXMPPConnector` specific to this VirtualFirealarm controller service.
* *
* @param virtualFireAlarmXMPPConnector a 'VirtualFireAlarmXMPPConnector' object that handles all XMPP related * @return the 'VirtualFireAlarmXMPPConnector' instance bound to the 'virtualFireAlarmXMPPConnector' variable of
* communications of any connected VirtualFirealarm device-type * this service.
*/ */
@SuppressWarnings("Unused") @SuppressWarnings("Unused")
public void setVirtualFireAlarmXMPPConnector(final VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) { public VirtualFireAlarmXMPPConnector getVirtualFireAlarmXMPPConnector() {
this.virtualFireAlarmXMPPConnector = virtualFireAlarmXMPPConnector; return virtualFireAlarmXMPPConnector;
if (XmppConfig.getInstance().isEnabled()) {
Runnable xmppStarter = new Runnable() {
@Override
public void run() {
virtualFireAlarmXMPPConnector.initConnector();
virtualFireAlarmXMPPConnector.connect();
}
};
Thread xmppStarterThread = new Thread(xmppStarter);
xmppStarterThread.setDaemon(true);
xmppStarterThread.start();
} else {
log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmXMPPConnector not started.");
}
} }
/** /**
* Sets the `virtualFireAlarmMQTTConnector` variable of this VirtualFirealarm controller service. * Sets the `virtualFireAlarmXMPPConnector` variable of this VirtualFirealarm controller service.
* *
* @param virtualFireAlarmMQTTConnector a 'VirtualFireAlarmMQTTConnector' object that handles all MQTT related * @param virtualFireAlarmXMPPConnector a 'VirtualFireAlarmXMPPConnector' object that handles all XMPP related
* communications of any connected VirtualFirealarm device-type * communications of any connected VirtualFirealarm device-type
*/ */
@SuppressWarnings("Unused") @SuppressWarnings("Unused")
public void setVirtualFireAlarmMQTTConnector( public void setVirtualFireAlarmXMPPConnector(
final VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector) { final VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) {
this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector; Runnable connector = new Runnable() {
if (MqttConfig.getInstance().isEnabled()) { public void run() {
virtualFireAlarmMQTTConnector.connect(); if (waitForServerStartup()) {
} else { return;
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmMQTTConnector not started."); }
} VirtualFireAlarmControllerService.this.virtualFireAlarmXMPPConnector = virtualFireAlarmXMPPConnector;
}
if (XmppConfig.getInstance().isEnabled()) {
Runnable xmppStarter = new Runnable() {
@Override
public void run() {
virtualFireAlarmXMPPConnector.initConnector();
virtualFireAlarmXMPPConnector.connect();
}
};
/** Thread xmppStarterThread = new Thread(xmppStarter);
* Fetches the `SecurityManager` specific to this VirtualFirealarm controller service. xmppStarterThread.setDaemon(true);
* xmppStarterThread.start();
* @return the 'SecurityManager' instance bound to the 'securityManager' variable of this service. } else {
*/ log.warn("XMPP disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmXMPPConnector not started.");
@SuppressWarnings("Unused") }
public SecurityManager getSecurityManager() { }
return securityManager; };
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
} }
/** /**
* Fetches the `VirtualFireAlarmXMPPConnector` specific to this VirtualFirealarm controller service. * Fetches the `VirtualFireAlarmMQTTConnector` specific to this VirtualFirealarm controller service.
* *
* @return the 'VirtualFireAlarmXMPPConnector' instance bound to the 'virtualFireAlarmXMPPConnector' variable of * @return the 'VirtualFireAlarmMQTTConnector' instance bound to the 'virtualFireAlarmMQTTConnector' variable of
* this service. * this service.
*/ */
@SuppressWarnings("Unused") @SuppressWarnings("Unused")
public VirtualFireAlarmXMPPConnector getVirtualFireAlarmXMPPConnector() { public VirtualFireAlarmMQTTConnector getVirtualFireAlarmMQTTConnector() {
return virtualFireAlarmXMPPConnector; return virtualFireAlarmMQTTConnector;
} }
/** /**
* Fetches the `VirtualFireAlarmMQTTConnector` specific to this VirtualFirealarm controller service. * Sets the `virtualFireAlarmMQTTConnector` variable of this VirtualFirealarm controller service.
* *
* @return the 'VirtualFireAlarmMQTTConnector' instance bound to the 'virtualFireAlarmMQTTConnector' variable of * @param virtualFireAlarmMQTTConnector a 'VirtualFireAlarmMQTTConnector' object that handles all MQTT related
* this service. * communications of any connected VirtualFirealarm device-type
*/ */
@SuppressWarnings("Unused") @SuppressWarnings("Unused")
public VirtualFireAlarmMQTTConnector getVirtualFireAlarmMQTTConnector() { public void setVirtualFireAlarmMQTTConnector(
return virtualFireAlarmMQTTConnector; final VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector) {
Runnable connector = new Runnable() {
public void run() {
while (!DeviceManagement.isServerReady()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
VirtualFireAlarmControllerService.this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector;
if (MqttConfig.getInstance().isEnabled()) {
virtualFireAlarmMQTTConnector.connect();
} else {
log.warn("MQTT disabled in 'devicemgt-config.xml'. Hence, VirtualFireAlarmMQTTConnector not started.");
}
}
};
Thread connectorThread = new Thread(connector);
connectorThread.setDaemon(true);
connectorThread.start();
} }
/** /**

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* *
* WSO2 Inc. licenses this file to you under the Apache License, * WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except * Version 2.0 (the "License"); you may not use this file except
@ -11,7 +11,7 @@
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an * software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
@ -41,8 +41,10 @@ import java.util.Map;
public class DeviceManagement { public class DeviceManagement {
private static Log log = LogFactory.getLog(DeviceManagement.class); private static Log log = LogFactory.getLog(DeviceManagement.class);
private PrivilegedCarbonContext ctx; private static boolean serverReady = false;
private PrivilegedCarbonContext ctx;
private String tenantDomain; private String tenantDomain;
public DeviceManagement(String tenantDomain){ public DeviceManagement(String tenantDomain){
this.tenantDomain=tenantDomain; this.tenantDomain=tenantDomain;
PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.startTenantFlow();
@ -50,6 +52,13 @@ public class DeviceManagement {
ctx.setTenantDomain(tenantDomain, true); ctx.setTenantDomain(tenantDomain, true);
} }
public static boolean isServerReady() {
return serverReady;
}
public static void setServerReady(boolean serverReady) {
DeviceManagement.serverReady = serverReady;
}
public boolean isExist(String owner, DeviceIdentifier deviceIdentifier) public boolean isExist(String owner, DeviceIdentifier deviceIdentifier)
throws DeviceManagementException { throws DeviceManagementException {

@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext; import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext; import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.databridge.core.DataBridgeReceiverService; import org.wso2.carbon.databridge.core.DataBridgeReceiverService;
import org.wso2.carbon.device.mgt.iot.UserManagement; import org.wso2.carbon.device.mgt.iot.UserManagement;
import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTEventsStatisticsClient; import org.wso2.carbon.device.mgt.iot.analytics.statistics.IoTEventsStatisticsClient;
@ -36,6 +37,7 @@ import org.wso2.carbon.device.mgt.iot.service.ConfigurationService;
import org.wso2.carbon.device.mgt.iot.service.ConfigurationServiceImpl; import org.wso2.carbon.device.mgt.iot.service.ConfigurationServiceImpl;
import org.wso2.carbon.device.mgt.iot.service.DeviceTypeService; import org.wso2.carbon.device.mgt.iot.service.DeviceTypeService;
import org.wso2.carbon.device.mgt.iot.service.DeviceTypeServiceImpl; import org.wso2.carbon.device.mgt.iot.service.DeviceTypeServiceImpl;
import org.wso2.carbon.device.mgt.iot.service.StartupListener;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.IotDeviceManagementDAOFactory; import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.IotDeviceManagementDAOFactory;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.util.IotDeviceManagementDAOUtil; import org.wso2.carbon.device.mgt.iot.util.iotdevice.dao.util.IotDeviceManagementDAOUtil;
import org.wso2.carbon.device.mgt.iot.util.iotdevice.exception.IotDeviceMgtPluginException; import org.wso2.carbon.device.mgt.iot.util.iotdevice.exception.IotDeviceMgtPluginException;
@ -83,17 +85,15 @@ public class IotDeviceManagementServiceComponent {
log.debug("Activating Iot Device Management Service Component"); log.debug("Activating Iot Device Management Service Component");
} }
try { try {
BundleContext bundleContext = ctx.getBundleContext(); BundleContext bundleContext = ctx.getBundleContext();
/* Initialize the data source configuration */ /* Initialize the data source configuration */
DeviceManagementConfigurationManager.getInstance().initConfig(); DeviceManagementConfigurationManager.getInstance().initConfig();
IotDeviceTypeConfigurationManager.getInstance().initConfig(); IotDeviceTypeConfigurationManager.getInstance().initConfig();
Map<String, IotDeviceTypeConfig> dsConfigMap = Map<String, IotDeviceTypeConfig> dsConfigMap =
IotDeviceTypeConfigurationManager.getInstance().getIotDeviceTypeConfigMap(); IotDeviceTypeConfigurationManager.getInstance().getIotDeviceTypeConfigMap();
IotDeviceManagementDAOFactory.init(dsConfigMap); IotDeviceManagementDAOFactory.init(dsConfigMap);
bundleContext.registerService(ServerStartupObserver.class.getName(), new StartupListener(), null);
String setupOption = System.getProperty("setup"); String setupOption = System.getProperty("setup");
if (setupOption != null) { if (setupOption != null) {
@ -123,7 +123,6 @@ public class IotDeviceManagementServiceComponent {
IoTEventsStatisticsClient.initializeDataSource(); IoTEventsStatisticsClient.initializeDataSource();
UserManagement.registerApiAccessRoles(); UserManagement.registerApiAccessRoles();
bundleContext.registerService(DeviceTypeService.class.getName(), bundleContext.registerService(DeviceTypeService.class.getName(),
new DeviceTypeServiceImpl(), null); new DeviceTypeServiceImpl(), null);

@ -0,0 +1,35 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.iot.service;
import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.device.mgt.iot.DeviceManagement;
public class StartupListener implements ServerStartupObserver {
@Override
public void completingServerStartup() {
}
@Override
public void completedServerStartup() {
DeviceManagement.setServerReady(true);
}
}
Loading…
Cancel
Save