77import com .google .protobuf .ServiceException ;
88import com .typesafe .config .Config ;
99import io .grpc .stub .StreamObserver ;
10+ import java .io .IOException ;
1011import java .util .ArrayList ;
1112import java .util .Collections ;
12- import java .util .HashMap ;
1313import java .util .Iterator ;
1414import java .util .List ;
15- import java .util .Map ;
1615import java .util .Optional ;
1716import org .hypertrace .core .documentstore .Collection ;
1817import org .hypertrace .core .documentstore .Datastore ;
1918import org .hypertrace .core .documentstore .Document ;
2019import org .hypertrace .core .documentstore .JSONDocument ;
21- import org .hypertrace .core .documentstore .Key ;
2220import org .hypertrace .core .documentstore .SingleValueKey ;
2321import org .hypertrace .core .grpcutils .client .GrpcChannelRegistry ;
2422import org .hypertrace .core .grpcutils .context .RequestContext ;
2523import org .hypertrace .entity .data .service .DocumentParser ;
2624import org .hypertrace .entity .data .service .v1 .AttributeValue ;
2725import org .hypertrace .entity .data .service .v1 .Entity ;
2826import org .hypertrace .entity .data .service .v1 .Query ;
29- import org .hypertrace .entity .query .service .v1 .BulkEntityUpdateRequest ;
30- import org .hypertrace .entity .query .service .v1 .BulkEntityUpdateRequest .EntityUpdateInfo ;
3127import org .hypertrace .entity .query .service .v1 .ColumnIdentifier ;
3228import org .hypertrace .entity .query .service .v1 .ColumnMetadata ;
3329import org .hypertrace .entity .query .service .v1 .EntityQueryRequest ;
4137import org .hypertrace .entity .query .service .v1 .SetAttribute ;
4238import org .hypertrace .entity .query .service .v1 .TotalEntitiesRequest ;
4339import org .hypertrace .entity .query .service .v1 .TotalEntitiesResponse ;
44- import org .hypertrace .entity .query .service .v1 .UpdateOperation ;
4540import org .hypertrace .entity .query .service .v1 .Value ;
4641import org .hypertrace .entity .query .service .v1 .ValueType ;
4742import org .hypertrace .entity .service .constants .EntityServiceConstants ;
@@ -240,8 +235,8 @@ Row convertToEntityQueryResult(
240235 public void update (EntityUpdateRequest request , StreamObserver <ResultSetChunk > responseObserver ) {
241236 // Validations
242237 RequestContext requestContext = RequestContext .CURRENT .get ();
243- Optional <String > maybeTenantId = requestContext .getTenantId ();
244- if (maybeTenantId .isEmpty ()) {
238+ Optional <String > tenantId = requestContext .getTenantId ();
239+ if (tenantId .isEmpty ()) {
245240 responseObserver .onError (new ServiceException ("Tenant id is missing in the request." ));
246241 return ;
247242 }
@@ -261,9 +256,14 @@ public void update(EntityUpdateRequest request, StreamObserver<ResultSetChunk> r
261256 doUpdate (requestContext , request );
262257
263258 // Finally return the selections
264- List <Entity > entities =
265- getProjectedEntities (
266- request .getEntityIdsList (), request .getSelectionList (), requestContext );
259+ Query entitiesQuery = Query .newBuilder ().addAllEntityId (request .getEntityIdsList ()).build ();
260+ List <String > docStoreSelections =
261+ entityQueryConverter .convertSelectionsToDocStoreSelections (
262+ requestContext , request .getSelectionList ());
263+ Iterator <Document > documentIterator =
264+ entitiesCollection .search (
265+ DocStoreConverter .transform (tenantId .get (), entitiesQuery , docStoreSelections ));
266+ List <Entity > entities = convertDocsToEntities (documentIterator );
267267 responseObserver .onNext (
268268 convertEntitiesToResultSetChunk (requestContext , entities , request .getSelectionList ()));
269269 responseObserver .onCompleted ();
@@ -274,7 +274,7 @@ public void update(EntityUpdateRequest request, StreamObserver<ResultSetChunk> r
274274 }
275275
276276 private void doUpdate (RequestContext requestContext , EntityUpdateRequest request )
277- throws Exception {
277+ throws IOException {
278278 if (request .getOperation ().hasSetAttribute ()) {
279279 SetAttribute setAttribute = request .getOperation ().getSetAttribute ();
280280 String attributeId = setAttribute .getAttribute ().getColumnName ();
@@ -289,128 +289,20 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request
289289 AttributeValue attributeValue =
290290 EntityQueryConverter .convertToAttributeValue (setAttribute .getValue ()).build ();
291291 String jsonValue = DocStoreJsonFormat .printer ().print (attributeValue );
292- JSONDocument jsonDocument = new JSONDocument (jsonValue );
293292
294- Map <Key , Map <String , Document >> entitiesUpdateMap = new HashMap <>();
295293 for (String entityId : request .getEntityIdsList ()) {
296294 SingleValueKey key =
297295 new SingleValueKey (requestContext .getTenantId ().orElseThrow (), entityId );
298- if (entitiesUpdateMap .containsKey (key )) {
299- entitiesUpdateMap .get (key ).put (subDocPath , jsonDocument );
300- } else {
301- Map <String , Document > subDocument = new HashMap <>();
302- subDocument .put (subDocPath , jsonDocument );
303- entitiesUpdateMap .put (key , subDocument );
296+ // TODO better error reporting once doc store exposes the,
297+ if (!entitiesCollection .updateSubDoc (key , subDocPath , new JSONDocument (jsonValue ))) {
298+ LOG .warn (
299+ "Failed to update entity {}, subDocPath {}, with new doc {}." ,
300+ key ,
301+ subDocPath ,
302+ jsonValue );
304303 }
305304 }
306- try {
307- entitiesCollection .bulkUpdateSubDocs (entitiesUpdateMap );
308- } catch (Exception e ) {
309- LOG .error (
310- "Failed to update entities {}, subDocPath {}, with new doc {}." ,
311- entitiesUpdateMap ,
312- subDocPath ,
313- jsonValue ,
314- e );
315- throw e ;
316- }
317- }
318- }
319-
320- @ Override
321- public void bulkUpdate (
322- BulkEntityUpdateRequest request , StreamObserver <ResultSetChunk > responseObserver ) {
323- // Validations
324- RequestContext requestContext = RequestContext .CURRENT .get ();
325- Optional <String > maybeTenantId = requestContext .getTenantId ();
326- if (maybeTenantId .isEmpty ()) {
327- responseObserver .onError (new ServiceException ("Tenant id is missing in the request." ));
328- return ;
329- }
330- if (StringUtils .isEmpty (request .getEntityType ())) {
331- responseObserver .onError (new ServiceException ("Entity type is missing in the request." ));
332- return ;
333- }
334- if (request .getEntitiesCount () == 0 ) {
335- responseObserver .onError (new ServiceException ("Entities are missing in the request." ));
336- }
337- Map <String , EntityUpdateInfo > entitiesMap = request .getEntitiesMap ();
338- try {
339- doBulkUpdate (requestContext , entitiesMap );
340- responseObserver .onCompleted ();
341- } catch (Exception e ) {
342- responseObserver .onError (
343- new ServiceException ("Error occurred while executing " + request , e ));
344- }
345- }
346-
347- private List <Entity > getProjectedEntities (
348- Iterable <String > entityIdsList ,
349- List <Expression > selectionList ,
350- RequestContext requestContext ) {
351- Query entitiesQuery = Query .newBuilder ().addAllEntityId (entityIdsList ).build ();
352- List <String > docStoreSelections =
353- entityQueryConverter .convertSelectionsToDocStoreSelections (requestContext , selectionList );
354- Iterator <Document > documentIterator =
355- entitiesCollection .search (
356- DocStoreConverter .transform (
357- requestContext .getTenantId ().orElseThrow (), entitiesQuery , docStoreSelections ));
358- return convertDocsToEntities (documentIterator );
359- }
360-
361- private void doBulkUpdate (
362- RequestContext requestContext , Map <String , EntityUpdateInfo > entitiesMap ) throws Exception {
363- Map <Key , Map <String , Document >> entitiesUpdateMap = new HashMap <>();
364- for (String entityId : entitiesMap .keySet ()) {
365- Map <String , Document > transformedUpdateOperations =
366- transformUpdateOperations (
367- entitiesMap .get (entityId ).getUpdateOperationList (), requestContext );
368- if (transformedUpdateOperations .isEmpty ()) {
369- continue ;
370- }
371- entitiesUpdateMap .put (
372- new SingleValueKey (requestContext .getTenantId ().orElseThrow (), entityId ),
373- transformedUpdateOperations );
374- }
375-
376- if (entitiesUpdateMap .isEmpty ()) {
377- LOG .error ("There are no entities to update!" );
378- return ;
379- }
380-
381- try {
382- entitiesCollection .bulkUpdateSubDocs (entitiesUpdateMap );
383- } catch (Exception e ) {
384- LOG .error ("Failed to update entities {}" , entitiesMap , e );
385- throw e ;
386- }
387- }
388-
389- private Map <String , Document > transformUpdateOperations (
390- List <UpdateOperation > updateOperationList , RequestContext requestContext ) throws Exception {
391- Map <String , Document > documentMap = new HashMap <>();
392- for (UpdateOperation updateOperation : updateOperationList ) {
393- if (!updateOperation .hasSetAttribute ()) {
394- continue ;
395- }
396- SetAttribute setAttribute = updateOperation .getSetAttribute ();
397- String attributeId = setAttribute .getAttribute ().getColumnName ();
398- String subDocPath =
399- entityAttributeMapping
400- .getDocStorePathByAttributeId (requestContext , attributeId )
401- .orElseThrow (
402- () -> new IllegalArgumentException ("Unknown attribute FQN " + attributeId ));
403- AttributeValue attributeValue =
404- EntityQueryConverter .convertToAttributeValue (setAttribute .getValue ()).build ();
405- try {
406- String jsonValue = DocStoreJsonFormat .printer ().print (attributeValue );
407- documentMap .put (subDocPath , new JSONDocument (jsonValue ));
408- } catch (Exception e ) {
409- LOG .error ("Failed to put update corresponding to {} in the documentMap" , subDocPath , e );
410- throw e ;
411- }
412305 }
413- return Collections .unmodifiableMap (documentMap );
414306 }
415307
416308 @ Override
0 commit comments