Python sqlalchemy.util 模块,OrderedDict() 实例源码
我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用sqlalchemy.util.OrderedDict()。
def revision_as_dict(revision, include_packages=True, include_groups=True,
                     ref_package_by='name'):
    """Serialize a revision into an ordered mapping of plain values.

    The mapping always carries ``id``, ``timestamp``, ``message``,
    ``author`` and ``approved_timestamp`` (in that order). Timestamps are
    rendered with ``isoformat()``; ``approved_timestamp`` is ``None`` when
    the revision has no approval time.

    :param revision: object exposing the attributes read below.
    :param include_packages: when true, add a ``packages`` list holding the
        ``ref_package_by`` attribute of every truthy, non-private package.
    :param include_groups: when true, add a ``groups`` list holding the
        ``ref_package_by`` attribute of every truthy group.
    :param ref_package_by: attribute name used to reference packages and
        groups.
    :returns: the populated ``OrderedDict``.
    """
    summary = OrderedDict()
    summary['id'] = revision.id
    summary['timestamp'] = revision.timestamp.isoformat()
    summary['message'] = revision.message
    summary['author'] = revision.author
    if revision.approved_timestamp:
        summary['approved_timestamp'] = revision.approved_timestamp.isoformat()
    else:
        summary['approved_timestamp'] = None

    if include_packages:
        package_refs = []
        for package in revision.packages:
            # Skip empty slots and private packages.
            if package and not package.private:
                package_refs.append(getattr(package, ref_package_by))
        summary['packages'] = package_refs

    if include_groups:
        group_refs = []
        for group in revision.groups:
            if group:
                group_refs.append(getattr(group, ref_package_by))
        summary['groups'] = group_refs

    return summary
def _grab_table_elements(self):
    """Collect copies of the table's columns plus its constraints and indexes.

    Populates ``self.columns`` (ordered, keyed by column name, each copy
    with ``unique`` and ``index`` cleared), ``self.named_constraints``,
    ``self.unnamed_constraints`` and ``self.indexes``. Constraints for
    which ``_is_type_bound`` is true are left out.
    """
    source = self.table
    target_schema = source.schema

    copied = self.columns = OrderedDict()
    for column in source.c:
        clone = column.copy(schema=target_schema)
        clone.unique = False
        clone.index = False
        copied[column.name] = clone

    named = self.named_constraints = {}
    unnamed = self.unnamed_constraints = []
    by_name = self.indexes = {}

    for constraint in source.constraints:
        if not _is_type_bound(constraint):
            if constraint.name:
                named[constraint.name] = constraint
            else:
                unnamed.append(constraint)

    for index in source.indexes:
        by_name[index.name] = index
def _grab_table_elements(self):
    """Collect copies of the table's columns plus its constraints and indexes.

    Populates ``self.columns`` (ordered, keyed by column name, each copy
    with ``unique`` and ``index`` cleared), ``self.named_constraints``,
    ``self.unnamed_constraints``, ``self.indexes`` and an empty
    ``self.new_indexes``. Constraints for which ``_is_type_bound`` is true
    are left out. Finally, every key of the table's ``kwargs`` that is not
    already in ``self.table_kwargs`` is copied across.
    """
    source = self.table
    target_schema = source.schema

    copied = self.columns = OrderedDict()
    for column in source.c:
        clone = column.copy(schema=target_schema)
        clone.unique = False
        clone.index = False
        # The type object must have been copied as well, since it may
        # need to be modified in place later.
        if isinstance(column.type, SchemaEventTarget):
            assert clone.type is not column.type
        copied[column.name] = clone

    named = self.named_constraints = {}
    unnamed = self.unnamed_constraints = []
    by_name = self.indexes = {}
    self.new_indexes = {}

    for constraint in source.constraints:
        if not _is_type_bound(constraint):
            if constraint.name:
                named[constraint.name] = constraint
            else:
                unnamed.append(constraint)

    for index in source.indexes:
        by_name[index.name] = index

    # Explicitly supplied table kwargs win over the table's own.
    inherited = source.kwargs
    for key in inherited:
        self.table_kwargs.setdefault(key, inherited[key])
