Python os 模块 environ 属性实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 os.environ(它是一个映射对象,而非可调用函数)。
def test_zipfile_timestamp():
    # SOURCE_DATE_EPOCH should control the timestamp recorded on the
    # TarInfo objects inside the zip (see issue #143). TemporaryDirectory
    # is not a context manager under Python 3, hence temporary_directory().
    with temporary_directory() as workdir:
        for name in ('one', 'two', 'three'):
            file_path = os.path.join(workdir, name)
            with codecs.open(file_path, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')
        base_name = os.path.join(workdir, 'dummy')
        # 315576060 is the earliest date representable in TarInfo: 1980-01-01.
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            wheel_path = wheel.archive.make_wheelfile_inner(base_name, workdir)
        with readable_zipfile(wheel_path) as zf:
            for info in zf.infolist():
                assert info.date_time[:3] == (1980, 1, 1)
def _warn_unsafe_extraction_path(path):
    """
    If the default extraction path is overridden and set to an insecure
    location, such as /tmp, it opens up an opportunity for an attacker to
    replace an extracted file with an unauthorized payload. Warn the user
    if a known insecure location is used.

    See distribute #375 for more details.
    """
    if os.name == 'nt' and not path.startswith(os.environ['windir']):
        # On Windows, permissions are generally restrictive by default
        # and temp directories are not writable by other users, so
        # bypass the warning.
        return
    mode = os.stat(path).st_mode
    if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
        msg = (
            "%s is writable by group/others and vulnerable to attack "
            "when "
            "used with get_resource_filename. Consider a more secure "
            "location (set with .set_extraction_path or the "
            "PYTHON_EGG_CACHE environment variable)." % path
        )
        warnings.warn(msg, UserWarning)
def _pythonpath():
    """Return an iterator over the non-empty entries of $PYTHONPATH."""
    items = os.environ.get('PYTHONPATH', '').split(os.pathsep)
    # filter(None, ...) drops empty strings produced by leading/trailing
    # or doubled path separators.
    return filter(None, items)
def initialize(self, sadFile):
    """Install *sadFile* into the domain and cache its application factory.

    Raises KeyError when no application factory matching the SAD file's
    id is found on the domain manager.
    """
    self.app_cnt = 0
    if self.__timeout is not None:
        self.domMgr.configure([CF.DataType('COMPONENT_BINDING_TIMEOUT', to_any(self.__timeout))])
    try:
        self.domMgr.installApplication(sadFile)
    except CF.DomainManager.ApplicationAlreadyInstalled:
        # Already installed is fine; we only need the factory below.
        pass
    domroot = os.path.join(os.environ["SDRROOT"], "dom")
    sad = ossie.parsers.sad.parse(domroot + sadFile)
    app_id = sad.get_id()
    for appFact in self.domMgr._get_applicationFactories():
        if appFact._get_identifier() == app_id:
            self.appFact = appFact
            return
    # Call form (not the Python-2-only statement form) so this parses
    # on both Python 2 and 3.
    raise KeyError("Couldn't find app factory")
def setUp(self):
    """Build a throwaway file-server directory with signed version metadata."""
    # NamedTemporaryFile is used only to obtain an unused path; the file
    # itself is closed (and deleted) and replaced by a directory.
    tempFile = tempfile.NamedTemporaryFile()
    self.fileServerDir = tempFile.name
    tempFile.close()
    os.mkdir(self.fileServerDir)
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    # Sign the canonical (sort_keys) JSON dump and embed the signature.
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    # NOTE(review): writing str to gzip opened in 'wb' implies Python 2 —
    # confirm; Python 3 would require bytes here.
    with gzip.open(keysFilePath, 'wb') as keysFile:
        keysFile.write(json.dumps(KEYS, sort_keys=True))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    # NOTE(review): 'CLIENT_CONfig' casing looks like extraction damage
    # (likely CLIENT_CONFIG) — confirm against wxupdatedemo.config.
    from wxupdatedemo.config import CLIENT_CONfig
    self.clientConfig = CLIENT_CONfig
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
def setUp(self):
    """Build a throwaway file-server directory with signed version metadata.

    NOTE(review): this block was truncated in the scraped source (the
    signing and gzip-serialization steps were collapsed onto one line);
    they are reconstructed from the complete sibling setUp earlier in
    the file — confirm against the original project.
    """
    tempFile = tempfile.NamedTemporaryFile()
    self.fileServerDir = tempFile.name
    tempFile.close()
    os.mkdir(self.fileServerDir)
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    with gzip.open(keysFilePath, 'wb') as keysFile:
        keysFile.write(json.dumps(KEYS, sort_keys=True))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    from wxupdatedemo.config import CLIENT_CONfig
    self.clientConfig = CLIENT_CONfig
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
    self.clientConfig.APP_NAME = APP_NAME
def _warn_unsafe_extraction_path(path):
    """
    If the default extraction path is overridden and set to an insecure
    location, such as /tmp, it opens up an opportunity for an attacker to
    replace an extracted file with an unauthorized payload. Warn the user
    if a known insecure location is used.

    See distribute #375 for more details.

    NOTE(review): the scraped source truncated this duplicate to its first
    docstring line; the body is reconstructed from the complete copy
    earlier in the file.
    """
    if os.name == 'nt' and not path.startswith(os.environ['windir']):
        # Windows default permissions are restrictive; skip the warning.
        return
    mode = os.stat(path).st_mode
    if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
        msg = (
            "%s is writable by group/others and vulnerable to attack "
            "when "
            "used with get_resource_filename. Consider a more secure "
            "location (set with .set_extraction_path or the "
            "PYTHON_EGG_CACHE environment variable)." % path
        )
        warnings.warn(msg, UserWarning)
def paths_on_pythonpath(paths):
    """
    Add the indicated paths to the head of the PYTHONPATH environment
    variable so that subprocesses will also see the packages at
    these paths.

    Do this in a context that restores the value on exit.

    NOTE(review): upstream decorates this generator with
    @contextlib.contextmanager; the decorator appears to have been lost
    in extraction — confirm before using it in a `with` statement.
    """
    # Sentinel distinguishes "variable unset" from "set to empty string".
    nothing = object()
    orig_pythonpath = os.environ.get('PYTHONPATH', nothing)
    current_pythonpath = os.environ.get('PYTHONPATH', '')
    try:
        prefix = os.pathsep.join(paths)
        to_join = filter(None, [prefix, current_pythonpath])
        new_path = os.pathsep.join(to_join)
        if new_path:
            os.environ['PYTHONPATH'] = new_path
        yield
    finally:
        if orig_pythonpath is nothing:
            os.environ.pop('PYTHONPATH', None)
        else:
            os.environ['PYTHONPATH'] = orig_pythonpath
def to_config(config_cls, environ=os.environ):
    """Instantiate *config_cls* from environment variables.

    Variable names default to PREFIX_SUBPREFIX_NAME (upper-cased) unless
    overridden per-attribute via the attribute's metadata entry.
    """
    if config_cls._prefix:
        app_prefix = (config_cls._prefix,)
    else:
        app_prefix = ()

    def default_get(environ, metadata, prefix, name):
        ce = metadata[CNF_KEY]
        if ce.name is not None:
            # Explicit variable name wins over the derived one.
            var = ce.name
        else:
            var = ("_".join(app_prefix + prefix + (name,))).upper()
        log.debug("looking for env var '%s'." % (var,))
        val = environ.get(var, ce.default)
        if val is RAISE:
            # RAISE sentinel marks the attribute as mandatory.
            raise MissingEnvValueError(var)
        return val

    return _to_config(config_cls, default_get, environ, ())
def _to_config(config_cls, default_get, environ, prefix):
    """Recursively build *config_cls* from *environ*.

    NOTE(review): the scraped signature had lost the *default_get* and
    *environ* parameters even though both are referenced in the body;
    restored to match the call site in to_config() — confirm against the
    original project (environ-config).
    """
    vals = {}
    for a in attr.fields(config_cls):
        try:
            ce = a.metadata[CNF_KEY]
        except KeyError:
            # Attribute not managed by environ-config; skip it.
            continue
        if ce.sub_cls is None:
            get = ce.callback or default_get
            val = get(environ, a.metadata, prefix, a.name)
        else:
            # Nested config: recurse with the attribute name appended
            # to the prefix tuple.
            val = _to_config(
                ce.sub_cls,
                default_get,
                environ,
                prefix + ((a.name if prefix else a.name),),
            )
        vals[a.name] = val
    return config_cls(**vals)
def get_by_cluster_id(self, cluster_id):
    """Return the repo DB instance bound to *cluster_id*, cloning on demand.

    Attaches a GitPython Repo handle to the returned instance; when the
    on-disk clone is missing it is recreated from instance.git_url.
    """
    instance = db().query(self.model).\
        filter(self.model.env_id == cluster_id).first()
    if instance is not None:
        try:
            instance.repo = Repo(os.path.join(const.REPOS_DIR,
                                              instance.repo_name))
        except exc.NoSuchPathError:
            logger.debug("Repo folder does not exist. cloning repo")
            self._create_key_file(instance.repo_name, instance.user_key)
            if instance.user_key:
                # Point git at the wrapper that injects the deploy key.
                os.environ['GIT_SSH'] = \
                    self._get_ssh_cmd(instance.repo_name)
            repo_path = os.path.join(const.REPOS_DIR, instance.repo_name)
            repo = Repo.clone_from(instance.git_url, repo_path)
            instance.repo = repo
    return instance
def create(self, data):
    """Create a GitRepo record and clone its repository under REPOS_DIR.

    Any stale clone at the target path is removed first.
    """
    if not os.path.exists(const.REPOS_DIR):
        os.mkdir(const.REPOS_DIR)
    repo_path = os.path.join(const.REPOS_DIR, data['repo_name'])
    if os.path.exists(repo_path):
        logger.debug('Repo directory exists. Removing...')
        shutil.rmtree(repo_path)
    user_key = data.get('user_key', '')
    if user_key:
        self._create_key_file(data['repo_name'], user_key)
        # Point git at the wrapper script that injects the deploy key.
        os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name'])
    repo = Repo.clone_from(data['git_url'], repo_path)
    instance = super(GitRepo, self).create(data)
    instance.repo = repo
    return instance
def post(self):
    """Handle a username/password login form POST.

    On success: sets the jupyter cookie plus the docklet session keys and
    redirects to ?next= (or /dashboard/). On any failure: redirects back
    to /login/.
    """
    if (request.form['username']):
        data = {"user": request.form['username'], "key": request.form['password']}
        result = dockletRequest.unauthorizedpost('/login/', data)
        ok = result and result.get('success', None)
        if (ok and (ok == "true")):
            # set cookie:docklet-jupyter-cookie for jupyter notebook
            resp = make_response(redirect(request.args.get('next', None) or '/dashboard/'))
            app_key = os.environ['APP_KEY']
            resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(request.form['username'], app_key))
            # set session for docklet
            session['username'] = request.form['username']
            session['nickname'] = result['data']['nickname']
            session['description'] = result['data']['description']
            session['avatar'] = '/static/avatar/' + result['data']['avatar']
            session['usergroup'] = result['data']['group']
            session['status'] = result['data']['status']
            session['token'] = result['data']['token']
            return resp
        else:
            return redirect('/login/')
    else:
        return redirect('/login/')
