Python os 模块,kill() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.kill()。
def set_zombie_refresh_to_fail(self, refresh_job):
    """Fail a refresh whose worker process has become a zombie, then signal it.

    Returns without doing anything when the job has no PID or when the
    process is not in the zombie state.
    """
    job_pid = refresh_job.pid
    if job_pid is None:
        return
    if psutil.Process(job_pid).status() != psutil.STATUS_ZOMBIE:
        return
    job_refresh_id = refresh_job.refresh_id
    current = self.schematizer.get_refresh_by_id(job_refresh_id)
    if current.status == RefreshStatus.IN_PROGRESS:
        # A zombie may not handle the signal properly, so the status is
        # written here directly instead of relying on the SIGINT below.
        # NOTE(review): member is spelled `Failed` next to `IN_PROGRESS` --
        # confirm against the RefreshStatus definition.
        self.schematizer.update_refresh(
            refresh_id=job_refresh_id,
            status=RefreshStatus.Failed,
            offset=0
        )
        del self.active_refresh_jobs[refresh_job.source]
    # NOTE(review): assumed to run for every zombie, not only in-progress
    # refreshes -- confirm against the original indentation.
    os.kill(job_pid, signal.SIGINT)
def test_transform_service_heartbeat(self, coordinator):
    """Check that a started TransformService calls ``heartbeat`` on its driver.

    ``coordinator`` is the patched coordinator factory; its return value is
    replaced with a mock restricted to the ``KazooDriver`` interface.
    """
    # mock coordinator
    fake_kazoo_driver = MagicMock(name="MagicKazooDriver",
                                  spec=KazooDriver)
    coordinator.return_value = fake_kazoo_driver
    # Option 1: run the service in-process and give it time to start.
    serv_thread = transform_service.TransformService()
    serv_thread.daemon = True
    serv_thread.start()
    time.sleep(2)
    # Option 2 (not used): spawn the service as a separate process and
    # terminate it with a signal. Mocks do not seem to work when the
    # service is spawned that way.
    fake_kazoo_driver.heartbeat.assert_called_with()
def stop(self):
    """Stop the child process by escalating through ``self.STOP_SIGNALS``.

    Each entry is a ``(signal, timeout)`` pair. After a signal is sent the
    child is polled every 0.1 s until it exits or ``timeout`` seconds pass;
    a ``timeout`` of ``None`` waits indefinitely. Escalation ends as soon
    as the child has been reaped, so no further signal is sent to a PID
    that may already belong to another process. ``self._pid`` is cleared
    when the sequence ends.
    """
    self._log.debug("stop()")
    if not self.get_commandalive():
        return
    for sig, timeout in self.STOP_SIGNALS:
        try:
            os.kill(self._pid, sig)
        except OSError:
            # The process is already gone (or cannot be signalled).
            break
        if timeout is None:
            # Wait until there is a response
            os.waitpid(self._pid, 0)
            break
        giveup_time = time.time() + timeout
        exited = False
        while True:
            # (0, 0) means the child has not changed state yet.
            if os.waitpid(self._pid, os.WNOHANG) != (0, 0):
                exited = True
                break
            time.sleep(0.1)
            if time.time() > giveup_time:
                break
        if exited:
            break
    self._pid = None
######################################
# Implement specific property setters/getters
def get_commandalive(self):
    """Return True while the child process ``self._pid`` is still running.

    Returns False when no PID is recorded, when the process cannot be
    signalled, or when it has exited. An exited child is reaped as a side
    effect of the status poll.
    """
    if self._pid is None:
        return False
    try:
        # Signal 0 only checks that the PID can be signalled.
        os.kill(self._pid, 0)
        # Poll without blocking: (0, 0) means the child has not exited.
        # A blocking waitpid here would stall until exit and then report
        # the dead child as alive.
        return os.waitpid(self._pid, os.WNOHANG) == (0, 0)
    except OSError:
        return False
