Python os 模块,_exit() 实例源码
我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用os._exit()。
def _spawn_transform_service(self):
    """Fork a child that runs the transform service and return its pid.

    The parent returns the pid produced by ``os.fork()``. The child starts
    a new session, launches the service with one worker, waits for it and
    then leaves through ``os._exit`` so it never returns to the caller.
    """
    pid = os.fork()
    if pid != 0:
        # Parent: report the child's pid.
        return pid

    exit_code = 0
    try:
        os.setsid()
        # start transform service
        launcher = oslo_service.service.launch(
            self.conf, transform_service.Transform(), workers=1)
        exit_code = launcher.wait()
    except SystemExit as exc:
        traceback.print_exc()
        exit_code = exc.code
    except BaseException:
        try:
            traceback.print_exc()
        except BaseException:
            print("Could not print traceback")
        exit_code = 2
    # A falsy status (e.g. None) is reported as exit code 0.
    os._exit(exit_code or 0)
def ShowOnePage(self, now_page_items, page):
    """Save every item of one page, then pause briefly.

    Each entry of ``now_page_items`` is indexed as ``item[0]`` (passed to
    ``self.saveFile``) and ``item[1]`` (printed). When ``page`` equals
    ``self.unload_page_num`` the repeated images are cleaned up and the
    whole process is terminated with ``os._exit(0)``.
    """
    for idx, item in enumerate(now_page_items):
        print("\ndownload " + item[1])
        self.saveFile(item[0], page, idx)
    print('========Please hit the Enter.=================')
    if self.unload_page_num == page:
        print('========all pages done. clean the repeated files.==========')
        self.CleanRepeatImage()  # at last, deal with the repeated images.
        print('Nothing left. Now close this application.')
        os._exit(0)  # ends the whole process, not just this thread
    # Pause for one second before returning.
    time.sleep(1)
    print('take a snap for 1s.')
def serve_forever(self, poll_interval=0.1):
    """Serve in ``self.prefork`` forked workers and wait for all of them.

    When ``self.prefork`` is ``None`` or at most 1, no fork happens and the
    call is delegated to the parent class. Otherwise each forked child runs
    the parent class' ``serve_forever`` and then calls ``os._exit(0)``; the
    original process records the child pids in ``self.pids`` and waits for
    each one.
    """
    if not (self.prefork is not None and self.prefork > 1):
        return super(_SporkMixIn, self).serve_forever(
            poll_interval=poll_interval)

    children = []
    for _ in range(self.prefork):
        child_pid = os.fork()
        if child_pid:
            self.log.info("Forked worker %s", child_pid)
            children.append(child_pid)
            continue
        # Worker process: serve, then leave without returning to the caller.
        super(_SporkMixIn, self).serve_forever(poll_interval=poll_interval)
        os._exit(0)

    self.pids = children
    for child_pid in self.pids:
        _eintr_retry(os.waitpid, child_pid, 0)
def run(self):
    """Run the three processing steps and hard-exit with status 1 on failure.

    ``SystemExit`` raised by a step is swallowed. Any other ``Exception``
    marks the refresh as FAILED (when ``self.refresh_id`` is set), sends
    the exception e-mail and finally terminates the process through
    ``os._exit(1)``.
    """
    self.log.info("Python Version: {}".format(sys.version_info))
    failed = False
    try:
        self.initial_action()
        self.process_table()
        self.log_info()
    except SystemExit:
        pass
    except Exception:
        if self.refresh_id:
            self.schematizer.update_refresh(
                self.refresh_id,
                RefreshStatus.FAILED,
                self.processed_row_count
            )
        # Sends an email containing the exception encountered.
        self._email_exception_in_exception_context()
        failed = True
    finally:
        # We don't want to drop the blackhole table until the replication handler is using
        # the schema_tracker database stably. (TODO: DATAPIPE-845)
        # self.final_action()
        if failed:
            os._exit(1)
def process_request(self, request, client_address):
    """Fork a new subprocess to process the request.

    The parent records the child's pid in ``self.active_children`` and
    closes its copy of the request. The child handles the request and
    always ends in ``os._exit``: 0 on success, 1 when anything was raised.
    """
    self.collect_children()
    child_pid = os.fork()
    if not child_pid:
        # Child process.
        # This must never return, hence os._exit()!
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
            os._exit(0)
        except:
            try:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            finally:
                os._exit(1)

    # Parent process
    if self.active_children is None:
        self.active_children = []
    self.active_children.append(child_pid)
    self.close_request(request)  # close handle in parent process
    return
def myexit(code):
    '''
    Substitute for sys.exit. MUST be used at EOF.

    Flushes stdout and stderr, then calls `os._exit(code)`.

    Python 2.7.10 on NetBSD 6.1 x86-32 has a bug that causes the
    interpreter to hang after the program has finished if it has ever
    called `subprocess.Popen`. All of `exit`, `sys.exit`, `quit` and
    end of file are affected.

    A bare `os._exit` is not enough on its own because buffered output
    is not flushed on exit, hence the explicit flushes.

    https://github.com/oskar-skog/anonymine/issues/7
    http://bugs.python.org/issue28807
    http://gnats.netbsd.org/cgi-bin/query-pr-single.pl?number=51657
    '''
    for stream in (sys.stdout, sys.stderr):
        stream.flush()
    os._exit(code)
