@@ -283,7 +283,7 @@ def __call__(self):
283283 """
284284
285285 @abstractmethod
286- def receive (self , data ):
286+ def accept (self , data ):
287287 """
288288 Take the data object to be processed.
289289 """
@@ -512,7 +512,7 @@ def __call__(self):
512512 # Nothing to do.
513513 pass
514514
515- def receive (self , data ):
515+ def accept (self , data ):
516516 pass
517517
518518 def ready_for_more (self ):
@@ -527,7 +527,7 @@ def __init__(self, send):
527527 self .send = send
528528 self .view = memoryview (b'' )
529529
530- def receive (self , data ):
530+ def accept (self , data ):
531531 self .view = data
532532
533533 def __call__ (self ):
@@ -558,7 +558,7 @@ def __init__(self, statement, *parameters):
558558
559559 # XXX: A bit of a hack...
560560 # This is actually a good indication that statements need a .copy()
561- # execution method for producing a "Copy" cursor that reads or writes.
561+ # execution method for producing a "CopyCursor" that reads or writes.
562562 class WireReady (BaseException ):
563563 pass
564564 def raise_wire_ready (self ):
@@ -631,7 +631,7 @@ def __call__(self):
631631 self .callable (self .lines )
632632 self .lines = None
633633
634- def receive (self , lines ):
634+ def accept (self , lines ):
635635 self .lines = lines
636636
637637 def ready_for_more (self ):
@@ -651,7 +651,9 @@ def _e_metas(self):
651651
652652 @property
653653 def state (self ):
654- return '<XXX: fail>'
654+ if self .transformer is None :
655+ return 'initialized'
656+ return str (self .producer .total_messages ) + ' messages transferred'
655657
656658 def __init__ (self , producer , * receivers ):
657659 self .producer = producer
@@ -681,7 +683,7 @@ def __exit__(self, typ, val, tb):
681683 # Exiting the CopyManager is a fairly complex operation.
682684 #
683685 # In cases of failure, re-alignment may need to happen
684- # for when the receivers are not on message boundary.
686+ # for when the receivers are not on a message boundary.
685687 ##
686688 if typ is not None and not issubclass (typ , Exception ):
687689 # Don't recover on interrupts.
@@ -750,19 +752,10 @@ def _service_producer(self):
750752 raise
751753
752754 self .transformer (nextdata )
755+
753756 # Distribute data to receivers.
754- # XXX: More of a local state update. Should probably die on failure.
755- faults = {}
756757 for x in self .receivers :
757- try :
758- x .receive (self .transformer .get (x .protocol ))
759- except Exception as e :
760- faults [x ] = e
761- if faults :
762- # The CopyManager is eager.
763- for x in faults :
764- self .receivers .discard (x )
765- raise Fault (self , faults )
758+ x .accept (self .transformer .get (x .protocol ))
766759
767760 def _service_receivers (self ):
768761 faults = {}
@@ -773,6 +766,7 @@ def _service_receivers(self):
773766 except Exception as e :
774767 faults [x ] = e
775768 if faults :
769+ # The CopyManager is eager to continue the operation.
776770 for x in faults :
777771 self .receivers .discard (x )
778772 raise Fault (self , faults )
0 commit comments