Python os 模块,listdir() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.listdir()。
def test_performance(standard=True):
    """Run Sudoku extraction over the first 52 pictures of pyku's folder.

    :param standard: when true, use a default ``pyku.DigitClassifier``
        (raw pixel data); otherwise set ``pyku.utils.DSIZE`` to ``28.`` and
        build the classifier from the saved zoning data.
    :return: always ``None``.
    """
    base_dir = pyku.utils.FOLDER
    candidates = [os.path.join(base_dir, entry) for entry in os.listdir(base_dir)]
    pictures = sorted(candidate for candidate in candidates
                      if os.path.isfile(candidate))

    if not standard:
        # Zoning data
        pyku.utils.DSIZE = 28.
        classifier = pyku.DigitClassifier(
            saved_model=pyku.utils.TRAIN_DATA + 'zoning_data.npz',
            feature=pyku.DigitClassifier._zoning)
    else:
        # Standard raw pixel data
        classifier = pyku.DigitClassifier()

    for picture in pictures[:52]:
        sudoku = pyku.Sudoku(picture, classifier=classifier)
        sudoku.extract()
    return None
def sftp_upload(host, port, username, password, local, remote):
    """Upload a file, or the regular files directly inside a directory, via SFTP.

    :param host: SFTP server host.
    :param port: SFTP server port.
    :param username: login name.
    :param password: login password.
    :param local: local file, or local directory whose files are uploaded.
    :param remote: remote file path (when ``local`` is a file) or remote
        directory (when ``local`` is a directory).

    Errors raised while transferring are printed, not propagated. The
    transport is always closed.
    """
    import posixpath  # remote paths are joined with '/' whatever the local OS

    transport = paramiko.Transport((host, port))
    try:
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            if os.path.isdir(local):
                for name in os.listdir(local):
                    source = os.path.join(local, name)
                    # Sub-directories are skipped: put() is given regular
                    # files only.
                    if not os.path.isfile(source):
                        continue
                    sftp.put(source, posixpath.join(remote, name))
            else:
                sftp.put(local, remote)
        except Exception as e:
            print('upload exception:', e)
    finally:
        transport.close()
#if __name__ == '__main__':
# host = '121.69.75.194'#??
# port = 22 #??
# username = 'wac' #???
# password = '8112whz' #??
# local = '/Users/ngxin/Documents/xin/face_recognition/my_faces/'
# remote = '/home/wac/ngxin/ftp_upload/'
# local = 'F:\\sftptest\\'#?????????????????windows?????window???????????
# remote = '/opt/tianpy5/python/test/'#?????????????????linux????
#sftp_upload(host,port,username,password,local,remote)#??
#sftp_download(host,remote)#??
def build_wheel(wheel_directory, config_settings=None,
                Metadata_directory=None):
    """Run ``bdist_wheel`` and return the file name of the wheel produced.

    :param wheel_directory: directory that must contain the wheel afterwards.
    :param config_settings: passed through ``_fix_config``; its
        ``"--global-option"`` list is appended to the command line.
    :param Metadata_directory: accepted but not used by this function.
    :return: base name of the single ``.whl`` file in ``wheel_directory``.
    :raises AssertionError: if there is not exactly one wheel.
    """
    config_settings = _fix_config(config_settings)
    wheel_directory = os.path.abspath(wheel_directory)
    sys.argv = sys.argv[:1] + ['bdist_wheel'] + \
        config_settings["--global-option"]
    _run_setup()
    # wheel_directory is absolute at this point, so it has to be compared with
    # the absolute form of 'dist'; comparing with the bare string was always
    # unequal and removed 'dist' itself right before copying from it.
    if wheel_directory != os.path.abspath('dist'):
        if os.path.isdir(wheel_directory):
            shutil.rmtree(wheel_directory)
        shutil.copytree('dist', wheel_directory)

    wheels = [f for f in os.listdir(wheel_directory)
              if f.endswith('.whl')]
    # Raised explicitly so the check is not stripped under ``python -O``.
    if len(wheels) != 1:
        raise AssertionError(
            'expected exactly one wheel in %s, found %d'
            % (wheel_directory, len(wheels)))
    return wheels[0]
def add_corpus():
    """add files in corpus to database

    Every distinct base name found in ``corpus/`` must have a ``.nmap``
    file; the ``.xml`` and ``.gnmap`` companions are optional and default to
    an empty string. For each base name, 100 random IPs are generated and
    each gets between 1 and 9 sightings with a randomly back-dated ctime.
    """
    # Return value is not used in this function.
    # NOTE(review): presumably called to initialise the connection - confirm.
    get_db()
    basenames = set(filename.split('.')[0] for filename in os.listdir("corpus"))
    for basename in basenames:
        basepath = os.path.join('corpus', basename)
        with open(basepath + '.nmap', "r") as f:
            nmap = f.read()
        try:
            with open(basepath + '.xml', "r") as f:
                xml = f.read()
        except IOError:
            xml = ""
        try:
            with open(basepath + '.gnmap', "r") as f:
                gnmap = f.read()
        except IOError:
            # Was assigned to a misspelled name, leaving ``gnmap`` unbound.
            gnmap = ""
        for _ in range(0, 100):
            # One random value per octet; the format string has four slots.
            rando_ip = "%d.%d.%d.%d" % (random.randrange(1, 254),
                                        random.randrange(1, 254),
                                        random.randrange(1, 254),
                                        random.randrange(1, 254))
            (ip, real_ctime) = nmap_to_ip_ctime(nmap)
            for _sighting in range(0, random.randrange(1, 10)):
                rando_ctime = real_ctime - random.randrange(3600, 3600*24*365)
                create_sighting(nmap, xml, gnmap, rando_ctime, rando_ip)
def __init__(self, model_nm, cell_nm, attention_type):
    """
    Store the model settings and make sure the step file exists.

    :param model_nm: stored as ``self.model_nm``
    :param cell_nm: stored as ``self.cell_nm``
    :param attention_type: stored as ``self.attention_type``
    """
    self.model_nm = model_nm
    self.cell_nm = cell_nm
    self.attention_type = attention_type
    self.last_ckpt = None
    self.last_id = 0
    self.step_save_location = 'steps.p'
    self.data_save_location = 'data'
    self.mapper_save_location = 'mapper.p'
    self.steps_per_ckpt = None
    self.num_steps_per_prediction = None
    self.present_checkpoints = None
    self.outfile = None
    # initialize the steps if not initialized
    if self.step_save_location not in os.listdir(self.get_checkpoint_location()):
        # Context manager so the pickle file is flushed and closed.
        with open(self.get_step_file(), 'wb') as step_file:
            pickle.dump(0, step_file)