def client_03(port, prefix):
    """Repeatedly ask the remote control to raise an AveException.

    Runs forever until it terminates the process itself: exit code 0 on
    ``Exit``, 2 on an ``AveException`` whose details lack 'recognize', and
    1 on any other exception. ``prefix`` is accepted but not used here.
    """
    c = RemoteControl(('', port), None, timeout=10)
    while True:
        if random.random() > 0.7:
            # Occasionally reconnect with the same arguments as the first
            # connection.
            c = RemoteControl(('', port), None, timeout=10)
        try:
            c.raise_ave_exception({'message':'hello', 'recognize':'me'})
        except Exit:
            os._exit(0)
        except AveException as e:
            if 'recognize' in e.details:
                continue
            print('client PID=%d got exception: %s' % (os.getpid(), e))
            os._exit(2)
        except Exception as e:
            print('client PID=%d got exception: %s' % (os.getpid(), e))
            os._exit(1)
# load test for control. send hickup if the control freezes up
def t07(w):
    """Check that loading authkeys from an empty config directory fails.

    Creates ``<w.path>/.ave/config`` and expects
    ``ave.config.load_authkeys`` to raise an error whose text contains the
    ``ave-config --bootstrap=<user>`` hint. Returns True on that outcome,
    False otherwise.
    """
    pretty = '%s t7' % __file__
    print(pretty)
    base = os.path.join(w.path, '.ave', 'config')
    os.makedirs(base)
    try:
        authkeys = ave.config.load_authkeys(w.path)
        print('FAIL %s: could load invalid config: %s' % (pretty, authkeys))
        return False
    except Exception as e:
        name = ave.pwd.getpwuid_name(os.getuid())
        # NOTE(review): `unicode` is the Python 2 builtin; left unchanged
        # because this file may define a compatibility alias elsewhere.
        if 'run "ave-config --bootstrap=%s"' % name not in unicode(e):
            print('FAIL %s: wrong error: %s' % (pretty, e))
            return False
    return True
# used by t8-t10. calls os._exit() so only use from within child process.
def check_fds(pretty, dump, ref):
    """Validate a file-descriptor dump and exit the process on a verdict.

    ``dump`` and ``ref`` map fd numbers to the path each fd points at.
    Exits with 2 when fd 0-2 is not a pseudo terminal, with 0 when no fd
    above 2 is present in ``dump``, and with 5 when an fd differs from
    ``ref``. Returns normally when every compared fd matches.
    """
    # check that 0-2 are the stdio file descriptors. i.e. that they are
    # connected to pseudo terminals.
    for i in range(3):
        if not dump[i].startswith('/dev/pts'):
            print('FAIL %s: wrong stdio file at %d: %s' % (pretty, i, dump[i]))
            os._exit(2)
    # any file descriptors in the range [3..max(all fds)] must be files that
    # were opened by this process. for this test we expect all of them to point
    # to the same file (the current .py file). otherwise they are not intact.
    if max(dump.keys()) <= 2:
        os._exit(0) # all OK
    # NOTE(review): the upper bound is exclusive, so the highest key of `ref`
    # is not compared — confirm whether that is intended.
    for i in range(3, max(ref.keys())): # check that all ref keys are intact
        if not dump[i] == ref[i]:
            # The fd number must be passed too: the format has three fields.
            print('FAIL %s: clobbered fd at %d: %s' % (pretty, i, dump[i]))
            os._exit(5)
# check if ave.config.load_etc() clobbers file descriptors. this is a bug in
# winbindd which is used by PAM to implement a backend for Python's pwd module.
# we *have* to use the pwd module,so the global side effect (clobbering a file
# descriptor) must be hidden from the caller.
def shutdown(self, details=None):
    '''
    Close every connection and the listener, then terminate the process.

    Connections that are authenticating or established first receive an
    exit message built from ``details`` (only when ``details`` is truthy).
    The exit message will be serialized as an ``Exit`` exception.
    '''
    def _say_goodbye_and_close(connections):
        for conn in connections:
            if details:
                self.write_exit_message(conn, details)
            conn.close()

    # close all open connections
    for conn in self.accepting:
        conn.close()
    _say_goodbye_and_close(self.authenticating)
    _say_goodbye_and_close(self.established)

    if self.listener:
        self.listener.close()
    self.join_deferred()
    # causes queued messages on outbound sockets to be flushed
    # in the background. Man page: socket(7) SO_LINGER
    os._exit(1)
