Merge pull request #1193 from lashanfaliq95/master

Updated the last-known api to be able to return a number of records.
4.x.x
Rasika Perera 7 years ago committed by GitHub
commit 108675261f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -290,10 +290,12 @@ public interface DeviceEventManagementService {
response = ErrorResponse.class) response = ErrorResponse.class)
} }
) )
Response getLastKnownData(@ApiParam(name = "deviceId", value = "The device ID.", required = false) Response getLastKnownData(@ApiParam(name = "deviceId", value = "id of the device ", required = true)
@PathParam("deviceId") String deviceId, @PathParam("deviceId") String deviceId,
@ApiParam(name = "type", value = "The device type, such as android, ios, or windows.", required = false) @ApiParam(name = "type", value = "name of the device type", required = true)
@PathParam("type") String deviceType); @PathParam("type") String deviceType,
@ApiParam(name = "limit", value = "limit of the records that needs to be picked up", required = false)
@QueryParam("limit") int limit);
@GET @GET
@Path("/{type}") @Path("/{type}")

@ -76,8 +76,47 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
private static final String MQTT_CONTENT_VALIDATOR = "default"; private static final String MQTT_CONTENT_VALIDATOR = "default";
private static final String TIMESTAMP_FIELD_NAME = "_timestamp"; private static final String TIMESTAMP_FIELD_NAME = "_timestamp";
private static AnalyticsDataAPI getAnalyticsDataAPI() {
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
AnalyticsDataAPI analyticsDataAPI =
(AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null);
if (analyticsDataAPI == null) {
String msg = "Analytics api service has not initialized.";
log.error(msg);
throw new IllegalStateException(msg);
}
return analyticsDataAPI;
}
private static EventRecords getAllEventsForDevice(String tableName, String query, List<SortByField> sortByFields
, int offset, int limit) throws AnalyticsException {
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
EventRecords eventRecords = new EventRecords();
int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query);
if (eventCount == 0) {
eventRecords.setCount(0);
}
List<SearchResultEntry> resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit,
sortByFields);
List<String> recordIds = getRecordIds(resultEntries);
AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds);
eventRecords.setCount(eventCount);
eventRecords.setList(AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response));
return eventRecords;
}
private static List<String> getRecordIds(List<SearchResultEntry> searchResults) {
List<String> ids = new ArrayList<>();
for (SearchResultEntry searchResult : searchResults) {
ids.add(searchResult.getId());
}
return ids;
}
/** /**
* Retrieves the stream definition from das for the given device type. * Retrieves the stream definition from das for the given device type.
*
* @return dynamic event attribute list * @return dynamic event attribute list
*/ */
@GET @GET
@ -196,7 +235,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) { } catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) {
log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType, log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType,
e); e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} }
} }
@ -229,9 +268,11 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
} }
eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, Constants.DEFAULT_STREAM_VERSION); eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, Constants.DEFAULT_STREAM_VERSION);
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler = EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
new EventReceiverAdminServiceCallbackHandler() {}; new EventReceiverAdminServiceCallbackHandler() {
};
EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler = EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler =
new EventPublisherAdminServiceCallbackHandler() {}; new EventPublisherAdminServiceCallbackHandler() {
};
String eventReceiverName = getReceiverName(deviceType, tenantDomain, TransportType.MQTT); String eventReceiverName = getReceiverName(deviceType, tenantDomain, TransportType.MQTT);
@ -254,7 +295,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
tenantBasedEventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub(); tenantBasedEventReceiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
tenantBasedEventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub(); tenantBasedEventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName, tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName,
Constants.DEFAULT_STREAM_VERSION); Constants.DEFAULT_STREAM_VERSION);
tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration( tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(
eventReceiverName, eventReceiverAdminServiceCallbackHandler); eventReceiverName, eventReceiverAdminServiceCallbackHandler);
@ -298,7 +339,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
@Override @Override
public Response getData(@PathParam("deviceId") String deviceId, @QueryParam("from") long from, public Response getData(@PathParam("deviceId") String deviceId, @QueryParam("from") long from,
@QueryParam("to") long to, @PathParam("type") String deviceType, @QueryParam("offset") @QueryParam("to") long to, @PathParam("type") String deviceType, @QueryParam("offset")
int offset, @QueryParam("limit") int limit) { int offset, @QueryParam("limit") int limit) {
if (from == 0 || to == 0) { if (from == 0 || to == 0) {
String errorMessage = "Invalid values for from/to"; String errorMessage = "Invalid values for from/to";
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build(); return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
@ -340,12 +381,13 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
} }
/** /**
* Returns the last know data point of the device type. * Returns last known data points up to the limit if limit is specified. Otherwise returns last known data point.
* Limit parameter needs to be zero or positive.
*/ */
@GET @GET
@Path("/last-known/{type}/{deviceId}") @Path("/last-known/{type}/{deviceId}")
@Override @Override
public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType) { public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType, @QueryParam("limit") int limit) {
String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId; String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId;
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain)); String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
@ -363,8 +405,16 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
List<SortByField> sortByFields = new ArrayList<>(); List<SortByField> sortByFields = new ArrayList<>();
SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC); SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
sortByFields.add(sortByField); sortByFields.add(sortByField);
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1); if (limit == 0) {
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build(); EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1);
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
} else if (limit > 0) {
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, limit);
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
} else {
String errorMessage = "Invalid limit value";
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
}
} catch (AnalyticsException e) { } catch (AnalyticsException e) {
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query; String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
log.error(errorMsg); log.error(errorMsg);
@ -393,7 +443,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
.getActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved); .getActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved);
if (eventReceiverConfigurationDto != null) { if (eventReceiverConfigurationDto != null) {
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler = EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
new EventReceiverAdminServiceCallbackHandler() {}; new EventReceiverAdminServiceCallbackHandler() {
};
receiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved receiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved
, eventReceiverAdminServiceCallbackHandler); , eventReceiverAdminServiceCallbackHandler);
} }
@ -468,7 +519,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes) private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes)
throws RemoteException, UserStoreException, JWTClientException, throws RemoteException, UserStoreException, JWTClientException,
EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException { EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException {
EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub = EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub =
DeviceMgtAPIUtils.getEventStreamPersistenceAdminServiceStub(); DeviceMgtAPIUtils.getEventStreamPersistenceAdminServiceStub();
try { try {
@ -539,44 +590,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver"; return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver";
} }
private static AnalyticsDataAPI getAnalyticsDataAPI() {
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
AnalyticsDataAPI analyticsDataAPI =
(AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null);
if (analyticsDataAPI == null) {
String msg = "Analytics api service has not initialized.";
log.error(msg);
throw new IllegalStateException(msg);
}
return analyticsDataAPI;
}
private static EventRecords getAllEventsForDevice(String tableName, String query, List<SortByField> sortByFields
, int offset, int limit) throws AnalyticsException {
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
EventRecords eventRecords = new EventRecords();
int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query);
if (eventCount == 0) {
eventRecords.setCount(0);
}
List<SearchResultEntry> resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit,
sortByFields);
List<String> recordIds = getRecordIds(resultEntries);
AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds);
eventRecords.setCount(eventCount);
eventRecords.setList(AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response));
return eventRecords;
}
private static List<String> getRecordIds(List<SearchResultEntry> searchResults) {
List<String> ids = new ArrayList<>();
for (SearchResultEntry searchResult : searchResults) {
ids.add(searchResult.getId());
}
return ids;
}
private void cleanup(Stub stub) { private void cleanup(Stub stub) {
if (stub != null) { if (stub != null) {
try { try {

Loading…
Cancel
Save