few fixes on event reciever definition

revert-70aa11f8
ayyoob 7 years ago
parent 7cba43ccbb
commit 5b46dc03a5

@ -122,14 +122,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
deviceTypeEvent.setTransportType(TransportType.HTTP); deviceTypeEvent.setTransportType(TransportType.HTTP);
eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
EventReceiverConfigurationDto eventReceiverConfigurationDto = eventReceiverAdminServiceStub EventReceiverConfigurationDto eventReceiverConfigurationDto = eventReceiverAdminServiceStub
.getActiveEventReceiverConfiguration(getReceiverName(deviceType, tenantDomain)); .getActiveEventReceiverConfiguration(getReceiverName(deviceType, tenantDomain, TransportType.MQTT));
if (eventReceiverConfigurationDto != null) { if (eventReceiverConfigurationDto != null) {
String eventAdapterType = eventReceiverConfigurationDto.getFromAdapterConfigurationDto()
.getEventAdapterType();
if (OAUTH_MQTT_ADAPTER_TYPE.equals(eventAdapterType)) {
deviceTypeEvent.setTransportType(TransportType.MQTT); deviceTypeEvent.setTransportType(TransportType.MQTT);
} }
}
return Response.ok().entity(deviceTypeEvent).build(); return Response.ok().entity(deviceTypeEvent).build();
} catch (AxisFault e) { } catch (AxisFault e) {
log.error("failed to retrieve event definitions for tenantDomain:" + tenantDomain, e); log.error("failed to retrieve event definitions for tenantDomain:" + tenantDomain, e);
@ -172,11 +168,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
log.error(errorMessage); log.error(errorMessage);
return Response.status(Response.Status.BAD_REQUEST).build(); return Response.status(Response.Status.BAD_REQUEST).build();
} }
String eventReceiverName = getReceiverName(deviceType, tenantDomain);
String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain); String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain);
String streamNameWithVersion = streamName + ":" + Constants.DEFAULT_STREAM_VERSION; String streamNameWithVersion = streamName + ":" + Constants.DEFAULT_STREAM_VERSION;
publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes); publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
publishEventReceivers(eventReceiverName, streamNameWithVersion, transportType, tenantDomain, deviceType); publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, deviceType);
publishEventStore(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes); publishEventStore(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes);
publishWebsocketPublisherDefinition(streamNameWithVersion, deviceType); publishWebsocketPublisherDefinition(streamNameWithVersion, deviceType);
superTenantMode = true; superTenantMode = true;
@ -185,8 +180,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true); MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) { if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes); publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
publishEventReceivers(eventReceiverName, streamNameWithVersion, transportType, tenantDomain, publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, deviceType);
deviceType);
} }
DeviceMgtAPIUtils.getDynamicEventCache().remove(deviceType); DeviceMgtAPIUtils.getDynamicEventCache().remove(deviceType);
return Response.ok().build(); return Response.ok().build();
@ -234,7 +228,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
String errorMessage = "Invalid device type"; String errorMessage = "Invalid device type";
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build(); return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
} }
String eventReceiverName = getReceiverName(deviceType, tenantDomain);
String eventPublisherName = deviceType.trim().replace(" ", "_") + "_websocket_publisher"; String eventPublisherName = deviceType.trim().replace(" ", "_") + "_websocket_publisher";
String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain); String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain);
eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub(); eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
@ -247,7 +240,12 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler = EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler =
new EventPublisherAdminServiceCallbackHandler() {}; new EventPublisherAdminServiceCallbackHandler() {};
String eventReceiverName = getReceiverName(deviceType, tenantDomain, TransportType.MQTT);
eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
if (eventReceiverAdminServiceStub.getInactiveEventReceiverConfigurationContent(eventReceiverName) == null) {
eventReceiverName = getReceiverName(deviceType, tenantDomain, TransportType.HTTP);
}
eventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(eventReceiverName eventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(eventReceiverName
, eventReceiverAdminServiceCallbackHandler); , eventReceiverAdminServiceCallbackHandler);
@ -262,10 +260,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) { if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
tenantBasedEventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); tenantBasedEventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
tenantBasedEventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub(); tenantBasedEventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName, Constants.DEFAULT_STREAM_VERSION); tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName,
Constants.DEFAULT_STREAM_VERSION);
tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration( tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(
eventReceiverName eventReceiverName, eventReceiverAdminServiceCallbackHandler);
, eventReceiverAdminServiceCallbackHandler);
} }
return Response.ok().build(); return Response.ok().build();
@ -381,29 +379,23 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
} }
} }
private void publishEventReceivers(String eventRecieverName, String streamNameWithVersion, private void publishEventReceivers(String streamNameWithVersion, TransportType transportType
TransportType transportType, String requestedTenantDomain, , String requestedTenantDomain, String deviceType)
String deviceType)
throws RemoteException, UserStoreException, JWTClientException { throws RemoteException, UserStoreException, JWTClientException {
EventReceiverAdminServiceStub receiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); EventReceiverAdminServiceStub receiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
try { try {
EventReceiverConfigurationDto eventReceiverConfigurationDto = receiverAdminServiceStub TransportType transportTypeToBeRemoved = TransportType.HTTP;
.getActiveEventReceiverConfiguration(getReceiverName(deviceType, requestedTenantDomain));
if (eventReceiverConfigurationDto != null) {
String eventAdapterType = eventReceiverConfigurationDto.getFromAdapterConfigurationDto()
.getEventAdapterType();
if (OAUTH_MQTT_ADAPTER_TYPE.equals(eventAdapterType)) {
if (transportType == TransportType.MQTT) {
return;
}
} else if (THRIFT_ADAPTER_TYPE.equals(eventAdapterType)) {
if (transportType == TransportType.HTTP) { if (transportType == TransportType.HTTP) {
return; transportTypeToBeRemoved = TransportType.MQTT;
}
} }
// remove mqtt event reciever before publishing String eventRecieverNameTobeRemoved = getReceiverName(deviceType, requestedTenantDomain, transportTypeToBeRemoved);
receiverAdminServiceStub.undeployActiveEventReceiverConfiguration(eventRecieverName); EventReceiverConfigurationDto eventReceiverConfigurationDto = receiverAdminServiceStub
.getActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved);
if (eventReceiverConfigurationDto != null) {
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
new EventReceiverAdminServiceCallbackHandler() {};
receiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved
, eventReceiverAdminServiceCallbackHandler);
} }
String adapterType = OAUTH_MQTT_ADAPTER_TYPE; String adapterType = OAUTH_MQTT_ADAPTER_TYPE;
@ -421,6 +413,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1]; basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1];
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("events.duplicated.in.cluster", "false"); basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("events.duplicated.in.cluster", "false");
} }
String eventRecieverName = getReceiverName(deviceType, requestedTenantDomain, transportType);
if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) { if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) {
if (transportType == TransportType.MQTT) { if (transportType == TransportType.MQTT) {
receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion
@ -548,8 +541,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
return streamName.toUpperCase().replace('.', '_'); return streamName.toUpperCase().replace('.', '_');
} }
private String getReceiverName(String deviceType, String tenantDomain) { private String getReceiverName(String deviceType, String tenantDomain, TransportType transportType) {
return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-receiver"; return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver";
} }
public static AnalyticsDataAPI getAnalyticsDataAPI() { public static AnalyticsDataAPI getAnalyticsDataAPI() {

Loading…
Cancel
Save