-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtest_rpc_queue_e2e.py
More file actions
168 lines (139 loc) · 6.35 KB
/
test_rpc_queue_e2e.py
File metadata and controls
168 lines (139 loc) · 6.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""E2E coverage for session.queue RPC methods."""
from __future__ import annotations
import asyncio
import time
import uuid
import pytest
from copilot.rpc import (
CommandsRespondToQueuedCommandRequest,
EnqueueCommandParams,
QueuedCommandHandled,
QueuePendingItems,
QueuePendingItemsKind,
RegisterEventInterestParams,
ReleaseEventInterestParams,
)
from copilot.session import PermissionHandler
from copilot.session_events import CommandQueuedData
from .testharness import E2ETestContext
pytestmark = pytest.mark.asyncio(loop_scope="module")
def _is_pending_command(item: QueuePendingItems, command: str) -> bool:
    """Return whether *item* is a queued command entry that refers to *command*.

    An item matches when its kind is ``QueuePendingItemsKind.COMMAND`` and its
    display text either equals *command* exactly or contains *command* with
    any leading ``/`` characters stripped.
    """
    # Anything that is not a command entry can never match.
    if item.kind != QueuePendingItemsKind.COMMAND:
        return False
    shown = item.display_text
    bare_name = command.lstrip("/")
    return shown == command or bare_name in shown
async def _wait_for_command_in_pending_items(
    session,
    command: str,
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.2,
) -> QueuePendingItems:
    """Poll the session queue until *command* appears among the pending items.

    Args:
        session: Object whose ``rpc.queue.pending_items()`` is awaitable and
            returns a result with an ``items`` sequence.
        command: Command text to look for; matching is delegated to
            ``_is_pending_command``.
        timeout: Seconds to keep polling before giving up.
        poll_interval: Seconds to sleep between two polls.

    Returns:
        The first pending item that matches *command*.

    Raises:
        AssertionError: If no matching item was seen before the deadline. The
            message carries the items returned by the last poll.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Poll before looking at the clock so the queue is inspected at least
        # once, even when the caller passes a very small timeout.
        pending = await session.rpc.queue.pending_items()
        last_items = pending.items
        for item in last_items:
            # A positive match already implies the item is a command whose
            # display text contains the stripped command name, so no further
            # checks are needed here.
            if _is_pending_command(item, command):
                return item
        if time.monotonic() >= deadline:
            break
        await asyncio.sleep(poll_interval)
    raise AssertionError(f"Timed out waiting for {command!r} in pending items: {last_items!r}")
