Python marshal 模块-load() 实例源码

Python marshal 模块,load() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 marshal.load()。

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, fs_imp=None):
        """Prepare the OS helpers, the filesystem importer and the suffix table.

        :param fs_imp: importer used for filesystem lookups.  When ``None``,
            one is created from ``self.clsFilesystemImporter`` if that is
            set, else from ``_FilesystemImporter``.
        """
        # Importing is going to happen sooner or later, so make sure the
        # OS-related facilities are available right away.
        if not _os_stat:
            _os_bootstrap()

        # The importer that grabs things from the filesystem.
        if fs_imp is None:
            importer_cls = self.clsFilesystemImporter or _FilesystemImporter
            fs_imp = importer_cls()
        self.fs_imp = fs_imp

        # Register the recognised suffixes: dynamic-load (C extension)
        # modules first, then .py files (or their cached bytecode).
        for suffix_desc in imp.get_suffixes():
            if suffix_desc[2] != imp.C_EXTENSION:
                continue
            handler = DynLoadSuffixImporter(suffix_desc).import_file
            self.add_suffix(suffix_desc[0], handler)
        self.add_suffix('.py', py_suffix_importer)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _import_one(self, parent, modname, fqname):
        """Import a single module and attach it to its parent.

        Returns the module cached in ``sys.modules`` under *fqname* if there
        is one.  Otherwise ``self.get_code`` is asked for the module's code
        (or the module itself), the result is handed to
        ``self._process_result`` and, when *parent* is truthy, the new
        module is bound as attribute *modname* on *parent*.

        Returns ``None`` when ``get_code`` returns ``None``.
        """
        # has the module already been imported?
        try:
            return sys.modules[fqname]
        except KeyError:
            pass

        # load the module's code, or fetch the module itself
        result = self.get_code(parent, modname, fqname)
        if result is None:
            return None

        module = self._process_result(result, fqname)

        # insert the module into its parent under its short name; setattr()
        # needs the attribute name -- a two-argument call raises TypeError
        if parent:
            setattr(parent, modname, module)
        return module
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def py_suffix_importer(filename, finfo, fqname):
    """Produce the code object for a ``.py`` file, using cached bytecode if valid.

    The bytecode file is located by replacing the last three characters of
    *filename* with ``_suffix``.  It is used only when ``_timestamp`` reports
    it at least as new as the source and its header (4-byte magic, then a
    little-endian 32-bit timestamp) matches; otherwise the source is
    compiled through ``_compile``.

    :param filename: path of the source file.
    :param finfo: indexable file info; element 8 is taken as the source
        timestamp.
    :param fqname: part of the suffix-importer signature; not used here.
    :returns: ``(0, code, {'__file__': path})`` where *path* is the file the
        code was actually obtained from.
    """
    loaded_from = filename[:-3] + _suffix
    t_py = long(finfo[8])
    t_pyc = _timestamp(loaded_from)

    code = None
    if t_pyc is not None and t_pyc >= t_py:
        f = open(loaded_from, 'rb')
        # try/finally: release the handle even when the header is truncated
        # (struct.error) or marshal.load() fails
        try:
            if f.read(4) == imp.get_magic():
                t = struct.unpack('<I', f.read(4))[0]
                if t == t_py:
                    code = marshal.load(f)
        finally:
            f.close()
    if code is None:
        # no usable bytecode: fall back to the source file itself
        loaded_from = filename
        code = _compile(loaded_from, t_py)

    return 0, code, { '__file__' : loaded_from }
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def load_stats(self, arg):
        """Populate ``self.stats`` from *arg*.

        *arg* may be:

        * falsy -- ``self.stats`` becomes an empty dict, which the final
          check then rejects with ``TypeError``;
        * a string -- treated as the path of a file holding marshalled
          stats; ``self.files`` is set to a one-element list naming it,
          prefixed with the file's modification time when ``os.stat``
          succeeds;
        * an object with a ``create_stats`` attribute -- ``create_stats()``
          is called, its ``stats`` are taken over and reset to ``{}``.

        Raises ``TypeError`` when the resulting stats are empty.
        """
        if not arg:
            self.stats = {}
        elif isinstance(arg, basestring):
            f = open(arg, 'rb')
            # close the handle even if the file does not hold valid
            # marshal data
            try:
                self.stats = marshal.load(f)
            finally:
                f.close()
            try:
                file_stats = os.stat(arg)
                arg = time.ctime(file_stats.st_mtime) + "    " + arg
            except Exception:  # in case this is not unix
                # best effort only: keep the bare file name
                pass
            self.files = [ arg ]
        elif hasattr(arg, 'create_stats'):
            arg.create_stats()
            self.stats = arg.stats
            arg.stats = {}
        if not self.stats:
            # call syntax works on Python 2 and 3; the message no longer
            # carries the stray quote characters
            raise TypeError("Cannot create or construct a %r object from %r"
                            % (self.__class__, arg))
        return
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def load_bytecode(self, f):
        """Read cached bytecode for this bucket from *f*.

        *f* is a file or file-like object.  ``self.reset()`` is called
        instead of loading when the magic header does not match, when the
        stored checksum differs from ``self.checksum``, or when
        ``marshal_load`` raises ``EOFError``, ``ValueError`` or
        ``TypeError``.
        """
        # a wrong magic header means the cache entry is unusable
        stale = f.read(len(bc_magic)) != bc_magic
        if not stale:
            # a differing checksum means the source changed since caching
            stored_checksum = pickle.load(f)
            stale = self.checksum != stored_checksum
        if not stale:
            # a failing unmarshal also forces a reload
            try:
                self.code = marshal_load(f)
            except (EOFError, ValueError, TypeError):
                stale = True
        if stale:
            self.reset()
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _write_pyc(state, co, source_stat, pyc):
    """Write code object *co* to the cache file *pyc*.

    The file receives the import magic number, then the source mtime and
    size as two little-endian 32-bit fields, then the marshalled code.

    :param state: object whose ``trace`` method is given a message when the
        cache file cannot be opened.
    :param co: code object to marshal.
    :param source_stat: object providing ``mtime`` and ``size``.
    :param pyc: path of the cache file.
    :returns: ``False`` if *pyc* could not be opened for writing, ``True``
        after it has been written.
    """
    # Technically, we don't have to have the same pyc format as
    # (C)Python, since these "pycs" should never be seen by builtin
    # import. However, there's little reason to deviate, and I hope
    # sometime to be able to use imp.load_compiled to load them. (See
    # the comment in load_module above.)
    try:
        fp = open(pyc, "wb")
    except IOError:
        err = sys.exc_info()[1].errno
        state.trace("error writing pyc file at %s: errno=%s" %(pyc, err))
        # we ignore any failure to write the cache file
        # there are many reasons, permission-denied, __pycache__ being a
        # file etc.
        return False
    try:
        fp.write(imp.get_magic())
        # Both header fields are 32 bits wide.  Mask them and pack them as
        # unsigned ("<LL"): a masked value can reach 0xFFFFFFFF, which the
        # signed "<ll" format rejects with struct.error.  Values that fitted
        # before produce the same bytes.
        mtime = int(source_stat.mtime) & 0xFFFFFFFF
        size = source_stat.size & 0xFFFFFFFF
        fp.write(struct.pack("<LL", mtime, size))
        marshal.dump(co, fp)
    finally:
        fp.close()
    return True
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, py_suffix_importer)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _import_one(self, module)
        return module
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def get_code(self, parent, modname, fqname):
        """Locate a builtin or frozen module and load it.

        The body needs *parent* and *modname*; with only ``fqname`` in the
        signature every call failed with ``NameError``.

        :param parent: parent package, if any.  A truthy parent means the
            lookup is skipped and ``None`` is returned.
        :param modname: name checked with ``imp.is_builtin`` /
            ``imp.is_frozen`` and passed to ``imp.load_module``.
        :param fqname: fully-qualified name; not used by this importer.
        :returns: ``None`` when nothing is found, otherwise the tuple
            ``(0, module, {})``.
        """
        if parent:
            # these modules definitely do not occur within a package context
            return None

        # look for the module
        if imp.is_builtin(modname):
            kind = imp.C_BUILTIN
        elif imp.is_frozen(modname):
            kind = imp.PY_FROZEN
        else:
            # not found
            return None

        # got it. Now load and return it.  imp.load_module() takes
        # (name, file, pathname, description); there is no file here.
        module = imp.load_module(modname, None, modname, ('', '', kind))
        return 0, module, { }


