Python sqlalchemy 模块,schema() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用sqlalchemy.schema()。
def render_table(self, model):
    """Render ``model`` as the source text of a ``Table(...)`` assignment.

    The output starts with ``t_<name> = Table(`` and contains one line per
    column, per rendered constraint, per multi-column index, and an optional
    ``schema=`` keyword, each prefixed with ``self.indentation``.

    :param model: object exposing ``table`` (with ``name``, ``columns``,
        ``constraints`` and ``indexes``) and ``schema``.
    :returns: the rendered source as a string ending in ``)\\n``.
    """
    indent = self.indentation
    # NOTE(review): the metadata argument is emitted as ``Metadata``; confirm
    # this matches the name the generated module actually defines.
    rendered = 't_{0} = Table(\n{1}{0!r},Metadata,\n'.format(model.table.name, indent)

    for column in model.table.columns:
        rendered += '{0}{1},\n'.format(indent, self.render_column(column, True))

    for constraint in sorted(model.table.constraints, key=_get_constraint_sort_key):
        # Primary keys and single-column FK/unique constraints are skipped
        # here (presumably they are rendered with the column itself).
        if isinstance(constraint, PrimaryKeyConstraint):
            continue
        if isinstance(constraint, (ForeignKeyConstraint, UniqueConstraint)) and len(constraint.columns) == 1:
            continue
        rendered += '{0}{1},\n'.format(indent, self.render_constraint(constraint))

    for index in model.table.indexes:
        # Only multi-column indexes get their own line.
        if len(index.columns) > 1:
            rendered += '{0}{1},\n'.format(indent, self.render_index(index))

    if model.schema:
        rendered += "{0}schema='{1}',\n".format(indent, model.schema)

    # Drop the trailing ",\n" of the last item before closing the call.
    return rendered.rstrip('\n,') + '\n)\n'
def test_check(self):
    """Can create columns with check constraint"""
    check = sqlalchemy.schema.CheckConstraint('data > 4')
    col = Column('data', Integer, check)
    col.create(self.table)

    # The constraint cannot be inspected on the objects, so exercise it:
    # a value satisfying the check must insert, a violating one must not.
    self.table.insert(values={'data': 5}).execute()
    try:
        self.table.insert(values={'data': 3}).execute()
    except (sqlalchemy.exc.IntegrityError,
            sqlalchemy.exc.ProgrammingError):
        # Expected: the database rejected the violating row.
        pass
    else:
        self.fail()

    col.drop()
def _setup(self, url):
    """Create a fresh two-column table for the column-change tests.

    Binds a new ``MetaData`` to ``self.engine``, declares the table named
    ``self.table_name`` (integer primary key ``id`` plus a nullable ``data``
    string column with a server default), drops any existing table of that
    name and creates it again.

    :param url: forwarded to the parent class ``_setup``.
    """
    super(TestColumnChange, self)._setup(url)
    self.Meta = MetaData(self.engine)
    self.table = Table(
        self.table_name, self.Meta,
        Column('id', Integer, primary_key=True),
        Column('data', String(40), server_default=DefaultClause("tluafed"),
               nullable=True),
    )
    if self.table.exists():
        self.table.drop()
    try:
        self.table.create()
    # ``except X, e`` is a SyntaxError on Python 3 and ``e`` was never used.
    except sqlalchemy.exceptions.sqlError:
        # sqlite: database schema has changed -- tolerated for sqlite URLs
        # only, re-raised for every other backend.
        if not self.url.startswith('sqlite://'):
            raise
def test_datetime(self):
    """Round-trip a datetime column through the database.

    Reads the table back both via ``read_sql_table`` and via a plain
    ``SELECT`` with ``read_sql_query``.
    """
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.to_sql('test_datetime', self.conn)

    # with read_table -> type information from schema used
    result = sql.read_sql_table('test_datetime', self.conn)
    result = result.drop('index', axis=1)
    tm.assert_frame_equal(result, df)

    # with read_sql -> no type information -> sqlite has no native
    # The query needs the connection; ``axis=1`` belongs to the ``drop`` of
    # the written index column, exactly as in the first read above.
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    result = result.drop('index', axis=1)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'])
        tm.assert_frame_equal(result, df)
    else:
        tm.assert_frame_equal(result, df)
def test_datetime_NaT(self):
    """Round-trip a datetime column containing a missing value.

    NOTE(review): the original text of this test was truncated; the frame
    construction, the comparison target and the query read are restored to
    mirror ``test_datetime`` -- confirm against the upstream test.
    """
    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                    'B': np.arange(3.0)})
    df.loc[1, 'A'] = np.nan
    df.to_sql('test_datetime', self.conn, index=False)

    # with read_table -> type information from schema used
    result = sql.read_sql_table('test_datetime', self.conn)
    # Compare against the frame that was written, not the connection.
    tm.assert_frame_equal(result, df)

    # with read_sql -> no type information
    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    if self.flavor == 'sqlite':
        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))
        result['A'] = to_datetime(result['A'], errors='coerce')
        tm.assert_frame_equal(result, df)
    else:
        tm.assert_frame_equal(result, df)