def __iter__(self):
    """
    Iterate over the sentences stored in ``self.fname``.

    ``self.fname`` is either one file or a directory whose entries are all
    read. Each line is lower-cased, stripped of ``string.punctuation``
    characters and split on whitespace; the resulting word list is yielded
    wrapped in ``self.begin`` and ``self.end``.
    """
    if not os.path.isdir(self.fname):
        paths = [self.fname]
    else:
        paths = [os.path.join(self.fname, entry)
                 for entry in os.listdir(self.fname)]
    for path in paths:
        with open(path) as handle:
            text = handle.read()
            for raw_line in text.split("\n"):
                kept = [ch for ch in raw_line.lower()
                        if ch not in string.punctuation]
                words = "".join(kept).strip().split()
                yield [self.begin] + words + [self.end]
def __iter__(self):
    """
    Read a file where each line is of the form "word1 word2 ..."
    Yields lists of the form [self.begin, word1, ..., self.end]

    ``self.fname`` may be a single file or a directory; for a directory
    every entry in it is read.
    """
    if os.path.isdir(self.fname):
        filenames = [os.path.join(self.fname, f) for f in os.listdir(self.fname)]
    else:
        # Without this branch ``filenames`` was unbound for a plain file.
        filenames = [self.fname]
    for langpath in filenames:
        # Open the loop variable; the previous code opened an undefined name.
        with open(langpath) as f:
            doc = f.read()
        for line in doc.split("\n"):
            sent = "".join([ch for ch in line.lower() if ch not in string.punctuation]).strip().split()
            sent = [self.begin] + sent + [self.end]
            yield sent
def archive_context(filename):
    """Generator: extract *filename* into a temp dir and work inside it.

    The archive is unpacked into a fresh temporary directory, the working
    directory is switched to the first entry listed in it, and control is
    yielded once. On exit the previous working directory is restored and the
    temporary directory is removed.
    """
    scratch = tempfile.mkdtemp()
    log.warn('Extracting in %s', scratch)
    previous_cwd = os.getcwd()
    try:
        os.chdir(scratch)
        zip_cls = get_zip_class()
        with zip_cls(filename) as archive:
            archive.extractall()

        # step into the first entry of the extraction directory
        first_entry = os.listdir(scratch)[0]
        subdir = os.path.join(scratch, first_entry)
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield
    finally:
        os.chdir(previous_cwd)
        shutil.rmtree(scratch)
def _delete_directory_contents(self, dirpath, filter_func):
"""Delete all files in a directory.
:param dirpath: path to directory to clear
:type dirpath: ``unicode`` or ``str``
:param filter_func function to determine whether a file shall be
deleted or not.
:type filter_func ``callable``
"""
if os.path.exists(dirpath):
for filename in os.listdir(dirpath):
if not filter_func(filename):
continue
path = os.path.join(dirpath, filename)
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.unlink(path)
self.logger.debug('Deleted : %r', path)
def getAudio(freq, audio_files=None):
    """Extract a WAV track from every ``.mkv``/``.avi`` file in ``data_dir``.

    :param freq: audio sample rate passed to ffmpeg's ``-ar`` option; also
        embedded in the output file name.
    :param audio_files: optional collection of base names (without
        extension); when given, only those videos are converted.
    :return: list of the output paths ``<data_dir><name>_<freq>.wav``, one
        per video handed to ffmpeg (whether or not ffmpeg succeeded).
    """
    # ``(mkv|avi)$`` is an alternation anchored at the end of the name. The
    # former ``[mkv|avi]`` was a character class matching any single one of
    # those letters after a dot.
    video_pattern = re.compile(r'.*\.(mkv|avi)$')
    files = [f for f in os.listdir(data_dir) if video_pattern.match(f)]
    if audio_files:
        files = [f for f in files if os.path.splitext(f)[0] in audio_files]
    audio_dirs = []
    for f in files:
        name, extension = os.path.splitext(f)
        source = data_dir + name + extension
        target = data_dir + name + '_' + str(freq) + '.wav'
        # Argument list with no shell: file names containing spaces or shell
        # metacharacters are passed to ffmpeg verbatim.
        command = ["ffmpeg", "-i", source, "-ab", "160k", "-ac", "2",
                   "-ar", str(freq), "-vn", target]
        audio_dirs.append(target)
        subprocess.call(command)
    return audio_dirs
# Convert timestamp to seconds
def __init__(self):
    """Read the analyzer configuration and index the blocklist files.

    Sets ``self.path``, ``self.ignoreolderthandays``, ``self.utc`` and
    ``self.Now``, creates ``self.path`` when missing, and fills
    ``self.ipsets`` / ``self.netsets`` with the file names in ``self.path``
    containing ``.ipset`` / ``.netset`` respectively.
    """
    Analyzer.__init__(self)
    # Get config parameters
    self.path = self.getParam('config.blocklistpath', None, 'No path to blocklists provided.')
    self.ignoreolderthandays = self.getParam('config.ignoreolderthandays', 365)
    self.utc = pytz.UTC
    # The datetime constructor is ``now`` (lowercase); ``Now`` raised
    # AttributeError. The attribute name itself is kept as it was.
    self.Now = dt.datetime.now(tz=self.utc)
    # Check if directory exists
    if not os.path.exists(self.path):
        # 0o700 is the octal spelling accepted by Python 2.6+ and Python 3.
        os.mkdir(self.path, 0o700)
    # Downloading/updating the list is implemented with an external cronjob which git pulls the repo
    # Read files in the given path and prepare file lists for ip- and netsets
    self.ipsets = []
    self.netsets = []
    for entry in os.listdir(self.path):
        if '.ipset' in entry:
            self.ipsets.append(entry)
        elif '.netset' in entry:
            self.netsets.append(entry)
