Skip to content

Commit 9e08e57

Browse files
committed
Merge pull request googleapis#1471 from dhermes/bigtable-row-commit
Implementing Row.commit() in Bigtable.
2 parents 02c5689 + 64b8ef0 commit 9e08e57

5 files changed

Lines changed: 336 additions & 4 deletions

File tree

gcloud/bigquery/test_table.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
# pylint: disable=too-many-lines
21
# Copyright 2015 Google Inc. All rights reserved.
32
#
43
# Licensed under the Apache License, Version 2.0 (the "License");

gcloud/bigtable/happybase/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def _get_cluster(timeout=None):
5353
:rtype: :class:`gcloud.bigtable.cluster.Cluster`
5454
:returns: The unique cluster owned by the project inferred from
5555
the environment.
56-
:raises: :class:`ValueError <exceptions.ValueError>` if their is a failed
56+
:raises: :class:`ValueError <exceptions.ValueError>` if there is a failed
5757
zone or any number of clusters other than one.
5858
"""
5959
client_kwargs = {'admin': True}

gcloud/bigtable/row.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
from gcloud._helpers import _microseconds_from_datetime
2323
from gcloud._helpers import _to_bytes
2424
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
25+
from gcloud.bigtable._generated import (
26+
bigtable_service_messages_pb2 as messages_pb2)
2527

2628

29+
_MAX_MUTATIONS = 100000
2730
_PACK_I64 = struct.Struct('>q').pack
2831

2932

@@ -341,6 +344,111 @@ def delete_cells(self, column_family_id, columns, time_range=None,
341344
# processed without error.
342345
mutations_list.extend(to_append)
343346

347+
def _commit_mutate(self):
    """Send the accumulated mutations in a ``MutateRow`` API request.

    Assumes no filter is set on the :class:`Row` and is meant to be called
    by :meth:`commit`. If no mutations have been accumulated, no request
    is made.

    :raises: :class:`ValueError <exceptions.ValueError>` if the number of
             mutations exceeds the ``_MAX_MUTATIONS``.
    """
    pending = self._get_mutations()
    count = len(pending)
    if count == 0:
        # Nothing has been accumulated, so skip the API call entirely.
        return
    if count > _MAX_MUTATIONS:
        message = ('%d total mutations exceed the maximum allowable '
                   '%d.' % (count, _MAX_MUTATIONS))
        raise ValueError(message)

    request_kwargs = {
        'table_name': self._table.name,
        'row_key': self._row_key,
        'mutations': pending,
    }
    request_pb = messages_pb2.MutateRowRequest(**request_kwargs)
    # We expect a `google.protobuf.empty_pb2.Empty`, so the response is
    # not captured.
    client = self._table._cluster._client
    client._data_stub.MutateRow(request_pb, client.timeout_seconds)
371+
372+
def _commit_check_and_mutate(self):
    """Makes a ``CheckAndMutateRow`` API request.

    Assumes a filter is set on the :class:`Row` and is meant to be called
    by :meth:`commit`. If neither the "true" nor the "false" list holds
    any mutations, no request is made.

    :rtype: :class:`bool` or :data:`NoneType <types.NoneType>`
    :returns: :data:`None` if there were no mutations to send, otherwise
              a flag indicating if the filter was matched (which also
              indicates which set of mutations were applied by the
              server).
    :raises: :class:`ValueError <exceptions.ValueError>` if the number of
             mutations in either list exceeds the ``_MAX_MUTATIONS``.
    """
    true_mutations = self._get_mutations(state=True)
    false_mutations = self._get_mutations(state=False)
    num_true_mutations = len(true_mutations)
    num_false_mutations = len(false_mutations)
    if num_true_mutations == 0 and num_false_mutations == 0:
        # Nothing has been accumulated, so skip the API call entirely.
        return
    # Each list is checked against the limit on its own (not the sum).
    if (num_true_mutations > _MAX_MUTATIONS or
            num_false_mutations > _MAX_MUTATIONS):
        # All three values are integers, so each uses ``%d``.
        raise ValueError(
            'Exceed the maximum allowable mutations (%d). Had %d true '
            'mutations and %d false mutations.' % (
                _MAX_MUTATIONS, num_true_mutations, num_false_mutations))

    request_pb = messages_pb2.CheckAndMutateRowRequest(
        table_name=self._table.name,
        row_key=self._row_key,
        predicate_filter=self._filter.to_pb(),
        true_mutations=true_mutations,
        false_mutations=false_mutations,
    )
    # We expect a `.messages_pb2.CheckAndMutateRowResponse`
    client = self._table._cluster._client
    resp = client._data_stub.CheckAndMutateRow(
        request_pb, client.timeout_seconds)
    return resp.predicate_matched
409+
410+
def clear_mutations(self):
    """Discard every mutation accumulated so far on the current row.

    The lists are emptied in place, so other references to the same list
    objects see the change too.
    """
    if self._filter is not None:
        # With a filter set, mutations live in the true / false lists.
        del self._true_pb_mutations[:]
        del self._false_pb_mutations[:]
        return
    del self._pb_mutations[:]
417+
418+
def commit(self):
    """Makes a ``MutateRow`` or ``CheckAndMutateRow`` API request.

    No request is made if the row has no accumulated mutations.

    Mutations are applied atomically and in order, meaning that earlier
    mutations can be masked / negated by later ones. Cells already present
    in the row are left unchanged unless explicitly changed by a mutation.

    Once the accumulated mutations have been committed, the local
    mutations are reset to an empty list.

    When a filter is set on the :class:`Row`, the mutations are applied
    conditionally, based on whether the filter matches any cells in the
    :class:`Row` or not. (Each method which adds a mutation has a
    ``state`` parameter for this purpose.)

    :rtype: :class:`bool` or :data:`NoneType <types.NoneType>`
    :returns: :data:`None` if there is no filter, otherwise a flag
              indicating if the filter was matched (which also
              indicates which set of mutations were applied by the server).
    :raises: :class:`ValueError <exceptions.ValueError>` if the number of
             mutations exceeds the ``_MAX_MUTATIONS``.
    """
    # Unconditional commit without a filter, conditional commit with one.
    if self._filter is None:
        send_request = self._commit_mutate
    else:
        send_request = self._commit_check_and_mutate
    outcome = send_request()

    # Only reached when the request did not raise: drop the local
    # mutations now that they have been committed.
    self.clear_mutations()
    return outcome
451+
344452

345453
class RowFilter(object):
346454
"""Basic filter to apply to cells in a row.

