Python os 模块,fork() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.fork()。
def _spawn_transform_service(self):
    """Fork a child that runs the transform service and return its pid.

    The child starts a new session, launches the transform service with a
    single worker, waits for the launcher and then terminates through
    ``os._exit`` with the launcher's status (2 on an unexpected error).
    Only the parent returns from this method.

    :return: pid of the forked child process.
    """
    child_pid = os.fork()
    if child_pid != 0:
        # Parent: nothing more to do than report the child's pid.
        return child_pid

    exit_code = 0
    try:
        os.setsid()
        # start transform service
        launcher = oslo_service.service.launch(
            self.conf, transform_service.Transform(), workers=1)
        exit_code = launcher.wait()
    except SystemExit as exc:
        traceback.print_exc()
        exit_code = exc.code
    except BaseException:
        try:
            traceback.print_exc()
        except BaseException:
            print("Could not print traceback")
        exit_code = 2
    # The child must never unwind into the caller's stack.
    os._exit(exit_code or 0)
def serve_forever(self, poll_interval=0.1):
    """Serve requests, pre-forking ``self.prefork`` workers when requested.

    With ``self.prefork`` unset or <= 1 this simply delegates to the parent
    class. Otherwise one child per worker is forked; each child serves
    forever and exits with status 0, while the parent records the child
    pids in ``self.pids`` and waits for every one of them.
    """
    worker_count = self.prefork
    if worker_count is None or worker_count <= 1:
        return super(_SporkMixIn, self).serve_forever(
            poll_interval=poll_interval)

    forked = []
    for _ in range(worker_count):
        worker_pid = os.fork()
        if worker_pid:
            self.log.info("Forked worker %s", worker_pid)
            forked.append(worker_pid)
            continue
        # Child: serve until done, then leave without running parent cleanup.
        super(_SporkMixIn, self).serve_forever(poll_interval=poll_interval)
        os._exit(0)

    self.pids = forked
    for worker_pid in self.pids:
        _eintr_retry(os.waitpid, worker_pid, 0)
def run_daemon(server, pidfile, daemonize=True):
    """Run the server, optionally detached as a daemon.

    :param server: cutlery (a Spoon or Spork); must expose ``server_logger``
        and ``serve_forever()``
    :param pidfile: the file to keep the parent PID, or a false value for
        no pidfile
    :param daemonize: if True fork the processes into a daemon (via
        ``detach``); otherwise stay in the foreground and write the pidfile
        here
    :return: None
    """
    logger = logging.getLogger(server.server_logger)
    if daemonize:
        detach(pidfile=pidfile, logger=logger)
    elif pidfile:
        with open(pidfile, "w+") as pidf:
            pidf.write("%s\n" % os.getpid())
    try:
        server.serve_forever()
    finally:
        # Only attempt cleanup when a pidfile was actually requested;
        # os.remove(None) would raise TypeError and mask the real error.
        if pidfile:
            try:
                os.remove(pidfile)
            except OSError:
                # Best effort: the file may already be gone.
                pass
def shutdown(self):
    """Ask the serve_forever loop to stop and wait until it has.

    Sets the shutdown-request flag and then blocks on the "is shut down"
    event. It has to be called while serve_forever() runs in a different
    thread, otherwise it deadlocks.
    """
    self.__shutdown_request = True
    shut_down_event = self.__is_shut_down
    shut_down_event.wait()
# The distinction between handling, getting, processing and
# finishing a request is fairly arbitrary. Remember:
#
# - handle_request() is the top-level call. It calls
# select, get_request(), verify_request() and process_request()
# - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process
# or create a new thread to finish the request
# - finish_request() instantiates the request handler class;
# this constructor will handle the request all by itself
def process_request(self, request, client_address):
    """Handle one request in a freshly forked child process.

    The parent records the child's pid in ``self.active_children``, closes
    its copy of the request and returns. The child finishes and shuts down
    the request and always terminates via ``os._exit`` (0 on success, 1 if
    anything was raised).
    """
    self.collect_children()
    child_pid = os.fork()
    if child_pid:
        # Parent process
        if self.active_children is None:
            self.active_children = []
        self.active_children.append(child_pid)
        self.close_request(request)  # close handle in parent process
        return

    # Child process.
    # This must never return, hence os._exit()!
    try:
        self.finish_request(request, client_address)
        self.shutdown_request(request)
        os._exit(0)
    except:
        try:
            self.handle_error(request, client_address)
            self.shutdown_request(request)
        finally:
            os._exit(1)
