# -*- coding: utf-8 -*-
"""
    celery.concurrency.prefork
    ~~~~~~~~~~~~~~~~~~~~~~~~~~

    Pool implementation using :mod:`multiprocessing`.

"""
from __future__ import absolute_import

import os

from billiard import forking_enable
from billiard.pool import RUN, CLOSE, Pool as BlockingPool

from celery import platforms
from celery import signals
from celery._state import set_default_app, _set_task_join_will_block
from celery.app import trace
from celery.concurrency.base import BasePool
from celery.five import items
from celery.utils.functional import noop
from celery.utils.log import get_logger

from .asynpool import AsynPool

__all__ = ['TaskPool', 'process_initializer', 'process_destructor']

#: List of signals to reset when a child process starts.
WORKER_SIGRESET = frozenset(['SIGTERM',
                             'SIGHUP',
                             'SIGTTIN',
                             'SIGTTOU',
                             'SIGUSR1'])

#: List of signals to ignore when a child process starts.
WORKER_SIGIGNORE = frozenset(['SIGINT'])

logger = get_logger(__name__)
warning, debug = logger.warning, logger.debug


def process_initializer(app, hostname):
    """Pool child process initializer.

    This will initialize a child pool process to ensure the correct
    app instance is used and that things like logging work.

    """
    _set_task_join_will_block(True)
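    # Child processes inherit the parent's signal handlers: restore the
    # default handlers for these signals and ignore SIGINT (shutdown is
    # coordinated by the parent process).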
    platforms.signals.reset(*WORKER_SIGRESET)
    platforms.signals.ignore(*WORKER_SIGIGNORE)
    platforms.set_mp_process_title('celeryd', hostname=hostname)
    # This is for Windows and other platforms not supporting
    # fork(). Note that init_worker makes sure it's only
    # run once per process.
    app.loader.init_worker()
    app.loader.init_worker_process()
    logfile = os.environ.get('CELERY_LOG_FILE') or None
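    # A '%i' in the log file name is expanded to the index of the child
    # process, so every child ends up with its own log file.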
    if logfile and '%i' in logfile.lower():
        # logfile path will differ so need to set up logging again.
        app.log.already_setup = False
    app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0) or 0),
                  logfile,
                  bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
                  str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')),
                  hostname=hostname)
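    # FORKED_BY_MULTIPROCESSING is set by billiard when the child was
    # started in a fresh interpreter (execv after fork): no state was
    # inherited from the parent, so the full worker optimization setup
    # has to run here.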
    if os.environ.get('FORKED_BY_MULTIPROCESSING'):
        # pool did execv after fork
        trace.setup_worker_optimizations(app)
    else:
        app.set_current()
        set_default_app(app)
        app.finalize()
        trace._tasks = app._tasks  # enables fast_trace_task optimization.
    # rebuild execution handler for all tasks.
    from celery.app.trace import build_tracer
    for name, task in items(app.tasks):
        task.__trace__ = build_tracer(name, task, app.loader, hostname,
                                      app=app)
    from celery.worker import state as worker_state
    worker_state.reset_state()
    signals.worker_process_init.send(sender=None)


def process_destructor(pid, exitcode):
    """Pool child process destructor.

    Dispatch the :signal:`worker_process_shutdown` signal.

    """
    signals.worker_process_shutdown.send(
        sender=None, pid=pid, exitcode=exitcode,
    )
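
# A minimal sketch of how user code can hook into the signals dispatched
# by the functions above (handler names are illustrative only):
#
#     from celery import signals
#
#     @signals.worker_process_init.connect
#     def on_child_start(**kwargs):
#         ...  # e.g. open per-process resources such as DB connections
#
#     @signals.worker_process_shutdown.connect
#     def on_child_exit(pid=None, exitcode=None, **kwargs):
#         ...  # e.g. close the per-process resources again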


class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""
    Pool = AsynPool
    BlockingPool = BlockingPool

    uses_semaphore = True
    write_stats = None

    def on_start(self):
        """Run the task pool.

        Will pre-fork all workers so they're ready to accept tasks.

        """
        forking_enable(self.forking_enable)
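        # The 'threads' option is assumed here to be disabled by the
        # worker when it runs an event loop (hub), in which case the
        # non-blocking AsynPool is selected below instead of the
        # blocking billiard pool.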
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool)
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              on_process_exit=process_destructor,
                              synack=False,
                              **self.options)

        # Create proxy methods
        self.on_apply = P.apply_async
        self.maintain_pool = P.maintain_pool
        self.terminate_job = P.terminate_job
        self.grow = P.grow
        self.shrink = P.shrink
        self.flush = getattr(P, 'flush', None)  # FIXME add to billiard

    def restart(self):
        self._pool.restart()
        self._pool.apply_async(noop)

    def did_start_ok(self):
        return self._pool.did_start_ok()

    def register_with_event_loop(self, loop):
        try:
            reg = self._pool.register_with_event_loop
        except AttributeError:
            return
        return reg(loop)

    def on_stop(self):
        """Gracefully stop the pool."""
        if self._pool is not None and self._pool._state in (RUN, CLOSE):
            self._pool.close()
            self._pool.join()
            self._pool = None

    def on_terminate(self):
        """Force terminate the pool."""
        if self._pool is not None:
            self._pool.terminate()
            self._pool = None

    def on_close(self):
        if self._pool is not None and self._pool._state == RUN:
            self._pool.close()

    def _get_info(self):
        try:
            write_stats = self._pool.human_write_stats
        except AttributeError:
            def write_stats():
                return 'N/A'  # only supported by asynpool
        return {
            'max-concurrency': self.limit,
            'processes': [p.pid for p in self._pool._pool],
            'max-tasks-per-child': self._pool._maxtasksperchild or 'N/A',
            'put-guarded-by-semaphore': self.putlocks,
            'timeouts': (self._pool.soft_timeout or 0,
                         self._pool.timeout or 0),
            'writes': write_stats(),
        }

    @property
    def num_processes(self):
        return self._pool._processes