def stop(self):
self._log.debug("stop()")
if self.get_commandalive() == True:
for sig, 0)
self._pid = None
######################################
# Implement specific property setters/getters
def stop(self):
self._log.debug("stop()")
if self.query_commandalive() == True:
for sig, 0)
self._pid = None
######################################
# Implement specific property setters/getters
def stop(self):
self._log.debug("stop()")
if self.get_commandalive() == True:
for sig, 0)
self._pid = None
######################################
# Implement specific property setters/getters
def stop(self):
    """Stop the child process, then always run ``Resource.stop``.

    Each ``(signal, timeout)`` pair in ``self.STOP_SIGNALS`` is tried in
    order. After a signal is sent the child is polled every 0.1 s without
    blocking until it exits or ``timeout`` seconds pass; a ``timeout`` of
    ``None`` waits indefinitely. ``self._pid`` is cleared when the sequence
    ends, and ``Resource.stop(self)`` runs even if an exception is raised.
    """
    self._log.debug("stop()")
    try:
        if self.get_commandalive():
            for sig, timeout in self.STOP_SIGNALS:
                try:
                    os.kill(self._pid, sig)
                except OSError:
                    # The process is already gone (or cannot be signalled).
                    break
                if timeout is None:
                    # Wait until there is a response
                    os.waitpid(self._pid, 0)
                    break
                giveup_time = time.time() + timeout
                exited = False
                while True:
                    # Non-blocking poll; a blocking waitpid would ignore
                    # the timeout and fail on the second call once the
                    # child has been reaped.
                    if os.waitpid(self._pid, os.WNOHANG) != (0, 0):
                        exited = True
                        break
                    time.sleep(0.1)
                    if time.time() > giveup_time:
                        break
                if exited:
                    break
            self._pid = None
    finally:
        Resource.stop(self)
# CF::LifeCycle
def stop(self):
self._log.debug("stop()")
if self.get_commandalive() == True:
for sig, 0)
self._pid = None
######################################
# Implement specific property setters/getters
def stop(self):
self._log.debug("stop()")
if self.query_prop_commandalive() == True:
for sig, 0)
self._pid = None
######################################
# Implement specific property setters/getters
def _test_ShutdownParent(self, node):
    """Release the parent device and check that its child devices go away too.

    After the release, each child PID of the device booter is probed up to
    five times (0.5 s apart) to give it time to exit, then the registered
    device list and the booter's child list are both expected to be empty.
    """
    self._setupDevices(node)
    # test aggregate device releaseObject functionality
    pids = getChildren(self._devBooter.pid)
    self._parentDevice.releaseObject()
    for pid in pids:
        for _ in range(5):
            try:
                os.kill(pid, 0)  # check to see if the process is still alive
            except OSError:
                break
            time.sleep(0.5)
    pids = getChildren(self._devBooter.pid)
    self.assertEqual(len(self._devMgr._get_registeredDevices()), 0)
    # make sure the child device was also released
    self.assertEqual(len(pids), 0)
    self._devMgr.shutdown()
    self.assertTrue(self.waitTermination(self._devBooter))
    self.assertEqual(len(self._domMgr._get_deviceManagers()), 0)
def test_NSCleanup(self):
    """Check that the domain's naming context is gone after the booter gets SIGINT."""
    dom = self._domMgr._get_name()
    mgr_name = URI.stringToName("%s/%s" % (dom, dom))
    ctx_name = URI.stringToName("%s" % (dom))
    odm_name = URI.stringToName("%s/%s" % (dom, "ODM_Channel"))
    idm_name = URI.stringToName("%s/%s" % (dom, "IDM_Channel"))
    # Resolve everything first, then check each reference.
    resolved = [
        self._root.resolve(entry)
        for entry in (ctx_name, mgr_name, odm_name, idm_name)
    ]
    for reference in resolved:
        self.assertNotEqual(reference, None)
    os.kill(self._domainBooter.pid, signal.SIGINT)
    self.waitTermination(self._domainBooter)
    self.assertRaises(CosNaming.NamingContext.NotFound, self._root.resolve, ctx_name)
