|
|
|
@ -4,18 +4,18 @@ import org.apache.axis2.AxisFault;
|
|
|
|
|
import org.apache.axis2.client.Stub;
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
import org.wso2.carbon.analytics.api.AnalyticsDataAPI;
|
|
|
|
|
import org.wso2.carbon.analytics.api.AnalyticsDataAPIUtil;
|
|
|
|
|
import org.wso2.carbon.analytics.dataservice.commons.AnalyticsDataResponse;
|
|
|
|
|
import org.wso2.carbon.analytics.dataservice.commons.SearchResultEntry;
|
|
|
|
|
import org.wso2.carbon.analytics.dataservice.commons.SortByField;
|
|
|
|
|
import org.wso2.carbon.analytics.dataservice.commons.SortType;
|
|
|
|
|
import org.wso2.carbon.analytics.datasource.commons.Record;
|
|
|
|
|
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTable;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTableRecord;
|
|
|
|
|
//import org.wso2.carbon.analytics.api.AnalyticsDataAPI;
|
|
|
|
|
//import org.wso2.carbon.analytics.api.AnalyticsDataAPIUtil;
|
|
|
|
|
//import org.wso2.carbon.analytics.dataservice.commons.AnalyticsDataResponse;
|
|
|
|
|
//import org.wso2.carbon.analytics.dataservice.commons.SearchResultEntry;
|
|
|
|
|
//import org.wso2.carbon.analytics.dataservice.commons.SortByField;
|
|
|
|
|
//import org.wso2.carbon.analytics.dataservice.commons.SortType;
|
|
|
|
|
//import org.wso2.carbon.analytics.datasource.commons.Record;
|
|
|
|
|
//import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
|
|
|
|
|
//import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException;
|
|
|
|
|
//import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminServiceStub;
|
|
|
|
|
//import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTable;
|
|
|
|
|
//import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTableRecord;
|
|
|
|
|
import org.wso2.carbon.base.MultitenantConstants;
|
|
|
|
|
import org.wso2.carbon.context.CarbonContext;
|
|
|
|
|
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
|
|
|
@ -81,49 +81,49 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
private static final String MQTT_CONTENT_VALIDATOR = "default";
|
|
|
|
|
private static final String TIMESTAMP_FIELD_NAME = "_timestamp";
|
|
|
|
|
|
|
|
|
|
private static AnalyticsDataAPI getAnalyticsDataAPI() {
|
|
|
|
|
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
|
|
|
|
|
AnalyticsDataAPI analyticsDataAPI =
|
|
|
|
|
(AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null);
|
|
|
|
|
if (analyticsDataAPI == null) {
|
|
|
|
|
String msg = "Analytics api service has not initialized.";
|
|
|
|
|
log.error(msg);
|
|
|
|
|
throw new IllegalStateException(msg);
|
|
|
|
|
}
|
|
|
|
|
return analyticsDataAPI;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static EventRecords getAllEventsForDevice(String tableName, String query, List<SortByField> sortByFields
|
|
|
|
|
, int offset, int limit) throws AnalyticsException {
|
|
|
|
|
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
|
|
|
|
|
AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
|
|
|
|
|
EventRecords eventRecords = new EventRecords();
|
|
|
|
|
int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query);
|
|
|
|
|
if (eventCount == 0) {
|
|
|
|
|
eventRecords.setCount(0);
|
|
|
|
|
}
|
|
|
|
|
List<SearchResultEntry> resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit,
|
|
|
|
|
sortByFields);
|
|
|
|
|
List<String> recordIds = getRecordIds(resultEntries);
|
|
|
|
|
AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds);
|
|
|
|
|
eventRecords.setCount(eventCount);
|
|
|
|
|
List<Record> records = AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response);
|
|
|
|
|
records.sort(new Comparator<Record>() {
|
|
|
|
|
@Override public int compare(Record r1, Record r2) {
|
|
|
|
|
return Long.compare(r2.getTimestamp(), r1.getTimestamp());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
eventRecords.setList(records);
|
|
|
|
|
return eventRecords;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static List<String> getRecordIds(List<SearchResultEntry> searchResults) {
|
|
|
|
|
List<String> ids = new ArrayList<>();
|
|
|
|
|
for (SearchResultEntry searchResult : searchResults) {
|
|
|
|
|
ids.add(searchResult.getId());
|
|
|
|
|
}
|
|
|
|
|
return ids;
|
|
|
|
|
}
|
|
|
|
|
// private static AnalyticsDataAPI getAnalyticsDataAPI() {
|
|
|
|
|
// PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
|
|
|
|
|
// AnalyticsDataAPI analyticsDataAPI =
|
|
|
|
|
// (AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null);
|
|
|
|
|
// if (analyticsDataAPI == null) {
|
|
|
|
|
// String msg = "Analytics api service has not initialized.";
|
|
|
|
|
// log.error(msg);
|
|
|
|
|
// throw new IllegalStateException(msg);
|
|
|
|
|
// }
|
|
|
|
|
// return analyticsDataAPI;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// private static EventRecords getAllEventsForDevice(String tableName, String query, List<SortByField> sortByFields
|
|
|
|
|
// , int offset, int limit) throws AnalyticsException {
|
|
|
|
|
// int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
|
|
|
|
|
// AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
|
|
|
|
|
// EventRecords eventRecords = new EventRecords();
|
|
|
|
|
// int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query);
|
|
|
|
|
// if (eventCount == 0) {
|
|
|
|
|
// eventRecords.setCount(0);
|
|
|
|
|
// }
|
|
|
|
|
// List<SearchResultEntry> resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit,
|
|
|
|
|
// sortByFields);
|
|
|
|
|
// List<String> recordIds = getRecordIds(resultEntries);
|
|
|
|
|
// AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds);
|
|
|
|
|
// eventRecords.setCount(eventCount);
|
|
|
|
|
// List<Record> records = AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response);
|
|
|
|
|
// records.sort(new Comparator<Record>() {
|
|
|
|
|
// @Override public int compare(Record r1, Record r2) {
|
|
|
|
|
// return Long.compare(r2.getTimestamp(), r1.getTimestamp());
|
|
|
|
|
// }
|
|
|
|
|
// });
|
|
|
|
|
// eventRecords.setList(records);
|
|
|
|
|
// return eventRecords;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// private static List<String> getRecordIds(List<SearchResultEntry> searchResults) {
|
|
|
|
|
// List<String> ids = new ArrayList<>();
|
|
|
|
|
// for (SearchResultEntry searchResult : searchResults) {
|
|
|
|
|
// ids.add(searchResult.getId());
|
|
|
|
|
// }
|
|
|
|
|
// return ids;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Retrieves the stream definition from das for the given device type.
|
|
|
|
@ -195,65 +195,65 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
/**
|
|
|
|
|
* Deploy Event Stream, Receiver, Publisher and Store Configuration.
|
|
|
|
|
*/
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/{type}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response deployDeviceTypeEventDefinition(@PathParam("type") String deviceType,
|
|
|
|
|
@QueryParam("skipPersist") boolean skipPersist,
|
|
|
|
|
@QueryParam("isSharedWithAllTenants") boolean isSharedWithAllTenants,
|
|
|
|
|
@Valid DeviceTypeEvent deviceTypeEvent) {
|
|
|
|
|
TransportType transportType = deviceTypeEvent.getTransportType();
|
|
|
|
|
EventAttributeList eventAttributes = deviceTypeEvent.getEventAttributeList();
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
try {
|
|
|
|
|
if (eventAttributes == null || eventAttributes.getList() == null || eventAttributes.getList().size() == 0 ||
|
|
|
|
|
deviceType == null || transportType == null ||
|
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
String errorMessage = "Invalid Payload";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain);
|
|
|
|
|
String streamNameWithVersion = streamName + ":" + Constants.DEFAULT_STREAM_VERSION;
|
|
|
|
|
publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
|
|
|
|
|
publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, isSharedWithAllTenants, deviceType);
|
|
|
|
|
if (!skipPersist) {
|
|
|
|
|
publishEventStore(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes);
|
|
|
|
|
}
|
|
|
|
|
publishWebsocketPublisherDefinition(streamNameWithVersion, deviceType);
|
|
|
|
|
try {
|
|
|
|
|
PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
|
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
|
|
|
|
|
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
|
|
|
|
|
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
|
|
|
|
|
publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
|
|
|
|
|
publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, isSharedWithAllTenants, deviceType);
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("Failed to create event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services:" + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain:" + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
|
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) {
|
|
|
|
|
log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType,
|
|
|
|
|
e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// @POST
|
|
|
|
|
// @Path("/{type}")
|
|
|
|
|
// @Override
|
|
|
|
|
// public Response deployDeviceTypeEventDefinition(@PathParam("type") String deviceType,
|
|
|
|
|
// @QueryParam("skipPersist") boolean skipPersist,
|
|
|
|
|
// @QueryParam("isSharedWithAllTenants") boolean isSharedWithAllTenants,
|
|
|
|
|
// @Valid DeviceTypeEvent deviceTypeEvent) {
|
|
|
|
|
// TransportType transportType = deviceTypeEvent.getTransportType();
|
|
|
|
|
// EventAttributeList eventAttributes = deviceTypeEvent.getEventAttributeList();
|
|
|
|
|
// String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
// try {
|
|
|
|
|
// if (eventAttributes == null || eventAttributes.getList() == null || eventAttributes.getList().size() == 0 ||
|
|
|
|
|
// deviceType == null || transportType == null ||
|
|
|
|
|
// !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
// String errorMessage = "Invalid Payload";
|
|
|
|
|
// log.error(errorMessage);
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
// }
|
|
|
|
|
// String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain);
|
|
|
|
|
// String streamNameWithVersion = streamName + ":" + Constants.DEFAULT_STREAM_VERSION;
|
|
|
|
|
// publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
|
|
|
|
|
// publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, isSharedWithAllTenants, deviceType);
|
|
|
|
|
// if (!skipPersist) {
|
|
|
|
|
// publishEventStore(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes);
|
|
|
|
|
// }
|
|
|
|
|
// publishWebsocketPublisherDefinition(streamNameWithVersion, deviceType);
|
|
|
|
|
// try {
|
|
|
|
|
// PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
|
// PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
|
|
|
|
|
// MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
|
|
|
|
|
// if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
|
|
|
|
|
// publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
|
|
|
|
|
// publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, isSharedWithAllTenants, deviceType);
|
|
|
|
|
// }
|
|
|
|
|
// } finally {
|
|
|
|
|
// PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
|
// }
|
|
|
|
|
// return Response.ok().build();
|
|
|
|
|
// } catch (AxisFault e) {
|
|
|
|
|
// log.error("Failed to create event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (RemoteException e) {
|
|
|
|
|
// log.error("Failed to connect with the remote services:" + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (JWTClientException e) {
|
|
|
|
|
// log.error("Failed to generate jwt token for tenantDomain:" + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (UserStoreException e) {
|
|
|
|
|
// log.error("Failed to connect with the user store, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (DeviceManagementException e) {
|
|
|
|
|
// log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) {
|
|
|
|
|
// log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType,
|
|
|
|
|
// e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Delete device type specific artifacts from DAS.
|
|
|
|
@ -349,175 +349,175 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
/**
|
|
|
|
|
* Returns device specific data for the give period of time.
|
|
|
|
|
*/
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/{type}/{deviceId}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response getData(@PathParam("deviceId") String deviceId, @QueryParam("from") long from,
|
|
|
|
|
@QueryParam("to") long to, @PathParam("type") String deviceType, @QueryParam("offset")
|
|
|
|
|
int offset, @QueryParam("limit") int limit) {
|
|
|
|
|
if (from == 0 || to == 0) {
|
|
|
|
|
String errorMessage = "Invalid values for from/to";
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
}
|
|
|
|
|
if (limit == 0) {
|
|
|
|
|
String errorMessage = "Invalid values for offset/limit";
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
}
|
|
|
|
|
String fromDate = String.valueOf(from);
|
|
|
|
|
String toDate = String.valueOf(to);
|
|
|
|
|
String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId
|
|
|
|
|
+ " AND _timestamp : [" + fromDate + " TO " + toDate + "]";
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
try {
|
|
|
|
|
if (deviceType == null ||
|
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
String errorMessage = "Invalid device type";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
if (!DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized(
|
|
|
|
|
new DeviceIdentifier(deviceId, deviceType))) {
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
}
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, offset, limit);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
} catch (AnalyticsException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
} catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
log.error(e.getErrorMessage(), e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// @GET
|
|
|
|
|
// @Path("/{type}/{deviceId}")
|
|
|
|
|
// @Override
|
|
|
|
|
// public Response getData(@PathParam("deviceId") String deviceId, @QueryParam("from") long from,
|
|
|
|
|
// @QueryParam("to") long to, @PathParam("type") String deviceType, @QueryParam("offset")
|
|
|
|
|
// int offset, @QueryParam("limit") int limit) {
|
|
|
|
|
// if (from == 0 || to == 0) {
|
|
|
|
|
// String errorMessage = "Invalid values for from/to";
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
// }
|
|
|
|
|
// if (limit == 0) {
|
|
|
|
|
// String errorMessage = "Invalid values for offset/limit";
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
// }
|
|
|
|
|
// String fromDate = String.valueOf(from);
|
|
|
|
|
// String toDate = String.valueOf(to);
|
|
|
|
|
// String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId
|
|
|
|
|
// + " AND _timestamp : [" + fromDate + " TO " + toDate + "]";
|
|
|
|
|
// String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
// String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
// try {
|
|
|
|
|
// if (deviceType == null ||
|
|
|
|
|
// !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
// String errorMessage = "Invalid device type";
|
|
|
|
|
// log.error(errorMessage);
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
// }
|
|
|
|
|
// if (!DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized(
|
|
|
|
|
// new DeviceIdentifier(deviceId, deviceType))) {
|
|
|
|
|
// return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
// }
|
|
|
|
|
// List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
// SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
// sortByFields.add(sortByField);
|
|
|
|
|
// EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, offset, limit);
|
|
|
|
|
// return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
// } catch (AnalyticsException e) {
|
|
|
|
|
// String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
// log.error(errorMsg);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
// } catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
// log.error(e.getErrorMessage(), e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (DeviceManagementException e) {
|
|
|
|
|
// String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
// log.error(errorMsg);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns last known data points up to the limit if limit is specified. Otherwise returns last known data point.
|
|
|
|
|
* Limit parameter needs to be zero or positive.
|
|
|
|
|
*/
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/last-known/{type}/{deviceId}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType, @QueryParam("limit") int limit) {
|
|
|
|
|
String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId;
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
try {
|
|
|
|
|
if (deviceType == null ||
|
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
String errorMessage = "Invalid device type";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
if (!DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized(
|
|
|
|
|
new DeviceIdentifier(deviceId, deviceType))) {
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
}
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
if (limit == 0) {
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
} else if (limit > 0) {
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, limit);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
} else {
|
|
|
|
|
String errorMessage = "Invalid limit value";
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
}
|
|
|
|
|
} catch (AnalyticsException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
} catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
log.error(e.getErrorMessage(), e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// @GET
|
|
|
|
|
// @Path("/last-known/{type}/{deviceId}")
|
|
|
|
|
// @Override
|
|
|
|
|
// public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType, @QueryParam("limit") int limit) {
|
|
|
|
|
// String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId;
|
|
|
|
|
// String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
// String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
// try {
|
|
|
|
|
// if (deviceType == null ||
|
|
|
|
|
// !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
// String errorMessage = "Invalid device type";
|
|
|
|
|
// log.error(errorMessage);
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
// }
|
|
|
|
|
// if (!DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized(
|
|
|
|
|
// new DeviceIdentifier(deviceId, deviceType))) {
|
|
|
|
|
// return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
// }
|
|
|
|
|
// List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
// SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
// sortByFields.add(sortByField);
|
|
|
|
|
// if (limit == 0) {
|
|
|
|
|
// EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1);
|
|
|
|
|
// return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
// } else if (limit > 0) {
|
|
|
|
|
// EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, limit);
|
|
|
|
|
// return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
// } else {
|
|
|
|
|
// String errorMessage = "Invalid limit value";
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
// }
|
|
|
|
|
// } catch (AnalyticsException e) {
|
|
|
|
|
// String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
// log.error(errorMsg);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
// } catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
// log.error(e.getErrorMessage(), e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (DeviceManagementException e) {
|
|
|
|
|
// String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
// log.error(errorMsg);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the filterd device list. Devices are filterd using the paramter given and the timestamp of the record.
|
|
|
|
|
* parameter should given as a range.
|
|
|
|
|
*/
|
|
|
|
|
@GET
|
|
|
|
|
@Path("filter/{type}/{parameter}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response getFilteredDevices(@PathParam("type") String deviceType, @PathParam("parameter") String parameter,
|
|
|
|
|
@QueryParam("min") double min, @QueryParam("max") double max) {
|
|
|
|
|
String query;
|
|
|
|
|
Calendar c = java.util.Calendar.getInstance();
|
|
|
|
|
long currentTimestamp = c.getTimeInMillis();
|
|
|
|
|
long previousTimestamp = currentTimestamp - 300 * 1000;
|
|
|
|
|
String fromDate = String.valueOf(previousTimestamp);
|
|
|
|
|
String toDate = String.valueOf(currentTimestamp);
|
|
|
|
|
if (min != 0 & max != 0) {
|
|
|
|
|
query = parameter + " : [" + min + " TO " + max + "]" +
|
|
|
|
|
" AND _timestamp : [" + fromDate + " TO " + toDate + "]";
|
|
|
|
|
} else {
|
|
|
|
|
String errorMessage = "The of range values need to be given";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
try {
|
|
|
|
|
if (deviceType == null ||
|
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
String errorMessage = "Invalid device type";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 100);
|
|
|
|
|
List<Record> filterdEvents = eventRecords.getRecord();
|
|
|
|
|
List<Record> uniqueFilterdEvents = new ArrayList<Record>();
|
|
|
|
|
Set<String> devices = new HashSet<>();
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < filterdEvents.size(); i++) {
|
|
|
|
|
String deviceid = (String) filterdEvents.get(i).getValue("meta_deviceId");
|
|
|
|
|
if (!devices.contains(deviceid) && DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized(
|
|
|
|
|
new DeviceIdentifier(deviceid, deviceType))) {
|
|
|
|
|
devices.add(deviceid);
|
|
|
|
|
uniqueFilterdEvents.add(filterdEvents.get(i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
EventRecords filterdRecords = new EventRecords();
|
|
|
|
|
filterdRecords.setList(uniqueFilterdEvents);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(filterdRecords).build();
|
|
|
|
|
|
|
|
|
|
} catch (AnalyticsException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
} catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// @GET
|
|
|
|
|
// @Path("filter/{type}/{parameter}")
|
|
|
|
|
// @Override
|
|
|
|
|
// public Response getFilteredDevices(@PathParam("type") String deviceType, @PathParam("parameter") String parameter,
|
|
|
|
|
// @QueryParam("min") double min, @QueryParam("max") double max) {
|
|
|
|
|
// String query;
|
|
|
|
|
// Calendar c = java.util.Calendar.getInstance();
|
|
|
|
|
// long currentTimestamp = c.getTimeInMillis();
|
|
|
|
|
// long previousTimestamp = currentTimestamp - 300 * 1000;
|
|
|
|
|
// String fromDate = String.valueOf(previousTimestamp);
|
|
|
|
|
// String toDate = String.valueOf(currentTimestamp);
|
|
|
|
|
// if (min != 0 & max != 0) {
|
|
|
|
|
// query = parameter + " : [" + min + " TO " + max + "]" +
|
|
|
|
|
// " AND _timestamp : [" + fromDate + " TO " + toDate + "]";
|
|
|
|
|
// } else {
|
|
|
|
|
// String errorMessage = "The of range values need to be given";
|
|
|
|
|
// log.error(errorMessage);
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
// String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
// try {
|
|
|
|
|
// if (deviceType == null ||
|
|
|
|
|
// !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
// String errorMessage = "Invalid device type";
|
|
|
|
|
// log.error(errorMessage);
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
// SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
// sortByFields.add(sortByField);
|
|
|
|
|
// EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 100);
|
|
|
|
|
// List<Record> filterdEvents = eventRecords.getRecord();
|
|
|
|
|
// List<Record> uniqueFilterdEvents = new ArrayList<Record>();
|
|
|
|
|
// Set<String> devices = new HashSet<>();
|
|
|
|
|
//
|
|
|
|
|
// for (int i = 0; i < filterdEvents.size(); i++) {
|
|
|
|
|
// String deviceid = (String) filterdEvents.get(i).getValue("meta_deviceId");
|
|
|
|
|
// if (!devices.contains(deviceid) && DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized(
|
|
|
|
|
// new DeviceIdentifier(deviceid, deviceType))) {
|
|
|
|
|
// devices.add(deviceid);
|
|
|
|
|
// uniqueFilterdEvents.add(filterdEvents.get(i));
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// EventRecords filterdRecords = new EventRecords();
|
|
|
|
|
// filterdRecords.setList(uniqueFilterdEvents);
|
|
|
|
|
// return Response.status(Response.Status.OK.getStatusCode()).entity(filterdRecords).build();
|
|
|
|
|
//
|
|
|
|
|
// } catch (AnalyticsException e) {
|
|
|
|
|
// String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
// log.error(errorMsg);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
// } catch (DeviceManagementException e) {
|
|
|
|
|
// String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
// log.error(errorMsg);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
// } catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
// String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
// log.error(errorMsg);
|
|
|
|
|
// return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void publishEventReceivers(String streamNameWithVersion, TransportType transportType
|
|
|
|
@ -613,47 +613,47 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException,
|
|
|
|
|
EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException {
|
|
|
|
|
EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub =
|
|
|
|
|
DeviceMgtAPIUtils.getEventStreamPersistenceAdminServiceStub();
|
|
|
|
|
try {
|
|
|
|
|
AnalyticsTable analyticsTable = new AnalyticsTable();
|
|
|
|
|
analyticsTable.setRecordStoreName(DEFAULT_EVENT_STORE_NAME);
|
|
|
|
|
analyticsTable.setStreamVersion(version);
|
|
|
|
|
analyticsTable.setTableName(streamName);
|
|
|
|
|
analyticsTable.setMergeSchema(false);
|
|
|
|
|
analyticsTable.setPersist(true);
|
|
|
|
|
AnalyticsTableRecord analyticsTableRecords[] = new AnalyticsTableRecord[eventAttributes.getList().size() + 1];
|
|
|
|
|
int i = 0;
|
|
|
|
|
for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
|
AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
analyticsTableRecord.setColumnName(attribute.getName());
|
|
|
|
|
analyticsTableRecord.setColumnType(attribute.getType().toString().toUpperCase());
|
|
|
|
|
analyticsTableRecord.setFacet(false);
|
|
|
|
|
analyticsTableRecord.setIndexed(false);
|
|
|
|
|
analyticsTableRecord.setPersist(true);
|
|
|
|
|
analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
|
analyticsTableRecord.setScoreParam(false);
|
|
|
|
|
analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
|
i++;
|
|
|
|
|
}
|
|
|
|
|
AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
analyticsTableRecord.setColumnName(DEFAULT_META_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
analyticsTableRecord.setColumnType(AttributeType.STRING.toString().toUpperCase());
|
|
|
|
|
analyticsTableRecord.setFacet(false);
|
|
|
|
|
analyticsTableRecord.setIndexed(true);
|
|
|
|
|
analyticsTableRecord.setPersist(true);
|
|
|
|
|
analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
|
analyticsTableRecord.setScoreParam(false);
|
|
|
|
|
analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
|
analyticsTable.setAnalyticsTableRecords(analyticsTableRecords);
|
|
|
|
|
eventStreamPersistenceAdminServiceStub.addAnalyticsTable(analyticsTable);
|
|
|
|
|
} finally {
|
|
|
|
|
cleanup(eventStreamPersistenceAdminServiceStub);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes)
|
|
|
|
|
// throws RemoteException, UserStoreException, JWTClientException,
|
|
|
|
|
// EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException {
|
|
|
|
|
// EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub =
|
|
|
|
|
// DeviceMgtAPIUtils.getEventStreamPersistenceAdminServiceStub();
|
|
|
|
|
// try {
|
|
|
|
|
// AnalyticsTable analyticsTable = new AnalyticsTable();
|
|
|
|
|
// analyticsTable.setRecordStoreName(DEFAULT_EVENT_STORE_NAME);
|
|
|
|
|
// analyticsTable.setStreamVersion(version);
|
|
|
|
|
// analyticsTable.setTableName(streamName);
|
|
|
|
|
// analyticsTable.setMergeSchema(false);
|
|
|
|
|
// analyticsTable.setPersist(true);
|
|
|
|
|
// AnalyticsTableRecord analyticsTableRecords[] = new AnalyticsTableRecord[eventAttributes.getList().size() + 1];
|
|
|
|
|
// int i = 0;
|
|
|
|
|
// for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
|
// AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
// analyticsTableRecord.setColumnName(attribute.getName());
|
|
|
|
|
// analyticsTableRecord.setColumnType(attribute.getType().toString().toUpperCase());
|
|
|
|
|
// analyticsTableRecord.setFacet(false);
|
|
|
|
|
// analyticsTableRecord.setIndexed(false);
|
|
|
|
|
// analyticsTableRecord.setPersist(true);
|
|
|
|
|
// analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
|
// analyticsTableRecord.setScoreParam(false);
|
|
|
|
|
// analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
|
// i++;
|
|
|
|
|
// }
|
|
|
|
|
// AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
// analyticsTableRecord.setColumnName(DEFAULT_META_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
// analyticsTableRecord.setColumnType(AttributeType.STRING.toString().toUpperCase());
|
|
|
|
|
// analyticsTableRecord.setFacet(false);
|
|
|
|
|
// analyticsTableRecord.setIndexed(true);
|
|
|
|
|
// analyticsTableRecord.setPersist(true);
|
|
|
|
|
// analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
|
// analyticsTableRecord.setScoreParam(false);
|
|
|
|
|
// analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
|
// analyticsTable.setAnalyticsTableRecords(analyticsTableRecords);
|
|
|
|
|
// eventStreamPersistenceAdminServiceStub.addAnalyticsTable(analyticsTable);
|
|
|
|
|
// } finally {
|
|
|
|
|
// cleanup(eventStreamPersistenceAdminServiceStub);
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
private void publishWebsocketPublisherDefinition(String streamNameWithVersion, String deviceType)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|