From 2a89a2fb2380d0c1c3c41843f5a094206195a6c6 Mon Sep 17 00:00:00 2001 From: ayyoob Date: Sat, 22 Apr 2017 18:31:13 +0530 Subject: [PATCH] resolved issues in event management service --- .../beans/analytics/DeviceTypeEvent.java | 24 +- .../api/DeviceEventManagementService.java | 7 +- .../DeviceEventManagementServiceImpl.java | 222 +++++++++++------- 3 files changed, 159 insertions(+), 94 deletions(-) diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/beans/analytics/DeviceTypeEvent.java b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/beans/analytics/DeviceTypeEvent.java index 942c078529..7ff162b357 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/beans/analytics/DeviceTypeEvent.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/beans/analytics/DeviceTypeEvent.java @@ -18,6 +18,7 @@ */ package org.wso2.carbon.device.mgt.jaxrs.beans.analytics; +import com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.annotations.ApiModelProperty; /** @@ -25,27 +26,28 @@ import io.swagger.annotations.ApiModelProperty; */ public class DeviceTypeEvent { - @ApiModelProperty(value = "Attributes related to device type event") - private EventAttributeList eventAttributeList; - @ApiModelProperty(value = "Transport to be used for device to server communication.") - private TransportType transportType; - + private EventAttributeList eventAttributes; + private TransportType transport; + @ApiModelProperty(value = "Attributes related to device type event") + @JsonProperty("eventAttributes") public EventAttributeList getEventAttributeList() { - return eventAttributeList; + return eventAttributes; } public void setEventAttributeList( - EventAttributeList eventAttributeList) { - this.eventAttributeList = eventAttributeList; + EventAttributeList eventAttributes) { + this.eventAttributes = eventAttributes; } + @ApiModelProperty(value = "Transport to be used for device to server communication.") + @JsonProperty("transport") public TransportType getTransportType() { - return transportType; + return transport; } - public void setTransportType(TransportType transportType) { - this.transportType = transportType; + public void setTransportType(TransportType transport) { + this.transport = transport; } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/api/DeviceEventManagementService.java b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/api/DeviceEventManagementService.java index 803c91ffce..73a816b7a4 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/api/DeviceEventManagementService.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/api/DeviceEventManagementService.java @@ -17,6 +17,7 @@ import org.wso2.carbon.device.mgt.jaxrs.beans.DeviceTypeList; import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse; import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.DeviceTypeEvent; import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventAttributeList; +import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventRecords; import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.TransportType; import org.wso2.carbon.device.mgt.jaxrs.util.Constants; @@ -90,7 +91,6 @@ public interface DeviceEventManagementService { @ApiResponse( code = 200, message = "OK. \n Successfully added the event defintion.", - response = DeviceTypeList.class, responseHeaders = { @ResponseHeader( name = "Content-Type", @@ -144,7 +144,6 @@ public interface DeviceEventManagementService { @ApiResponse( code = 200, message = "OK. \n Successfully deleted the event definition.", - response = DeviceTypeList.class, responseHeaders = { @ResponseHeader( name = "Content-Type", @@ -196,7 +195,7 @@ public interface DeviceEventManagementService { @ApiResponse( code = 200, message = "OK. \n Successfully fetched the event definition.", - response = DeviceTypeList.class, + response = EventRecords.class, responseHeaders = { @ResponseHeader( name = "Content-Type", @@ -258,7 +257,7 @@ public interface DeviceEventManagementService { @ApiResponse( code = 200, message = "OK. \n Successfully fetched the event defintion.", - response = DeviceTypeList.class, + response = EventAttributeList.class, responseHeaders = { @ResponseHeader( name = "Content-Type", diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java index 69ff1e3e0d..14b1fd6776 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java @@ -37,7 +37,9 @@ import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventAttributeList; import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.TransportType; import org.wso2.carbon.device.mgt.jaxrs.service.api.DeviceEventManagementService; import org.wso2.carbon.device.mgt.jaxrs.util.DeviceMgtAPIUtils; +import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceCallbackHandler; import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceStub; +import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceCallbackHandler; import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceStub; import org.wso2.carbon.event.receiver.stub.types.BasicInputAdapterPropertyDto; import org.wso2.carbon.event.stream.stub.EventStreamAdminServiceStub; @@ -48,6 +50,7 @@ import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientExceptio import org.wso2.carbon.user.api.UserStoreException; import org.wso2.carbon.analytics.datasource.commons.Record; import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException; + import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; @@ -88,7 +91,11 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe private static final String DAS_HOST_NAME = "${iot.analytics.host}"; private static final String DEFAULT_HTTP_PROTOCOL = "https"; private static final String DAS_ADMIN_SERVICE_EP = DEFAULT_HTTP_PROTOCOL + "://" + DAS_HOST_NAME - + ":" + DAS_PORT + "/services/EventReceiverAdminService" + "/"; + + ":" + DAS_PORT + "/services/"; + private static final String EVENT_RECIEVER_CONTEXT = "EventReceiverAdminService/"; + private static final String EVENT_PUBLISHER_CONTEXT = "EventPublisherAdminService/"; + private static final String EVENT_STREAM_CONTEXT = "EventStreamAdminService/"; + private static final String EVENT_PERSISTENCE_CONTEXT = "EventStreamPersistenceAdminService/"; private static final String AUTHORIZATION_HEADER = "Authorization"; private static final String AUTHORIZATION_HEADER_VALUE = "Bearer"; private static final String KEY_STORE_TYPE = "JKS"; @@ -104,6 +111,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe private static KeyStore trustStore; private static char[] keyStorePassword; private static SSLContext sslContext; + static { String keyStorePassword = ServerConfiguration.getInstance().getFirstProperty("Security.KeyStore.Password"); String trustStorePassword = ServerConfiguration.getInstance().getFirstProperty( @@ -119,28 +127,77 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe loadTrustStore(trustStoreLocation, trustStorePassword); //Create the SSL context with the loaded TrustStore/keystore. initSSLConnection(); - } catch (KeyStoreException|IOException|CertificateException|NoSuchAlgorithmException + } catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyManagementException e) { log.error("publishing dynamic event receiver is failed due to " + e.getMessage(), e); } } + @GET + @Path("/{type}") + @Override + public Response getDeviceTypeEventDefinition(@PathParam("type") String deviceType) { + String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); + try { + if (deviceType == null || + !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) { + String errorMessage = "Invalid device type"; + log.error(errorMessage); + return Response.status(Response.Status.BAD_REQUEST).build(); + } + String streamName = getStreamDefinition(deviceType, tenantDomain); + EventStreamAdminServiceStub eventStreamAdminServiceStub = getEventStreamAdminServiceStub(); + EventStreamDefinitionDto eventStreamDefinitionDto = eventStreamAdminServiceStub.getStreamDefinitionDto( + streamName + ":" + DEFAULT_STREAM_VERSION); + if (eventStreamDefinitionDto == null) { + eventStreamAdminServiceStub.cleanup(); + return Response.status(Response.Status.NO_CONTENT).build(); + } + EventStreamAttributeDto[] eventStreamAttributeDtos = eventStreamDefinitionDto.getPayloadData(); + EventAttributeList eventAttributeList = new EventAttributeList(); + List attributes = new ArrayList<>(); + for (EventStreamAttributeDto eventStreamAttributeDto : eventStreamAttributeDtos) { + attributes.add(new Attribute(eventStreamAttributeDto.getAttributeName() + , AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase()))); + } + eventAttributeList.setList(attributes); + eventStreamAdminServiceStub.cleanup(); + return Response.ok().entity(eventAttributeList).build(); + } catch (AxisFault e) { + log.error("failed to retrieve event definitions for tenantDomain:" + tenantDomain, e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } catch (RemoteException e) { + log.error("Failed to connect with the remote services:" + tenantDomain, e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } catch (JWTClientException e) { + log.error("Failed to generate jwt token for tenantDomain:" + tenantDomain, e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } catch (UserStoreException e) { + log.error("Failed to connect with the user store, tenantDomain: " + tenantDomain, e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } catch (DeviceManagementException e) { + log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } + } + /** * Deploy Event Stream, Receiver, Publisher and Store Configuration. */ @POST @Path("/{type}") @Override - public Response deployDeviceTypeEventDefinition(@PathParam("type") String deviceType, @Valid DeviceTypeEvent deviceTypeEvent) { + public Response deployDeviceTypeEventDefinition(@PathParam("type") String deviceType, + @Valid DeviceTypeEvent deviceTypeEvent) { TransportType transportType = deviceTypeEvent.getTransportType(); EventAttributeList eventAttributes = deviceTypeEvent.getEventAttributeList(); String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); boolean superTenantMode = false; try { if (eventAttributes == null || eventAttributes.getList() == null || eventAttributes.getList().size() == 0 || - deviceType == null || + deviceType == null || transportType == null || !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) { - String errorMessage = "Invalid device type"; + String errorMessage = "Invalid Payload"; log.error(errorMessage); return Response.status(Response.Status.BAD_REQUEST).build(); } @@ -157,7 +214,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true); if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) { publishStreamDefinitons(streamName, DEFAULT_STREAM_VERSION, deviceType, eventAttributes); - publishEventReceivers(eventReceiverName, streamNameWithVersion, transportType, tenantDomain, deviceType); + publishEventReceivers(eventReceiverName, streamNameWithVersion, transportType, tenantDomain, + deviceType); } return Response.ok().build(); } catch (AxisFault e) { @@ -176,7 +234,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) { - log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType, e); + log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType, + e); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } finally { if (superTenantMode) { @@ -190,6 +249,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe @Override public Response deleteDeviceTypeEventDefinitions(@PathParam("type") String deviceType) { String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); + boolean superTenantMode = false; try { if (deviceType == null || !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) { @@ -200,13 +260,47 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe String eventReceiverName = getReceiverName(deviceType, tenantDomain); String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher"; String streamName = getStreamDefinition(deviceType, tenantDomain); + EventStreamAdminServiceStub eventStreamAdminServiceStub = getEventStreamAdminServiceStub(); + if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamName + ":" + DEFAULT_STREAM_VERSION) == null) { + return Response.status(Response.Status.NO_CONTENT).build(); + } + EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler = + new EventReceiverAdminServiceCallbackHandler() {}; + EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler = + new EventPublisherAdminServiceCallbackHandler() {}; + + EventReceiverAdminServiceStub eventReceiverAdminServiceStub = getEventReceiverAdminServiceStub(); + eventReceiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventReceiverName + , eventReceiverAdminServiceCallbackHandler); + + eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION); - getEventStreamAdminServiceStub().removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION); - getEventReceiverAdminServiceStub().undeployActiveEventReceiverConfiguration(eventReceiverName); - getEventPublisherAdminServiceStub().undeployActiveEventPublisherConfiguration(eventPublisherName); + EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub(); + eventPublisherAdminServiceStub.startundeployActiveEventPublisherConfiguration(eventPublisherName + , eventPublisherAdminServiceCallbackHandler); + + eventStreamAdminServiceStub.cleanup(); + eventPublisherAdminServiceStub.cleanup(); + eventReceiverAdminServiceStub.cleanup(); + + superTenantMode = true; + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain( + MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true); + if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) { + eventReceiverAdminServiceStub = getEventReceiverAdminServiceStub(); + eventStreamAdminServiceStub = getEventStreamAdminServiceStub(); + + eventReceiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventReceiverName + , eventReceiverAdminServiceCallbackHandler); + eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION); + + eventReceiverAdminServiceStub.cleanup(); + eventStreamAdminServiceStub.cleanup(); + } return Response.ok().build(); } catch (AxisFault e) { - log.error("failed to create event definitions for tenantDomain:" + tenantDomain, e); + log.error("failed to delete event definitions for tenantDomain:" + tenantDomain, e); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } catch (RemoteException e) { log.error("Failed to connect with the remote services:" + tenantDomain, e); @@ -220,6 +314,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe } catch (DeviceManagementException e) { log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } finally { + if (superTenantMode) { + PrivilegedCarbonContext.endTenantFlow(); + } } } @@ -227,8 +325,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe @Path("/{type}/{deviceId}") @Override public Response getData(@PathParam("deviceId") String deviceId, @QueryParam("from") long from, - @QueryParam("to") long to,@PathParam("type") String deviceType, @QueryParam("offset") - int offset, @QueryParam("limit") int limit) { + @QueryParam("to") long to, @PathParam("type") String deviceType, @QueryParam("offset") + int offset, @QueryParam("limit") int limit) { String fromDate = String.valueOf(from); String toDate = String.valueOf(to); String query = "deviceId:" + deviceId + " AND _timestamp : [" + fromDate + " TO " + toDate + "]"; @@ -264,53 +362,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe } } - @GET - @Path("/{type}") - @Override - public Response getDeviceTypeEventDefinition(@PathParam("type") String deviceType) { - String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); - try { - if (deviceType == null || - !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) { - String errorMessage = "Invalid device type"; - log.error(errorMessage); - return Response.status(Response.Status.BAD_REQUEST).build(); - } - String streamName = getStreamDefinition(deviceType, tenantDomain); - EventStreamDefinitionDto eventStreamDefinitionDto = getEventStreamAdminServiceStub().getStreamDefinitionDto( - streamName + ":" + DEFAULT_STREAM_VERSION); - if (eventStreamDefinitionDto == null) { - return Response.status(Response.Status.NO_CONTENT).build(); - } - EventStreamAttributeDto[] eventStreamAttributeDtos = eventStreamDefinitionDto.getPayloadData(); - EventAttributeList eventAttributeList = new EventAttributeList(); - List attributes = new ArrayList<>(); - for (EventStreamAttributeDto eventStreamAttributeDto : eventStreamAttributeDtos) { - attributes.add(new Attribute(eventStreamAttributeDto.getAttributeName() - , AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase()))); - } - eventAttributeList.setList(attributes); - return Response.ok().entity(eventAttributeList).build(); - } catch (AxisFault e) { - log.error("failed to create event definitions for tenantDomain:" + tenantDomain, e); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); - } catch (RemoteException e) { - log.error("Failed to connect with the remote services:" + tenantDomain, e); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); - } catch (JWTClientException e) { - log.error("Failed to generate jwt token for tenantDomain:" + tenantDomain, e); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); - } catch (UserStoreException e) { - log.error("Failed to connect with the user store, tenantDomain: " + tenantDomain, e); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); - } catch (DeviceManagementException e) { - log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); - } - } - - - private void publishEventReceivers(String eventRecieverName, String streamNameWithVersion, TransportType transportType + private void publishEventReceivers(String eventRecieverName, String streamNameWithVersion, + TransportType transportType , String requestedTenantDomain, String deviceType) throws RemoteException, UserStoreException, JWTClientException { EventReceiverAdminServiceStub receiverAdminServiceStub = getEventReceiverAdminServiceStub(); @@ -328,8 +381,11 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1]; basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("contentValidator", "iot-mqtt"); } - receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion - , adapterType, null, basicInputAdapterPropertyDtos, false); + if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) { + receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion + , adapterType, null, basicInputAdapterPropertyDtos, false); + } + receiverAdminServiceStub.cleanup(); } private void publishStreamDefinitons(String streamName, String version, String deviceType @@ -339,7 +395,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto(); eventStreamDefinitionDto.setName(streamName); eventStreamDefinitionDto.setVersion(version); - EventStreamAttributeDto eventStreamAttributeDtos[] = new EventStreamAttributeDto[eventAttributes.getList().size()]; + EventStreamAttributeDto eventStreamAttributeDtos[] = + new EventStreamAttributeDto[eventAttributes.getList().size()]; int i = 0; for (Attribute attribute : eventAttributes.getList()) { EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto(); @@ -356,22 +413,25 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe eventStreamDefinitionDto.setMetaData(metaData); eventStreamDefinitionDto.setPayloadData(eventStreamAttributeDtos); String streamId = streamName + ":" + version; - if (eventStreamAdminServiceStub.getStreamDefinitionAsString(streamId) != null) { + if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) { eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId); } else { eventStreamAdminServiceStub.addEventStreamDefinitionAsDto(eventStreamDefinitionDto); } - + eventStreamAdminServiceStub.cleanup(); } private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes) throws RemoteException, UserStoreException, JWTClientException, EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException { - EventStreamPersistenceAdminServiceStub eventStreamAdminServiceStub = getEventStreamPersistenceAdminServiceStub(); + EventStreamPersistenceAdminServiceStub eventStreamAdminServiceStub = + getEventStreamPersistenceAdminServiceStub(); AnalyticsTable analyticsTable = new AnalyticsTable(); analyticsTable.setRecordStoreName(DEFAULT_EVENT_STORE_NAME); analyticsTable.setStreamVersion(version); - analyticsTable.setTableName(getTableName(streamName)); + analyticsTable.setTableName(streamName); + analyticsTable.setMergeSchema(false); + analyticsTable.setPersist(true); AnalyticsTableRecord analyticsTableRecords[] = new AnalyticsTableRecord[eventAttributes.getList().size() + 1]; int i = 0; for (Attribute attribute : eventAttributes.getList()) { @@ -397,21 +457,25 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe analyticsTableRecords[i] = analyticsTableRecord; analyticsTable.setAnalyticsTableRecords(analyticsTableRecords); eventStreamAdminServiceStub.addAnalyticsTable(analyticsTable); - + eventStreamAdminServiceStub.cleanup(); } private void publishWebsocketPublisherDefinition(String streamNameWithVersion, String deviceType) throws RemoteException, UserStoreException, JWTClientException { EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub(); String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher"; - eventPublisherAdminServiceStub.startdeployJsonEventPublisherConfiguration(eventPublisherName - , streamNameWithVersion, DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE, null, null,null, false, null); + if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(eventPublisherName) == null) { + eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(eventPublisherName + , streamNameWithVersion, DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE, null, null + , null, false); + } + eventPublisherAdminServiceStub.cleanup(); } private EventStreamAdminServiceStub getEventStreamAdminServiceStub() throws AxisFault, UserStoreException, JWTClientException { EventStreamAdminServiceStub eventStreamAdminServiceStub = new EventStreamAdminServiceStub( - Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP)); + Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_STREAM_CONTEXT)); Options streamOptions = eventStreamAdminServiceStub._getServiceClient().getOptions(); if (streamOptions == null) { streamOptions = new Options(); @@ -421,7 +485,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe .getRealmConfiguration().getAdminUserName() + "@" + tenantDomain; JWTClient jwtClient = DeviceMgtAPIUtils.getJWTClientManagerService().getJWTClient(); - String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64( + String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64( jwtClient.getJwtToken(username).getBytes())); List
list = new ArrayList<>(); @@ -441,7 +505,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe private EventReceiverAdminServiceStub getEventReceiverAdminServiceStub() throws AxisFault, UserStoreException, JWTClientException { EventReceiverAdminServiceStub receiverAdminServiceStub = new EventReceiverAdminServiceStub( - Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP)); + Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_RECIEVER_CONTEXT)); Options eventReciverOptions = receiverAdminServiceStub._getServiceClient().getOptions(); if (eventReciverOptions == null) { eventReciverOptions = new Options(); @@ -451,7 +515,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe .getRealmConfiguration().getAdminUserName() + "@" + tenantDomain; JWTClient jwtClient = DeviceMgtAPIUtils.getJWTClientManagerService().getJWTClient(); - String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64( + String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64( jwtClient.getJwtToken(username).getBytes())); List
list = new ArrayList<>(); @@ -472,7 +536,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe private EventPublisherAdminServiceStub getEventPublisherAdminServiceStub() throws AxisFault, UserStoreException, JWTClientException { EventPublisherAdminServiceStub eventPublisherAdminServiceStub = new EventPublisherAdminServiceStub( - Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP)); + Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_PUBLISHER_CONTEXT)); Options eventReciverOptions = eventPublisherAdminServiceStub._getServiceClient().getOptions(); if (eventReciverOptions == null) { eventReciverOptions = new Options(); @@ -482,7 +546,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe .getRealmConfiguration().getAdminUserName() + "@" + tenantDomain; JWTClient jwtClient = DeviceMgtAPIUtils.getJWTClientManagerService().getJWTClient(); - String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64( + String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64( jwtClient.getJwtToken(username).getBytes())); List
list = new ArrayList<>(); @@ -504,7 +568,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe throws AxisFault, UserStoreException, JWTClientException { EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub = new EventStreamPersistenceAdminServiceStub( - Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP)); + Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_PERSISTENCE_CONTEXT)); Options eventReciverOptions = eventStreamPersistenceAdminServiceStub._getServiceClient().getOptions(); if (eventReciverOptions == null) { eventReciverOptions = new Options(); @@ -514,7 +578,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe .getRealmConfiguration().getAdminUserName() + "@" + tenantDomain; JWTClient jwtClient = DeviceMgtAPIUtils.getJWTClientManagerService().getJWTClient(); - String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64( + String authValue = AUTHORIZATION_HEADER_VALUE + " " + new String(Base64.encodeBase64( jwtClient.getJwtToken(username).getBytes())); List
list = new ArrayList<>(); @@ -578,7 +642,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe * Initializes the SSL Context */ private static void initSSLConnection() throws NoSuchAlgorithmException, UnrecoverableKeyException, - KeyStoreException, KeyManagementException { + KeyStoreException, KeyManagementException { KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KEY_MANAGER_TYPE); keyManagerFactory.init(keyStore, keyStorePassword); TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TRUST_MANAGER_TYPE);