def get_command_line():
    '''
    Build the argv prefix used to spawn a child process.

    Raises RuntimeError when called from a process that is itself still
    being bootstrapped as a forked child. For frozen executables the prefix
    is the executable itself; otherwise it is the Python interpreter
    running the multiprocessing bootstrap one-liner.
    '''
    still_bootstrapping = (process.current_process()._identity == ()
                           and is_forking(sys.argv))
    if still_bootstrapping:
        raise RuntimeError('''
Attempt to start a new process before the current process
has finished its bootstrapping phase.
This probably means that you are on Windows and you have
forgotten to use the proper idiom in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce a Windows executable.''')

    if not getattr(sys, 'frozen', False):
        prog = 'from multiprocessing.forking import main; main()'
        return [_python_exe, '-c', prog, '--multiprocessing-fork']
    return [sys.executable, '--multiprocessing-fork']
def __init__(self, cmd, bufsize=-1):
    """Fork a child running *cmd* with its standard streams on pipes.

    In the child, stdin is the read end of the parent-to-child pipe and
    both stdout and stderr are the write end of the child-to-parent pipe.
    The parent keeps ``self.tochild`` (writable) and ``self.fromchild``
    (readable), opened with buffer size *bufsize*.
    """
    _cleanup()
    self.cmd = cmd
    to_child_r, to_child_w = os.pipe()
    from_child_r, from_child_w = os.pipe()
    self.pid = os.fork()
    if self.pid == 0:
        # Child
        os.dup2(to_child_r, 0)
        for std_fd in (1, 2):
            os.dup2(from_child_w, std_fd)
        self._run_child(cmd)
    # Parent: drop the child's pipe ends and wrap our own.
    os.close(to_child_r)
    self.tochild = os.fdopen(to_child_w, 'w', bufsize)
    os.close(from_child_w)
    self.fromchild = os.fdopen(from_child_r, 'r', bufsize)