def get(self):
    """Authenticate via the external-auth flow and establish the session.

    Mirrors the form-login handler but takes the identity from the
    external auth result rather than the request form.
    """
    form = external_generate.external_auth_generate_request()
    result = dockletRequest.unauthorizedpost('/external_login/', form)
    ok = result and result.get('success', None)
    if (ok and (ok == "true")):
        # set cookie:docklet-jupyter-cookie for jupyter notebook
        resp = make_response(redirect(request.args.get('next', None) or '/dashboard/'))
        app_key = os.environ['APP_KEY']
        resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
        # set session for docklet
        session['username'] = result['data']['username']
        session['nickname'] = result['data']['nickname']
        session['description'] = result['data']['description']
        session['avatar'] = '/static/avatar/' + result['data']['avatar']
        session['usergroup'] = result['data']['group']
        session['status'] = result['data']['status']
        session['token'] = result['data']['token']
        return resp
    else:
        return redirect('/login/')
def post(self):
    """Handle the external-auth login POST.

    NOTE(review): this block was truncated in the scraped source (the
    request/response setup collapsed onto one line); the missing middle
    is reconstructed from the sibling GET handler earlier in the file —
    confirm against the original project.
    """
    form = external_generate.external_auth_generate_request()
    result = dockletRequest.unauthorizedpost('/external_login/', form)
    ok = result and result.get('success', None)
    if (ok and (ok == "true")):
        # set cookie:docklet-jupyter-cookie for jupyter notebook
        resp = make_response(redirect(request.args.get('next', None) or '/dashboard/'))
        app_key = os.environ['APP_KEY']
        resp.set_cookie('docklet-jupyter-cookie', cookie_tool.generate_cookie(result['data']['username'], app_key))
        # set session for docklet
        session['username'] = result['data']['username']
        session['nickname'] = result['data']['nickname']
        session['description'] = result['data']['description']
        session['avatar'] = '/static/avatar/' + result['data']['avatar']
        session['usergroup'] = result['data']['group']
        session['status'] = result['data']['status']
        session['token'] = result['data']['token']
        return resp
    else:
        return redirect('/login/')
