Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
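
The NOTE about keeping the "Call Q" small can be illustrated with bare Future objects. This is a minimal sketch, not the executor's own code: the two futures are created by hand, and the call to set_running_or_notify_cancel() stands in for the step the local worker thread performs just before a call is placed in the "Call Q".

```python
from concurrent.futures import Future

# A future whose work id is still waiting in "Work Ids": cancel() succeeds.
pending = Future()
cancelled_ok = pending.cancel()

# A future whose call has been moved towards "Call Q". Marking it as running
# is the point of no return, so a later cancel() is refused.
queued = Future()
queued.set_running_or_notify_cancel()
cancelled_late = queued.cancel()

print(cancelled_ok, cancelled_late)
```

The fewer calls that sit in the "Call Q", the more submitted work remains cancellable.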

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

_get_chunks:
Iterates over zip()ed iterables in chunks.

_process_chunk:
Processes a chunk of an iterable passed to map.
Runs the function passed to map() on a chunk of the
iterable passed to map.
This function is run in a separate process.
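
The chunking and per-chunk processing described above can be sketched in a single process as follows. The function names get_chunks and process_chunk are illustrative stand-ins for the private helpers, and nothing here is sent to another process.

```python
import itertools
import operator

def get_chunks(*iterables, chunksize):
    """Yield tuples holding up to chunksize argument tuples from zip(*iterables)."""
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

def process_chunk(fn, chunk):
    """Apply fn to every argument tuple in one chunk and collect the results."""
    return [fn(*args) for args in chunk]

chunks = list(get_chunks([1, 2, 3, 4, 5], [10, 20, 30, 40, 50], chunksize=2))
results = [process_chunk(operator.add, chunk) for chunk in chunks]
print(results)
```

Each chunk would be submitted as one call, so a larger chunksize means fewer, larger messages between processes.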

_process_worker:
Evaluates calls from call_queue and places
the results in result_queue.
This worker is run in a separate process.
Args:
  call_queue: A multiprocessing.Queue of _CallItems that will be read and
    evaluated by the worker.
  result_queue: A multiprocessing.Queue of _ResultItems that will be written
    to by the worker.
  shutdown: A multiprocessing.Event that will be set as a signal to the
    worker that it should exit when call_queue is empty.
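
A worker loop of this shape might look like the sketch below. It makes several assumptions: plain queue.Queue objects replace the multiprocessing queues so the example runs in one process, the CallItem and ResultItem tuples are hypothetical stand-ins for _CallItem and _ResultItem, and a None sentinel on the call queue is used as the stop signal instead of the shutdown event named in the Args list.

```python
import queue
from collections import namedtuple

# Hypothetical record types standing in for _CallItem and _ResultItem.
CallItem = namedtuple("CallItem", "work_id fn args kwargs")
ResultItem = namedtuple("ResultItem", "work_id exception result")

def worker_loop(call_queue, result_queue):
    """Evaluate calls until a None sentinel arrives (an assumed stop signal)."""
    while True:
        item = call_queue.get(block=True)
        if item is None:
            return
        try:
            value = item.fn(*item.args, **item.kwargs)
        except BaseException as exc:
            # A failed call is reported as a result item carrying the exception.
            result_queue.put(ResultItem(item.work_id, exc, None))
        else:
            result_queue.put(ResultItem(item.work_id, None, value))

calls, results = queue.Queue(), queue.Queue()
calls.put(CallItem(5, int, ("42",), {}))
calls.put(CallItem(6, int, ("not a number",), {}))
calls.put(None)
worker_loop(calls, results)
first, second = results.get(), results.get()
```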

_add_call_item_to_queue:
Fills call_queue with _WorkItems from
pending_work_items.
This function never blocks.
Args:
  pending_work_items: A dict mapping work ids to _WorkItems e.g.
    {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
  work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
    are consumed and the corresponding _WorkItems from
    pending_work_items are transformed into _CallItems and put in
    call_queue.
  call_queue: A multiprocessing.Queue that will be filled with _CallItems
    derived from _WorkItems.
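
The non-blocking fill can be sketched as below. This is an illustration under stated assumptions: a bounded queue.Queue replaces the multiprocessing queue, work items are plain (future, fn, args) tuples rather than _WorkItem objects, and the name add_call_items is hypothetical.

```python
import queue
from concurrent.futures import Future

def add_call_items(pending_work_items, work_ids, call_queue):
    """Move work ids into call_queue until it is full or no ids remain."""
    while not call_queue.full():
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        future, fn, args = pending_work_items[work_id]
        if future.set_running_or_notify_cancel():
            call_queue.put((work_id, fn, args))
        else:
            # Cancelled while still pending: drop the entry and try the next id.
            del pending_work_items[work_id]

pending = {i: (Future(), abs, (-i,)) for i in (5, 6, 7, 8)}
pending[6][0].cancel()
ids = queue.Queue()
for i in (5, 6, 7, 8):
    ids.put(i)
call_q = queue.Queue(maxsize=2)
add_call_items(pending, ids, call_q)
```

After the call, ids 5 and 7 are in the call queue, id 6 has been discarded as cancelled, and id 8 is still waiting because the queue is full.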

_queue_management_worker:
Manages the communication between this process and the worker
processes.
This function is run in a local thread.
Args:
  executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
    this thread. Used to determine if the ProcessPoolExecutor has been
    garbage collected and that this function can exit.
  processes: A list of the multiprocessing.Process instances used as
    workers.
  pending_work_items: A dict mapping work ids to _WorkItems e.g.
    {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
  work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
  call_queue: A multiprocessing.Queue that will be filled with _CallItems
    derived from _WorkItems for processing by the process workers.
  result_queue: A multiprocessing.Queue of _ResultItems generated by the
    process workers.
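
The role of executor_reference can be shown with a small weakref sketch. The Owner class and should_exit function are hypothetical stand-ins, chosen only to show how a thread can notice that its owner has been garbage collected or has requested a stop without keeping the owner alive.

```python
import gc
import weakref

class Owner:
    """Hypothetical stand-in for the executor that owns the thread."""
    _shutdown_thread = False

def should_exit(owner_ref):
    """True when the owner is gone or has asked its thread to stop."""
    owner = owner_ref()          # None once the owner has been collected
    return owner is None or owner._shutdown_thread

owner = Owner()
ref = weakref.ref(owner)
alive = should_exit(ref)         # owner exists and has not requested a stop

owner._shutdown_thread = True
stopping = should_exit(ref)      # owner exists but has requested a stop

del owner
gc.collect()
collected = should_exit(ref)     # owner no longer exists
```

Because the thread holds only a weak reference, it does not prevent the executor from being collected.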

_queue_management_worker sets BrokenProcessPool on pending futures: "A process in the process pool was terminated abruptly while the future was running or pending."
_check_system_limits raises NotImplementedError: "system provides too few semaphores (%d available, 256 necessary)"

_chain_from_iterable_of_lists:
Specialized implementation of itertools.chain.from_iterable.
Each item in *iterable* should be a list. This function is
careful not to keep references to yielded objects.
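
A generator with this behaviour can be sketched as follows. The name is written without the leading underscore to mark it as an illustration. Reversing each list and popping from its end yields items in their original order while emptying the list, so the list no longer references what has been yielded.

```python
def chain_from_iterable_of_lists(iterable):
    """Yield the items of each list in order, emptying the lists as it goes."""
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()

batches = [[1, 2, 3], [], [4, 5]]
flattened = list(chain_from_iterable_of_lists(batches))
print(flattened, batches)
```

Note that the input lists are consumed: after iteration each one is empty.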

BrokenProcessPool:
Raised when a process in a ProcessPoolExecutor terminated abruptly
while a future was in the running state.

ProcessPoolExecutor.__init__:
Initializes a new ProcessPoolExecutor instance.
Args:
  max_workers: The maximum number of processes that can be used to
    execute the given calls. If None or not given then as many
    worker processes will be created as the machine has processors.
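
The handling of max_workers described above can be sketched as a small helper. The function name resolve_max_workers is hypothetical, and the fallback to 1 when the processor count cannot be determined is an assumption based on os.cpu_count() being allowed to return None.

```python
import os

def resolve_max_workers(max_workers=None):
    """Return the worker count to use, defaulting to the processor count."""
    if max_workers is None:
        # os.cpu_count() may return None; fall back to one worker (assumption).
        return os.cpu_count() or 1
    if max_workers <= 0:
        raise ValueError("max_workers must be greater than 0")
    return max_workers

default_workers = resolve_max_workers()
explicit_workers = resolve_max_workers(3)
```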
ProcessPoolExecutor.__init__ raises ValueError: "max_workers must be greater than 0"
ProcessPoolExecutor.submit raises BrokenProcessPool: "A child process terminated abruptly, the process pool is not usable anymore"
ProcessPoolExecutor.submit raises RuntimeError: "cannot schedule new futures after shutdown"

ProcessPoolExecutor.map:
Returns an iterator equivalent to map(fn, iter).
Args:
  fn: A callable that will take as many arguments as there are
    passed iterables.
  timeout: The maximum number of seconds to wait. If None, then there
    is no limit on the wait time.
  chunksize: If greater than one, the iterables will be chopped into
    chunks of size chunksize and submitted to the process pool.
    If set to one, the items in the list will be sent one at a time.
Returns:
  An iterator equivalent to: map(func, *iterables) but the calls may
  be evaluated out-of-order.
Raises:
  TimeoutError: If the entire result iterator could not be generated
    before the given timeout.
  Exception: If fn(*args) raises for any values.
ProcessPoolExecutor.map raises ValueError: "chunksize must be >= 1."