Python click 模块,launch() 实例源码
我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用click.launch()。
def webui(obj):
    '''List dashboard links for matching pods (if only one matched, URL is opened in browser).

    The dashboard endpoint is read from the output of ``kubectl cluster-info``;
    one dashboard URL is built per pod yielded by
    ``obj.kubey.each_pod(obj.maximum)`` and echoed. When exactly one URL
    results, it is additionally opened with ``click.launch``.

    Raises:
        click.ClickException: if the ``cluster-info`` output contains no
            ``kubernetes-dashboard`` URL.
    '''
    kubectl = obj.kubey.kubectl
    # click.unstyle drops ANSI styling so the regex sees plain text.
    info = click.unstyle(kubectl.call_capture('cluster-info'))
    match = re.search(r'kubernetes-dashboard.*?(http\S+)', info)
    if match is None:
        # Without this guard a missing dashboard surfaced as an opaque
        # AttributeError on ``None.group``.
        raise click.ClickException(
            'Could not find the kubernetes-dashboard endpoint in cluster-info output.')
    dash_endpoint = match.group(1)

    urls = [
        dash_endpoint + '/#/pod/{0}/{1}?namespace={0}'.format(pod.namespace, pod.name)
        for pod in obj.kubey.each_pod(obj.maximum)
    ]
    for url in urls:
        click.echo(url)
    if len(urls) == 1:
        click.launch(urls[0])
def preview(context):
    """Opens local preview of your blog website

    Builds the site with pelican, serves ``config['OUTPUT_DIR']`` with
    ``python -m http.server`` on ``PORT`` as a background process, opens the
    site in the browser and then runs pelican with ``--autoreload``.
    ``KeyboardInterrupt`` is turned into ``abort(context)``.
    """
    config = context.obj
    pelican(config, '--verbose', '--ignore-cache')

    server_proc = None
    os.chdir(config['OUTPUT_DIR'])
    try:
        try:
            command = 'python -m http.server ' + str(PORT)
            server_proc = run(command, bg=True)

            time.sleep(3)  # presumably lets the server come up before the browser opens
            # Open the port the server was started on rather than a
            # hard-coded 8000, so the two can never drift apart.
            click.launch('http://localhost:{}'.format(PORT))

            time.sleep(5)
            pelican(config, '--autoreload')
        except BaseException:
            # Same reach as the former bare ``except:`` (Ctrl-C included):
            # stop the background server, then let the error propagate.
            if server_proc is not None:
                server_proc.kill()
            raise
    except KeyboardInterrupt:
        abort(context)
def search(ctx, name):
    """Search Curse Forge for a mod named NAME.

    The matches found in the default game's database are offered through
    ``select_mod``; if one is chosen, its project page is opened with
    ``click.launch``.
    """
    # Formatted first, while locals() holds only the function arguments.
    header = _('Search results for "{name}"').format_map(locals())
    footer = _('Choose a mod to open its project page,or press [q] to quit.')

    session = ctx['default_game'].database.session()
    chosen = select_mod(Mod.search(session, name), header=header, footer=footer)
    if chosen is None:
        return

    click.launch('https://www.curseforge.com/projects/{chosen.id}/'.format(chosen=chosen))
# Shared option -- location of mod-pack data
def log(config, l):
    """
    Tempy log with all the deletions reports

    When ``l`` is truthy, a summary of the stored "last-cleanup" data is
    echoed (or a notice when that data file is missing); otherwise the log
    file under ``config.app_dir`` is opened with ``click.launch``.
    """
    if not l:
        log_path = os.path.join(config.app_dir, config.log_name)
        click.launch(log_path)
        return

    try:
        report = filemanager.unpickle_data("last-cleanup")
        click.echo("\nPerformed at: " + report["datetime"])
        click.echo("\n* Deletions: " + str(report["deletions"]))
        readable_size = converter.human_readable_size(report["size"])
        click.echo("* Deletion size: " + readable_size)
        click.echo("* Errors: " + str(report["error_count"]))
    except FileNotFoundError:
        click.echo("No data was found.")
