Python os 模块,linesep() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.linesep()。
def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
if isinstance(message, str) and message.startswith('From '):
newline = message.find('\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
message = ''
elif isinstance(message, _mboxMMDFMessage):
from_line = 'From ' + message.get_from()
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
def _generate_toc(self):
"""Generate key-to-(start,stop) table of contents."""
starts, stops = [], []
self._file.seek(0)
while True:
line_pos = self._file.tell()
line = self._file.readline()
if line.startswith('From '):
if len(stops) < len(starts):
stops.append(line_pos - len(os.linesep))
starts.append(line_pos)
elif line == '':
stops.append(line_pos)
break
self._toc = dict(enumerate(zip(starts, stops)))
self._next_key = len(self._toc)
self._file_length = self._file.tell()
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def get_string(self, key):
"""Return a string representation or raise a KeyError."""
start, '\n'))
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
return original_headers.getvalue() + \
self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
def fix_script(path):
"""Replace #!python with #!/path/to/python
Return True if file was changed."""
# XXX RECORD hashes will need to be updated
if os.path.isfile(path):
script = open(path, 'rb')
try:
firstline = script.readline()
if not firstline.startswith(binary('#!python')):
return False
exename = sys.executable.encode(sys.getfilesystemencoding())
firstline = binary('#!') + exename + binary(os.linesep)
rest = script.read()
finally:
script.close()
script = open(path, 'wb')
try:
script.write(firstline)
script.write(rest)
finally:
script.close()
return True
def main(initial_args=None):
if initial_args is None:
initial_args = sys.argv[1:]
autocomplete()
try:
cmd_name, cmd_args = parseopts(initial_args)
except PipError:
e = sys.exc_info()[1]
sys.stderr.write("ERROR: %s" % e)
sys.stderr.write(os.linesep)
sys.exit(1)
command = commands[cmd_name]()
return command.main(cmd_args)
def main(args=None):
if args is None:
args = sys.argv[1:]
# Configure our deprecation warnings to be sent through loggers
deprecation.install_warning_logger()
autocomplete()
try:
cmd_name, cmd_args = parseopts(args)
except PipError as exc:
sys.stderr.write("ERROR: %s" % exc)
sys.stderr.write(os.linesep)
sys.exit(1)
# Needed for locale.getpreferredencoding(False) to work
# in pip._internal.utils.encoding.auto_decode
try:
locale.setlocale(locale.LC_ALL, '')
except locale.Error as e:
# setlocale can apparently crash if locale are uninitialized
logger.debug("Ignoring error %s when setting locale", e)
command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
return command.main(cmd_args)
def remove(path, pth_file=None):
pth_file = pth_file or PATH
try:
with open(pth_file, "r") as f:
lines = f.read().splitlines()[2:]
except IOError:
raise ValueError("not found: {}".format(path))
try:
lines.remove(path)
except ValueError:
raise ValueError("not found: {}".format(path))
lines.insert(0, HEADER)
with open(pth_file, "w") as f:
f.write(os.linesep.join(lines))
def _printf(fmt, *args, **print3opts):
'''Formatted print to sys.stdout or given stream.
*print3opts* -- print keyword arguments,like Python 3.+
'''
if print3opts: # like Python 3.0
f = print3opts.get('file', None) or sys.stdout
if args:
f.write(fmt % args)
else:
f.write(fmt)
f.write(print3opts.get('end', linesep))
if print3opts.get('flush', False):
f.flush()
elif args:
print(fmt % args)
else:
print(fmt)
def set_exit_code(bld):
"""
If any of the tests fail waf will exit with that exit code.
This is useful if you have an automated build system which need
to report on errors from the tests.
You may use it like this:
def build(bld):
bld(features='cxx cxxprogram test',source='main.c',target='app')
from waflib.Tools import waf_unit_test
bld.add_post_fun(waf_unit_test.set_exit_code)
"""
lst = getattr(bld, 'utest_results', [])
for (f, code, out, err) in lst:
if code:
msg = []
if out:
msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
if err:
msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
bld.fatal(os.linesep.join(msg))
def write_json(self, data, pretty=True):
"""
Writes a python object as JSON to disk. Files are always written as UTF8 as per the JSON standard::
def build(bld):
bld.path.find_node('xyz.json').write_json(199)
:type data: object
:param data: The data to write to disk
:type pretty: boolean
:param pretty: Determines if the JSON will be nicely space separated
"""
import json # Python 2.6 and up
indent = 2
separators = (',', ': ')
sort_keys = pretty
newline = os.linesep
if not pretty:
indent = None
separators = (', ':')
newline = ''
output = json.dumps(data, indent=indent, separators=separators, sort_keys=sort_keys) + newline
self.write(output, encoding='utf-8')
def set_exit_code(bld):
"""
If any of the tests fail waf will exit with that exit code.
This is useful if you have an automated build system which need
to report on errors from the tests.
You may use it like this:
def build(bld):
bld(features='cxx cxxprogram test', err.decode('utf-8')))
bld.fatal(os.linesep.join(msg))
def set_exit_code(bld):
"""
If any of the tests fail waf will exit with that exit code.
This is useful if you have an automated build system which need
to report on errors from the tests.
You may use it like this:
def build(bld):
bld(features='cxx cxxprogram test', err.decode('utf-8')))
bld.fatal(os.linesep.join(msg))
def __init__(self, to=sys.stdout, only=(), **kwargs):
super(DebugStream, self).__init__(*args, **kwargs)
def safe_str(chunk):
if isinstance(chunk, bytes):
chunk = chunk.decode("utf-8")
elif not isinstance(chunk, str):
chunk = str(chunk)
return chunk
class Bugger(object):
__before__ = __after__ = lambda *args: None
def __getattr__(self, event):
def inner(*args, **kwargs):
to.write(event.upper() + " ")
to.write("; ".join(map(safe_str, args)))
to.write(" ")
to.write(",".join("{0}: {1}".format(k, safe_str(v))
for k, v in kwargs.items()))
to.write(os.linesep)
return inner
self.attach(Bugger(), only=only)
def generate():
global sound
statement = coremeraco.constructRegularStatement() + os.linesep
textArea.insert(Tkinter.END, statement)
textArea.yview(Tkinter.END)
if readOption and readOutputState.get():
if sound:
sound.stop()
sound = playsnd.playsnd()
threading.Thread(target=sound.play, args=(procfest.text2wave(statement),)).start()
if saveOutputState.get():
try:
outputFileHandler = open(saveOutputDestination.get(), 'a') # 'a' means append mode
outputFileHandler.write(statement)
outputFileHandler.close()
except IOError as Argument:
tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "ioErrorMessage") + str(Argument))
except Exception as Argument:
tkMessageBox.showerror(productName, "genericErrorMessage") + str(Argument))
def test_append_metadata_file(tmpdir):
initial_data = 'some data'
my_metadata = {
'some': 'data',
'for': 'this metadata'
}
output_file = tmpdir.join('metadata')
output_file.write_binary((initial_data + os.linesep).encode('utf-8'))
with metadata.MetadataWriter(suppress_output=False)(str(output_file)) as writer:
writer.write_metadata(**my_metadata)
lines = output_file.readlines()
assert len(lines) == 2
assert lines[0].strip() == initial_data
assert json.loads(lines[1].strip()) == my_metadata
def test_overwrite_metdata_file(tmpdir):
initial_data = 'some data'
my_metadata = {
'some': 'data',
'for': 'this metadata'
}
output_file = tmpdir.join('metadata')
output_file.write(initial_data + os.linesep)
overwrite_writer = metadata.MetadataWriter(suppress_output=False)(str(output_file))
overwrite_writer.force_overwrite()
with overwrite_writer as writer:
writer.write_metadata(**my_metadata)
lines = output_file.readlines()
assert len(lines) == 1
assert json.loads(lines[0].strip()) == my_metadata
def write_metadata(self, **metadata):
# type: (**Any) -> Optional[int]
"""Writes metadata to the output stream if output is not suppressed.
:param **metadata: JSON-serializeable metadata kwargs to write
"""
if self.suppress_output:
return 0 # wrote 0 bytes
metadata_line = json.dumps(metadata, sort_keys=True) + os.linesep
metadata_output = '' # type: Union[str,bytes]
if 'b' in self._output_mode:
metadata_output = metadata_line.encode('utf-8')
else:
metadata_output = metadata_line
return self._output_stream.write(metadata_output)
def _collect(self, room_name, compress):
event = yield idiokit.next()
while True:
current = datetime.utcnow().day
with _open_archive(self.archive_dir, time.time(), room_name) as archive:
self.log.info("Opened archive {0!r}".format(archive.name))
while current == datetime.utcnow().day:
json_dict = dict((key, event.values(key)) for key in event.keys())
archive.write(json.dumps(json_dict) + os.linesep)
event = yield idiokit.next()
yield compress.queue(0.0, _rename(archive.name))
def output(self):
if self._params['delimiter'] is None:
delim = ','
else:
delim = self._params['delimiter']
if self._params['linesep'] is None:
eol = os.linesep
else:
eol = self._params['linesep']
lines = eol.join([delim.join(map(str, row)) for row in self._params['data_matrix']])
if self._params['header_list']:
header = delim.join(self._params['header_list'])
else:
header = ''
return header + eol + lines
def main(args=None):
if args is None:
args = sys.argv[1:]
# Configure our deprecation warnings to be sent through loggers
deprecation.install_warning_logger()
autocomplete()
try:
cmd_name, cmd_args = parseopts(args)
except PipError as exc:
sys.stderr.write("ERROR: %s" % exc)
sys.stderr.write(os.linesep)
sys.exit(1)
# Needed for locale.getpreferredencoding(False) to work
# in pip.utils.encoding.auto_decode
locale.setlocale(locale.LC_ALL, '')
command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
return command.main(cmd_args)
# ###########################################################
# # Writing freeze files
def keyPressEvent(self, qkeyevent):
if qkeyevent.matches(QKeySequence.Copy):
selected_rows = [x.row() for x in self.selectionModel().selectedRows()]
logger.info('Copy to clipboard requested [%r rows]' % len(selected_rows))
out_string = ''
for row in selected_rows:
out_string += self.get_row_as_string(row) + os.linesep
if out_string:
QApplication.clipboard().setText(out_string)
else:
super(BasicTable, self).keyPressEvent(qkeyevent)
if qkeyevent.matches(QKeySequence.InsertParagraphSeparator):
if self.hasFocus():
self.on_enter_pressed([(x.row(), x.column()) for x in self.selectedIndexes()])
def print_help():
"""Print help encoded as initial script's comment between #< and #>."""
show_line = False
fp = open(sys.argv[0], 'r')
for line in fp:
if line[0:2] == '#<': # start token
show_line = True
continue
elif line[0:2] == '#>': # end token
return
if show_line == True:
print(line.rstrip(os.linesep)[1:])
fp.close()
def gui_ask_for_api():
"""Gtk dialog for API key insert."""
message = gtk.MessageDialog(type=gtk.MESSAGE_QUESTION, buttons=gtk.BUTTONS_OK_CANCEL)
message.set_markup(colorize.MSG_ASK_API.replace(colorize.URL,"<u>" + colorize.URL +"</u>"))
entry = gtk.Entry(max=64)
entry.set_text("Enter your API key")
entry.show()
message.vbox.pack_end(entry)
entry.connect("activate", lambda _: d.response(gtk.RESPONSE_OK))
message.set_default_response(gtk.RESPONSE_OK)
message.run()
api_key = entry.get_text().decode('utf8')
fp = open(colorize.HOME + colorize.API_KEY_FILE, 'w')
fp.write("YOUR_API_KEY={0}{1}".format(api_key, os.linesep))
fp.close()
# process buttong click immediately
message.destroy()
while gtk.events_pending():
gtk.main_iteration()
def testOutputSignal(self):
# Use SIGKILL here because it's guaranteed to be delivered. Using
# SIGHUP might not work in,e.g.,a buildbot slave run under the
# 'nohup' command.
exe = sys.executable
scriptFile = self.makeSourceFile([
"import sys,os,signal",
"sys.stdout.write('stdout bytes\\n')",
"sys.stderr.write('stderr bytes\\n')",
"sys.stdout.flush()",
"sys.stderr.flush()",
"os.kill(os.getpid(),signal.SIGKILL)"
])
def gotOutputAndValue(err):
(out, err, sig) = err.value # XXX Sigh wtf
self.assertEquals(out, "stdout bytes" + os.linesep)
self.assertEquals(err, "stderr bytes" + os.linesep)
self.assertEquals(sig, signal.SIGKILL)
d = utils.getProcessOutputAndValue(exe, ['-u', scriptFile])
return d.addErrback(gotOutputAndValue)
def fix_script(path):
"""Replace #!python with #!/path/to/python
Return True if file was changed."""
# XXX RECORD hashes will need to be updated
if os.path.isfile(path):
script = open(path, 'wb')
try:
script.write(firstline)
script.write(rest)
finally:
script.close()
return True
def main(initial_args=None):
if initial_args is None:
initial_args = sys.argv[1:]
autocomplete()
try:
cmd_name, cmd_args = parseopts(initial_args)
except PipError:
e = sys.exc_info()[1]
sys.stderr.write("ERROR: %s" % e)
sys.stderr.write(os.linesep)
sys.exit(1)
command = commands[cmd_name]()
return command.main(cmd_args)
def get_message(self,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def hard(args):
ref = collections.defaultdict(list)
with open(args.ref, "rb") as f:
b = f.read(1)
i = 0
while b:
ref[b].append(i)
i = i + 1
b = f.read(1)
if len(ref) < 256:
raise Exception
output = []
with open(args.input, "rb") as f:
b = f.read(1)
while b:
output.append(ref[b][random.randrange(len(ref[b]))])
b = f.read(1)
with open(args.output, "w") as f:
for item in output:
f.write(str(item) + os.linesep)
def __parse_data(self, proto_parse_class, file_hd):
"""
Args:
proto_parse_class:
data:
file_hd:
Returns:
"""
proto_parse_ob = proto_parse_class(data)
data_yield = proto_parse_ob.parse_data(sep="\x00")
for d in data_yield:
if d:
if file_hd:
file_hd.write("%r%s" % (d, os.linesep))
if self.std_output_enable:
if not isinstance(d, dict):
d = d.toDict()
print json.dumps(d, indent=4)
def __str__(self):
res = "DynArgDescriptor03 object" + os.linesep
res += " argument_type[0]='{0}'".format(self._type)
if self._vector_size > 1:
res += "*"+str(self._vector_size)
res += os.linesep
res += " access_descriptor[1]='{0}'".format(self._access_descriptor) \
+ os.linesep
if self._type == "gh_field":
res += " function_space[2]='{0}'".format(self._function_space1) \
+ os.linesep
elif self._type in VALID_OPERATOR_NAMES:
res += " function_space_to[2]='{0}'".\
format(self._function_space1) + os.linesep
res += " function_space_from[3]='{0}'".\
format(self._function_space2) + os.linesep
elif self._type in VALID_SCALAR_NAMES:
pass # we have nothing to add if we're a scalar
else: # we should never get to here
raise ParseError("Internal error in DynArgDescriptor03.__str__")
return res
def amend_commit_message(self, cherry_pick_branch):
""" prefix the commit message with (X.Y) """
commit_prefix = ""
if self.prefix_commit:
commit_prefix = f"[{get_base_branch(cherry_pick_branch)}] "
updated_commit_message = f"{commit_prefix}{self.get_commit_message(self.commit_sha1)}{os.linesep}(cherry picked from commit {self.commit_sha1})"
updated_commit_message = updated_commit_message.replace('#', 'GH-')
if self.dry_run:
click.echo(f" dry-run: git commit --amend -m '{updated_commit_message}'")
else:
try:
subprocess.check_output(["git", "commit", "--amend", "-m",
updated_commit_message],
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as cpe:
click.echo("Failed to amend the commit message \u2639")
click.echo(cpe.output)
return updated_commit_message
def convert_and_parse_xml(src_model_fname):
dst_model_fname = os.path.basename(src_model_fname).split('.')[0] + '.xml.mdl'
with open(dst_model_fname, 'wb') as wfile:
wfile.write('<MODEL>\n')
with open(src_model_fname, 'rb') as rfile:
for line in rfile.readlines():
newline = line
if '<CNT>' in line:
newline = line.strip() + '</CNT>'
elif '<MEAN>' in line:
newline = line.strip() + '</MEAN>'
elif pn_re.findall(line):
newline = pn_re.sub(r'@ \2 \3 \4 \5 @',line)
wfile.write(newline.strip() + os.linesep)
wfile.write('</MODEL>\n')
xmldoc = minidom.parse(dst_model_fname)
os.remove(dst_model_fname)
return xmldoc
def start(self, args, config):
super(DisplayPlugin, self).start(args, config)
if args.pattern_help:
self.__print_pattern_help()
return
if not _have_grako:
console.printError(u"Unknown module 'grako'" + os.linesep +
u"Please install grako! " +
u"E.g. $ pip install grako")
self.__return_code = 2
return
if args.pattern_string is not None:
self.__pattern = Pattern(args.pattern_string)
if args.pattern_file is not None:
pfile = open(args.pattern_file, "r")
self.__pattern = Pattern(''.join(pfile.read().splitlines()))
pfile.close()
self.__output_ending = "" if args.no_newline else os.linesep
def __print_pattern_help(self):
# FIXME: Force some order
print(console.formatText("ID3 Tags:", b=True))
self.__print_complex_pattern_help(TagPattern)
print(os.linesep)
print(console.formatText("Functions:", b=True))
self.__print_complex_pattern_help(FunctionPattern)
print(os.linesep)
print(console.formatText("Special characters:", b=True))
print(console.formatText("\tescape seq. character"))
for i in range(len(TextPattern.SPECIAL_CHARACTERS)):
print(("\t\\%s" + (" " * 12) + "%s") %
(TextPattern.SPECIAL_CHARACTERS[i],
TextPattern.SPECIAL_CHARACTERS_DESCRIPTIONS[i]))
def _wrap_lines(msg):
"""Format lines nicely to 80 chars.
:param str msg: Original message
:returns: Formatted message respecting newlines in message
:rtype: str
"""
lines = msg.splitlines()
fixed_l = []
for line in lines:
fixed_l.append(textwrap.fill(
line,
80,
break_long_words=False,
break_on_hyphens=False))
return os.linesep.join(fixed_l)
def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains):
"""Ask user to confirm update cert certname to contain new_domains.
"""
if config.renew_with_new_domains:
return
msg = ("You are updating certificate {0} to include domains: {1}{br}{br}"
"It previously included domains: {2}{br}{br}"
"Did you intend to make this change?".format(
certname,
",".join(new_domains),".join(old_domains),
br=os.linesep))
obj = zope.component.getUtility(interfaces.IDisplay)
if not obj.yesno(msg, "Update cert", "Cancel", default=True):
raise errors.ConfigurationError("Specified mismatched cert name and domains.")
def test_get_version(self, mock_script):
mock_script.return_value = (
"Server Version: Apache/2.4.2 (Debian)", "")
self.assertEqual(self.config.get_version(), (2, 4, 2))
mock_script.return_value = (
"Server Version: Apache/2 (Linux)",))
mock_script.return_value = (
"Server Version: Apache (Debian)", "")
self.assertRaises(errors.PluginError, self.config.get_version)
mock_script.return_value = (
"Server Version: Apache/2.3{0} Apache/2.4.7".format(
os.linesep), self.config.get_version)
mock_script.side_effect = errors.SubprocessError("Can't find program")
self.assertRaises(errors.PluginError, self.config.get_version)
def update_init_file(version: tuple):
import os
import re
print('Updating __init__.py to have the correct version.')
version_line_re = re.compile(r'^\s+[\'"]version[\'"]: (\([0-9,]+\)),')
with open('__init__.py', 'r') as infile, \
open('__init__.py~whl~installer~', 'w') as outfile:
for line in infile:
if version_line_re.match(line):
outfile.write(" 'version': %s,%s" % (version, os.linesep))
else:
outfile.write(line)
os.unlink('__init__.py')
os.rename('__init__.py~whl~installer~', '__init__.py')
def _copy_partial_doc(cls):
for base in cls.mro():
if base.__module__.startswith('sklearn'):
break
lines = base.__doc__.split(os.linesep)
header, rest = lines[0], lines[1:]
insert = """
This class wraps scikit-learn's {classname}. When a dask-array is passed
to our ``fit`` method,the array is passed block-wise to the scikit-learn
class' ``partial_fit`` method. This will allow you to fit the estimator
on larger-than memory datasets sequentially (block-wise),but without an
parallelism,or any ability to distribute across a cluster.""".format(
classname=base.__name__)
doc = '\n'.join([header + insert] + rest)
cls.__doc__ = doc
return cls
def replace_line_in_file(f, start_exp, to_replace):
lines = read_lines_of_file(f)
for i in range(len(lines)):
if lines[i].startswith(start_exp):
lines[i] = to_replace + os.linesep
write_lines_into_file(Res.project_path + f, lines)
def ENMLtoText(contentENML):
soup = BeautifulSoup(contentENML.decode('utf-8'))
for section in soup.select('li > p'):
section.replace_with( section.contents[0] )
for section in soup.select('li > br'):
if section.next_sibling:
next_sibling = section.next_sibling.next_sibling
if next_sibling:
if next_sibling.find('li'):
section.extract()
else:
section.extract()
Editor.checklistInENMLtoSoup(soup)
for section in soup.findAll('en-todo', checked='true'):
section.replace_with('[x]')
for section in soup.findAll('en-todo'):
section.replace_with('[ ]')
content = html2text.html2text(str(soup).decode('utf-8'), '', 0)
content = re.sub(r' *\n', os.linesep, content)
return content.encode('utf-8')
def parseopts(args):
parser = create_main_parser()
# Note: parser calls disable_interspersed_args(),so the result of this
# call is to split the initial args into the general options before the
# subcommand and everything else.
# For example:
# args: ['--timeout=5','install','--user','INITools']
# general_options: ['--timeout==5']
# args_else: ['install','INITools']
general_options, args_else = parser.parse_args(args)
# --version
if general_options.version:
sys.stdout.write(parser.version)
sys.stdout.write(os.linesep)
sys.exit()
# pip || pip help -> print_help()
if not args_else or (args_else[0] == 'help' and len(args_else) == 1):
parser.print_help()
sys.exit()
# the subcommand name
cmd_name = args_else[0]
if cmd_name not in commands_dict:
guess = get_similar_commands(cmd_name)
msg = ['unknown command "%s"' % cmd_name]
if guess:
msg.append('maybe you meant "%s"' % guess)
raise CommandError(' - '.join(msg))
# all the args without the subcommand
cmd_args = args[:]
cmd_args.remove(cmd_name)
return cmd_name, cmd_args
def main(args=None):
if args is None:
args = sys.argv[1:]
# Configure our deprecation warnings to be sent through loggers
deprecation.install_warning_logger()
autocomplete()
try:
cmd_name, cmd_args = parseopts(args)
except PipError as exc:
sys.stderr.write("ERROR: %s" % exc)
sys.stderr.write(os.linesep)
sys.exit(1)
# Needed for locale.getpreferredencoding(False) to work
# in pip.utils.encoding.auto_decode
try:
locale.setlocale(locale.LC_ALL, e)
command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
return command.main(cmd_args)
# ###########################################################
# # Writing freeze files
def parseopts(args):
parser = create_main_parser()
# Note: parser calls disable_interspersed_args(), cmd_args
def main(args=None):
if args is None:
args = sys.argv[1:]
# Configure our deprecation warnings to be sent through loggers
deprecation.install_warning_logger()
autocomplete()
try:
cmd_name, e)
command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
return command.main(cmd_args)
# ###########################################################
# # Writing freeze files