Spade
Mini Shell
| Directory:~$ /lib64/python3.6/asyncio/__pycache__/ |
| [Home] [System Details] [Kill Me] |
3
\�:�
@sdZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl
Z
ddlZddlZddl
Z
ddlZddl
mZddlmZddlmZmZyddlZWnek
r�dZYnXddlmZddlmZdd lmZdd
lmZddlmZddlmZdd
lmZddl
m!Z!ddl"m#Z#e j$dk�rHddl%m&Z&nddlm&Z&dd�Z'e'd�Z(e'd�Z)dd�Z*dd�Z+dRdd�Z,dd�Z-Gdd
�d
e�Z.Gd!d"�d"e�Z/Gd#d$�d$�Z0Gd%d&�d&e0e/�Z1d'd(�d)d*�Z2e3ed+��rZGd,d-�d-ej4e�Z5Gd.d/�d/e5e�Z6Gd0d1�d1e6�Z7Gd2d3�d3e0e7�Z8d4d5�Z9ej:d6d7��Z;ej:d'd(�d8d9��Z<ej:d:dd'd;�d<d=��Z=d>d?�Z>Gd@dA�dAej?�Z@GdBdC�dCejA�ZBdDdE�ZCGdFdG�dGeD�ZEdHdI�ZFGdJdK�dKe
jG�ZGej:dLdM��ZHejIejJejKfdNdO�ZLdPdQ�ZMdS)SzUtilities
shared by tests.�N)�mock)�
HTTPServer)�WSGIRequestHandler�
WSGIServer�)�base_events)�compat)�events)�futures)� selectors)�tasks)� coroutine)�logger)�supportZwin32)�
socketpaircCs`ttd�r*tjjtj|�}tjj|�r*|Stjjtjjtj�d|�}tjj|�rT|St |��dS)N�
TEST_HOME_DIR�test)
�hasattrr�os�path�joinr�isfile�dirname�__file__�FileNotFoundError)�filename�fullname�r�"/usr/lib64/python3.6/test_utils.py� data_file-s
rzssl_cert.pemzssl_key.pemcCstdkrdStjtj�SdS)N)�ssl�
SSLContextZPROTOCOL_SSLv23rrrr�dummy_ssl_context<sr"c
Cs@tdd��}|�}|j|�}d|_z|j|�Wd|j�XdS)NcSsdS)Nrrrrr�onceDszrun_briefly.<locals>.onceF)r
Zcreate_taskZ_log_destroy_pending�run_until_complete�close)�loopr#�gen�trrr�run_brieflyCs
r)�cCsTtj�|}xB|�sN|dk r8|tj�}|dkr8tj��|jtjd|d��qWdS)Nrg����MbP?)r&)�timer
�TimeoutErrorr$rZsleep)r&Zpred�timeoutZdeadlinerrr� run_untilRsr.cCs|j|j�|j�dS)z�Legacy
API to run once through the event loop.
This is the recommended pattern for test code. It will poll the
selector once and run all callbacks scheduled in response to I/O
events.
N)Z call_soon�stopZrun_forever)r&rrr�run_once\sr0c@seZdZdd�Zdd�ZdS)�SilentWSGIRequestHandlercCstj�S)N)�io�StringIO)�selfrrr�
get_stderrisz#SilentWSGIRequestHandler.get_stderrcGsdS)Nr)r4�format�argsrrr�log_messagelsz$SilentWSGIRequestHandler.log_messageN)�__name__�
__module__�__qualname__r5r8rrrrr1gsr1cs(eZdZdZ�fdd�Zdd�Z�ZS)�SilentWSGIServer�cs"t�j�\}}|j|j�||fS)N)�super�get_request�
settimeout�request_timeout)r4�request�client_addr)� __class__rrr?tszSilentWSGIServer.get_requestcCsdS)Nr)r4rB�client_addressrrr�handle_erroryszSilentWSGIServer.handle_error)r9r:r;rAr?rF�
__classcell__rr)rDrr<psr<c@seZdZdd�ZdS)�SSLWSGIServerMixincCs^t}t}tj�}|j||�|j|dd�}y|j|||�|j�Wntk
rXYnXdS)NT)Zserver_side) �ONLYKEY�ONLYCERTr
r!Zload_cert_chainZwrap_socketZRequestHandlerClassr%�OSError)r4rBrEZkeyfileZcertfile�contextZssockrrr�finish_requestsz!SSLWSGIServerMixin.finish_requestN)r9r:r;rMrrrrrH}srHc@seZdZdS)�
SSLWSGIServerN)r9r:r;rrrrrN�srNF)�use_sslc
#svdd�}|r|n|}||t���j|��j�_tj�fdd�d�}|j�z
�VWd�j��j�|j �XdS)NcSsd}dg}|||�dgS)Nz200
OK�Content-type�
text/plainsTest
message)rPrQr)�environZstart_responseZstatusZheadersrrr�app�s
z_run_test_server.<locals>.appcs�jdd�S)Ng�������?)Z
poll_interval)Z
serve_foreverr)�httpdrr�<lambda>�sz"_run_test_server.<locals>.<lambda>)�target)
r1Zset_appZserver_address�address� threadingZThread�start�shutdownZserver_closer)rWrO�
server_cls�server_ssl_clsrSZserver_classZ
server_threadr)rTr�_run_test_server�s
r]ZAF_UNIXc@seZdZdd�ZdS)�UnixHTTPServercCstjj|�d|_d|_dS)Nz 127.0.0.1�P)�socketserver�UnixStreamServer�server_bindZserver_nameZserver_port)r4rrrrb�szUnixHTTPServer.server_bindN)r9r:r;rbrrrrr^�sr^cs(eZdZdZdd�Z�fdd�Z�ZS)�UnixWSGIServerr=cCstj|�|j�dS)N)r^rbZ
setup_environ)r4rrrrb�s
zUnixWSGIServer.server_bindcs"t�j�\}}|j|j�|dfS)N� 127.0.0.1�)rdre)r>r?r@rA)r4rBrC)rDrrr?�szUnixWSGIServer.get_request)r9r:r;rArbr?rGrr)rDrrc�srcc@seZdZdd�ZdS)�SilentUnixWSGIServercCsdS)Nr)r4rBrErrrrF�sz!SilentUnixWSGIServer.handle_errorN)r9r:r;rFrrrrrf�srfc@seZdZdS)�UnixSSLWSGIServerN)r9r:r;rrrrrg�srgc Cstj��}|jSQRXdS)N)�tempfileZNamedTemporaryFile�name)�filerrr�gen_unix_socket_path�s
rkccs<t�}z
|VWdytj|�Wntk
r4YnXXdS)N)rkr�unlinkrK)rrrr�unix_socket_path�s
rmc
cs,t��}t||ttd�EdHWdQRXdS)N)rWrOr[r\)rmr]rfrg)rOrrrr�run_test_unix_server�srnz 127.0.0.1)�host�portrOccst||f|ttd�EdHdS)N)rWrOr[r\)r]r<rN)rorprOrrr�run_test_server�s
rqcCsPi}x4t|�D](}|jd�r(|jd�r(qtdd�||<qWtd|f|j|��S)N�__)�return_valueZTestProtocol)�dir�
startswith�endswith�MockCallback�type� __bases__)�baseZdctrirrr�make_test_protocol�sr{c@s6eZdZdd�Zddd�Zdd�Zdd �Zd
d�ZdS)
�TestSelectorcCs
i|_dS)N)�keys)r4rrr�__init__szTestSelector.__init__NcCstj|d||�}||j|<|S)Nr)rZSelectorKeyr})r4�fileobjr �data�keyrrr�registers
zTestSelector.registercCs|jj|�S)N)r}�pop)r4rrrr�
unregisterszTestSelector.unregistercCsgS)Nr)r4r-rrr�selectszTestSelector.selectcCs|jS)N)r})r4rrr�get_mapszTestSelector.get_map)N)r9r:r;r~r�r�r�r�rrrrr|s
r|cs�eZdZdZd-�fdd� Zdd�Zdd�Z�fd d
�Zdd�Zd
d�Z dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zdd
�Zd!d"�Zd#d$�Z�fd%d&�Z�fd'd(�Zd)d*�Zd+d,�Z�ZS).�TestLoopa�Loop
for unittests.
It manages self time directly.
If something scheduled to be executed later then
on next loop iteration after all ready handlers done
generator passed to __init__ is calling.
Generator should be like this:
def gen():
...
when = yield ...
... = yield time_advance
Value returned by yield is absolute time of next scheduled handler.
Value passed to yield is time advance to move loop's time forward.
Ncsvt�j�|dkr"dd�}d|_nd|_|�|_t|j�d|_d|_g|_t�|_ i|_
i|_|j�t
j�|_dS)Ncss
dVdS)Nrrrrrr',szTestLoop.__init__.<locals>.genFTrg��&�.>)r>r~�_check_on_close�_gen�next�_timeZ_clock_resolution�_timersr|Z _selector�readers�writers�reset_counters�weakref�WeakValueDictionary�_transports)r4r')rDrrr~(s
zTestLoop.__init__cCs|jS)N)r�)r4rrrr+?sz
TestLoop.timecCs|r|j|7_dS)zMove
test time
forward.N)r�)r4�advancerrr�advance_timeBszTestLoop.advance_timecsBt�j�|jr>y|jjd�Wntk
r4Yn
Xtd��dS)NrzTime generator is not
finished)r>r%r�r��send�
StopIteration�AssertionError)r4)rDrrr%Gs
zTestLoop.closecGstj|||�|j|<dS)N)r �Handler�)r4�fd�callbackr7rrr�_add_readerQszTestLoop._add_readercCs0|j|d7<||jkr(|j|=dSdSdS)NrTF)�remove_reader_countr�)r4r�rrr�_remove_readerTs
zTestLoop._remove_readercGsh||jkrtd|�d���|j|}|j|krDtd|j�d|����|j|krdtd|j�d|����dS)Nzfd
z is not registeredzunexpected callback: z != zunexpected callback
args:
)r�r�� _callback�_args)r4r�r�r7�handlerrr�
assert_reader\s
zTestLoop.assert_readercCs||jkrtd|�d���dS)Nzfd
z is registered)r�r�)r4r�rrr�assert_no_readergs
zTestLoop.assert_no_readercGstj|||�|j|<dS)N)r r�r�)r4r�r�r7rrr�_add_writerkszTestLoop._add_writercCs0|j|d7<||jkr(|j|=dSdSdS)NrTF)�remove_writer_countr�)r4r�rrr�_remove_writerns
zTestLoop._remove_writercGs^||jkstdj|���|j|}|j|ks>tdj|j|���|j|ksZtdj|j|���dS)Nzfd
{} is not registeredz{!r} !=
{!r})r�r�r6r�r�)r4r�r�r7r�rrr�
assert_writervs
zTestLoop.assert_writercCs8y|j|}Wntk
r"YnXtdj||���dS)Nz.File descriptor {!r} is used by
transport
{!r})r��KeyError�RuntimeErrorr6)r4r�Z transportrrr�_ensure_fd_no_transport~sz
TestLoop._ensure_fd_no_transportcGs|j|�|j||f|��S)zAdd
a reader callback.)r�r�)r4r�r�r7rrr�
add_reader�s
zTestLoop.add_readercCs|j|�|j|�S)zRemove a reader
callback.)r�r�)r4r�rrr�
remove_reader�s
zTestLoop.remove_readercGs|j|�|j||f|��S)zAdd a
writer callback..)r�r�)r4r�r�r7rrr�
add_writer�s
zTestLoop.add_writercCs|j|�|j|�S)zRemove a writer
callback.)r�r�)r4r�rrr�
remove_writer�s
zTestLoop.remove_writercCstjt�|_tjt�|_dS)N)�collections�defaultdict�intr�r�)r4rrrr��szTestLoop.reset_counterscs:t�j�x$|jD]}|jj|�}|j|�qWg|_dS)N)r>� _run_oncer�r�r�r�)r4�whenr�)rDrrr��s
zTestLoop._run_oncecs
|jj|�t�j||f|��S)N)r��appendr>�call_at)r4r�r�r7)rDrrr��szTestLoop.call_atcCsdS)Nr)r4Z
event_listrrr�_process_events�szTestLoop._process_eventscCsdS)Nr)r4rrr�_write_to_self�szTestLoop._write_to_self)N)r9r:r;�__doc__r~r+r�r%r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�r�rGrr)rDrr�s,
r�cKstjfddgi|��S)N�spec�__call__)rZMock)�kwargsrrrrw�srwc@seZdZdZdd�ZdS)�MockPatternz�A
regex based str with a fuzzy __eq__.
Use this helper with 'mock.assert_called_with', or anywhere
where a regex comparison between strings is needed.
For instance:
mock_call.assert_called_with(MockPattern('spam.*ham'))
cCsttjt|�|tj��S)N)�bool�re�search�str�S)r4�otherrrr�__eq__�szMockPattern.__eq__N)r9r:r;r�r�rrrrr��sr�cCs$tj|�}|dkr
td|f��|S)Nzunable to get the source of
%r)r Z_get_function_source�
ValueError)�func�sourcerrr�get_function_source�s
r�c@sVeZdZedd��Zdd�dd�Zddd �Zd
d�Zdd
�Zdd�Z e
jsRdd�ZdS)�TestCasecCs&|j}|dk r|jdd�|j�dS)NT)�wait)Z_default_executorrZr%)r&�executorrrr�
close_loop�szTestCase.close_loopT)�cleanupcCs,|dk st�tjd�|r(|j|j|�dS)N)r�r �set_event_loopZ
addCleanupr�)r4r&r�rrrr��s
zTestCase.set_event_loopNcCst|�}|j|�|S)N)r�r�)r4r'r&rrr�
new_test_loop�s
zTestCase.new_test_loopcCs|jt_dS)N)�_get_running_loopr )r4rrr�unpatch_get_running_loop�sz!TestCase.unpatch_get_running_loopcCs
tj|_dd�t_tj�|_dS)NcSsdS)NrrrrrrU�sz
TestCase.setUp.<locals>.<lambda>)r r�rZthreading_setup�_thread_cleanup)r4rrr�setUp�s
zTestCase.setUpcCsB|j�tjd�|jtj�d�|j�tj|j �tj
�dS)N)NNN)r�r r�ZassertEqual�sys�exc_infoZ
doCleanupsrZthreading_cleanupr�Z
reap_children)r4rrr�tearDown�s
zTestCase.tearDowncOsGdd�d�}|�S)Nc@seZdZdd�Zdd�ZdS)z!TestCase.subTest.<locals>.EmptyCMcSsdS)Nr)r4rrr� __enter__�sz+TestCase.subTest.<locals>.EmptyCM.__enter__cWsdS)Nr)r4�excrrr�__exit__�sz*TestCase.subTest.<locals>.EmptyCM.__exit__N)r9r:r;r�r�rrrr�EmptyCM�sr�r)r4r7r�r�rrr�subTest�szTestCase.subTest)N)
r9r:r;�staticmethodr�r�r�r�r�r�rZPY34r�rrrrr��s
r�ccs2tj}ztjtjd�dVWdtj|�XdS)zrContext
manager to disable asyncio logger.
For example, it can be used to ignore warnings in debug mode.
rN)r�levelZsetLevel�loggingZCRITICAL)Z old_levelrrr�disable_logger�s
r�cCs*tjtj�}||_||_||_d|j_|S)z'Create a mock
of a non-blocking socket.g)rZ MagicMock�socket�protorx�familyZ
gettimeoutrs)r�rxr�Zsockrrr�mock_nonblocking_socketsr�cCstjddd�S)Nz'asyncio.sslproto._is_sslproto_availableF)rs)rZpatchrrrr�force_legacy_ssl_supportsr�)r*)Nr�r��
contextlibr2r�rr�r�r`r�rhrXr+Zunittestr�rZhttp.serverrZwsgiref.simple_serverrrr
�ImportErrorrerrr r
rrZ
coroutinesr
�logrrr�platformZ
windows_utilsrrrJrIr"r)r.r0r1r<rHrNr]rrar^rcrfrgrk�contextmanagerrmrnrqr{ZBaseSelectorr|Z
BaseEventLoopr�rwr�r�r�r�r�ZIPPROTO_TCPZSOCK_STREAMZAF_INETr�r�rrrr�<module>s�
4