def test_notnull_dtype(self):
    """Check the reflected SQL column types of columns that contain None."""
    # Each column holds one real value and one missing value.
    # NOTE(review): the 'Date' entry was truncated in the original; the
    # trailing ``None`` is restored by analogy with the other columns.
    cols = {'Bool': Series([True, None]),
            'Date': Series([datetime(2012, 5, 1), None]),
            'Int': Series([1, None], dtype='object'),
            'Float': Series([1.1, None])
            }
    df = DataFrame(cols)

    tbl = 'notnull_dtype_test'
    df.to_sql(tbl, self.conn)
    returned_df = sql.read_sql_table(tbl, self.conn)  # noqa

    Meta = sqlalchemy.schema.MetaData(bind=self.conn)
    Meta.reflect()
    # pandas spells the flavor 'mysql'; the previous mixed-case literal
    # could never match it.
    if self.flavor == 'mysql':
        my_type = sqltypes.Integer
    else:
        my_type = sqltypes.Boolean

    col_dict = Meta.tables[tbl].columns

    self.assertTrue(isinstance(col_dict['Bool'].type, my_type))
    self.assertTrue(isinstance(col_dict['Date'].type, sqltypes.DateTime))
    self.assertTrue(isinstance(col_dict['Int'].type, sqltypes.Integer))
    self.assertTrue(isinstance(col_dict['Float'].type, sqltypes.Float))
def _create_table_setup(self):
    """Build the SQLAlchemy ``Table`` describing this object's frame.

    One ``Column`` is created per entry returned by
    ``self._get_column_names_and_types``; when ``self.keys`` is set, a
    primary-key constraint named ``<name>_pk`` is added over those keys.

    :returns: a ``Table`` attached to a new ``MetaData`` object.
    """
    from sqlalchemy import Table, Column, PrimaryKeyConstraint
    from sqlalchemy.schema import MetaData

    column_names_and_types = \
        self._get_column_names_and_types(self._sqlalchemy_type)

    # Each entry must unpack into three names: the comprehension previously
    # unpacked only two while still using ``typ``.
    columns = [Column(name, typ, index=is_index)
               for name, typ, is_index in column_names_and_types]

    if self.keys is not None:
        # Accept a single key as well as a list-like of keys.
        if not com.is_list_like(self.keys):
            keys = [self.keys]
        else:
            keys = self.keys
        pkc = PrimaryKeyConstraint(*keys, name=self.name + '_pk')
        columns.append(pkc)

    schema = self.schema or self.pd_sql.Meta.schema

    # At this point, attach to a new MetaData; only attach to self.Meta
    # once the table is created.
    meta = MetaData(self.pd_sql, schema=schema)

    return Table(self.name, meta, *columns, schema=schema)
def has_table(self, connection, tablename, schema=None):
    """Return True when ``msysobjects`` lists a type-1 object named ``tablename``.

    ``schema`` is accepted but not used by the query.
    """
    query = sql.text(
        "select count(*) from msysobjects where "
        "type=1 and name=:name")
    match_count = connection.scalar(query, name=tablename)
    return bool(match_count)
def get_table_names(self, connection, schema=None, **kw):
    """Return the names of the type-1 ``msysobjects`` entries not prefixed ``MSys``.

    The body has always used ``connection``, but the parameter was missing
    from the signature, so every call failed with ``NameError``. It is
    restored in the same position as in the sibling ``has_table``.

    :param connection: object whose ``execute`` runs the query.
    :param schema: accepted but not used by the query.
    :returns: list of table-name strings.
    """
    result = connection.execute("select name from msysobjects where "
                                "type=1 and name not like 'MSys%'")
    return [row[0] for row in result]
def __init__(self, table):
    """Store ``table`` and its schema, then generalise every column type."""
    super(Model, self).__init__()
    self.table = table
    self.schema = table.schema

    # Adapt column types to the most reasonable generic types (ie. VARCHAR -> String)
    for column in table.columns:
        generic_cls = column.type.__class__
        for candidate in generic_cls.__mro__:
            candidate_name = candidate.__name__
            # Remember the latest class in the MRO that is visitable.
            if hasattr(candidate, '__visit_name__'):
                generic_cls = candidate
            # Stop at the first class whose name is neither ALL-CAPS nor
            # underscore-prefixed.
            is_all_caps = candidate_name == candidate_name.upper()
            if not is_all_caps and not candidate_name.startswith('_'):
                break

        column.type = column.type.adapt(generic_cls)
def test_fk(self):
    """Can create columns with foreign keys"""
    # create FK's target
    # NOTE(review): the Table arguments were lost in the original text.
    # The metadata and the integer primary key ``id`` are reconstructed from
    # the later ``reftable.c.id`` use -- confirm against upstream.
    reftable = Table('tmp_ref', self.Meta,
                     Column('id', Integer, primary_key=True),
                     )
    if self.engine.has_table(reftable.name):
        reftable.drop()
    reftable.create()

    # create column with fk
    col = Column('data', ForeignKey(reftable.c.id))
    col.create(self.table)

    # check if constraint is added (for/else: fail when the loop never breaks)
    for cons in self.table.constraints:
        if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint):
            break
    else:
        self.fail('No constraint found')

    # TODO: test on db level if constraints work

    # ``foreign_keys`` is converted to a list first on the sqlA_07 branch and
    # indexed directly otherwise.
    if sqlA_07:
        self.assertEqual(reftable.c.id.name,
                         list(col.foreign_keys)[0].column.name)
    else:
        self.assertEqual(reftable.c.id.name,
                         col.foreign_keys[0].column.name)

    col.drop(self.table)

    if self.engine.has_table(reftable.name):
        reftable.drop()