def __init__(self, uowtransaction, mapper):
    """Create a task for ``mapper`` and register it with the transaction.

    When ``uowtransaction`` is not ``None`` the new task is stored in its
    ``tasks`` mapping under ``mapper``. All collections start out empty and
    the circular-task references start as ``None``.
    """
    if uowtransaction is not None:
        # Register before any other state is set up.
        uowtransaction.tasks[mapper] = self

    self.uowtransaction, self.mapper = uowtransaction, mapper

    # Ordered so that objects are visited in insertion order.
    self.objects = util.OrderedDict()

    self.dependencies = []
    self.cyclical_dependencies = []

    self.circular = None
    self.postcircular = None

    self.childtasks = []
def as_dict(self, core_columns_only=False):
    """Return this object's column values and extras as an ordered mapping.

    :param core_columns_only: when false (the default) the columns from
        ``get_columns()`` are wrapped with ``id`` in front and ``position``
        at the end, and ``package_id`` is appended if it is truthy. When
        true only the ``get_columns()`` columns (plus extras) are emitted.
    :returns: an ``OrderedDict``; ``datetime.datetime`` column values are
        rendered with ``isoformat()``.
    """
    column_names = self.get_columns()
    if not core_columns_only:
        column_names = ['id'] + column_names + ['position']

    result = OrderedDict()
    for name in column_names:
        raw = getattr(self, name)
        if isinstance(raw, datetime.datetime):
            result[name] = raw.isoformat()
        else:
            result[name] = raw

    # Extras are merged after the columns and may overwrite them.
    if self.extras:
        for key, extra in self.extras.items():
            result[key] = extra

    if self.package_id and not core_columns_only:
        result["package_id"] = self.package_id

    return result
def as_dict(self):
    """Return the mapped table's column values as an ordered mapping.

    Columns are taken from the table mapped to this object's class, in
    table order. ``datetime.datetime`` values are rendered with
    ``isoformat()`` and plain ``datetime.date`` values with ``str()``;
    every other value is stored unchanged.

    :returns: an ``OrderedDict`` keyed by column name.
    """
    _dict = OrderedDict()
    table = orm.class_mapper(self.__class__).mapped_table
    for col in table.c:
        val = getattr(self, col.name)
        # datetime.datetime is a subclass of datetime.date, so it has to be
        # tested first; checking date first would stringify datetimes and
        # leave the isoformat() branch unreachable.
        if isinstance(val, datetime.datetime):
            val = val.isoformat()
        elif isinstance(val, datetime.date):
            val = str(val)
        _dict[col.name] = val
    return _dict
def dataset_facets(self, facets_dict, package_type):
    """Return the facets to use for the given package type.

    :param facets_dict: the facets mapping supplied by the caller.
    :param package_type: the package type string.
    :returns: ``facets_dict`` unchanged unless ``package_type`` is
        ``'harvest'``, in which case a fresh ordered mapping with the
        ``frequency`` and ``source_type`` facets is returned instead.
    """
    # ``!=`` replaces the ``<>`` operator, which is a syntax error on
    # Python 3 and behaves identically on Python 2.
    if package_type != 'harvest':
        return facets_dict

    return OrderedDict([
        ('frequency', 'Frequency'),
        ('source_type', 'Type'),
    ])
def __init__(self, table, table_args, table_kwargs):
    """Store the source table and its arguments, then collect its elements.

    NOTE(review): the signature and leading assignments were missing from
    the extracted text and are reconstructed here; confirm against the
    upstream project.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self._grab_table_elements()
def __init__(self, table, table_args, table_kwargs, reflected):
    """Store the source table and its arguments, then collect its elements.

    ``table`` and ``table_args`` are read by the body, so they must be
    parameters; they were absent from the signature as extracted.

    :param table: the source table whose columns are iterated via ``.c``.
    :param table_args: stored as ``self.table_args``.
    :param table_kwargs: stored as ``self.table_kwargs``.
    :param reflected: stored as ``self.reflected``.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self.reflected = reflected
    self._grab_table_elements()
