Python os 模块,os.devnull 实例源码
我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用 os.devnull(注意:os.devnull 是一个路径字符串常量,而不是可调用的函数)。
def convert_image(inpath, outpath, size):
    """Convert an image file using ``sips``.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if ``sips`` exits with non-zero status.

    """
    # ``bytes`` has no ``format`` method on Python 3, so the size is rendered
    # with ``str``. ``sips -z`` takes a height and a width; the same value is
    # passed for both.
    pixels = str(size)
    cmd = ['sips', '-z', pixels, pixels, inpath, '--out', outpath]

    # Discard everything ``sips`` prints; only the exit status matters.
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
def test_err_in_fun(self):
    """Check the exit status when the registered exit function itself fails.

    A child interpreter registers ``foo`` (which raises ZeroDivisionError)
    through ``mod.register_exit_fun`` and then sends itself a signal.
    """
    # Test that the original signal this process was hit with
    # is not returned in case fun raise an exception. Instead,
    # we're supposed to see retsig = 1.
    ret = pyrun(textwrap.dedent(
        """
        import os, signal, imp, sys
        mod = imp.load_source("mod", r"{}")
        def foo():
            sys.stderr = os.devnull
            1 / 0
        sig = signal.SIGTERM if os.name == 'posix' else \
            signal.CTRL_C_EVENT
        mod.register_exit_fun(foo)
        os.kill(os.getpid(), sig)
        """.format(os.path.abspath(__file__))
    ))
    if POSIX:
        self.assertEqual(ret, 1)
        assert ret != signal.SIGTERM, strfsig(ret)
def epubcheck_help():
    """Return epubcheck.jar commandline help text.

    :return unicode: helptext from epubcheck.jar
    """
    command = [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h']
    # stderr is discarded; only stdout is captured and decoded.
    with open(os.devnull, "w") as devnull:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=devnull,
        )
        stdout_data = process.communicate()[0]
    return stdout_data.decode()
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility.

    The function is currently disabled: it raises before doing any work.
    The implementation below the ``raise`` is unreachable and kept only as
    a reference for re-enabling it.

    Raises:
        ValueError: Always.
    """
    raise ValueError('Unsupported method at the moment')

    # --- unreachable reference implementation ---
    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                                 '-i', '/dev/stdin',
                                 # NOTE(review): the extracted source had a bare
                                 # '-o' here; bin_filename is assumed to be its
                                 # argument - confirm against upstream.
                                 '-o', bin_filename,
                                 '-w', weight_filename,
                                 ], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
        # Stream text triplets to 'convert'
        for ijx in itertools.izip(matrix.row, matrix.col, matrix.data):
            proc.stdin.write('%d\t%d\t%f\n' % ijx)
        proc.stdin.close()
        proc.wait()
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Writes one ``row<TAB>col`` line per entry of ``matrix.row`` /
    ``matrix.col`` to the child's stdin and waits for it to finish.
    """
    # NOTE(review): the Popen call was lost in extraction (two statements were
    # merged into one line). It is rebuilt here by analogy with the weighted
    # variant - confirm the argument list against upstream.
    devnull = open(os.devnull, 'w')
    try:
        proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                                 '-i', '/dev/stdin',
                                 '-o', bin_filename,
                                 ], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
        # Stream text pairs to 'convert'
        for ij in itertools.izip(matrix.row, matrix.col):
            proc.stdin.write('%d\t%d\n' % ij)
        proc.stdin.close()
        proc.wait()
    finally:
        devnull.close()
def get_audio_streams(self):
    """Run FFPROBE on ``self.input_video`` and parse its JSON stream report.

    The JSON printed on stdout is decoded and handed to the
    ``get_audio_streams`` name visible at call time.
    NOTE(review): presumably that is a module-level helper of the same
    name - confirm.

    Exits the process (status 0, as in the original) when the FFPROBE
    binary cannot be found; any other ``OSError`` is re-raised.
    """
    # ``os.errno`` was an accidental alias and no longer exists on Python 3.7+.
    import errno

    command = [
        FFPROBE,
        '-i', self.input_video,
        '-of', 'json',
        '-show_streams'
    ]
    with open(os.devnull, 'w') as DEV_NULL:
        # Get file info and parse it
        try:
            proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=DEV_NULL)
        except OSError as e:
            if e.errno != errno.ENOENT:
                # Previously swallowed, which then failed on an unbound ``proc``.
                raise
            Logger.error("FFPROBE not found,install on your system to use this script")
            sys.exit(0)
        # communicate() reads stdout to EOF and also reaps the child.
        output = proc.communicate()[0]
    return get_audio_streams(json.loads(output))