def test_sqlite_type_mapping(self):
    """A UTC-aware datetime column must get the ``TIMESTAMP`` column type."""
    # Test Timestamp objects (no datetime64 because of timezone) (GH9085)
    timestamps = to_datetime(['201412120154', '201412110254'], utc=True)
    df = DataFrame({'time': timestamps})

    db = sql.sqliteDatabase(self.conn, self.flavor)
    table = sql.sqliteTable("test_type", db, frame=df)
    schema = table.sql_schema()

    column_type = self._get_sqlite_column_type(schema, 'time')
    self.assertEqual(column_type, "TIMESTAMP")
# -----------------------------------------------------------------------------
# -- Database flavor specific tests
def test_double_precision(self):
    """Check float/int precision and the reflected SQL types after a write.

    NOTE(review): several ``Series(...)`` literals and the ``to_sql`` call
    were truncated in the original; they are restored following the intact
    'f32' entry -- confirm against the upstream test.
    """
    V = 1.23456789101112131415

    df = DataFrame({'f32': Series([V, ], dtype='float32'),
                    'f64': Series([V, ], dtype='float64'),
                    'f64_as_f32': Series([V, ], dtype='float64'),
                    'i32': Series([5, ], dtype='int32'),
                    'i64': Series([5, ], dtype='int64'),
                    })

    # Write with an explicit single-precision type for 'f64_as_f32'.
    df.to_sql('test_dtypes', self.conn, index=False, if_exists='replace',
              dtype={'f64_as_f32': sqlalchemy.Float(precision=23)})
    res = sql.read_sql_table('test_dtypes', self.conn)

    # check precision of float64
    self.assertEqual(np.round(df['f64'].iloc[0], 14),
                     np.round(res['f64'].iloc[0], 14))

    # check sql types
    Meta = sqlalchemy.schema.MetaData(bind=self.conn)
    Meta.reflect()
    col_dict = Meta.tables['test_dtypes'].columns
    # The overridden column must reflect with the same type string as 'f32'.
    self.assertEqual(str(col_dict['f32'].type),
                     str(col_dict['f64_as_f32'].type))
    self.assertTrue(isinstance(col_dict['f32'].type, sqltypes.Float))
    self.assertTrue(isinstance(col_dict['f64'].type, sqltypes.Float))
    self.assertTrue(isinstance(col_dict['i32'].type, sqltypes.Integer))
    self.assertTrue(isinstance(col_dict['i64'].type, sqltypes.BigInteger))
def has_table(table_name, con, flavor='sqlite', schema=None):
    """
    Report whether the database contains a table with the given name.

    Parameters
    ----------
    table_name : string
        Name of the SQL table to look for.
    con : SQLAlchemy connectable (engine/connection) or sqlite3 DBAPI2 connection
        With a DBAPI2 object only sqlite3 is supported.
    flavor : string, default 'sqlite'
        SQL flavor, forwarded to ``pandassql_builder``.
    schema : string, default None
        SQL schema name, forwarded to ``pandassql_builder``.

    Returns
    -------
    boolean
    """
    return pandassql_builder(
        con, flavor=flavor, schema=schema
    ).has_table(table_name)
def pandassql_builder(con, flavor=None, Meta=None,
                      is_cursor=False, schema=None):
    """
    Return the Pandassql subclass matching the kind of ``con``.

    Parameters
    ----------
    con : connection-like object, first passed through ``_engine_builder``.
    flavor : string, default None
        Only used on the non-SQLAlchemy path.
    Meta : default None
        Forwarded to ``sqlDatabase``.
    is_cursor : bool, default False
        Forwarded to ``sqliteDatabase``.
    schema : string, default None
        Forwarded to ``sqlDatabase``. The body already used this name but it
        was missing from the signature; it is appended last so existing
        positional callers keep working.
    """
    # When support for DBAPI connections is removed,
    # is_cursor should not be necessary.
    con = _engine_builder(con)
    if _is_sqlalchemy_connectable(con):
        return sqlDatabase(con, schema=schema, Meta=Meta)

    # pandas spells the flavor 'mysql'; the previous mixed-case literal could
    # never match, so the deprecation warning was never issued.
    if flavor == 'mysql':
        warnings.warn(_MysqL_WARNING, FutureWarning, stacklevel=3)
    return sqliteDatabase(con, flavor, is_cursor=is_cursor)
def exists(self):
    """Ask the owning ``pd_sql`` object whether this table already exists."""
    backend = self.pd_sql
    return backend.has_table(self.name, self.schema)
