Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 30 additions & 44 deletions Lib/asyncio/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,39 +288,30 @@ def _set_result_unless_cancelled(fut, result):
fut.set_result(result)


def _set_concurrent_future_state(concurrent, source):
"""Copy state from a future to a concurrent.futures.Future."""
assert source.done()
if source.cancelled():
concurrent.cancel()
if not concurrent.set_running_or_notify_cancel():
return
exception = source.exception()
if exception is not None:
concurrent.set_exception(exception)
else:
result = source.result()
concurrent.set_result(result)


def _copy_future_state(source, dest):
"""Internal helper to copy state from another Future.

The other Future may be a concurrent.futures.Future.
"""
assert source.done()
if dest.cancelled():

if source.cancelled() and not dest.done():
dest.cancel()

if isinstance(dest, concurrent.futures.Future):
if not dest.set_running_or_notify_cancel():
return
elif dest.done():
return

assert not dest.done()
if source.cancelled():
dest.cancel()

exception = source.exception()
if exception is not None:
dest.set_exception(exception)
else:
exception = source.exception()
if exception is not None:
dest.set_exception(exception)
else:
result = source.result()
dest.set_result(result)
result = source.result()
dest.set_result(result)


def _chain_future(source, destination):
Expand All @@ -339,32 +330,27 @@ def _chain_future(source, destination):
source_loop = _get_loop(source) if isfuture(source) else None
dest_loop = _get_loop(destination) if isfuture(destination) else None

def _set_state(future, other):
if isfuture(future):
_copy_future_state(other, future)
def _call_set_state(future):
if future == source:
other = destination
other_loop = dest_loop
future_loop = source_loop
else:
_set_concurrent_future_state(future, other)

def _call_check_cancel(destination):
if destination.cancelled():
if source_loop is None or source_loop is dest_loop:
source.cancel()
else:
source_loop.call_soon_threadsafe(source.cancel)

def _call_set_state(source):
if (destination.cancelled() and
dest_loop is not None and dest_loop.is_closed()):
other = source
other_loop = source_loop
future_loop = dest_loop

if other.done():
return
if dest_loop is None or dest_loop is source_loop:
_set_state(destination, source)

if other_loop is None or other_loop is future_loop:
_copy_future_state(future, other)
else:
dest_loop.call_soon_threadsafe(_set_state, destination, source)
other_loop.call_soon_threadsafe(_copy_future_state, future, other)

destination.add_done_callback(_call_check_cancel)
destination.add_done_callback(_call_set_state)
source.add_done_callback(_call_set_state)


def wrap_future(future, *, loop=None):
"""Wrap concurrent.futures.Future object."""
if isfuture(future):
Expand Down
32 changes: 32 additions & 0 deletions Lib/test/test_asyncio/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,38 @@ def test_wrap_future_cancel2(self):
self.assertEqual(f1.result(), 42)
self.assertTrue(f2.cancelled())

def test_wrap_future_symmetry1(self):
f1 = concurrent.futures.Future()
f2 = asyncio.wrap_future(f1, loop=self.loop)
f1.set_result(42)
test_utils.run_briefly(self.loop)
self.assertEqual(f1.result(), 42)
self.assertEqual(f2.result(), 42)

def test_wrap_future_symmetry2(self):
f1 = concurrent.futures.Future()
f2 = asyncio.wrap_future(f1, loop=self.loop)
f2.set_result(42)
test_utils.run_briefly(self.loop)
self.assertEqual(f1.result(), 42)
self.assertEqual(f2.result(), 42)

def test_wrap_future_symmetry3(self):
f1 = concurrent.futures.Future()
f2 = asyncio.wrap_future(f1, loop=self.loop)
f2.cancel()
test_utils.run_briefly(self.loop)
self.assertTrue(f1.cancelled())
self.assertTrue(f2.cancelled())

def test_wrap_future_symmetry4(self):
f1 = concurrent.futures.Future()
f2 = asyncio.wrap_future(f1, loop=self.loop)
f1.cancel()
test_utils.run_briefly(self.loop)
self.assertTrue(f1.cancelled())
self.assertTrue(f2.cancelled())

def test_future_source_traceback(self):
self.loop.set_debug(True)

Expand Down
14 changes: 14 additions & 0 deletions Misc/NEWS.d/next/Library/2018-08-18-16-29-56.bpo-34430.WbEvhs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
The two futures chained by :func:`asyncio.future.wrap_future` are now
symmetrical.

Before, the behaviour was:
1) When the wrapped future gets a result, the new future gets the same result,
2) When the new future is cancelled, the wrapped future is cancelled

now, these new behaviours have been implemented:
3) When the new future gets a result, the wrapped future gets the same result,
4) When the wrapped future is cancelled, the new future is cancelled

so, the new behaviour is:
1) When either one future is done, the other is done with the same results or exceptions,
2) When either one future is cancelled, the other is cancelled.