def launch(self, cfg, path, flags):
    """Start the rpc-agent binary for the current OS and architecture.

    The binary is looked up under ``$WPW_HOME/bin`` when ``WPW_HOME`` is set,
    otherwise under ``<path>/wpwithinpy/iot-core-component/bin``. Its output
    is discarded.

    Args:
        cfg: Configuration passed to ``self.validateConfig``.
        path: Base directory used when ``WPW_HOME`` is not set.
        flags: Extra command-line arguments appended to the command.

    Returns:
        The ``subprocess.Popen`` handle, or ``None`` when
        ``self.validateConfig`` rejects the OS/architecture combination.
    """
    logging.debug("Determine the OS and Architecture this application is currently running on")
    hostOS = platform.system().lower()
    logging.debug("hostOS: " + str(hostOS))
    hostArchitecture = 'x64' if sys.maxsize > 2 ** 32 else 'ia32'
    logging.debug("hostArchitecture: " + str(hostArchitecture))

    if not self.validateConfig(cfg, hostOS, hostArchitecture):
        logging.debug("Invalid OS/Architecture combination detected")
        return None

    wpw_home = os.environ.get("WPW_HOME")
    if wpw_home is not None:
        bin_dir = wpw_home + '/bin'
    else:
        bin_dir = path + '/wpwithinpy/iot-core-component/bin'
    cmd = [bin_dir + '/rpc-agent-' + hostOS + '-' + self.detectHostArchitecture()]
    cmd.extend(flags)

    # The child keeps its own copy of the descriptor, so the parent's handle
    # can be closed as soon as the process has been spawned.
    with open(os.devnull, 'w') as fnull:
        return subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT)
