Python sqlalchemy 模块,bindparam() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.bindparam()。
def search(cls, connection: sqlSession, term: str) -> Sequence['Mod']:
"""Search for Mods that contain TERM in name or summary.
Keyword arguments:
connection: Database connection to ask on.
term: The term to search for.
Returns:
Sequence of matching mods (possibly empty).
"""
query = sqlBakery(lambda conn: conn.query(cls))
query += lambda q: q.filter(or_(
cls.name.like(bindparam('term')),
cls.summary.like(bindparam('term')),
))
query += lambda q: q.order_by(cls.name)
return query(connection).params(term='%{}%'.format(term)).all()
def find(cls, name: str) -> 'Mod':
"""Find exactly one Mod named NAME.
Keyword Arguments:
connection: Database connection to ask on.
name: The name of the mod to search for.
Returns:
The requested mod.
Raises:
noresultsFound: The name does not match any kNown mod.
MultipleResultsFound: The name is too ambiguous,
multiple matching mods found.
"""
query = sqlBakery(lambda conn: conn.query(cls))
query += lambda q: q.filter(cls.name.like(bindparam('name')))
return query(connection).params(name='{}'.format(name)).one()
def with_id(cls, id: int) -> 'Mod':
"""Fetch mod with id from database.
Keyword arguments:
connection: Database connection to fetch from.
id: The id field of the mod to get.
Returns:
The requested mod.
Raises:
noresultFound: Mod with specified id does not exist.
"""
query = sqlBakery(lambda conn: conn.query(cls))
query += lambda q: q.filter(cls.id == bindparam('id'))
return query(connection).params(id=id).one()
def _params(self, unique, kwargs):
if len(optionaldict) == 1:
kwargs.update(optionaldict[0])
elif len(optionaldict) > 1:
raise exc.ArgumentError(
"params() takes zero or one positional dictionary argument")
def visit_bindparam(bind):
if bind.key in kwargs:
bind.value = kwargs[bind.key]
bind.required = False
if unique:
bind._convert_to_unique()
return cloned_traverse(self, {}, {'bindparam': visit_bindparam})
def update_canonicals(canonicals):
'''
Update canonical data for android devices.
'''
global ENGINE
binding = [{"p_{}".format(k): v for k, v in canonical.items()} for canonical in canonicals]
device_table = model.Metadata.tables['device']
stmt = update(device_table).\
values(device_token_new=bindparam('p_new_token')).\
where(and_(device_table.c.login_id == bindparam('p_login_id'),
func.coalesce(device_table.c.device_token_new, device_table.c.device_token) == bindparam('p_old_token')))
ENGINE.execute(stmt, binding)
with session_scope() as session:
query = text('SELECT keep_max_users_per_device( \
(:platform_id)::int2,:device_token,(:max_users_per_device)::int2)')
for canonical in canonicals:
session.execute(query,
{'platform_id': constants.PLATFORM_ANDROID,
'device_token': canonical['new_token'],
'max_users_per_device': config.max_users_per_device
})
session.execute(query,
{'platform_id': constants.PLATFORM_ANDROID_TABLET,
'max_users_per_device': config.max_users_per_device
})
session.commit()
def update_unregistered_devices(unregistered):
'''
Update data for unregistered Android devices.
Unregistered device will not receive notifications and will be deleted when number of devices exceeds maximum.
'''
global ENGINE
binding = [{"p_{}".format(k): v for k, v in u.items()} for u in unregistered]
device_table = model.Metadata.tables['device']
stmt = update(device_table).\
values(unregistered_ts=func.Now()).\
where(and_(device_table.c.login_id == bindparam('p_login_id'), device_table.c.device_token) == bindparam('p_device_token')))
ENGINE.execute(stmt, binding)
def params(self, **kwargs):
"""Return a copy with :func:`bindparam()` elements replaced.
Returns a copy of this ClauseElement with :func:`bindparam()`
elements replaced with values taken from the given dictionary::
>>> clause = column('x') + bindparam('foo')
>>> print clause.compile().params
{'foo':None}
>>> print clause.params({'foo':7}).compile().params
{'foo':7}
"""
return self._params(False, kwargs)
def _getPipeInfo(self, pipename):
''' Retrieve the pipeline Info for a given pipeline version name '''
assert pipename.lower() in ['drp', 'dap'], 'Pipeline Name must either be DRP or DAP'
# bindparam values
bindname = 'drpver' if pipename.lower() == 'drp' else 'dapver'
bindvalue = self._drpver if pipename.lower() == 'drp' else self._dapver
# class names
if pipename.lower() == 'drp':
inclasses = self._tableInQuery('cube') or 'cube' in str(self.query.statement.compile())
elif pipename.lower() == 'dap':
inclasses = self._tableInQuery('file') or 'file' in str(self.query.statement.compile())
# set alias
pipealias = self._drp_alias if pipename.lower() == 'drp' else self._dap_alias
# get the pipeinfo
if inclasses:
pipeinfo = marvindb.session.query(pipealias).\
join(marvindb.datadb.PipelineName, marvindb.datadb.PipelineVersion).\
filter(marvindb.datadb.PipelineName.label == pipename.upper(),
marvindb.datadb.PipelineVersion.version == bindparam(bindname, bindvalue)).one()
else:
pipeinfo = None
return pipeinfo