Postgresql 分区管理与 sqlalchemy 和烧瓶迁移

问题描述

我正在开发一个多租户系统,其中任意数量的租户都可以注册到系统并管理他们的数据。 每当租户注册时,就会创建一个新的表分区,分区名称以他的 id 结尾,并且他将管理 company_id 等于他的租户 id 的数据。还有一些不需要分区的所有租户通用的表。 使用以下代码/create/company 路由中创建分区

        ...
        db.session.add(company)
        db.session.commit()
        # tables to be partitioned
        for table_name in ['export_import_data_rel','fields_company_rel','export','import_data','import_logs']:
            db.engine.execute("CREATE TABLE {table_name}_{company_id} 
                               PARTITION OF {table_name}
                               FOR VALUES IN ('{company_id}');".format(table_name=table_name,company_id=company.id))

这可以很好地为每个租户创建新分区。但是当我执行 flask db migrateflask db upgrade 时,租户注册时创建的那些分区被检测为已删除的表,因为没有提到分区(如 import_data_1 或 {{ 1}}) 在 import_data_2 中。

所以我试图在 application/models.py 中创建分区和分区表,以便它们也被检测到。我正在使用 this 问题中提到的 mixin 类和元类来完成它。

在我的 application/models.py 中,以前是什么

application/__init__.py

现在变成

from flask_sqlalchemy import sqlAlchemy
db = sqlAlchemy() 

这是我的 from sqlalchemy.ext.declarative import declarative_base from flask_sqlalchemy.model import Model from sqlalchemy.ext.declarative import DeclarativeMeta class PartitionByMeta(DeclarativeMeta): def __new__(cls,clsname,bases,attrs): @classmethod def get_partition_name(cls_,key): return f'{cls_.__tablename__}_{key}' @classmethod def create_partition(cls_,key,_partition_type="LIST",_partition_range=(),*args,**kwargs): if key not in cls_.partitions: Partition = type( f'{clsname}{key}',# Class name,only used internally bases,{'__tablename__': cls_.get_partition_name(key)} ) Partition.__table__.add_is_dependent_on(cls_.__table__) event.listen( Partition.__table__,'after_create',DDL( f""" ALTER TABLE {cls_.__tablename__} ATTACH PARTITION {Partition.__tablename__} FOR VALUES IN ('{_partition_range}'); """ ) if _partition_type == "LIST" else DDL( f""" ALTER TABLE {cls_.__tablename__} ATTACH PARTITION {Partition.__tablename__} FOR VALUES FROM ('{_partition_range[0]}') TO ('{_partition_range[1]}'); """ ) ) cls_.partitions[key] = Partition return cls_.partitions[key] attrs.update( { # tables to be partitioned will be sending it in __table_args__,so I'm commenting __table_args__ and partitioned_by # '__table_args__': attrs.get('__table_args__',()) + (dict(postgresql_partition_by=f'RANGE({partition_by})'),),# 'partitioned_by': partition_by,'partitions': {},'get_partition_name': get_partition_name,'create_partition': create_partition } ) return super().__new__(cls,attrs) db = sqlAlchemy(model_class=declarative_base(cls=Model,Metaclass=PartitionByMeta,name='Model')) 中需要按 application/models.py 字段分区的表的示例

company_id

将使用 mixin 和元类重写

class ImportData(db.Model):
    id = db.Column(UUID(as_uuid=True),default=uuid.uuid4,nullable=False,index=True)
    company_id = db.Column(db.Integer,db.ForeignKey('res_company.id'),index=True)
    export_ids = db.relationship(
        'Export',secondary=export_import_data_rel,lazy='dynamic',backref=db.backref('import'),primaryjoin="and_(ImportData.id == foreign(export_import_data_rel.c.res_id),ImportData.company_id == foreign(export_import_data_rel.c.company_id))",secondaryjoin="and_(Export.id == foreign(export_import_data_rel.c.export_id),Export.company_id == export_import_data_rel.c.company_id)")
    business_name = db.Column(db.String(),index=True)
    phone = db.Column(db.String(),index=True)
    address = db.Column(db.String(),index=True)
    upserted_by = db.Column(db.Integer,db.ForeignKey('res_users.id'))
    upsert_date = db.Column(db.DateTime,default=datetime.utcNow)

    __table_args__ = (PrimaryKeyConstraint('id','company_id'),{"postgresql_partition_by": 'LIST (company_id)'})

现在我正在尝试为 class ImportDataMixin: id = db.Column(UUID(as_uuid=True),index=True) business_name = db.Column(db.String(),index=True) upsert_date = db.Column(db.DateTime,default=datetime.utcNow) @declared_attr def company_id(cls): return db.Column(db.Integer,index=True) @declared_attr def upserted_by(cls): return db.Column(db.Integer,db.ForeignKey('res_users.id')) @declared_attr def export_ids(cls): return db.relationship( 'Export',Export.company_id == export_import_data_rel.c.company_id)") __table_args__ = (PrimaryKeyConstraint('id',{"postgresql_partition_by": 'LIST (company_id)'}) class ImportData(ImportDataMixin,db.Model,Metaclass=PartitionByMeta): __tablename__ = 'import_data' 添加 2 个分区

import_data

我为新数据库提供了 for i in range(0,2): Partition = ImportData.create_partition(key=i + 1,_partition_range=(i + 1)) flask db migrate。但它不起作用;创建的架构是

flask db upgrade

import_data_2 也是如此。

我还有其他没有定义类的表,例如

CREATE TABLE public.import_data_1 (
    id uuid NOT NULL,business_name character varying,phone character varying,address character varying,upsert_date timestamp without time zone,company_id integer NOT NULL,upserted_by integer
)
PARTITION BY LIST (company_id);

为了为这些表创建分区,我尝试了

export_import_data_rel = db.Table('export_import_data_rel',db.Column('company_id',db.Integer,primary_key=True),db.Column('export_id',UUID(as_uuid=True),db.Column('res_id',ForeignKeyConstraint(['export_id','company_id'],['export.id','export.company_id'],ondelete="CASCADE"),ForeignKeyConstraint(['res_id',['import_data.id','import_data.company_id'],postgresql_partition_by='LIST (company_id)')

但它不会创建任何表。

所以我的问题是

  1. 如何正确地对表进行分区,以免使用flask db migrate 和upgrade 命令删除分区?这种方式甚至可能吗?每次我重新启动服务器时,要分区的代码都会运行吗?还有其他方法吗?
  2. 如何为没有类定义的表添加分区?

感谢任何帮助。谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)