Python sqlalchemy 模块,types() 实例源码
我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用sqlalchemy.types()。
def _get_dtype(self, sqltype):
    """Pick the Python/NumPy type a DataFrame column should get for ``sqltype``.

    ``sqltype`` is tested against the SQLAlchemy type classes in a fixed
    order (Float, Integer, TIMESTAMP, DateTime, Date, Boolean); the first
    match decides the result. Anything that matches none of them maps to
    ``object``.
    """
    from sqlalchemy.types import (Boolean, Date, DateTime, Float, Integer,
                                  TIMESTAMP)

    if isinstance(sqltype, Float):
        return float

    if isinstance(sqltype, Integer):
        # TODO: refine integer size.
        return np.dtype('int64')

    if isinstance(sqltype, TIMESTAMP):
        # Timezone-capable type: tz-aware dtype only when the flag is set.
        return DatetimeTZDtype if sqltype.timezone else datetime

    if isinstance(sqltype, DateTime):
        # Caution: np.datetime64 is also a subclass of np.number.
        return datetime

    if isinstance(sqltype, Date):
        return date

    if isinstance(sqltype, Boolean):
        return bool

    return object
def get_columns(self, connection, table_name, schema=None, **kw):
    """List the columns of ``table_name`` by running a zero-row SELECT.

    The query result's keys are taken as the column names. Type detection
    is not implemented yet: every column is reported as ``VARCHAR``, not
    nullable, with no default and no autoincrement information.

    ``schema`` and ``**kw`` are accepted but not used by this body.

    Returns a list with one dict per column.
    """
    query = "SELECT * FROM `%(table_id)s` LIMIT 0" % {"table_id": table_name}
    probe = connection.execute(query)
    # TODO: handle types better
    return [
        {
            "name": label,
            "type": VARCHAR,
            "default": None,
            "autoincrement": None,
            "nullable": False,
        }
        for label in probe.keys()
    ]
def __init__(self, table):
    """Store ``table`` and its schema, then generalise each column's type.

    For every column the MRO of its type class is walked from the class
    itself upwards. The last class seen that defines ``__visit_name__`` is
    remembered, and the walk stops at the first class whose name is neither
    all upper-case nor underscore-prefixed. The column type is then adapted
    to the remembered class (ie. VARCHAR -> String).
    """
    super(Model, self).__init__()
    self.table = table
    self.schema = table.schema

    for column in table.columns:
        target_cls = column.type.__class__
        for candidate in target_cls.__mro__:
            if hasattr(candidate, '__visit_name__'):
                target_cls = candidate
            cand_name = candidate.__name__
            looks_generic = (cand_name != cand_name.upper()
                             and not cand_name.startswith('_'))
            if looks_generic:
                break
        column.type = column.type.adapt(target_cls)
def render_column_type(coltype):
    """Render ``coltype`` as source text: the class name plus its arguments.

    Enum types list their values followed by ``name=...`` when a name is
    set. For every other type the ``__init__`` signature is inspected and
    each public parameter is read back from the instance; parameters that
    are missing on the instance or equal to their default are left out.
    Once one parameter has been left out, the remaining ones are rendered
    as keyword arguments.

    Returns the bare class name when there is nothing to render.
    """
    if isinstance(coltype, Enum):
        rendered_args = [repr(member) for member in coltype.enums]
        if coltype.name is not None:
            rendered_args.append('name={0!r}'.format(coltype.name))
    else:
        # All other types
        rendered_args = []
        spec = _getargspec_init(coltype.__class__.__init__)
        default_values = spec.defaults or ()
        defaults_by_name = dict(
            zip(spec.args[-len(default_values):], default_values))
        sentinel = object()
        keyword_mode = False

        for param in spec.args[1:]:
            # Remove annoyances like _warn_on_bytestring
            if param.startswith('_'):
                continue

            current = getattr(coltype, param, sentinel)
            fallback = defaults_by_name.get(param, sentinel)
            if current is sentinel or current == fallback:
                # A skipped positional slot forces keywords from here on.
                keyword_mode = True
                continue

            text = repr(current)
            if keyword_mode:
                text = '{0}={1}'.format(param, text)
            rendered_args.append(text)

    type_name = coltype.__class__.__name__
    if not rendered_args:
        return type_name
    return type_name + '({0})'.format(','.join(rendered_args))
def LIMIT1_get_columns(self, connection=None, table_name=None, **kw):
    """Describe a table's columns from the cursor of a one-row SELECT.

    This is slow because it has to run a query, but the types come from
    what the driver actually saw in the data (for a single record only,
    which is not ideal).

    Parameters
    ----------
    connection : object with an ``execute(sql)`` method
        Used to run the probe query. Its result must expose
        ``.cursor.getdesc()``.
    table_name : str
        Interpolated verbatim into the ``FROM`` clause.
    **kw
        Accepted for call compatibility; not used.

    Returns
    -------
    list of dict
        One ``{"name", "type", "nullable"}`` dict per column. ``nullable``
        is always reported as ``True``.

    Raises
    ------
    TypeError
        If ``connection`` or ``table_name`` is not supplied.
    KeyError
        If a type code in the cursor description is not in ``_type_map``.
    """
    # Both names used to be read without ever being bound by the signature,
    # so every call failed with NameError. They are now real parameters.
    if connection is None or table_name is None:
        raise TypeError(
            "LIMIT1_get_columns() requires 'connection' and 'table_name'")

    query = "SELECT * FROM %(table_id)s LIMIT 1" % {"table_id": table_name}
    cursor = connection.execute(query)
    description = cursor.cursor.getdesc()

    # Each description entry is indexed as (name, type code, ...).
    return [
        {
            "name": col[0],
            "type": _type_map[col[1]],
            "nullable": True,
        }
        for col in description
    ]
