diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/api/DeviceEventManagementService.java b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/api/DeviceEventManagementService.java index f4fcadba015..f2413ec828d 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/api/DeviceEventManagementService.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/api/DeviceEventManagementService.java @@ -288,13 +288,11 @@ public interface DeviceEventManagementService { } ) Response getLastKnownData(@ApiParam(name = "deviceId", value = "id of the device ", required = false) - @PathParam("deviceId") String deviceId, - @ApiParam(name = "type", value = "name of the device type", required = false) - @PathParam("type") String deviceType, - @ApiParam(name = "limit", value = "limit of the records that needs to be picked up", required = false) - @QueryParam("limit") int limit); - - + @PathParam("deviceId") String deviceId, + @ApiParam(name = "type", value = "name of the device type", required = false) + @PathParam("type") String deviceType, + @ApiParam(name = "limit", value = "limit of the records that needs to be picked up", required = false) + @QueryParam("limit") int limit); @GET @Path("/{type}") @@ -348,4 +346,4 @@ public interface DeviceEventManagementService { Response getDeviceTypeEventDefinition(@ApiParam(name = "type", value = "name of the device type", required = false) @PathParam("type")String deviceType) ; -} +} \ No newline at end of file diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java index 0b9827372d2..e2c3c5642ee 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.api/src/main/java/org/wso2/carbon/device/mgt/jaxrs/service/impl/DeviceEventManagementServiceImpl.java @@ -76,8 +76,47 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe private static final String MQTT_CONTENT_VALIDATOR = "default"; private static final String TIMESTAMP_FIELD_NAME = "_timestamp"; + private static AnalyticsDataAPI getAnalyticsDataAPI() { + PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); + AnalyticsDataAPI analyticsDataAPI = + (AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null); + if (analyticsDataAPI == null) { + String msg = "Analytics api service has not initialized."; + log.error(msg); + throw new IllegalStateException(msg); + } + return analyticsDataAPI; + } + + private static EventRecords getAllEventsForDevice(String tableName, String query, List sortByFields + , int offset, int limit) throws AnalyticsException { + int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId(); + AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI(); + EventRecords eventRecords = new EventRecords(); + int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query); + if (eventCount == 0) { + eventRecords.setCount(0); + } + List resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit, + sortByFields); + List recordIds = getRecordIds(resultEntries); + AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds); + eventRecords.setCount(eventCount); + eventRecords.setList(AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response)); + return eventRecords; + } + + private static List getRecordIds(List searchResults) { + List ids = new ArrayList<>(); + for (SearchResultEntry searchResult : searchResults) { + ids.add(searchResult.getId()); + } + return ids; + } + /** * Retrieves the stream definition from das for the given device type. + * * @return dynamic event attribute list */ @GET @@ -196,7 +235,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) { log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType, - e); + e); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } } @@ -229,9 +268,11 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe } eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, Constants.DEFAULT_STREAM_VERSION); EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler = - new EventReceiverAdminServiceCallbackHandler() {}; + new EventReceiverAdminServiceCallbackHandler() { + }; EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler = - new EventPublisherAdminServiceCallbackHandler() {}; + new EventPublisherAdminServiceCallbackHandler() { + }; String eventReceiverName = getReceiverName(deviceType, tenantDomain, TransportType.MQTT); @@ -254,7 +295,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe tenantBasedEventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); tenantBasedEventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub(); tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName, - Constants.DEFAULT_STREAM_VERSION); + Constants.DEFAULT_STREAM_VERSION); tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration( eventReceiverName, eventReceiverAdminServiceCallbackHandler); @@ -298,7 +339,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe @Override public Response getData(@PathParam("deviceId") String deviceId, @QueryParam("from") long from, @QueryParam("to") long to, @PathParam("type") String deviceType, @QueryParam("offset") - int offset, @QueryParam("limit") int limit) { + int offset, @QueryParam("limit") int limit) { if (from == 0 || to == 0) { String errorMessage = "Invalid values for from/to"; return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build(); @@ -345,7 +386,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe @GET @Path("/last-known/{type}/{deviceId}") @Override - public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType,@QueryParam("limit") int limit) { + public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType, @QueryParam("limit") int limit) { String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId; String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain)); @@ -363,10 +404,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe List sortByFields = new ArrayList<>(); SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC); sortByFields.add(sortByField); - if(limit == 0){ - EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1); - return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();} - else{ + if (limit == 0) { + EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1); + return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build(); + } else { EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, limit); return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build(); } @@ -384,7 +425,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe } } - private void publishEventReceivers(String streamNameWithVersion, TransportType transportType , String requestedTenantDomain, String deviceType) throws RemoteException, UserStoreException, JWTClientException { @@ -399,7 +439,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe .getActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved); if (eventReceiverConfigurationDto != null) { EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler = - new EventReceiverAdminServiceCallbackHandler() {}; + new EventReceiverAdminServiceCallbackHandler() { + }; receiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved , eventReceiverAdminServiceCallbackHandler); } @@ -474,7 +515,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes) throws RemoteException, UserStoreException, JWTClientException, - EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException { + EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException { EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub = DeviceMgtAPIUtils.getEventStreamPersistenceAdminServiceStub(); try { @@ -545,44 +586,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver"; } - private static AnalyticsDataAPI getAnalyticsDataAPI() { - PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); - AnalyticsDataAPI analyticsDataAPI = - (AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null); - if (analyticsDataAPI == null) { - String msg = "Analytics api service has not initialized."; - log.error(msg); - throw new IllegalStateException(msg); - } - return analyticsDataAPI; - } - - private static EventRecords getAllEventsForDevice(String tableName, String query, List sortByFields - , int offset, int limit) throws AnalyticsException { - int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId(); - AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI(); - EventRecords eventRecords = new EventRecords(); - int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query); - if (eventCount == 0) { - eventRecords.setCount(0); - } - List resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit, - sortByFields); - List recordIds = getRecordIds(resultEntries); - AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds); - eventRecords.setCount(eventCount); - eventRecords.setList(AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response)); - return eventRecords; - } - - private static List getRecordIds(List searchResults) { - List ids = new ArrayList<>(); - for (SearchResultEntry searchResult : searchResults) { - ids.add(searchResult.getId()); - } - return ids; - } - private void cleanup(Stub stub) { if (stub != null) { try {