diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java index cff753a4615..0e804eb95da 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java @@ -122,13 +122,9 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe deviceTypeEvent.setTransportType(TransportType.HTTP); eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); EventReceiverConfigurationDto eventReceiverConfigurationDto = eventReceiverAdminServiceStub - .getActiveEventReceiverConfiguration(getReceiverName(deviceType, tenantDomain)); + .getActiveEventReceiverConfiguration(getReceiverName(deviceType, tenantDomain, TransportType.MQTT)); if (eventReceiverConfigurationDto != null) { - String eventAdapterType = eventReceiverConfigurationDto.getFromAdapterConfigurationDto() - .getEventAdapterType(); - if (OAUTH_MQTT_ADAPTER_TYPE.equals(eventAdapterType)) { - deviceTypeEvent.setTransportType(TransportType.MQTT); - } + deviceTypeEvent.setTransportType(TransportType.MQTT); } return Response.ok().entity(deviceTypeEvent).build(); } catch (AxisFault e) { @@ -172,11 +168,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe log.error(errorMessage); return Response.status(Response.Status.BAD_REQUEST).build(); } - String eventReceiverName = getReceiverName(deviceType, tenantDomain); String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain); String streamNameWithVersion = streamName + ":" + Constants.DEFAULT_STREAM_VERSION; publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes); - publishEventReceivers(eventReceiverName, streamNameWithVersion, transportType, tenantDomain, deviceType); + publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, deviceType); publishEventStore(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes); publishWebsocketPublisherDefinition(streamNameWithVersion, deviceType); superTenantMode = true; @@ -185,8 +180,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true); if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) { publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes); - publishEventReceivers(eventReceiverName, streamNameWithVersion, transportType, tenantDomain, - deviceType); + publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, deviceType); } DeviceMgtAPIUtils.getDynamicEventCache().remove(deviceType); return Response.ok().build(); @@ -234,7 +228,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe String errorMessage = "Invalid device type"; return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build(); } - String eventReceiverName = getReceiverName(deviceType, tenantDomain); String eventPublisherName = deviceType.trim().replace(" ", "_") + "_websocket_publisher"; String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain); eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub(); @@ -247,7 +240,12 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler = new EventPublisherAdminServiceCallbackHandler() {}; + + String eventReceiverName = getReceiverName(deviceType, tenantDomain, TransportType.MQTT); eventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); + if (eventReceiverAdminServiceStub.getInactiveEventReceiverConfigurationContent(eventReceiverName) == null) { + eventReceiverName = getReceiverName(deviceType, tenantDomain, TransportType.HTTP); + } eventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(eventReceiverName , eventReceiverAdminServiceCallbackHandler); @@ -262,10 +260,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) { tenantBasedEventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); tenantBasedEventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub(); - tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName, Constants.DEFAULT_STREAM_VERSION); + tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName, + Constants.DEFAULT_STREAM_VERSION); tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration( - eventReceiverName - , eventReceiverAdminServiceCallbackHandler); + eventReceiverName, eventReceiverAdminServiceCallbackHandler); } return Response.ok().build(); @@ -381,29 +379,23 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe } } - private void publishEventReceivers(String eventRecieverName, String streamNameWithVersion, - TransportType transportType, String requestedTenantDomain, - String deviceType) + private void publishEventReceivers(String streamNameWithVersion, TransportType transportType + , String requestedTenantDomain, String deviceType) throws RemoteException, UserStoreException, JWTClientException { EventReceiverAdminServiceStub receiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); try { + TransportType transportTypeToBeRemoved = TransportType.HTTP; + if (transportType == TransportType.HTTP) { + transportTypeToBeRemoved = TransportType.MQTT; + } + String eventRecieverNameTobeRemoved = getReceiverName(deviceType, requestedTenantDomain, transportTypeToBeRemoved); EventReceiverConfigurationDto eventReceiverConfigurationDto = receiverAdminServiceStub - .getActiveEventReceiverConfiguration(getReceiverName(deviceType, requestedTenantDomain)); + .getActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved); if (eventReceiverConfigurationDto != null) { - String eventAdapterType = eventReceiverConfigurationDto.getFromAdapterConfigurationDto() - .getEventAdapterType(); - if (OAUTH_MQTT_ADAPTER_TYPE.equals(eventAdapterType)) { - if (transportType == TransportType.MQTT) { - return; - } - - } else if (THRIFT_ADAPTER_TYPE.equals(eventAdapterType)) { - if (transportType == TransportType.HTTP) { - return; - } - } - // remove mqtt event reciever before publishing - receiverAdminServiceStub.undeployActiveEventReceiverConfiguration(eventRecieverName); + EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler = + new EventReceiverAdminServiceCallbackHandler() {}; + receiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved + , eventReceiverAdminServiceCallbackHandler); } String adapterType = OAUTH_MQTT_ADAPTER_TYPE; @@ -421,6 +413,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1]; basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("events.duplicated.in.cluster", "false"); } + String eventRecieverName = getReceiverName(deviceType, requestedTenantDomain, transportType); if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) { if (transportType == TransportType.MQTT) { receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion @@ -548,8 +541,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe return streamName.toUpperCase().replace('.', '_'); } - private String getReceiverName(String deviceType, String tenantDomain) { - return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-receiver"; + private String getReceiverName(String deviceType, String tenantDomain, TransportType transportType) { + return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver"; } public static AnalyticsDataAPI getAnalyticsDataAPI() {