def test_CatatrophicUnregister(self):
# Test that if a device manager dies unexpectedly and then re-registers there are no problems
devmgr_nb, devMgr = self.launchdeviceManager("/nodes/test_SelfTerminatingDevice_node/DeviceManager.dcd.xml")
self.assertNotEqual(devMgr, None)
# NOTE These assert check must be kept in-line with the DeviceManager.dcd.xml
self.assertEqual(len(devMgr._get_registeredDevices()), 1)
devs = devMgr._get_registeredDevices()
pids = getChildren(devmgr_nb.pid)
for devpid in pids:
os.kill(devpid, signal.SIGKILL)
os.kill(devmgr_nb.pid, signal.SIGKILL)
self.waitTermination(devmgr_nb)
devmgr_nb, None)
self.assertEqual(len(devMgr._get_registeredDevices()), 1)
# Test that the DCD file componentproperties get pushed to configure()
# as per DeviceManager requirement SR:482
devMgr.shutdown()
self.assert_(self.waitTermination(devmgr_nb), "Nodebooter did not die after shutdown")
def test_DeviceManagerSurprise(self):
    """Kill the domain manager, start a device manager, then restart the domain manager.

    The device is expected to be resolvable in the naming service and one
    device manager is expected to be registered afterwards.
    """
    endpoint = "giop:tcp::5679"
    self._nb_domMgr, self._domMgr = self.launchdomainManager(endpoint=endpoint, dbURI=self._dbfile)
    self.assertEqual(len(self._domMgr._get_deviceManagers()), 0)
    # Kill the domainMgr
    os.kill(self._nb_domMgr.pid, signal.SIGKILL)
    if not self.waitTermination(self._nb_domMgr):
        self.fail("Domain Manager Failed to Die")
    self._nb_devMgr, _unused_dev_mgr = self.launchdeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", wait=False)
    # Give the Device Manager time to notice the Domain Manager is unavailable.
    time.sleep(1)
    # Start the domainMgr again
    self._nb_domMgr, _restarted_dom_mgr = self.launchdomainManager(endpoint=endpoint, dbURI=self._dbfile)
    # Give the Device Manager time to register with the Domain Manager.
    time.sleep(1)
    node = 'BasicTestDevice_node'
    domain = scatest.getTestDomainName()
    device_uri = URI.stringToName("%s/%s/BasicTestDevice1" % (domain, node))
    device = self._root.resolve(device_uri)
    self.assertNotEqual(device, None)
    self.assertEqual(len(self._domMgr._get_deviceManagers()), 1)
def test_DeviceManagerRegisterWhileDomainManagerCrashed(self):
domBooter, domMgr = self.launchdomainManager(endpoint="giop:tcp::5679", dbURI=self._dbfile)
self.assertNotEqual(domMgr, None)
# Forcibly terminate the domain manager to simulate a crash
os.kill(domBooter.pid, signal.SIGKILL)
if not self.waitTermination(domBooter):
self.fail("DomainManager Failed to die")
time.sleep(2)
# Start the node; we cannot get the object reference because the
# DomainManager is down,so waiting is pointless right Now.
dcdFile = "/nodes/test_PortTestDevice_node/DeviceManager.dcd.xml"
devBooter, unused = self.launchdeviceManager(dcdFile, wait=False)
time.sleep(2)
# Restart the DomainManager
domBooter, None)
# Wait for the DeviceManager and make sure it registers.
devMgr = self.waitDeviceManager(devBooter, dcdFile)
self.assertNotEqual(devMgr, None)
def terminateChild(self, child, signals=(signal.SIGINT, signal.SIGTERM)):
    """Terminate ``child`` by sending ``signals`` in order until it exits.

    Does nothing if the child has already exited. After each signal
    ``self.waitTermination(child)`` is consulted, and escalation stops once
    it reports success. The child is then waited on. An ``OSError`` raised
    while signalling or waiting is ignored.
    """
    if child.poll() is not None:
        return
    try:
        for sig in signals:
            os.kill(child.pid, sig)
            if self.waitTermination(child):
                break
        child.wait()
    except OSError:
        # Best effort: the child may have disappeared in the meantime.
        pass