# NOTE(review): this copy of t07 is truncated. The os.path.join() call below
# is syntactically incomplete and refers to `e`, which is never bound in the
# lines that survive. Restore the body from the original source before use.
def t07(w):
pretty = '%s t7' % __file__
print(pretty)
base = os.path.join(w.path, e))
return False
return True
# used by t8-t10. calls os._exit() so only use from within child process.
# NOTE(review): truncated fragment. The signature of check_fds runs straight
# into the tail of an unrelated comment and no body is present.
def check_fds(pretty,so the global side effect (clobbering a file
# descriptor) must be hidden from the caller.
# NOTE(review): truncated fragment. The `def` line lacks its colon and the
# loops that bind `connection` are missing; only the tail (closing the
# listener, join_deferred() and os._exit(1)) survives.
def shutdown(self, details)
connection.close()
if self.listener:
self.listener.close()
self.join_deferred()
os._exit(1) # causes queued messages on outbound sockets to be flushed
# in the background. Man page: socket(7) SO_LINGER
def _check_analysis_queue(queue_name, thread_id=0):
    """
    Make sure a connection exists for ``thread_id`` and a declared, open
    channel exists for ``queue_name``.

    Connections, consumer flags and channels are cached on the ``Queue``
    class. If the broker connection cannot be opened, an error is logged
    and the process exits with status 1.
    """
    # check if connection exists for the thread
    if thread_id not in Queue.connections:
        try:
            params = pika.ConnectionParameters(Queue.host)
            Queue.connections[thread_id] = pika.BlockingConnection(params)
        except pika.exceptions.ConnectionClosed:
            logging.error("Error with RMQ server,check it's started.")
            os._exit(1)
        Queue.consumers[thread_id] = True

    # check if channel exists for the thread
    channel_missing = queue_name not in Queue.channels
    if channel_missing or Queue.channels[queue_name].is_closed:
        Queue.channels[queue_name] = Queue.connections[thread_id].channel()
        Queue.channels[queue_name].queue_declare(queue=queue_name)
# NOTE(review): truncated copy of a forking process_request. The signature
# lacks the `request` parameter that the body uses, and the child branch lost
# its try/except (a `finally:` appears without a matching `try:`). Restore it
# from the original source before use.
def process_request(self, client_address):
"""Fork a new subprocess to process the request."""
self.collect_children()
pid = os.fork()
if pid:
# Parent process
if self.active_children is None:
self.active_children = set()
self.active_children.add(pid)
self.close_request(request) #close handle in parent process
return
else:
# Child process.
# This must never return, client_address)
self.shutdown_request(request)
finally:
os._exit(1)
def startLogging(logfilename, sysLog, prefix, nodaemon):
    """Pick the log destination and start logging to it.

    - ``logfilename == '-'``: log to stdout; exits with status 1 when the
      process is going to daemonize (``nodaemon`` false).
    - ``sysLog`` true: start syslog logging with ``prefix``.
    - ``nodaemon`` true and no file name: log to stdout.
    - otherwise: log to ``logfilename`` (default 'twistd.log') and, when
      the ``signal`` module is importable, rotate the file on SIGUSR1.
    """
    if logfilename == '-':
        if not nodaemon:
            print('daemons cannot log to stdout')
            os._exit(1)
        logFile = sys.stdout
    elif sysLog:
        syslog.startLogging(prefix)
    elif nodaemon and not logfilename:
        logFile = sys.stdout
    else:
        logFile = app.getLogFile(logfilename or 'twistd.log')
        try:
            import signal
        except ImportError:
            pass
        else:
            def rotateLog(signal, frame):
                from twisted.internet import reactor
                reactor.callFromThread(logFile.rotate)
            signal.signal(signal.SIGUSR1, rotateLog)

    if not sysLog:
        log.startLogging(logFile)
    sys.stdout.flush()
def daemonize():
    """Detach from the terminal: double fork, new session, stdio on /dev/null.

    Both intermediate parents leave through ``os._exit(0)``. The umask is
    set to 0o77 and file descriptors 0-2 are redirected to /dev/null;
    EBADF from ``dup2`` is ignored, any other OSError is re-raised.
    """
    # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent
    os.setsid()
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent again.
    os.umask(0o77)
    null = os.open('/dev/null', os.O_RDWR)
    for i in range(3):
        try:
            os.dup2(null, i)
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
    os.close(null)
def fixPdb():
    """Patch ``pdb.Pdb`` with a ``stop`` command and a hard-exit quit.

    After this call ``pdb.Pdb.do_stop`` schedules ``reactor.stop``,
    ``help_stop`` prints its help text and ``set_quit`` terminates the
    process with ``os._exit(0)``.
    """
    def do_stop(self, arg):
        self.clear_all_breaks()
        self.set_continue()
        from twisted.internet import reactor
        reactor.callLater(0, reactor.stop)
        return 1

    def help_stop(self):
        print("""stop - Continue execution,then cleanly shutdown the twisted reactor.""")

    def set_quit(self):
        os._exit(0)

    pdb.Pdb.set_quit = set_quit
    pdb.Pdb.do_stop = do_stop
    pdb.Pdb.help_stop = help_stop
def closed(self):
    """Log the closure and, unless 'nocache' is set, fork into the background.

    In the forking case the parent restores the saved terminal attributes
    (module-level ``old``), resets SIGWINCH where applicable and exits with
    status 0. The child starts a new session and closes fds 0-2, ignoring
    EBADF.
    """
    global old
    log.msg('closed %s' % self)
    log.msg(repr(self.conn.channels))
    if not options['nocache']: # fork into the background
        if os.fork():
            if old:
                fd = sys.stdin.fileno()
                tty.tcsetattr(fd, tty.TCSANOW, old)
            if (options['command'] and options['tty']) or \
                    not options['notty']:
                signal.signal(signal.SIGWINCH, signal.SIG_DFL)
            os._exit(0)
        os.setsid()
        for i in range(3):
            try:
                os.close(i)
            except OSError as e:
                import errno
                if e.errno != errno.EBADF:
                    raise
def on_sa_profile(self, mapper, action):
    """Announce a switch to the profile named by ``action.profile``.

    When the profile file is found, differs from the current profile and
    the current profile is not a '.mod' file, the switch is written to
    ``self.socket`` (preceded by an OSD message when 'autoswitch_osd' is
    enabled). A failed socket write terminates the process with status 2.
    When the profile file is not found, an error is logged.
    """
    profile_name = action.profile
    path = find_profile(profile_name)
    if not path:
        # Log the requested name; the previous code referenced an undefined
        # variable here and raised NameError instead of logging.
        log.error("Cannot switch to profile '%s',profile file not found", profile_name)
        return

    with self.lock:
        if path != self.current_profile and not self.current_profile.endswith(".mod"):
            # Switch only if target profile is not active
            # and active profile is not being edited.
            try:
                if self.config['autoswitch_osd']:
                    msg = (_("Switched to profile") + " " + profile_name)
                    self.socket.send(b"OSD: " + msg.encode('utf-8') + b"\n")
                self.socket.send(b"Profile: " + path.encode('utf-8') + b"\n")
            except:
                log.error("Socket write failed")
                os._exit(2)
# Announce termination, set the module-level `event`, then terminate the
# stimuli, bci and graph objects and leave via os._exit(0). An AttributeError
# raised while terminating is swallowed. pupil.terminate() is called when
# `pupil` is truthy and that statement is reached.
# NOTE(review): indentation was lost in this copy, so the nesting of `break`
# and of the trailing `if pupil:` relative to the `while` loop cannot be
# determined from here — confirm against the original source.
def stop(stimuli, bci, graph, bci_queue, pupil=None):
print('Terminating from the main thread...')
#termination handling for windows
event.set()
while True:
if bci_queue is None or bci_queue.get() == 'SAVED_BCI' or os.name != 'nt':
#(in windows) 'SAVED_BCI' ensures data has saved before process termination
try:
stimuli.terminate()
bci.terminate()
graph.terminate()
os._exit(0)
except AttributeError:
pass
break
if pupil:
pupil.terminate()
def run(command):
    """Run ``contain(command)`` and report the status from ``os.waitpid``.

    No fork happens yet (see the TODO): ``pid`` is fixed at 0, so the
    "child" branch always runs in the calling process. If ``contain``
    raises an ``Exception`` the traceback is printed and the process exits
    with status 1.
    """
    # TODO: replace this with fork()
    # (https://docs.python.org/2/library/os.html#os.fork)
    pid = 0
    if pid == 0:
        # This is the child, we'll try to do some containment here
        try:
            contain(command)
        except Exception:
            traceback.print_exc()
            os._exit(1)  # something went wrong in contain()

    # This is the parent, pid contains the PID of the forked process
    # wait for the forked child and fetch the exit status
    wait_result = os.waitpid(pid, 0)
    status = wait_result[1]
    print('{} exited with status {}'.format(pid, status))
# Fork; in the child call contain() with a freshly generated container id and
# exit with status 1 if it raises.
# NOTE(review): truncated copy. The parent-side wait/print at the end is cut
# down to the dangling `_, status))`, and the contain() call does not pass
# `image_dir` — restore from the original source before use.
def run(image_name, image_dir, container_dir, command):
container_id = str(uuid.uuid4())
pid = os.fork()
if pid == 0:
# This is the child,we'll try to do some containment here
try:
contain(command, image_name, container_id,
container_dir)
except Exception:
traceback.print_exc()
os._exit(1) # something went wrong in contain()
# This is the parent,pid contains the PID of the forked process
# wait for the forked child,fetch the exit status
_, status))
# NOTE(review): truncated fragment. Only the fork and the start of the child
# branch survive; the rest was collapsed into a comment.
def run(image_name, command):
container_id = str(uuid.uuid4())
pid = os.fork()
if pid == 0:
# This is the child, status))
# NOTE(review): truncated fragment. The `def` line has no colon and the
# `try:` matching the `except` below is missing.
def run(image_name, container_dir)
except Exception:
traceback.print_exc()
os._exit(1) # something went wrong in contain()
# This is the parent, status))
# NOTE(review): truncated fragment. Only the fork and the start of the child
# branch survive; the rest was collapsed into a comment.
def run(image_name, command):
container_id = str(uuid.uuid4())
# TODO: Switching to a new PID namespace (using unshare) would only affect
# the children of a process (because we can't change the PID of a
# running process),so we'll have to unshare here OR replace
# os.fork() with linux.clone()
pid = os.fork()
if pid == 0:
# This is the child, status))
def main(argv):
    """
    Start the snabb polling thread and serve the OpServer instance.

    A daemon thread running ``poll_snabb`` is started, an ``OpServer`` is
    registered on the module-level ``server`` and ``server.serve_forever()``
    is entered. On any ``Exception`` the message is printed and the process
    is terminated with ``os._exit(0)``.

    :param argv: Arguments for the command (not used in this function)
    :return: None
    """
    try:
        # log device initialized successfully
        print("Device initialized for the configuration updates")
        # Start a thread to do the polling
        t = Thread(target=poll_snabb)
        t.daemon = True
        t.start()
        opw = OpServer()
        server.register_instance(opw)
        print("Starting the reactor")
        server.serve_forever()
    except Exception as e:
        # log device initialization failed
        # str(e) instead of e.message: the attribute does not exist on
        # Python 3 exceptions and would raise inside this handler.
        print("JET app exiting due to exception: %s" % str(e))
        os._exit(0)
    return
