@@ -75,8 +75,8 @@ def testTimeoutOpen(self):
7575
7676class SocketStub (object ):
7777 ''' a socket proxy that re-defines sendall() '''
78- def __init__ (self , reads = [] ):
79- self .reads = reads
78+ def __init__ (self , reads = () ):
79+ self .reads = list ( reads ) # Intentionally make a copy.
8080 self .writes = []
8181 self .block = False
8282 def sendall (self , data ):
@@ -102,7 +102,7 @@ def msg(self, msg, *args):
102102 self ._messages += out .getvalue ()
103103 return
104104
105- def new_select (* s_args ):
105+ def mock_select (* s_args ):
106106 block = False
107107 for l in s_args :
108108 for fob in l :
@@ -113,6 +113,30 @@ def new_select(*s_args):
113113 else :
114114 return s_args
115115
116+ class MockPoller (object ):
117+ test_case = None # Set during TestCase setUp.
118+
119+ def __init__ (self ):
120+ self ._file_objs = []
121+
122+ def register (self , fd , eventmask ):
123+ self .test_case .assertTrue (hasattr (fd , 'fileno' ), fd )
124+ self .test_case .assertEqual (eventmask , select .POLLIN | select .POLLPRI )
125+ self ._file_objs .append (fd )
126+
127+ def poll (self , timeout = None ):
128+ block = False
129+ for fob in self ._file_objs :
130+ if isinstance (fob , TelnetAlike ):
131+ block = fob .sock .block
132+ if block :
133+ return []
134+ else :
135+ return zip (self ._file_objs , [select .POLLIN ]* len (self ._file_objs ))
136+
137+ def unregister (self , fd ):
138+ self ._file_objs .remove (fd )
139+
116140@contextlib .contextmanager
117141def test_socket (reads ):
118142 def new_conn (* ignored ):
@@ -125,23 +149,36 @@ def new_conn(*ignored):
125149 socket .create_connection = old_conn
126150 return
127151
128- def test_telnet (reads = [] , cls = TelnetAlike ):
152+ def test_telnet (reads = () , cls = TelnetAlike , use_poll = None ):
129153 ''' return a telnetlib.Telnet object that uses a SocketStub with
130154 reads queued up to be read '''
131155 for x in reads :
132156 assert type (x ) is bytes , x
133157 with test_socket (reads ):
134158 telnet = cls ('dummy' , 0 )
135159 telnet ._messages = '' # debuglevel output
160+ if use_poll is not None :
161+ if use_poll and not telnet ._has_poll :
162+ raise unittest .SkipTest ('select.poll() required.' )
163+ telnet ._has_poll = use_poll
136164 return telnet
137165
138- class ReadTests (TestCase ):
166+
167+ class ExpectAndReadTestCase (TestCase ):
139168 def setUp (self ):
140169 self .old_select = select .select
141- select .select = new_select
170+ self .old_poll = select .poll
171+ select .select = mock_select
172+ select .poll = MockPoller
173+ MockPoller .test_case = self
174+
142175 def tearDown (self ):
176+ MockPoller .test_case = None
177+ select .poll = self .old_poll
143178 select .select = self .old_select
144179
180+
181+ class ReadTests (ExpectAndReadTestCase ):
145182 def test_read_until (self ):
146183 """
147184 read_until(expected, timeout=None)
@@ -158,6 +195,21 @@ def test_read_until(self):
158195 data = telnet .read_until (b'match' )
159196 self .assertEqual (data , expect )
160197
198+ def test_read_until_with_poll (self ):
199+ """Use select.poll() to implement telnet.read_until()."""
200+ want = [b'x' * 10 , b'match' , b'y' * 10 ]
201+ telnet = test_telnet (want , use_poll = True )
202+ select .select = lambda * _ : self .fail ('unexpected select() call.' )
203+ data = telnet .read_until (b'match' )
204+ self .assertEqual (data , b'' .join (want [:- 1 ]))
205+
206+ def test_read_until_with_select (self ):
207+ """Use select.select() to implement telnet.read_until()."""
208+ want = [b'x' * 10 , b'match' , b'y' * 10 ]
209+ telnet = test_telnet (want , use_poll = False )
210+ select .poll = lambda * _ : self .fail ('unexpected poll() call.' )
211+ data = telnet .read_until (b'match' )
212+ self .assertEqual (data , b'' .join (want [:- 1 ]))
161213
162214 def test_read_all (self ):
163215 """
@@ -349,8 +401,38 @@ def test_debug_accepts_str_port(self):
349401 self .assertRegex (telnet ._messages , r'0.*test' )
350402
351403
404+ class ExpectTests (ExpectAndReadTestCase ):
405+ def test_expect (self ):
406+ """
407+ expect(expected, [timeout])
408+ Read until the expected string has been seen, or a timeout is
409+ hit (default is no timeout); may block.
410+ """
411+ want = [b'x' * 10 , b'match' , b'y' * 10 ]
412+ telnet = test_telnet (want )
413+ (_ ,_ ,data ) = telnet .expect ([b'match' ])
414+ self .assertEqual (data , b'' .join (want [:- 1 ]))
415+
416+ def test_expect_with_poll (self ):
417+ """Use select.poll() to implement telnet.expect()."""
418+ want = [b'x' * 10 , b'match' , b'y' * 10 ]
419+ telnet = test_telnet (want , use_poll = True )
420+ select .select = lambda * _ : self .fail ('unexpected select() call.' )
421+ (_ ,_ ,data ) = telnet .expect ([b'match' ])
422+ self .assertEqual (data , b'' .join (want [:- 1 ]))
423+
424+ def test_expect_with_select (self ):
425+ """Use select.select() to implement telnet.expect()."""
426+ want = [b'x' * 10 , b'match' , b'y' * 10 ]
427+ telnet = test_telnet (want , use_poll = False )
428+ select .poll = lambda * _ : self .fail ('unexpected poll() call.' )
429+ (_ ,_ ,data ) = telnet .expect ([b'match' ])
430+ self .assertEqual (data , b'' .join (want [:- 1 ]))
431+
432+
352433def test_main (verbose = None ):
353- support .run_unittest (GeneralTests , ReadTests , WriteTests , OptionTests )
434+ support .run_unittest (GeneralTests , ReadTests , WriteTests , OptionTests ,
435+ ExpectTests )
354436
355437if __name__ == '__main__' :
356438 test_main ()
0 commit comments