def cli(app, environment, branch, open_deploy):
    """
    Deploy an application to an environment.

    Looks up the application named ``app``, checks that ``environment`` is one
    of its environments, posts a build for ``branch`` and reports the deploy
    URL. With ``open_deploy`` set, the URL is also opened via ``click.launch``.
    Every failure is reported through ``output.die``.
    """
    config = load_config()

    try:
        client = VMFarmsapiclient.from_config(config)
        application_list = client.get('applications')['results']
        # next() without a default raises StopIteration when no name matches;
        # that is handled below as "invalid app".
        selected_application = next(
            application for application in application_list
            if application['name'] == app)
        if environment not in selected_application['environments']:
            # Raised explicitly rather than via ``assert`` so the check is
            # not stripped under ``python -O``; same exception type as before.
            raise AssertionError('Invalid environment specified.')

        data = {
            'environment': environment,
            'branch': branch,
        }
        response = client.post(
            'applications/{application_id}/builds'.format(**selected_application),
            data=data)
        build_id = response['id']
        deploy_url = client.url_for('builds', 'deploys', build_id)
    except AssertionError as exc:
        output.die(str(exc))
    except StopIteration:
        # Was spelled ``stopiteration``, which is an undefined name and
        # turned an unknown app into a NameError.
        output.die('Invalid app specified.')
    except VMFarmsAPIError as error:
        output.die(error.message, error.description)
    else:
        output.success('Triggered deploy! Monitor it at <{}>.'.format(deploy_url))
        if open_deploy:
            click.launch(deploy_url)
def read(ctx, item_id, with_note):
    """ Read an item attachment.

    Resolves the item, opens one of its attachments (asking the user to pick
    when there are several) and, when ``with_note`` is set, opens an editor to
    create a new note or edit an existing one for the item.
    """
    try:
        item_id = pick_item(ctx.obj, item_id)
    except ValueError as error:
        ctx.fail(error.args[0])

    attachments = ctx.obj.attachments(item_id)
    read_att = None
    if not attachments:
        ctx.fail("Could not find an attachment for reading.")
    elif len(attachments) == 1:
        read_att = attachments[0]
    else:
        click.echo("Multiple attachments available.")
        choices = [(att, att['data']['title']) for att in attachments]
        read_att = select(choices)

    att_path = ctx.obj.get_attachment_path(read_att)
    click.echo("opening '{}'.".format(att_path))
    click.launch(str(att_path), wait=False)

    if not with_note:
        return

    # Only ask about editing when the item already has notes.
    note = None
    existing_notes = list(ctx.obj.notes(item_id))
    if existing_notes and click.confirm("Edit existing note?"):
        note = pick_note(ctx, ctx.obj, item_id)

    initial_text = note['data']['note']['text'] if note else None
    note_body = click.edit(
        text=initial_text,
        extension=get_extension(ctx.obj.note_format))
    if not note_body:
        return

    if note is None:
        ctx.obj.create_note(item_id, note_body)
    else:
        note['data']['note']['text'] = note_body
        ctx.obj.save_note(note)
def create_api_key():
    """ Interactively create a new API key via Zotero's OAuth API.

    Requires the user to enter a verification key displayed in the browser.

    :returns: API key and the user's library ID
    :raises click.ClickException: if the access-token response is falsy
    """
    auth = OAuth1Service(
        name='zotero',
        consumer_key=CLIENT_KEY,
        consumer_secret=CLIENT_SECRET,
        request_token_url=REQUEST_TOKEN_URL,
        access_token_url=ACCESS_TOKEN_URL,
        authorize_url=AUTH_URL,
        base_url=BASE_URL)
    token, secret = auth.get_request_token(
        params={'oauth_callback': 'oob'})
    auth_url = auth.get_authorize_url(token)
    # Extra query parameters describing the key being requested.
    auth_url += '&' + urlencode({
        'name': 'zotero-cli',
        'library_access': 1,
        'notes_access': 1,
        'write_access': 1,
        'all_groups': 'read'})
    click.echo("opening {} in browser,please confirm.".format(auth_url))
    click.launch(auth_url)
    verification = click.prompt("Enter verification code")
    token_resp = auth.get_raw_access_token(
        token, secret, method='POST',
        data={'oauth_verifier': verification})
    if not token_resp:
        logging.debug(token_resp.content)
        # The click module has no ``fail`` function (only Context.fail), so
        # the former ``click.fail(...)`` raised AttributeError instead of
        # reporting the problem.
        raise click.ClickException("Error during API key generation.")
    access = urlparse.parse_qs(token_resp.text)
    return access['oauth_token'][0], access['userID'][0]
def publish(context):
    """Saves changes and sends them to GitHub

    Stages everything, shows the status, asks for confirmation, commits,
    pushes the current branch and opens the pull-request link if there is one.
    """
    header('Recording changes...')
    run('git add -A')

    header('displaying changes...')
    run('git -c color.status=always status')

    confirmed = click.confirm('\nContinue publishing')
    if not confirmed:
        # Unstage what was added above before aborting.
        run('git reset HEAD --')
        abort(context)

    header('Saving changes...')
    try:
        commit_message = 'Publishing {}'.format(choose_commit_emoji())
        commit_command = 'git commit -m "{message}"'.format(message=commit_message)
        run(commit_command, capture=True)
    except subprocess.CalledProcessError as error:
        if 'nothing to commit' in error.stdout:
            click.echo('nothing to commit.')
        else:
            raise

    header('Pushing to GitHub...')
    branch = get_branch()
    push_command = 'git push origin {branch}:{branch}'.format(branch=branch)
    run(push_command)

    pr_link = get_pr_link(branch)
    if not pr_link:
        return
    click.launch(pr_link)