def info(args):
    """Get info on vm

    When ``args.names`` is empty, the name on the first line of
    ``$HOME/.kcli/vm`` is used if that file exists and is non-empty;
    otherwise an error is printed and the function returns. After querying
    every name the process exits with status 1 if any response code was 1,
    else 0.
    """
    names = args.names
    output = args.output
    fields = args.fields
    values = args.values
    lastvm = "%s/.kcli/vm" % os.environ.get('HOME')
    if not names:
        if os.path.exists(lastvm) and os.stat(lastvm).st_size > 0:
            # Close the file deterministically instead of leaking the handle.
            with open(lastvm) as handle:
                names = [handle.readline().strip()]
            common.pprint("Using %s as vm" % names[0], color='green')
        else:
            common.pprint("Missing Vm's name", color='red')
            return
    k = config.k
    codes = []
    for name in names:
        result = k.info(name, output=output, fields=fields, values=values)
        code = common.handle_response(result, name, quiet=True)
        codes.append(code)
    os._exit(1 if 1 in codes else 0)
def disk(args):
    """Add/Delete disk of vm

    With ``args.delete`` set, ``args.diskname`` is required and the disk is
    deleted. Otherwise ``args.size`` and ``args.pool`` are required and a
    disk is added. A missing required value prints an error and exits the
    process with status 1.
    """
    name = args.name
    delete = args.delete
    size = args.size
    diskname = args.diskname
    template = args.template
    pool = args.pool
    k = config.k

    def _bail(message):
        common.pprint(message, color='red')
        os._exit(1)

    if delete:
        if diskname is None:
            _bail("Missing diskname. Leaving...")
        common.pprint("Deleting disk %s from %s..." % (diskname, name), color='green')
        k.delete_disk(name, diskname)
        return

    if size is None:
        _bail("Missing size. Leaving...")
    if pool is None:
        _bail("Missing pool. Leaving...")
    common.pprint("Adding disk to %s..." % (name), color='green')
    k.add_disk(name=name, size=size, pool=pool, template=template)