def __init__(self, tasks=None, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
    """
    Set up task distribution across worker processes.

    :param tasks: iterable of objects to process; when omitted each worker
        gets a single task numbered ``0..workers-1``.
    :param workers: number of worker processes; when omitted the value of
        ``self.suggestedWorkerCount()`` is used. Forced to 1 on platforms
        without ``os.fork``.
    :param block: accepted for interface compatibility; not used here.
    :param progressDialog: optional label string or dict of arguments for
        ProgressDialog, shown while tasks are processed.
    :param randomReseed: stored as ``self.reseed``; requests that forked
        processes reseed their random number generators.
    :param kwds: objects to share with child processes; copied into
        ``self.kwds`` together with the ``_taskStarted`` callback.
    """
    ## The progress dialog is created here so that forked children never
    ## get to play with progress dialogs themselves.
    self.showProgress = progressDialog is not None
    if self.showProgress:
        if isinstance(progressDialog, basestring):
            progressDialog = {'labelText': progressDialog}
        from ..widgets.ProgressDialog import ProgressDialog
        self.progressDlg = ProgressDialog(**progressDialog)

    if workers is None:
        workers = self.suggestedWorkerCount()
    if not hasattr(os, 'fork'):
        workers = 1
    self.workers = workers

    self.tasks = list(range(workers) if tasks is None else tasks)
    self.reseed = randomReseed

    shared = kwds.copy()
    shared['_taskStarted'] = self._taskStarted
    self.kwds = shared
def start_process(cmd, target, env, *args):
    """Fork, exec *cmd* in the child and wait for it in the parent.

    The child closes inherited descriptors and replaces itself with *cmd*
    via ``os.execlpe``, passing *args* as argv and the current environment
    overlaid with *env*. A failed fork is logged and re-raised. *target* is
    not used by this function.
    """
    try:
        child_pid = os.fork()
    except OSError as e:
        logger.error(repr(e) + ' while fork')
        raise

    if child_pid != 0:
        _waitpid(child_pid)
    else:
        _close_fds()
        # execlpe expects the environment mapping as the final argument.
        exec_args = list(args)
        exec_args.append(dict(os.environ, **env))
        os.execlpe(cmd, *exec_args)
# NOTE(review): this snippet was damaged during extraction and is not valid
# Python as it stands: the shutdown() docstring opened below is never closed
# and runs into the following comment block, process_request() has lost its
# `request` parameter, and the child branch of process_request() is cut off
# mid-comment (the try/except around finish_request is missing). A complete
# variant of the same pair of methods appears earlier in this file; restore
# from the upstream source before using this copy.
def shutdown(self):
"""Stops the serve_forever loop.
Blocks until the loop has finished. This must be called while
serve_forever() is running in another thread,verify_request() and process_request()
# - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process
# or create a new thread to finish the request
# - finish_request() instantiates the request handler class;
# this constructor will handle the request all by itself
def process_request(self, client_address):
"""Fork a new subprocess to process the request."""
self.collect_children()
pid = os.fork()
if pid:
# Parent process
if self.active_children is None:
self.active_children = set()
self.active_children.add(pid)
self.close_request(request) #close handle in parent process
return
else:
# Child process.
# This must never return, client_address)
self.shutdown_request(request)
finally:
os._exit(1)
def __init__(self, fun, args=None, kwargs=None, nice_level=0,
             child_on_start=None, child_on_exit=None):
    """Fork a child that will run ``fun(*args, **kwargs)``.

    A temporary directory holding ``retval``, ``stdout`` and ``stderr``
    files is created before forking. In the parent ``self.pid`` is the
    child's pid; in the child it is None and ``self._child`` is invoked
    with the nice level and the start/exit hooks.
    """
    self.fun = fun
    self.args = [] if args is None else args
    self.kwargs = {} if kwargs is None else kwargs

    self.tempdir = tempdir = py.path.local.mkdtemp()
    self.RETVAL = tempdir.ensure('retval')
    self.STDOUT = tempdir.ensure('stdout')
    self.STDERR = tempdir.ensure('stderr')

    forked_pid = os.fork()
    # os.fork() returns 0 in the child, which is recorded as None.
    self.pid = forked_pid or None
    if not forked_pid:  # in child process
        self._child(nice_level, child_on_start, child_on_exit)
def daemonize():
    """Detach the current process from its controlling terminal.

    Uses a double fork with ``setsid`` in between, sets the umask to
    ``0o077`` and points stdin/stdout/stderr at ``/dev/null``. Each
    intermediate parent exits with status 0, so only the final grandchild
    returns.

    Raises OSError if redirecting a standard descriptor fails for any
    reason other than EBADF.
    """
    # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
    if os.fork():  # launch child and...
        os._exit(0)  # kill off parent
    os.setsid()
    if os.fork():  # launch child and...
        os._exit(0)  # kill off parent again.
    # 0o077 is the spelling accepted by both Python 2.6+ and Python 3.
    os.umask(0o077)
    null = os.open('/dev/null', os.O_RDWR)
    for i in range(3):
        try:
            os.dup2(null, i)
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
    os.close(null)
def closed(self):
    """Log the close and, unless caching is disabled, go to the background.

    When ``options['nocache']`` is false the process forks: the parent
    restores the saved terminal attributes (``old``) if any, resets the
    SIGWINCH handler when a tty was in use, and exits. The child starts a
    new session and closes file descriptors 0-2, ignoring EBADF.

    NOTE(review): the nesting below was reconstructed from a copy whose
    indentation had been lost - confirm against the upstream source.
    """
    import errno
    global old
    log.msg('closed %s' % self)
    log.msg(repr(self.conn.channels))
    if not options['nocache']:  # fork into the background
        if os.fork():
            if old:
                fd = sys.stdin.fileno()
                tty.tcsetattr(fd, tty.TCSANOW, old)
            if (options['command'] and options['tty']) or \
                    not options['notty']:
                signal.signal(signal.SIGWINCH, signal.SIG_DFL)
            os._exit(0)
        os.setsid()
        for i in range(3):
            try:
                os.close(i)
            except OSError as e:
                # Descriptors that are already closed are fine.
                if e.errno != errno.EBADF:
                    raise
# NOTE(review): this snippet was damaged during extraction and is not valid
# Python as it stands: the `if getattr(sys, ...` line is spliced into the
# tail of a list literal (the `'frozen'` check and the frozen-executable
# return are missing), and the final return no longer passes `prog` after
# '-c'. Restore from the upstream source before using this copy.
def get_command_line():
'''
Returns prefix of command line used for spawning a child process
'''
if getattr(process.current_process(), '_inheriting', False):
raise RuntimeError('''
Attempt to start a new process before the current process
has finished its bootstrapping phase.
This probably means that you are on Windows and you have
forgotten to use the proper idiom in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce a Windows executable.''')
if getattr(sys, '--multiprocessing-fork']
else:
prog = 'from multiprocessing.forking import main; main()'
opts = util._args_from_interpreter_flags()
return [_python_exe] + opts + ['-c', '--multiprocessing-fork']
def spawn(argv, master_read=_read, stdin_read=_read):
    """Create a spawned process attached to a new pseudo-terminal.

    :param argv: program and arguments; a single string is treated as a
        one-element argument list.
    :param master_read: callable used to read from the pty master.
    :param stdin_read: callable used to read from standard input.

    The child execs ``argv``. The parent switches its own terminal to raw
    mode when possible, copies data between the terminal and the pty, and
    then always restores the terminal mode and closes the master
    descriptor. IOError/OSError raised by the copy loop is swallowed, as
    before.
    """
    if isinstance(argv, str):
        argv = (argv,)
    pid, master_fd = fork()
    if pid == CHILD:
        os.execlp(argv[0], *argv)
    try:
        mode = tty.tcgetattr(STDIN_FILENO)
        tty.setraw(STDIN_FILENO)
        restore = 1
    except tty.error:  # This is the same as termios.error
        restore = 0
    try:
        _copy(master_fd, master_read, stdin_read)
    except (IOError, OSError):
        # An I/O error on the pty is how the copy loop learns that the
        # child went away; it is not reported to the caller.
        pass
    finally:
        # Restore the terminal and release the pty on every exit path, not
        # only after an I/O error, so an interrupt cannot leave the
        # terminal in raw mode or leak the descriptor.
        if restore:
            tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
        os.close(master_fd)
def run(image_name, image_dir, container_dir, command):
    """Run *command* in a forked, contained child and report its status.

    A fresh UUID is used as the container id. The child calls
    ``contain``; if that raises, the traceback is printed and the child
    exits with status 1. The parent waits for the child and prints its
    raw wait status. *image_dir* is accepted but not used in this body.
    """
    container_id = str(uuid.uuid4())
    child_pid = os.fork()
    if child_pid == 0:
        # This is the child, we'll try to do some containment here
        try:
            contain(command, image_name, container_id,
                    container_dir)
        except Exception:
            traceback.print_exc()
            os._exit(1)  # something went wrong in contain()

    # This is the parent, child_pid contains the PID of the forked process
    # wait for the forked child, fetch the exit status
    wait_status = os.waitpid(child_pid, 0)[1]
    print('{} exited with status {}'.format(child_pid, wait_status))
# NOTE(review): this snippet was damaged during extraction and is not valid
# Python as it stands: two copies of run() are spliced together, the `try:`
# and the contain(...) call are missing, and the waitpid/print tail has been
# reduced to the fragment `status))`. Restore from the upstream source
# before using this copy.
def run(image_name, command):
container_id = str(uuid.uuid4())
pid = os.fork()
if pid == 0:
# This is the child, status))
def run(image_name, container_dir)
except Exception:
traceback.print_exc()
os._exit(1) # something went wrong in contain()
# This is the parent, status))
# Forks a child (when CONF.is_all is set) that runs
# self.get_all_user_all_weibo_info, either through eventlet.spawn or
# directly, depending on kwargs['enable_spawn'] (default True).
# NOTE(review): indentation was lost in this copy, so the exact nesting of
# the `while True` loop, the `break` and os._exit(2) cannot be determined
# from here. As written, both branches set child_started = True, so the
# `if not child_started: break` exit looks unreachable - confirm against the
# original source.
def init_host(self, tg, **kwargs):
LOG.info(_LI('Willing init host function.......'))
if CONF.is_all:
pid = os.fork()
if pid == 0:
child_started = False
while True:
# 'enable_spawn' selects between a green thread and a direct call.
enable_spawn = kwargs.get('enable_spawn', True)
if enable_spawn:
eventlet.spawn(self.get_all_user_all_weibo_info,
**kwargs)
child_started = True
else:
# The direct call receives the thread group through kwargs.
kwargs['tg'] = tg
self.get_all_user_all_weibo_info(**kwargs)
child_started = True
if not child_started:
break
os._exit(2)
LOG.debug(_LI('Started child %d' % pid))
# NOTE(review): the original comment here was mis-encoded; it mentions 14400s, presumably a 4-hour interval - confirm.
def generate(self):
# Returns, after textwrap.dedent, the source text of a script that on POSIX
# marks pupy.infos['daemonize'], double-forks with setsid in between, sets
# the umask to 022 and redirects descriptors 0-2 to /dev/null.
# NOTE(review): the embedded script has lost its indentation in this copy
# and uses Python 2 syntax (`022`, `except OSError,e`). It also references
# `errno` while importing only `pupy` and `os`. The text is a runtime string
# and is left untouched here - confirm against the upstream source.
return textwrap.dedent("""
import pupy,os
if os.name == 'posix':
pupy.infos['daemonize']=True
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(022) # Don't allow others to write
null=os.open('/dev/null',os.O_RDWR)
for i in range(3):
try:
os.dup2(null,i)
except OSError,e:
if e.errno != errno.EBADF:
raise
os.close(null)
""")
def create_daemon(task_id, version):
    """Restart the agent from a double-forked process and record the result.

    Both the calling process and the intermediate child exit with status 0;
    only the grandchild does the work. It restarts the ``corvus-agent:``
    supervisor group, polls once a second for up to 30 tries until both
    agent programs are running, and marks the task DONE, or FAILED with
    the traceback as the reason. *version* is not used here.
    """
    if os.fork() != 0:
        os._exit(0)
    os.setsid()
    if os.fork() != 0:
        os._exit(0)

    # Grandchild from here on.
    try:
        run('supervisorctl restart corvus-agent:')
        for _ in range(30):
            api_up = program_running('corvus-agent:corvus-agent-api')
            if api_up and program_running('corvus-agent:corvus-agent-task'):
                break
            time.sleep(1)
        else:
            raise TaskException('Agent updated but not running')
        Task.set_status(task_id, Task.DONE)
    except Exception:
        Task.set_status(task_id, Task.FAILED,
                        reason=traceback.format_exc())
    exit(0)
def fork_and_exec():
    """ Fork and exec a new child process and remember its PID.

    Before forking, the current restart epoch is exported through the
    RESTART_EPOCH environment variable for processes that care, and the
    global epoch counter is incremented. The child execs ``sys.argv[1]``;
    the parent appends the child's PID to ``pid_list``. """
    global restart_epoch
    os.environ['RESTART_EPOCH'] = str(restart_epoch)
    print ("forking and execing new child process at epoch {}".format(restart_epoch))
    restart_epoch += 1

    child_pid = os.fork()
    if child_pid != 0:
        # Parent process
        print ("forked new child process with PID={}".format(child_pid))
        pid_list.append(child_pid)
    else:
        # Child process
        program = sys.argv[1]
        os.execl(program, program)
def become_daemon(self, root_dir='/'):
    """Turn the current process into a daemon.

    Double-forks with ``setsid`` in between, changes directory to
    *root_dir*, clears the umask, closes the standard streams (stdout and
    stderr are replaced by ``_NullDevice`` instances) and finally closes
    file descriptors 0..1023, ignoring those that are not open.
    """
    if os.fork():  # launch child and ...
        os._exit(0)  # kill off parent
    os.setsid()
    os.chdir(root_dir)
    os.umask(0)
    if os.fork():  # fork again so we are not a session leader
        os._exit(0)

    sys.stdin.close()
    sys.__stdin__ = sys.stdin

    sys.stdout.close()
    null_out = _NullDevice()
    sys.stdout = null_out
    sys.__stdout__ = null_out

    sys.stderr.close()
    null_err = _NullDevice()
    sys.stderr = null_err
    sys.__stderr__ = null_err

    for descriptor in range(1024):
        try:
            os.close(descriptor)
        except OSError:
            pass
def _handle_child(child_socket, root_dir, in_dir, out_dir):
    """Serve pickled callables received over *child_socket*.

    A namespace is created and the process forks. The parent closes the
    socket, waits for the child and exits. The child enters the namespace
    and then loops: it loads a callable with cloudpickle, calls it, and
    writes back the pickled ``(result, error)`` pair prefixed by its length
    packed as ``'I'``. EOF on the socket ends the process. *in_dir* is not
    used in this body.
    """
    create_namespace()
    child_pid = fork()
    if child_pid != 0:
        child_socket.close()
        waitpid(child_pid, 0)
        exit()

    enter_namespace(root_dir, out_dir)
    channel = child_socket.makefile('rwb')
    while True:
        try:
            func = cloudpickle.load(channel)
        except EOFError:
            exit()
        try:
            outcome = (func(), None)
        except Exception as e:
            outcome = (None, e)
        payload = cloudpickle.dumps(outcome)
        channel.write(pack('I', len(payload)))
        channel.write(payload)
        channel.flush()
def run(command):
    """Run *command* in a child process and wait for it to finish.

    :param command: sequence whose first item is the program to execute
        (looked up on PATH) and which is passed whole as the argv.

    The child replaces itself with the program; if the exec fails it
    reports the error on stderr and exits with status 127 instead of
    carrying on as a second copy of the caller. The parent waits for the
    child, retrying when the wait is interrupted by a signal.

    Raises OSError for wait failures other than ECHILD and EINTR.
    """
    child_pid = os.fork()
    if child_pid == 0:
        try:
            os.execlp(command[0], *command)
        except OSError as error:
            os.write(2, ('exec failed: %s\n' % error).encode())
        finally:
            # Only reached when exec did not replace the process.
            os._exit(127)

    while True:
        try:
            os.waitpid(child_pid, 0)
            # The child has been reaped; no need for another wait that
            # would only fail with ECHILD.
            return
        except OSError as error:
            if error.errno == errno.ECHILD:
                # No child processes.
                # It has exited already.
                return
            if error.errno != errno.EINTR:
                # An actual error occurred.
                raise
            # EINTR: interrupted system call (this happens when resizing
            # the terminal) - just wait again.
def create_child(*args):
    """Fork and exec *args* with stdin/stdout connected to a socket pair.

    The child duplicates its end of the pair onto descriptors 0 and 1 and
    execs ``args``. The parent closes the child's end and duplicates its
    own descriptor so it outlives the Python socket object.

    :return: ``(pid, fd)`` - the child's pid and the parent's descriptor.
    """
    parentfp, childfp = socket.socketpair()
    pid = os.fork()
    if not pid:
        mitogen.core.set_block(childfp.fileno())
        for std_fd in (0, 1):
            os.dup2(childfp.fileno(), std_fd)
        childfp.close()
        parentfp.close()
        os.execvp(args[0], args)

    childfp.close()
    # Decouple the socket from the lifetime of the Python socket object.
    fd = os.dup(parentfp.fileno())
    parentfp.close()

    LOG.debug('create_child() child %d fd %d,parent %d,cmd: %s',
              pid, fd, os.getpid(), Argv(args))
    return pid, fd
def tty_create_child(*args):
    """Fork and exec *args* with its standard streams on a new pty.

    Echo is disabled on both ends of the pty before forking. The child
    makes the slave end its stdin/stdout/stderr, closes every other
    descriptor, starts a new session, reopens the tty by name and execs
    ``args``. The parent closes its copy of the slave end.

    :return: ``(pid, master_fd)`` - the child's pid and the pty master.

    NOTE(review): the final log call and return statement were spliced
    together in the copy this was restored from; they are reconstructed
    here by analogy with create_child() - confirm against upstream.
    """
    master_fd, slave_fd = os.openpty()
    disable_echo(master_fd)
    disable_echo(slave_fd)

    pid = os.fork()
    if not pid:
        mitogen.core.set_block(slave_fd)
        os.dup2(slave_fd, 0)
        os.dup2(slave_fd, 1)
        os.dup2(slave_fd, 2)
        close_nonstandard_fds()
        os.setsid()
        # Opening the tty after setsid() - presumably to make it the
        # controlling terminal of the new session; TODO confirm.
        os.close(os.open(os.ttyname(1), os.O_RDWR))
        os.execvp(args[0], args)

    os.close(slave_fd)
    LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
              pid, master_fd, os.getpid(), Argv(args))
    return pid, master_fd