gcloud/bigtable/test_row.py

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,194 @@ def test_delete_cells_with_string_columns(self):
357357
)
358358
self.assertEqual(row._pb_mutations, [expected_pb1, expected_pb2])
359359

360+
def test_commit(self):
    """``commit()`` without a filter sends exactly one ``MutateRow``."""
    from google.protobuf import empty_pb2
    from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)
    from gcloud.bigtable._testing import _FakeStub

    row_key = b'row_key'
    table_name = 'projects/more-stuff'
    column_family_id = u'column_family_id'
    column = b'column'
    value = b'bytes-value'
    timeout_seconds = 711

    client = _Client(timeout_seconds=timeout_seconds)
    row = self._makeOne(row_key, _Table(table_name, client=client))

    # Build the request expected from a single ``set_cell`` call.
    set_cell_pb = data_pb2.Mutation.SetCell(
        family_name=column_family_id,
        column_qualifier=column,
        timestamp_micros=-1,  # Default value.
        value=value,
    )
    expected_request = messages_pb2.MutateRowRequest(
        table_name=table_name,
        row_key=row_key,
        mutations=[data_pb2.Mutation(set_cell=set_cell_pb)],
    )

    # Patch the stub used by the API method, handing it an ``Empty``
    # response.
    stub = _FakeStub(empty_pb2.Empty())
    client._data_stub = stub

    # Perform the method and check the result.
    row.set_cell(column_family_id, column, value)
    result = row.commit()

    # commit() has no return value when no filter.
    self.assertEqual(result, None)
    expected_call = ('MutateRow', (expected_request, timeout_seconds), {})
    self.assertEqual(stub.method_calls, [expected_call])
    # The unconditional mutations were reset by the commit.
    self.assertEqual(row._pb_mutations, [])
    self.assertEqual(row._true_pb_mutations, None)
    self.assertEqual(row._false_pb_mutations, None)
413+
414+
def test_commit_too_many_mutations(self):
    """``commit()`` raises ``ValueError`` when over ``_MAX_MUTATIONS``."""
    from gcloud._testing import _Monkey
    from gcloud.bigtable import row as MUT

    row = self._makeOne(b'row_key', object())
    row._pb_mutations = [1, 2, 3]
    # Patch the module limit to one fewer than the mutations on the row.
    lowered_limit = len(row._pb_mutations) - 1
    with _Monkey(MUT, _MAX_MUTATIONS=lowered_limit):
        with self.assertRaises(ValueError):
            row.commit()
426+
427+
def test_commit_no_mutations(self):
    """``commit()`` on a row with no mutations sends no request."""
    from gcloud.bigtable._testing import _FakeStub

    client = _Client()
    row = self._makeOne(b'row_key', _Table(None, client=client))
    self.assertEqual(row._pb_mutations, [])

    # Patch the stub used by the API method.
    stub = _FakeStub()
    client._data_stub = stub

    # Perform the method and check the result.
    self.assertEqual(row.commit(), None)
    # Make sure no request was sent.
    self.assertEqual(stub.method_calls, [])
