|
|
|
@ -10,8 +10,9 @@ import org.wso2.carbon.analytics.dataservice.commons.AnalyticsDataResponse;
|
|
|
|
|
import org.wso2.carbon.analytics.dataservice.commons.SearchResultEntry;
|
|
|
|
|
import org.wso2.carbon.analytics.dataservice.commons.SortByField;
|
|
|
|
|
import org.wso2.carbon.analytics.dataservice.commons.SortType;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub
|
|
|
|
|
.EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException;
|
|
|
|
|
import org.wso2.carbon.analytics.datasource.commons.Record;
|
|
|
|
|
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTable;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTableRecord;
|
|
|
|
@ -21,11 +22,11 @@ import org.wso2.carbon.context.PrivilegedCarbonContext;
|
|
|
|
|
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
|
|
|
|
|
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.common.authorization.DeviceAccessAuthorizationException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.DeviceTypeEvent;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventRecords;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Attribute;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AttributeType;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.DeviceTypeEvent;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventAttributeList;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.EventRecords;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.TransportType;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.service.api.DeviceEventManagementService;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.util.Constants;
|
|
|
|
@ -41,7 +42,6 @@ import org.wso2.carbon.event.stream.stub.types.EventStreamAttributeDto;
|
|
|
|
|
import org.wso2.carbon.event.stream.stub.types.EventStreamDefinitionDto;
|
|
|
|
|
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
|
|
|
|
|
import org.wso2.carbon.user.api.UserStoreException;
|
|
|
|
|
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
|
|
|
|
|
|
|
|
|
|
import javax.validation.Valid;
|
|
|
|
|
import javax.ws.rs.DELETE;
|
|
|
|
@ -53,7 +53,11 @@ import javax.ws.rs.QueryParam;
|
|
|
|
|
import javax.ws.rs.core.Response;
|
|
|
|
|
import java.rmi.RemoteException;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Calendar;
|
|
|
|
|
import java.util.HashSet;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This is used for device type integration with DAS, to create streams and receiver dynamically and a common endpoint
|
|
|
|
@ -429,6 +433,78 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the filterd device list. Devices are filterd using the paramter given and the timestamp of the record.
|
|
|
|
|
* parameter should given as a range.
|
|
|
|
|
*/
|
|
|
|
|
@GET
|
|
|
|
|
@Path("filter/{type}/{parameter}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response getFilteredDevices(@PathParam("type") String deviceType, @PathParam("parameter") String parameter,
|
|
|
|
|
@QueryParam("min") double min, @QueryParam("max") double max) {
|
|
|
|
|
String query;
|
|
|
|
|
Calendar c = java.util.Calendar.getInstance();
|
|
|
|
|
long currentTimestamp = c.getTimeInMillis();
|
|
|
|
|
long previousTimestamp = currentTimestamp - 300 * 1000;
|
|
|
|
|
String fromDate = String.valueOf(previousTimestamp);
|
|
|
|
|
String toDate = String.valueOf(currentTimestamp);
|
|
|
|
|
if (min != 0 & max != 0) {
|
|
|
|
|
query = parameter + " : [" + min + " TO " + max + "]" +
|
|
|
|
|
" AND _timestamp : [" + fromDate + " TO " + toDate + "]";
|
|
|
|
|
} else {
|
|
|
|
|
String errorMessage = "The of range values need to be given";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
try {
|
|
|
|
|
if (deviceType == null ||
|
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
String errorMessage = "Invalid device type";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 100);
|
|
|
|
|
List<Record> filterdEvents = eventRecords.getRecord();
|
|
|
|
|
List<Record> uniqueFilterdEvents = new ArrayList<Record>();
|
|
|
|
|
Set<String> devices = new HashSet<>();
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < filterdEvents.size(); i++) {
|
|
|
|
|
String deviceid = (String) filterdEvents.get(i).getValue("meta_deviceId");
|
|
|
|
|
if (!devices.contains(deviceid) && DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized(
|
|
|
|
|
new DeviceIdentifier(deviceid, deviceType))) {
|
|
|
|
|
devices.add(deviceid);
|
|
|
|
|
uniqueFilterdEvents.add(filterdEvents.get(i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
EventRecords filterdRecords = new EventRecords();
|
|
|
|
|
filterdRecords.setList(uniqueFilterdEvents);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(filterdRecords).build();
|
|
|
|
|
|
|
|
|
|
} catch (AnalyticsException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
} catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void publishEventReceivers(String streamNameWithVersion, TransportType transportType
|
|
|
|
|
, String requestedTenantDomain, String deviceType)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
@ -590,6 +666,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void cleanup(Stub stub) {
|
|
|
|
|
if (stub != null) {
|
|
|
|
|
try {
|
|
|
|
|