# Deliberately terse bootstrap: forks, then one process re-execs the
# interpreter with rewired descriptors while the other feeds it a
# zlib-decompressed payload read from stdin.
# NOTE(review): indentation was lost in this copy; presumably the lines from
# os.dup2(0,100) through os.execv(...) form the body of `if os.fork():` -
# confirm against the upstream source before relying on the layout.
def _first_stage():
import os,sys,zlib
# Two pipes: (R, W) and (r, w).
R,W=os.pipe()
r,w=os.pipe()
if os.fork():
# Keep the original stdin on fd 100, make pipe R the new stdin and
# expose pipe r on fd 101.
os.dup2(0,100)
os.dup2(R,0)
os.dup2(r,101)
# Close the original pipe descriptors; the duplicates above remain.
for f in R,r,W,w:os.close(f)
# Export the interpreter path as ARGV0 and re-exec it with a custom argv[0].
os.environ['ARGV0']=e=sys.executable
os.execv(e,['mitogen:CONTEXT_NAME'])
# Write the 'EC0' marker on stdout.
os.write(1,'EC0\n')
# input() supplies the number of bytes to read from stdin; they are
# zlib-decompressed into C.
C=zlib.decompress(sys.stdin.read(input()))
# Send the decompressed payload down pipe W, unbuffered.
os.fdopen(W,'w',0).write(C)
# Send the payload again on pipe w, prefixed by its length and a newline.
# NOTE(review): the mode argument looks lost here (the line above passes 'w', 0) - confirm.
os.fdopen(w,0).write('%s\n'%len(C)+C)
# Write the 'EC1' marker on stdout, then exit.
os.write(1,'EC1\n')
sys.exit(0)
# NOTE(review): this snippet was damaged during extraction and is not valid
# Python as it stands: the shutdown() docstring opened below is never closed
# and runs into the following comment block, and process_request() has been
# reduced to a broken signature plus the tail of its error handler. Restore
# from the upstream source before using this copy.
def shutdown(self):
"""Stops the serve_forever loop.
Blocks until the loop has finished. This must be called while
serve_forever() is running in another thread,verify_request() and process_request()
# - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process
# or create a new thread to finish the request
# - finish_request() instantiates the request handler class;
# this constructor will handle the request all by itself
def process_request(self, client_address)
self.shutdown_request(request)
finally:
os._exit(1)
def reexec(self):
    """\
    Relaunch the master and workers.

    The pidfile, if any, is renamed with an ``.oldbin`` suffix. After the
    fork the parent renames itself "Old Master" and returns; the child
    exports the listener descriptors through GUNICORN_FD, changes to the
    original working directory, runs the pre_exec hook and execs the
    original command line.
    """
    if self.pidfile is not None:
        self.pidfile.rename("%s.oldbin" % self.pidfile.fname)

    self.reexec_pid = os.fork()
    if self.reexec_pid != 0:
        self.master_name = "Old Master"
        return

    environ = self.cfg.env_orig.copy()
    listener_fds = [str(sock.fileno()) for sock in self.LISTENERS]
    environ['GUNICORN_FD'] = ",".join(listener_fds)

    os.chdir(self.START_CTX['cwd'])
    self.cfg.pre_exec(self)

    # exec the process using the original environment
    os.execvpe(self.START_CTX[0], self.START_CTX['args'], environ)
# NOTE(review): this snippet was damaged during extraction and is not valid
# Python as it stands: the shutdown() docstring opened below is never closed
# and runs into the following comment block, and process_request() has been
# reduced to a broken signature plus the tail of its error handler. Restore
# from the upstream source before using this copy.
def shutdown(self):
"""Stops the serve_forever loop.
Blocks until the loop has finished. This must be called while
serve_forever() is running in another thread,verify_request() and process_request()
# - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process
# or create a new thread to finish the request
# - finish_request() instantiates the request handler class;
# this constructor will handle the request all by itself
def process_request(self, client_address)
self.shutdown_request(request)
finally:
os._exit(1)
def start(self):
    """Run ``self.run()`` in a forked child.

    The parent returns immediately; the child calls ``run`` and then exits
    with status 0 without returning to the caller.
    """
    if os.fork():
        # Parent
        return
    self.run()
    os._exit(0)