def create(self):
    """Create the table, honouring ``self.if_exists`` when it already exists.

    Raises ``ValueError`` when the table exists and ``if_exists`` is
    ``'fail'``, or when ``if_exists`` is not one of the known values.
    """
    if not self.exists():
        self._execute_create()
        return

    if self.if_exists == 'fail':
        raise ValueError("Table '%s' already exists." % self.name)

    if self.if_exists == 'replace':
        # Drop and recreate.
        self.pd_sql.drop_table(self.name, self.schema)
        self._execute_create()
    elif self.if_exists != 'append':
        # 'append' keeps the existing table untouched; anything else is an error.
        raise ValueError(
            "'{0}' is not valid for if_exists".format(self.if_exists))
def has_table(self, name, schema=None):
    """Run the dialect's ``has_table`` through the connectable.

    ``schema`` falls back to ``self.Meta.schema`` when falsy.
    """
    connectable = self.connectable
    run = connectable.run_callable
    dialect_check = connectable.dialect.has_table
    return run(dialect_check, name, schema or self.Meta.schema)
def get_table(self, table_name, schema=None):
    """Look up ``table_name`` in ``self.Meta.tables`` and return it.

    The lookup key is ``<schema>.<table_name>`` when a schema (explicit or
    ``self.Meta.schema``) is set, otherwise the bare table name.
    """
    from sqlalchemy import Numeric

    schema = schema or self.Meta.schema
    lookup_key = '.'.join([schema, table_name]) if schema else table_name
    tbl = self.Meta.tables.get(lookup_key)

    # Avoid casting double-precision floats into decimals
    for column in tbl.columns:
        if isinstance(column.type, Numeric):
            column.type.asdecimal = False

    return tbl
def to_sql(self, frame, name, if_exists='fail', index=True,
           index_label=None, schema=None, chunksize=None, dtype=None):
    """
    Write records stored in a DataFrame to a SQL database.

    The body uses ``name`` and this docstring documented ``name`` and
    ``schema``, but neither was in the signature, so every call raised
    ``NameError``. Both are restored in the documented order.

    Parameters
    ----------
    frame : DataFrame
    name : string
        Name of SQL table.
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        Forwarded to the table object, whose ``create`` applies it.
    index : boolean, default True
        Write DataFrame index as a column.
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    schema : string, default None
        Ignored parameter included for compatibility with the SQLAlchemy
        version of ``to_sql``.
    chunksize : int, default None
        Passed to the table's ``insert``.
    dtype : dict of column name to SQL type, default None
        Optional datatype per column. Each SQL type must be a string.

    Raises
    ------
    ValueError
        If any value in ``dtype`` is not a string.
    """
    if dtype is not None:
        for col, my_type in dtype.items():
            if not isinstance(my_type, str):
                raise ValueError('%s (%s) not a string' % (
                    col, str(my_type)))

    table = sqliteTable(name, self, frame=frame, index=index,
                        if_exists=if_exists, index_label=index_label,
                        dtype=dtype)
    table.create()
    table.insert(chunksize)
def get_table(self, table_name=None, schema=None):
    """Return None: table lookup is not supported in fallback mode.

    ``table_name`` is accepted so the signature is parallel to the other
    ``get_table(table_name, schema=None)`` in this file. Both arguments are
    ignored.
    """
    return None  # not supported in fallback mode
def get_schema(frame, name, flavor='sqlite', keys=None, con=None, dtype=None):
    """
    Get the SQL db table schema for the given frame.

    The body used ``flavor`` and this docstring documented ``name`` and
    ``flavor``, but neither was in the signature, so every call raised
    ``NameError``. Both are restored in the documented order.

    Parameters
    ----------
    frame : DataFrame
    name : string
        Name of SQL table.
    flavor : string, default 'sqlite'
        Forwarded to ``pandassql_builder``.
    keys : string or sequence, default None
        Columns to use as primary key.
    con : an open SQL database connection object or a SQLAlchemy connectable,
        default None
        If a DBAPI2 object, only sqlite3 is supported.
    dtype : dict of column name to SQL type, default None
        Optional datatype per column. The SQL type should be a SQLAlchemy
        type, or a string for the sqlite3 fallback connection.
    """
    pandas_sql = pandassql_builder(con=con, flavor=flavor)
    # NOTE(review): ``name`` is forwarded positionally on the assumption that
    # ``_create_sql_schema`` takes (frame, name, ...) -- confirm.
    return pandas_sql._create_sql_schema(frame, name, keys=keys, dtype=dtype)
def _set_search_path(schema, dbapi_connection, connection_record, connection_proxy):
    """Issue ``SET search_path TO <schema>`` on a DBAPI connection and commit.

    The last two parameters are accepted but unused (presumably to match a
    pool-event listener signature; see the registration in ``_db_engine``).

    NOTE(review): ``schema`` is interpolated into the statement text, so it
    must come from trusted configuration, never from user input.
    """
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("SET search_path TO {};".format(schema))
        dbapi_connection.commit()
    finally:
        # Close the cursor even when execute/commit raises.
        cursor.close()