def _grab_table_elements(self):
    """Collect copies of the table's columns plus its constraints and indexes.

    Populates ``self.columns`` (ordered, keyed by column name, each copy
    with ``unique`` and ``index`` cleared), ``self.named_constraints``,
    ``self.unnamed_constraints``, ``self.indexes`` and an empty
    ``self.new_indexes``. Type-bound constraints are skipped, as are
    ``CheckConstraint`` objects when ``self.reflected`` is true. Finally,
    every key of the table's ``kwargs`` that is not already in
    ``self.table_kwargs`` is copied across.
    """
    schema = self.table.schema
    self.columns = OrderedDict()
    for c in self.table.c:
        c_copy = c.copy(schema=schema)
        c_copy.unique = c_copy.index = False
        # ensure that the type object was copied,
        # as we may need to modify it in-place
        if isinstance(c.type, SchemaEventTarget):
            assert c_copy.type is not c.type
        self.columns[c.name] = c_copy

    self.named_constraints = {}
    self.unnamed_constraints = []
    self.indexes = {}
    self.new_indexes = {}

    for const in self.table.constraints:
        if _is_type_bound(const):
            continue
        elif self.reflected and isinstance(const, CheckConstraint):
            # TODO: we are skipping reflected CheckConstraint because
            # we have no way to determine _is_type_bound() for these.
            pass
        elif const.name:
            self.named_constraints[const.name] = const
        else:
            self.unnamed_constraints.append(const)

    for idx in self.table.indexes:
        self.indexes[idx.name] = idx

    # Explicitly supplied table kwargs win over the table's own.
    for k in self.table.kwargs:
        self.table_kwargs.setdefault(k, self.table.kwargs[k])
def __init__(self, table, association_tables, inflect_engine, detect_joined):
    """Build the class model for ``table`` with its columns and relationships.

    ``table`` is read throughout the body, so it must be a parameter; it
    was absent from the signature as extracted.

    :param table: the table this model class represents.
    :param association_tables: tables each turned into a many-to-many
        relationship attribute.
    :param inflect_engine: passed through to the class-name and
        relationship helpers.
    :param detect_joined: when true, a foreign key covering exactly the
        primary key columns makes the target class this model's parent
        (only while ``parent_name`` is still ``'Base'``) instead of
        producing a relationship.
    """
    super(ModelClass, self).__init__(table)
    self.name = self._tablename_to_classname(table.name, inflect_engine)
    self.children = []
    self.attributes = OrderedDict()

    # Assign attribute names for columns
    for column in table.columns:
        self._add_attribute(column.name, column)

    # Add many-to-one relationships
    pk_column_names = set(col.name for col in table.primary_key.columns)
    for constraint in sorted(table.constraints, key=_get_constraint_sort_key):
        if isinstance(constraint, ForeignKeyConstraint):
            target_cls = self._tablename_to_classname(
                constraint.elements[0].column.table.name, inflect_engine)
            if (detect_joined and self.parent_name == 'Base' and
                    set(_get_column_names(constraint)) == pk_column_names):
                self.parent_name = target_cls
            else:
                relationship_ = ManyToOneRelationship(
                    self.name, target_cls, constraint, inflect_engine)
                self._add_attribute(relationship_.preferred_name, relationship_)

    # Add many-to-many relationships
    for association_table in association_tables:
        fk_constraints = [c for c in association_table.constraints
                          if isinstance(c, ForeignKeyConstraint)]
        fk_constraints.sort(key=_get_constraint_sort_key)
        # The second foreign key (in sorted order) names the target class.
        target_cls = self._tablename_to_classname(
            fk_constraints[1].elements[0].column.table.name, inflect_engine)
        # NOTE(review): target_cls was computed but never passed in the
        # extracted text; the call is restored to parallel
        # ManyToOneRelationship above -- confirm against
        # ManyToManyRelationship's signature.
        relationship_ = ManyToManyRelationship(
            self.name, target_cls, association_table, inflect_engine)
        self._add_attribute(relationship_.preferred_name, relationship_)
def __init__(self, table, table_args, table_kwargs, reflected):
    """Store the source table and its arguments, then collect its elements.

    NOTE(review): the signature and leading assignments were missing from
    the extracted text and are reconstructed here; confirm against the
    upstream project.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self.reflected = reflected
    self._grab_table_elements()
def __init__(self, table, table_args, table_kwargs):
    """Store the source table and its arguments, then collect its elements.

    NOTE(review): the signature and leading assignments were missing from
    the extracted text and are reconstructed here; confirm against the
    upstream project.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self._grab_table_elements()
def get_datastore_table(self):
    """Fetch every row of this resource's datastore table.

    The table named by ``self.resource_id`` is loaded through the engine
    returned by ``get_datastore_engine_and_connection()`` and selected in
    full over the accompanying connection.

    :returns: a dict with ``num_rows`` (the result's ``rowcount``),
        ``headers`` (the result's keys), ``header_dict`` (ordered mapping
        of column key to the string form of its type) and ``rows`` (all
        fetched rows).
    """
    engine, connection = self.get_datastore_engine_and_connection()
    metadata = MetaData(bind=engine, reflect=True)
    datastore_table = Table(self.resource_id, metadata,
                            autoload=True, autoload_with=engine)

    result = connection.execute(select([datastore_table]))

    summary = {}
    summary['num_rows'] = result.rowcount
    summary['headers'] = result.keys()
    summary['header_dict'] = OrderedDict(
        [(column.key, str(column.type)) for column in datastore_table.columns]
    )
    summary['rows'] = result.fetchall()
    return summary