def read_preposition_senses(self):
    """Load sense ids from every ``*.defs.xml`` file in ``self.prep_senses_dir``.

    For a file ``<prep>.defs.xml`` each direct child of the XML root that has
    a ``senseid`` text contributes the string ``"<prep>-<senseid>"``; the
    list is stored in ``self.prep_senses[<prep>]``. A short summary is
    written to standard error.
    """
    num_senses_per_prep = []
    for filename in os.listdir(self.prep_senses_dir):
        if '.defs.xml' in filename:
            prep_str = filename.replace('.defs.xml', '')
            xml_root = ElementTree.parse("%s/%s" % (self.prep_senses_dir, filename)).getroot()
            senses = []
            # Iterating the element yields its direct children; this replaces
            # getchildren(), which newer ElementTree versions no longer have.
            for child_el in xml_root:
                sense_id = child_el.findtext('senseid')
                if sense_id is not None:
                    # This will add strings like 'into-1(1)'
                    senses.append("%s-%s" % (prep_str, sense_id))
            num_senses_per_prep.append(len(senses))
            self.prep_senses[prep_str] = senses
    num_preps = len(self.prep_senses)
    # sys.stderr.write works on both Python 2 and 3.
    sys.stderr.write("Read senses for %d prepositions.\n" % num_preps)
    # Guard the average against an empty directory (division by zero).
    average = float(sum(num_senses_per_prep)) / num_preps if num_preps else 0.0
    sys.stderr.write("Senses per preposition: %f\n" % average)
# Todo: Take a coarse-grained mapping file and implement the following function.
def database_reset(ctx):
    """
    Resets the database based on the current configuration.

    Re-opens ``ctx['NNTPSettings']`` with ``reset=True``, runs ``__db_prep``
    and then removes every regular file found directly inside
    ``<base_dir>/cache/search``. A file that cannot be removed is logged as
    a warning and skipped.
    """
    logger.info('Resetting database ...')
    ctx['NNTPSettings'].open(reset=True)
    __db_prep(ctx)
    db_path = join(ctx['NNTPSettings'].base_dir, 'cache', 'search')
    logger.debug('Scanning %s for databases...' % db_path)
    with pushd(db_path, create_if_missing=True):
        for entry in listdir(db_path):
            db_file = join(db_path, entry)
            if not isfile(db_file):
                continue
            try:
                unlink(db_file)
                logger.info('Removed %s ...' % entry)
            except Exception:
                # ``except Exception`` keeps the best-effort behaviour but no
                # longer swallows KeyboardInterrupt/SystemExit.
                # NOTE(review): the message text was a scrape placeholder
                # ('@R_404_4761@'); 'Failed' is the presumed original word.
                logger.warning('Failed to remove %s ...' % entry)
def dirsize(src):
    """
    Sum the sizes, in bytes, of the regular files directly inside ``src``.

    Returns 0 when ``src`` is not a directory and None when an
    ``OSError``/``IOError`` occurs while measuring.
    """
    if isdir(src):
        try:
            with pushd(src, create_if_missing=False):
                total = 0
                for entry in listdir('.'):
                    if isfile(entry):
                        total += getsize(entry)
        except (OSError, IOError):
            return None
        # Hand back the accumulated size
        return total

    # not a directory: nothing to measure
    return 0
def addsitedir(sitedir, kNown_paths=None):
    """Add 'sitedir' argument to sys.path if missing and handle .pth files in
    'sitedir'

    :param sitedir: directory to add.
    :param kNown_paths: collection of already-known paths; when None it is
        built with ``_init_pathinfo()`` and None is returned at the end.
    :return: ``kNown_paths`` as passed in, or None when it was built here or
        when ``sitedir`` cannot be listed.
    """
    if kNown_paths is None:
        kNown_paths = _init_pathinfo()
        reset = 1
    else:
        reset = 0
    sitedir, sitedircase = makepath(sitedir)
    if sitedircase not in kNown_paths:
        sys.path.append(sitedir)        # Add path component
    try:
        names = os.listdir(sitedir)
    except os.error:
        return
    names.sort()
    for name in names:
        if name.endswith(os.extsep + "pth"):
            # Pass the .pth file name as well; it was missing from the call.
            # NOTE(review): argument order assumed to follow the standard
            # library's site.addpackage(sitedir, name, known_paths).
            addpackage(sitedir, name, kNown_paths)
    if reset:
        kNown_paths = None
    return kNown_paths
def _build_one(self, req, output_dir, python_tag=None):
    """Build one wheel.

    The wheel is built into a temporary directory and then moved into
    ``output_dir``; the temporary directory is always removed.

    :return: The filename of the built wheel, or None if the build failed
        or the built wheel could not be moved.
    """
    tempd = tempfile.mkdtemp('pip-wheel-')
    try:
        if self.__build_one(req, tempd, python_tag=python_tag):
            try:
                wheel_name = os.listdir(tempd)[0]
                wheel_path = os.path.join(output_dir, wheel_name)
                shutil.move(os.path.join(tempd, wheel_name), wheel_path)
                logger.info('Stored in directory: %s', output_dir)
                return wheel_path
            except Exception:
                # Still best-effort, but the reason is recorded and
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                logger.debug('Could not store built wheel in %s',
                             output_dir, exc_info=True)
        # Ignore return, we can't do anything else useful.
        self._clean_one(req)
        return None
    finally:
        rmtree(tempd)
def test_install():
    """Install TESTWHEEL into scratch locations and check where files land.

    A temporary directory gets one sub-directory per install scheme key; the
    wheel is installed with those overrides and the expected files are
    checked. The temporary directory is removed afterwards.
    """
    def get_supported():
        return list(wheel.pep425tags.get_supported()) + [('py3', 'none', 'win32')]

    tempdir = mkdtemp()
    whl = WheelFile(TESTWHEEL, context=get_supported)
    assert whl.supports_current_python(get_supported)
    try:
        scheme_keys = ('purelib', 'platlib', 'scripts', 'headers', 'data')
        locs = {}
        for key in scheme_keys:
            target = os.path.join(tempdir, key)
            locs[key] = target
            os.mkdir(target)
        whl.install(overrides=locs)
        assert len(os.listdir(locs['purelib'])) == 0
        expected = (
            ('platlib', ('hello.pyd',)),
            ('platlib', ('hello', 'hello.py')),
            ('platlib', ('__init__.py',)),
            ('data', ('hello.dat',)),
            ('headers', ('hello.dat',)),
            ('scripts', ('hello.sh',)),
            ('platlib', ('test-1.0.dist-info', 'RECORD')),
        )
        for key, parts in expected:
            assert check(locs[key], *parts)
    finally:
        shutil.rmtree(tempdir)
