Python sqlalchemy 模块，outerjoin() 实例源码
我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.outerjoin()。
def _create_outerjoin(cls, left, right, onclause=None):
"""Return an ``OUTER JOIN`` clause element.
The returned object is an instance of :class:`.Join`.
Similar functionality is also available via the
:meth:`~.FromClause.outerjoin()` method on any
:class:`.FromClause`.
:param left: The left side of the join.
:param right: The right side of the join.
:param onclause: Optional criterion for the ``ON`` clause,is
derived from foreign key relationships established between
left and right otherwise.
To chain joins together,use the :meth:`.FromClause.join` or
:meth:`.FromClause.outerjoin` methods on the resulting
:class:`.Join` object.
"""
return cls(left, onclause, isouter=True)
def _create_outerjoin(cls, onclause=None, full=False):
"""Return an ``OUTER JOIN`` clause element.
The returned object is an instance of :class:`.Join`.
Similar functionality is also available via the
:meth:`~.FromClause.outerjoin()` method on any
:class:`.FromClause`.
:param left: The left side of the join.
:param right: The right side of the join.
:param onclause: Optional criterion for the ``ON`` clause, isouter=True, full=full)
def outerjoin(self, right, onclause=None):
    """Return a :class:`.Join` from this :class:`.FromClause`
    to another :class:`FromClause`, with the "isouter" flag set to
    True.

    E.g.::

        from sqlalchemy import outerjoin

        j = user_table.outerjoin(address_table,
                        user_table.c.id == address_table.c.user_id)

    The above is equivalent to::

        j = user_table.join(
            address_table,
            user_table.c.id == address_table.c.user_id,
            isouter=True)

    :param right: the right side of the join; this is any
     :class:`.FromClause` object such as a :class:`.Table` object, and
     may also be a selectable-compatible object such as an ORM-mapped
     class.

    :param onclause: a SQL expression representing the ON clause of the
     join.  If left at ``None``, :meth:`.FromClause.join` will attempt to
     join the two tables based on a foreign key relationship.

    .. seealso::

        :meth:`.FromClause.join`

        :class:`.Join`

    """
    # Positional order is (left, right, onclause, isouter); the trailing
    # ``True`` is the "isouter" flag described above.
    return Join(self, right, onclause, True)
def outerjoin(self, right, onclause=None, full=False):
    """Return a :class:`.Join` from this :class:`.FromClause`
    to another :class:`FromClause`, with the "isouter" flag set to
    True.

    :param right: the right side of the join.

    :param onclause: a SQL expression representing the ON clause of the
     join.  If left at ``None``, :meth:`.FromClause.join` will attempt to
     join the two tables based on a foreign key relationship.

    :param full: if True, render a FULL OUTER JOIN, instead of
     LEFT OUTER JOIN.

     .. versionadded:: 1.1

    .. seealso::

        :meth:`.FromClause.join`

        :class:`.Join`

    """
    # NOTE(review): ``right`` and ``onclause`` were missing from the
    # signature, which made the call pass ``True`` as the right-hand side.
    # Positional order is (left, right, onclause, isouter, full).
    return Join(self, right, onclause, True, full)
def get_query(self):
    """Build the select query for the fields listed in ``self.fields_to_export``.

    Each entry is either ``"column"`` (looked up in the ``movies`` table) or
    ``"table.column"``.  Entries naming an unknown table or column are
    logged with a warning and skipped.  Every explicitly named table is
    remembered so that the matching outer join can be added to the query.

    The resulting query is passed through ``update_whereclause`` together
    with ``self.search_conditions``; its column collection is stored in
    ``self.exported_columns`` before the query is returned.
    """
    t = db.Metadata.tables
    tables = set()
    columns = []

    for field in self.fields_to_export:
        parts = field.split('.')
        if len(parts) > 1:
            table, column = parts[0], parts[1]
            if table not in t:
                log.warning("Wrong table name: %s", table)
                continue
            # remembered so the JOIN for this table gets generated below
            tables.add(table)
        else:
            table, column = 'movies', parts[0]

        available = t[table].columns
        if column not in available:
            log.warning("Wrong field name: %s", field)
            continue
        columns.append(available[column])

    # (related table, key column shared by that table and ``movies``)
    join_keys = (
        ('media', 'medium_id'),
        ('collections', 'collection_id'),
        ('volumes', 'volume_id'),
        ('vcodecs', 'vcodec_id'),
    )
    joins = [
        (t[name], getattr(t['movies'].c, key) == getattr(t[name].c, key))
        for name, key in join_keys
        if name in tables
    ]

    if joins:
        # Each join extends the previous one; every intermediate join is
        # kept in the list handed to select().
        from_obj = []
        left = t['movies']
        for right, condition in joins:
            left = outerjoin(left, right, condition)
            from_obj.append(left)
        query = select(columns=columns, bind=self.db.session.bind,
                       from_obj=from_obj, use_labels=True)
    else:
        query = select(columns=columns, bind=self.db.session.bind)

    query = update_whereclause(query, self.search_conditions)

    # Column names are kept for later use; they read 'movies_title' or
    # 'title' depending on whether more than one table was requested.
    self.exported_columns = query.columns

    return query
def export_to_document(self, document, mainelement):
    """Query the movies with their related names and hand them to ``process_movies``.

    Selects a fixed list of ``movies`` columns plus one column from each of
    the ``media``, ``collections``, ``volumes``, ``vcodecs`` and ``posters``
    tables, each reached through an outer join from ``movies``.  The query
    is passed through ``update_whereclause`` with ``self.search_conditions``
    and then given to ``self.process_movies`` along with ``document`` and
    ``mainelement``.
    """
    tables = db.Metadata.tables
    movies = tables['movies']

    movie_fields = (
        'movie_id', 'number', 'title', 'o_title', 'country', 'year',
        'runtime', 'classification', 'genre', 'region', 'studio',
        'cast', 'director', 'plot', 'notes', 'loaned', 'rating',
        'trailer', 'image', 'seen', 'media_num', 'poster_md5',
        'screenplay', 'cameraman', 'barcode',
    )
    columns = [getattr(movies.c, field) for field in movie_fields]

    # (related table, column to select from it,
    #  key column on ``movies``, key column on the related table)
    related = (
        ('media', 'name', 'medium_id', 'medium_id'),              # medium name
        ('collections', 'name', 'collection_id', 'collection_id'),  # collection name
        ('volumes', 'name', 'volume_id', 'volume_id'),            # volume name
        ('vcodecs', 'name', 'vcodec_id', 'vcodec_id'),            # video codec name
        ('posters', 'data', 'poster_md5', 'md5sum'),              # poster image
    )

    # Every join extends the previous one; all intermediate joins are kept
    # in the list handed to select().
    from_obj = []
    for table_name, wanted, movie_key, related_key in related:
        other = tables[table_name]
        columns.append(getattr(other.c, wanted))
        condition = getattr(movies.c, movie_key) == getattr(other.c, related_key)
        if from_obj:
            from_obj.append(from_obj[-1].outerjoin(other, condition))
        else:
            from_obj.append(outerjoin(movies, other, condition))

    # fetch movie data
    moviesquery = select(columns=columns, from_obj=from_obj, use_labels=True)
    moviesquery = update_whereclause(moviesquery, self.search_conditions)
    self.process_movies(document, mainelement, moviesquery)