def register_signal(signal_number, handler_func, once = False):
    """Install ``handler_func`` for ``signal_number`` and chain to the old handler.

    When ``handler_func`` returns a false value the handler it replaced is
    given the signal as well: a callable is called directly, and with
    ``once`` set a previous default disposition is restored and the signal
    re-sent to this process. With ``once`` set and a callable previous
    handler, that handler is re-installed before it is called.
    """
    previous = None

    def _dispatch(signum, frame):
        if handler_func(signum, frame):
            # The new handler asked to skip the previous one.
            return
        if callable(previous):
            if once:
                signal.signal(signum, previous)
            previous(signum, frame)
            return
        if once and previous == signal.SIG_DFL:
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)

    previous = signal.signal(signal_number, _dispatch)
def test_register_signal(self):
    """Send SIGUSR1 to this process twice and expect the registered handler to run twice."""
    if runtime_info.OS_WIN:
        return
    counter = {'handler': 0}

    def _count(signum, frame):
        counter['handler'] += 1

    register_signal(signal.SIGUSR1, _count)
    for _ in range(2):
        os.kill(os.getpid(), signal.SIGUSR1)
    # Put the default disposition back before checking the result.
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
    self.assertEqual(counter['handler'], 2)
def kill(self):
    """Kill the tracked qemu process and let gdb observe that it is gone.

    Does nothing when ``self.qemu`` is unset. Raises ``RuntimeError`` if the
    gdb ``detach`` command completes without raising ``gdb.error``.
    """
    if not self.qemu:
        return
    try:
        self.qemu = None
        qemu_pid = self.current_qemu_pid
        os.kill(qemu_pid, signal.SIGKILL)
        os.waitpid(qemu_pid, 0)
        self.current_qemu_pid = -1
    except OSError:
        # process may be finished by kill already
        pass
    LOG.debug('let gdb notice process was killed')
    try:
        execute_gdb_command('detach')
    except gdb.error as e:
        LOG.debug('catch expected exception: ' + str(e))
    else:
        raise RuntimeError('gdb should have disconnected and raise gdb.error')
def test_log(self):
    """Exercise every Log level, signalling this process between rounds."""
    # There is little to assert here; the calls are made for
    # regression/coverage purposes.
    log = Log(verbose=False, prefix='test')
    log.debug("Invisible debug.")
    try:
        raise ValueError("Test exception")
    except ValueError:
        log.exception("Test exception with message")
    for round_number in range(1, 5):
        log.debug("Debug")
        log.info("Info")
        log.warning("Warning")
        log.error("Error")
        log.critical("Crtitical")
        # Odd rounds send SIGUSR1, even rounds send SIGUSR2.
        if round_number % 2 == 0:
            chosen = signal.SIGUSR2
        else:
            chosen = signal.SIGUSR1
        os.kill(os.getpid(), chosen)
def stop(sel, domain_name):
    """Stop the Virtual BMC of ``domain_name`` by killing the PID in its pid file.

    Raises ``exception.DomainNotFound`` when the domain directory is
    missing and ``exception.VirtualBMCError`` when the pid file cannot be
    read. The pid file is removed before the process is killed; a failure
    to kill is ignored.
    """
    LOG.debug('Stopping Virtual BMC for domain %s', domain_name)
    # NOTE(review): `CONfig_PATH` is spelled as found in SOURCE -- confirm
    # against the utils module.
    domain_dir = os.path.join(utils.CONfig_PATH, domain_name)
    if not os.path.exists(domain_dir):
        raise exception.DomainNotFound(domain=domain_name)
    pid_file = os.path.join(domain_dir, 'pid')
    try:
        with open(pid_file, 'r') as handle:
            pid = int(handle.read())
    except IOError:
        raise exception.VirtualBMCError(
            'Error stopping the domain %s: PID file not '
            'found' % domain_name)
    os.remove(pid_file)
    try:
        os.kill(pid, signal.SIGKILL)
    except OSError:
        pass
