Skip to content

Commit ebab496

Browse files
committed
Revert "impl for bulkupdate api (#121)"
This reverts commit 4cffb07.
1 parent 7305e9d commit ebab496

5 files changed

Lines changed: 36 additions & 424 deletions

File tree

entity-service-impl/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ dependencies {
1111
annotationProcessor("org.projectlombok:lombok:1.18.18")
1212
compileOnly("org.projectlombok:lombok:1.18.18")
1313

14-
implementation("org.hypertrace.core.documentstore:document-store:0.6.0")
14+
implementation("org.hypertrace.core.documentstore:document-store:0.5.8")
1515
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.4.0")
1616
implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.4.0")
1717
implementation("org.hypertrace.core.attribute.service:caching-attribute-service-client:0.12.3")

entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java

Lines changed: 19 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,23 @@
77
import com.google.protobuf.ServiceException;
88
import com.typesafe.config.Config;
99
import io.grpc.stub.StreamObserver;
10+
import java.io.IOException;
1011
import java.util.ArrayList;
1112
import java.util.Collections;
12-
import java.util.HashMap;
1313
import java.util.Iterator;
1414
import java.util.List;
15-
import java.util.Map;
1615
import java.util.Optional;
1716
import org.hypertrace.core.documentstore.Collection;
1817
import org.hypertrace.core.documentstore.Datastore;
1918
import org.hypertrace.core.documentstore.Document;
2019
import org.hypertrace.core.documentstore.JSONDocument;
21-
import org.hypertrace.core.documentstore.Key;
2220
import org.hypertrace.core.documentstore.SingleValueKey;
2321
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
2422
import org.hypertrace.core.grpcutils.context.RequestContext;
2523
import org.hypertrace.entity.data.service.DocumentParser;
2624
import org.hypertrace.entity.data.service.v1.AttributeValue;
2725
import org.hypertrace.entity.data.service.v1.Entity;
2826
import org.hypertrace.entity.data.service.v1.Query;
29-
import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest;
30-
import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest.EntityUpdateInfo;
3127
import org.hypertrace.entity.query.service.v1.ColumnIdentifier;
3228
import org.hypertrace.entity.query.service.v1.ColumnMetadata;
3329
import org.hypertrace.entity.query.service.v1.EntityQueryRequest;
@@ -41,7 +37,6 @@
4137
import org.hypertrace.entity.query.service.v1.SetAttribute;
4238
import org.hypertrace.entity.query.service.v1.TotalEntitiesRequest;
4339
import org.hypertrace.entity.query.service.v1.TotalEntitiesResponse;
44-
import org.hypertrace.entity.query.service.v1.UpdateOperation;
4540
import org.hypertrace.entity.query.service.v1.Value;
4641
import org.hypertrace.entity.query.service.v1.ValueType;
4742
import org.hypertrace.entity.service.constants.EntityServiceConstants;
@@ -240,8 +235,8 @@ Row convertToEntityQueryResult(
240235
public void update(EntityUpdateRequest request, StreamObserver<ResultSetChunk> responseObserver) {
241236
// Validations
242237
RequestContext requestContext = RequestContext.CURRENT.get();
243-
Optional<String> maybeTenantId = requestContext.getTenantId();
244-
if (maybeTenantId.isEmpty()) {
238+
Optional<String> tenantId = requestContext.getTenantId();
239+
if (tenantId.isEmpty()) {
245240
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
246241
return;
247242
}
@@ -261,9 +256,14 @@ public void update(EntityUpdateRequest request, StreamObserver<ResultSetChunk> r
261256
doUpdate(requestContext, request);
262257

263258
// Finally return the selections
264-
List<Entity> entities =
265-
getProjectedEntities(
266-
request.getEntityIdsList(), request.getSelectionList(), requestContext);
259+
Query entitiesQuery = Query.newBuilder().addAllEntityId(request.getEntityIdsList()).build();
260+
List<String> docStoreSelections =
261+
entityQueryConverter.convertSelectionsToDocStoreSelections(
262+
requestContext, request.getSelectionList());
263+
Iterator<Document> documentIterator =
264+
entitiesCollection.search(
265+
DocStoreConverter.transform(tenantId.get(), entitiesQuery, docStoreSelections));
266+
List<Entity> entities = convertDocsToEntities(documentIterator);
267267
responseObserver.onNext(
268268
convertEntitiesToResultSetChunk(requestContext, entities, request.getSelectionList()));
269269
responseObserver.onCompleted();
@@ -274,7 +274,7 @@ public void update(EntityUpdateRequest request, StreamObserver<ResultSetChunk> r
274274
}
275275

276276
private void doUpdate(RequestContext requestContext, EntityUpdateRequest request)
277-
throws Exception {
277+
throws IOException {
278278
if (request.getOperation().hasSetAttribute()) {
279279
SetAttribute setAttribute = request.getOperation().getSetAttribute();
280280
String attributeId = setAttribute.getAttribute().getColumnName();
@@ -289,128 +289,20 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request
289289
AttributeValue attributeValue =
290290
EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build();
291291
String jsonValue = DocStoreJsonFormat.printer().print(attributeValue);
292-
JSONDocument jsonDocument = new JSONDocument(jsonValue);
293292

294-
Map<Key, Map<String, Document>> entitiesUpdateMap = new HashMap<>();
295293
for (String entityId : request.getEntityIdsList()) {
296294
SingleValueKey key =
297295
new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId);
298-
if (entitiesUpdateMap.containsKey(key)) {
299-
entitiesUpdateMap.get(key).put(subDocPath, jsonDocument);
300-
} else {
301-
Map<String, Document> subDocument = new HashMap<>();
302-
subDocument.put(subDocPath, jsonDocument);
303-
entitiesUpdateMap.put(key, subDocument);
296+
// TODO better error reporting once doc store exposes the,
297+
if (!entitiesCollection.updateSubDoc(key, subDocPath, new JSONDocument(jsonValue))) {
298+
LOG.warn(
299+
"Failed to update entity {}, subDocPath {}, with new doc {}.",
300+
key,
301+
subDocPath,
302+
jsonValue);
304303
}
305304
}
306-
try {
307-
entitiesCollection.bulkUpdateSubDocs(entitiesUpdateMap);
308-
} catch (Exception e) {
309-
LOG.error(
310-
"Failed to update entities {}, subDocPath {}, with new doc {}.",
311-
entitiesUpdateMap,
312-
subDocPath,
313-
jsonValue,
314-
e);
315-
throw e;
316-
}
317-
}
318-
}
319-
320-
@Override
321-
public void bulkUpdate(
322-
BulkEntityUpdateRequest request, StreamObserver<ResultSetChunk> responseObserver) {
323-
// Validations
324-
RequestContext requestContext = RequestContext.CURRENT.get();
325-
Optional<String> maybeTenantId = requestContext.getTenantId();
326-
if (maybeTenantId.isEmpty()) {
327-
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
328-
return;
329-
}
330-
if (StringUtils.isEmpty(request.getEntityType())) {
331-
responseObserver.onError(new ServiceException("Entity type is missing in the request."));
332-
return;
333-
}
334-
if (request.getEntitiesCount() == 0) {
335-
responseObserver.onError(new ServiceException("Entities are missing in the request."));
336-
}
337-
Map<String, EntityUpdateInfo> entitiesMap = request.getEntitiesMap();
338-
try {
339-
doBulkUpdate(requestContext, entitiesMap);
340-
responseObserver.onCompleted();
341-
} catch (Exception e) {
342-
responseObserver.onError(
343-
new ServiceException("Error occurred while executing " + request, e));
344-
}
345-
}
346-
347-
private List<Entity> getProjectedEntities(
348-
Iterable<String> entityIdsList,
349-
List<Expression> selectionList,
350-
RequestContext requestContext) {
351-
Query entitiesQuery = Query.newBuilder().addAllEntityId(entityIdsList).build();
352-
List<String> docStoreSelections =
353-
entityQueryConverter.convertSelectionsToDocStoreSelections(requestContext, selectionList);
354-
Iterator<Document> documentIterator =
355-
entitiesCollection.search(
356-
DocStoreConverter.transform(
357-
requestContext.getTenantId().orElseThrow(), entitiesQuery, docStoreSelections));
358-
return convertDocsToEntities(documentIterator);
359-
}
360-
361-
private void doBulkUpdate(
362-
RequestContext requestContext, Map<String, EntityUpdateInfo> entitiesMap) throws Exception {
363-
Map<Key, Map<String, Document>> entitiesUpdateMap = new HashMap<>();
364-
for (String entityId : entitiesMap.keySet()) {
365-
Map<String, Document> transformedUpdateOperations =
366-
transformUpdateOperations(
367-
entitiesMap.get(entityId).getUpdateOperationList(), requestContext);
368-
if (transformedUpdateOperations.isEmpty()) {
369-
continue;
370-
}
371-
entitiesUpdateMap.put(
372-
new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId),
373-
transformedUpdateOperations);
374-
}
375-
376-
if (entitiesUpdateMap.isEmpty()) {
377-
LOG.error("There are no entities to update!");
378-
return;
379-
}
380-
381-
try {
382-
entitiesCollection.bulkUpdateSubDocs(entitiesUpdateMap);
383-
} catch (Exception e) {
384-
LOG.error("Failed to update entities {}", entitiesMap, e);
385-
throw e;
386-
}
387-
}
388-
389-
private Map<String, Document> transformUpdateOperations(
390-
List<UpdateOperation> updateOperationList, RequestContext requestContext) throws Exception {
391-
Map<String, Document> documentMap = new HashMap<>();
392-
for (UpdateOperation updateOperation : updateOperationList) {
393-
if (!updateOperation.hasSetAttribute()) {
394-
continue;
395-
}
396-
SetAttribute setAttribute = updateOperation.getSetAttribute();
397-
String attributeId = setAttribute.getAttribute().getColumnName();
398-
String subDocPath =
399-
entityAttributeMapping
400-
.getDocStorePathByAttributeId(requestContext, attributeId)
401-
.orElseThrow(
402-
() -> new IllegalArgumentException("Unknown attribute FQN " + attributeId));
403-
AttributeValue attributeValue =
404-
EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build();
405-
try {
406-
String jsonValue = DocStoreJsonFormat.printer().print(attributeValue);
407-
documentMap.put(subDocPath, new JSONDocument(jsonValue));
408-
} catch (Exception e) {
409-
LOG.error("Failed to put update corresponding to {} in the documentMap", subDocPath, e);
410-
throw e;
411-
}
412305
}
413-
return Collections.unmodifiableMap(documentMap);
414306
}
415307

416308
@Override

0 commit comments

Comments
 (0)