def _db_engine(conn_string, schema):
    """Create a SQLAlchemy engine and register its event listeners.

    :param conn_string: passed to ``sqlalchemy.create_engine``.
    :param schema: when truthy, every connection checkout additionally runs
        ``_set_search_path`` with this schema.
    :returns: the configured engine.
    """
    from functools import partial

    db_engine = sqlalchemy.create_engine(conn_string, json_serializer=Encoder().encode)
    sqlalchemy.event.listens_for(db_engine, "engine_connect")(_ping_postgres)
    sqlalchemy.event.listens_for(db_engine, "connect")(_record_pid)
    sqlalchemy.event.listens_for(db_engine, "checkout")(_check_pid)
    if schema:
        # As defined in this file, _set_search_path takes four positional
        # arguments, so calling it with only ``schema`` would raise TypeError
        # instead of producing a listener. Binding ``schema`` with partial
        # yields a three-argument callable.
        set_path = partial(_set_search_path, schema)
        sqlalchemy.event.listens_for(db_engine, "checkout")(set_path)
    return db_engine
def _to_index(index, table=None, engine=None):
    """Return ``index`` if it already is an Index, else build one from its name.

    When a name is given, the table is resolved through ``_to_table`` and
    assigned to the new index.
    """
    if not isinstance(index, sqlalchemy.Index):
        # Given: index name; table name required
        owner = _to_table(table, engine)
        index = sqlalchemy.Index(index)
        index.table = owner
    return index
# python3: if we just use:
#
# class ColumnDelta(DictMixin,sqlalchemy.schema.SchemaItem):
# ...
#
# We get the following error:
# TypeError: Metaclass conflict: the Metaclass of a derived class must be a
# (non-strict) subclass of the Metaclasses of all its bases.
#
# The complete inheritance/Metaclass relationship list of ColumnDelta can be
# summarized by the following dot file:
#
# digraph test123 {
# ColumnDelta -> MutableMapping;
# MutableMapping -> Mapping;
# Mapping -> {Sized Iterable Container};
# {Sized Iterable Container} -> ABCMeta[style=dashed];
#
# ColumnDelta -> SchemaItem;
# SchemaItem -> {SchemaEventTarget Visitable};
# SchemaEventTarget -> object;
# Visitable -> {VisitableType object} [style=dashed];
# VisitableType -> type;
# }
#
# We need to use a Metaclass that inherits from all the Metaclasses of
# DictMixin and sqlalchemy.schema.SchemaItem. Let's call it "MyMeta".
def test_fk(self):
    """Can create columns with foreign keys"""
    # NOTE(review): most of this test was merged/truncated in the original
    # text. It is reconstructed from the other ``test_fk`` in this file plus
    # the surviving fragments (the named FK 'testfk' and the MySQL-only
    # constraint drop) -- confirm against upstream.

    # create FK's target
    reftable = Table('tmp_ref', self.Meta,
                     Column('id', Integer, primary_key=True),
                     )
    if self.engine.has_table(reftable.name):
        reftable.drop()
    reftable.create()

    # create column with fk
    col = Column('data', Integer, ForeignKey(reftable.c.id, name='testfk'))
    col.create(self.table)

    # check if constraint is added (for/else: fail when the loop never breaks)
    for cons in self.table.constraints:
        if isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint):
            break
    else:
        self.fail('No constraint found')

    # TODO: test on db level if constraints work

    if sqlA_07:
        self.assertEqual(reftable.c.id.name,
                         list(col.foreign_keys)[0].column.name)
    else:
        self.assertEqual(reftable.c.id.name,
                         col.foreign_keys[0].column.name)

    # SQLAlchemy reports the MySQL engine name as 'mysql'; the previous
    # mixed-case literal could never match. The named constraint is dropped
    # explicitly before the column on that backend.
    if self.engine.name == 'mysql':
        constraint.ForeignKeyConstraint([self.table.c.data],
                                        [reftable.c.id],
                                        name='testfk').drop()
    col.drop(self.table)

    if self.engine.has_table(reftable.name):
        reftable.drop()
def _actual_foreign_keys(self):
    """Return a sorted list of column-name lists, one per FK constraint on ``self.table``."""
    from sqlalchemy.schema import ForeignKeyConstraint

    fk_column_names = []
    for cons in self.table.constraints:
        if not isinstance(cons, ForeignKeyConstraint):
            continue
        # Entries of cons.columns may be plain strings or objects with ``.name``.
        fk_column_names.append([
            col if isinstance(col, six.string_types) else col.name
            for col in cons.columns
        ])
    fk_column_names.sort()
    return fk_column_names