def pool(args):
    """Create/Delete pool

    With ``args.delete`` set the pool is deleted. Otherwise ``args.path``
    is required (a missing path prints an error and exits with status 1)
    and the pool is created.
    """
    pool_name = args.pool
    delete = args.delete
    # NOTE(review): `full` is read from args.delete, not from a separate
    # flag — confirm whether that is intended.
    full = args.delete
    pooltype = args.pooltype
    path = args.path
    k = config.k

    if delete:
        common.pprint("Deleting pool %s..." % (pool_name), color='green')
        k.delete_pool(name=pool_name, full=full)
        return

    if path is None:
        common.pprint("Missing path. Leaving...", color='red')
        os._exit(1)
    common.pprint("Adding pool %s..." % (pool_name), color='green')
    k.create_pool(name=pool_name, poolpath=path, pooltype=pooltype)
def network(args):
    """Create/Delete/List Network

    ``args.name`` is required; without it an error is printed and the
    process exits with status 1. With ``args.delete`` set the network is
    deleted, otherwise it is created with NAT enabled unless
    ``args.isolated`` is set and DHCP enabled unless ``args.nodhcp`` is set.
    """
    name = args.name
    delete = args.delete
    isolated = args.isolated
    cidr = args.cidr
    nodhcp = args.nodhcp
    domain = args.domain
    pxe = args.pxe
    k = config.k

    if name is None:
        common.pprint("Missing Network", color='red')
        os._exit(1)

    if delete:
        result = k.delete_network(name=name)
        common.handle_response(result, element='Network ', action='deleted')
        return

    nat = False if isolated else True
    dhcp = not nodhcp
    result = k.create_network(name=name, cidr=cidr, dhcp=dhcp, nat=nat,
                              domain=domain, pxe=pxe)
    common.handle_response(result, element='Network ')