def LIMIT1_get_columns(self,
}
result.append(column)
return(result)
def _harmonize_columns(self, parse_dates=None):
"""
Make the DataFrame's column types align with the sql table
column types.
Need to work around limited NA value support. Floats are always
fine,ints must always be floats if there are Null values.
Booleans are hard because converting bool column with None replaces
all Nones with false. Therefore only convert bool if there are no
NA values.
Datetimes should already be converted to np.datetime64 if supported,
but here we also force conversion if required
"""
# handle non-list entries for parse_dates gracefully
if parse_dates is True or parse_dates is None or parse_dates is False:
parse_dates = []
if not hasattr(parse_dates, '__iter__'):
parse_dates = [parse_dates]
for sql_col in self.table.columns:
col_name = sql_col.name
try:
df_col = self.frame[col_name]
# the type the dataframe column should have
col_type = self._get_dtype(sql_col.type)
if (col_type is datetime or col_type is date or
col_type is DatetimeTZDtype):
self.frame[col_name] = _handle_date_column(df_col)
elif col_type is float:
# floats support NA,can always convert!
self.frame[col_name] = df_col.astype(col_type, copy=False)
elif len(df_col) == df_col.count():
# No NA values,can convert ints and bools
if col_type is np.dtype('int64') or col_type is bool:
self.frame[col_name] = df_col.astype(
col_type, copy=False)
# Handle date parsing
if col_name in parse_dates:
try:
fmt = parse_dates[col_name]
except TypeError:
fmt = None
self.frame[col_name] = _handle_date_column(
df_col, format=fmt)
except KeyError:
pass # this column not in results
def _sqlalchemy_type(self, col):
dtype = self.dtype or {}
if col.name in dtype:
return self.dtype[col.name]
col_type = self._get_notnull_col_dtype(col)
from sqlalchemy.types import (BigInteger, Integer,
Text,
DateTime, Date, Time)
if col_type == 'datetime64' or col_type == 'datetime':
try:
tz = col.tzinfo # noqa
return DateTime(timezone=True)
except:
return DateTime
if col_type == 'timedelta64':
warnings.warn("the 'timedelta' type is not supported,and will be "
"written as integer values (ns frequency) to the "
"database.", UserWarning, stacklevel=8)
return BigInteger
elif col_type == 'floating':
if col.dtype == 'float32':
return Float(precision=23)
else:
return Float(precision=53)
elif col_type == 'integer':
if col.dtype == 'int32':
return Integer
else:
return BigInteger
elif col_type == 'boolean':
return Boolean
elif col_type == 'date':
return Date
elif col_type == 'time':
return Time
elif col_type == 'complex':
raise ValueError('Complex datatypes not supported')
return Text
def to_sql(self, frame, name, if_exists='fail', index=True,
           index_label=None, chunksize=None, dtype=None, schema=None):
    """
    Write records stored in a DataFrame to a sql database.

    Parameters
    ----------
    frame : DataFrame
    name : string
        Name of sql table
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, do nothing.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : boolean, default True
        Write DataFrame index as a column
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, default None
        If not None, then rows will be written in batches of this size at a
        time. If None, all rows will be written at once.
    dtype : dict of column name to sql type, default None
        Optional specifying the datatype for columns. The sql type should
        be a sqlAlchemy type.
    schema : string, default None
        Name of sql schema in database to write to (if database flavor
        supports this). If specified, this overwrites the default
        schema of the sqlDatabase object. Declared last in the signature
        so that existing positional calls keep their meaning.

    Raises
    ------
    ValueError
        If a value in `dtype` is not a sqlAlchemy type.
    """
    if dtype is not None:
        from sqlalchemy.types import to_instance, TypeEngine
        for col, my_type in dtype.items():
            if not isinstance(to_instance(my_type), TypeEngine):
                raise ValueError('The type of %s is not a sqlAlchemy '
                                 'type ' % col)

    table = sqlTable(name, self, frame=frame, index=index,
                     if_exists=if_exists, index_label=index_label,
                     schema=schema, dtype=dtype)
    table.create()
    table.insert(chunksize)

    # check for potentially case sensitivity issues (GH7815)
    engine = self.connectable.engine
    with self.connectable.connect() as conn:
        table_names = engine.table_names(
            schema=schema or self.Meta.schema,
            connection=conn,
        )
    if name not in table_names:
        warnings.warn("The provided table name '{0}' is not found exactly "
                      "as such in the database after writing the table,"
                      "possibly due to case sensitivity issues. Consider "
                      "using lower case table names.".format(name),
                      UserWarning)