def render_class(self, model):
    """Render ``model`` (and, recursively, its children) as class source text.

    Emits the class header, ``__tablename__``, an optional ``__table_args__``
    (constraints, multi-column indexes, schema), then the column attributes
    and the relationship attributes.

    NOTE(review): the constraint loop header and two ``.format`` calls were
    truncated in the original; they are restored to match the identical
    filter in ``render_table`` and the placeholders in the format strings.

    :param model: object exposing ``name``, ``parent_name``, ``table``,
        ``schema``, ``attributes`` and ``children``.
    :returns: the rendered source as a string.
    """
    indent = self.indentation
    rendered = 'class {0}({1}):\n'.format(model.name, model.parent_name)
    rendered += '{0}__tablename__ = {1!r}\n'.format(indent, model.table.name)

    # Render constraints and indexes as __table_args__
    table_args = []
    for constraint in sorted(model.table.constraints, key=_get_constraint_sort_key):
        if isinstance(constraint, PrimaryKeyConstraint):
            continue
        if isinstance(constraint, (ForeignKeyConstraint, UniqueConstraint)) and len(constraint.columns) == 1:
            continue
        table_args.append(self.render_constraint(constraint))
    for index in model.table.indexes:
        if len(index.columns) > 1:
            table_args.append(self.render_index(index))

    table_kwargs = {}
    if model.schema:
        table_kwargs['schema'] = model.schema

    kwargs_items = ','.join('{0!r}: {1!r}'.format(key, table_kwargs[key]) for key in table_kwargs)
    kwargs_items = '{{{0}}}'.format(kwargs_items) if kwargs_items else None
    if table_kwargs and not table_args:
        # Keyword arguments only: __table_args__ is a plain dict.
        rendered += '{0}__table_args__ = {1}\n'.format(indent, kwargs_items)
    elif table_args:
        if kwargs_items:
            table_args.append(kwargs_items)
        if len(table_args) == 1:
            # A one-element tuple needs its trailing comma.
            table_args[0] += ','
        table_args_joined = ',\n{0}{0}'.format(indent).join(table_args)
        rendered += '{0}__table_args__ = (\n{0}{0}{1}\n{0})\n'.format(indent, table_args_joined)

    # Render columns
    rendered += '\n'
    for attr, column in model.attributes.items():
        if isinstance(column, Column):
            # The column name is passed explicitly only when it differs from
            # the attribute name.
            show_name = attr != column.name
            rendered += '{0}{1} = {2}\n'.format(indent, attr, self.render_column(column, show_name))

    # Render relationships
    if any(isinstance(value, Relationship) for value in model.attributes.values()):
        rendered += '\n'
    for attr, relationship in model.attributes.items():
        if isinstance(relationship, Relationship):
            rendered += '{0}{1} = {2}\n'.format(indent, attr, self.render_relationship(relationship))

    # Render subclasses
    for child_class in model.children:
        rendered += self.model_separator + self.render_class(child_class)

    return rendered
def alter_column(*p, **k):
    """Alter a column.

    Helper that builds a :class:`ColumnDelta` from the given arguments and
    runs it through the engine's ``schemachanger`` visitor.

    :argument column:
      The name of the column to be altered or a
      :class:`ChangesetColumn` column representing it.

    :param table:
      A :class:`~sqlalchemy.schema.Table` or table name for the table where
      the column will be changed. Defaults to the table of the first
      positional argument when that is a Column.

    :param engine:
      The :class:`~sqlalchemy.engine.base.Engine` to use for table
      reflection and schema alterations. Defaults to the table's ``bind``.

    :returns: A :class:`ColumnDelta` instance representing the change.
    """
    table_missing = 'table' not in k
    if table_missing and isinstance(p[0], sqlalchemy.Column):
        k['table'] = p[0].table
    if 'engine' not in k:
        k['engine'] = k['table'].bind

    # deprecation
    second_is_column = len(p) >= 2 and isinstance(p[1], sqlalchemy.Column)
    if second_is_column:
        warnings.warn(
            "Passing a Column object to alter_column is deprecated."
            " Just pass in keyword parameters instead.",
            MigrateDeprecationWarning
        )
    engine = k['engine']

    # enough tests seem to break when Metadata is always altered
    # that this crutch has to be left in until they can be sorted
    # out
    k['alter_Metadata'] = True

    delta = ColumnDelta(*p, **k)

    visitor = get_engine_visitor(engine, 'schemachanger')
    engine._run_visitor(visitor, delta)

    return delta
def test_drop_with_complex_foreign_keys(self):
    """Dropping one column of a composite foreign key removes the constraint.

    NOTE(review): both ``Table(...)`` definitions were truncated in the
    original text. The metadata argument, the ``id`` columns and the Integer
    column types are reconstructed from the surviving fragments and from the
    later ``reftable.c.id`` / ``self.table.c.r2`` uses -- confirm upstream.
    """
    from sqlalchemy.schema import ForeignKeyConstraint
    from sqlalchemy.schema import UniqueConstraint

    self.table.drop()
    self.Meta.clear()

    # create FK's target
    reftable = Table('tmp_ref', self.Meta,
                     Column('id', Integer, primary_key=True),
                     Column('jd', Integer),
                     UniqueConstraint('id', 'jd')
                     )
    if self.engine.has_table(reftable.name):
        reftable.drop()
    reftable.create()

    # add a table with a complex foreign key constraint
    self.table = Table(
        self.table_name, self.Meta,
        Column('id', Integer, primary_key=True),
        Column('r1', Integer),
        Column('r2', Integer),
        ForeignKeyConstraint(['r1', 'r2'],
                             [reftable.c.id, reftable.c.jd],
                             name='test_fk')
    )
    self.table.create()

    # paranoid check
    self.assertEqual([['r1', 'r2']],
                     self._actual_foreign_keys())

    # delete one
    self.table.c.r2.drop()

    # check the constraint is gone, since part of it
    # is no longer there - if people hit this,
    # they may be confused, maybe we should raise an error
    # and insist that the constraint is deleted first, separately?
    self.assertEqual([],
                     self._actual_foreign_keys())