def pytest_configure(config):
    """Populate the global screenshot settings from pytest options/env."""
    settings['SCREENSHOTS_PATH'] = config.getoption('screenshots_path')
    settings['PDIFF_PATH'] = config.getoption('pdiff_path')
    settings['ALLOW_SCREENSHOT_CAPTURE'] = config.getoption('allow_screenshot_capture')
    # Environment variable overrides the CLI flag (presence alone enables).
    if 'ALLOW_SCREENSHOT_CAPTURE' in os.environ:
        settings['ALLOW_SCREENSHOT_CAPTURE'] = True
    # Probe for optional image-diff backends; silently skip missing ones.
    try:
        from sh import compare
        settings['USE_IMAGEMAGICK'] = True
    except ImportError:
        pass
    try:
        from sh import perceptualdiff
        settings['USE_PERCEPTUALDIFF'] = True
    except ImportError:
        pass
def _warn_unsafe_extraction_path(path):
    """
    If the default extraction path is overridden and set to an insecure
    location, such as /tmp, it opens up an opportunity for an attacker to
    replace an extracted file with an unauthorized payload. Warn the user
    if a known insecure location is used.

    See distribute #375 for more details.

    NOTE(review): the scraped source truncated this duplicate to its first
    docstring line; the body is reconstructed from the complete copy
    earlier in the file.
    """
    if os.name == 'nt' and not path.startswith(os.environ['windir']):
        # Windows default permissions are restrictive; skip the warning.
        return
    mode = os.stat(path).st_mode
    if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
        msg = (
            "%s is writable by group/others and vulnerable to attack "
            "when "
            "used with get_resource_filename. Consider a more secure "
            "location (set with .set_extraction_path or the "
            "PYTHON_EGG_CACHE environment variable)." % path
        )
        warnings.warn(msg, UserWarning)
