resolved issues in event management service

merge-requests/1/head
ayyoob 8 years ago
parent f834001d22
commit 2a89a2fb23

@ -18,6 +18,7 @@
*/ */
package org.wso2.carbon.device.mgt.jaxrs.beans.analytics; package org.wso2.carbon.device.mgt.jaxrs.beans.analytics;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
/** /**
@ -25,27 +26,28 @@ import io.swagger.annotations.ApiModelProperty;
*/ */
public class DeviceTypeEvent { public class DeviceTypeEvent {
@ApiModelProperty(value = "Attributes related to device type event") private EventAttributeList eventAttributes;
private EventAttributeList eventAttributeList; private TransportType transport;
@ApiModelProperty(value = "Transport to be used for device to server communication.")
private TransportType transportType;
@ApiModelProperty(value = "Attributes related to device type event")
@JsonProperty("eventAttributes")
public EventAttributeList getEventAttributeList() { public EventAttributeList getEventAttributeList() {
return eventAttributeList; return eventAttributes;
} }
public void setEventAttributeList( public void setEventAttributeList(
EventAttributeList eventAttributeList) { EventAttributeList eventAttributes) {
this.eventAttributeList = eventAttributeList; this.eventAttributes = eventAttributes;
} }
@ApiModelProperty(value = "Transport to be used for device to server communication.")
@JsonProperty("transport")
public TransportType getTransportType() { public TransportType getTransportType() {
return transportType; return transport;
} }
public void setTransportType(TransportType transportType) { public void setTransportType(TransportType transport) {
this.transportType = transportType; this.transport = transport;
} }
} }