def win64_available(self):
    """Probe for a ``<wine_bin>64`` executable and update the arch selector.

    The probe counts as successful when the process can be started and
    exits with return code 1. On success the ``comboBox_winearch`` widget is
    shown; otherwise it is hidden and ``self.winearch`` is forced to
    ``'win32'``.

    Returns:
        bool: True when the probe succeeded, False otherwise.
    """
    wine_bin, wineserver_bin, wine_lib = self.get_wine_bin_path()

    try:
        with open(os.devnull, 'w') as dev_null:
            proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null,
                                    stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
        proc.communicate()
        available = proc.returncode == 1
    except Exception:
        # Any failure to run the probe means "not available"; unlike a bare
        # ``except`` this lets KeyboardInterrupt/SystemExit propagate.
        available = False

    self.comboBox_winearch.set_visible(available)
    if not available:
        self.winearch = 'win32'
    return available
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding.

    Args:
        hostAndPort: ``host:port`` text used only in the error message.

    Returns:
        bool: True when ``client.id()`` succeeds, False when the daemon is
        unreachable or an unexpected error occurs. On ``OSError`` the
        process exits instead.
    """
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except Exception:
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
        # Previously fell off the end and returned None; make the falsy
        # result explicit.
        return False
def _query(self):
    """
    Returns the value of deepsea_minions

    Refreshes the pillar on all minions, then returns the first non-empty
    ``deepsea_minions`` pillar value. Returns an empty list (after logging
    an error) when no minion provides one.
    """
    # When search matches no minions,salt prints to stdout.
    # Suppress stdout, and always restore it - even if a salt call raises.
    _stdout = sys.stdout
    with open(os.devnull, 'w') as sink:
        sys.stdout = sink
        try:
            # Called only for its side effect; the result is not needed.
            self.local.cmd('*', 'saltutil.pillar_refresh')
            minions = self.local.cmd('*', 'pillar.get', ['deepsea_minions'],
                                     expr_form="compound")
        finally:
            sys.stdout = _stdout

    for minion in minions:
        if minions[minion]:
            return minions[minion]

    log.error("deepsea_minions is not set")
    return []
def _matches(self):
    """
    Returns the list of matched minions

    Uses ``self.deepsea_minions`` as a compound target; returns an empty
    list when it is unset/empty.
    """
    if not self.deepsea_minions:
        return []

    # When search matches no minions,salt prints to stdout.
    # Suppress stdout, and always restore it - even if the salt call raises.
    _stdout = sys.stdout
    with open(os.devnull, 'w') as sink:
        sys.stdout = sink
        try:
            result = self.local.cmd(self.deepsea_minions,
                                    'pillar.get',
                                    ['id'],
                                    expr_form="compound")
        finally:
            sys.stdout = _stdout
    return result.keys()
def convert_image(inpath, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def convert_image(inpath, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def test_exception(self):
# Make sure handler fun is called on exception.
ret = pyrun(textwrap.dedent(
@H_404_42@"""
@H_404_42@ import os,r"{}")
@H_404_42@ def foo():
@H_404_42@ with open(r"{}","ab") as f:
@H_404_42@ f.write(b"1")
@H_404_42@ mod.register_exit_fun(foo)
@H_404_42@ sys.stderr = os.devnull
@H_404_42@ 1 / 0
@H_404_42@ """.format(os.path.abspath(__file__), TESTFN)
))
self.assertEqual(ret, 1)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"1")
def test_kinterrupt(self):
# Simulates CTRL + C and make sure the exit function is called.
ret = pyrun(textwrap.dedent(
@H_404_42@"""
@H_404_42@ import os,"ab") as f:
@H_404_42@ f.write(b"1")
@H_404_42@ mod.register_exit_fun(foo)
@H_404_42@ sys.stderr = os.devnull
@H_404_42@ raise KeyboardInterrupt
@H_404_42@ """.format(os.path.abspath(__file__), b"1")
def __init__(self, raw_email, debug=False):
    '''
    Setup the base options of the copy/convert setup

    Stores the raw email, creates the in-memory log buffers, routes twiggy
    DEBUG output into ``log_processing`` and selects where debug
    stdout/stderr go (files under ``debug_logs`` when ``debug`` is set,
    ``os.devnull`` otherwise).
    '''
    self.raw_email = raw_email
    self.log_processing = StringIO()
    self.log_content = StringIO()
    self.tree(self.raw_email)

    # Send every twiggy message at DEBUG and above to the processing buffer.
    processing_output = outputs.StreamOutput(formats.shell_format,
                                             stream=self.log_processing)
    emitters['*'] = filters.Emitter(levels.DEBUG, True, processing_output)

    self.log_name = log.name('files')
    self.cur_attachment = None

    self.debug = debug
    if not self.debug:
        self.log_debug_err = os.devnull
        self.log_debug_out = os.devnull
    else:
        if not os.path.exists('debug_logs'):
            os.makedirs('debug_logs')
        self.log_debug_err = os.path.join('debug_logs', 'debug_stderr.log')
        self.log_debug_out = os.path.join('debug_logs', 'debug_stdout.log')
def call(self, args, devnull=False):
    """Call other processes.

    args - list of command args
    devnull - whether to pipe stdout to /dev/null (or equivalent)

    Returns the child's exit code. In debug mode the command line is echoed
    and the user is asked to confirm before it runs.
    """
    if self.debug:
        click.echo(subprocess.list2cmdline(args))
        click.confirm('Continue?', default=True, abort=True)

    kwargs = {}
    if devnull:
        # Pipe to /dev/null (or equivalent).
        kwargs['stderr'] = subprocess.STDOUT
        kwargs['stdout'] = self.FNULL

    try:
        return subprocess.call(args, **kwargs)
    except subprocess.CalledProcessError:
        # NOTE(review): ``subprocess.call`` reports failure through its
        # return value and does not raise CalledProcessError, so this branch
        # looks unreachable. Kept for interface stability; note that
        # ``False == 0`` would read as success to callers comparing with 0.
        return False
def _check_ssl_cert(self, cert, key):
    """Validate ``cert`` with openssl (when available) and wrap the socket.

    Args:
        cert: Path to the certificate file.
        key: Path to the private key file.

    Raises:
        TelegramError: If openssl rejects the certificate or the SSL socket
            cannot be initialised.
    """
    # Check SSL-Certificate with openssl,if possible
    try:
        with open(os.devnull, 'wb') as sink:
            exit_code = subprocess.call(
                ["openssl", "x509", "-text", "-noout", "-in", cert],
                stdout=sink,
                stderr=subprocess.STDOUT)
    except OSError:
        # openssl could not be run; skip the pre-check.
        exit_code = 0

    # Compare by value: ``is 0`` relies on small-int caching.
    if exit_code != 0:
        raise TelegramError('SSL Certificate invalid')

    try:
        self.httpd.socket = ssl.wrap_socket(
            self.httpd.socket, certfile=cert, keyfile=key, server_side=True)
    except ssl.SSLError as error:
        self.logger.exception('Failed to init SSL socket')
        raise TelegramError(str(error))
def convert_image(inpath, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def convert_image(inpath, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def run_script(script):
    """Run ``script`` with ``bash -c`` in strict mode and return its output.

    ``set -euo pipefail`` is prepended, stdin is connected to the null
    device and stderr is merged into stdout.

    Args:
        script (str): Shell script text.

    Returns:
        str: The combined output decoded as UTF-8.

    Raises:
        PheWebError: If the script exits with a non-zero status; the message
            carries the status and the captured output.
    """
    script = 'set -euo pipefail\n' + script
    status = 0
    try:
        with open(os.devnull) as devnull:
            # is this the right way to block stdin?
            data = subprocess.check_output(['bash', '-c', script],
                                           stderr=subprocess.STDOUT, stdin=devnull)
    except subprocess.CalledProcessError as ex:
        data = ex.output
        status = ex.returncode

    data = data.decode('utf8')
    if status != 0:
        raise PheWebError(
            'Failed with status {}\n'.format(status) +
            'output was:\n' +
            data)
    return data
def __init__(self, targetfd, tmpfile=None):
    """Prepare capturing of file descriptor ``targetfd``.

    If the descriptor cannot be duplicated, ``start`` and ``done`` become
    no-ops. For fd 0 the replacement file is the null device; for any other
    descriptor a temporary file is created unless ``tmpfile`` is given.
    """
    self.targetfd = targetfd
    try:
        self.targetfd_save = os.dup(self.targetfd)
    except OSError:
        # Descriptor is not usable: turn the capture into a no-op.
        self.start = lambda: None
        self.done = lambda: None
        return

    if targetfd == 0:
        assert not tmpfile, "cannot set tmpfile with stdin"
        tmpfile = open(os.devnull, "r")
        self.syscapture = SysCapture(targetfd)
    else:
        if tmpfile is None:
            f = TemporaryFile()
            with f:
                tmpfile = safe_text_dupfile(f, mode="wb+")
        if targetfd in patchsysdict:
            self.syscapture = SysCapture(targetfd, tmpfile)
        else:
            self.syscapture = NoCapture()

    self.tmpfile = tmpfile
    self.tmpfile_fd = tmpfile.fileno()
def analyze_structure(seq,filename,ensemble=False):
    """Fold ``seq`` with UNAFold and collect the resulting ``.ct`` files.

    Changes into ``project_dir``, writes ``<filename>.seq``, runs
    ``UNAFold.pl`` (``ensemble=True``) or ``hybrid-ss-min`` with all output
    discarded, moves ``<filename>*.ct`` into ``tmp/structures/`` and removes
    the remaining ``<filename>*`` files.

    Returns:
        int: Always 1.
    """
    chdir(project_dir)

    # Write the sequence directly instead of through ``echo '...'`` in a
    # shell, so quotes or metacharacters in ``seq`` cannot alter the command.
    seq_file = filename + ".seq"
    with open(seq_file, "w") as handle:
        handle.write(str(seq) + "\n")

    if ensemble:
        program = "./3rdParty/unafold/UNAFold.pl"
    else:
        program = "./3rdParty/unafold/hybrid-ss-min"

    # The null device is necessary to omit output generated by UNAFOLD.
    # An argument list is used for both tools: the non-ensemble branch
    # previously passed a command string without ``shell=True``.
    with open(devnull, 'w') as fnull:
        call([program, "-n", "RNA", seq_file], stdout=fnull, stderr=fnull)

    if os.path.isfile(filename + ".ct"):
        system("mv %s*.ct tmp/structures/" % filename)

    # remove tmp files
    # NOTE(review): ``filename`` is still interpolated into shell commands
    # here and above; callers must not pass untrusted names.
    system("rm %s*" % filename)

    return 1
def _run_cdhit(self, fasta_path, identity=0.95, word_size=5, description_length=0, cdhit_path=None):
    """Run CDHIT-EST for sequences, install with sudo apt install cd-hit on Ubuntu.

    Args:
        fasta_path: Input FASTA file passed to ``-i``.
        identity: Identity threshold passed to ``-c``.
        word_size: Word size passed to ``-n``.
        description_length: Value passed to ``-d``.
        cdhit_path: Executable to run; defaults to ``cd-hit-est``.

    Returns:
        Path of the ``.clstr`` file expected next to the output file in
        ``self.tmp_path``.
    """
    self.messages.get_cdhit_message(identity)

    if cdhit_path is None:
        cdhit_path = "cd-hit-est"

    file_name = self.project + "_IdentityClusters_" + str(identity)
    out_file = os.path.join(self.tmp_path, file_name)
    cluster_path = os.path.join(self.tmp_path, file_name + '.clstr')

    # The extracted source had "-i" immediately followed by "-o", leaving
    # ``fasta_path`` unused; the input file is restored as the "-i" argument.
    command = [cdhit_path, "-i", fasta_path, "-o", out_file,
               "-c", str(identity), "-n", str(word_size),
               "-d", str(description_length)]
    with open(os.devnull, "w") as devnull:
        call(command, stdout=devnull)

    return cluster_path
def disassemble_it(filename, address=None):
    """
    Wrapper for the ruby disassembler script.

    Runs ``analysis_tools/disassfunc.rb`` with all standard streams attached
    to the null device and waits for it to finish.

    Args:
        filename: File to disassemble; also the prefix of the output name.
        address: Optional address; when given, ``hex(address)`` is appended
            to the command and used in the output name.

    Returns:
        The output name ``<filename>_disass_<hex(address) or None>``.
    """
    suffix = hex(address) if address is not None else "None"
    outfile = filename + "_disass_" + suffix

    args = ['ruby', 'analysis_tools/disassfunc.rb',
            "-graph", "-svg", outfile, filename]
    if address is not None:
        args.append(hex(address))

    # ``with`` closes the handle even when ruby cannot be started.
    with open(os.devnull, 'w') as FNULL:
        proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
        proc.wait()

    app.logger.debug("disassembly just finished!")
    return outfile
def analyze_it(self):
    """
    Wrapper for the ruby analysis script.
    Executes and get results from files.

    Runs ``analysis_tools/AnalyzeIt.rb`` on ``self.storage_file`` (stderr
    discarded) and, if ``<storage_file>.txt`` exists, stores its content
    with all non-ASCII bytes removed in ``self.txt_report``.

    Returns:
        bool: Always True.
    """
    args = ['ruby', 'analysis_tools/AnalyzeIt.rb', self.storage_file]
    with open(os.devnull, 'w') as FNULL:
        proc = Popen(args, stderr=FNULL)
        proc.wait()

    # TEXT report,just UTF-8 decode/parsing
    fname = self.storage_file + '.txt'
    if os.path.exists(fname):
        with open(fname, 'rb') as report:
            data = report.read()
        # Bytes pattern: the file is read in binary mode, and a str pattern
        # cannot be applied to bytes on Python 3.
        self.txt_report = re.sub(br'[^\x00-\x7F]', b'', data).decode('utf-8')
    return True
def test_no_clustering(self):
    """At ``perc_identity=1.0`` the observed table must equal the input table."""
    with redirected_stdio(stderr=os.devnull):
        obs_table, obs_sequences = cluster_features_de_novo(
            sequences=self.input_sequences, table=self.input_table,
            perc_identity=1.0)

    # order of identifiers is important for biom.Table equality
    input_order = self.input_table.ids(axis='observation')
    obs_table = obs_table.sort_order(input_order, axis='observation')
    self.assertEqual(obs_table, self.input_table)

    obs_seqs = _read_seqs(obs_sequences)
    # sequences are reverse-sorted by abundance in output
    exp_seqs = [self.input_sequences_list[index] for index in (0, 3, 2, 1)]
    self.assertEqual(obs_seqs, exp_seqs)
def test_99_percent_clustering(self):
exp_table = biom.Table(np.array([[104, 106, 109],
[1, 1, 2],
[7, 8, 9]]),
['feature1', 'feature2',
'feature4'],
['sample1', 'sample2', 'sample3'])
with redirected_stdio(stderr=os.devnull):
obs_table,
perc_identity=0.99)
# order of identifiers is important for biom.Table equality
obs_table = \
obs_table.sort_order(exp_table.ids(axis='observation'), exp_table)
# sequences are reverse-sorted by abundance in output
obs_seqs = _read_seqs(obs_sequences)
exp_seqs = [self.input_sequences_list[0],
self.input_sequences_list[1]]
self.assertEqual(obs_seqs, exp_seqs)
def test_97_percent_clustering_feature1_most_abundant(self):
exp_table = biom.Table(np.array([[111, 114, 118], 2]]), 'feature2'],
perc_identity=0.97)
# order of identifiers is important for biom.Table equality
obs_table = \
obs_table.sort_order(exp_table.ids(axis='observation'), exp_seqs)
def test_uchime_denovo(self):
    """Check the chimera / non-chimera split and the stats file of uchime_denovo."""
    with redirected_stdio(stderr=os.devnull):
        chime, nonchime, stats = uchime_denovo(
            sequences=self.input_sequences, table=self.input_table)

    obs_chime = _read_seqs(chime)
    exp_chime = [self.input_sequences_list[3]]
    self.assertEqual(obs_chime, exp_chime)

    # sequences are reverse-sorted by abundance in output
    obs_nonchime = _read_seqs(nonchime)
    exp_nonchime = [self.input_sequences_list[index] for index in (0, 1, 2)]
    self.assertEqual(obs_nonchime, exp_nonchime)

    with stats.open() as stats_fh:
        stats_text = stats_fh.read()

    # Every feature id must be mentioned in the stats output.
    for feature_id in ('feature1', 'feature2', 'feature3', 'feature4'):
        self.assertTrue(feature_id in stats_text)

    non_empty_lines = [line for line in stats_text.split('\n') if len(line) > 0]
    self.assertEqual(len(non_empty_lines), 4)
def test_join_pairs_all_samples_w_no_joined_seqs(self):
    """With ``minmergelen=500`` all three output fastq files must be empty."""
    # minmergelen is set very high here,resulting in no sequences
    # being joined across the three samples.
    with redirected_stdio(stderr=os.devnull):
        obs = join_pairs(self.input_seqs, minmergelen=500)

    # manifest is as expected
    self._test_manifest(obs)

    # expected number of fastq files are created
    output_fastqs = list(obs.sequences.iter_views(FastqGzFormat))
    self.assertEqual(len(output_fastqs), 3)

    for fastq_name, fastq_path in output_fastqs:
        with redirected_stdio(stderr=os.devnull):
            reader = skbio.io.read(str(fastq_path), format='fastq',
                                   compression='gzip', constructor=skbio.DNA)
            seqs = list(reader)
        seq_lengths = np.asarray([len(s) for s in seqs])
        self.assertEqual(len(seq_lengths), 0)
def get_parsed_args(self, comp_words):
    """ gets the parsed args from a patched parser

    Parses ``comp_words`` with the first active parser while stderr is sent
    to the null device. Parser failures (including ``SystemExit``) are
    swallowed on purpose, so whatever was parsed before the failure is
    returned in the namespace.
    """
    active_parsers = self._patch_argument_parser()
    parsed_args = argparse.Namespace()
    self.completing = True

    if USING_PYTHON2:
        # Python 2 argparse only properly works with byte strings.
        comp_words = [ensure_bytes(word) for word in comp_words]

    saved_stderr = sys.stderr
    try:
        with io.open(os.devnull, "w") as sink:
            sys.stderr = sink
            active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)
    except BaseException:
        pass
    finally:
        # Restore stderr on every path; previously a parser failure left it
        # pointing at the (unclosed) null device.
        sys.stderr = saved_stderr

    self.completing = False
    return parsed_args
def convert_image(inpath, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def convert_image(inpath, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def get_cloud_project():
    """Return the project name printed by ``gcloud config list project``.

    Raises:
        Exception: If gcloud prints no project, or if the gcloud executable
            is not found. Other ``OSError``s are re-raised unchanged.
    """
    cmd = [
        'gcloud', '-q', 'config', 'list', 'project',
        '--format=value(core.project)'
    ]
    with open(os.devnull, 'w') as dev_null:
        try:
            res = subprocess.check_output(cmd, stderr=dev_null).strip()
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise Exception('gcloud is not installed. The Google Cloud SDK is '
                                'necessary to communicate with the Cloud ML service. '
                                'Please install and set up gcloud.')
            raise
        if res:
            return res
        raise Exception('--cloud specified but no Google Cloud Platform '
                        'project found.\n'
                        'Please specify your project name with the --project '
                        'flag or set a default project: '
                        'gcloud config set project YOUR_PROJECT_NAME')
def convert_image(inpath, size):
@H_404_42@"""Convert an image file using ``sips``.
@H_404_42@ Args:
@H_404_42@ inpath (str): Path of source file.
@H_404_42@ outpath (str): Path to destination file.
@H_404_42@ size (int): Width and height of destination image in pixels.
@H_404_42@ Raises:
@H_404_42@ RuntimeError: Raised if ``sips`` exits with non-zero status.
@H_404_42@ """
cmd = [
b'sips', str(size), stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with %d' % retcode)
def validate(opt, agent):
    """Run ``agent`` over a validation world and return the world's report.

    A deep copy of ``opt`` is switched to ``datatype='valid'``,
    ``terminate=True`` and ``batchsize=1``; the caller's ``opt`` is not
    modified. ``agent.opt['datatype']`` is set to ``'valid'`` for the run
    and restored afterwards.

    Returns:
        Whatever ``valid_world.report()`` returns.
    """
    old_datatype = agent.opt['datatype']
    agent.opt['datatype'] = 'valid'

    opt = deepcopy(opt)
    opt['datatype'] = 'valid'
    opt['terminate'] = True
    opt['batchsize'] = 1

    # Silence whatever task creation prints, and always put stdout back -
    # even if ``create_task`` raises.
    old_stdout = sys.stdout
    with open(os.devnull, 'w') as sink:
        sys.stdout = sink
        try:
            valid_world = create_task(opt, agent)
        finally:
            sys.stdout = old_stdout

    for _ in valid_world:
        valid_world.parley()

    stats = valid_world.report()
    agent.opt['datatype'] = old_datatype
    return stats
def build(self):
    """Build tzdata by running ``make ... install`` inside ``<self.path>/tz``.

    Reads the version from the ``version`` file in that directory, changes
    the working directory to it and discards all ``make`` output.
    """
    self.line('[<comment>Building tzdata</>]')
    dest_path = os.path.join(self.path, 'tz')

    # Getting VERSION
    version_file = os.path.join(dest_path, 'version')
    with open(version_file) as f:
        version = f.read().strip()

    self.write('<comment>Building</> version <fg=cyan>{}</>'.format(version))
    os.chdir(dest_path)

    make_command = ['make', 'TOPDIR={}'.format(dest_path), 'install']
    with open(os.devnull, 'w') as temp:
        subprocess.call(make_command, stdout=temp, stderr=temp)

    self.overwrite('<info>Built</> version <fg=cyan>{}</>'.format(version))
    self.line('')
def convert_image(inpath, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def convert_image(inpath, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def TeeCmd(cmd, logfile, fail_hard=True):
    """Runs cmd and writes the output to both stdout and logfile.

    Each line read from the child is passed to ``Tee(line, logfile)``. When
    the command exits non-zero and ``fail_hard`` is true, a failure message
    is printed and the process exits with status 1.
    """
    # Reading from PIPE can deadlock if one buffer is full but we wait on a
    # different one. To work around this,pipe the subprocess's stderr to
    # its stdout buffer and don't give it a stdin.
    # shell=True is required in cmd.exe since depot_tools has an svn.bat,and
    # bat files only work with shell=True set.
    with open(os.devnull) as null_stdin:
        # stdout must be a pipe, otherwise ``proc.stdout`` is None below.
        proc = subprocess.Popen(cmd, bufsize=1, shell=sys.platform == 'win32',
                                stdin=null_stdin, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
    # ``b''`` equals ``''`` on Python 2 and is the real EOF value on Python 3.
    for line in iter(proc.stdout.readline, b''):
        Tee(line, logfile)
        if proc.poll() is not None:
            break
    exit_code = proc.wait()
    if exit_code != 0 and fail_hard:
        # Single-argument form prints the same text on Python 2 and 3.
        print('Failed: %s' % (cmd,))
        sys.exit(1)
def run(self, with_intermediate_file=False, cwd=None):
    """Method to run the backup command where it applies.

    Args:
        with_intermediate_file: When true, the command's stdout is written
            to ``<output_directory>/<backup_file>`` and the call waits for
            the command to finish. Otherwise all output is discarded.
        cwd: Working directory for the command.
    """
    command = self.build_dump_command()
    if with_intermediate_file:
        backup_path = '%s/%s' % (self.output_directory, self.backup_file)
        # ``with`` flushes and closes the file; an IOError from ``open``
        # still propagates to the caller as before.
        with open(backup_path, 'w') as backup_file_f:
            p = subprocess.Popen(command.split(), stdout=backup_file_f,
                                 env=self.env, cwd=cwd)
            p.wait()
    else:
        with open(os.devnull, 'w') as FNULL:
            p = subprocess.Popen(command.split(), env=self.env, cwd=cwd,
                                 stdout=FNULL, stderr=subprocess.STDOUT)
        # NOTE(review): as in the extracted source, this branch does not wait
        # for the command; the snippet may have been truncated here - confirm
        # against upstream.
def restore(self, backup_filename,with_intermediate_file=False):
@H_404_42@"""Method to restore the backup."""
self.store.get(self.swift_container,
self.output_directory)
command = self.build_restore_command(backup_filename)
if with_intermediate_file:
file_path = '%s/%s' % (self.output_directory, backup_filename)
backup_file_content = open(file_path, 'r').read()
p = subprocess.Popen(command.split(), stdin=subprocess.PIPE)
p.communicate(backup_file_content)
else:
FNULL = open(os.devnull,
stderr=subprocess.STDOUT)
p.wait()
if self.clean_local_copy:
self._clean_local_copy(backup_filename)
def dump_database(id):
    """Dump the database to a temporary directory.

    Creates a temporary directory, runs the Heroku backup capture and
    download from inside it with output discarded, and renames any file
    starting with ``latest.dump`` to ``database.dump``.

    Args:
        id: Passed to ``HerokuApp`` as ``dallinger_uid``.

    Returns:
        str: Path ``<tmp_dir>/database.dump``.
    """
    tmp_dir = tempfile.mkdtemp()
    current_dir = os.getcwd()
    os.chdir(tmp_dir)
    try:
        with open(os.devnull, 'w') as FNULL:
            heroku_app = HerokuApp(dallinger_uid=id, output=FNULL)
            heroku_app.backup_capture()
            heroku_app.backup_download()

        for filename in os.listdir(tmp_dir):
            if filename.startswith("latest.dump"):
                os.rename(filename, "database.dump")
    finally:
        # Always return to the caller's working directory, even on failure.
        os.chdir(current_dir)

    return os.path.join(tmp_dir, "database.dump")
def setUp(self):
    """Prepare config defaults, log handlers and a mocked root logger."""
    super(PostArgParseSetupTest, self).setUp()
    self.config.debug = False
    self.config.max_log_backups = 1000
    self.config.quiet = False
    self.config.verbose_count = constants.CLI_DEFAULTS['verbose_count']
    self.devnull = open(os.devnull, 'w')

    from certbot.log import ColoredStreamHandler
    self.stream_handler = ColoredStreamHandler(six.StringIO())
    from certbot.log import MemoryHandler, TempHandler
    self.temp_handler = TempHandler()
    self.temp_path = self.temp_handler.path
    self.memory_handler = MemoryHandler(self.temp_handler)
    # ``mock.Magicmock`` does not exist; the class is ``MagicMock``.
    self.root_logger = mock.MagicMock(
        handlers=[self.memory_handler, self.stream_handler])