def daemonize(self):
    """Daemonize the current process with the double-fork recipe.

    After the first fork the child changes directory to ``/``, starts a
    new session and clears the umask; after the second fork the
    grandchild redirects stdin/stdout/stderr to ``self.stdin``,
    ``self.stdout`` and ``self.stderr``, registers ``self.delpid`` to run
    at exit and writes its pid to ``self.pidfile``. Both intermediate
    parents leave through ``sys.exit(0)``; a failed fork is reported on
    stderr and ends the process with status 1.
    """
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    os.chdir("/")
    os.setsid()
    os.umask(0)

    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Flush before the underlying descriptors are replaced.
    sys.stdout.flush()
    sys.stderr.flush()
    si = open(self.stdin, 'r')
    so = open(self.stdout, 'a+')
    # Unbuffered append; binary mode because Python 3 only allows
    # buffering=0 for binary files. Only the descriptor is used.
    se = open(self.stderr, 'ab+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())

    atexit.register(self.delpid)
    # Close the pidfile explicitly so the pid is flushed to disk right away.
    with open(self.pidfile, 'w+') as pidf:
        pidf.write("%s\n" % os.getpid())
def close_open_files():
    '''Closes all open files. Useful after a fork.

    Every descriptor below the hard RLIMIT_NOFILE limit (or below MAXFD
    when that limit is unlimited) is closed, highest first. Descriptors
    that are not open are skipped.

    Raises Exception if closing a descriptor fails for any reason other
    than EBADF.
    '''
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        maxfd = MAXFD
    for fd in reversed(range(maxfd)):
        try:
            os.close(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                continue  # File not open
            raise Exception("Failed to close file descriptor %d: %s" % (fd, e))
def daemonize(double_fork=True):
    '''Puts process in the background using usual UNIX best practices.

    :param double_fork: when True, fork once before ``setsid`` in addition
        to the fork that follows it; pass False when the caller has
        already forked.

    Sets the umask to 0o22, changes directory to ``/``, starts a new
    session, closes every open file and reopens stdin, stdout and stderr
    on the null device. Each forking parent exits with status 0.

    Raises Exception when the umask cannot be changed or a fork fails.
    '''
    try:
        os.umask(0o22)
    except Exception as e:
        raise Exception("Unable to change file creation mask: %s" % e)
    os.chdir('/')

    # First fork
    if double_fork:
        try:
            pid = os.fork()
            if pid > 0:
                os._exit(0)
        except OSError as e:
            raise Exception("Error on first fork: [%d] %s" % (e.errno, e.strerror,))
    os.setsid()

    # Second fork
    try:
        pid = os.fork()
        if pid > 0:
            os._exit(0)
    except OSError as e:
        raise Exception("Error on second fork: [%d] %s" % (e.errno, e.strerror,))

    close_open_files()
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
def handle_button_traffic(self, mac, eth_protocol):
    """React to traffic from a known button by running its action.

    :param mac: MAC address used to look the button up in ``self.buttons``.
    :param eth_protocol: not used by this method.

    Unknown MACs are ignored, as are repeats seen within
    ``options.main.button_timeout`` seconds of the previous sighting. For
    a genuine press the configured action is run through the shell in a
    forked, daemonized child, with BUTTON_NAME and BUTTON_MAC added to the
    environment. A failed fork is logged and the press is dropped.
    """
    button = self.buttons.get(mac)
    if not button:
        return

    now = time.time()
    time_delta = now - button['last_seen']
    button['last_seen'] = now
    if time_delta < options.main.button_timeout:
        return

    logging.getLogger().info('Button press detected: {} [{}].'.format(button['name'], button['mac']))
    action = button['action']
    env = dict(os.environ)
    env.update({
        'BUTTON_NAME': button['name'],
        'BUTTON_MAC': button['mac'],
    })

    pid = None
    try:
        pid = os.fork()
    except OSError as e:
        # OSError exposes `strerror`; the previous `strerr` raised
        # AttributeError inside this handler.
        logging.getLogger().info("Could not fork for action: [{}] {}".format(e.errno, e.strerror))
    if pid == 0:
        # Child: already forked once, so skip daemonize()'s first fork.
        daemonize(False)
        subprocess.Popen(action, shell=True, env=env).wait()
        os._exit(0)
def __init__(self, process_obj):
    """Fork and run ``process_obj._bootstrap()`` in the child.

    Standard streams are flushed before forking. The parent keeps the
    child's pid in ``self.pid`` with ``self.returncode`` set to None. The
    child reseeds ``random`` if that module is already loaded, runs the
    bootstrap, flushes again and exits with the bootstrap's return code.
    """
    for stream in (sys.stdout, sys.stderr):
        stream.flush()
    self.returncode = None

    self.pid = os.fork()
    if self.pid != 0:
        return

    # Child process from here on.
    if 'random' in sys.modules:
        import random
        random.seed()
    exit_code = process_obj._bootstrap()
    for stream in (sys.stdout, sys.stderr):
        stream.flush()
    os._exit(exit_code)
def is_forking(argv):
    '''
    Return whether commandline indicates we are forking

    *argv* is a forking command line when its second item is
    ``'--multiprocessing-fork'``; such a command line is asserted to have
    exactly three items.
    '''
    if len(argv) < 2 or argv[1] != '--multiprocessing-fork':
        return False
    assert len(argv) == 3
    return True
# NOTE(review): this snippet was damaged during extraction: the `cmd`
# parameter described in the docstring and assigned to self.cmd is missing
# from the signature, the two pipe() calls have been merged into one, the
# child branch no longer runs the command, and several dup2()/fdopen() calls
# carry spliced arguments (e.g. `os.dup2(errin, bufsize)`). Restore from the
# upstream source before using this copy.
def __init__(self, capturestderr=False, bufsize=-1):
"""The parameter 'cmd' is the shell command to execute in a
sub-process. On UNIX,'cmd' may be a sequence,in which case arguments
will be passed directly to the program without shell intervention (as
with os.spawnv()). If 'cmd' is a string it will be passed to the shell
(as with os.system()). The 'capturestderr' flag,if true,specifies
that the object should capture standard error output of the child
process. The default is false. If the 'bufsize' parameter is
specified,it specifies the size of the I/O buffers to/from the child
process."""
_cleanup()
self.cmd = cmd
p2cread, c2pwrite = os.pipe()
if capturestderr:
errout, errin = os.pipe()
self.pid = os.fork()
if self.pid == 0:
# Child
os.dup2(p2cread, 1)
if capturestderr:
os.dup2(errin, bufsize)
if capturestderr:
os.close(errin)
self.childerr = os.fdopen(errout, bufsize)
else:
self.childerr = None
# NOTE(review): this snippet was damaged during extraction and is not valid
# Python as it stands: the signature has been spliced with the tail of a
# `hasattr(os, 'fork')` check, so `tasks`, `workers`, `randomReseed` and
# `kwds` are referenced without being defined. A complete variant of this
# constructor appears earlier in this file; restore from the upstream source
# before using this copy.
def __init__(self, 'fork'):
workers = 1
self.workers = workers
if tasks is None:
tasks = range(workers)
self.tasks = list(tasks)
self.reseed = randomReseed
self.kwds = kwds.copy()
self.kwds['_taskStarted'] = self._taskStarted
def __init__(self):
    """ Fork a child process and watch over it from the parent.

    The child returns from the constructor straight away. The parent
    stores the child's pid in ``self.child`` and calls ``self.watch()``.
    """
    self.child = os.fork()
    if self.child != 0:
        # Parent
        self.watch()
def f(path):
    "one file object + forking: the child locks the handle the parent keeps"
    with lockpath.keeping(path) as handle:
        if not os.fork():
            # Child: lock the very file object inherited from the parent.
            lockfile(handle)
        else:
            os.wait()
def F(path):
    "separate file objects + forking: the child locks the path afresh"
    with lockpath.keeping(path):
        if not os.fork():
            # Child: take the lock by path rather than via the kept handle.
            lockpath(path)
        else:
            os.wait()
def daemonize(stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"):
    '''
    Daemonize current script

    :param stdin: path opened for reading and installed as standard input.
    :param stdout: path opened for appending and installed as standard
        output.
    :param stderr: path opened for unbuffered appending and installed as
        standard error.

    Uses the double-fork recipe: fork, change directory to "/", clear the
    umask, start a new session, fork again. Both intermediate parents
    leave through sys.exit(0); a failed fork is reported on stderr and
    ends the process with status 1. Missing parent directories of the
    three paths are created before the files are opened.
    '''
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)

    os.chdir("/")
    os.umask(0)
    os.setsid()

    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Create the directory of each redirect target when it is missing.
    # (The previous check was inverted and called the non-existent
    # os.path.makedirs.)
    for target in (stdin, stdout, stderr):
        parent_dir = os.path.dirname(target)
        if parent_dir and not os.path.isdir(parent_dir):
            os.makedirs(parent_dir)

    # Flush before the underlying descriptors are replaced.
    sys.stdout.flush()
    sys.stderr.flush()
    si = open(stdin, 'r')
    so = open(stdout, 'a+')
    # Unbuffered append; binary mode because Python 3 only allows
    # buffering=0 for binary files. Only the descriptor is used.
    se = open(stderr, 'ab+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())