def terminationHandler(cls, signal=None, frame=None, terminate=True):
    """Signal termination handler

    signal  - raised signal
    frame  - origin stack frame
    terminate  - whether to terminate the application
    """
    if cls._execpool:
        del cls._execpool  # Destructors are called later
        # Re-define _execpool so that a later deletion attempt during
        # destruction does not add unnecessary noise to the error log;
        # without this the attribute would be undefined.
        cls._execpool = None
    if not terminate:
        return
    sys.exit()  # exit code defaults to 0
def test_all_exit_sigs_with_sig(self):
    """Run a child script that kills itself with each test signal.

    The child installs a signal handler and two exit functions that each
    append one byte to TESTFN. The test expects the child's return value to
    equal the signal and the file to contain ``b"210"``.
    """
    # Same as above but the process is terminated by a signal
    # instead of exiting cleanly.
    for sig in TEST_SIGNALS:
        # The nested lines of the embedded script must stay indented
        # relative to each other; dedent() only strips the common prefix.
        ret = pyrun(textwrap.dedent(
            """
            import functools,os,signal,imp
            mod = imp.load_source("mod",r"{modname}")
            def foo(s):
                with open(r"{testfn}","ab") as f:
                    f.write(s)
            signal.signal({sig},functools.partial(foo,b'0'))
            mod.register_exit_fun(functools.partial(foo,b'1'))
            mod.register_exit_fun(functools.partial(foo,b'2'))
            os.kill(os.getpid(),{sig})
            """.format(modname=os.path.abspath(__file__),
                       testfn=TESTFN, sig=sig)
        ))
        self.assertEqual(ret, sig)
        with open(TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"210")
        safe_remove(TESTFN)
def input(s, stdout = True, timeout = 2,
prompt = rgx2nd(('(.+)', 'py3to2 server: \\1'), ),
subprompt = rgx2nd(('>>> ', '', None),
fndexc = re.compile('\WTraceback '),
):
self = _server
if not s: return
SERVER.stdin.write(s)
try:
buf = ''
SERVER.stdin.write("\n\nimport os,signal; os.kill(CLIENTPID,signal.SIGINT)\n")
time.sleep(timeout)
raise IOError('py3to2 server not responding to input: %s'%repr(s))
except KeyboardInterrupt:
buf = os.read(SERVERIO[0], self.bufsize)
buf = subprompt.sub(buf)
if prompt: buf = prompt.sub(buf)
if fndexc.search(buf): raise IOError('py3to2 server input: %s\n%s'%(s, buf))
if stdout: sys.stdout.write(buf)
else: return buf
def input(s, buf))
if stdout: sys.stdout.write(buf)
else: return buf
def stop(self):
    """
    Stop the shell

    Sends SIGTERM to the shell process if it is running, polls for up to
    0.2 seconds, then logs whether the process stopped.
    """
    if self.is_running():
        try:
            os.kill(self._shell_pid, signal.SIGTERM)
        except OSError:
            pass
    deadline = time.time() + 0.2
    while self.is_running() and time.time() < deadline:
        time.sleep(0.05)
    if self.is_running():
        outcome = "Failed to stop shell process"
    else:
        outcome = "Shell process stopped"
    utils.ConsoleLogger.log(outcome)
def cleanup(self):
    """Shut down the web server if required, generate the report and exit.

    When ``self.config["daemon_web"]`` is set the web server is left
    running. Otherwise it is sent SIGINT, its pid file is removed and
    ``self.killProcess`` is called as a double check. Always ends with
    ``sys.exit(0)``.
    """
    # Emit a blank line; print("") does so on both Python 2 and Python 3.
    print("")
    if self.webserver is not None:
        if self.config["daemon_web"]:
            self.display.alert("Webserver is still running as requested.")
        else:
            # send SIGINT to the web process
            self.display.output("stopping the webserver")
            self.webserver.send_signal(signal.SIGINT)
            # delete the pid file
            os.remove(self.pid_path + "spfwebsrv.pid")
            # as a double check, manually kill the process
            self.killProcess(self.webserverpid)
    # call report generation
    self.generateReport()
    # exit
    sys.exit(0)