def maybe_move(self, spec, dist_filename, setup_base):
    """Move the unpacked sources to ``<self.build_directory>/<spec.key>``.

    When the destination already exists a warning is logged and
    ``setup_base`` is returned unchanged. Otherwise the directory to move is
    ``dist_filename`` if that is a directory, else ``setup_base`` (or its
    only entry when that single entry is a directory). Returns the
    destination path.
    """
    target = os.path.join(self.build_directory, spec.key)
    if os.path.exists(target):
        msg = (
            "%r already exists in %s; build directory %s will not be kept"
        )
        log.warn(msg, spec.key, self.build_directory, setup_base)
        return setup_base

    source = setup_base
    if os.path.isdir(dist_filename):
        source = dist_filename
    else:
        if os.path.dirname(dist_filename) == setup_base:
            # the downloaded file lives in the tmp dir: drop it first
            os.unlink(dist_filename)
        entries = os.listdir(setup_base)
        if len(entries) == 1:
            only_entry = os.path.join(setup_base, entries[0])
            if os.path.isdir(only_entry):
                # a lone directory is moved instead of its parent
                source = only_entry

    ensure_directory(target)
    shutil.move(source, target)
    return target
def _use_last_dir_name(self, path, prefix=''):
"""
Return name of the last dir in path or '' if no dir found.
Parameters
----------
path: str
Use dirs in this path
prefix: str
Use only dirs startings by this prefix
"""
matching_dirs = (
dir_name
for dir_name in reversed(os.listdir(path))
if os.path.isdir(os.path.join(path, dir_name)) and
dir_name.startswith(prefix)
)
return next(matching_dirs, None) or ''
def archive_context(filename):
    """Generator: extract *filename* into a temp dir and work inside it.

    NOTE(review): this copy was truncated (a ``finally`` without ``try`` and
    undefined ``subdir``/``old_wd``); the body is restored from the complete
    definition of the same function earlier in this file - confirm.
    """
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield
    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
def get_model_filenames(model_dir):
    """Locate the meta file and the newest checkpoint inside ``model_dir``.

    :param model_dir: directory listed with ``os.listdir``.
    :return: ``(meta_file, ckpt_file)`` where ``ckpt_file`` is the
        ``model-<name>.ckpt-<step>`` prefix with the highest step number.
    :raises ValueError: when there is not exactly one meta file, or when no
        file name matches the checkpoint pattern.
    """
    files = os.listdir(model_dir)
    # Extension is matched case-insensitively so both '.meta' and the
    # previously required '.Meta' spelling are accepted.
    meta_files = [s for s in files if s.lower().endswith('.meta')]
    if len(meta_files) == 0:
        raise ValueError('No Meta file found in the model directory (%s)' % model_dir)
    elif len(meta_files) > 1:
        raise ValueError('There should not be more than one Meta file in the model directory (%s)' % model_dir)
    meta_file = meta_files[0]

    # The dot before 'ckpt' is escaped so it only matches a literal dot.
    ckpt_pattern = re.compile(r'(^model-[\w\- ]+\.ckpt-(\d+))')
    max_step = -1
    ckpt_file = None
    for f in files:
        found = ckpt_pattern.match(f)
        if found is None:
            continue
        step = int(found.group(2))
        if step > max_step:
            max_step = step
            ckpt_file = found.group(1)
    if ckpt_file is None:
        # Previously this fell through to an UnboundLocalError on return.
        raise ValueError('No checkpoint file found in the model directory (%s)' % model_dir)
    return meta_file, ckpt_file
def ftp_upload(ftp, remotefile, localfile):
    """Send ``localfile`` to the server as ``remotefile`` in binary mode.

    :param ftp: object providing ``storbinary`` and ``set_debuglevel``
        (an ``ftplib.FTP``-style client).
    :param remotefile: name used in the ``STOR`` command.
    :param localfile: path of the local file to read.

    Any exception is caught and its traceback printed; nothing is returned.
    """
    try:
        # ``with`` closes the local file even when the transfer fails.
        with open(localfile, 'rb') as fp:
            ftp.storbinary('STOR ' + remotefile, fp)
            ftp.set_debuglevel(0)
        print('????')
    except Exception:
        traceback.print_exc()
def archive_context(filename):
    """Generator: extract *filename* into a temp dir and work inside it.

    NOTE(review): this copy was truncated (a ``finally`` without ``try`` and
    undefined ``subdir``/``old_wd``); the body is restored from the complete
    definition of the same function earlier in this file - confirm.
    """
    # extracting the archive
    tmpdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', tmpdir)
    old_wd = os.getcwd()
    try:
        os.chdir(tmpdir)
        with get_zip_class()(filename) as archive:
            archive.extractall()

        # going in the directory
        subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
        os.chdir(subdir)
        log.warn('Now working in %s', subdir)
        yield
    finally:
        os.chdir(old_wd)
        shutil.rmtree(tmpdir)
def _cacheProtected(self) :
'''
gets all the protected packages
'''
self._protected = []
protected_conf_path='/etc/dnf/protected.d'
conf_files = listdir(protected_conf_path)
for f in conf_files :
file_path = protected_conf_path + '/' + f
with open(file_path, 'r') as content_file:
for line in content_file:
if line.strip() :
match_all = False
newest_only = False
tags =""
pkgs = self.get_packages_by_name(line.strip(), newest_only)
for pkg in pkgs:
pkg_id = pkg.pkg_id
if (not pkg_id in self._protected) :
self._protected.append(pkg_id)
# Todo it would be better to get recursive require
#for pkg_id in self._protected:
#recursive_id = self.GetAttribute(pkg_id,'requires')
def get_config_hash(file_dir, resource_mapping, exts=['conf']):
    """Merge the config dicts of all mapped config files in ``file_dir``.

    A file is considered when the text after its last dot is in ``exts`` and
    its name is a key of ``resource_mapping``. The mapping entry supplies
    the ``'resource'`` and, optionally, the ``'driver'`` class path. Returns
    an empty dict when ``file_dir`` is not a directory.
    """
    if not os.path.isdir(file_dir):
        logger.debug(
            "Directory {} not found. Returning emty dict".format(file_dir))
        return {}

    default_driver = 'fuel_external_git.drivers.openstack_config.OpenStackConfig'
    merged = {}
    candidates = [name for name in os.listdir(file_dir)
                  if name.split('.')[-1] in exts]
    for name in candidates:
        if name not in resource_mapping.keys():
            continue
        mapping = resource_mapping[name]
        driver_path = mapping.get('driver', default_driver)
        driver_cls = importutils.import_class(driver_path)
        config = driver_cls(
            os.path.join(file_dir, name),
            resource_mapping[name]['resource']
        )
        deep_merge(merged, config.to_config_dict())
    return merged