def write(context):
    """Starts a new article

    Prompts for title and author, writes a Markdown skeleton with a metadata
    header into ``config['CONTENT_DIR']`` and opens the new file with
    ``click.launch``.
    """
    config = context.obj

    title = click.prompt('Title')
    author = click.prompt('Author', default=config.get('DEFAULT_AUTHOR'))

    slug = slugify(title)
    # ``datetime.Now`` does not exist; the method is lower-case ``now``.
    creation_date = datetime.now()
    basename = '{:%Y-%m-%d}_{}.md'.format(creation_date, slug)
    timestamp = '{:%Y-%m-%d %H:%M}:00'.format(creation_date)
    meta = (
        ('Title', title),
        ('Date', timestamp),
        # NOTE(review): the 'Modified' entry was truncated in the original,
        # which nested the Author row inside it. A fresh article is assumed
        # to be "modified" at its creation time - confirm.
        ('Modified', timestamp),
        ('Author', author),
    )

    file_content = ''.join(
        '{}: {}\n'.format(key, value) for key, value in meta)
    file_content += '\n\n'
    file_content += 'Text...\n\n'
    file_content += '![image description]({filename}/images/my-photo.jpg)\n\n'
    file_content += 'Text...\n\n'

    os.makedirs(config['CONTENT_DIR'], exist_ok=True)
    path = os.path.join(config['CONTENT_DIR'], basename)
    with click.open_file(path, 'w') as f:
        f.write(file_content)

    click.echo(path)
    click.launch(path)
def compile_note(note, outdir, watch=False, view=False, style=None, templ='default', ignore_missing=False, comments=False):
    """Compile ``note`` into ``outdir`` and return the output path.

    With ``view`` set, the result is opened via ``click.launch``; with
    ``watch`` set, the note and the bound compile function are handed to
    ``watch_note``.
    """
    note = util.abs_path(note)

    # Bind every option once so the same callable serves the first build
    # and the watcher.
    options = {
        'outdir': outdir,
        'templ': templ,
        'stylesheet': style,
        'ignore_missing': ignore_missing,
        'comments': comments,
        'preview': True,
    }
    build = partial(compile.compile_note, **options)

    outpath = build(note)
    if view:
        click.launch(outpath)
    if watch:
        watch_note(note, build)
    return outpath
def login(token, username, password):
    """Log into polyaxon.

    With ``username`` given, logs in with username/password (prompting for the
    password when it is missing). Otherwise an authentication token is used:
    the supplied ``token`` if there is one, else a token pasted by the user
    after the token page has been opened in the browser. The resulting access
    code is checked by loading the user and then stored through
    ``AuthConfigManager``.
    """
    auth_client = polyaxonClients().auth
    if username:
        # Use username / password login
        if not password:
            password = click.prompt('Please enter your password', type=str, hide_input=True)
            password = password.strip()
            if not password:
                logger.info('You entered an empty string. '
                            'Please make sure you enter your password correctly.')
                sys.exit(1)

        credentials = CredentialsConfig(username=username, password=password)
        try:
            access_code = auth_client.login(credentials=credentials)
        except (polyaxonHTTPError, polyaxonShouldExitError) as e:
            Printer.print_error('Could not login.')
            Printer.print_error(e)
            sys.exit(1)

        if not access_code:
            Printer.print_error("Failed to login")
            return
    else:
        if token:
            # A supplied token is the access code. Previously it was only
            # tested for truthiness and never used.
            access_code = token
        else:
            cli_info_url = "{}/users/token".format(auth_client.http_host)
            click.confirm('Authentication token page will Now open in your browser. Continue?',
                          abort=True, default=True)

            click.launch(cli_info_url)
            logger.info("Please copy and paste the authentication token.")
            access_code = click.prompt('This is an invisible field. Paste token and press ENTER',
                                       type=str, hide_input=True)

        if not access_code:
            logger.info("Empty token received. "
                        "Make sure your shell is handling the token appropriately.")
            logger.info("See docs for help: http://docs.polyaxon.com/faqs/authentication/")
            return

        access_code = access_code.strip(" ")

    # Validate the access code by loading the user it belongs to.
    try:
        user = polyaxonClients().auth.get_user(token=access_code)
    except (polyaxonHTTPError, polyaxonShouldExitError) as e:
        Printer.print_error('Could not load user info.')
        Printer.print_error('Error message `{}`.'.format(e))
        sys.exit(1)

    access_token = AccesstokenConfig(username=user.username, token=access_code)
    AuthConfigManager.set_config(access_token)
    Printer.print_success("Login Successful")