######################################################################
#
# Internal importer used for importing from the filesystem
#
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def py_suffix_importer(filename, { '__file__' : file }
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def load_stats(self, 'create_stats'):
            arg.create_stats()
            self.stats = arg.stats
            arg.stats = {}
        if not self.stats:
            raise TypeError("Cannot create or construct a %r object from %r"
                            % (self.__class__, arg))
        return
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:macos-st-packages    作者:zce    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:Indushell    作者:SecarmaLabs    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:Liljimbo-chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:bytecode_simplifier    作者:extremecoders-re    | 项目源码 | 文件源码
def process(ifile, ofile):
    """Deobfuscate the pyc file *ifile* and write the result to *ofile*.

    Reads the 8-byte header, unmarshals the root code object, passes it
    through ``parse_code_object`` and writes the original header followed
    by the marshalled result.

    Raises ``SystemExit`` when the header does not start with the expected
    magic bytes.
    """
    logger.info('opening file ' + ifile)
    # context managers close the handles on every exit path, including the
    # SystemExit raised for a bad header
    with open(ifile, 'rb') as ifPtr:
        header = ifPtr.read(8)
        # bytes literal: the file is read in binary mode
        if not header.startswith(b'\x03\xF3\x0D\x0A'):
            raise SystemExit('[!] Header mismatch. The input file is not a valid pyc file.')
        logger.info('Input pyc file header matched')
        logger.debug('Unmarshalling file')
        rootCodeObject = marshal.load(ifPtr)
    deob = parse_code_object(rootCodeObject)
    logger.info('Writing deobfuscated code object to disk')
    with open(ofile, 'wb') as ofPtr:
        ofPtr.write(header)
        marshal.dump(deob, ofPtr)
    logger.info('Success')
项目:flask_system    作者:prashasy    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_imp_module(self):
        """Load ``os`` again through imp and check what is and is not replaced."""
        # Verify that the imp module can correctly load and find .py files
        # XXX (ncoghlan): It would be nice to use support.CleanImport
        # here, but that breaks because the os module registers some
        # handlers in copy_reg on import. Since CleanImport doesn't
        # revert that registration, the module is left in a broken
        # state after reversion. Reinitialising the module contents
        # and just reverting os.environ to its previous state is an OK
        # workaround
        path_before = os.path
        getenv_before = os.getenv
        # NOTE(review): the guard's name looks case-mangled
        # (EnvironmentVarGuard?) -- confirm against the file's imports
        with EnvironmentvarGuard():
            found = imp.find_module("os")
            self.addCleanup(found[0].close)
            reloaded_os = imp.load_module("os", *found)
            self.assertIs(os, reloaded_os)
            self.assertIs(path_before, reloaded_os.path)
            self.assertIsNot(getenv_before, reloaded_os.getenv)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_foreign_code(self):
        """A code object from another file, planted in a pyc, keeps its filename."""
        py_compile.compile(self.file_name)
        with open(self.compiled_name, "rb") as pyc:
            pyc_header = pyc.read(8)
            code = marshal.load(pyc)

        # replace the constant 1 with a code object that came from elsewhere
        foreign_code = test_main.__code__
        consts = list(code.co_consts)
        consts[consts.index(1)] = foreign_code

        # rebuild the module code object around the patched constants
        code_type = type(code)
        patched = code_type(
            code.co_argcount, code.co_kwonlyargcount,
            code.co_nlocals, code.co_stacksize,
            code.co_flags, code.co_code, tuple(consts),
            code.co_names, code.co_varnames, code.co_filename,
            code.co_name, code.co_firstlineno, code.co_lnotab,
            code.co_freevars, code.co_cellvars)

        with open(self.compiled_name, "wb") as pyc:
            pyc.write(pyc_header)
            marshal.dump(patched, pyc)

        mod = self.import_module()
        self.assertEqual(mod.constant.co_filename, foreign_code.co_filename)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_recursion_limit(self):
        """Round-trip a list nested to just under the depth limit, then exceed it."""
        # The max stack depth should match the value in Python/marshal.c.
        windows_debug = os.name == 'nt' and hasattr(sys, 'gettotalrefcount')
        MAX_MARSHAL_STACK_DEPTH = 1500 if windows_debug else 2000

        # Create a deeply nested structure.
        head = last = []
        for _ in range(MAX_MARSHAL_STACK_DEPTH - 2):
            inner = [0]
            last.append(inner)
            last = inner

        # Verify we don't blow out the stack with dumps/load.
        new_head = marshal.loads(marshal.dumps(head))
        # Don't use == to compare objects, it can exceed the recursion limit.
        self.assertEqual(len(new_head), len(head))
        for idx in (0, -1):
            self.assertEqual(len(new_head[idx]), len(head[idx]))

        # one more level of nesting must be refused
        last.append([0])
        self.assertRaises(ValueError, marshal.dumps, head)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_multiple_dumps_and_loads(self):
        """marshal.load() can be called repeatedly on one file, with foreign bytes in between."""
        # Issue 12291: marshal.load() should be callable multiple times
        # with interleaved data written by non-marshal code
        # Adapted from a patch by Engelbert Gruber.
        data = (1, 'abc', b'def', 1.0, (2, 'a', ['b', b'c']))
        for filler in (b'', b'0123'):
            filler_len = len(filler)
            offsets = []
            try:
                # write phase: remember where each record (plus filler) ends
                with open(support.TESTFN, 'wb') as out:
                    for item in data:
                        marshal.dump(item, out)
                        if filler_len:
                            out.write(filler)
                        offsets.append(out.tell())
                # read phase: every load must stop exactly at that offset
                with open(support.TESTFN, 'rb') as src:
                    for item, expected_offset in zip(data, offsets):
                        self.assertEqual(item, marshal.load(src))
                        if filler_len:
                            src.read(filler_len)
                        self.assertEqual(expected_offset, src.tell())
            finally:
                support.unlink(support.TESTFN)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:tellmeabout.coffee    作者:billyfung    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:FileStoreGAE    作者:liantian-cn    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:infinite-lorem-ipsum    作者:patjm1992    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def load_bytecode(self, TypeError):
            self.reset()
            return
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def scan_module(egg_dir, base, name, stubs):
    """Check whether module possibly uses unsafe-for-zipfile stuff

    :param egg_dir: root directory of the egg; its length is used to strip
        the prefix from *base* when building the dotted module name.
    :param base: directory containing the compiled file.
    :param name: file name of the compiled module inside *base*.
    :param stubs: collection of stub paths; if the file path minus its last
        character is in it, the module is treated as an extension module.
    :returns: ``True`` for extension-module stubs and for modules that
        reference none of the flagged symbols, ``False`` otherwise.
    """

    filename = os.path.join(base, name)
    if filename[:-1] in stubs:
        return True  # Extension module
    pkg = base[len(egg_dir) + 1:].replace(os.sep, '.')
    module = pkg + ('.' if pkg else '') + os.path.splitext(name)[0]
    if sys.version_info < (3, 3):
        skip = 8  # skip magic & date
    elif sys.version_info < (3, 7):
        skip = 12  # skip magic & date & file size
    else:
        skip = 16  # skip magic & flags & date/hash & file size (PEP 552)
    f = open(filename, 'rb')
    # close the handle even when the file is not valid marshal data
    try:
        f.read(skip)
        code = marshal.load(f)
    finally:
        f.close()
    safe = True
    symbols = dict.fromkeys(iter_symbols(code))
    for bad in ['__file__', '__path__']:
        if bad in symbols:
            # both placeholders need a value: module name, then the symbol
            log.warn("%s: module references %s", module, bad)
            safe = False
    if 'inspect' in symbols:
        # every name needs its own comma; adjacent string literals are
        # silently concatenated into one bogus name
        for bad in [
            'getsource', 'getabsfile', 'getsourcefile', 'getfile',
            'getsourcelines', 'findsource', 'getcomments', 'getframeinfo',
            'getinnerframes', 'getouterframes', 'stack', 'trace',
        ]:
            if bad in symbols:
                log.warn("%s: module MAY be using inspect.%s", module, bad)
                safe = False
    return safe
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_module_constant(module, symbol, default=-1, paths=None):
    """Find 'module' by searching 'paths', and extract 'symbol'

    Return 'None' if 'module' does not exist on 'paths', or it does not define
    'symbol'.  If the module defines 'symbol' as a constant, return the
    constant.  Otherwise, return 'default'."""

    try:
        f, path, (suffix, mode, kind) = find_module(module, paths)
    except ImportError:
        # Module doesn't exist
        return None

    try:
        if kind == PY_COMPILED:
            # the header in front of the marshalled code grew over time
            if sys.version_info < (3, 3):
                skip = 8  # skip magic & date
            elif sys.version_info < (3, 7):
                skip = 12  # skip magic & date & file size
            else:
                skip = 16  # skip magic & flags & date/hash & size (PEP 552)
            f.read(skip)
            code = marshal.load(f)
        elif kind == PY_FROZEN:
            code = imp.get_frozen_object(module)
        elif kind == PY_SOURCE:
            # compile() needs the file name between source and mode
            code = compile(f.read(), path, 'exec')
        else:
            # Not something we can parse; we'll have to import it.  :(
            if module not in sys.modules:
                imp.load_module(module, f, path, (suffix, mode, kind))
            return getattr(sys.modules[module], symbol, None)

    finally:
        if f:
            f.close()

    # the symbol to look for has to be handed on along with the default
    return extract_constant(code, symbol, default)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def scan_module(egg_dir, bad)
                safe = False
    return safe
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def get_module_constant(module, default)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def read_code(stream):
    """Return the code object stored in the compiled-file *stream*.

    Reads a 4-byte magic number and returns ``None`` if it differs from
    ``imp.get_magic()``; otherwise skips the next 4 bytes and unmarshals
    what follows.
    """
    # This helper is needed in order for the PEP 302 emulation to
    # correctly handle compiled files
    import marshal

    if stream.read(4) == imp.get_magic():
        stream.read(4)  # Skip timestamp
        return marshal.load(stream)
    return None
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _finish_import(self, top, parts, fromlist):
        """Load the remaining dotted parts and pick the module to return.

        ``self._load_tail(top, parts)`` always runs.  Without a *fromlist*
        ("import a.b.c") the top module is returned.  With one
        ("from a.b import c, d") the bottom module is returned, after
        ``self._import_fromlist`` has been called on it when its
        ``__ispkg__`` attribute is truthy.
        """
        # if "a.b.c" was provided, load the ".b.c" portion down from below
        # the top-level module
        leaf = self._load_tail(top, parts)

        if fromlist:
            # The top module was imported by self, so the bottom module was
            # too (just now, or earlier and fetched from sys.modules).  That
            # means its fromlist can be handled here as well, and __ispkg__
            # can be relied on.
            #
            # For a package, fromlist entries may name submodules: those are
            # imported and stored in the package namespace, and Python then
            # picks all fromlist names up from the package module.  For a
            # plain module the entries are just names inside it.
            if leaf.__ispkg__:
                self._import_fromlist(leaf, fromlist)
            return leaf

        # no fromlist: return the top of the import tree
        return top
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def load_module(self, fqname, fp, pathname, file_info):
        """Register module *fqname* and scan its code object, if it has one.

        :param fqname: fully-qualified module name.
        :param fp: open file for the module (read for source and compiled
            modules).
        :param pathname: path stored as the module's ``__file__`` and used
            as the file name when compiling source.
        :param file_info: imp-style description tuple; its last item is the
            module type.
        :returns: the object from ``self.load_package`` for package
            directories, otherwise the object from ``self.add_module``.
        :raises ImportError: when a compiled file has the wrong magic number.
        """
        # Only the type is needed.  Taking the last item accepts both
        # (suffix, type) pairs and (suffix, mode, type) descriptions.
        kind = file_info[-1]
        self.msgin(2, "load_module", fp and "fp", pathname)
        if kind == imp.PKG_DIRECTORY:
            m = self.load_package(fqname, pathname)
            self.msgout(2, "load_module ->", m)
            return m
        if kind == imp.PY_SOURCE:
            # compile() needs the file name between source and mode
            co = compile(fp.read()+'\n', pathname, 'exec')
        elif kind == imp.PY_COMPILED:
            if fp.read(4) != imp.get_magic():
                self.msgout(2, "raise ImportError: Bad magic number", pathname)
                raise ImportError("Bad magic number in %s" % pathname)
            fp.read(4)  # skip the 4 bytes that follow the magic number
            co = marshal.load(fp)
        else:
            co = None
        m = self.add_module(fqname)
        m.__file__ = pathname
        if co:
            if self.replace_paths:
                co = self.replace_paths_in_code(co)
            m.__code__ = co
            self.scan_code(co, m)
        self.msgout(2, m)
        return m
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def dump_stats(self, filename):
        """Write the profile data to a file we know how to load back.

        ``self.stats`` is marshalled into *filename*, which is opened in
        binary write mode and closed again even if marshalling fails.
        """
        # open() rather than file(): the latter exists only on Python 2
        f = open(filename, 'wb')
        try:
            marshal.dump(self.stats, f)
        finally:
            f.close()

    # list the tuple indices and directions for sorting,
    # along with some printable description
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def marshal_load(f):
        """Unmarshal one object from *f*.

        Real ``file`` objects are handed to ``marshal.load`` directly; for
        anything else the content is read and passed to ``marshal.loads``.
        """
        if not isinstance(f, file):
            # file-like object: unmarshal from its content instead
            payload = f.read()
            return marshal.loads(payload)
        return marshal.load(f)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def load_bytecode(self, bucket):
        """Load cached bytecode into *bucket*; to be overridden by subclasses.

        An implementation that finds no code in the cache for the bucket
        must leave it untouched.  This base version always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def scan_module(egg_dir,name)
    if filename[:-1] in stubs:
        return True     # Extension module
    pkg = base[len(egg_dir)+1:].replace(os.sep,'.')
    module = pkg+(pkg and '.' or '')+os.path.splitext(name)[0]
    if sys.version_info < (3, 3):
        skip = 8   # skip magic & date
    else:
        skip = 12  # skip magic & date & file size
    f = open(filename,'rb'); f.read(skip)
    code = marshal.load(f); f.close()
    safe = True
    symbols = dict.fromkeys(iter_symbols(code))
    for bad in ['__file__', bad)
                safe = False
    if '__name__' in symbols and '__main__' in symbols and '.' not in module:
        if sys.version[:3]=="2.4":  # -m works w/zipfiles in 2.5
            log.warn("%s: top-level module may be 'python -m' script", module)
            safe = False
    return safe
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_module_constant(module, paths=None):

    """Find 'module' by searching 'paths',mode,kind) = find_module(module,paths)
    except ImportError:
        # Module doesn't exist
        return None

    try:
        if kind==PY_COMPILED:
            f.read(8)   # skip magic & date
            code = marshal.load(f)
        elif kind==PY_FROZEN:
            code = imp.get_frozen_object(module)
        elif kind==PY_SOURCE:
            code = compile(f.read(),f,path,(suffix,kind))
            return getattr(sys.modules[module],symbol,None)

    finally:
        if f:
            f.close()

    return extract_constant(code,default)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def marshal_load(f):
        """Unmarshal one object from *f*.

        Real ``file`` objects go straight to ``marshal.load``; any other
        object has its content read and passed to ``marshal.loads``.
        """
        if not isinstance(f, file):
            # file-like object: unmarshal from its content instead
            raw = f.read()
            return marshal.loads(raw)
        return marshal.load(f)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def load_bytecode(self,it must not do anything.
        """
        raise NotImplementedError()

相关文章

Python setuptools.dep_util 模块,newer_pairwise_group() ...
Python chainer.utils.type_check 模块,eval() 实例源码 我...
Python chainer.utils.type_check 模块,prod() 实例源码 我...
Python chainer.utils.type_check 模块,expect() 实例源码 ...
Python multiprocessing.managers 模块,BaseProxy() 实例源...
Python multiprocessing.managers 模块,RemoteError() 实例...