# When CONF.is_all is set, fork a child that runs
# self.get_all_user_all_weibo_info either through eventlet.spawn (when the
# 'enable_spawn' keyword is truthy, the default) or directly with `tg` added
# to the keyword arguments. os._exit(2) is the child's exit call.
# NOTE(review): indentation was lost in this copy, so the nesting of the
# `while True` body, the os._exit(2) call and the final LOG.debug cannot be
# determined from here. `child_started` is set to True in both branches, so
# the `if not child_started: break` test looks like it can never fire —
# confirm against the original source.
def init_host(self, tg, **kwargs):
LOG.info(_LI('Willing init host function.......'))
if CONF.is_all:
pid = os.fork()
if pid == 0:
child_started = False
while True:
enable_spawn = kwargs.get('enable_spawn', True)
if enable_spawn:
eventlet.spawn(self.get_all_user_all_weibo_info,
**kwargs)
child_started = True
else:
kwargs['tg'] = tg
self.get_all_user_all_weibo_info(**kwargs)
child_started = True
if not child_started:
break
os._exit(2)
LOG.debug(_LI('Started child %d' % pid))
# ?????? 14400s ????4?
# Return Python source text (passed through textwrap.dedent) that, on POSIX,
# sets pupy.infos['daemonize'], double-forks, starts a new session, sets the
# umask and redirects fds 0-2 to /dev/null.
# NOTE(review): the embedded snippet uses Python 2 syntax (`022`,
# `except OSError,e`) and refers to `errno` without importing it inside the
# snippet — confirm the environment that executes it provides both.
# NOTE(review): indentation was lost in this copy, including inside the
# string literal. The literal is runtime data and is left untouched here.
def generate(self):
return textwrap.dedent("""
import pupy,os
if os.name == 'posix':
pupy.infos['daemonize']=True
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(022) # Don't allow others to write
null=os.open('/dev/null',os.O_RDWR)
for i in range(3):
try:
os.dup2(null,i)
except OSError,e:
if e.errno != errno.EBADF:
raise
os.close(null)
""")
def getWampAgent(self, config):
    '''Select the WAMP agent settings from ``config`` and log them.

    The 'main-agent' section is preferred. If reading it raises, the
    'registration-agent' section is used when ``self.status`` is None or
    "registered"; otherwise an error is logged and the process exits with
    status 1.
    '''
    wamp_section = None
    try:
        self.wamp_config = config['iotronic']['wamp']['main-agent']
        LOG.info('WAMP Agent settings:')
    except Exception:
        unregistered = self.status is None
        registered = self.status == "registered"
        if unregistered | registered:
            wamp_section = config['iotronic']['wamp']
            self.wamp_config = wamp_section['registration-agent']
            LOG.info('Registration Agent settings:')
        else:
            LOG.error(
                "WAMP Agent configuration is wrong... "
                "please check settings.json WAMP configuration... Bye!"
            )
            os._exit(1)

    for line in (
        ' - agent: ' + str(self.agent),
        ' - url: ' + str(self.wamp_config['url']),
        ' - realm: ' + str(self.wamp_config['realm']),
    ):
        LOG.info(line)
    # LOG.debug("- conf:\n" + json.dumps(self.wamp_config,indent=4))
def create_daemon(task_id, version):
    """Restart the agent from a double-forked process and record the result.

    The calling process and the intermediate child both leave through
    ``os._exit(0)``. The grandchild restarts the supervisor group, polls up
    to 30 times (one second apart) until both agent programs run, stores
    DONE or FAILED on the task and then calls ``exit(0)``. ``version`` is
    not used in this function.
    """
    if os.fork() != 0:
        os._exit(0)

    os.setsid()
    if os.fork() != 0:
        os._exit(0)

    try:
        run('supervisorctl restart corvus-agent:')
        for _ in range(30):
            api_up = program_running('corvus-agent:corvus-agent-api')
            if api_up and program_running('corvus-agent:corvus-agent-task'):
                break
            time.sleep(1)
        else:
            raise TaskException('Agent updated but not running')
        Task.set_status(task_id, Task.DONE)
    except Exception:
        Task.set_status(task_id, Task.FAILED,
                        reason=traceback.format_exc())
    exit(0)
def init_host(self):
    """Initialization for hostmonitor.

    Loads the driver named by ``CONF.host.monitoring_driver`` from the
    'hostmonitor.driver' namespace into ``self.driver``. Any exception is
    logged and the process exits with status 1.
    """
    try:
        # Determine dynamic load driver from configuration.
        manager_options = dict(
            namespace='hostmonitor.driver',
            name=CONF.host.monitoring_driver,
            invoke_on_load=True,
            invoke_args=(),
        )
        # Load the driver to global.
        self.driver = driver.DriverManager(**manager_options)
    except Exception as e:
        LOG.exception(
            "Exception caught during initializing hostmonitor: %s", e)
        os._exit(1)