#----------------------------
# Kill specified process
#----------------------------
def kill_invalid_connection():
    """Close out unfinished connection logs whose log file has been idle for over an hour.

    For each unfinished ``Log`` record, the modification time of
    ``<log_path>.log`` is compared with the current time; a missing file
    counts as modified at time 0. Stale ``ssh`` sessions get signal 9 sent
    to their recorded PID. Stale non-ssh sessions are skipped while they
    are less than one day old. Every record that is not skipped is marked
    finished, saved and logged.
    """
    unfinished_logs = Log.objects.filter(is_finished=False)
    now = datetime.datetime.now()
    now_timestamp = int(time.mktime(now.timetuple()))
    for log in unfinished_logs:
        try:
            log_file_mtime = int(os.stat('%s.log' % log.log_path).st_mtime)
        except OSError:
            log_file_mtime = 0
        if (now_timestamp - log_file_mtime) <= 3600:
            continue
        if log.login_type == 'ssh':
            try:
                os.kill(int(log.pid), 9)
            except OSError:
                # The process is already gone or cannot be signalled.
                pass
        elif (now - log.start_time).days < 1:
            continue
        log.is_finished = True
        log.end_time = now
        log.save()
        logger.warning('kill log %s' % log.log_path)
def stop(self):
    """Send SIGTERM to the process whose PID is stored in ``self.pidfile``.

    A missing pid file is logged at debug level. Any exception raised while
    reading, parsing or signalling is logged as a warning, not raised.
    """
    pid = None
    if os.path.exists(self.pidfile):
        try:
            # Two steps on purpose: if int() fails, the warning below
            # reports the raw file content as the pid.
            pid = _read_file(self.pidfile)
            pid = int(pid)
            os.kill(pid, signal.SIGTERM)
        except Exception as e:
            logger.warn('{e} while get and kill pid={pid}'.format(
                pid=pid, e=repr(e)))
    else:
        logger.debug('pidfile not exist:' + self.pidfile)
def kill_cmd(self):
    """Close the command's file descriptor, kill the command and reap it.

    Only acts when ``self.cmd_pid`` is greater than 1. An already-closed
    descriptor (EBADF) and an already-gone process (ECHILD, ESRCH) are
    tolerated. Any other errno from closing is re-raised, and any other
    errno from killing or waiting raises ``Exception``. ``self.cmd_pid`` is
    reset to -1 afterwards.
    """
    if self.cmd_pid > 1:
        try:
            os.close(self.cmd_fd)
        except OSError as e:
            if e.errno != errno.EBADF:  # EBADF: already closed
                raise
        try:
            os.kill(self.cmd_pid, signal.SIGKILL)
            os.waitpid(self.cmd_pid, 0)
        except OSError as e:
            if e.errno not in (errno.ECHILD, errno.ESRCH):
                raise Exception('unhandled errno: %d' % e.errno)
        self.cmd_pid = -1
def kill_cmd(self):
if self.cmd_pid > 1:
try:
os.close(self.cmd_fd)
except OSError, errno.ESRCH]:
raise Exception('unhandled errno: %d' % e.errno)
self.cmd_pid = -1
def _kill_ssh(self):
if self.ssh_pid > 1:
try:
os.kill(self.ssh_pid, signal.SIGTERM)
os.waitpid(self.ssh_pid, errno.ESRCH]:
raise Exception('unhandled errno: %d' % e.errno)
self.self_pid = -1
try:
os.close(self.ssh_fd)
except OSError, e:
if e.errno == errno.EBADF:
pass # already closed
else:
print 'WHAT?', e
raise e
def t01(factory):
    """Check that the master process survives SIGUSR1.

    The PID is fetched from the control object, the process is signalled,
    and the PID is fetched again to confirm the connection still works.
    Returns True on success and False after printing a FAIL line.
    """
    pretty = '%s t1' % __file__
    print(pretty)
    ctrl = factory.make_master('master')
    pid = ctrl.get_pid()
    os.kill(pid, signal.SIGUSR1)
    try:
        pid = ctrl.get_pid()
    except ConnectionClosed as e:
        print('FAIL %s: SIGUSR1 killed the process: %s' % (pretty, e))
        return False
    except Exception as e:
        print('FAIL %s: unknown error: %s' % (pretty, e))
        return False
    return True