@ -17,6 +17,7 @@ import org.wso2.carbon.device.mgt.jaxrs.beans.DeviceTypeList;
import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse; import org.wso2.carbon.device.mgt.jaxrs.beans.ErrorResponse;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.DeviceTypeEvent; import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.DeviceTypeEvent;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventAttributeList; import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventAttributeList;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventRecords;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.TransportType; import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.TransportType;
import org.wso2.carbon.device.mgt.jaxrs.util.Constants; import org.wso2.carbon.device.mgt.jaxrs.util.Constants;
@ -90,7 +91,6 @@ public interface DeviceEventManagementService {
@ApiResponse( @ApiResponse(
code = 200, code = 200,
message = "OK. \n Successfully added the event defintion.", message = "OK. \n Successfully added the event defintion.",
response = DeviceTypeList.class,
responseHeaders = { responseHeaders = {
@ResponseHeader( @ResponseHeader(
name = "Content-Type", name = "Content-Type",
@ -144,7 +144,6 @@ public interface DeviceEventManagementService {
@ApiResponse( @ApiResponse(
code = 200, code = 200,
message = "OK. \n Successfully deleted the event definition.", message = "OK. \n Successfully deleted the event definition.",
response = DeviceTypeList.class,
responseHeaders = { responseHeaders = {
@ResponseHeader( @ResponseHeader(
name = "Content-Type", name = "Content-Type",
@ -196,7 +195,7 @@ public interface DeviceEventManagementService {
@ApiResponse( @ApiResponse(
code = 200, code = 200,
message = "OK. \n Successfully fetched the event definition.", message = "OK. \n Successfully fetched the event definition.",
response = DeviceTypeList.class, response = EventRecords.class,
responseHeaders = { responseHeaders = {
@ResponseHeader( @ResponseHeader(
name = "Content-Type", name = "Content-Type",
@ -258,7 +257,7 @@ public interface DeviceEventManagementService {
@ApiResponse( @ApiResponse(
code = 200, code = 200,
message = "OK. \n Successfully fetched the event defintion.", message = "OK. \n Successfully fetched the event defintion.",
response = DeviceTypeList.class, response = EventAttributeList.class,
responseHeaders = { responseHeaders = {
@ResponseHeader( @ResponseHeader(
name = "Content-Type", name = "Content-Type",

@ -37,7 +37,9 @@ import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventAttributeList;
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.TransportType; import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.TransportType;
import org.wso2.carbon.device.mgt.jaxrs.service.api.DeviceEventManagementService; import org.wso2.carbon.device.mgt.jaxrs.service.api.DeviceEventManagementService;
import org.wso2.carbon.device.mgt.jaxrs.util.DeviceMgtAPIUtils; import org.wso2.carbon.device.mgt.jaxrs.util.DeviceMgtAPIUtils;
import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceCallbackHandler;
import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceStub; import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceStub;
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceCallbackHandler;
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceStub; import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceStub;
import org.wso2.carbon.event.receiver.stub.types.BasicInputAdapterPropertyDto; import org.wso2.carbon.event.receiver.stub.types.BasicInputAdapterPropertyDto;
import org.wso2.carbon.event.stream.stub.EventStreamAdminServiceStub; import org.wso2.carbon.event.stream.stub.EventStreamAdminServiceStub;
@ -48,6 +50,7 @@ import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientExceptio
import org.wso2.carbon.user.api.UserStoreException; import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.analytics.datasource.commons.Record; import org.wso2.carbon.analytics.datasource.commons.Record;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException; import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
@ -88,7 +91,11 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
private static final String DAS_HOST_NAME = "${iot.analytics.host}"; private static final String DAS_HOST_NAME = "${iot.analytics.host}";
private static final String DEFAULT_HTTP_PROTOCOL = "https"; private static final String DEFAULT_HTTP_PROTOCOL = "https";
private static final String DAS_ADMIN_SERVICE_EP = DEFAULT_HTTP_PROTOCOL + "://" + DAS_HOST_NAME private static final String DAS_ADMIN_SERVICE_EP = DEFAULT_HTTP_PROTOCOL + "://" + DAS_HOST_NAME
+ ":" + DAS_PORT + "/services/EventReceiverAdminService" + "/"; + ":" + DAS_PORT + "/services/";
private static final String EVENT_RECIEVER_CONTEXT = "EventReceiverAdminService/";
private static final String EVENT_PUBLISHER_CONTEXT = "EventPublisherAdminService/";
private static final String EVENT_STREAM_CONTEXT = "EventStreamAdminService/";
private static final String EVENT_PERSISTENCE_CONTEXT = "EventStreamPersistenceAdminService/";
private static final String AUTHORIZATION_HEADER = "Authorization"; private static final String AUTHORIZATION_HEADER = "Authorization";
private static final String AUTHORIZATION_HEADER_VALUE = "Bearer"; private static final String AUTHORIZATION_HEADER_VALUE = "Bearer";
private static final String KEY_STORE_TYPE = "JKS"; private static final String KEY_STORE_TYPE = "JKS";
@ -104,6 +111,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
private static KeyStore trustStore; private static KeyStore trustStore;
private static char[] keyStorePassword; private static char[] keyStorePassword;
private static SSLContext sslContext; private static SSLContext sslContext;
static { static {
String keyStorePassword = ServerConfiguration.getInstance().getFirstProperty("Security.KeyStore.Password"); String keyStorePassword = ServerConfiguration.getInstance().getFirstProperty("Security.KeyStore.Password");
String trustStorePassword = ServerConfiguration.getInstance().getFirstProperty( String trustStorePassword = ServerConfiguration.getInstance().getFirstProperty(
@ -125,22 +133,71 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
} }
} }
@GET
@Path("/{type}")
@Override
public Response getDeviceTypeEventDefinition(@PathParam("type") String deviceType) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
try {
if (deviceType == null ||
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
String errorMessage = "Invalid device type";
log.error(errorMessage);
return Response.status(Response.Status.BAD_REQUEST).build();
}
String streamName = getStreamDefinition(deviceType, tenantDomain);
EventStreamAdminServiceStub eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
EventStreamDefinitionDto eventStreamDefinitionDto = eventStreamAdminServiceStub.getStreamDefinitionDto(
streamName + ":" + DEFAULT_STREAM_VERSION);
if (eventStreamDefinitionDto == null) {
eventStreamAdminServiceStub.cleanup();
return Response.status(Response.Status.NO_CONTENT).build();
}
EventStreamAttributeDto[] eventStreamAttributeDtos = eventStreamDefinitionDto.getPayloadData();
EventAttributeList eventAttributeList = new EventAttributeList();
List<Attribute> attributes = new ArrayList<>();
for (EventStreamAttributeDto eventStreamAttributeDto : eventStreamAttributeDtos) {
attributes.add(new Attribute(eventStreamAttributeDto.getAttributeName()
, AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase())));
}
eventAttributeList.setList(attributes);
eventStreamAdminServiceStub.cleanup();
return Response.ok().entity(eventAttributeList).build();
} catch (AxisFault e) {
log.error("failed to retrieve event definitions for tenantDomain:" + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services:" + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain:" + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store, tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (DeviceManagementException e) {
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
/** /**
* Deploy Event Stream, Receiver, Publisher and Store Configuration. * Deploy Event Stream, Receiver, Publisher and Store Configuration.
*/ */
@POST @POST
@Path("/{type}") @Path("/{type}")
@Override @Override
public Response deployDeviceTypeEventDefinition(@PathParam("type") String deviceType, @Valid DeviceTypeEvent deviceTypeEvent) { public Response deployDeviceTypeEventDefinition(@PathParam("type") String deviceType,
@Valid DeviceTypeEvent deviceTypeEvent) {
TransportType transportType = deviceTypeEvent.getTransportType(); TransportType transportType = deviceTypeEvent.getTransportType();
EventAttributeList eventAttributes = deviceTypeEvent.getEventAttributeList(); EventAttributeList eventAttributes = deviceTypeEvent.getEventAttributeList();
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
boolean superTenantMode = false; boolean superTenantMode = false;
try { try {
if (eventAttributes == null || eventAttributes.getList() == null || eventAttributes.getList().size() == 0 || if (eventAttributes == null || eventAttributes.getList() == null || eventAttributes.getList().size() == 0 ||
deviceType == null || deviceType == null || transportType == null ||
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) { !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
String errorMessage = "Invalid device type"; String errorMessage = "Invalid Payload";
log.error(errorMessage); log.error(errorMessage);
return Response.status(Response.Status.BAD_REQUEST).build(); return Response.status(Response.Status.BAD_REQUEST).build();
} }
@ -157,7 +214,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true); MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) { if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
publishStreamDefinitons(streamName, DEFAULT_STREAM_VERSION, deviceType, eventAttributes); publishStreamDefinitons(streamName, DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
publishEventReceivers(eventReceiverName, streamNameWithVersion, transportType, tenantDomain, deviceType); publishEventReceivers(eventReceiverName, streamNameWithVersion, transportType, tenantDomain,
deviceType);
} }
return Response.ok().build(); return Response.ok().build();
} catch (AxisFault e) { } catch (AxisFault e) {
@ -176,7 +234,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e); log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) { } catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) {
log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType, e); log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType,
e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} finally { } finally {
if (superTenantMode) { if (superTenantMode) {
@ -190,6 +249,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
@Override @Override
public Response deleteDeviceTypeEventDefinitions(@PathParam("type") String deviceType) { public Response deleteDeviceTypeEventDefinitions(@PathParam("type") String deviceType) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
boolean superTenantMode = false;
try { try {
if (deviceType == null || if (deviceType == null ||
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) { !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
@ -200,13 +260,47 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
String eventReceiverName = getReceiverName(deviceType, tenantDomain); String eventReceiverName = getReceiverName(deviceType, tenantDomain);
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher"; String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
String streamName = getStreamDefinition(deviceType, tenantDomain); String streamName = getStreamDefinition(deviceType, tenantDomain);
EventStreamAdminServiceStub eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamName + ":" + DEFAULT_STREAM_VERSION) == null) {
return Response.status(Response.Status.NO_CONTENT).build();
}
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
new EventReceiverAdminServiceCallbackHandler() {};
EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler =
new EventPublisherAdminServiceCallbackHandler() {};
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = getEventReceiverAdminServiceStub();
eventReceiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventReceiverName
, eventReceiverAdminServiceCallbackHandler);
getEventStreamAdminServiceStub().removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION); eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION);
getEventReceiverAdminServiceStub().undeployActiveEventReceiverConfiguration(eventReceiverName);
getEventPublisherAdminServiceStub().undeployActiveEventPublisherConfiguration(eventPublisherName); EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub();
eventPublisherAdminServiceStub.startundeployActiveEventPublisherConfiguration(eventPublisherName
, eventPublisherAdminServiceCallbackHandler);
eventStreamAdminServiceStub.cleanup();
eventPublisherAdminServiceStub.cleanup();
eventReceiverAdminServiceStub.cleanup();
superTenantMode = true;
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
eventReceiverAdminServiceStub = getEventReceiverAdminServiceStub();
eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
eventReceiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventReceiverName
, eventReceiverAdminServiceCallbackHandler);
eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION);
eventReceiverAdminServiceStub.cleanup();
eventStreamAdminServiceStub.cleanup();
}
return Response.ok().build(); return Response.ok().build();
} catch (AxisFault e) { } catch (AxisFault e) {
log.error("failed to create event definitions for tenantDomain:" + tenantDomain, e); log.error("failed to delete event definitions for tenantDomain:" + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (RemoteException e) { } catch (RemoteException e) {
log.error("Failed to connect with the remote services:" + tenantDomain, e); log.error("Failed to connect with the remote services:" + tenantDomain, e);
@ -220,6 +314,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
} catch (DeviceManagementException e) { } catch (DeviceManagementException e) {
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e); log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} finally {
if (superTenantMode) {
PrivilegedCarbonContext.endTenantFlow();
}
} }
} }
@ -264,53 +362,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
} }
} }
@GET private void publishEventReceivers(String eventRecieverName, String streamNameWithVersion,
@Path("/{type}") TransportType transportType
@Override
public Response getDeviceTypeEventDefinition(@PathParam("type") String deviceType) {
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
try {
if (deviceType == null ||
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
String errorMessage = "Invalid device type";
log.error(errorMessage);
return Response.status(Response.Status.BAD_REQUEST).build();
}
String streamName = getStreamDefinition(deviceType, tenantDomain);
EventStreamDefinitionDto eventStreamDefinitionDto = getEventStreamAdminServiceStub().getStreamDefinitionDto(
streamName + ":" + DEFAULT_STREAM_VERSION);
if (eventStreamDefinitionDto == null) {
return Response.status(Response.Status.NO_CONTENT).build();
}
EventStreamAttributeDto[] eventStreamAttributeDtos = eventStreamDefinitionDto.getPayloadData();
EventAttributeList eventAttributeList = new EventAttributeList();
List<Attribute> attributes = new ArrayList<>();
for (EventStreamAttributeDto eventStreamAttributeDto : eventStreamAttributeDtos) {
attributes.add(new Attribute(eventStreamAttributeDto.getAttributeName()
, AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase())));
}
eventAttributeList.setList(attributes);
return Response.ok().entity(eventAttributeList).build();
} catch (AxisFault e) {
log.error("failed to create event definitions for tenantDomain:" + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (RemoteException e) {
log.error("Failed to connect with the remote services:" + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (JWTClientException e) {
log.error("Failed to generate jwt token for tenantDomain:" + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (UserStoreException e) {
log.error("Failed to connect with the user store, tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (DeviceManagementException e) {
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
private void publishEventReceivers(String eventRecieverName, String streamNameWithVersion, TransportType transportType
, String requestedTenantDomain, String deviceType) , String requestedTenantDomain, String deviceType)
throws RemoteException, UserStoreException, JWTClientException { throws RemoteException, UserStoreException, JWTClientException {
EventReceiverAdminServiceStub receiverAdminServiceStub = getEventReceiverAdminServiceStub(); EventReceiverAdminServiceStub receiverAdminServiceStub = getEventReceiverAdminServiceStub();
@ -328,9 +381,12 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1]; basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1];
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("contentValidator", "iot-mqtt"); basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("contentValidator", "iot-mqtt");
} }
if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) {
receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion
, adapterType, null, basicInputAdapterPropertyDtos, false); , adapterType, null, basicInputAdapterPropertyDtos, false);
} }
receiverAdminServiceStub.cleanup();
}
private void publishStreamDefinitons(String streamName, String version, String deviceType private void publishStreamDefinitons(String streamName, String version, String deviceType
, EventAttributeList eventAttributes) , EventAttributeList eventAttributes)
@ -339,7 +395,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto(); EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto();
eventStreamDefinitionDto.setName(streamName); eventStreamDefinitionDto.setName(streamName);
eventStreamDefinitionDto.setVersion(version); eventStreamDefinitionDto.setVersion(version);
EventStreamAttributeDto eventStreamAttributeDtos[] = new EventStreamAttributeDto[eventAttributes.getList().size()]; EventStreamAttributeDto eventStreamAttributeDtos[] =
new EventStreamAttributeDto[eventAttributes.getList().size()];
int i = 0; int i = 0;
for (Attribute attribute : eventAttributes.getList()) { for (Attribute attribute : eventAttributes.getList()) {
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto(); EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
@ -356,22 +413,25 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
eventStreamDefinitionDto.setMetaData(metaData); eventStreamDefinitionDto.setMetaData(metaData);
eventStreamDefinitionDto.setPayloadData(eventStreamAttributeDtos); eventStreamDefinitionDto.setPayloadData(eventStreamAttributeDtos);
String streamId = streamName + ":" + version; String streamId = streamName + ":" + version;
if (eventStreamAdminServiceStub.getStreamDefinitionAsString(streamId) != null) { if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) {
eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId); eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId);
} else { } else {
eventStreamAdminServiceStub.addEventStreamDefinitionAsDto(eventStreamDefinitionDto); eventStreamAdminServiceStub.addEventStreamDefinitionAsDto(eventStreamDefinitionDto);
} }
eventStreamAdminServiceStub.cleanup();
} }
private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes) private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes)
throws RemoteException, UserStoreException, JWTClientException, throws RemoteException, UserStoreException, JWTClientException,
EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException { EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException {
EventStreamPersistenceAdminServiceStub eventStreamAdminServiceStub = getEventStreamPersistenceAdminServiceStub(); EventStreamPersistenceAdminServiceStub eventStreamAdminServiceStub =
getEventStreamPersistenceAdminServiceStub();
AnalyticsTable analyticsTable = new AnalyticsTable(); AnalyticsTable analyticsTable = new AnalyticsTable();
analyticsTable.setRecordStoreName(DEFAULT_EVENT_STORE_NAME); analyticsTable.setRecordStoreName(DEFAULT_EVENT_STORE_NAME);
analyticsTable.setStreamVersion(version); analyticsTable.setStreamVersion(version);
analyticsTable.setTableName(getTableName(streamName)); analyticsTable.setTableName(streamName);
analyticsTable.setMergeSchema(false);
analyticsTable.setPersist(true);
AnalyticsTableRecord analyticsTableRecords[] = new AnalyticsTableRecord[eventAttributes.getList().size() + 1]; AnalyticsTableRecord analyticsTableRecords[] = new AnalyticsTableRecord[eventAttributes.getList().size() + 1];
int i = 0; int i = 0;
for (Attribute attribute : eventAttributes.getList()) { for (Attribute attribute : eventAttributes.getList()) {
@ -397,21 +457,25 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
analyticsTableRecords[i] = analyticsTableRecord; analyticsTableRecords[i] = analyticsTableRecord;
analyticsTable.setAnalyticsTableRecords(analyticsTableRecords); analyticsTable.setAnalyticsTableRecords(analyticsTableRecords);
eventStreamAdminServiceStub.addAnalyticsTable(analyticsTable); eventStreamAdminServiceStub.addAnalyticsTable(analyticsTable);
eventStreamAdminServiceStub.cleanup();
} }
private void publishWebsocketPublisherDefinition(String streamNameWithVersion, String deviceType) private void publishWebsocketPublisherDefinition(String streamNameWithVersion, String deviceType)
throws RemoteException, UserStoreException, JWTClientException { throws RemoteException, UserStoreException, JWTClientException {
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub(); EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub();
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher"; String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
eventPublisherAdminServiceStub.startdeployJsonEventPublisherConfiguration(eventPublisherName if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(eventPublisherName) == null) {
, streamNameWithVersion, DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE, null, null,null, false, null); eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(eventPublisherName
, streamNameWithVersion, DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE, null, null
, null, false);
}
eventPublisherAdminServiceStub.cleanup();
} }
private EventStreamAdminServiceStub getEventStreamAdminServiceStub() private EventStreamAdminServiceStub getEventStreamAdminServiceStub()
throws AxisFault, UserStoreException, JWTClientException { throws AxisFault, UserStoreException, JWTClientException {
EventStreamAdminServiceStub eventStreamAdminServiceStub = new EventStreamAdminServiceStub( EventStreamAdminServiceStub eventStreamAdminServiceStub = new EventStreamAdminServiceStub(
Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP)); Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_STREAM_CONTEXT));
Options streamOptions = eventStreamAdminServiceStub._getServiceClient().getOptions(); Options streamOptions = eventStreamAdminServiceStub._getServiceClient().getOptions();
if (streamOptions == null) { if (streamOptions == null) {
streamOptions = new Options(); streamOptions = new Options();
@ -441,7 +505,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
private EventReceiverAdminServiceStub getEventReceiverAdminServiceStub() private EventReceiverAdminServiceStub getEventReceiverAdminServiceStub()
throws AxisFault, UserStoreException, JWTClientException { throws AxisFault, UserStoreException, JWTClientException {
EventReceiverAdminServiceStub receiverAdminServiceStub = new EventReceiverAdminServiceStub( EventReceiverAdminServiceStub receiverAdminServiceStub = new EventReceiverAdminServiceStub(
Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP)); Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_RECIEVER_CONTEXT));
Options eventReciverOptions = receiverAdminServiceStub._getServiceClient().getOptions(); Options eventReciverOptions = receiverAdminServiceStub._getServiceClient().getOptions();
if (eventReciverOptions == null) { if (eventReciverOptions == null) {
eventReciverOptions = new Options(); eventReciverOptions = new Options();
@ -472,7 +536,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
private EventPublisherAdminServiceStub getEventPublisherAdminServiceStub() private EventPublisherAdminServiceStub getEventPublisherAdminServiceStub()
throws AxisFault, UserStoreException, JWTClientException { throws AxisFault, UserStoreException, JWTClientException {
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = new EventPublisherAdminServiceStub( EventPublisherAdminServiceStub eventPublisherAdminServiceStub = new EventPublisherAdminServiceStub(
Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP)); Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_PUBLISHER_CONTEXT));
Options eventReciverOptions = eventPublisherAdminServiceStub._getServiceClient().getOptions(); Options eventReciverOptions = eventPublisherAdminServiceStub._getServiceClient().getOptions();
if (eventReciverOptions == null) { if (eventReciverOptions == null) {
eventReciverOptions = new Options(); eventReciverOptions = new Options();
@ -504,7 +568,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
throws AxisFault, UserStoreException, JWTClientException { throws AxisFault, UserStoreException, JWTClientException {
EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub
= new EventStreamPersistenceAdminServiceStub( = new EventStreamPersistenceAdminServiceStub(
Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP)); Utils.replaceSystemProperty(DAS_ADMIN_SERVICE_EP + EVENT_PERSISTENCE_CONTEXT));
Options eventReciverOptions = eventStreamPersistenceAdminServiceStub._getServiceClient().getOptions(); Options eventReciverOptions = eventStreamPersistenceAdminServiceStub._getServiceClient().getOptions();
if (eventReciverOptions == null) { if (eventReciverOptions == null) {
eventReciverOptions = new Options(); eventReciverOptions = new Options();

Loading…
Cancel
Save