Python sqlalchemy 模块,case() 实例源码
我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用sqlalchemy.case()。
def count_water_stock(message):
"""????????????????
:param message: slackbot?????????????class
"""
s = Session()
stock_number, latest_ctime = (
s.query(func.sum(WaterHistory.delta),
func.max(case(whens=((
WaterHistory.delta != 0,
WaterHistory.ctime),), else_=None))).first()
)
if stock_number:
# sqlite????????????????
if not isinstance(latest_ctime, datetime.datetime):
latest_ctime = datetime.datetime.strptime(latest_ctime,
'%Y-%m-%d %H:%M:%s')
message.send('??: {}? ({:%Y?%m?%d?} ??)'
.format(stock_number, latest_ctime))
else:
message.send('??????????')
def pattern_order(column, patterns):
"""Create a clause suitable for use in an ``order by`` clause which will
sort results in the order they are matched by the patterns.
Parameters
----------
column : sa.sql.ColumnClause
The column being matched against.
patterns : iterable[str]
The patterns to search in order.
Returns
-------
sa.sql.ColumnClause
The clause used in an ``order by`` to enforce the given order.
"""
return sa.case([
(column.like(fuzzy(pattern)), n)
for n, pattern in enumerate(patterns)
]).asc()
def all(self, PRONAC = None, CgcCpf = None):
descricao_case = case([
(CertidoesNegativasModel.CodigoCertidao == '49', u'Quitação de Tributos Federais'),
(CertidoesNegativasModel.CodigoCertidao == '51', 'FGTS'),
(CertidoesNegativasModel.CodigoCertidao == '52', 'INSS'),
(CertidoesNegativasModel.CodigoCertidao == '244', 'CADIN'),
])
situacao_case = case([(CertidoesNegativasModel.cdSituacaoCertidao == 0, u'Pendente')],
else_ = u'Não Pendente'
)
res = self.sql_connector.session.query(CertidoesNegativasModel.DtEmissao.label('data_emissao'),
CertidoesNegativasModel.DtValidade.label('data_validade'),
descricao_case.label('descricao'),
situacao_case.label('situacao'),
)
if PRONAC is not None:
res = res.filter(CertidoesNegativasModel.PRONAC == PRONAC)
return res.all()
def Hybridratio(line1, line2):
''' produces emission line ratio hybrid properties '''
@hybrid_property
def hybridratio(self):
if type(line1) == tuple:
myline1 = getattr(self, line1[0])+getattr(self, line1[1])
else:
myline1 = getattr(self, line1)
if getattr(self, line2) > 0:
return myline1/getattr(self, line2)
else:
return -999.
@hybridratio.expression
def hybridratio(cls):
if type(line1) == tuple:
myline1 = getattr(cls, line1[0])+getattr(cls, line1[1])
else:
myline1 = getattr(cls, line1)
return cast(case([(getattr(cls, line2) > 0., myline1/getattr(cls, line2)),
(getattr(cls, line2) == 0., -999.)]), Float)
return hybridratio
def ajax_post_typeahead():
if not permissions.index_view.can():
return '[]'
# this a string of the search term
search_terms = request.args.get('search', '')
search_terms = search_terms.split(" ")
case_statements = []
for term in search_terms:
case_stmt = case([(Post.keywords.ilike('%' + term.strip() + '%'), 1)], else_=0)
case_statements += [case_stmt]
match_score = sum(case_statements).label("match_score")
posts = (db_session.query(Post, match_score)
.filter(Post.status == current_repo.PostStatus.PUBLISHED.value)
.order_by(desc(match_score))
.limit(5)
.all())
matches = []
for (post, count) in posts:
authors_str = [author.format_name for author in post.authors]
typeahead_entry = {'author': authors_str,
'title': str(post.title),
'path': str(post.path),
'keywords': str(post.keywords)}
matches += [typeahead_entry]
return json.dumps(matches)
def test_session_query(session, table):
col_concat = func.concat(table.c.string).label('concat')
result = (
session
.query(
table.c.string,
col_concat,
func.avg(table.c.integer),
func.sum(case([(table.c.boolean == True, else_=0))
)
.group_by(table.c.string, col_concat)
.having(func.avg(table.c.integer) > 10)
).all()
assert len(result) > 0
def get_grades(cls, user_id, exercise_id, active=None):
expr = case([(cls.junk == True, 0.0)], else_=cls.score)
query = db.session.query(distinct(Response.id), expr) \
.outerjoin(cls, and_(cls.response_id == Response.id,
cls.user_id == user_id)) \
.filter(Response.exercise_id == exercise_id).order_by(Response.id)
if active:
query = query.filter(Response.active == True)
return query.all()
def sort_by_demographics_field(current_user, field, reverse, else_=None):
if current_user.is_admin:
expression = field
else:
sub_query = filter_by_group_roles(current_user, get_roles_with_permission(PERMISSION.VIEW_DEmogRAPHICS))
expression = case([(sub_query, field)], else_=else_)
if reverse:
expression = desc(expression)
return expression