def __init__(self, table, table_args, table_kwargs, reflected):
    """Store the source table and its arguments, then collect its elements.

    NOTE(review): the signature and leading assignments were missing from
    the extracted text and are reconstructed here; confirm against the
    upstream project.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self.reflected = reflected
    self._grab_table_elements()
def __init__(self, table, table_args, table_kwargs, reflected):
    """Store the source table and its arguments, then collect its elements.

    NOTE(review): the signature and leading assignments were missing from
    the extracted text and are reconstructed here; confirm against the
    upstream project.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self.reflected = reflected
    self._grab_table_elements()
def __init__(self, table, table_args, table_kwargs, reflected):
    """Store the source table and its arguments, then collect its elements.

    NOTE(review): the signature and leading assignments were missing from
    the extracted text and are reconstructed here; confirm against the
    upstream project.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self.reflected = reflected
    self._grab_table_elements()
def __init__(self, table, table_args, table_kwargs, reflected):
    """Store the source table and its arguments, then collect its elements.

    NOTE(review): the signature and leading assignments were missing from
    the extracted text and are reconstructed here; confirm against the
    upstream project.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self.reflected = reflected
    self._grab_table_elements()
def __init__(self, table, table_args, table_kwargs, reflected):
    """Store the source table and its arguments, then collect its elements.

    NOTE(review): the signature and leading assignments were missing from
    the extracted text and are reconstructed here; confirm against the
    upstream project.
    """
    self.table = table  # this is a Table object
    self.table_args = table_args
    self.table_kwargs = table_kwargs
    self.new_table = None
    # Each column initially transfers from itself.
    self.column_transfers = OrderedDict(
        (c.name, {'expr': c}) for c in self.table.c
    )
    self.reflected = reflected
    self._grab_table_elements()
def __init__(self, columns=None, whereclause=None, from_obj=None,
             order_by=None, group_by=None, having=None, use_labels=False,
             distinct=False, engine=None, limit=None, offset=None):
    """Initialise the select's state, then apply the supplied clauses.

    All attributes are set to their defaults first because the
    ``append_*`` / ``group_by`` / ``order_by`` calls at the end run on the
    partially built object.

    :param columns: iterable of columns, each passed to ``append_column``.
    :param whereclause: passed to ``append_whereclause`` when not ``None``.
    :param from_obj: iterable of FROM objects, each passed to
        ``append_from``. Defaults to ``None`` (treated as empty) rather
        than a shared mutable list.
    :param order_by: sequence unpacked into ``order_by`` when truthy.
    :param group_by: sequence unpacked into ``group_by`` when truthy.
    :param having: passed to ``append_having`` when not ``None``.
    :param use_labels: stored as ``self.use_labels``.
    :param distinct: stored as ``self.distinct``.
    :param engine: stored as ``self._engine``.
    :param limit: stored as ``self.limit``.
    :param offset: stored as ``self.offset``.
    """
    self._froms = util.OrderedDict()
    self.use_labels = use_labels
    self.id = "Select(%d)" % id(self)
    self.name = None
    # Start empty; the append_* calls below fill these in.
    self.whereclause = None
    self.having = None
    self._engine = engine
    self.oid_column = None
    self.limit = limit
    self.offset = offset

    # indicates if this select statement is a subquery inside another query
    self.issubquery = False

    # indicates if this select statement is a subquery as a criterion
    # inside of a WHERE clause
    self.is_where = False

    self.clauses = []
    self.distinct = distinct
    self._text = None
    self._raw_columns = []
    self._correlated = None
    self._correlator = Select.correlatedVisitor(self, False)
    self._wherecorrelator = Select.correlatedVisitor(self, True)

    if columns is not None:
        for c in columns:
            self.append_column(c)

    if whereclause is not None:
        self.append_whereclause(whereclause)

    if having is not None:
        self.append_having(having)

    if from_obj is not None:
        for f in from_obj:
            self.append_from(f)

    if group_by:
        self.group_by(*group_by)

    if order_by:
        self.order_by(*order_by)