def _warn_unsafe_extraction_path(path):
    """
    If the default extraction path is overridden and set to an insecure
    location, such as /tmp, it opens up an opportunity for an attacker to
    replace an extracted file with an unauthorized payload. Warn the user
    if a known insecure location is used.

    See distribute #375 for more details.

    NOTE(review): the scraped source collapsed part of the docstring and
    the Windows check onto one line; reconstructed from the complete copy
    earlier in the file.
    """
    if os.name == 'nt' and not path.startswith(os.environ['windir']):
        # On Windows, permissions are generally restrictive by default
        # and temp directories are not writable by other users, so
        # bypass the warning.
        return
    mode = os.stat(path).st_mode
    if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
        msg = (
            "%s is writable by group/others and vulnerable to attack "
            "when "
            "used with get_resource_filename. Consider a more secure "
            "location (set with .set_extraction_path or the "
            "PYTHON_EGG_CACHE environment variable)." % path
        )
        warnings.warn(msg, UserWarning)
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'.

    Save previous value, yield, and then restore the previous value stored
    in the environment variable 'env_name'.

    If 'value' is None, do nothing.

    NOTE(review): upstream decorates this generator with
    @contextlib.contextmanager; the decorator appears to have been lost
    in extraction — confirm before using it in a `with` statement.
    """
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                # Variable did not exist before: remove it again.
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
def paths_on_pythonpath(paths):
    """
    Add the indicated paths to the head of the PYTHONPATH environment
    variable so that subprocesses will also see the packages at
    these paths.

    Do this in a context that restores the value on exit.

    NOTE(review): this duplicate was truncated in the scraped source; the
    body is reconstructed from the complete copy earlier in the file, and
    upstream decorates it with @contextlib.contextmanager — confirm.
    """
    # Sentinel distinguishes "variable unset" from "set to empty string".
    nothing = object()
    orig_pythonpath = os.environ.get('PYTHONPATH', nothing)
    current_pythonpath = os.environ.get('PYTHONPATH', '')
    try:
        prefix = os.pathsep.join(paths)
        to_join = filter(None, [prefix, current_pythonpath])
        new_path = os.pathsep.join(to_join)
        if new_path:
            os.environ['PYTHONPATH'] = new_path
        yield
    finally:
        if orig_pythonpath is nothing:
            os.environ.pop('PYTHONPATH', None)
        else:
            os.environ['PYTHONPATH'] = orig_pythonpath
def _test_Valgrind(self, valgrind):
    """Launch the device manager under valgrind and verify a log is produced."""
    # Clear the device cache to prevent false positives
    deviceCacheDir = os.path.join(scatest.getSdrCache(), ".ExecutableDevice_node", "ExecutableDevice1")
    shutil.rmtree(deviceCacheDir, ignore_errors=True)
    os.environ['VALGRIND'] = valgrind
    try:
        # Checking that the node and device launch as expected
        nb, devMgr = self.launchdeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml")
    finally:
        # Always unset, even if launch raised, so later tests are unaffected.
        del os.environ['VALGRIND']
    self.assertFalse(devMgr is None)
    # NOTE(review): the expected registered-device count was lost in
    # extraction; restored as 1 to match the single-child assertion
    # below — confirm against the original test.
    self.assertEquals(len(devMgr._get_registeredDevices()), 1,
                      msg='device Failed to launch with valgrind')
    children = getChildren(nb.pid)
    self.assertEqual(len(children), 1)
    devMgr.shutdown()
    # Check that a valgrind logfile exists
    logfile = os.path.join(deviceCacheDir, 'valgrind.%s.log' % children[0])
    self.assertTrue(os.path.exists(logfile))
def test_setSDRROOT(self):
    """Exercise sb.setSDRROOT validation and round-trip restore."""
    # None type
    self.assertRaises(TypeError, sb.setSDRROOT, None)
    # Bad dir should not change root
    sdrroot = sb.getSDRROOT()
    # NOTE(review): the callable argument of these assertRaises calls was
    # lost in extraction; restored as sb.setSDRROOT to match the
    # surrounding assertions — confirm against the original test.
    self.assertRaises(AssertionError, sb.setSDRROOT, 'TEMP_PATH')
    self.assertEquals(sdrroot, sb.getSDRROOT())
    # Good dir with no dev/dom should not change root
    self.assertRaises(AssertionError, sb.setSDRROOT, 'jackhammer')
    self.assertEquals(sdrroot, sb.getSDRROOT())
    # New root
    sb.setSDRROOT('sdr')
    self.assertEquals(sb.getSDRROOT(), 'sdr')
    # Restore sdrroot
    sb.setSDRROOT(os.environ['SDRROOT'])
def _prependToEnvVar(self, newVal, envVar):
    """Prepend *newVal* to the path-like environment variable *envVar*
    unless an equivalent entry is already present."""
    path = self._getEnvVarasList(envVar)
    foundValue = False
    for entry in path:
        # Search to determine if the new value is already in the path
        try:
            if os.path.samefile(entry, newVal):
                # The value is already in the path
                foundValue = True
                break
        except OSError:
            # If we can't find concrete files to compare, fall back to
            # string compare
            if entry == newVal:
                # The value is already in the path
                foundValue = True
                break
    if not foundValue:
        # The value does not already exist.
        # Membership test works on Python 2 and 3, unlike dict.has_key().
        if envVar in os.environ:
            newpath = newVal + os.path.pathsep + os.getenv(envVar) + os.path.pathsep
        else:
            newpath = newVal + os.path.pathsep
        # putenv alone does not update os.environ; set both so child
        # processes and this process stay in sync.
        os.putenv(envVar, newpath)
        os.environ[envVar] = newpath
def principal_unit():
    """Returns the principal unit of this unit, otherwise None"""
    # Juju 2.2 and above exposes the answer directly.
    principal = os.environ.get('JUJU_PRINCIPAL_UNIT', None)
    if principal == '':
        # Empty string means this unit *is* the principal.
        return os.environ['JUJU_UNIT_NAME']
    if principal is not None:
        return principal
    # Juju 2.1 and below: infer the principal by scanning related units'
    # metadata for one that is not a subordinate.
    for reltype in relation_types():
        for rid in relation_ids(reltype):
            for unit in related_units(rid):
                md = _Metadata_unit(unit)
                if not md:
                    continue
                subordinate = md.pop('subordinate', None)
                if not subordinate:
                    return unit
    return None
def principal_unit():
    """Returns the principal unit of this unit, otherwise None.

    NOTE(review): this duplicate was truncated in the scraped source; the
    body is reconstructed from the complete copy earlier in the file.
    """
    # Juju 2.2 and above provides JUJU_PRINCIPAL_UNIT.
    principal_unit = os.environ.get('JUJU_PRINCIPAL_UNIT', None)
    # If it's empty, then this unit is the principal.
    if principal_unit == '':
        return os.environ['JUJU_UNIT_NAME']
    elif principal_unit is not None:
        return principal_unit
    # For Juju 2.1 and below, work out the principal unit by scanning
    # the various charms' metadata.
    for reltype in relation_types():
        for rid in relation_ids(reltype):
            for unit in related_units(rid):
                md = _Metadata_unit(unit)
                if not md:
                    continue
                subordinate = md.pop('subordinate', None)
                if not subordinate:
                    return unit
    return None
def _get_user_provided_overrides(modules):
    """Load user-provided config overrides.

    :param modules: stack modules to lookup in user overrides yaml file.
    :returns: overrides dictionary.
    """
    overrides = os.path.join(os.environ['JUJU_CHARM_DIR'],
                             'hardening.yaml')
    if os.path.exists(overrides):
        log("Found user-provided config overrides file '%s'" %
            (overrides), level=DEBUG)
        settings = yaml.safe_load(open(overrides))
        if settings and settings.get(modules):
            log("Applying '%s' overrides" % (modules), level=DEBUG)
            return settings.get(modules)
        log("No overrides found for '%s'" % (modules), level=DEBUG)
    else:
        log("No hardening config overrides file '%s' found in charm "
            "root dir" % (overrides), level=DEBUG)
    return {}
def main():
    """Parse CLI flags, configure CUDA visibility, and train YOLO."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', default="YOLO_small.ckpt", type=str)
    parser.add_argument('--data_dir', default="data", type=str)
    parser.add_argument('--threshold', default=0.2, type=float)
    parser.add_argument('--IoU_threshold', default=0.5, type=float)
    parser.add_argument('--gpu', default='', type=str)
    args = parser.parse_args()
    # NOTE(review): args.gpu defaults to '' and argparse never yields None
    # here, so this check is always true — confirm intent.
    if args.gpu is not None:
        cfg.GPU = args.gpu
    if args.data_dir != cfg.DATA_PATH:
        update_config_paths(args.data_dir, args.weights)
    os.environ['CUDA_VISIBLE_DEVICES'] = cfg.GPU
    yolo = YOLONet()
    pascal = pascal_voc('train')
    solver = Solver(yolo, pascal)
    print('Start training ...')
    solver.train()
    print('Done training.')
