Python sqlalchemy 模块,literal_column() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.literal_column()。
def define_tables(cls, Metadata):
Table('autoinc_pk', Metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(50))
)
Table('manual_pk', autoincrement=False), String(50))
)
Table('includes_defaults', String(50)),
Column('x', default=5),
Column('y',
default=literal_column("2", type_=Integer) + literal(2)))
def define_tables(cls, type_=Integer) + literal(2)))
def createNumericBinCaseStatement(dbSession,columnObject,bins,sumLabel):
cases = []
for i, threshold in enumerate(bins):
if i == 0:
low = 0
else:
low = bins[i-1]
high = bins[i]
caseLabel = str(low) + '-' + str(high)
addCase = columnObject < threshold,literal_column("'" + caseLabel + "'")
cases.append(addCase)
return dbSession(
case(cases, else_=literal_column("'> " + str(bins[-1]) + "'"))
.label(sumLabel),func.count(1)
)
# Run a custom-binned histogram query and return the counted results
def define_tables(cls, type_=Integer) + literal(2)))
def define_tables(cls, type_=Integer) + literal(2)))
def define_tables(cls, type_=Integer) + literal(2)))
def define_tables(cls, type_=Integer) + literal(2)))
def literal_column(text, type_=None):
"""Produce a :class:`.ColumnClause` object that has the
:paramref:`.column.is_literal` flag set to True.
:func:`.literal_column` is similar to :func:`.column`,except that
it is more often used as a "standalone" column expression that renders
exactly as stated; while :func:`.column` stores a string name that
will be assumed to be part of a table and may be quoted as such,
:func:`.literal_column` can be that,or any other arbitrary column-oriented
expression.
:param text: the text of the expression; can be any sql expression.
Quoting rules will not be applied. To specify a column-name expression
which should be subject to quoting rules,use the :func:`column`
function.
:param type\_: an optional :class:`~sqlalchemy.types.TypeEngine`
object which will
provide result-set translation and additional expression semantics for
this column. If left as None the type will be NullType.
.. seealso::
:func:`.column`
:func:`.text`
:ref:`sqlexpression_literal_column`
"""
return ColumnClause(text, type_=type_, is_literal=True)
def _interpret_as_column_or_from(element):
if isinstance(element, Visitable):
return element
elif hasattr(element, '__clause_element__'):
return element.__clause_element__()
insp = inspection.inspect(element, raiseerr=False)
if insp is None:
if isinstance(element, (util.nonetype, bool)):
return _const_expr(element)
elif hasattr(insp, "selectable"):
return insp.selectable
# be forgiving as this is an extremely common
# and kNown expression
if element == "*":
guess_is_literal = True
elif isinstance(element, (numbers.Number)):
return ColumnClause(str(element), is_literal=True)
else:
element = str(element)
# give into temptation,as this fact we are guessing about
# is not one we've prevIoUsly ever needed our users tell us;
# but let them kNow we are not happy about it
guess_is_literal = not _guess_straight_column.match(element)
util.warn_limited(
"Textual column expression %(column)r should be "
"explicitly declared with text(%(column)r),"
"or use %(literal_column)s(%(column)r) "
"for more specificity",
{
"column": util.ellipses_string(element),
"literal_column": "literal_column"
if guess_is_literal else "column"
})
return ColumnClause(
element,
is_literal=guess_is_literal)
def _delete_version(self, version):
self.heads.remove(version)
ret = self.context.impl._exec(
self.context._version.delete().where(
self.context._version.c.version_num ==
literal_column("'%s'" % version)))
if not self.context.as_sql and ret.rowcount != 1:
raise util.CommandError(
"Online migration expected to match one "
"row when deleting '%s' in '%s'; "
"%d found"
% (version,
self.context.version_table, ret.rowcount))
def _interpret_as_column_or_from(element):
if isinstance(element,
is_literal=guess_is_literal)
def _update_current_rev(self, old, new):
if old == new:
return
if new is None:
self.impl._exec(self._version.delete())
elif old is None:
self.impl._exec(self._version.insert().
values(version_num=literal_column("'%s'" % new))
)
else:
self.impl._exec(self._version.update().
values(version_num=literal_column("'%s'" % new))
)
def _interpret_as_column_or_from(element):
if isinstance(element,
is_literal=guess_is_literal)
def _interpret_as_column_or_from(element):
if isinstance(element,
is_literal=guess_is_literal)
def _update_current_rev(self, new):
if old == new:
return
if new is None:
self.impl._exec(self._version.delete())
elif old is None:
self.impl._exec(self._version.insert().
values(version_num=literal_column("'%s'" % new))
)
else:
self.impl._exec(self._version.update().
values(version_num=literal_column("'%s'" % new))
)
def _interpret_as_column_or_from(element):
if isinstance(element,
is_literal=guess_is_literal)
def importVolumes(connection,Metadata,sourcePath):
invVolumes = Table('invVolumes',Metadata)
invTypes = Table('invTypes',Metadata)
with open(os.path.join(sourcePath,'invVolumes1.csv'), 'r') as groupVolumes:
volumereader=csv.reader(groupVolumes, delimiter=',')
for group in volumereader:
connection.execute(invVolumes.insert().from_select(['typeID','volume'],select([invTypes.c.typeID,literal_column(group[0])]).where(invTypes.c.groupID == literal_column(group[1]))))
with open(os.path.join(sourcePath,'invVolumes2.csv'),')
for group in volumereader:
connection.execute(invVolumes.insert(),typeID=group[1],volume=group[0])
def _interpret_as_column_or_from(element):
if isinstance(element,
is_literal=guess_is_literal)
def _interpret_as_column_or_from(element):
if isinstance(element,
is_literal=guess_is_literal)