# check that trace files are written to <home>/.ave/hickup with predictable
# file names
def t04(factory):
    """Check that signalling the broker leaves its session PIDs unchanged.

    Returns False (after printing a FAIL line) when a session PID is below
    2 or when the session PIDs differ after the broker received SIGUSR1,
    and True otherwise.
    """
    pretty = '%s t4' % __file__
    print(pretty)
    ctrl = factory.make_master('master')
    bpid = ctrl.get_pid()
    spids = ctrl.get_session_pids()
    if any(pid < 2 for pid in spids):
        # spids is a list, so it is formatted with %s (%d raises TypeError).
        print('FAIL %s: impossible session PIDs: %s' % (pretty, spids))
        return False
    # signal the broker and wait for the hickup directory to appear. the signal
    # should be propagated to the equipment lister long before the directory is
    # created.
    os.kill(bpid, signal.SIGUSR1)
    path = os.path.join(factory.HOME.path, '.ave', 'hickup')
    wait_hickup_dir(path, 3)
    spids2 = ctrl.get_session_pids()
    if spids2 != spids:
        print('FAIL %s: sessions affected: %s != %s' % (pretty, spids2, spids))
        return False
    return True
def terminate(self):
    """Send SIGTERM to ``self.pid``, tolerating a missing or foreign process.

    ESRCH (no such process) is ignored silently. EPERM prints a warning and
    returns. Any other ``OSError`` is re-raised.
    """
    try:
        os.kill(self.pid, signal.SIGTERM)
    except OSError as e:
        if e.errno == errno.ESRCH:
            return  # process already killed by someone else
        if e.errno == errno.EPERM:
            # this can only happen if the original session was started as
            # a different user. e.g. if the broker has been restarted with
            # --demote=<some other user>. but don't worry too much about it
            # as sessions are eventually terminated anyway.
            print(
                'WARNING: session with PID=%d not terminated because it '
                'is owned by a different user. did you restart a broker '
                'as a different user?' % self.pid
            )
            return
        raise  # bare raise keeps the original traceback
def __call__(self, fn):
    """Decorate test function ``fn`` so it runs inside a fresh workspace.

    The wrapper prints the test's file and function name, creates a new
    ``Workspace`` with a ``config`` directory, and calls
    ``fn(pretty, self, *args, **kwargs)``. ``self.kill()`` and
    ``self.HOME.delete()`` always run afterwards, and the result of ``fn``
    is returned.
    """
    def decorated(*args, **kwargs):
        # __code__ / __name__ exist on Python 2.6+ and Python 3.
        pretty = '%s %s' % (fn.__code__.co_filename, fn.__name__)
        print(pretty)
        self.HOME = Workspace()
        self.processes = []
        os.makedirs(os.path.join(self.HOME.path, 'config'))
        try:
            return fn(pretty, self, *args, **kwargs)
        finally:
            self.kill()
            self.HOME.delete()
    return decorated
def t4():
    """Run ``echo hello`` in the background and read its output through poll().

    Prints a FAIL line and kills the command on an unexpected poll event or
    on output that does not start with ``hello``. Always waits for the
    command and returns True.
    """
    pretty = '%s t4' % __file__
    print(pretty)
    pid, fd = ave.cmd.run_bg('echo hello')
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    output = ''
    for event in poller.poll(1000):  # milliseconds
        mask = event[1]
        if not (mask & select.POLLIN):
            print('FAIL %s: unexpected poll event: %d' % (pretty, mask))
            os.kill(pid, signal.SIGKILL)
        output += os.read(fd, 1024)
    if not output.startswith('hello'):
        print('FAIL %s: wrong result: "%s"' % (pretty, output))
        os.kill(pid, signal.SIGKILL)
    os.waitpid(pid, 0)
    return True