def view(visualization_path, index_extension):
    """Open a QIIME 2 Visualization's index file and wait until the user quits.

    Raises ``click.UsageError`` when no display is available (DISPLAY unset
    and platform not macOS) and ``click.BadParameter`` when the path is not a
    Visualization or has no index file with the requested extension.
    """
    # Guard headless envs from having to import anything large
    import sys
    # The variable is spelled DISPLAY; environment names are case-sensitive
    # on POSIX, so the former "disPLAY" lookup never matched and every
    # non-macOS run was rejected as headless.
    if not os.getenv("DISPLAY") and sys.platform != "darwin":
        raise click.UsageError(
            'Visualization viewing is currently not supported in headless '
            'environments. You can view Visualizations (and Artifacts) at '
            'https://view.qiime2.org,or move the Visualization to an '
            'environment with a display and view it with `qiime tools view`.')

    import zipfile
    import qiime2.sdk

    # Accept both ".html" and "html".
    if index_extension.startswith('.'):
        index_extension = index_extension[1:]
    try:
        visualization = qiime2.sdk.Visualization.load(visualization_path)
    # TODO: currently a KeyError is raised if a zipped file that is not a
    # QIIME 2 result is passed. This should be handled better by the framework.
    except (zipfile.BadZipFile, KeyError, TypeError):
        raise click.BadParameter(
            '%s is not a QIIME 2 Visualization. Only QIIME 2 Visualizations '
            'can be viewed.' % visualization_path)

    index_paths = visualization.get_index_paths(relative=False)

    if index_extension not in index_paths:
        raise click.BadParameter(
            'No index %s file with is present in the archive. Available index '
            'extensions are: %s' % (index_extension,
                                    ','.join(index_paths.keys())))
    else:
        index_path = index_paths[index_extension]
        launch_status = click.launch(index_path)
        if launch_status != 0:
            click.echo('Viewing visualization Failed while attempting to '
                       'open %s' % index_path, err=True)
        else:
            while True:
                click.echo(
                    "Press the 'q' key,Control-C,or Control-D to quit. This "
                    "view may no longer be accessible or work correctly after "
                    "quitting.", nl=False)
                # There is currently a bug in click.getchar where translation
                # of Control-C and Control-D into KeyboardInterrupt and
                # EOFError (respectively) does not work on Python 3. The code
                # here should continue to work as expected when the bug is
                # fixed in Click.
                #
                # https://github.com/pallets/click/issues/583
                try:
                    char = click.getchar()
                    click.echo()
                    if char in {'q', '\x03', '\x04'}:
                        break
                except (KeyboardInterrupt, EOFError):
                    break
def photos(context, path):
    """Adds images to the last article

    Finds the latest article in ``config['CONTENT_DIR']`` and the images under
    ``path``, asks for confirmation, imports each image into the content
    images directory, appends Markdown image references to the article and
    opens the article with ``click.launch``.
    """
    config = context.obj

    header('Looking for the latest article...')
    article_filename = find_last_article(config['CONTENT_DIR'])
    if not article_filename:
        return click.secho('No articles.', fg='red')
    click.echo(os.path.basename(article_filename))

    header('Looking for images...')
    images = list(find_images(path))
    if not images:
        return click.secho('Found no images.', fg='red')

    for filename in images:
        click.secho(filename, fg='green')

    if not click.confirm('\nAdd these images to the latest article'):
        # NOTE(review): the other commands in this file call abort() with the
        # click context, not with context.obj; this call is aligned with
        # them. Confirm against abort()'s definition.
        abort(context)

    # '{filename}' is kept literally in the generated Markdown.
    url_prefix = os.path.join('{filename}', IMAGES_PATH)
    images_dir = os.path.join(config['CONTENT_DIR'], IMAGES_PATH)
    os.makedirs(images_dir, exist_ok=True)

    header('Processing images...')
    urls = []
    for filename in images:
        image_basename = os.path.basename(filename).replace(' ', '-').lower()
        urls.append(os.path.join(url_prefix, image_basename))
        image_filename = os.path.join(images_dir, image_basename)
        print(filename, image_filename)
        import_image(filename, image_filename)

    content = '\n'
    for url in urls:
        # os.path.join yields backslashes on Windows; links need '/'.
        url = url.replace('\\', '/')
        content += '\n![image description]({})\n'.format(url)

    header('Adding to article: {}'.format(article_filename))
    with click.open_file(article_filename, 'a') as f:
        f.write(content)
    click.launch(article_filename)