444+
445+
def test_commit_with_filter(self):
    """``commit()`` with a filter sends one ``CheckAndMutateRow``."""
    from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)
    from gcloud.bigtable._testing import _FakeStub
    from gcloud.bigtable.row import RowSampleFilter

    row_key = b'row_key'
    table_name = 'projects/more-stuff'
    column_family_id = u'column_family_id'
    column = b'column'
    value1 = b'bytes-value'
    value2 = b'other-bytes'
    timeout_seconds = 262

    client = _Client(timeout_seconds=timeout_seconds)
    table = _Table(table_name, client=client)
    row_filter = RowSampleFilter(0.33)
    row = self._makeOne(row_key, table, filter_=row_filter)

    # Build the expected request: one mutation per predicate outcome.
    set_cell1 = data_pb2.Mutation.SetCell(
        family_name=column_family_id,
        column_qualifier=column,
        timestamp_micros=-1,  # Default value.
        value=value1,
    )
    mutation1 = data_pb2.Mutation(set_cell=set_cell1)
    set_cell2 = data_pb2.Mutation.SetCell(
        family_name=column_family_id,
        column_qualifier=column,
        timestamp_micros=-1,  # Default value.
        value=value2,
    )
    mutation2 = data_pb2.Mutation(set_cell=set_cell2)
    expected_request = messages_pb2.CheckAndMutateRowRequest(
        table_name=table_name,
        row_key=row_key,
        predicate_filter=row_filter.to_pb(),
        true_mutations=[mutation1],
        false_mutations=[mutation2],
    )

    # Create response_pb
    predicate_matched = True
    response_pb = messages_pb2.CheckAndMutateRowResponse(
        predicate_matched=predicate_matched)

    # Patch the stub used by the API method.
    stub = _FakeStub(response_pb)
    client._data_stub = stub

    # Perform the method and check the result.
    row.set_cell(column_family_id, column, value1, state=True)
    row.set_cell(column_family_id, column, value2, state=False)
    result = row.commit()

    # With a filter, commit() passes back the response's flag.
    self.assertEqual(result, predicate_matched)
    expected_call = (
        'CheckAndMutateRow', (expected_request, timeout_seconds), {})
    self.assertEqual(stub.method_calls, [expected_call])
    # Both conditional lists were reset by the commit.
    self.assertEqual(row._pb_mutations, None)
    self.assertEqual(row._true_pb_mutations, [])
    self.assertEqual(row._false_pb_mutations, [])
513+
514+
def test_commit_with_filter_too_many_mutations(self):
    """Filtered ``commit()`` raises when over ``_MAX_MUTATIONS``."""
    from gcloud._testing import _Monkey
    from gcloud.bigtable import row as MUT

    row = self._makeOne(b'row_key', object(), filter_=object())
    row._true_pb_mutations = [1, 2, 3]
    # Patch the module limit to one fewer than the "true" mutations.
    lowered_limit = len(row._true_pb_mutations) - 1
    with _Monkey(MUT, _MAX_MUTATIONS=lowered_limit):
        with self.assertRaises(ValueError):
            row.commit()
527+
528+
def test_commit_with_filter_no_mutations(self):
    """Filtered ``commit()`` with no mutations sends no request."""
    from gcloud.bigtable._testing import _FakeStub

    client = _Client()
    table = _Table(None, client=client)
    row = self._makeOne(b'row_key', table, filter_=object())
    self.assertEqual(row._true_pb_mutations, [])
    self.assertEqual(row._false_pb_mutations, [])

    # Patch the stub used by the API method.
    stub = _FakeStub()
    client._data_stub = stub

    # Perform the method and check the result.
    self.assertEqual(row.commit(), None)
    # Make sure no request was sent.
    self.assertEqual(stub.method_calls, [])
547+
360548

361549
class Test_BoolFilter(unittest2.TestCase):
362550

@@ -1345,3 +1533,24 @@ def test_to_pb_false_only(self):
13451533
),
13461534
)
13471535
self.assertEqual(filter_pb, expected_pb)
1536+
1537+
1538+
class _Client(object):
    """Minimal stand-in for a Bigtable client, used by the row tests.

    :type timeout_seconds: int
    :param timeout_seconds: (Optional) Value stored as-is on the instance;
                            the code under test passes it along with each
                            stub call.
    """

    # Default for the attribute that ``Row`` reads and the tests assign
    # (``client._data_stub``).
    _data_stub = None
    # Original spelling of the default, retained for backward
    # compatibility.
    data_stub = None

    def __init__(self, timeout_seconds=None):
        self.timeout_seconds = timeout_seconds
1544+
1545+
1546+
class _Cluster(object):
    """Minimal stand-in for a cluster: only holds a client as ``_client``.

    :type client: object
    :param client: (Optional) The client object to expose as ``_client``.
    """

    def __init__(self, client=None):
        self._client = client
1550+
1551+
1552+
class _Table(object):
    """Minimal stand-in for a table, used by the row tests.

    :type name: str
    :param name: Value stored as-is on ``name``.

    :type client: object
    :param client: (Optional) Client wrapped in a ``_Cluster`` so that it
                   is reachable as ``table._cluster._client``.
    """

    def __init__(self, name, client=None):
        self.name = name
        self._cluster = _Cluster(client)

0 commit comments

Comments
 (0)