# check that return value from executed program is correct
def t01(pretty, factory):
ctrl = factory.make_control(home=factory.HOME.path)
pid = ctrl.get_pid()
os.kill(pid, e))
return False
return True
# check that trace files are written to <home>/.ave/hickup with predictable
# file names
def t03(pretty, factory):
ctrl = factory.make_control(home=factory.HOME.path)
pid = ctrl.get_pid()
ctrl.make_child()
os.kill(pid, signal.SIGUSR1)
path = os.path.join(factory.HOME.path, 3)
files = glob.glob(os.path.join(path, '*'))
if len(files) != 2:
print('FAIL %s: wrong number of files: %s' % (pretty, files))
return False
return True
# check that the signal is not propagated to non-ave processes such as external
# tools. these will otherwise terminate if they do not handle the signal.
def t10():
    """Check that terminating a control delivers a message on its pipe.

    Starts a ``MockControl``, pings it through a ``RemoteControl``,
    terminates it and waits up to one second for a message on the pipe.
    Returns False (after printing a FAIL line and killing the control) when
    no message arrives, and True otherwise.
    """
    pretty = '%s t10' % __file__
    print(pretty)
    s, p = find_free_port()
    q = Pipe()
    c = MockControl(p, None, s, [], q)
    c.start()
    r = RemoteControl(('', p), timeout=5)
    r.sync_ping()
    c.terminate()
    try:
        q.get(timeout=1)
    except Exception as e:
        print('FAIL %s: never got oob message: %s' % (pretty, str(e)))
        c.kill()
        return False
    c.join(2)
    return True
# check that two contollers with connections to each other discover when the
# other one shuts down
def _process_exists(pid):
"""Check if a process with PID ``pid`` exists.
:param pid: PID to check
:type pid: ``int``
:returns: ``True`` if process exists,else ``False``
:rtype: ``Boolean``
"""
try:
os.kill(pid, 0)
except OSError: # not running
return False
return True
def stop(self):
    """
    Stop the daemon

    Reads the PID from ``self.pidfile`` and sends SIGTERM every 0.1 s until
    the process no longer exists, then removes the pid file. A missing,
    unreadable or malformed pid file is reported on stderr and is not an
    error. Any other failure to signal prints the error and exits with
    status 1.
    """
    import errno

    # Get the pid from the pidfile
    try:
        with open(self.pidfile, 'r') as pf:
            pid = int(pf.read().strip())
    except (IOError, ValueError):
        pid = None
    if not pid:
        message = "pidfile %s does not exist. Daemon not running?\n"
        sys.stderr.write(message % self.pidfile)
        return  # not an error in a restart
    # Try killing the daemon process
    try:
        while 1:
            os.kill(pid, SIGTERM)
            time.sleep(0.1)
    except OSError as err:
        # Compare errno, not the message text, which is locale-dependent.
        if err.errno == errno.ESRCH:
            if os.path.exists(self.pidfile):
                os.remove(self.pidfile)
        else:
            print(str(err))
            sys.exit(1)
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = file(self.pidfile, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print str(err)
sys.exit(1)
def close(self):
    """Call ``self._stop()``, then terminate and reap ``self.process`` if one is set."""
    self._stop()
    if self.process is None:
        return
    # 15 is the signal number sent to request termination.
    os.kill(self.process.pid, 15)
    self.process.wait()
    self.process = None
def stop(self):
try:
pf = file(self.pidfile,'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
print("killing process with pid {0}".format(pid))
try:
while 1:
os.kill(pid, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print str(err)
sys.exit(1)
def stop(self):
    """Send SIGINT to the writer process."""
    writer_pid = self.writer_proc.pid
    os.kill(writer_pid, signal.SIGINT)
def terminate(self):
    """Kill the writer and reader processes, then wait for both.

    stop() is assumed to have been called already, so both processes should
    have exited by now.
    """
    # Writer first, then reader, for each step: kill both, then wait on both.
    for step in ("kill", "wait"):
        for proc in (self.writer_proc, self.reader_proc):
            getattr(proc, step)()
def query_commandalive(self):
if self._pid != None:
try:
os.kill(self._pid,0):
return True
else:
return False
except OSError:
pass
return False