def recover_allclusters(self):
    """Recover every cluster of every user found under ``global/users``.

    Group quotas are fetched once from ``/master/user/groupinfo/``. For each
    cluster the owner's uid and group are fetched from
    ``/master/user/recoverinfo/`` and ``self.recover_cluster`` is called
    with that group's input/output rate limits.
    """
    logger.info("recovering all vclusters for all users...")
    usersdir = self.fspath+"/global/users/"
    auth_key = env.getenv('AUTH_KEY')
    group_reply = post_to_user("/master/user/groupinfo/", {'auth_key':auth_key})
    quotas = {}
    for group in json.loads(group_reply['groups']):
        quotas[group['name']] = group['quotas']

    for user in os.listdir(usersdir):
        user_clusters = self.list_clusters(user)[1]
        for cluster in user_clusters:
            logger.info ("recovering cluster:%s for user:%s ..." % (cluster, user))
            recover_info = post_to_user("/master/user/recoverinfo/", {'username':user,'auth_key':auth_key})
            uid = recover_info['uid']
            group_quota = quotas[recover_info['groupname']]
            self.recover_cluster(cluster, user, uid,
                                 group_quota['input_rate_limit'],
                                 group_quota['output_rate_limit'])
    logger.info("recovered all vclusters for all users")
def diff_containers(self):
    """Compare local containers with those recorded for this host.

    The "global" set is built from every cluster file under
    ``<fspath>/global/users/<user>/clusters``, keeping the containers whose
    ``host`` equals ``self.addr``.

    :return: ``[both, onlylocal, onlyglobal]`` - three lists of container
        names, each in the order the names were encountered.
    """
    [_status, localcontainers] = self.list_containers()
    globalpath = self.fspath+"/global/users/"
    globalcontainers = []
    for user in os.listdir(globalpath):
        clusters_dir = globalpath+user+"/clusters"
        for cluster in os.listdir(clusters_dir):
            # ``with`` closes each cluster file; it used to be left open.
            with open(clusters_dir+"/"+cluster, 'r') as clusterfile:
                clusterinfo = json.loads(clusterfile.read())
            for container in clusterinfo['containers']:
                if container['host'] == self.addr:
                    globalcontainers.append(container['containername'])
    both = [c for c in localcontainers if c in globalcontainers]
    onlylocal = [c for c in localcontainers if c not in globalcontainers]
    onlyglobal = [c for c in globalcontainers if c not in localcontainers]
    return [both, onlylocal, onlyglobal]
def save_billing_history(vnode_name, billing_history):
    """Store ``billing_history`` for ``vnode_name`` in its cluster file.

    The cluster files of the vnode's owner are scanned; the first one whose
    ``clusterid`` matches ``get_cluster(vnode_name)`` gets
    ``info['billing_history'][vnode_name]`` set and is rewritten as JSON.
    Nothing happens when the clusters directory does not exist.
    """
    clusters_dir = env.getenv("FS_PREFIX")+"/global/users/"+get_owner(vnode_name)+"/clusters/"
    if not os.path.exists(clusters_dir):
        return
    vnode_cluster_id = get_cluster(vnode_name)
    for cluster in os.listdir(clusters_dir):
        clusterpath = clusters_dir + cluster
        if not os.path.isfile(clusterpath):
            continue
        # Context managers close the file even if parsing/serialising fails.
        with open(clusterpath, 'r') as infofile:
            info = json.loads(infofile.read())
        if vnode_cluster_id != str(info['clusterid']):
            continue
        if 'billing_history' not in info:
            info['billing_history'] = {}
        info['billing_history'][vnode_name] = billing_history
        # Serialise before opening for writing so a serialisation error does
        # not leave the cluster file truncated.
        payload = json.dumps(info)
        with open(clusterpath, 'w') as infofile:
            infofile.write(payload)
        break
    return
def get_billing_history(vnode_name):
    """Return the stored billing history of ``vnode_name``.

    The owner's cluster files are scanned and the first
    ``info['billing_history'][vnode_name]`` found is returned. When none is
    found (or the clusters directory is missing) a zeroed record with the
    keys ``cpu``, ``mem``, ``disk`` and ``port`` is returned.
    """
    clusters_dir = env.getenv("FS_PREFIX")+"/global/users/"+get_owner(vnode_name)+"/clusters/"
    if os.path.exists(clusters_dir):
        for cluster in os.listdir(clusters_dir):
            clusterpath = clusters_dir + cluster
            if not os.path.isfile(clusterpath):
                continue
            # ``with`` closes the file even when the JSON is malformed.
            with open(clusterpath, 'r') as infofile:
                info = json.loads(infofile.read())
            if 'billing_history' not in info or vnode_name not in info['billing_history']:
                continue
            return info['billing_history'][vnode_name]
    return {'cpu': 0, 'mem': 0, 'disk': 0, 'port': 0}
# the thread to collect data from each worker and store them in monitor_hosts and monitor_vnodes
def enumerate_backups_entities():
    """Yield the entity stored in each backup sub-directory.

    Every entry of ``Backuper.backups_dir`` that contains an ``entity.tlo``
    file is opened and the object read from it is yielded. Entries whose
    object type is unknown (``TypeNotFoundError``) are skipped. Nothing is
    yielded when the backups directory does not exist.
    """
    if not isdir(Backuper.backups_dir):
        return
    # Walk the sub-directories
    for directory in listdir(Backuper.backups_dir):
        entity_file = path.join(Backuper.backups_dir, directory, 'entity.tlo')
        # Only directories that actually hold an entity file are considered
        if not isfile(entity_file):
            continue
        # Load and yield it
        with open(entity_file, 'rb') as stream:
            with BinaryReader(stream=stream) as reader:
                try:
                    yield reader.tgread_object()
                except TypeNotFoundError:
                    # Old user, scheme got updated, don't care.
                    pass
