Random signal capture when using multiprocessing



Hi,

This is a minimal proof of concept for something that has been bugging me for a few days:

```
$ cat signal_multiprocessing_poc.py 

import random
import multiprocessing
import signal
import time

def signal_handler(signum, frame):
    raise Exception(f"Unexpected signal {signum}!")

def process_message(args):
    time.sleep(random.random() / 100.)

if __name__ == "__main__":
    signal.signal(signal.SIGTERM, signal_handler)
    n_round = 1
    while n_round < 10:
        job_list = [x for x in range(random.randint(100, 400))]
        print(f"Running round {n_round} with {len(job_list)} jobs")
        with multiprocessing.Pool(8) as p1:
            p1.map(process_message, job_list)
        n_round += 1

```

So basically I have some subprocesses that don't do anything, just sleep for a few milliseconds, and I install a handler that captures SIGTERM. I don't expect that handler to ever fire.

This is the output:

```
$ python signal_multiprocessing_poc.py 
Running round 1 with 244 jobs
Running round 2 with 151 jobs
Running round 3 with 173 jobs
Running round 4 with 124 jobs
Running round 5 with 249 jobs
Running round 6 with 359 jobs
Process ForkPoolWorker-48:
Traceback (most recent call last):
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/synchronize.py", line 98, in __exit__
    return self._semlock.__exit__(*args)
  File "signal_multiprocessing_poc.py", line 7, in signal_handler
    raise Exception(f"Unexpected signal {signum}!")
Exception: Unexpected signal 15!
Running round 7 with 185 jobs
Running round 8 with 246 jobs
Running round 9 with 217 jobs
Process ForkPoolWorker-68:
Traceback (most recent call last):
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/home/j_mariamateos/miniconda3/lib/python3.7/multiprocessing/synchronize.py", line 98, in __exit__
    return self._semlock.__exit__(*args)
  File "signal_multiprocessing_poc.py", line 7, in signal_handler
    raise Exception(f"Unexpected signal {signum}!")
Exception: Unexpected signal 15!
```

Can anyone help me understand why a random SIGTERM gets captured? I guess some of the child processes receive a termination signal, and that's why I see it, but I don't understand why it happens only intermittently.
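For what it's worth, my current guess (please correct me if this is wrong) is that the handler installed in the parent is inherited by the forked pool workers, and that `Pool.terminate()`, which runs when the `with` block exits, sends SIGTERM to any worker still alive at that point. That would also explain the randomness: the traceback only shows up when a worker happens to still be blocked reading the task queue during teardown. A minimal sketch of the inheritance part (assumes the fork start method, so Linux or another Unix):

```python
import multiprocessing
import signal
import sys
import time

def handler(signum, frame):
    # Inherited by the child across fork(): exit cleanly so the
    # parent sees exit code 0 instead of -SIGTERM.
    sys.exit(0)

def child():
    time.sleep(10)  # just wait until the parent terminates us

def demo():
    signal.signal(signal.SIGTERM, handler)
    ctx = multiprocessing.get_context("fork")  # handler inheritance needs fork
    p = ctx.Process(target=child)
    p.start()
    time.sleep(0.2)   # give the child time to reach the sleep
    p.terminate()     # sends SIGTERM, same as Pool.terminate() does to workers
    p.join()
    return p.exitcode # 0 if the inherited handler ran; -15 if it did not

if __name__ == "__main__":
    print("child exit code:", demo())
```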

Of course, as a coworker just pointed out, this works (telling the child processes to ignore the signals via a pool initializer):

```
import random
import multiprocessing
import signal
import time

def signal_handler(signum, frame):
    raise Exception(f"Unexpected signal {signum}!")

def init_worker():
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def process_message(args):
    time.sleep(random.random() / 100.)

if __name__ == "__main__":

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    n_round = 1

    while n_round < 20:
        job_list = [x for x in range(random.randint(100, 400))]
        print(f"Running round {n_round} with {len(job_list)} jobs")
        with multiprocessing.Pool(8, init_worker) as p1:
            p1.map(process_message, job_list)
        n_round += 1
```
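I suppose an alternative to SIG_IGN would be to have the initializer restore the default disposition instead, so a worker hit by the pool's own SIGTERM just dies quietly rather than running the handler inherited from the parent (this is only a sketch of the idea, I haven't stress-tested it):

```python
import signal

def init_worker():
    # Alternative to SIG_IGN: restore the default disposition so the
    # SIGTERM that Pool.terminate() sends simply kills the worker
    # instead of invoking the handler inherited from the parent.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
```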

Thanks for your help,

-- 
José María (Chema) Mateos || https://rinzewind.org