Python psycopg2 模块,connect() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psycopg2.connect()。
def main():
db_name = os.environ['PGDATABASE']
connection_parameters = {
'host': os.environ['PGHOST'],
'database': 'postgres',
'user': os.environ['PGUSER'],
'password': os.environ['PGPASSWORD']
}
drop_statement = 'DROP DATABASE IF EXISTS {};'.format(db_name)
ddl_statement = 'CREATE DATABASE {};'.format(db_name)
conn = connect(**connection_parameters)
conn.autocommit = True
try:
with conn.cursor() as cursor:
cursor.execute(drop_statement)
cursor.execute(ddl_statement)
conn.close()
sys.stdout.write('Created database environment successfully.\n')
except psycopg2.Error:
raise SystemExit(
'Failed to setup Postgres environment.\n{0}'.format(sys.exc_info())
)
def create(self):
"""If not created,create a database with the name specified in
the constructor"""
conn = None
try:
conn = psycopg2.connect(database="postgres",
user=self.user,
password=self.passwd)
conn.set_isolation_level(ISOLEVEL)
cursor = conn.cursor()
cursor.execute("SELECT datname FROM pg_database WHERE " +
"datistemplate = false;")
fetch = cursor.fetchall()
dblist = [fetch[i][0] for i in range(len(fetch))]
if self.name not in dblist:
cursor.execute("CREATE DATABASE %s;" % self.name)
logger.debug("created databse %s", self.name)
except psycopg2.DatabaseError, e:
logger.warning("error creating database: %s", self.fmt_errmsg(e))
finally:
conn.close()
def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('Starting Schema creation..'))
dbname = settings.DATABASES['default']['NAME']
user = settings.DATABASES['default']['USER']
password = settings.DATABASES['default']['PASSWORD']
host = settings.DATABASES['default']['HOST']
con = connect(dbname=dbname, user=user, host=host, password=password)
self.stdout.write(self.style.SUCCESS('Adding schema {schema} to database {dbname}'
.format(schema=settings.SCHEMA, dbname=dbname)))
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
cur.execute('CREATE SCHEMA {schema};'.format(schema=settings.SCHEMA))
cur.close()
con.close()
self.stdout.write(self.style.SUCCESS('All Done!'))
def handle(self, **options):
self.stdout.write(self.style.SUCCESS('Starting Schema deletion..'))
dbname = settings.DATABASES['default']['NAME']
user = settings.DATABASES['default']['USER']
password = settings.DATABASES['default']['PASSWORD']
host = settings.DATABASES['default']['HOST']
con = connect(dbname=dbname, password=password)
self.stdout.write(self.style.SUCCESS('Removing schema {schema} from database {dbname}'
.format(schema=settings.SCHEMA, dbname=dbname)))
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
cur.execute('DROP SCHEMA {schema} CASCADE;'.format(schema=settings.SCHEMA))
cur.close()
con.close()
self.stdout.write(self.style.SUCCESS('All Done.'))
def handle(self, **options):
self.stdout.write(self.style.SUCCESS('Starting DB creation..'))
dbname = settings.DATABASES['default']['NAME']
user = settings.DATABASES['default']['USER']
password = settings.DATABASES['default']['PASSWORD']
host = settings.DATABASES['default']['HOST']
self.stdout.write(self.style.SUCCESS('Connecting to host..'))
con = connect(dbname='postgres', password=password)
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.stdout.write(self.style.SUCCESS('Creating database'))
cur = con.cursor()
cur.execute('CREATE DATABASE ' + dbname)
cur.close()
con.close()
self.stdout.write(self.style.SUCCESS('All done!'))
def resolve_user_or_group(self, old_id):
"""Resolve a user by its user id from old dudel."""
# connect to db
conn = psycopg2.connect(self.conn_string)
cursor = conn.cursor()
cursor.execute('SELECT username FROM "user" WHERE id=%s', (old_id,))
username = cursor.fetchone()
try:
if username:
return get_user_model().objects.get(username=username[0])
else:
cursor.execute('SELECT name FROM "group" WHERE id=%s',))
groupname = cursor.fetchone()
if groupname:
return Group.objects.get(name=groupname[0])
except ObjectDoesNotExist:
return None
def __init__(self, name, user, base, passwd=None, reconnect=False):
"""name: the name of the database to connect to
user: the username to use to connect
base: a file containing the sql implementation for Ravel's base
passwd: the password to connect to the database
reconnect: true to connect to an existing database setup,false
to load a new instance of Ravel's base into the database"""
self.name = name
self.user = user
self.passwd = passwd
self.base = base
self.cleaned = not reconnect
self._cursor = None
self._conn = None
if not reconnect and self.num_connections() > 0:
logger.warning("existing connections to database,skipping reinit")
self.cleaned = False
elif not reconnect:
self.init()
self.cleaned = True
def clean(self):
"""Clean the database of any existing Ravel components"""
# close existing connections
self.conn.close()
conn = None
try:
conn = psycopg2.connect(database="postgres",
password=self.passwd)
conn.set_isolation_level(ISOLEVEL)
cursor = conn.cursor()
cursor.execute("drop database %s" % self.name)
except psycopg2.DatabaseError, e:
logger.warning("error cleaning database: %s", self.fmt_errmsg(e))
finally:
if conn:
conn.close()
def connectToDB(dbname=None, userName=None, dbPassword=None, dbHost=None,
dbPort=None, dbCursor=psycopg2.extras.DictCursor):
'''
Connect to a specified Postgresql DB and return connection and cursor objects.
'''
# Start DB connection
try:
connectionString = "dbname='" + dbname + "'"
if userName != None and userName != '':
connectionString += " user='" + userName + "'"
if dbHost != None and dbHost != '':
connectionString += " host='" + dbHost + "'"
if dbPassword != None and dbPassword != '':
connectionString += " password='" + dbPassword + "'"
if dbPort != None:
connectionString += " port='" + str(dbPort) + "'"
connection = psycopg2.connect(connectionString)
register_adapter(numpy.float64, addapt_numpy_float64)
register_adapter(numpy.int64, addapt_numpy_int64)
except:
raise
# if the connection succeeded get a cursor
cursor = connection.cursor(cursor_factory=dbCursor)
return connection, cursor
def connect(self, **kwargs):
try:
self._conns
except AttributeError as e:
raise AttributeError(
"%s (did you forget to call ConnectingTestCase.setUp()?)"
% e)
if 'dsn' in kwargs:
conninfo = kwargs.pop('dsn')
else:
conninfo = dsn
import psycopg2
conn = psycopg2.connect(conninfo, **kwargs)
self._conns.append(conn)
return conn
def skip_if_tpc_disabled(f):
"""Skip a test if the server has tpc support disabled."""
@wraps(f)
def skip_if_tpc_disabled_(self):
from psycopg2 import ProgrammingError
cnn = self.connect()
cur = cnn.cursor()
try:
cur.execute("SHOW max_prepared_transactions;")
except ProgrammingError:
return self.skipTest(
"server too old: two phase transactions not supported.")
else:
mtp = int(cur.fetchone()[0])
cnn.close()
if not mtp:
return self.skipTest(
"server not configured for two phase transactions. "
"set max_prepared_transactions to > 0 to run the test")
return f(self)
return skip_if_tpc_disabled_
def test_concurrent_execution(self):
def slave():
cnn = self.connect()
cur = cnn.cursor()
cur.execute("select pg_sleep(4)")
cur.close()
cnn.close()
t1 = threading.Thread(target=slave)
t2 = threading.Thread(target=slave)
t0 = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
self.assertTrue(time.time() - t0 < 7,
"something broken in concurrency")
def test_set_isolation_level_default(self):
conn = self.connect()
curs = conn.cursor()
conn.autocommit = True
curs.execute("set default_transaction_isolation to 'read committed'")
conn.autocommit = False
conn.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(conn.isolation_level,
ext.ISOLATION_LEVEL_SERIALIZABLE)
curs.execute("show transaction_isolation")
self.assertEqual(curs.fetchone()[0], "serializable")
conn.rollback()
conn.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT)
curs.execute("show transaction_isolation")
self.assertEqual(curs.fetchone()[0], "read committed")
def test_isolation_level_read_committed(self):
cnn1 = self.connect()
cnn2 = self.connect()
cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(0, cur1.fetchone()[0])
cnn1.commit()
cur2 = cnn2.cursor()
cur2.execute("insert into isolevel values (10);")
cur1.execute("insert into isolevel values (20);")
cur2.execute("select count(*) from isolevel;")
self.assertEqual(1, cur2.fetchone()[0])
cnn1.commit()
cur2.execute("select count(*) from isolevel;")
self.assertEqual(2, cur2.fetchone()[0])
cur1.execute("select count(*) from isolevel;")
self.assertEqual(1, cur1.fetchone()[0])
cnn2.commit()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(2, cur1.fetchone()[0])
def clear_test_xacts(self):
"""Rollback all the prepared transaction in the testing db."""
cnn = self.connect()
cnn.set_isolation_level(0)
cur = cnn.cursor()
try:
cur.execute(
"select gid from pg_prepared_xacts where database = %s",
(dbname,))
except psycopg2.ProgrammingError:
cnn.rollback()
cnn.close()
return
gids = [r[0] for r in cur]
for gid in gids:
cur.execute("rollback prepared %s;", (gid,))
cnn.close()
def test_tpc_commit(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid)
self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit');")
self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records())
cnn.tpc_prepare()
self.assertEqual(cnn.status, ext.STATUS_PREPARED)
self.assertEqual(1, self.count_test_records())
cnn.tpc_commit()
self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records())
def test_tpc_commit_one_phase(self):
cnn = self.connect()
xid = cnn.xid(1, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_1p');")
self.assertEqual(0, self.count_test_records())
def test_tpc_commit_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
self.assertEqual(0, self.count_test_records())
cnn.tpc_prepare()
cnn.close()
self.assertEqual(1, self.count_test_records())
cnn = self.connect()
xid = cnn.xid(1, "bqual")
cnn.tpc_commit(xid)
self.assertEqual(cnn.status, self.count_test_records())
def test_tpc_rollback(self):
cnn = self.connect()
xid = cnn.xid(1, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_rollback');")
self.assertEqual(0, self.count_test_records())
cnn.tpc_rollback()
self.assertEqual(cnn.status, self.count_test_records())
def test_tpc_rollback_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, "bqual")
cnn.tpc_rollback(xid)
self.assertEqual(cnn.status, self.count_test_records())
def test_xid_roundtrip(self):
for fid, gtrid, bqual in [
(0, "", ""),
(42, "bqual"),
(0x7fffffff, "x" * 64, "y" * 64),
]:
cnn = self.connect()
xid = cnn.xid(fid, bqual)
cnn.tpc_begin(xid)
cnn.tpc_prepare()
cnn.close()
cnn = self.connect()
xids = [x for x in cnn.tpc_recover() if x.database == dbname]
self.assertEqual(1, len(xids))
xid = xids[0]
self.assertEqual(xid.format_id, fid)
self.assertEqual(xid.gtrid, gtrid)
self.assertEqual(xid.bqual, bqual)
cnn.tpc_rollback(xid)
def test_copy_from_segfault(self):
# issue #219
script = ("""\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
curs.execute("copy copy_segf from stdin")
except psycopg2.ProgrammingError:
pass
conn.close()
""" % {'dsn': dsn})
proc = Popen([sys.executable, '-c', script_to_py3(script)])
proc.communicate()
self.assertEqual(0, proc.returncode)
def notify(self, sec=0, payload=None):
"""Send a notification to the database,eventually after some time."""
if payload is None:
payload = ''
else:
payload = ",%r" % payload
script = ("""\
import time
time.sleep(%(sec)s)
import %(module)s as psycopg2
import %(module)s.extensions as ext
conn = psycopg2.connect(%(dsn)r)
conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
print conn.get_backend_pid()
curs = conn.cursor()
curs.execute("NOTIFY " %(name)r %(payload)r)
curs.close()
conn.close()
""" % {
'module': psycopg2.__name__,
'dsn': dsn, 'sec': sec, 'name': name, 'payload': payload})
return Popen([sys.executable, script_to_py3(script)], stdout=PIPE)
def connect(self, **kwargs):
try:
self._conns
except AttributeError, e:
raise AttributeError(
"%s (did you forget to call ConnectingTestCase.setUp()?)"
% e)
if 'dsn' in kwargs:
conninfo = kwargs.pop('dsn')
else:
conninfo = dsn
import psycopg2
conn = psycopg2.connect(conninfo, **kwargs)
self._conns.append(conn)
return conn
def skip_if_tpc_disabled(f):
"""Skip a test if the server has tpc support disabled."""
@wraps(f)
def skip_if_tpc_disabled_(self):
from psycopg2 import ProgrammingError
cnn = self.connect()
cur = cnn.cursor()
try:
cur.execute("SHOW max_prepared_transactions;")
except ProgrammingError:
return self.skipTest(
"server too old: two phase transactions not supported.")
else:
mtp = int(cur.fetchone()[0])
cnn.close()
if not mtp:
return self.skipTest(
"server not configured for two phase transactions. "
"set max_prepared_transactions to > 0 to run the test")
return f(self)
return skip_if_tpc_disabled_
def test_concurrent_execution(self):
def slave():
cnn = self.connect()
cur = cnn.cursor()
cur.execute("select pg_sleep(4)")
cur.close()
cnn.close()
t1 = threading.Thread(target=slave)
t2 = threading.Thread(target=slave)
t0 = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
self.assert_(time.time() - t0 < 7,
"something broken in concurrency")
def test_set_isolation_level_default(self):
conn = self.connect()
curs = conn.cursor()
conn.autocommit = True
curs.execute("set default_transaction_isolation to 'read committed'")
conn.autocommit = False
conn.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(conn.isolation_level, "read committed")
def test_isolation_level_read_committed(self):
cnn1 = self.connect()
cnn2 = self.connect()
cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(0, cur1.fetchone()[0])
def clear_test_xacts(self):
"""Rollback all the prepared transaction in the testing db."""
cnn = self.connect()
cnn.set_isolation_level(0)
cur = cnn.cursor()
try:
cur.execute(
"select gid from pg_prepared_xacts where database = %s",))
cnn.close()
def test_tpc_commit(self):
cnn = self.connect()
xid = cnn.xid(1, self.count_test_records())
def test_tpc_commit_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, self.count_test_records())
def test_tpc_rollback(self):
cnn = self.connect()
xid = cnn.xid(1, self.count_test_records())
def test_tpc_rollback_one_phase(self):
cnn = self.connect()
xid = cnn.xid(1, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, self.count_test_records())
def test_xid_roundtrip(self):
for fid, bqual)
cnn.tpc_rollback(xid)
def test_unparsed_roundtrip(self):
for tid in [
'',
'hello,world!',
'x' * 199, # Postgresql's limit in transaction id length
]:
cnn = self.connect()
cnn.tpc_begin(tid)
cnn.tpc_prepare()
cnn.close()
cnn = self.connect()
xids = [x for x in cnn.tpc_recover() if x.database == dbname]
self.assertEqual(1, None)
self.assertEqual(xid.gtrid, tid)
self.assertEqual(xid.bqual, None)
cnn.tpc_rollback(xid)
def test_copy_from_segfault(self):
# issue #219
script = ("""\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
curs.execute("copy copy_segf from stdin")
except psycopg2.ProgrammingError:
pass
conn.close()
""" % {'dsn': dsn})
proc = Popen([sys.executable, proc.returncode)
def test_copy_to_segfault(self):
# issue #219
script = ("""\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
curs.execute("copy copy_segf to stdout")
except psycopg2.ProgrammingError:
pass
conn.close()
""" % {'dsn': dsn})
proc = Popen([sys.executable, stdout=PIPE)
proc.communicate()
self.assertEqual(0, proc.returncode)
def test_supported_keywords(self):
psycopg2.connect(database='foo')
self.assertEqual(self.args[0], 'dbname=foo')
psycopg2.connect(user='postgres')
self.assertEqual(self.args[0], 'user=postgres')
psycopg2.connect(password='secret')
self.assertEqual(self.args[0], 'password=secret')
psycopg2.connect(port=5432)
self.assertEqual(self.args[0], 'port=5432')
psycopg2.connect(sslmode='require')
self.assertEqual(self.args[0], 'sslmode=require')
psycopg2.connect(database='foo',
user='postgres', password='secret', port=5432)
self.assert_('dbname=foo' in self.args[0])
self.assert_('user=postgres' in self.args[0])
self.assert_('password=secret' in self.args[0])
self.assert_('port=5432' in self.args[0])
self.assertEqual(len(self.args[0].split()), 4)
def get_connection_params(self):
settings_dict = self.settings_dict
# None may be used to connect to the default 'postgres' db
if settings_dict['NAME'] == '':
raise ImproperlyConfigured(
"settings.DATABASES is improperly configured. "
"Please supply the NAME value.")
conn_params = {
'database': settings_dict['NAME'] or 'postgres',
}
conn_params.update(settings_dict['OPTIONS'])
conn_params.pop('isolation_level', None)
if settings_dict['USER']:
conn_params['user'] = settings_dict['USER']
if settings_dict['PASSWORD']:
conn_params['password'] = force_str(settings_dict['PASSWORD'])
if settings_dict['HOST']:
conn_params['host'] = settings_dict['HOST']
if settings_dict['PORT']:
conn_params['port'] = settings_dict['PORT']
return conn_params
def get_new_connection(self, conn_params):
connection = Database.connect(**conn_params)
# self.isolation_level must be set:
# - after connecting to the database in order to obtain the database's
# default when no value is explicitly specified in options.
# - before calling _set_autocommit() because if autocommit is on,that
# will set connection.isolation_level to ISOLATION_LEVEL_AUTOCOMMIT.
options = self.settings_dict['OPTIONS']
try:
self.isolation_level = options['isolation_level']
except KeyError:
self.isolation_level = connection.isolation_level
else:
# Set the isolation level to the value from OPTIONS.
if self.isolation_level != connection.isolation_level:
connection.set_session(isolation_level=self.isolation_level)
return connection
def _connect(self) -> None:
logger.info("initializing database connection:")
logger.info("host: %s", self.host)
logger.info("port: %s", self.port)
logger.info("dbname: %s", self.dbname)
try:
self.conn = psycopg2.connect(host=self.host,
port=self.port,
user=self.user,
password=self.password,
dbname=self.dbname,
connect_timeout=5)
self.conn.set_session(autocommit=True)
logger.info("successfully initialized database connection")
except psycopg2.Error as error:
logger.exception("unable to connect to database")
raise error
def from_environment(cls) -> Optional['PostgresDemoDatabase']:
host = os.environ.get("DEMO_POSTGRES_HOST")
port = os.environ.get("DEMO_POSTGRES_PORT") or "5432"
dbname = os.environ.get("DEMO_POSTGRES_dbnAME")
user = os.environ.get("DEMO_POSTGRES_USER")
password = os.environ.get("DEMO_POSTGRES_PASSWORD")
if all([host, port, dbname, password]):
try:
logger.info("Initializing demo database connection using environment variables")
return PostgresDemoDatabase(dbname=dbname, port=port, password=password)
except psycopg2.Error:
logger.exception("unable to connect to database,permalinks not enabled")
return None
else:
logger.info("Relevant environment variables not found,so no demo database")
return None
def uriDatabaseConnect(self, uri):
"""Create a connection from a uri and return a cursor of it."""
conninfo = uri.connectionInfo()
conn = None
cur = None
ok = False
while not conn:
try:
conn = psycopg2.connect(uri.connectionInfo())
cur = conn.cursor()
except psycopg2.OperationalError as e:
(ok, passwd) = QgsCredentials.instance().get(conninfo, uri.username(), uri.password())
if not ok:
break
if not conn:
QMessageBox.warning(self.iface.mainWindow(), "Connection Error", "Could not connect to Postgresql database - check connection info")
if ok:
QgsCredentials.instance().put(conninfo, passwd)
return cur
def __init__(self, config, db_config, web, plugin_path):
super().__init__()
self.db = db_connect(
host = db_config.get(c.DB_HOST_KEY),
user = db_config.get(c.DB_USER_KEY)
)
self.config = config
self.web = web
self.plugin_collection = PluginCollection(self, plugin_path)
self.token = self.config.get(c.GLOBIBOT_TOKEN_KEY)
self.masters = [
str(id) for id in
self.config.get(c.MASTER_IDS_KEY, [])
]
# self.enabled_servers = [
# str(id) for id in
# self.config.get(c.ENABLED_SERVERS_KEY,[])
# ]
self.web.add_routes('bot', *api.routes(self))
def run_sql_multiprocessing(args):
the_sql = args[0]
settings = args[1]
pg_conn = psycopg2.connect(settings['pg_connect_string'])
pg_conn.autocommit = True
pg_cur = pg_conn.cursor()
# # set raw gnaf database schema (it's needed for the primary and foreign key creation)
# if settings['raw_gnaf_schema'] != "public":
# pg_cur.execute("SET search_path = {0},public,pg_catalog".format(settings['raw_gnaf_schema'],))
try:
pg_cur.execute(the_sql)
result = "SUCCESS"
except Exception as ex:
result = "sql Failed! : {0} : {1}".format(the_sql, ex)
pg_cur.close()
pg_conn.close()
return result
def intermediate_shapefile_load_step(args):
work_dict = args[0]
settings = args[1]
# logger = args[2]
file_path = work_dict['file_path']
pg_table = work_dict['pg_table']
pg_schema = work_dict['pg_schema']
delete_table = work_dict['delete_table']
spatial = work_dict['spatial']
pg_conn = psycopg2.connect(settings['pg_connect_string'])
pg_conn.autocommit = True
pg_cur = pg_conn.cursor()
result = import_shapefile_to_postgres(pg_cur, file_path, pg_table, pg_schema, delete_table, spatial)
return result
# imports a Shapefile into Postgres in 2 steps: SHP > sql; sql > Postgres
# overcomes issues trying to use psql with PGPASSWORD set at runtime
def clear_static_routes(host, passwd):
# --------------????????----------------------
con = None
try:
con = psycopg2.connect(database='cloud_controller',
password=passwd, port=port)
cur = con.cursor()
cur.execute('delete from static_routes')
con.commit()
except psycopg2.DatabaseError as e:
if con:
con.rollback()
print('Error is %s' % e)
finally:
if con:
con.close()
def update_redirect_url(host, passwd, domain_name):
# --------------????????----------------------
con = None
try:
con = psycopg2.connect(database='uaa', port=port)
cur = con.cursor()
cur.execute("update oauth_client_details set"
" web_server_redirect_uri='http://uaa.cloudfoundry.com/redirect/vmc,"
"https://uaa.cloudfoundry.com/redirect/vmc,"
"http://uaa.%s/redirect/vmc,https://uaa.%s/redirect/vmc'"
" where client_id in ('simple','vmc')" % (domain_name, domain_name))
con.commit()
except psycopg2.DatabaseError as e:
if con:
con.rollback()
print('Error is %s' % e)
finally:
if con:
con.close()
def install_extensions(extensions, **connection_parameters):
"""Install Postgres extension if available.
Notes
-----
- superuser is generally required for installing extensions.
- Currently does not support specific schema.
"""
from postpy.connections import connect
conn = connect(**connection_parameters)
conn.autocommit = True
for extension in extensions:
install_extension(conn, extension)