Implemeting dynamic recivers and streams deployment into DAS

merge-requests/1/head
Jasintha 8 years ago committed by amalhub
parent 273be79ca2
commit e6b38fbdb4

@ -280,6 +280,16 @@
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.receiver.stub</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.stream.stub</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.commons</groupId>

@ -18,6 +18,7 @@
*/
package org.wso2.carbon.device.mgt.jaxrs.service.impl.admin;
import org.apache.axis2.AxisFault;
import org.apache.axis2.client.Options;
import org.apache.axis2.java.security.SSLProtocolSocketFactory;
import org.apache.axis2.transport.http.HTTPConstants;
@ -27,6 +28,9 @@ import org.apache.commons.httpclient.protocol.Protocol;
import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.wso2.carbon.application.mgt.stub.upload.CarbonAppUploaderStub;
import org.wso2.carbon.application.mgt.stub.upload.types.carbon.UploadedFileItem;
import org.wso2.carbon.base.ServerConfiguration;
@ -35,11 +39,16 @@ import org.wso2.carbon.core.util.Utils;
import org.wso2.carbon.device.mgt.jaxrs.service.api.admin.DeviceTypePublisherAdminService;
import org.wso2.carbon.device.mgt.jaxrs.util.DeviceMgtAPIUtils;
import org.wso2.carbon.identity.jwt.client.extension.JWTClient;
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
import org.wso2.carbon.registry.core.Registry;
import org.wso2.carbon.registry.core.Resource;
import org.wso2.carbon.registry.core.ResourceImpl;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.utils.CarbonUtils;
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceStub;
import org.wso2.carbon.event.stream.stub.EventStreamAdminServiceStub;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import javax.activation.DataHandler;
import javax.net.ssl.KeyManagerFactory;
@ -51,6 +60,8 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import java.io.*;
import java.nio.file.Files;
import java.rmi.RemoteException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
@ -89,11 +100,15 @@ public class DeviceTypePublisherAdminServiceImpl implements DeviceTypePublisherA
private static final String SSLV3 = "SSLv3";
private KeyStore keyStore;
private KeyStore trustStore;
private char[] keyStorePassword;
private SSLContext sslContext;
private String tenantDomain;
private static final Log log = LogFactory.getLog(DeviceTypePublisherAdminServiceImpl.class);
private static final String DEFAULT_RESOURCE_LOCATION = "/resources/devicetypes";
private static final String CAR_FILE_LOCATION = CarbonUtils.getCarbonHome() + File.separator + "repository" +
@ -105,10 +120,15 @@ public class DeviceTypePublisherAdminServiceImpl implements DeviceTypePublisherA
private static final String IOT_MGT_HOST_NAME = "${iot.manager.host}";
private static final String DAS_URL = DEFAULT_HTTP_PROTOCOL + "://" + DAS_HOST_NAME
+ ":" + DAS_PORT + "/services/CarbonAppUploader" + "/";
private static final String DAS_EVENT_RECEIVER_EP = DEFAULT_HTTP_PROTOCOL + "://" + DAS_HOST_NAME
+ ":" + DAS_PORT + "/services/EventReceiverAdminService" + "/";
private static final String IOT_MGT_URL = DEFAULT_HTTP_PROTOCOL + "://" + IOT_MGT_HOST_NAME
+ ":" + IOT_MGT_PORT + "/services/CarbonAppUploader" + "/";
private static final String MEDIA_TYPE_XML = "application/xml";
private static final String DEVICE_MANAGEMENT_TYPE = "device_management";
private static final String TENANT_DOMAIN_PROPERTY = "\\$\\{tenant-domain\\}";
@Override
@POST
@ -117,7 +137,7 @@ public class DeviceTypePublisherAdminServiceImpl implements DeviceTypePublisherA
try {
//Getting the tenant Domain
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
String username = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUsername();
String tenantAdminUser = username + "@" + tenantDomain;
@ -187,6 +207,14 @@ public class DeviceTypePublisherAdminServiceImpl implements DeviceTypePublisherA
registry.put(DEFAULT_RESOURCE_LOCATION + type + ".exist", resource);
}
}
if(!tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
publishDynamicEventReceivers(type, tenantDomain);
}
publishDynamicEventReceivers(type,MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
publishDynamicEventStream(type,MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
} else {
return Response.status(Response.Status.BAD_REQUEST)
.entity("\"Error, Artifact does not exist.\"").build();
@ -222,6 +250,179 @@ public class DeviceTypePublisherAdminServiceImpl implements DeviceTypePublisherA
}
}
private void publishDynamicEventReceivers(String deviceType, String tenantDomain){
PrivilegedCarbonContext.getThreadLocalCarbonContext().startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
try {
EventReceiverAdminServiceStub receiverAdminServiceStub = new EventReceiverAdminServiceStub(Utils.replaceSystemProperty(DAS_EVENT_RECEIVER_EP));
Options eventReciverOptions = receiverAdminServiceStub._getServiceClient().getOptions();
if (eventReciverOptions == null) {
eventReciverOptions = new Options();
}
String username=null;
if(!tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
username = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
.getRealmConfiguration().getAdminUserName()+"@"+tenantDomain;
}else {
username = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
.getRealmConfiguration().getAdminUserName();
}
JWTClient jwtClient = DeviceMgtAPIUtils.getJWTClientManagerService().getJWTClient();
String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64(
jwtClient.getJwtToken(username).getBytes()));
List<Header> list = new ArrayList<Header>();
Header httpHeader = new Header();
httpHeader.setName(AUTHORIZATION_HEADER);
httpHeader.setValue(authValue);
list.add(httpHeader);//"https"
eventReciverOptions.setProperty(HTTPConstants.HTTP_HEADERS, list);
eventReciverOptions.setProperty(HTTPConstants.CUSTOM_PROTOCOL_HANDLER
, new Protocol(DEFAULT_HTTP_PROTOCOL
, (ProtocolSocketFactory) new SSLProtocolSocketFactory(sslContext)
, Integer.parseInt(Utils.replaceSystemProperty(DAS_PORT))));
receiverAdminServiceStub._getServiceClient().setOptions(eventReciverOptions);
List<String> receiversList = getReceiversList(deviceType);
for (String receiverContent:receiversList) {
receiverAdminServiceStub.deployEventReceiverConfiguration(receiverContent);
}
} catch (AxisFault e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (RemoteException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (IOException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (JWTClientException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (UserStoreException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
private void publishDynamicEventStream(String deviceType, String tenantDomain){
PrivilegedCarbonContext.getThreadLocalCarbonContext().startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
try {
EventStreamAdminServiceStub eventStreamAdminServiceStub = new EventStreamAdminServiceStub(Utils.replaceSystemProperty(DAS_EVENT_RECEIVER_EP));
Options eventReciverOptions = eventStreamAdminServiceStub._getServiceClient().getOptions();
if (eventReciverOptions == null) {
eventReciverOptions = new Options();
}
String username;
if(!tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
username = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
.getRealmConfiguration().getAdminUserName()+"@"+tenantDomain;
}else {
username = PrivilegedCarbonContext.getThreadLocalCarbonContext().getUserRealm()
.getRealmConfiguration().getAdminUserName();
}
JWTClient jwtClient = DeviceMgtAPIUtils.getJWTClientManagerService().getJWTClient();
String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64(
jwtClient.getJwtToken(username).getBytes()));
List<Header> list = new ArrayList<Header>();
Header httpHeader = new Header();
httpHeader.setName(AUTHORIZATION_HEADER);
httpHeader.setValue(authValue);
list.add(httpHeader);//"https"
eventReciverOptions.setProperty(HTTPConstants.HTTP_HEADERS, list);
eventReciverOptions.setProperty(HTTPConstants.CUSTOM_PROTOCOL_HANDLER
, new Protocol(DEFAULT_HTTP_PROTOCOL
, (ProtocolSocketFactory) new SSLProtocolSocketFactory(sslContext)
, Integer.parseInt(Utils.replaceSystemProperty(DAS_PORT))));
eventStreamAdminServiceStub._getServiceClient().setOptions(eventReciverOptions);
List<String> streamList = getStreamsList(deviceType);
for (String streamContent:streamList) {
JSONParser jsonParser = new JSONParser();
JSONObject steamJson = (JSONObject)jsonParser.parse(streamContent);
String name = (String) steamJson.get("name");
String version = (String) steamJson.get("version");
String streamId = name +":"+version;
if(eventStreamAdminServiceStub.getStreamDefinitionAsString(streamId)==null) {
eventStreamAdminServiceStub.addEventStreamDefinitionAsString(streamContent);
}
}
} catch (AxisFault e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (RemoteException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (IOException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (JWTClientException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (UserStoreException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} catch (ParseException e) {
log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
private List<String> getReceiversList(String deviceType) throws IOException {
File directory = new File(CAR_FILE_LOCATION + File.separator + deviceType+File.separator+"receiver");
File[] receiverFiles = directory.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.toLowerCase().endsWith(".xml");
}
});
List<String> receiverList = new ArrayList<>();
for (File receiverFile:receiverFiles) {
String receiverContent =new String(Files.readAllBytes(receiverFile.toPath()));
receiverContent.replaceAll(TENANT_DOMAIN_PROPERTY,tenantDomain.toLowerCase());
receiverList.add(receiverContent);
}
return receiverList;
}
private List<String> getStreamsList(String deviceType) throws IOException {
File directory = new File(CAR_FILE_LOCATION + File.separator + deviceType+File.separator+"streams");
File[] receiverFiles = directory.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.toLowerCase().endsWith(".json");
}
});
List<String> streamList = new ArrayList<>();
for (File StreamFile:receiverFiles) {
String receiverContent =new String(Files.readAllBytes(StreamFile.toPath()));
receiverContent.replaceAll(TENANT_DOMAIN_PROPERTY,tenantDomain.toLowerCase());
streamList.add(receiverContent);
}
return streamList;
}
private UploadedFileItem[] loadCappFromFileSystem(String deviceType) throws IOException {
File directory = new File(CAR_FILE_LOCATION + File.separator + deviceType);

@ -61,6 +61,10 @@
<groupId>org.wso2.carbon.registry</groupId>
<artifactId>org.wso2.carbon.registry.indexing</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.receiver.stub</artifactId>
</dependency>
</dependencies>
<build>
@ -134,6 +138,9 @@
<bundleDef>
org.wso2.orbit.com.fasterxml.jackson.core:jackson-annotations:${jackson-annotations.version}
</bundleDef>
<bundleDef>
org.wso2.carbon.analytics-common:org.wso2.carbon.event.receiver.stub:${carbon.analytics.common.version}
</bundleDef>
<!-- Below should be bundled with the email verification -->
</bundles>
<importBundles>

@ -92,7 +92,7 @@
<adviceFile>
<properties>
<propertyDef>org.wso2.carbon.p2.category.type:server</propertyDef>
<propertyDef>org.eclipse.equinox.p2.type.group:false</propertyDef>
<propertyDef>org.eclipse.equinox.p2.type.group:true</propertyDef>
</properties>
</adviceFile>
<bundles>

@ -1513,6 +1513,16 @@
<artifactId>javassist</artifactId>
<version>${javassist.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.receiver.stub</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics-common</groupId>
<artifactId>org.wso2.carbon.event.stream.stub</artifactId>
<version>${carbon.analytics.common.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -1821,6 +1831,7 @@
<carbon.analytics.common.version>5.1.3</carbon.analytics.common.version>
<carbon.analytics.common.version.range>[5.1.3,6.0.0)</carbon.analytics.common.version.range>
<!-- Carbon Registry -->
<carbon.registry.version>4.6.0</carbon.registry.version>
<carbon.registry.imp.pkg.version.range>[4.4.8, 5.0.0)</carbon.registry.imp.pkg.version.range>

Loading…
Cancel
Save