def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail',
           index=True, index_label=None, chunksize=None, dtype=None):
    """
    Write records stored in a DataFrame to a SQL database.

    NOTE(review): the original signature was truncated to
    ``(frame, index, index_label, dtype)`` although the body uses ``con``,
    ``flavor``, ``if_exists`` and ``chunksize``. The parameters are restored
    in the order this docstring documents them.

    Parameters
    ----------
    frame : DataFrame or Series
        A Series is converted with ``to_frame()`` first.
    name : string
        Name of SQL table.
    con : SQLAlchemy connectable (engine/connection) or database string URI
        or sqlite3 DBAPI2 connection
        If a DBAPI2 object, only sqlite3 is supported.
    flavor : string, default 'sqlite'
        Forwarded to ``pandassql_builder``.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema.
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        Validated here, then forwarded to the backend's ``to_sql``.
    index : boolean, default True
        Write DataFrame index as a column.
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, default None
        If not None, then rows will be written in batches of this size at a
        time. If None, all rows will be written at once.
    dtype : dict of column name to SQL type, default None
        A SQLAlchemy type, or a string for the sqlite3 fallback connection.

    Raises
    ------
    ValueError
        If ``if_exists`` is not one of the three accepted values.
    NotImplementedError
        If ``frame`` is neither a Series nor a DataFrame.
    """
    if if_exists not in ('fail', 'replace', 'append'):
        raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

    pandas_sql = pandassql_builder(con, flavor=flavor)

    if isinstance(frame, Series):
        frame = frame.to_frame()
    elif not isinstance(frame, DataFrame):
        raise NotImplementedError("'frame' argument should be either a "
                                  "Series or a DataFrame")

    pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
                      index_label=index_label, schema=schema,
                      chunksize=chunksize, dtype=dtype)
def to_sql(self, frame, name, if_exists='fail', index=True,
           index_label=None, schema=None, chunksize=None, dtype=None):
    """
    Write records stored in a DataFrame to a SQL database.

    NOTE(review): the original signature was truncated to
    ``(self, dtype=None)`` and the ``sqlTable(...)`` call had lost arguments,
    although the body uses ``name``, ``if_exists``, ``schema`` and
    ``chunksize``. Both are reconstructed to parallel the sqlite fallback
    ``to_sql`` in this file -- confirm against upstream.

    Parameters
    ----------
    frame : DataFrame
    name : string
        Name of SQL table.
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        Forwarded to the table object.
    index : boolean, default True
        Write DataFrame index as a column.
    index_label : string or sequence, default None
        Column label for index column(s).
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If specified, this overwrites the default
        schema of the sqlDatabase object.
    chunksize : int, default None
        Passed to the table's ``insert``.
    dtype : dict of column name to SQL type, default None
        Optional datatype per column. Each must be a SQLAlchemy type.

    Raises
    ------
    ValueError
        If any value in ``dtype`` is not a SQLAlchemy type.
    """
    if dtype is not None:
        from sqlalchemy.types import to_instance, TypeEngine
        for col, my_type in dtype.items():
            if not isinstance(to_instance(my_type), TypeEngine):
                raise ValueError('The type of %s is not a sqlAlchemy '
                                 'type ' % col)

    table = sqlTable(name, self, frame=frame, index=index,
                     if_exists=if_exists, index_label=index_label,
                     schema=schema, dtype=dtype)
    table.create()
    table.insert(chunksize)

    # check for potentially case sensitivity issues (GH7815)
    engine = self.connectable.engine
    with self.connectable.connect() as conn:
        table_names = engine.table_names(
            schema=schema or self.Meta.schema,
            connection=conn,
        )
    if name not in table_names:
        warnings.warn("The provided table name '{0}' is not found exactly "
                      "as such in the database after writing the table,"
                      "possibly due to case sensitivity issues. Consider "
                      "using lower case table names.".format(name),
                      UserWarning)
def objectify(self, dict_, context=None):
    """Extended to handle JSON properties.

    Copies the entries of ``dict_`` onto ``context`` (a new instance of the
    mapped class when ``context`` is None) and returns it.

    :param dict_: mapping of attribute names to values. If it is itself an
        object SQLAlchemy can inspect, it is returned unchanged.
    :param context: object to populate, or None to create one.
    :returns: the populated ``context`` (or ``dict_``, see above).
    """
    mapper = self.inspector
    context = mapper.class_() if context is None else context

    # If our schema and widgets want to pass us back full objects instead of
    # their dictified versions, let them pass through.
    if sqlalchemy.inspect(dict_, raiseerr=False) is not None:
        return dict_

    for attr in dict_:
        if mapper.has_property(attr):
            prop = mapper.get_property(attr)
            if hasattr(prop, 'mapper'):
                # Relationship property.
                value = dict_[attr]
                if prop.uselist:
                    # Sequence of objects
                    if isinstance(value, ModelSetResultList):
                        # We know we do not need to try to convert these
                        pass
                    else:
                        # Try to map incoming colander items back to SQL items
                        value = [self[attr].children[0].objectify(obj)
                                 for obj in dict_[attr]]
                else:
                    if hasattr(value, "__tablename__"):
                        # Raw SQLAlchemy object - do not try to convert
                        pass
                    else:
                        # Single object
                        if value:
                            value = self[attr].objectify(value)
            else:
                value = dict_[attr]
                if value is colander.null:
                    # `colander.null` is never an appropriate
                    # value to be placed on an SQLAlchemy object
                    # so we translate it into `None`.
                    value = None
            # setattr() needs the attribute name; the two-argument call
            # raised TypeError for every mapped attribute.
            setattr(context, attr, value)
        elif hasattr(context, attr):
            # Set any properties on the object which are not SQLAlchemy
            # column based (the original comment cites JSONBProperty-style
            # attributes such as user_data and password on the user model).
            value = dict_[attr]
            setattr(context, attr, value)
        else:
            # Ignore attributes if they are not mapped
            logger.debug(
                'sqlAlchemySchemaNode.objectify: %s not found on '
                '%s. This property has been ignored.',
                attr, self
            )
            continue

    return context