#endregion
#region Backup exists and deletion
def downloadFilesSave(links, fileFormat):  # main function
    """Download every link into the current directory as ``<random>.<fileFormat>``.

    :param links: iterable of URLs, or the string ``'EMPTY'``.
    :param fileFormat: extension for the saved files; the listed archive and
        image formats are written in binary mode, everything else in text
        mode.
    :return: a status string; for a non-empty run it reports the number of
        entries in the current directory.

    Links that raise ``urllib2.URLError`` are skipped.
    """
    if links == 'EMPTY':  # if links list is empty
        return ' NO LINKS FOUND !'
    binary_formats = ('zip', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'svg', 'gif')
    # The check used the builtin ``format`` instead of ``fileFormat``, so
    # binary mode was never selected.
    mode = 'wb' if fileFormat in binary_formats else 'w'
    for link in links:
        # Pick a random name that is not taken yet. The old check compared an
        # int against file-name strings and retried only once.
        name = random.randint(0, 10000001)
        while os.path.exists(str(name) + '.' + fileFormat):
            name = random.randint(0, 10000001)
        try:
            data = urllib2.urlopen(link).read()
        except urllib2.URLError:
            continue
        # Download first, then write: a failed download leaves no empty file
        # and the handle is always closed.
        with open(str(name) + '.' + fileFormat, mode) as saveFile:
            saveFile.write(data)
    return ' {} DOWNLOADS SUCCESSFULL YET !'.format(len(os.listdir(os.getcwd())))
def get_firefox_db(db_file):
    '''Return the full path of firefox sqlite databases, platform independent

    Looks inside each profile directory whose name contains "default" under
    the platform's Firefox profiles location and returns the path of the
    first ``db_file`` found. Exits the program with a message when the
    profiles location is unknown or missing, or when no profile has the file.
    '''
    login = os.getlogin()
    plat_dict = {"Windows 7": r"C:\Users\%s\AppData\Roaming\Mozilla\Firefox\Profiles" % login,
                 "Windows XP": r"C:\Documents and Settings\%s\Application Data\Mozilla\Firefox\Profiles" % login,
                 "Linux": r"/home/%s/.mozilla/firefox/" % login,
                 "Darwin": r"/Users/%s/Library/Application Support/Firefox/Profiles" % login}
    if platform.system() == "Windows":
        plat_key = platform.system() + " " + platform.release()
    else:
        plat_key = platform.system()
    # .get(): a platform missing from the table ends in the same exit
    # message instead of a KeyError.
    profiles_dir = plat_dict.get(plat_key)
    if profiles_dir is not None and os.path.isdir(profiles_dir):
        for item in os.listdir(profiles_dir):
            profile_dir = os.path.join(profiles_dir, item)
            if os.path.isdir(profile_dir) and "default" in item:
                candidate = os.path.join(profile_dir, db_file)
                if os.path.isfile(candidate):
                    # Return the path that was just checked; the profile
                    # directory used to be dropped from the returned path.
                    return candidate
    sys.exit("Couldn't find the database file in the default location! Try providing a different location using the -b option...")
def _build_one(self, req, output_dir, python_tag=None):
    """Build one wheel.

    NOTE(review): this copy was truncated (garbled signature, ``finally``
    without ``try``); the body is restored from the complete ``_build_one``
    earlier in this file - confirm.

    :return: The filename of the built wheel, or None if the build failed
        or the built wheel could not be moved.
    """
    tempd = tempfile.mkdtemp('pip-wheel-')
    try:
        if self.__build_one(req, tempd, python_tag=python_tag):
            try:
                wheel_name = os.listdir(tempd)[0]
                wheel_path = os.path.join(output_dir, wheel_name)
                shutil.move(os.path.join(tempd, wheel_name), wheel_path)
                logger.info('Stored in directory: %s', output_dir)
                return wheel_path
            except Exception:
                # best-effort: fall through to the clean-up below
                pass
        # Ignore return, we can't do anything else useful.
        self._clean_one(req)
        return None
    finally:
        rmtree(tempd)
def get_3d_data_slices(slices):  # get data in Hounsfield Units
    """Stack slices into one int16 volume after rescaling each slice.

    :param slices: list of objects exposing ``ImagePositionPatient``,
        ``pixel_array``, ``RescaleIntercept`` and ``RescaleSlope``. The list
        is sorted in place by ``ImagePositionPatient[2]``.
    :return: ``np.int16`` array with one leading entry per slice.
    """
    # Sort on the float value: truncating to int makes slices closer than one
    # unit apart compare equal (the sibling get_3d_data_hu marks the int key
    # as buggy and uses float as well).
    slices.sort(key=lambda x: float(x.ImagePositionPatient[2]))
    image = np.stack([s.pixel_array for s in slices])
    image = image.astype(np.int16)  # ensure int16 (it may be uint16 for some images)
    image[image == -2000] = 0  # values of -2000 are replaced by 0
    # Apply each slice's rescale slope and intercept
    for slice_number, current in enumerate(slices):
        intercept = current.RescaleIntercept
        slope = current.RescaleSlope
        if slope != 1:
            image[slice_number] = slope * image[slice_number].astype(np.float64)
            image[slice_number] = image[slice_number].astype(np.int16)
        image[slice_number] += np.int16(intercept)
    return np.array(image, dtype=np.int16)
def get_3d_data_hu(path):  # get data in Hounsfield Units
    """Read every file in ``path`` with ``dicom.read_file`` and build a volume.

    NOTE(review): the tail of this function was truncated; the rescale loop
    and the return statement are restored from the sibling
    ``get_3d_data_slices``, whose visible lines are otherwise identical -
    confirm.

    :param path: directory whose entries are all read as slices.
    :return: ``np.int16`` array with one leading entry per slice, ordered by
        ``float(ImagePositionPatient[2])``.
    """
    slices = [dicom.read_file(path + '/' + s) for s in os.listdir(path)]
    # float key: the int key used before is marked as buggy in this file
    slices.sort(key=lambda x: float(x.ImagePositionPatient[2]))  # from 22.02
    image = np.stack([s.pixel_array for s in slices])
    image = image.astype(np.int16)  # ensure int16 (it may be uint16 for some images)
    image[image == -2000] = 0  # values of -2000 are replaced by 0
    # Apply each slice's rescale slope and intercept
    for slice_number in range(len(slices)):
        intercept = slices[slice_number].RescaleIntercept
        slope = slices[slice_number].RescaleSlope
        if slope != 1:
            image[slice_number] = slope * image[slice_number].astype(np.float64)
            image[slice_number] = image[slice_number].astype(np.int16)
        image[slice_number] += np.int16(intercept)
    return np.array(image, dtype=np.int16)