async def _wait_for_command_not_in_pending_items(
    session,
    command: str,
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.2,
) -> None:
    """Poll the session queue until *command* is no longer a pending item.

    Args:
        session: Object whose ``rpc.queue.pending_items()`` is awaitable and
            returns a result with an ``items`` sequence.
        command: Command text that must disappear; matching is delegated to
            ``_is_pending_command``.
        timeout: Seconds to keep polling before giving up.
        poll_interval: Seconds to sleep between two polls.

    Fails the running test through ``pytest.fail`` when *command* is still
    pending once the deadline has passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Poll before looking at the clock so the queue is inspected at least
        # once, even when the caller passes a very small timeout.
        pending = await session.rpc.queue.pending_items()
        if not any(_is_pending_command(item, command) for item in pending.items):
            return
        if time.monotonic() >= deadline:
            break
        await asyncio.sleep(poll_interval)
    pytest.fail(f"Timed out waiting for {command!r} to leave pending items.")
async def _assert_queue_empty(session) -> None:
    """Assert that the session queue currently holds nothing.

    Fetches one snapshot via ``session.rpc.queue.pending_items()`` and checks
    that both its ``items`` and its ``steering_messages`` equal ``[]``.
    """
    snapshot = await session.rpc.queue.pending_items()
    # Compare against a literal empty list; both fields must be exactly empty.
    assert snapshot.items == []
    assert snapshot.steering_messages == []
class TestRpcQueue:
    """End-to-end tests for the ``session.rpc.queue`` RPC methods."""

    async def test_fresh_queue_is_empty_and_empty_mutations_are_noops(self, ctx: E2ETestContext):
        """A new session starts with an empty queue, and mutating it keeps it empty.

        ``remove_most_recent`` must report ``removed is False`` on an empty
        queue, both before and after ``clear`` has been called.
        """
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all,
        )
        try:
            await _assert_queue_empty(session)

            remove = await session.rpc.queue.remove_most_recent()
            assert remove.removed is False
            await _assert_queue_empty(session)

            await session.rpc.queue.clear()
            await _assert_queue_empty(session)

            remove_after_clear = await session.rpc.queue.remove_most_recent()
            assert remove_after_clear.removed is False
        finally:
            await session.disconnect()

    async def test_pending_items_reports_queued_command_and_mutations_update_queue(
        self, ctx: E2ETestContext
    ):
        """Queued commands show up in ``pending_items`` and react to queue mutations.

        Steps:
          1. Register interest in ``command.queued`` events and enqueue a first
             command, waiting for its queued event (its ``request_id`` is
             needed later to respond to it).
          2. Enqueue a second command, wait for it to be listed, remove it with
             ``remove_most_recent`` and wait for it to disappear.
          3. Enqueue a third command, wait for it to be listed, ``clear`` the
             queue and wait for it to disappear.
          4. Respond to the first command with ``stop_processing_queue=True``
             and check that the queue ends up empty.
        """
        session = await ctx.client.create_session(
            on_permission_request=PermissionHandler.approve_all,
        )
        # Cleanup state, initialised before the try so ``finally`` can rely on it.
        interest = None
        first_event = None
        responded_to_first = False
        try:
            interest = await session.rpc.event_log.register_interest(
                RegisterEventInterestParams(event_type="command.queued")
            )

            # Random suffixes keep the command names unique to this run.
            first_command = f"/sdk-queue-first-{uuid.uuid4().hex}"
            second_command = f"/sdk-queue-second-{uuid.uuid4().hex}"
            third_command = f"/sdk-queue-third-{uuid.uuid4().hex}"

            # We are inside a coroutine, so the running loop is the right one
            # to create the future on.
            first_queued: asyncio.Future = asyncio.get_running_loop().create_future()

            def on_event(event):
                """Resolve ``first_queued`` with the queued event of the first command."""
                if (
                    isinstance(event.data, CommandQueuedData)
                    and event.data.command == first_command
                    and not first_queued.done()
                ):
                    first_queued.set_result(event)

            unsubscribe = session.on(on_event)
            try:
                first = await session.rpc.commands.enqueue(
                    EnqueueCommandParams(command=first_command)
                )
                assert first.queued is True
                first_event = await asyncio.wait_for(first_queued, timeout=30.0)
            finally:
                # The listener is only needed for the first command's event.
                unsubscribe()

            second = await session.rpc.commands.enqueue(
                EnqueueCommandParams(command=second_command)
            )
            assert second.queued is True
            await _wait_for_command_in_pending_items(session, second_command)

            remove = await session.rpc.queue.remove_most_recent()
            assert remove.removed is True
            await _wait_for_command_not_in_pending_items(session, second_command)

            third = await session.rpc.commands.enqueue(EnqueueCommandParams(command=third_command))
            assert third.queued is True
            await _wait_for_command_in_pending_items(session, third_command)

            await session.rpc.queue.clear()
            await _wait_for_command_not_in_pending_items(session, third_command)

            completed = await session.rpc.commands.respond_to_queued_command(
                CommandsRespondToQueuedCommandRequest(
                    request_id=first_event.data.request_id,
                    result=QueuedCommandHandled(stop_processing_queue=True),
                )
            )
            # Record the outcome before asserting so cleanup knows whether it
            # still has to respond to the first command.
            responded_to_first = completed.success
            assert completed.success is True

            # Give the queue up to 30 seconds to drain, then assert on the
            # final state so a timeout surfaces as a normal assertion failure.
            deadline = time.monotonic() + 30.0
            while time.monotonic() < deadline:
                pending = await session.rpc.queue.pending_items()
                if pending.items == [] and pending.steering_messages == []:
                    break
                await asyncio.sleep(0.2)
            await _assert_queue_empty(session)
        finally:
            try:
                # Respond to the first command if the test body did not get
                # that far (or the response was not reported as successful).
                if not responded_to_first and first_event is not None:
                    await session.rpc.commands.respond_to_queued_command(
                        CommandsRespondToQueuedCommandRequest(
                            request_id=first_event.data.request_id,
                            result=QueuedCommandHandled(stop_processing_queue=True),
                        )
                    )
                await session.rpc.queue.clear()
                if interest is not None and interest.handle:
                    await session.rpc.event_log.release_interest(
                        ReleaseEventInterestParams(handle=interest.handle)
                    )
            finally:
                # Always disconnect, even when one of the cleanup RPCs above
                # raises, so a failing cleanup cannot leak the session.
                await session.disconnect()