def __init__(self, db, store,
             read=True, write=True, read_through_write=True, delete=True,
             create_db=False, create_schema=True, schema=None):
    """Set up the repository's database access and blob store.

    NOTE(review): the body uses ``db`` and ``schema``, neither of which was
    in the original (truncated) signature. ``db`` is restored as the first
    parameter and ``schema`` appended with a ``None`` default -- confirm the
    positions against upstream.

    :param db: either a database URL string (an engine and sessionmaker are
        created) or an existing session object (stored as ``self._session``).
    :param store: assigned to ``self.blobstore``.
    :param read, write, read_through_write, delete: forwarded to the parent
        class constructor.
    :param create_db: when true and ``db`` is a string, create the database
        if needed and initialise it.
    :param create_schema: when true and ``schema`` is given, create the
        schema if ``information_schema.schemata`` does not list it.
    :param schema: optional schema name; only allowed with a URL ``db``.
    :raises ValueError: if ``schema`` is given with a non-string ``db``.
    """
    # NOTE(review): upgrade_db is only ever assigned False in this method,
    # so the _db_upgrade() call below is currently unreachable.
    upgrade_db = False
    super(PostgresRepo, self).__init__(read=read, write=write,
                                       read_through_write=read_through_write,
                                       delete=delete)

    if not isinstance(db, string_type) and schema is not None:
        raise ValueError("You can only provide a schema with a DB url.")

    init_db = False
    if create_db and isinstance(db, string_type):
        _create_db_if_needed(db)
        init_db = True
        upgrade_db = False

    self._run = None
    if isinstance(db, string_type):
        if create_db:
            init_db = True
        self._db_engine = _db_engine(db, schema)
        self._sessionmaker = sqlalchemy.orm.sessionmaker(bind=self._db_engine)
    else:
        self._session = db

    if create_schema and schema is not None:
        with self.session() as session:
            # Does information_schema.schemata already list this schema?
            q = sa.exists(sa.select([("schema_name")]).select_from(sa.text("information_schema.schemata"))
                          .where(sa.text("schema_name = :schema")
                                 .bindparams(schema=schema)))
            if not session.query(q).scalar():
                session.execute(CreateSchema(schema))
                session.commit()
                init_db = True
                upgrade_db = False

    if init_db:
        self._db_init()

    if upgrade_db:
        self._db_upgrade()

    self.blobstore = store
def test_drop_with_complex_foreign_keys(self):
    """Dropping one column of a composite foreign key removes the constraint.

    NOTE(review): most of this test was merged/truncated in the original
    text. It is reconstructed from the other
    ``test_drop_with_complex_foreign_keys`` in this file plus the surviving
    fragments (``nullable=False`` and the MySQL-only constraint drop) --
    confirm against upstream.
    """
    from sqlalchemy.schema import ForeignKeyConstraint
    from sqlalchemy.schema import UniqueConstraint

    self.table.drop()
    self.Meta.clear()

    # NOTE(mriedem): DB2 does not currently support unique constraints
    # on nullable columns, so the columns that are used to create the
    # foreign keys here need to be non-nullable for testing with DB2
    # to work.

    # create FK's target
    reftable = Table('tmp_ref', self.Meta,
                     Column('id', Integer, primary_key=True),
                     Column('jd', Integer, nullable=False),
                     UniqueConstraint('id', 'jd')
                     )
    if self.engine.has_table(reftable.name):
        reftable.drop()
    reftable.create()

    # add a table with a complex foreign key constraint
    self.table = Table(
        self.table_name, self.Meta,
        Column('id', Integer, primary_key=True),
        Column('r1', Integer, nullable=False),
        Column('r2', Integer, nullable=False),
        ForeignKeyConstraint(['r1', 'r2'],
                             [reftable.c.id, reftable.c.jd],
                             name='test_fk')
    )
    self.table.create()

    # paranoid check
    self.assertEqual([['r1', 'r2']],
                     self._actual_foreign_keys())

    # delete one
    # SQLAlchemy reports the MySQL engine name as 'mysql'; the previous
    # mixed-case literal could never match. The named constraint is dropped
    # explicitly before the column on that backend.
    if self.engine.name == 'mysql':
        constraint.ForeignKeyConstraint([self.table.c.r1, self.table.c.r2],
                                        [reftable.c.id, reftable.c.jd],
                                        name='test_fk').drop()
    self.table.c.r2.drop()

    # check the constraint is gone, since part of it is no longer there
    self.assertEqual([],
                     self._actual_foreign_keys())