def get_all_problem_instances(problem_path):
    """ Returns a list of instances for a given problem

    Every ``*.json`` file directly inside ``DEPLOYED_ROOT/problem_path`` is
    parsed; files that cannot be read or are not valid JSON are skipped.
    Returns an empty list when the directory does not exist.
    """
    instances = []
    instances_dir = join(DEPLOYED_ROOT, problem_path)
    if os.path.isdir(instances_dir):
        for name in os.listdir(instances_dir):
            if not name.endswith(".json"):
                continue
            try:
                # ``with`` closes the file; it used to be left open.
                with open(join(instances_dir, name)) as instance_file:
                    instance = json.loads(instance_file.read())
            except (IOError, OSError, ValueError):
                # unreadable file or invalid JSON (decode errors are
                # ValueError subclasses): skip it, as before
                continue
            instances.append(instance)
    return instances
def test_cppCompNet(self):
    """Launch the C++ component waveform and check the NIC name it reports.

    The ``nic_name`` property queried from the first registered component
    must be one of the entries of ``/sys/class/net``.
    """
    nodebooter, domMgr = self.launchdomainManager()
    self.assertNotEqual(domMgr, None)
    nodebooter, devMgr = self.launchdeviceManager("/nodes/test_GPP_node/DeviceManager.dcd.xml")
    self.assertNotEqual(devMgr, None)
    domMgr.installApplication("/waveforms/cpp_comp_w/cpp_comp_w.sad.xml")
    self.assertEqual(len(domMgr._get_applicationFactories()), 1)
    appFact = domMgr._get_applicationFactories()[0]
    app = appFact.create(appFact._get_name(), [], [])
    self.assertEqual(len(domMgr._get_applications()), 1)
    app.start()
    time.sleep(0.5)
    nic_name = app._get_registeredComponents()[0].componentObject.query([CF.DataType(id='nic_name',value=any.to_any(None))])[0].value._v
    nic_names = os.listdir('/sys/class/net')
    self.assertTrue(nic_name in nic_names)
    # NOTE(review): method name restored from the case-mangled
    # ``releaSEObject`` - confirm against the application interface.
    app.releaseObject()
    self.assertEqual(len(domMgr._get_applications()), 0)
def test_javaCompNet(self):
    """Launch the Java component waveform and check the NIC name it reports.

    NOTE(review): the original body was garbled (``nodebooter, None)``) and
    looks like a collapsed copy of ``test_cppCompNet``; the steps below
    mirror that test with the java waveform path - confirm against the real
    test before relying on it.
    """
    nodebooter, domMgr = self.launchdomainManager()
    self.assertNotEqual(domMgr, None)
    nodebooter, devMgr = self.launchdeviceManager("/nodes/test_GPP_node/DeviceManager.dcd.xml")
    self.assertNotEqual(devMgr, None)
    domMgr.installApplication("/waveforms/java_comp_w/java_comp_w.sad.xml")
    self.assertEqual(len(domMgr._get_applicationFactories()), 1)
    appFact = domMgr._get_applicationFactories()[0]
    app = appFact.create(appFact._get_name(), [], [])
    self.assertEqual(len(domMgr._get_applications()), 1)
    app.start()
    time.sleep(0.5)
    nic_name = app._get_registeredComponents()[0].componentObject.query([CF.DataType(id='nic_name',value=any.to_any(None))])[0].value._v
    nic_names = os.listdir('/sys/class/net')
    self.assertTrue(nic_name in nic_names)
    app.releaseObject()
    self.assertEqual(len(domMgr._get_applications()), 0)
def updateListAvailableWaveforms(self):
    """
    Update available waveforms list.

    Rebuilds ``self.waveforms`` as a mapping from each sub-directory name of
    ``<self.root>/waveforms`` to the relative path
    ``waveforms/<dir>/<file>`` of the first listed file whose lower-cased
    name contains ``.sad.xml``. When the waveforms directory is missing a
    message is printed and ``self.waveforms`` is left untouched.
    """
    waveroot = os.path.join(self.root, 'waveforms')
    if not os.path.exists(waveroot):
        # Function-call form: valid on Python 2 and 3 alike.
        print("Cannot find SDR waveforms directory")
        return
    self.waveforms = {}
    for wave_dir in os.listdir(waveroot):
        wave_dir_path = os.path.join(waveroot, wave_dir)
        if not os.path.isdir(wave_dir_path):
            continue
        for wave_file in os.listdir(wave_dir_path):
            if ".sad.xml" in wave_file.lower():
                f_path = os.path.join('waveforms', wave_dir, wave_file)
                if wave_dir not in self.waveforms:
                    self.waveforms[wave_dir] = f_path
def __processDir(self):
    """
    Collect files named ``__PATTERN__`` from the target directory.

    With ``self.__recurse`` set the whole tree below ``self.__tgt`` is
    walked; otherwise only ``self.__tgt`` itself is inspected. Every match
    is logged and appended to ``self.__make_files``.
    """
    self.__log("Processing directory %s" % self.__tgt)

    if not self.__recurse:
        # only the target directory itself is of interest
        if __PATTERN__ in os.listdir(self.__tgt):
            fname = os.path.join(self.__tgt, __PATTERN__)
            self.__log("Appending %s to the list" % fname)
            self.__make_files.append(fname)
        return

    # recursive mode: visit every directory below the target
    for (path, dirs, files) in os.walk(self.__tgt):
        matches = [entry for entry in files if entry == __PATTERN__]
        for entry in matches:
            fname = os.path.join(path, entry)
            self.__make_files.append(fname)
            self.__log("Adding %s to list" % fname)
def storageindex(self):
    """Load every CSV file in ``self.indexdata`` into ``self.index``.

    For each regular file the collection name is characters 2-7 of the file
    name's first dot-separated part; the rows are read with pandas, their
    ``date`` field is parsed as ``%Y-%m-%d`` and the rows are passed to
    ``self.index[name].insert_many``.
    """
    # get the file list
    onlyfiles = [f for f in listdir(self.indexdata) if isfile(join(self.indexdata, f))]
    # read each file using pandas
    for f in onlyfiles:
        df = pd.read_csv(self.indexdata+"/"+f)
        s = f.split('.')
        name = s[0][2:8]
        # list(): on Python 3 ``.values()`` is a view, not a list.
        records = list(json.loads(df.T.to_json()).values())
        for row in records:
            row['date'] = datetime.datetime.strptime(row['date'], "%Y-%m-%d")
        # Function-call form: valid on Python 2 and 3 alike.
        print(name)
        self.index[name].insert_many(records)