def main():
    """Resolve the URL, remember it in tmp_url.txt, and download the videos.

    The URL comes from ``sys.argv[1]``, else from an existing tmp_url.txt,
    else from a prompt. tmp_url.txt is rewritten with the URL before
    parsing and removed once every download call has returned. A
    KeyboardInterrupt during a download exits the process with status 1.
    """
    if len(sys.argv) != 1:
        url = sys.argv[1]
    elif os.path.isfile('tmp_url.txt'):
        with open('tmp_url.txt') as cached:
            url = cached.read()
    else:
        url = input('URL: ')

    with open('tmp_url.txt', 'w') as cached:
        cached.write(url)

    name, videos = parse_videos(url)
    length = len(videos)
    if not os.path.isdir(name):
        os.makedirs(name)

    # NOTE(review): the loop index is not passed to download() — verify
    # against the original source.
    for _ in range(length):
        try:
            download(videos, length)
        except KeyboardInterrupt:
            os._exit(1)

    if os.path.isfile('tmp_url.txt'):
        os.unlink('tmp_url.txt')
def become_daemon(self, root_dir='/'):
    """Detach the process: double fork, new session, chdir, null stdio.

    Both intermediate parents leave through ``os._exit(0)``. stdout and
    stderr are replaced by ``_NullDevice`` instances, and file descriptors
    0-1023 are closed with errors ignored.
    """
    if os.fork():  # launch child and ...
        os._exit(0)  # kill off parent
    os.setsid()
    os.chdir(root_dir)
    os.umask(0)
    if os.fork():  # fork again so we are not a session leader
        os._exit(0)

    sys.stdin.close()
    sys.__stdin__ = sys.stdin

    sys.stdout.close()
    sys.stdout = sys.__stdout__ = _NullDevice()

    sys.stderr.close()
    sys.stderr = sys.__stderr__ = _NullDevice()

    # Close fds 0..1023; closerange ignores errors from individual fds.
    os.closerange(0, 1024)
# Check for dependency directories under ~/.airscriptNG and install them
# through shell commands when needed; os._exit(1) is called on failure paths.
# NOTE(review): truncated copy. The call(...) lines for the 'wps' and 'magic'
# sections end in `,col.endl))`, which is not valid syntax — part of each
# statement was lost. Restore from the original source before use.
# NOTE(review): the commands are run with shell=True and perform package
# installation; review before executing.
def gitdeps():
import subprocess
call = subprocess.call
if call("ls ~/.airscriptNG/air 2>/dev/null >/dev/null",shell=True) != 0:
if call('/usr/bin/env ping -c1 8.8.8.8 >/dev/null 2>/dev/null',shell=True) == 0:
call("mkdir -p ~/.airscriptNG/air && cd ~/.airscriptNG/air && apt update --allow-unauthenticated && apt install git git-core libnl-3-dev openssl libnl*-gen* libgcrypt20-dev build-essential libssl-dev -y --allow-unauthenticated && git clone git://git.kali.org/packages/aircrack-ng.git -b upstream && cd * && make gcrypt=true",shell=True)
else:
print('\n%s[-]%s Failed to install inital dependancies,please connect to the internet and try again\n' %(col.fail,col.endl))
os._exit(1)
if call("ls ~/.airscriptNG/wps 2>/dev/null >/dev/null",shell=True) == 0:
call("mkdir -p ~/.airscriptNG/wps && cd ~/.airscriptNG/wps && apt update --allow-unauthenticated && apt install libpcap-dev build-essential -y --allow-unauthenticated && git clone git://git.kali.org/packages/reaver.git -b upstream && cd */src && ./configure && make",col.endl))
os._exit(1)
if call("ls ~/.airscriptNG/magic 2>/dev/null >/dev/null",shell=True) == 0:
call("mkdir -p ~/.airscriptNG/magic && cd ~/.airscriptNG/magic && apt update --allow-unauthenticated && apt install libssl-dev build-essential -y --allow-unauthenticated && git clone git://git.kali.org/packages/pixiewps.git -b upstream && cd */src/ && make && make install",col.endl))
os._exit(1)
# NOTE(review): truncated fragment of gitdeps. The `if call(...` line ends in
# `,col.endl))`, which is not valid syntax; everything between the first check
# and the final os._exit(1) was lost.
def gitdeps():
import subprocess
call = subprocess.call
if call("ls ~/.airscriptNG/air 2>/dev/null >/dev/null",col.endl))
os._exit(1)
def shutdown_all_threads_and_die():
    """Log that the service is stopping, then terminate the process.

    Uses ``os._exit(1)``, which ends the process immediately.
    """
    log.getLogger(__name__).info('Monasca Transform service stopping...')
    os._exit(1)
def start(self):
    """Fork; the child calls ``self.run()`` and exits, the parent returns."""
    if os.fork():
        # Parent: nothing more to do.
        return
    # Child: do the work, then leave without returning to the caller.
    self.run()
    os._exit(0)