def main():
    """Parse CLI flags and run YOLO detection on a sample image."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--weights', type=str)
    parser.add_argument('--weight_dir', default='weights', type=str)
    # args.data_dir is used below but was missing from the argument list;
    # added with the same default as the training entry point.
    parser.add_argument('--data_dir', default='data', type=str)
    parser.add_argument('--gpu', type=str)
    args = parser.parse_args()
    # NOTE(review): --gpu has no default, so args.gpu may be None here,
    # which would make this assignment raise — confirm expected usage.
    os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu
    yolo = YOLONet(False)
    weight_file = os.path.join(args.data_dir, args.weight_dir, args.weights)
    detector = Detector(yolo, weight_file)
    # detect from camera
    # cap = cv2.VideoCapture(-1)
    # detector.camera_detector(cap)
    # detect from image file
    imname = 'test/person.jpg'
    detector.image_detector(imname)
def get_bcl2fastq_v2(hostname):
    """Return (version, None) for bcl2fastq v2 found on PATH, else (None, msg)."""
    try:
        subprocess.check_call(["which", "bcl2fastq"])
        # Restore the LD_LIBRARY_PATH set aside by sourceme.bash/shell10x.
        # Required for some installations of bcl2fastq.
        new_environ = dict(os.environ)
        new_environ['LD_LIBRARY_PATH'] = os.environ.get('_TENX_LD_LIBRARY_PATH', '')
        # NOTE(review): on Python 3 check_output returns bytes, so the
        # str split below implies Python 2 — confirm.
        output = subprocess.check_output(["bcl2fastq", "--version"], env=new_environ, stderr=subprocess.STDOUT)
        match = None
        for l in output.split("\n"):
            match = re.match("bcl2fastq v([0-9.]+)", l)
            if match is not None:
                return (match.groups()[0], None)
        return (None, "bcl2fastq version not recognized -- please check the output of bcl2fastq --version")
    except subprocess.CalledProcessError:
        msg = "On machine: %s, bcl2fastq not found on PATH." % hostname
        return (None, msg)
def __init__(self, additional_compose_file=None, additional_services=None):
    """Prepare the docker-compose project name and service list."""
    # To resolve docker client server version mismatch issue.
    os.environ["COMPOSE_API_VERSION"] = "auto"
    dir_name = os.path.split(os.getcwd())[-1]
    # Project name: sanitized cwd name + user, keeping compose projects
    # per-user and per-checkout unique.
    self.project = "{}{}".format(
        re.sub(r'[^a-z0-9]', '', dir_name.lower()),
        getpass.getuser()
    )
    self.additional_compose_file = additional_compose_file
    self.services = ["zookeeper", "schematizer", "kafka"]
    if additional_services is not None:
        self.services.extend(additional_services)
    # This variable is meant to capture the running/not-running state of
    # the dependent testing containers when tests start running. The idea
    # is, we'll only start and stop containers if they aren't already
    # running. If they are running, we'll just use the ones that exist.
    # It takes a while to start all the containers, so when running lots
    # of tests, it's best to start them out-of-band and leave them up for
    # the duration of the session.
    self.containers_already_running = self._are_containers_already_running()
def __init__(self,
             indent_increment,
             max_help_position,
             width,
             short_first):
    """Initialize an optparse-style help formatter.

    If *width* is None it is taken from $COLUMNS (minus a 2-column right
    margin), defaulting to 80 when unset or unparsable.
    """
    self.parser = None
    self.indent_increment = indent_increment
    self.help_position = self.max_help_position = max_help_position
    if width is None:
        try:
            width = int(os.environ['COLUMNS'])
        except (KeyError, ValueError):
            width = 80
        width -= 2
    self.width = width
    self.current_indent = 0
    self.level = 0
    self.help_width = None          # computed later
    self.short_first = short_first
    # Literal tag expanded to an option's default value in help text
    # (scrape residue garbled these three literals; restored to the
    # stdlib optparse values).
    self.default_tag = "%default"
    self.option_strings = {}
    self._short_opt_fmt = "%s %s"
    self._long_opt_fmt = "%s=%s"
def _getuserbase():
    """Return the per-user base directory (IronPython flavour of
    sysconfig's _getuserbase); $IRONPYTHONUSERBASE overrides."""
    env_base = os.environ.get("IRONPYTHONUSERBASE", None)

    def joinuser(*args):
        return os.path.expanduser(os.path.join(*args))

    # what about 'os2emx', 'riscos' ?
    if os.name == "nt":
        base = os.environ.get("APPDATA") or "~"
        return env_base if env_base else joinuser(base, "Python")
    if sys.platform == "darwin":
        framework = get_config_var("PYTHONFRAMEWORK")
        if framework:
            return joinuser("~", "Library", framework, "%d.%d" % (
                sys.version_info[:2]))
    return env_base if env_base else joinuser("~", ".local")
def path(klass):
    """Return $PATH split into its component directories."""
    path_value = os.environ['PATH']
    return path_value.split(os.pathsep)
def editPipeline(args, config):
    """Open a job's stored request JSON in an editor and persist the edits."""
    # NOTE(review): 'Pipelinedbutils' casing looks like extraction damage
    # (likely PipelineDbUtils) — confirm against the module's imports.
    pipelinedbutils = Pipelinedbutils(config)
    request = json.loads(pipelinedbutils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
    _, tmp = mkstemp()
    with open(tmp, 'w') as f:
        f.write("{data}".format(data=json.dumps(request, indent=4)))
    # .get with a default replaces the "in os.environ.keys()" dance.
    editor = os.environ.get("EDITOR", "/usr/bin/nano")
    if subprocess.call([editor, tmp]) == 0:
        with open(tmp, 'r') as f:
            request = json.load(f)
        pipelinedbutils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
    else:
        # Call form (not the Python-2-only print statement) so this
        # parses on both Python 2 and 3.
        print("ERROR: there was a problem editing the request")
        exit(-1)
def __init__(self, path=None):
    """Open (creating if needed) the sqlite-backed unit-state store."""
    if path is not None:
        self.db_path = path
    elif 'UNIT_STATE_DB' in os.environ:
        # Explicit override takes precedence.
        self.db_path = os.environ['UNIT_STATE_DB']
    else:
        # Default: .unit-state.db inside the charm directory.
        charm_dir = os.environ.get('CHARM_DIR', '')
        self.db_path = os.path.join(charm_dir, '.unit-state.db')
    self.conn = sqlite3.connect('%s' % self.db_path)
    self.cursor = self.conn.cursor()
    self.revision = None
    self._closed = False
    self._init()
def execution_environment():
    """Bundle the charm's current execution context into one dict."""
    context = {}
    context['conf'] = config()
    # Relation details are only present when running inside a relation hook.
    if relation_id():
        context['reltype'] = relation_type()
        context['relid'] = relation_id()
        context['rel'] = relation_get()
    context['unit'] = local_unit()
    context['rels'] = relations()
    context['env'] = os.environ
    return context
def relation_type():
    """Scope (relation name) for the currently executing relation hook."""
    env = os.environ
    return env.get('JUJU_RELATION', None)
def relation_id(relation_name=None, service_or_unit=None):
    """The relation ID for the current or a specified relation"""
    have_name = bool(relation_name)
    have_unit = bool(service_or_unit)
    if not have_name and not have_unit:
        # No arguments: answer for the currently executing hook.
        return os.environ.get('JUJU_RELATION_ID', None)
    if have_name and have_unit:
        # Look up the relation to service_or_unit's service by name.
        service_name = service_or_unit.split('/')[0]
        for relid in relation_ids(relation_name):
            if remote_service_name(relid) == service_name:
                return relid
    else:
        raise ValueError('Must specify neither or both of relation_name and service_or_unit')
def local_unit():
    """Local unit ID"""
    unit_name = os.environ['JUJU_UNIT_NAME']
    return unit_name
def hook_name():
    """Name of the currently executing hook ($JUJU_HOOK_NAME, else argv[0] basename)."""
    default = os.path.basename(sys.argv[0])
    return os.environ.get('JUJU_HOOK_NAME', default)
def charm_dir():
    """Return the root directory of the current charm"""
    return os.environ.get('CHARM_DIR', None)
def action_name():
    """Get the name of the currently executing action."""
    return os.environ.get('JUJU_ACTION_NAME', None)
def action_uuid():
    """Get the UUID of the currently executing action."""
    return os.environ.get('JUJU_ACTION_UUID', None)
def action_tag():
    """Get the tag for the currently executing action."""
    return os.environ.get('JUJU_ACTION_TAG', None)
def enable(soft_fail=False):
    """
    Enable ufw

    :param soft_fail: If set to True silently disables IPv6 support in ufw,
                      otherwise a UFWIPv6Error exception is raised when IP6
                      support is broken.
    :returns: True if ufw is successfully enabled
    """
    if is_enabled():
        return True
    if not is_ipv6_ok(soft_fail):
        disable_ipv6()
    # LANG pinned so the English output matched below is stable; PATH
    # preserved so the ufw binary can be found.
    output = subprocess.check_output(['ufw', 'enable'],
                                     universal_newlines=True,
                                     env={'LANG': 'en_US',
                                          'PATH': os.environ['PATH']})
    m = re.findall('^Firewall is active and enabled on system startup\n',
                   output, re.M)
    hookenv.log(output, level='DEBUG')
    if len(m) == 0:
        # NOTE(review): log strings restored from upstream charmhelpers;
        # the scraped copy had garbled capitalization.
        hookenv.log("ufw couldn't be enabled", level='WARN')
        return False
    else:
        hookenv.log("ufw enabled", level='INFO')
        return True
def disable():
    """
    Disable ufw

    :returns: True if ufw is successfully disabled
    """
    if not is_enabled():
        return True
    # NOTE(review): the env= mapping and the re.findall arguments were
    # truncated in extraction; reconstructed to mirror enable() above —
    # confirm against upstream charmhelpers.
    output = subprocess.check_output(['ufw', 'disable'],
                                     universal_newlines=True,
                                     env={'LANG': 'en_US',
                                          'PATH': os.environ['PATH']})
    m = re.findall(r'^Firewall stopped and disabled on system startup\n',
                   output, re.M)
    hookenv.log(output, level='DEBUG')
    if len(m) == 0:
        hookenv.log("ufw couldn't be disabled", level='WARN')
        return False
    else:
        hookenv.log("ufw disabled", level='INFO')
        return True
def default_policy(policy='deny', direction='incoming'):
    """
    Changes the default policy for traffic `direction`

    :param policy: allow, deny or reject
    :param direction: traffic direction, possible values: incoming,
                      outgoing, routed
    """
    if policy not in ['allow', 'deny', 'reject']:
        raise UFWError(('Unknown policy %s, valid values: '
                        'allow, deny, reject') % policy)
    if direction not in ['incoming', 'outgoing', 'routed']:
        raise UFWError(('Unknown direction %s, valid values: '
                        'incoming, outgoing, routed') % direction)
    # NOTE(review): the env= mapping and the re.findall output argument
    # were truncated in extraction; reconstructed to mirror enable() —
    # confirm against upstream charmhelpers.
    output = subprocess.check_output(['ufw', 'default', policy, direction],
                                     universal_newlines=True,
                                     env={'LANG': 'en_US',
                                          'PATH': os.environ['PATH']})
    hookenv.log(output, level='DEBUG')
    m = re.findall("^Default %s policy changed to '%s'\n" % (direction,
                                                             policy),
                   output, re.M)
    if len(m) == 0:
        hookenv.log("ufw couldn't change the default policy to %s for %s"
                    % (policy, direction), level='WARN')
        return False
    else:
        hookenv.log("ufw default policy for %s changed to %s"
                    % (direction, policy), level='INFO')
        return True
def default_execd_dir():
    """Path of the charm's exec.d directory ($CHARM_DIR/exec.d)."""
    charm_root = os.environ['CHARM_DIR']
    return os.path.join(charm_root, 'exec.d')
def run():
    """CNI entry point: read config from stdin and dispatch to a runner."""
    # REVISIT(ivc): current CNI implementation provided by this package is
    # experimental and its primary purpose is to enable development of other
    # components (e.g. functional tests, service/LBaaSv2 support)
    cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
    args = ['--config-file', cni_conf.kuryr_conf]
    try:
        if cni_conf.debug:
            args.append('-d')
    except AttributeError:
        # 'debug' is optional in the CNI config.
        pass
    config.init(args)
    config.setup_logging()
    # Initialize o.vo registry.
    k_objects.register_locally_defined_vifs()
    os_vif.initialize()
    if CONF.cni_daemon.daemon_enabled:
        runner = cni_api.CNIDaemonizedRunner()
    else:
        runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
    LOG.info("Using '%s' ", runner.__class__.__name__)

    def _timeout(signum, frame):
        # Report a CNI timeout result and bail out.
        runner._write_dict(sys.stdout, {
            'msg': 'timeout',
            'code': k_const.CNI_TIMEOUT_CODE,
        })
        LOG.debug('timed out')
        sys.exit(1)

    # signal.SIGALRM was garbled as 'SIgalRM' in the scraped source.
    signal.signal(signal.SIGALRM, _timeout)
    signal.alarm(_CNI_TIMEOUT)
    status = runner.run(os.environ, cni_conf, sys.stdout)
    LOG.debug("Exiting with status %s", status)
    if status:
        sys.exit(status)