#storage stock pool into database
def find_templates():
    """
    Load python modules from templates directory and get templates list

    The ``templates`` directory next to this source file is scanned for
    ``*.py`` files (names starting with a dot are ignored). Each module is
    imported and every module attribute whose name ends with ``templates``
    is read as an iterable of ``(regex_text, data_func)`` pairs.

    :return: list of tuples (pairs):
        [(compiled regex, lambda regex_match: return message_data)]
    """
    templates = []
    # dirname() instead of rstrip('__init__.py'): rstrip removes a *set of
    # characters*, not a suffix.
    templates_directory = os.path.join(
        os.path.dirname(inspect.getsourcefile(lambda: 0)), 'templates')
    template_files = os.listdir(templates_directory)
    for template_file in template_files:
        if template_file.startswith('.') or not template_file.endswith('.py'):
            continue
        # splitext() drops exactly the '.py' suffix; rstrip('.py') also ate
        # trailing 'p'/'y'/'.' characters of the module name.
        module_name = os.path.splitext(template_file)[0]
        # Hack for dev development and distutils
        try:
            template_module = importlib.import_module(
                'templates.{}'.format(module_name))
        except ImportError:
            template_module = importlib.import_module(
                'ross.templates.{}'.format(module_name))
        # Iterate through items in template.
        # If a variable name ends with 'templates',
        # extend templates list with it.
        for (name, content) in template_module.__dict__.items():
            if name.endswith('templates'):
                for (regex_text, data_func) in content:
                    templates.append((re.compile(regex_text, re.IGNORECASE), data_func))
    return templates
def get_dataset(dataset_path='Data/Train_Data'):
    """Load (or build and cache) the image dataset and split it 90/10.

    The cached arrays ``Data/npy_train_data/X.npy`` and ``Y.npy`` are used
    when they can be loaded. Otherwise every image in
    ``<dataset_path>/input`` is read with ``get_img`` together with its
    mask (same path with ``input/`` replaced by ``mask/mask_``), both are
    scaled by 1/255, and the arrays are saved to the cache.

    NOTE(review): the split call and the return were garbled (``Y`` missing
    from ``train_test_split`` and a 4-way unpack of 2 values, which always
    raised); the four-array form below is the presumed original - confirm.

    :return: ``X, X_test, Y, Y_test``
    """
    # Getting all data from data path:
    try:
        X = np.load('Data/npy_train_data/X.npy')
        Y = np.load('Data/npy_train_data/Y.npy')
    except (IOError, OSError, ValueError):
        # cache missing or unreadable: rebuild it from the images
        inputs_path = dataset_path+'/input'
        images = listdir(inputs_path)  # Getting images
        X = []
        Y = []
        for img in images:
            img_path = inputs_path+'/'+img
            x_img = get_img(img_path).astype('float32').reshape(64, 64, 3)
            x_img /= 255.
            y_img = get_img(img_path.replace('input/', 'mask/mask_')).astype('float32').reshape(64, 1)
            y_img /= 255.
            X.append(x_img)
            Y.append(y_img)
        X = np.array(X)
        Y = np.array(Y)
        # Create dataset:
        if not os.path.exists('Data/npy_train_data/'):
            os.makedirs('Data/npy_train_data/')
        np.save('Data/npy_train_data/X.npy', X)
        np.save('Data/npy_train_data/Y.npy', Y)
    X, X_test, Y, Y_test = train_test_split(X, Y, test_size=0.1, random_state=42)
    return X, X_test, Y, Y_test
def create_model(self, train_folder):
    """
    Build the training set from the digit folders and train a KNearest model.

    For every digit 1-9 the files in ``train_folder + str(digit)`` are read,
    converted to inverted grayscale, thresholded (binary + Otsu) and passed
    through ``self.feature``.

    :param train_folder: folder prefix the digit is appended to
    :return: (train_set, train_labels, trained_model)
    """
    features = []
    targets = []
    for digit in range(1, 10):
        digit_dir = train_folder + str(digit)
        file_names = [entry for entry in os.listdir(digit_dir)
                      if os.path.isfile(os.path.join(digit_dir, entry))]
        for file_name in file_names:
            picture = cv2.imread(os.path.join(digit_dir, file_name))
            # Expecting black on white
            gray = cv2.cvtColor(picture, cv2.COLOR_BGR2GRAY)
            inverted = 255 - gray
            _, binary = cv2.threshold(inverted, 0, 255,
                                      cv2.THRESH_BINARY + cv2.THRESH_OTSU)
            features.append(self.feature(binary))
            targets.append(digit)
    digits = np.array(features, np.float32)
    labels = np.array(targets, np.float32)
    if cv2.__version__[0] != '2':
        model = cv2.ml.KNearest_create()
        model.train(digits, cv2.ml.ROW_SAMPLE, labels)
    else:
        model = cv2.KNearest()
        model.train(digits, labels)
    return digits, labels, model
def execd_module_paths(execd_dir=None):
    """Yield the full path of every sub-directory of ``execd_dir``.

    ``default_execd_dir()`` is used when ``execd_dir`` is empty or None.
    Nothing is yielded when the directory does not exist.
    """
    root = execd_dir or default_execd_dir()
    if os.path.exists(root):
        for entry in os.listdir(root):
            candidate = os.path.join(root, entry)
            if os.path.isdir(candidate):
                yield candidate
def all_migrations(cls):
    """Return the file names in ``models/migrations`` that look like migrations.

    A name qualifies when it contains at least one underscore and the part
    before the first underscore parses as a non-zero integer.
    """
    migrations = []
    files = os.listdir(os.path.join(REDBerry_ROOT, 'models', 'migrations'))
    for f in files:
        # A plain list: ``filter`` returns an iterator on Python 3, so the
        # former ``len(bits)`` raised TypeError, which the bare ``except``
        # hid - no migration was ever returned.
        bits = f.split('_')
        if len(bits) <= 1:
            continue
        try:
            number = int(bits[0])
        except ValueError:
            continue
        # NOTE(review): a prefix equal to 0 is still skipped, as before -
        # confirm that this is intended.
        if number:
            migrations.append(f)
    return migrations