def SaveTotalPageToFile(self, new_page_num):
    """Store the new page total in totalpage.ini and compute pages to load.

    Reads the previous 'totalpage' value from the 'Main' section, writes
    ``new_page_num`` in its place, and sets ``self.unload_page_num`` to the
    difference plus one. The process exits with status 0 when the ini file
    cannot be initialised or when the new total is smaller than the old one.
    """
    print('====================save the totalpage to totalpage.ini====================')
    ini = INIFILE('totalpage.ini')
    # must write something if you set is_write to true. otherwise your file become empty.
    is_ok = ini.Init(True, True)
    if not is_ok:
        print('class initializing failed. check the [%s] file first.' % ('totalpage.ini'))
        os._exit(0)
    old_page_num = ini.GetValue('Main', 'totalpage')
    print('====================the old_page_num is [%s],the new_page_num is [%s]====================' % (old_page_num, new_page_num))
    ini.SetValue('Main', 'totalpage', new_page_num)
    # close all
    ini.UnInit()

    if int(new_page_num) < int(old_page_num):
        # nothing new: terminate the whole process
        print('Oops! Nothing new. exit main thread now.')
        os._exit(0)

    # A difference of 0 loads one page; any positive difference loads one
    # extra page, so the result is always difference + 1.
    self.unload_page_num = int(new_page_num) - int(old_page_num) + 1
    print('Ok,we got %s pages to load.' % (self.unload_page_num))
# ????????
# NOTE(review): truncated copy of SaveTotalPageToFile. The lines that read the
# old page count were lost, so `old_page_num` is never bound before it is
# used, and the Init() call appears to have been spliced with a later
# statement. Indentation was lost as well. Restore from the original source
# before use.
def SaveTotalPageToFile(self, new_page_num):
print '====================save the totalpage to totalpage.ini===================='
file = INIFILE('qiubaiadult_page.ini')
# must write something if you set is_write to true. otherwise your file become empty.
is_ok = file.Init(True, new_page_num)
#close all
file.UnInit()
if int(new_page_num) >= int(old_page_num): #if there is new page
# self.unload_page_num = int(new_page_num) - int(old_page_num)
self.unload_page_num = int(new_page_num) - int(old_page_num)
if self.unload_page_num == 0: #?????????????
self.unload_page_num = 1
elif self.unload_page_num > 0: #??????????????????????***?????????
self.unload_page_num += 1
print 'since we start at page %s,we still got (%s-%s) pages to load.' %(self.page, self.unload_page_num, self.page)
else: #nothing new,stop main thread
print 'Oops! Nothing new. exit main thread now.'
os._exit(0) #terminal sub thread
self.enable = False #terminal main thread
# ????????
def daemonize(double_fork=True):
    '''Puts process in the background using usual UNIX best practices.

    Sets the umask to 0o22, changes directory to '/', optionally performs a
    first fork (``double_fork``), starts a new session, forks again, calls
    ``close_open_files()`` and points stdin, stdout and stderr at the null
    device. Each forking parent leaves through ``os._exit(0)``.

    Raises:
        Exception: if the umask cannot be changed or a fork fails.
    '''
    try:
        os.umask(0o22)
    except Exception as e:
        raise Exception("Unable to change file creation mask: %s" % e)

    os.chdir('/')

    # First fork
    if double_fork:
        try:
            pid = os.fork()
            if pid > 0:
                os._exit(0)
        except OSError as e:
            # OSError exposes the message as `strerror`.
            raise Exception("Error on first fork: [%d] %s" % (e.errno, e.strerror))

    os.setsid()

    # Second fork
    try:
        pid = os.fork()
        if pid > 0:
            os._exit(0)
    except OSError as e:
        raise Exception("Error on second fork: [%d] %s" % (e.errno, e.strerror))

    close_open_files()
    # Redirect each standard stream to the null device.
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(os.open(os.devnull, os.O_RDWR), stream.fileno())
def handle_button_traffic(self, mac, eth_protocol):
    """Run the configured action when a known button is pressed.

    Traffic from unknown MACs is ignored. A known button's 'last_seen' is
    always refreshed; the action only runs when the previous sighting is at
    least ``options.main.button_timeout`` seconds old. The action is run
    through the shell in a forked child, with BUTTON_NAME and BUTTON_MAC
    added to the environment; the child exits with status 0 afterwards.
    ``eth_protocol`` is not used in this function.
    """
    button = self.buttons.get(mac)
    if not button:
        return

    now = time.time()
    time_delta = now - button['last_seen']
    button['last_seen'] = now
    if time_delta < options.main.button_timeout:
        return

    logging.getLogger().info('Button press detected: {} [{}].'.format(button['name'], button['mac']))
    action = button['action']
    env = dict(os.environ)
    env.update({
        'BUTTON_NAME': button['name'],
        'BUTTON_MAC': button['mac'],
    })

    pid = None
    try:
        pid = os.fork()
    except OSError as e:
        # OSError exposes the message as `strerror`; the misspelt attribute
        # raised AttributeError here and hid the real failure.
        logging.getLogger().info("Could not fork for action: [{}] {}".format(e.errno, e.strerror))

    if pid == 0:
        daemonize(False)
        subprocess.Popen(action, shell=True, env=env).wait()
        os._exit(0)