Python os 模块-symlink() 实例源码

Python os 模块,symlink() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.symlink()

项目:acbs    作者:AOSC-Dev    | 项目源码 | 文件源码
def src_proc_dispatcher(pkg_name, src_tbl_name, src_loc):
    """Stage a package source inside a fresh scratch build directory.

    A temporary directory is created under ``/var/cache/acbs/build/``.  A
    source that is a directory is copied into it; anything else is symlinked
    into it and handed to ``decomp_file``.

    :param pkg_name: package name (not used by this function).
    :param src_tbl_name: name of the source file or directory in ``src_loc``.
    :param src_loc: directory that holds the source.
    :returns: ``False`` when copying a source directory fails,
        ``(True, build_dir)`` when the copy succeeds, otherwise
        ``(decomp_file(link, build_dir), build_dir)``.
    """
    tobj = tempfile.mkdtemp(dir='/var/cache/acbs/build/', prefix='acbs.')
    src_tbl_loc = os.path.join(src_loc, src_tbl_name)
    shadow_ark_loc = os.path.join(tobj, src_tbl_name)
    if os.path.isdir(src_tbl_loc):
        print('[I] Making a copy of the source directory...', end='')
        try:
            shutil.copytree(src=src_tbl_loc, dst=shadow_ark_loc)
        except Exception:
            # Still best-effort, but Ctrl-C and SystemExit are no longer
            # swallowed the way a bare ``except:`` would.
            print('Failed!')
            return False
        print('Done!')
        return True, tobj
    # Not a directory: expose it inside the build dir through a symlink and
    # let decomp_file deal with it.
    os.symlink(src_tbl_loc, shadow_ark_loc)
    return decomp_file(shadow_ark_loc, tobj), tobj
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def makelink(self, tarinfo, targetpath):
        """Create the link described by ``tarinfo`` at ``targetpath``.

        Symbolic members become symlinks to ``tarinfo.linkname``.  Other
        members become hard links to ``tarinfo._link_target`` when that path
        exists; otherwise the referenced member is extracted in their place.
        If link creation raises ``symlink_exception`` the referenced member is
        extracted as a copy instead, and ``ExtractError`` is raised when it
        cannot be found in the archive.
        """
        try:
            # For systems that support symbolic and hard links.
            if tarinfo.issym():
                os.symlink(tarinfo.linkname, targetpath)
            elif os.path.exists(tarinfo._link_target):
                # See extract().
                os.link(tarinfo._link_target, targetpath)
            else:
                referenced = self._find_link_target(tarinfo)
                self._extract_member(referenced, targetpath)
        except symlink_exception:
            # Linking is not possible here: fall back to copying the member.
            try:
                referenced = self._find_link_target(tarinfo)
                self._extract_member(referenced, targetpath)
            except KeyError:
                raise ExtractError("unable to resolve link inside archive")
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def makelink(self,we try to make a copy of the referenced file
          instead of a link.
        """
        if hasattr(os, "symlink") and hasattr(os, "link"):
            # For systems that support symbolic and hard links.
            if tarinfo.issym():
                if os.path.lexists(targetpath):
                    os.unlink(targetpath)
                os.symlink(tarinfo.linkname, targetpath)
            else:
                # See extract().
                if os.path.exists(tarinfo._link_target):
                    if os.path.lexists(targetpath):
                        os.unlink(targetpath)
                    os.link(tarinfo._link_target, targetpath)
        else:
            try:
                self._extract_member(self._find_link_target(tarinfo), targetpath)
            except KeyError:
                raise ExtractError("unable to resolve link inside archive")
项目:myDotFiles    作者:GuidoFe    | 项目源码 | 文件源码
def execute(self):
        """Re-point the currently selected symlink at a new target.

        The new target is taken from ``self.rest(1)``.  A usage message is
        shown when it is empty, an error when the selected file is not a
        symlink, and nothing happens when the link already has that target.
        Otherwise the link is removed and recreated; an ``OSError`` during
        that step is reported through ``self.fm.notify``.  Afterwards the
        file manager is reset and the file is selected again.
        """
        new_path = self.rest(1)
        cf = self.fm.thisfile

        if not new_path:
            return self.fm.notify('Syntax: relink <newpath>', bad=True)

        if not cf.is_link:
            return self.fm.notify('%s is not a symlink!' % cf.relative_path, bad=True)

        if new_path == os.readlink(cf.path):
            return

        try:
            # A symlink cannot be retargeted in place: drop it and recreate it.
            os.remove(cf.path)
            os.symlink(new_path, cf.path)
        except OSError as err:
            self.fm.notify(err)

        self.fm.reset()
        self.fm.thisdir.pointed_obj = cf
        self.fm.thisfile = cf
项目:factotum    作者:Denubis    | 项目源码 | 文件源码
def copytree(src, dst, symlinks=False, ignore=None):
    """Recursively copy ``src`` into ``dst``, merging into an existing ``dst``.

    :param src: directory to copy from.
    :param dst: directory to copy into; created (with ``src``'s stat info)
        when it does not exist yet.
    :param symlinks: when true, symlinks found in ``src`` are recreated as
        symlinks (replacing whatever is at the destination); otherwise they
        are handled like the directory or file they point to.
    :param ignore: optional callable ``ignore(src, names)`` returning the
        names that must be skipped.
    """
    if not os.path.exists(dst):
        os.makedirs(dst)
        shutil.copystat(src, dst)
    lst = os.listdir(src)
    if ignore:
        excl = ignore(src, lst)
        lst = [x for x in lst if x not in excl]
    for item in lst:
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if symlinks and os.path.islink(s):
            if os.path.lexists(d):
                os.remove(d)
            os.symlink(os.readlink(s), d)
            try:
                st = os.lstat(s)
                mode = stat.S_IMODE(st.st_mode)
                os.lchmod(d, mode)
            except (AttributeError, NotImplementedError, OSError):
                # lchmod is missing or unsupported on this platform; copying
                # the link's mode is best-effort only.
                pass
        elif os.path.isdir(s):
            copytree(s, d, symlinks, ignore)
        else:
            shutil.copy2(s, d)
项目:burrowbeat    作者:Goomzee    | 项目源码 | 文件源码
def setUp(self):
        """Prepare the template environment and an empty working directory.

        The working directory is ``<build_path>run/<test id>`` and is wiped
        if it already exists.  The ``<build_path>last_run`` symlink is then
        re-pointed at it on a best-effort basis.
        """
        self.template_env = jinja2.Environment(
            loader=jinja2.FileSystemLoader("config")
        )

        # create working dir
        self.working_dir = os.path.join(build_path + "run", self.id())
        if os.path.exists(self.working_dir):
            shutil.rmtree(self.working_dir)
        os.makedirs(self.working_dir)

        last_run = build_path + "last_run"
        try:
            # update the last_run link
            if os.path.islink(last_run):
                os.unlink(last_run)
            os.symlink(build_path + "run/{}".format(self.id()), last_run)
        except (OSError, AttributeError, NotImplementedError):
            # symlink is best effort and can fail when running tests in
            # parallel (or where symlinks are unavailable); unrelated errors
            # such as KeyboardInterrupt are no longer swallowed.
            pass
项目:BookCloud    作者:livro-aberto    | 项目源码 | 文件源码
def clone(self, name, user):
        """Create a clone of this branch called ``name`` and owned by ``user``.

        The new branch is stored in the database, the repository is cloned
        to ``repos/<project>/<name>/source`` under the current working
        directory, a ``_resources`` symlink is added inside the clone, a git
        branch ``name`` is checked out and configured for ``user``, and the
        source is built.  Returns the new branch object.
        """
        self.expiration = None

        # Register the branch in the database first.
        new_branch = Branch(name, self.project, self, user)
        db.session.add(new_branch)
        db.session.commit()

        # Clone the repository on disk.
        branch_path = os.path.abspath(
            join(os.getcwd(), 'repos', self.project.name, name, 'source'))
        self.get_repo().clone(branch_path, branch=self.name)

        # Link the clone's _resources to the project's low resolution resources.
        shared_resources = os.path.abspath(
            join('repos', self.project.name, '_resources/low_resolution'))
        resources_link = join(branch_path, '_resources')
        os.symlink(shared_resources, resources_link)

        branch_repo = git.Repo(branch_path)
        branch_repo.git.checkout('HEAD', b=name)
        config_repo(branch_repo, user.username, user.email)

        # Build the source.
        new_branch.build(timeout=60)
        return new_branch
项目:otest    作者:rohe    | 项目源码 | 文件源码
def mk_tar_dir(issuer, test_profile):
    """Mirror the log files of ``test_profile`` as ``.txt`` symlinks.

    Ensures ``<cwd>/tar/<issuer>/<test_profile>`` exists, then for every
    entry of ``<cwd>/log/<test_profile>`` whose name does not start with a
    dot makes ``<name>.txt`` in the tar directory a symlink to that entry.
    """
    cwd = os.getcwd()

    # Create the tar directory one path component at a time.
    tar_dir = cwd
    for segment in ("tar", issuer, test_profile):
        tar_dir = os.path.join(tar_dir, segment)
        if os.path.isdir(tar_dir):
            continue
        os.mkdir(tar_dir)

    log_dir = os.path.join(cwd, "log", test_profile)
    visible = [name for name in os.listdir(log_dir) if not name.startswith(".")]
    for name in visible:
        log_path = os.path.join(log_dir, name)
        link_path = os.path.join(tar_dir, "{}.txt".format(name))

        # Anything that resolves to a regular file is dropped and re-linked.
        if os.path.isfile(link_path):
            os.unlink(link_path)

        # A symlink that is still present at this point is kept as it is.
        if os.path.islink(link_path):
            continue
        os.symlink(log_path, link_path)
项目:bob.bio.base    作者:bioidiap    | 项目源码 | 文件源码
def close_compressed(filename, hdf5_file, compression_type='bz2', create_link=False):
  """Close an HDF5 file that was opened with ``open_compressed``.

  When the file was opened for writing, its content is packed into a
  compressed tar archive stored at ``filename``.  When ``create_link`` is
  true, a symlink named ``filename`` plus the matching archive extension
  (``.tar``, ``.tar.bz2`` or ``.tar.gz``) is created next to it so that
  ordinary tools recognise the archive.  The local HDF5 file is always
  removed afterwards.

  :param filename: path of the compressed file.
  :param hdf5_file: object providing ``filename``, ``writable`` and ``close()``.
  :param compression_type: ``''``, ``'bz2'`` or ``'gz'``.
  :param create_link: whether to create the extension-carrying symlink.
  :raises KeyError: if ``create_link`` is true and ``compression_type`` is
      not one of the supported values.
  """
  hdf5_file_name = hdf5_file.filename
  is_writable = hdf5_file.writable
  hdf5_file.close()

  if is_writable:
    # create compressed tar file; the context manager closes it on errors too
    with tarfile.open(filename, mode="w:" + compression_type) as tar:
      tar.add(hdf5_file_name, os.path.basename(filename))

  if create_link:
    # every extension needs its leading dot ('gz' used to map to 'tar.gz')
    extension = {'': '.tar', 'bz2': '.tar.bz2',
                 'gz': '.tar.gz'}[compression_type]
    link_file = filename + extension
    if not os.path.exists(link_file):
      # relative target: the link lives in the same directory as the archive
      os.symlink(os.path.basename(filename), link_file)

  # clean up locally generated files
  os.remove(hdf5_file_name)
项目:CycleGAN-Tensorflow-PyTorch-Simple    作者:LynnHo    | 项目源码 | 文件源码
def reorganize(dataset_dir):
    """Create ``link_<split>`` directories that each expose a split as ``0``.

    For each of ``trainA``, ``trainB``, ``testA`` and ``testB`` a directory
    ``<dataset_dir>/link_<split>`` is created through ``mkdir`` and a symlink
    named ``0`` inside it is (re)pointed at the absolute path of
    ``<dataset_dir>/<split>``.

    :returns: dict mapping each split name to its ``link_<split>`` directory.
    """
    dirs = {}
    for split in ('trainA', 'trainB', 'testA', 'testB'):
        dirs[split] = os.path.join(dataset_dir, 'link_' + split)
    mkdir(dirs.values())

    for key, link_dir in dirs.items():
        link_path = os.path.join(link_dir, '0')
        try:
            os.remove(link_path)
        except OSError:
            # Nothing to remove (or not removable); symlink() below reports
            # a real conflict.  Non-filesystem errors are no longer hidden.
            pass
        os.symlink(os.path.abspath(os.path.join(dataset_dir, key)), link_path)

    return dirs
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def build_same_files_and_dirs(tmpdir, source_is_symlink, dest_is_symlink, use_files):
    """Build a (source, destination) pair that refers to one and the same target.

    :param tmpdir: temporary directory object the entries are created in
    :param bool source_is_symlink: Should the source be a symlink to the destination (both cannot be True)
    :param bool dest_is_symlink: Should the destination be a symlink to the source (both cannot be True)
    :param bool use_files: Should files be created (if False, directories are used instead)
    :returns: tuple of source path and destination path, both as strings
    """
    if use_files:
        real = tmpdir.join('real')
        real.write('some data')
    else:
        real = tmpdir.mkdir('real')
    link = tmpdir.join('link')

    # No symlink requested: source and destination are literally the same path.
    if not (source_is_symlink or dest_is_symlink):
        return str(real), str(real)

    os.symlink(str(real), str(link))
    if source_is_symlink:
        return str(link), str(real)
    return str(real), str(link)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_process_single_file_destination_is_symlink_to_source(
        tmpdir,
        patch_process_single_operation,
        standard_handler
):
    """Nothing is opened or processed when the destination is a symlink to the source."""
    source = tmpdir.join('source')
    source.write('some data')
    destination = str(tmpdir.join('destination'))
    # Make the destination path a symlink that points at the source file.
    os.symlink(str(source), destination)

    # Patch ``open`` in the io_handling module so any attempt to open a file
    # during the call can be detected.
    with patch('aws_encryption_sdk_cli.internal.io_handling.open', create=True) as mock_open:
        standard_handler.process_single_file(
            stream_args=sentinel.stream_args,
            source=str(source),
            destination=destination
        )

    # Neither a file open nor the single-operation processing may have happened.
    assert not mock_open.called
    assert not patch_process_single_operation.called
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_to_file_cycle_target_through_symlink(tmpdir):
    """Encrypt to a path that goes through a directory symlink, then decrypt.

    The ciphertext is written to ``output_link/ciphertext`` where
    ``output_link`` is a symlink to the ``output`` directory; the decrypted
    result must match the random plaintext byte for byte.
    """
    plaintext = tmpdir.join('source_plaintext')
    real_output = tmpdir.mkdir('output')
    os.symlink(str(real_output), str(tmpdir.join('output_link')))
    ciphertext = tmpdir.join('output_link', 'ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as handle:
        handle.write(os.urandom(1024))

    # Build both command lines before running either of them.
    commands = [
        encrypt_args_template().format(
            source=str(plaintext),
            target=str(ciphertext)
        ),
        decrypt_args_template().format(
            source=str(ciphertext),
            target=str(decrypted)
        ),
    ]
    for raw_args in commands:
        aws_encryption_sdk_cli.cli(shlex.split(raw_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
项目:cthulhu    作者:sholsapp    | 项目源码 | 文件源码
def render(self):
        """Render every instance, link them by name, then write control and spec.

        All instances are rendered first.  A symlink named after each
        instance is then created in ``self.fixture_root`` pointing at the
        instance's ``node_root``; a failure is logged and does not stop the
        run.  Finally the control script (mode 0o755) and the fixture
        specification file are written.
        """
        # Render all instances
        for instance in self.instances:
            instance.render()

        # Render human readable links to each instance
        for instance in self.instances:
            try:
                link_target = instance.node_root
                link_path = os.path.join(self.fixture_root,
                                         instance.instance_name)
                os.symlink(link_target, link_path)
            except Exception:
                log.exception('Failed to create symlink to %s',
                              instance.instance_name)

        # Render the master control script
        control_script = self.render_control()
        self.write_file(self.fixture_control, control_script, perm=0o755)

        # Render the fixture specification file
        spec_text = self.render_spec()
        self.write_file(self.fixture_spec, spec_text)
项目:cthulhu    作者:sholsapp    | 项目源码 | 文件源码
def run(self):
        """Create the ``venv/cthulhu`` virtualenv and link its ``activate`` script.

        The virtualenv is created next to this file with the ``virtualenv``
        command; ``activate`` in the same directory is then symlinked to the
        environment's ``bin/activate``.  A failing symlink only prints a hint.
        """
        here = os.path.dirname(os.path.abspath(__file__))
        venv_path = os.path.join(here, 'venv', 'cthulhu')
        print('Creating virtual environment in ', venv_path)
        subprocess.check_call(['virtualenv', venv_path])
        print('Linking `activate` to top level of project.\n')
        print('To activate,simply run `source activate`.')
        try:
            activate_script = os.path.join(venv_path, 'bin', 'activate')
            top_level_link = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), 'activate')
            os.symlink(activate_script, top_level_link)
        except OSError:
            print('Unable to create symlink,you may have a stale symlink from a previous invocation.')
项目:zoocore    作者:dsparrow27    | 项目源码 | 文件源码
def symlink_ms(source, linkname):
        """Create a symbolic link on Windows through the Win32 API.

        Python 2 doesn't have os.symlink on Windows, so we do it ourselves
        with ``CreateSymbolicLinkW`` from kernel32.

        :param source: path the link should point to
        :type source: str
        :param linkname: path of the symlink to create
        :type linkname: str
        :raises: WindowsError, raised whenever the link cannot be created;
         the message always blames user permissions
        """
        import ctypes
        csl = ctypes.windll.kernel32.CreateSymbolicLinkW
        # Declare the foreign signature: two wide strings and a 32-bit flag
        # word in, a single byte out.
        csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
        csl.restype = ctypes.c_ubyte
        # 1 when the source is a directory, 0 otherwise.
        # NOTE(review): presumably SYMBOLIC_LINK_FLAG_DIRECTORY -- confirm
        # against the Win32 documentation.
        flags = 1 if os.path.isdir(source) else 0
        try:
            # The link name goes first; forward slashes in the source are
            # converted to backslashes.  A zero result is treated as failure.
            if csl(linkname, source.replace('/', '\\'), flags) == 0:
                raise ctypes.WinError()
        except WindowsError:
            # Any WindowsError, including the one raised just above, is
            # replaced by this generic message; the original error details
            # are discarded.
            raise WindowsError("Failed to create symbolicLink due to user permissions")
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def symlinks_supported():
    """
    Check whether symlinks can be created on the host platform, i.e. whether
    they are supported and the current user is allowed to create them (e.g.
    on Windows it requires admin permissions).

    A scratch directory is created, a symlink to a sub-directory is
    attempted inside it, and everything is removed again.

    :returns: True if the symlink could be created, False otherwise.
    """
    tmpdir = tempfile.mkdtemp()
    original_path = os.path.join(tmpdir, 'original')
    symlink_path = os.path.join(tmpdir, 'symlink')
    os.makedirs(original_path)
    try:
        os.symlink(original_path, symlink_path)
    except (OSError, NotImplementedError, AttributeError):
        supported = False
    else:
        supported = True
        os.remove(symlink_path)
    finally:
        os.rmdir(original_path)
        os.rmdir(tmpdir)
    # Returning after (not inside) ``finally`` lets unexpected exceptions
    # propagate instead of being swallowed by the return statement.
    return supported
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def _find_link_target(self, tarinfo):
        """Find the target member of a symlink or hardlink member in the
           archive.

           Returns the member found by ``self._getmember``; raises
           ``KeyError`` when no such member exists.
        """
        if tarinfo.issym():
            # Always search the entire archive.  Only join the directory part
            # when there is one: a symlink at the archive root would
            # otherwise be looked up as "/<linkname>".
            linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                              tarinfo.linkname)))
            limit = None
        else:
            # Search the archive before the link, because a hard link is
            # just a reference to an already archived file.
            linkname = tarinfo.linkname
            limit = tarinfo

        member = self._getmember(linkname, tarinfo=limit, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:sv-mgr    作者:hristoast    | 项目源码 | 文件源码
def enable_sv(sv, sv_dir, runsvdir):
    """
    Enable the specified service, 'sv'.

    Assemble 'sv_path' and 'service_path' from 'sv', 'sv_dir', and
    'runsvdir', validate 'sv_path' with check_sv_path, then create the
    symlink 'service_path' pointing at 'sv_path'.

    :returns: True when the symlink was created.
    :raises NoSuchSvError: propagated unchanged from the path check.
    :raises NeedSudoError: when creating the link is not permitted.
    :raises SvAlreadyEnabledError: when 'service_path' already exists.
    """
    sv_path = os.path.join(sv_dir, sv)
    service_path = os.path.join(runsvdir, sv)
    try:
        check_sv_path(sv_path)
        os.symlink(sv_path, service_path)
    except NoSuchSvError:
        # Re-raise the original exception so its message and traceback are
        # kept, instead of replacing it with a fresh, empty instance.
        raise
    except PermissionError as err:
        raise NeedSudoError from err
    except FileExistsError as err:
        raise SvAlreadyEnabledError from err
    return True
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _find_link_target(self, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _find_link_target(self, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def makelink(self, targetpath)
            except KeyError:
                raise ExtractError("unable to resolve link inside archive")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _find_link_target(self, tarinfo):
        """Find the target member of a symlink or hardlink member in the
           archive.
        """
        if tarinfo.issym():
            # Always search the entire archive.
            linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname)))
            limit = None
        else:
            # Search the archive before the link, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:cheat-ext    作者:chhsiao90    | 项目源码 | 文件源码
def link(repo):
    """Symlink the sheets of ``repo`` into the cheat directory.

    Raises ``CheatExtException`` when the repo's sheet directory does not
    exist.  After the availability check, one symlink per sheet selected by
    ``filter_by_state(STATE_UNLINK, ...)`` is created in the cheat directory.
    """
    cheat_dir = get_cheat_path()
    sheet_dir = get_sheet_path(repo)
    if not os.path.isdir(sheet_dir):
        message = "%s hadn't been installed yet at %s" % (repo, sheet_dir)
        raise CheatExtException(message)

    state_sheets = get_sheets_with_state(
        cheat_dir, sheet_dir, get_available_sheets_at(sheet_dir))
    _check_sheets_availability(state_sheets)

    for sheet, _ in filter_by_state(STATE_UNLINK, state_sheets):
        sheet_source = os.path.join(sheet_dir, sheet)
        sheet_link = os.path.join(cheat_dir, sheet)
        os.symlink(sheet_source, sheet_link)
项目:defcon-2017-tools    作者:david942j    | 项目源码 | 文件源码
def gen_stream(self, i):
        """Dump stream ``i`` as JSON and link it into the problem's todo folder.

        The data from ``self.gen_data(i)`` is written to
        ``stream/<prob_id>/<basename>_<md5>.json`` and a symlink to that file
        is created under ``<d[prob_id]>/json/todo``.  Nothing happens when
        ``gen_data`` yields no data or when a stream with the same MD5 was
        already recorded for the problem.
        """
        cwd = os.getcwd()+'/'
        res, prob_id = self.gen_data(i)
        if res is None or prob_id is None:
            return
        path = os.path.join('stream', prob_id)
        path2 = os.path.join('json', 'todo')
        path2 = os.path.join(d[prob_id], path2)
        data = concat_data(res)
        md5 = hashlib.md5(data).hexdigest()
        json_name = md5 + '.json'
        outfname = os.path.join(path, self.basename + '_' + json_name)
        if json_name in fset[prob_id]:
            logging.info('Skip %s due to same' % md5)
            return
        logging.info('Save Packet Stream: %s' % outfname)
        fset[prob_id].add(json_name)
        # Close (and thereby flush) the file before it is linked, so readers
        # of the todo folder never see a partially written JSON document.
        with open(outfname, 'w') as f:
            json.dump(res, f)
        outfname2 = os.path.join(path2, self.basename + '_' + json_name)
        # Both paths are made absolute by prefixing the working directory.
        outfname = cwd+outfname
        outfname2 = cwd+outfname2
        os.symlink(outfname,outfname2)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def _find_link_target(self, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def _find_link_target(self, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def link_to_tutorials():
    """Symlink every tutorial notebook into the local ``tutorials`` directory.

    Notebooks already present in the destination are unlinked first, then
    each ``*.ipynb`` found in ``TUTPATH`` gets a symlink of the same name.
    """
    # Linking to the directory does not work well with
    # nbsphinx. We link to the files themselves
    from glob import glob
    from plotnine_examples.tutorials import TUTPATH

    dest_dir = os.path.join(CUR_PATH, 'tutorials')

    # Drop whatever the previous build left behind
    for stale_notebook in glob(dest_dir + '/*.ipynb'):
        os.unlink(stale_notebook)

    # Link the notebooks for this build
    for notebook in glob(TUTPATH + '/*.ipynb'):
        link_path = os.path.join(dest_dir, os.path.basename(notebook))
        os.symlink(notebook, link_path)
项目:vsphere-storage-for-docker    作者:vmware    | 项目源码 | 文件源码
def config_mv(args):
    """[Not Supported Yet]
    Relocate config DB from its current location.

    Without ``args.force`` the result of ``err_out`` for a message asking
    for the '--force' flag is returned; with it, a "not supported yet"
    message is printed.
    :return: None for success, string for error
    """
    if args.force:
        # TODO:
        # Pure convenience code, so very low priority. Intended steps:
        # - check upfront that the target does not exist, fail if it does
        # - cp the DB instance to 'to', and flip the symlink
        # - refresh service (not really needed as next vmci_command handlers
        #   will pick it up)
        # - need --dryrun or --confirm
        # Issue: really works with discovery only, as others need to find it out
        printMessage(args.output_format, "Sorry,configuration move ('config mv' command) is not supported yet")
        return None

    refusal = (DB_REF + " move to {} ".format(args.to) +
               "requires '--force' flag to execute the request.")
    return err_out(refusal)
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def _find_link_target(self, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:firecloud_developer_toolkit    作者:broadinstitute    | 项目源码 | 文件源码
def safe_make_symlink(input_file_path,output_file_path):
    """Symlink ``output_file_path`` to ``input_file_path``, tolerating an identical link.

    The input must exist and the output's parent directory is created via
    ``safe_make_dirs``.  If the output path already exists, it is accepted
    only when it resolves to the same real path as the input.

    :raises Exception: if the input is missing, or the existing output
        resolves somewhere else.
    :raises OSError: for any symlink failure other than EEXIST.
    """
    # Verify the input file is actually there
    if not os.path.exists(input_file_path):
        raise Exception("can't find file %s"%input_file_path)
    safe_make_dirs(os.path.dirname(output_file_path))
    try:
        os.symlink(input_file_path,output_file_path)
    except OSError as err:
        if err.errno != errno.EEXIST:
            raise
        # Something already sits at the output path: it must resolve to the
        # very same file we were asked to link to.
        wanted = os.path.realpath(input_file_path)
        existing = os.path.realpath(output_file_path)
        if wanted != existing:
            raise Exception('Existing file is different than the new symlink')
项目:firecloud_developer_toolkit    作者:broadinstitute    | 项目源码 | 文件源码
def pipetteSynchronousRunner(args):
    """Run a single pipeline command and then the pipette server on its comm dir.

    ``args`` holds, in order: the script path, the communication directory,
    the firehose output directory, and the words of the pipeline command.

    :raises Exception: if the communication directory exists and is not
        empty, or the pipeline command does not mention it.
    """
    full_script_path = os.path.abspath(args[0])
    scriptDir = os.path.dirname(full_script_path)

    communicationDirBase = args[1]
    fh_outdir = args[2]
    pipelineCmdStr = ' '.join(args[3:])

    if os.path.exists(communicationDirBase) and len(os.listdir(communicationDirBase))>0:
        raise Exception('For running single pipelines,comm_dir must start empty')
    if communicationDirBase not in pipelineCmdStr:
        raise Exception('Pipeline script must accept the comm_dir as an argument')

    outdir_link = os.path.join(communicationDirBase,'firehose_outdir')
    os.symlink(fh_outdir,outdir_link)

    # Run the pipeline; it writes its description to communicationDirBase/launch
    subprocess.check_call(pipelineCmdStr,shell=True)

    # If one or more pipelines fail, an exception is thrown once everything
    # that can be run has been attempted.
    server = pipetteServer.Main()
    server.run_server(communicationDirBase, scriptDir, 'runone', 'False')
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def add_gstreamer_packages():
    """Symlink system-wide GStreamer/GObject packages into this interpreter's site-packages.

    Each known package name is looked up in
    ``/usr/lib/python<major>.<minor>/dist-packages`` and ``.../site-packages``
    and linked into the current library directory, unless the destination
    already exists or the source is missing.
    """
    import os
    import sys

    try:
        from distutils.sysconfig import get_python_lib
        dest_dir = get_python_lib()
    except ImportError:
        # distutils is gone from newer Python releases; sysconfig provides
        # the pure-Python library directory instead.
        import sysconfig
        dest_dir = sysconfig.get_paths()['purelib']

    packages = ['gobject', 'glib', 'pygst', 'pygst.pyc', 'pygst.pth',
                'gst-0.10', 'pygtk.pth', 'pygtk.py', 'pygtk.pyc']

    # Build "<major>.<minor>" from version_info: slicing sys.version to three
    # characters turns "3.10" into "3.1".
    python_version = '%d.%d' % sys.version_info[:2]
    global_path = os.path.join('/usr/lib', 'python' + python_version)
    global_sitepackages = [os.path.join(global_path,
                                        'dist-packages'),  # for Debian-based
                           os.path.join(global_path,
                                        'site-packages')]  # for others

    for package in packages:
        for pack_dir in global_sitepackages:
            src = os.path.join(pack_dir, package)
            dest = os.path.join(dest_dir, package)
            if not os.path.exists(dest) and os.path.exists(src):
                os.symlink(src, dest)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def manually_disable_aa_profile(self):
        """
        Manually disable an apparmor profile.

        Needed when aa-profile-mode is set to disabled (default): the
        template has been written but apparmor is not yet aware of the
        profile, so aa-disable aa-profile fails.  Without this the profile
        would kick into enforce mode on the next service restart.

        Creates ``/etc/apparmor.d/disable/<profile>`` as a symlink to
        ``/etc/apparmor.d/<profile>`` unless that path already exists.
        """
        profile_path = '/etc/apparmor.d'
        disable_path = '/etc/apparmor.d/disable'
        disabled_link = os.path.join(disable_path, self.aa_profile)
        if os.path.lexists(disabled_link):
            return
        os.symlink(os.path.join(profile_path, self.aa_profile), disabled_link)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def acquire(self, timeout=None):
        """Acquire the lock by creating a symlink at ``self.lock_file``.

        :param timeout: seconds to keep retrying; ``None`` falls back to
            ``self.timeout``.  If the effective timeout is still ``None``
            the loop retries without a deadline, sleeping 0.1 s between
            attempts.
        :returns: ``None`` once the link has been created, or when
            ``self.i_am_locking()`` reports the lock is already held by us.
        :raises LockTimeout: a positive timeout elapsed without success.
        :raises AlreadyLocked: the timeout is zero or negative and the
            current time is past the start time after a failed attempt.
        """
        # Hopefully unnecessary for symlink.
        # try:
        #     open(self.unique_name,"wb").close()
        # except IOError:
        #     raise LockFailed("failed to create %s" % self.unique_name)
        timeout = timeout if timeout is not None else self.timeout
        # Deadline: now, pushed forward by the timeout only when it is positive.
        end_time = time.time()
        if timeout is not None and timeout > 0:
            end_time += timeout

        while True:
            # Try and create a symbolic link to it.
            try:
                os.symlink(self.unique_name, self.lock_file)
            except OSError:
                # Link creation failed.  Maybe we've double-locked?
                if self.i_am_locking():
                    # Linked to our unique name. Proceed.
                    return
                else:
                    # Otherwise the lock creation failed.
                    if timeout is not None and time.time() > end_time:
                        if timeout > 0:
                            raise LockTimeout("Timeout waiting to acquire"
                                              " lock for %s" %
                                              self.path)
                        else:
                            raise AlreadyLocked("%s is already locked" %
                                                self.path)
                    # Wait a tenth of the timeout (0.1 s without one) before
                    # the next attempt.
                    time.sleep(timeout / 10 if timeout is not None else 0.1)
            else:
                # Link creation succeeded.  We're good to go.
                return
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def makelink(self,
                                         targetpath)
        except symlink_exception:
            if tarinfo.issym():
                linkpath = os.path.join(os.path.dirname(tarinfo.name),
                                        tarinfo.linkname)
            else:
                linkpath = tarinfo.linkname
        else:
            try:
                self._extract_member(self._find_link_target(tarinfo),
                                     targetpath)
            except KeyError:
                raise ExtractError("unable to resolve link inside archive")
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _find_link_target(self, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def move(src, dst):
    """Recursively move a file or directory to another location, similar to
    the Unix "mv" command.

    When the destination is a directory (or a symlink to one), the source
    is moved inside it, and the resulting path must not exist yet.  A
    destination that exists but is not a directory may be overwritten,
    depending on os.rename() semantics.

    os.rename() is tried first.  If it fails with OSError, the source is
    copied to the destination and then removed.  Much more could be done
    here; mv.c shows many issues this implementation glosses over.

    """
    target = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # Possibly a case insensitive filesystem: rename regardless.
            os.rename(src, dst)
            return

        target = os.path.join(dst, _basename(src))
        if os.path.exists(target):
            raise Error("Destination path '%s' already exists" % target)

    try:
        os.rename(src, target)
    except OSError:
        # Renaming was not possible: copy, then delete the source.
        if not os.path.isdir(src):
            copy2(src, target)
            os.unlink(src)
        else:
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst))
            copytree(src, target, symlinks=True)
            rmtree(src)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    :param src: path of the file to copy.
    :param dst: path to write to (the body and the docstring always
        referred to it; it was missing from the signature).
    :param follow_symlinks: keyword-only; see above.
    :returns: dst
    :raises SameFileError: if src and dst are the same file.
    :raises SpecialFileError: if src or dst is a named pipe.
    """
    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise SpecialFileError("`%s` is a named pipe" % fn)

    if not follow_symlinks and os.path.islink(src):
        # Recreate the link itself rather than copying what it points to.
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc:
            with open(dst, 'wb') as fdst:
                copyfileobj(fsrc, fdst)
    return dst
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def acquire(self, self.lock_file)
            except OSError:
                # Link creation failed.  Maybe we've double-locked?
                if self.i_am_locking():
                    # Linked to out unique name. Proceed.
                    return
                else:
                    # Otherwise the lock creation failed.
                    if timeout is not None and time.time() > end_time:
                        if timeout > 0:
                            raise LockTimeout("Timeout waiting to acquire"
                                              " lock for %s" %
                                              self.path)
                        else:
                            raise AlreadyLocked("%s is already locked" %
                                                self.path)
                    time.sleep(timeout / 10 if timeout is not None else 0.1)
            else:
                # Link creation succeeded.  We're good to go.
                return
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def makelink(self,
                                     targetpath)
            except KeyError:
                raise ExtractError("unable to resolve link inside archive")
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _find_link_target(self, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def move(src, real_dst)
            os.unlink(src)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_ValgrindOption(self):
        """Run the valgrind check via the search path and via an explicit symlinked path."""
        # Make sure that valgrind exists and is in the path
        valgrind = scatest.which('valgrind')
        if not valgrind:
            raise RuntimeError('Valgrind is not installed')

        # Let the device manager find valgrind on the path
        self._test_Valgrind('')

        # Set an explicit path to valgrind, using a symbolic link to a non-path
        # location as an additional check
        altpath = os.path.join(scatest.getSdrPath(), 'valgrind')
        os.symlink(valgrind, altpath)

        # patch for ubuntu valgrind script
        ub_patch = False
        try:
            if 'UBUNTU' in platform.linux_distribution()[0].upper():
                valgrind_bin = scatest.which('valgrind.bin')
                os.symlink(valgrind_bin, altpath+'.bin')
                # Only flag the extra link once it really exists; otherwise
                # the cleanup below would fail on a missing file and mask the
                # actual test result.
                ub_patch = True
        except Exception:
            # Best effort: distribution lookup or linking may be unavailable.
            pass

        try:
            self._test_Valgrind(altpath)
        finally:
            os.unlink(altpath)
            if ub_patch:
                os.unlink(altpath+'.bin')
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def updateLink(source, target):
    """Point the symlink ``target`` at ``source`` without replacing real files.

    An existing symlink at ``target`` is removed first (it may be stale).
    The new link is only created when nothing exists at ``target`` after
    that, so regular files and directories are left alone.
    """
    if os.path.islink(target):
        os.unlink(target)
    if os.path.exists(target):
        # Something that is not a symlink lives here: keep it.
        return
    os.symlink(source, target)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def symlink(self, source: str, dest: str, **kwargs):
        """Create a symlink at ``dest`` pointing to ``source``.

        Both paths are passed through ``self.absolute`` first; extra keyword
        arguments are forwarded to ``os.symlink``.
        """
        link_target = self.absolute(source)
        link_path = self.absolute(dest)
        os.symlink(link_target, link_path, **kwargs)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def manually_disable_aa_profile(self):
        """
        Manually disable an apparmor profile.

        If aa-profile-mode is set to disabled (default) this is required as the
        template has been written but apparmor is yet unaware of the profile
        and aa-disable aa-profile fails. Without this the profile would kick
        into enforce mode on the next service restart.

        """
        profile_path = '/etc/apparmor.d'
        disable_path = '/etc/apparmor.d/disable'
        if not os.path.lexists(os.path.join(disable_path, self.aa_profile))
项目:IgDiscover    作者:NBISweden    | 项目源码 | 文件源码
def relative_symlink(src, dst, force=False):
    """
    Create a symbolic link at dst that points to src through a relative path.

    The link target is src expressed relative to the directory containing
    dst, so the link can be created in any directory.

    src -- path the link should point to
    dst -- path of the link to create (the body always referred to it; it
           was missing from the signature)
    force -- if True, then overwrite an existing file/symlink
    """
    if force:
        try:
            os.remove(dst)
        except FileNotFoundError:
            # Nothing to overwrite.
            pass
    target = os.path.relpath(os.path.abspath(src), start=os.path.dirname(dst))
    os.symlink(target, dst)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _find_link_target(self, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def move(src, _basename(src))
        if os.path.exists(real_dst):
            raise Error, "Destination path '%s' already exists" % real_dst
    try:
        os.rename(src, dst):
                raise Error, "Cannot move a directory '%s' into itself '%s'." % (src, dst)
            copytree(src, real_dst)
            os.unlink(src)

相关文章

Python setuptools.dep_util 模块,newer_pairwise_group() ...
Python chainer.utils.type_check 模块,eval() 实例源码 我...
Python chainer.utils.type_check 模块,prod() 实例源码 我...
Python chainer.utils.type_check 模块,expect() 实例源码 ...
Python multiprocessing.managers 模块,BaseProxy() 实例源...
Python multiprocessing.managers 模块,RemoteError() 实例...