1 Elasticsearch数据库

1.1 环境配置

安装环境

pip install elasticsearch==7.6.0

1.2 EsDao包装类

# -- coding: utf-8 --

"""
@version: v1.0
@author: huangyc
@file: EsDao.py
@Description: Es统一操作类
@time: 2020/4/27 10:22
"""

from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch
import pandas as pd


class EsDao(object):
    """
    ElasticSearch的数据操作类
    """
    # 查询批次大小
    DEFAULT_BATCH_SIZE = 1000

    # 写入批次大小
    BULK_BATCH_SIZE = 10000

    def __init__(self, hosts, timeout=3600*24):
        self.hosts = hosts
        self.timeout = timeout
        self.es = Elasticsearch(hosts, timeout=self.timeout)

    def save_data_list(self, index_name, data_list):
        """
        保存数据列表到es的指定索引中
        :param index_name: 索引名称
        :param data_list: 数据列表,列表元素代表一行数据,元素类型为dict
        :return:
        """
        bulk_data_lst = [
            data_list[i:i + self.BULK_BATCH_SIZE]
            for i in range(0, len(data_list), self.BULK_BATCH_SIZE)
        ]

        if len(data_list) > 0 and '_id' in data_list[0]:
            for bulk_data in bulk_data_lst:
                actions = [{
                    "_index": index_name,
                    "_type": index_name,
                    "_id": data.pop("_id"),
                    "_source": data
                }
                    for data in bulk_data
                ]
                bulk(self.es, actions, index=index_name, raise_on_error=True)
        else:
            for bulk_data in bulk_data_lst:
                actions = [{
                    "_index": index_name,
                    "_type": index_name,
                    "_source": data
                }
                    for data in bulk_data
                ]
                bulk(self.es, actions, index=index_name, raise_on_error=True)

    def is_index_exists(self, index_name):
        """
        判断指定索引是否存在
        :param index_name: 索引名称
        :return:
        """
        return self.es.indices.exists(index=index_name)

    def delete_by_query(self, index_name, query_body):
        """
        按查询结果删除数据
        :param index_name:
        :param query_body:
        :return:
        """
        return self.es.delete_by_query(index_name, query_body)

    def clear_index_data(self, index_name):
        """
        清空指定索引的数据
        :param index_name:
        :return:
        """
        return self.delete_by_query(
            index_name=index_name,
            query_body={
                "query": {
                    "match_all": {}
                }
            }
        )

    def save_df_data(self, index_name, df):
        """
        保存pandas的DataFrame到es的指定索引中
        :param index_name: 索引名称
        :param df: 要保存的dataframe
        :return:
        """
        col_lst = df.columns.tolist()
        dic_lst = [dict([(c, v) for c, v in zip(col_lst, r)]) for r in df.values.tolist()]
        self.save_data_list(index_name=index_name, data_list=dic_lst)

    def create_index(self, index_name, mapping_properties):
        """
        创建索引
        :param index_name: 索引名称
        :param mapping_properties: 索引mapping中的属性列表
        :return:
        """
        if not self.es.indices.exists(index=index_name):
            mapping = {
                "mappings": {
                    index_name: {
                        "properties": mapping_properties
                    }
                }
            }
            res = self.es.indices.create(index=index_name, body=mapping)
            if res is not None and 'acknowledged' in res:
                return res.get('acknowledged')
        return False

    def _search_with_scroll(self, index_name, query_body):
        if "size" not in query_body:
            query_body["size"] = self.DEFAULT_BATCH_SIZE
        response = self.es.search(
            index=index_name,
            body=query_body,
            search_type="dfs_query_then_fetch",
            scroll="120m",
            timeout="60m"
        )
        scroll_id = response["_scroll_id"]
        while True:
            sources = [doc["_source"] for doc in response["hits"]["hits"]]
            if len(sources) == 0:
                break
            yield sources
            response = self.es.scroll(scroll_id=scroll_id, scroll="60m")

    def query_for_df(self, index_name, query_body):
        """
        执行查询并获取pandas.DataFrame格式的返回值
        :param index_name: 索引名称
        :param query_body: 查询条件
        :return:
        """
        sources = []
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            sources.extend(sub_source)
        return pd.DataFrame(sources)

    def query_for_df_with_batch(self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE):
        """
        按批次大小查询并返回pandas.DataFrame的generator格式的返回值
        :param index_name: 索引名称
        :param query_body: 查询条件
        :param batch_size: 批次大小
        :return:
        """
        if "size" not in query_body:
            query_body["size"] = batch_size
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            yield pd.DataFrame(sub_source)

    def get_first_row_with_df(self, index_name):
        """
        获取指定索引的首行数据,格式为pandas.DataFrame
        可用于获取索引的元信息
        :param index_name: 索引名称
        :return:
        """
        query_body = {
            "size": 1,
            "query": {
                "match_all": {}
            }
        }
        for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
            return pd.DataFrame(sub_source)

1.3 使用案例

class TaskMeta:
    '''
    数据元类
    '''
    def __init__(self, text, doc_id, sentence_id, reg_lst, flag, has_reg, text_source="primitive"):
        self.text = text
        self.doc_id = doc_id
        self.sentence_id = sentence_id
        self.reg_lst = reg_lst
        self.flag = flag
        self.has_reg = has_reg
        self.text_source = text_source

    def __repr__(self):
        return f'{self.text} {self.doc_id} {self.sentence_id} {self.reg_lst} {self.flag} {self.has_reg} {self.text_source}'

    def to_dict(self):
        return {"text": self.text,
                "doc_id": self.doc_id,
                "sentence_id": self.sentence_id,
                "reg_lst": self.reg_lst,
                "flag": self.flag,
                "has_reg": self.has_reg,
                "text_source": self.text_source}
def create_index(target_es_dao, index_name, mapping):
    '''
    创建es索引
    :return: 是否创建成功
    '''
    if not target_es_dao.is_index_exists(index_name):
        target_es_dao.create_index(index_name, mapping)
    else:
        target_es_dao.clear_index_data(index_name)
        print(f"索引{index_name}已存在, 已清除数据")

def writer_fun(target_es_dao, target_index, sample_lst):
    '''
    写数据到es库
    '''
    df_sample_lst = []
    [df_sample_lst.append(sample.to_dict()) for sample in sample_lst]
    df_sample_lst = pd.DataFrame(df_sample_lst)
    target_es_dao.save_df_data(target_index, df_sample_lst)
    print(f'写入数据{len(sample_lst)}条')

def es_cal_test():
    # 获取连接
    source_es_dao = EsDao(f"http://{aug_config.SOURCE_IP}:{aug_config.SOURCE_PORT}/")
    query_condition = {
        "query_string": {
            "default_field": "has_reg",
            "query": "true"
        }
    }
    query_body = {
        "query": query_condition
    }
    # 查询数据
    datas = source_es_dao.query_for_df(index_name=aug_config.SOURCE_INDEX, query_body=query_body)
    records = datas.to_dict(orient='record')
    sample_lst = []
    for record in records:
        sample_lst.append(
            TaskMeta(
                text=record["text"],
                doc_id=record["doc_id"],
                sentence_id=record["sentence_id"],
                reg_lst=record["reg_lst"],
                flag=record["flag"],
                has_reg=record["has_reg"]
            )
        )

    # 创建索引
    create_index(target_es_dao, aug_config.TARGET_INDEX, aug_config.MAPPING)
    # 写入数据
    writer_fun(target_es_dao, aug_config.TARGET_INDEX, sample_lst=sample_lst)

if __name__ == '__main__':
    es_cal_test()

2 Oracle数据库

Python操作Oracle数据库:cx_Oracle

2.1 环境配置

Linux上Python连接Oracle解决报错cx_Oracle.DatabaseError: DPI-1047

  1. 安装库

    pip install cx-Oracle
    
  2. 链接库准备,需要将oci.dll、oraocci11.dll、oraociei11.dll复制到sitepackages路径下,oracle client下载链接,并配置到系统环境变量,链接中没有的自己去官网(win64所有平台linux64)注册一个账号下载对应的版本

    -- 查看oracle版本
    SELECT * FROM v$version;
    

    没有配置会报如下的错:

    # Windows下报错
    cx_Oracle.DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "D:\software\win_or
    
    # Linux下报错
    cx_Oracle.DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "libclntsh.so: cannot open shared object file: No such file or directory". See https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html for help
    
    • windows下安装完客户端后,配置oracle客户端的环境变量

      D:\software\win_oracle_dlls\instantclient_11_2
      
    • linux下可以使用rpm安装包安装

      sudo rpm -ivh oracle-instantclient11.2-basic-11.2.0.4.0-1.x86_64.rpm
      

      然后将环境变量配置到/etc/profile

      # 配置oracle客户端
      export ORACLE_CLIENT_HOME=/lib/oracle/11.2/client64
      export TNS_ADMIN=$ORACLE_CLIENT_HOME
      export LD_LIBRARY_PATH=$ORACLE_CLIENT_HOME/lib
      export ORABIN=$ORACLE_CLIENT_HOME/bin
      
      PATH=$PATH:$ORABIN
      export PATH
      
      export PATH=$ORACLE_HOME:$PATH
      export PATH=$PATH:$HOME/bin:$ORACLE_CLIENT_HOME/bin
      

      其他类似找不到libclntsh.sod的错误,如果出现这个错误,请进行软连接挂载文件,让系统的路径能正确的获取到该文件,操作如下:

      sudo sh -c "/usr/lib/oracle/instantclient_11_1 > /etc/ld.so.conf.d/oracle-instantclient.conf"
      
      sudo ldconfig
      

2.2 sql基础

2.2.1 建表

--blob字段插入实例
create table blob_table_tmp(
  id number primary key,
  blob_cl blob not null,
    clob_cl clob not null
);
insert into blob_table_tmp values(1,rawtohex('11111000011111'),'增加一条记录时,碰到插入blob类型数据出错');
insert into blob_table_tmp values(3,rawtohex('4561888'),'增加一条记录时,碰到插入blob类型数据出错');
insert into blob_table_tmp values(4,rawtohex('增加一条记录时333'),'增加一条记录时,碰到插入blob类型数据出错');

2.2.2 查询

获取连接

FINANCE_DB_HOST = "192.168.x.x"
FINANCE_DB_PORT = 1521
FINANCE_DB_USER = "hyc"
FINANCE_DB_PASSWORD = "123456"
FINANCE_DB_DB = "ORCL"

class OracleConn():
    config_path = ''
    @staticmethod
    def get_conn(conn_name, encoding="UTF-8"):
        conn_str = str(eval("%s_DB_USER" % (OracleConn.config_path, conn_name))) + "/" + str(eval("%s.%s_DB_PASSWORD" % (OracleConn.config_path, conn_name)))
        conn_str += "@" + str(eval("%s_DB_HOST" % (OracleConn.config_path, conn_name)))
        conn_str += ":" + str(eval("%s_DB_PORT" % (OracleConn.config_path, conn_name)))
        conn_str += "/" + str(eval("%s_DB_DB" % (OracleConn.config_path, conn_name)))
        return ora.connect(conn_str, encoding=encoding, nencoding=encoding)

读写数据库

def oracle_test():
    # 获取数据库连接
    conn = OracleConn.get_conn("FINANCE")
    cur = conn.cursor()

    # 查询数据
    sql = "select id,blob_cl,clob_cl from FINANCE.blob_table_tmp"
    datas = []
    r = cur.execute(sql)
    # 假设name是clob字段类型
    [datas.append((gg[0], gg[1].read().decode('utf-8'), gg[2].read())) for gg in r]

    # 写入数据
    insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
    res = []
    [res.append((data[0], data[1])) for data in datas]
    cur.executemany(insert_sql, res)
    cur.execute('commit')

    cur.close()
    conn.close()
    print("写入结束")


if __name__ == '__main__':
    oracle_test()

2.3 相关操作

关于数据库的连接,查询和写入

import cx_Oracle

class Setting:
    DB_USER = 'narutohyc'
    DB_PASSWORD = 'hyc'
    DB_IP = '192.168.0.1'
    DB_PORT = ''
    DB_SERVICE = 'dataBaseName'
setting = Setting()

def oracle_test():
    # 获取数据库连接
    conn = cx_Oracle.connect('%s/%s@%s/%s' % (setting.DB_USER, setting.DB_PASSWORD, setting.DB_IP, setting.DB_SERVICE), encoding='utf-8')
    cur = conn.cursor()

    # 查询数据
    sql = "select ID, name from hyc_database"
    datas = []
    r = cur.execute(sql)
    # 假设name是clob字段类型
    [datas.append((gg[0], gg[1].read())) for gg in r]

    # 写入数据
    insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
    res = []
    [res.append((data[0], data[1])) for data in datas]
    cur.executemany(insert_sql, res)
    cur.execute('commit')

    cur.close()
    conn.close()
    print("写入结束")

if __name__ == '__main__':
    oracle_test()

3 Postgresql数据库

官方文档DocumentationPostgreSQL 16

  1. 查询
  2. 数据类型

我终于学会了使用python操作postgresql

保姆级 CentOS 7离线安装PostgreSQL 14教程

易百_PostgreSQL教程

3.1 离线安装数据库

先从centos7-pg_14.2下载下载rpm包(微云下载centos7.6_PostgreSQL14.2),或者直接官方下载安装教程安装,如果离线安装就下载rpm包

# 离线安装执行以下命令安装
rpm -ivh postgresql14-libs-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-server-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-contrib-14.2-1PGDG.rhel7.x86_64.rpm

出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法

yum install python3-devel

创建数据库data和log文件夹

# 创建数据库data和log文件夹
mkdir -p /home/postgres/pgsql_data
mkdir -p /home/postgres/pgsql_log

# 创建日志文件
touch /home/postgres/pgsql_log/pgsql.log

授权给安装数据时自动创建的postgres用户

chown -R postgres:postgres /home/postgres/pgsql_data
chown -R postgres:postgres /home/postgres/pgsql_log

切换到安装数据时自动创建的postgres用户

su - postgres

初始化数据库到新建数据目录

/usr/pgsql-14/bin/initdb -D /home/postgres/pgsql_data

启动服务器(初始化数据库日志文件)

/usr/pgsql-14/bin/pg_ctl -D  /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log start
# 查看状态
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log status

切换到管理员开启端口并重启防火墙

su root
firewall-cmd --zone=public --add-port=5432/tcp --permanent
firewall-cmd --reload

修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf

# 修改监听地址
listen_addresses = '*'
# 修改最大连接数(按需)
max_connections = 1000
# 修改密码认证
password_encryption = md5

修改可访问的用户IP段

vi /home/pgsql_data/pg_hba.conf(a进入编辑模式,esc退出编辑模式,:wq并按回车保存)
IPV4下修改为或新增
host    all             all             0.0.0.0/0               trust

postgres用户重启数据库服务

su - postgres
/usr/pgsql-14/bin/pg_ctl -D  /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log restart

数据库安装结束,管理员postgres,默认密码123456

img

使用navicat连接pg库后新建数据库

3.2 环境配置

3.2.1 python连接pg

pip install "psycopg[binary,pool]"

  1. 使用sqlalchemy

    from sqlalchemy import create_engine
    
    with engine.connect().execution_options(stream_results=True) as connection:
        stream = connection.execute(sql)
    
        while 1:
            streams = stream.fetchmany(1000)
            if not streams:
                break
            for idx, row in enumerate(streams):
                row = row._asdict()
                print(idx)
    
  2. 使用psycopg

    #!/usr/bin/env Python
    # -- coding: utf-8 --
    
    """
    @version: v1.0
    @author: huangyc
    @file: cur_2_test.py
    @Description: 
    @time: 2024/3/21 17:22
    """
    import importlib
    import os
    
    from basic_support.comm_funcs.comm_utils import gen_uuid32
    from dbutils.pooled_db import PooledDB
    from psycopg import ServerCursor
    from functools import partial
    from psycopg.rows import dict_row
    
    os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
    
    # 解决游标的问题, 默认使用的是ClientCursor
    cls = partial(ServerCursor, name=gen_uuid32())
    
    config = dict(host='192.168.xx.xx', port=5432, user='xx', password='xx', dbname='xx', cursor_factory=cls)
    dic = dict(keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)
    config.update(dic)
    
    db_creator = importlib.import_module("psycopg")
    
    max_cached = 0
    max_connections = 16
    max_usage = 0
    blocking = True
    
    sql = f"SELECT * FROM t_tmp_filter_docs limit 5000"
    
    """
    max_cached: 池中空闲连接的最大数量。默认为0,即无最大数量限制。(建议默认)
    max_connections: 被允许的最大连接数。默认为0,无最大数量限制。(视情况而定)
    max_usage: 连接的最大使用次数。默认0,即无使用次数限制。(建议默认)
    blocking: 连接数达到最大时,新连接是否可阻塞。默认False,即达到最大连接数时,再取新连接将会报错。
             (建议True,达到最大连接数时,新连接阻塞,等待连接数减少再连接)
    """
    pool = PooledDB(db_creator,
                    maxcached=max_cached,
                    maxconnections=max_connections,
                    maxusage=max_usage,
                    blocking=blocking,
                    **config)
    
    conn = pool.connection()
    
    cur = conn.cursor(row_factory=dict_row)
    batch_size = 1000
    params = None
    cur.itersize = batch_size
    cur.execute(sql, params)
    
    idx = 0
    while records := cur.fetchmany(batch_size):
        idx += 1
        print(idx)
    
    try:
        cur.close()
    except:
        pass
    
    try:
        conn.close()
    except:
        pass
    

3.3 sql语法

3.3.1 数据库连接

-- 获取数据库实例连接数
select count(*) from pg_stat_activity;
-- 获取数据库最大连接数
show max_connections;
-- 查询当前连接数详细信息
select * from pg_stat_activity;
-- 查询数据库中各个用户名对应的数据库连接数
select usename, count(*) from pg_stat_activity group by usename;

3.3.2 数据库信息

-- 查询数据库大小
select pg_size_pretty (pg_database_size('pg_fac_stk'));

-- 查询各表磁盘占用
SELECT
    table_schema || '.' || table_name AS table_full_name,
    pg_size_pretty(pg_total_relation_size('"' || table_schema || '"."' || table_name || '"')) AS size
FROM information_schema.tables where table_name like 'finance_%'
ORDER BY
    pg_total_relation_size('"' || table_schema || '"."' || table_name || '"') DESC;

-- 获取各个表中的数据记录数
select relname as TABLE_NAME, reltuples as rowCounts from pg_class where relkind = 'r' order by rowCounts desc;

-- 查看数据库表对应的数据文件
select pg_relation_filepath('product');

-- 查看数据库实例的版本
select version();

-- 分析评估SQL执行情况
EXPLAIN ANALYZE SELECT * FROM t_cfg_opinfo;

-- 获取数据库当前的回滚事务数以及死锁数
select datname,xact_rollback,deadlocks from pg_stat_database;

3.3.3 数据备份与恢复

使用pgdump备份数据库

pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:

  1. 备份数据可以保持原有的结构和特性,还原时可以保证数据准确性
  2. 备份文件可以跨平台传输,方便进行远程备份
  3. 备份文件可以进行压缩,减小文件大小,方便传输和存储

可以新建数据库,建几张表做测试

# 学生表
CREATE TABLE students (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    gender VARCHAR(10) NOT NULL,
    age INTEGER NOT NULL,
    class VARCHAR(20) NOT NULL
);
# 学科表
CREATE TABLE subjects (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL
);
# 成绩表
CREATE TABLE scores (
    id SERIAL PRIMARY KEY,
    student_id INTEGER NOT NULL,
    subject_id INTEGER NOT NULL,
    score INTEGER NOT NULL,
    FOREIGN KEY (student_id) REFERENCES students (id),
    FOREIGN KEY (subject_id) REFERENCES subjects (id)
);

# 插入一些测试数据
INSERT INTO students (name, gender, age, class)
VALUES
    ('Alice', 'Female', 18, 'Class A'),
    ('Bob', 'Male', 17, 'Class B'),
    ('Charlie', 'Male', 19, 'Class A'),
    ('Diana', 'Female', 18, 'Class B');

# 插入学科表数据
INSERT INTO subjects (name)
VALUES
    ('Mathematics'),
    ('English'),
    ('Science');

-- Alice 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
    (1, 1, 90),
    (1, 2, 85),
    (1, 3, 92);
-- Bob 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
    (2, 1, 78),
    (2, 2, 80),
    (2, 3, 75);
-- Charlie 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
    (3, 1, 88),
    (3, 2, 92),
    (3, 3, 90);
-- Diana 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
    (4, 1, 95),
    (4, 2, 88),
    (4, 3, 92);

备份

使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可

  • 备份整个数据库

    pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -f <备份文件路径> <数据库名称>
    
    # 示例
    /usr/pgsql-14/bin/pg_dump -h 127.0.0.1 -U postgres -p 5432 -F t -b -v -f build_hyc_test.sql.tar hyc_test
    
  • 备份指定表或数据

    pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -t <表名1> -t <表名2> -f <备份文件路径> <数据库名称>
    
    # 示例
    -- 备份指定表到sql文件
    -- '-c --if-exists' 会生成 'drop table if exist' 命令
    -- '--no-owner' 是一个选项,用于指定在导出数据库时不包括拥有者信息
    pg_dump --verbose --host=192.168.xx.xx --port=5432 --username=postgres --file /home/huangyc/pg_bak_test/bak_hyc.sql --encoding=UTF-8 -t "public.tushare_wz_index" -t "public.tushare_us_basic" -t "public.dim_fund" -t "public.dim_index" -c --if-exists --no-owner pg_fac_stk
    

具体参数的含义如下:

  • -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
  • -p:数据库服务的监听端口,一般为默认端口5432
  • -U:连接数据库的用户名
  • -F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t
  • -b:在备份文件中包含备份的数据库的模式信息
  • -v:备份过程中输出详细的信息
  • -f:备份文件的保存路径和文件名
  • -t:只备份指定的表和数据
-- 备份postgres库并tar打包
pg_dump -h 127.0.0.1 -p 5432 -U postgres -f postgres.sql.tar -Ft;

-- 备份postgres库,转储数据为带列名的INSERT命令
pg_dumpall -d postgres -U postgres -f postgres.sql --column-inserts;

还原

使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可

  • 恢复整个库

    pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -d <数据库名称> <备份文件路径>
    
    # 示例
    /usr/pgsql-14/bin/pg_restore -h 127.0.0.1 -U postgres -p 5432 -d hyc_test_bak build_hyc_test.sql.tar
    
  • 恢复指定数据

    pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -t <表名1> -t <表名2> -d <数据库名称> <备份文件路径>
    
    # 示例
    -- 对于pg_dump备份出来的sql文件,直接执行sql文件即可恢复
    -- 还原指定sql文件到bak_test库(需要自己建库)
    psql --host=192.168.xx.xx --port=5432 --username=postgres -d bak_test --file /home/huangyc/pg_bak_test/bak_hyc.sql.tar
    

具体参数的含义如下:

  • -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
  • -p:数据库服务的监听端口,一般为默认端口5432
  • -U:连接数据库的用户名
  • -d:恢复数据的目标数据库名称
  • -t:只恢复指定的表和数据

命令详解

[postgres@pg01 ~]$ pg_dump --help
用法:
  pg_dump [选项]... [数据库名字]
**一般选项**:
  -f, --file=FILENAME          输出文件或目录名
  -F, --format=c|d|t|p         输出文件格式 (c=custom, d=directory, t=tar,p=plain,plain就是sql纯文本 (默认值))
  -j, --jobs=NUM               执行多个并行任务进行备份转储工作
  -v, --verbose                详细模式
  -V, --version                输出版本信息,然后退出
  -Z, --compress=0-9           被压缩格式的压缩级别,0表示不压缩
  --lock-wait-timeout=TIMEOUT  在等待表锁超时后操作失败
  --no-sync                    不用等待变化安全写入磁盘
  -?, --help                   显示此帮助, 然后退出
**控制输出内容选项(常用)**:
  -a, --data-only              只转储数据,不包括模式,只对纯文本输出有意义
  -s, --schema-only            只转储模式, 不包括数据
  -c, --clean                  在重新创建之前,先清除(删除)数据库对象,如drop table。只对纯文本输出有意义
  -C, --create                 指定输出文件中是否生成create database语句,只对纯文本输出有意义
  -n, --schema=PATTERN         指定要导出的schema,不指定则导出所有的非系统schema
  -N, --exclude-schema=PATTERN 排除导出哪些schema
  -O, --no-owner               在明文格式中, 忽略恢复对象所属者
  -t, --table=PATTERN          指定导出的表、视图、序列,可以使用多个-t匹配多个表,使用-t之后,-n和-N就失效了
  -T, --exclude-table=PATTERN  排除表
  -x, --no-privileges          不要转储权限 (grant/revoke)
  --disable-triggers           在只恢复数据的过程中禁用触发器
  --exclude-table-data=PATTERN do NOT dump data for the specified table(s)
  --if-exists                  当删除对象时使用IF EXISTS
  --inserts                    以INSERT命令,而不是COPY命令的形式转储数据,使用该选项可以把数据加载到非pg数据库,会使恢复非常慢
                               该选项为每行生成1个单独的insert命令,?在恢复过程中遇到错误,将会丢失1行而不是全部表数据
  --column-inserts             以带有列名的INSERT命令形式转储数据,例如insert into table_name(column,...) values(value1,...)
  --load-via-partition-root    通过根表加载分区
  --no-comments                不转储注释
  --no-tablespaces             不转储表空间分配信息
  --no-unlogged-table-data     不转储没有日志的表数据
  --on-conflict-do-nothing     将ON CONFLICT DO NOTHING添加到INSERT命令
**控制输出内容选项(不常用)**:
  -S, --superuser=NAME         指定关闭触发器时需要用到的超级用户名。 它只有在使用了--disable-triggers时才有影响。一般情况下,最好不要输入该参数,而是用 超级用户启动生成的脚本。
  -b, --blobs                  在转储中包括大对象
  -B, --no-blobs               排除转储中的大型对象
  -E, --encoding=ENCODING      转储以ENCODING形式编码的数据
  --binary-upgrade             只能由升级工具使用
  --enable-row-security        启用行安全性(只转储用户能够访问的内容)
  --extra-float-digits=NUM     覆盖extra_float_digits的默认设置
  --disable-dollar-quoting     取消美元 (符号) 引号, 使用 SQL 标准引号
  --no-publications            不转储发布
  --no-security-labels         不转储安全标签的分配
  --no-subscriptions           不转储订阅
  --no-synchronized-snapshots  在并行工作集中不使用同步快照
  --quote-all-identifiers      所有标识符加引号,即使不是关键字
  --rows-per-insert=NROWS      每个插入的行数;意味着--inserts
  --section=SECTION            备份命名的节 (数据前, 数据, 及 数据后)
  --serializable-deferrable    等到备份可以无异常运行
  --snapshot=SNAPSHOT          为转储使用给定的快照
  --strict-names               要求每个表和(或)schema包括模式以匹配至少一个实体
  --use-set-session-authorization
                               使用 SESSION AUTHORIZATION 命令代替
                               ALTER OWNER 命令来设置所有权
**联接选项**:
  -d, --dbname=DBNAME      对数据库 DBNAME备份
  -h, --host=主机名        数据库服务器的主机名或套接字目录
  -p, --port=端口号        数据库服务器的端口号
  -U, --username=名字      以指定的数据库用户联接
  -w, --no-password        永远不提示输入口令
  -W, --password           强制口令提示 (自动)
  --role=ROLENAME          在转储前运行SET ROLE

对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下

[postgres@pg01 pg_backup]$ pg_restore --help
pg_restore 从一个归档中恢复一个由 pg_dump 创建的 PostgreSQL 数据库.
用法:
  pg_restore [选项]... [文件名]
一般选项:
  -d, --dbname=名字        连接数据库名字
  -f, --file=文件名       输出文件名(- 对于stdout)
  -F, --format=c|d|t       备份文件格式(应该自动进行)
  -l, --list               打印归档文件的 TOC 概述
  -v, --verbose            详细模式
  -V, --version            输出版本信息, 然后退出
  -?, --help               显示此帮助, 然后退出
恢复控制选项:
  -a, --data-only              只恢复数据, 不包括模式
  -c, --clean                  在重新创建之前,先清除(删除)数据库对象
  -C, --create                 创建目标数据库
  -e, --exit-on-error          发生错误退出, 默认为继续
  -I, --index=NAME             恢复指定名称的索引
  -j, --jobs=NUM               执行多个并行任务进行恢复工作
  -L, --use-list=FILENAME      从这个文件中使用指定的内容表排序
                               输出
  -n, --schema=NAME            在这个模式中只恢复对象
  -N, --exclude-schema=NAME    不恢复此模式中的对象
  -O, --no-owner               不恢复对象所属者
  -P, --function=NAME(args)    恢复指定名字的函数
  -s, --schema-only            只恢复模式, 不包括数据
  -S, --superuser=NAME         使用指定的超级用户来禁用触发器
  -t, --table=NAME             恢复命名关系(表、视图等)
  -T, --trigger=NAME           恢复指定名字的触发器
  -x, --no-privileges          跳过处理权限的恢复 (grant/revoke)
  -1, --single-transaction     作为单个事务恢复
  --disable-triggers           在只恢复数据的过程中禁用触发器
  --enable-row-security        启用行安全性
  --if-exists                  当删除对象时使用IF EXISTS
  --no-comments                不恢复注释
  --no-data-for-failed-tables  对那些无法创建的表不进行
                               数据恢复
  --no-publications            不恢复发行
  --no-security-labels         不恢复安全标签信息
  --no-subscriptions           不恢复订阅
  --no-tablespaces             不恢复表空间的分配信息
  --section=SECTION            恢复命名节 (数据前、数据及数据后)
  --strict-names               要求每个表和(或)schema包括模式以匹配至少一个实体
  --use-set-session-authorization
                               使用 SESSION AUTHORIZATION 命令代替
                               ALTER OWNER 命令来设置所有权
联接选项:
  -h, --host=主机名        数据库服务器的主机名或套接字目录
  -p, --port=端口号        数据库服务器的端口号
  -U, --username=名字      以指定的数据库用户联接
  -w, --no-password        永远不提示输入口令
  -W, --password           强制口令提示 (自动)
  --role=ROLENAME          在恢复前执行SET ROLE操作
选项 -I, -n, -N, -P, -t, -T, 以及 --section 可以组合使用和指定
多次用于选择多个对象.
如果没有提供输入文件名, 则使用标准输入.

3.3.4 表空间

新建表空间

# 新建表空间目录 t_fac_ts
mkdir /home/huangyc/t_fac_ts
# 修改表空间的用户权限
chown postgres /home/huangyc/t_fac_ts

pg库新建表空间

create tablespace t_fac_ts owner postgres location '/home/huangyc/t_fac_ts';

表空间有关的一些语法

# 删除表空间 (需要先drop表空间所有的表, 或者将该空间下所有的表移除才能drop表空间)
DROP TABLESPACE t_fac_ts;
# 修改具体的表到指定表空间下
ALTER TABLE t_fac_tushare_stock_basic SET TABLESPACE t_fac_ts;
# 修改指定库到指定表空间下
ALTER DATABASE name SET TABLESPACE new_tablespace;

3.3.5 锁表处理

pg锁表解锁

  1. 查看被锁的表

    select a.locktype,a.database,a.pid,a.mode,a.relation,b.relname
    from pg_locks a
    join pg_class b on a.relation = b.oid where relname='t_opt_strhdk_blsj';
    
  2. 杀死被锁的pid

    select pg_terminate_backend(pid);
    

3.3.6 表结构修改

-- 修改表名
alter table "user" rename to "ts_user";
-- 添加新字段
alter table table_name add column col_name varchar(50);
-- 丢弃某列
alter table table_name drop column col_name;
-- 添加主键
alter table table_name add primary key("col_name");
-- 修改字段名
alter table table_name rename column old_col_name to new_col_name;

3.3.7 数据更新和查询

设置某字段的值

-- 设置某字段的值
update table_name set col_name=new_value;

-- 更新某个字段并关联其他表
UPDATE table1
SET field_to_update = table2.new_value
FROM table2
WHERE table1.common_column = table2.common_column;

删除表中重复数据

-- 查询[旧表]数据的重复情况
select col1,col2,count(*) from old_table group by col1,col2;

-- 所有字段都一样的情况
create table bak_table as select distinct * from table_name;

-- 查询[新表]数据的重复情况
select col1,col2,count(*) from bak_table group by col1,col2;
truncate table old_table;
insert into old_table (col1,col2) select col1,col2 from bak_table;

不存在插入,存在更新

insert into ... on conflict(column_name) do update set ...

conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件

column_name必须是主键或者其他具有唯一性的字段(如唯一键或排他键)

insert into user(id,username,address,create_date,create_by) 
values('1','刘德华','香港',now(),'system') 
on conflict(id) 
do update set address='中国',update_date=now(),update_by='system';
# 批量的方式
insert into testunnest(id, age, name) values (unnest(array[1,3]), unnest(array[18,10]), unnest(array['valupdated', 'val3'])) on conflict (id) do update set age = excluded.age, name = excluded.name;

3.3.8 数据和结构复制

-- [复制表和数据] 复制表结构和数据 自动建表,不会复制主键什么的
create table new_table as select * from old_table [WITH NO DATA];

-- [复制数据] 复制数据到 新表 表需要提前建,并且表字段要一致,不会复制主键什么的
insert into new_table (col_0, col_1) select col_0, col_1 from old_table;

3.3.9 视图

普通视图

视图是一个虚拟表,它是根据一个或多个基本表的查询结果动态生成的,每次查询视图时都会执行相应的查询

CREATE VIEW view_name AS
SELECT column1, column2, ...
FROM table_name
WHERE condition;

drop view view_name;

物化视图

物化视图是一个实际存储数据的表,它的数据定期刷新,不像普通视图那样每次查询都重新计算。

CREATE MATERIALIZED VIEW materialized_view_name AS
SELECT column1, column2, ...
FROM table_name
WHERE condition;

drop MATERIALIZED VIEW materialized_view_name

需要注意的是,物化视图需要定期手动或自动刷新以更新数据,你可以使用 REFRESH MATERIALIZED VIEW 命令来进行刷新

3.3.10 分页查询

在PostgreSQL数据库中,进行分页查询通常使用LIMIT和OFFSET子句来实现

LIMIT用于限制返回的行数,OFFSET用于指定从结果集的哪一行开始返回数据,下面是一个简单的分页查询示例:

假设我们有一个名为products的表,其中包含产品的名称和价格,我们想要获取第2页,每页显示10条数据,可以使用以下SQL查询语句:

SELECT product_name, price
FROM products
ORDER BY product_id
LIMIT 10 OFFSET 10;

在这个例子中,假设product_id是产品表中的唯一标识符,通过ORDER BY将结果按照product_id排序,LIMIT 10表示返回10行数据,OFFSET 10表示从结果集中的第11行开始返回数据,即返回第2页的数据

3.3.11 乱序查询

在PostgreSQL数据库中,使用ORDER BY RANDOM()语句可以实现按照随机顺序排序查询结果

这样可以让查询结果以随机的顺序返回,每次执行查询时都会得到不同的排序结果

这在需要随机抽样数据或随机展示数据时非常有用

需要注意的是,使用ORDER BY RANDOM()可能会影响查询的性能,因为需要对结果集进行随机排序操作

假设我们有一个名为products的表,其中包含产品的名称和价格,我们想要以随机顺序返回产品列表,可以使用以下SQL查询语句:

SELECT
    product_name,
    price
FROM
    products
ORDER BY
    RANDOM();

这将返回products表中产品名称和价格的数据,并以随机的顺序排序,每次执行该查询,返回的产品列表顺序都会不同

3.3.12 列表查询

列表字段的创建

CREATE TABLE your_table (
    id SERIAL PRIMARY KEY,
    list_column_name _TEXT
);

在PostgreSQL中,你可以使用以下方式来查询数组字段:

  1. 查询数组字段中包含特定值的行:
SELECT * FROM your_table WHERE your_array_column = ANY('{value1,value2,value3}');
  1. 查询数组字段中包含任何查询数组中的值的行:
SELECT * FROM your_table WHERE your_array_column && '{value1,value2,value3}';
  1. 查询数组字段中包含所有查询数组中的值的行:
SELECT * FROM your_table WHERE your_array_column @> '{value1,value2,value3}';
  1. 查询数组字段的长度大于1的所有行:
SELECT * FROM your_table WHERE array_length(your_array_column, 1) > 1;
  1. 查询数组字段中第一个元素等于特定值的行:
SELECT * FROM your_table WHERE your_array_column[1] = 'specific_value';

3.3.13 删除重复记录

postgresql 常用的删除重复数据方法

-- 初始化数据
create table hyc_tmp_del_test(id int, name varchar(255));
create table hyc_tmp_del_test_bk (like hyc_tmp_del_test);
insert into hyc_tmp_del_test select generate_series(1, 10000), 'huangyc';
insert into hyc_tmp_del_test select generate_series(1, 10000), 'huangyc';
insert into hyc_tmp_del_test_bk select * from hyc_tmp_del_test;

-- 最容易想到的方法就是判断数据是否重复,对于重复的数据只保留ctid最小(或最大)的数据,删除其他的
-- id相同的数据,保留ctid最小的,其他的删除
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid <> (select min(t.ctid) from hyc_tmp_del_test_bk t where a.id=t.id); -- 17.112s

-- group by方法通过分组找到ctid最小的数据,然后删除其他数据
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid not in (select min(ctid) from hyc_tmp_del_test_bk group by id); -- 0.052s

-- 高效删除方法
explain analyze delete from hyc_tmp_del_test_bk a where a.ctid = any(array (select ctid from (select row_number() over (partition by id), ctid from hyc_tmp_del_test_bk) t where t.row_number > 1)); -- 0.055s

第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本

关键

pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid

其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关

ctid表示行版本在表中的物理位置: 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。

引申用法

假设我们有一个表格 products,包含产品名称和产品类别。我们希望从中找出每个产品类别中的前两个产品

SELECT
  *
FROM
  (
    SELECT
      product_name,
      category,
      row_number() OVER (PARTITION BY SUBSTR(p.category, 0, LENGTH(p.category)-2) ORDER BY product_name) AS ranking
    FROM
      products p
    WHERE
      p.category LIKE 'Electronics:%'
  ) subquery
WHERE
  ranking <= 2;

在这个例子中,我们有一个产品表 products,其中包含 product_namecategory

我们想要找出每个以 'Electronics:' 开头的产品类别中的前两个产品

  • 子查询部分:
    • 子查询从 products 表中选择了产品名称 product_name 和产品类别 category
    • 我们只关注以 'Electronics:' 开头的产品类别,即 p.category LIKE 'Electronics:%'
    • 使用 row_number() 函数,对每个以 'Electronics:' 开头的类别进行编号,按产品名称排序
  • 主查询部分:
    • 在主查询中,我们选择了子查询的所有列 *
    • 我们进一步筛选了结果,只选择了编号小于等于2的记录,以获取每个类别中的前两个产品

3.3.14 索引

-- 获取数据库表中的索引
select * from pg_indexes where tablename = 't_cfg_opinfo'; 
-- 创建索引
create index index_name on table_name (col_0, col_1);
-- 查询索引
select * from pg_indexes where tablename='table_name';
-- 删除索引
drop index index_name;

什么情况下要避免使用索引?

虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引

使用索引时,需要考虑下列准则

  • 索引不应该使用在较小的表上
  • 索引不应该使用在有频繁的大批量的更新或插入操作的表上
  • 索引不应该使用在含有大量的 NULL 值的列上
  • 索引不应该使用在频繁操作的列上

3.3.15 实用sql

-- 查询库中的最大版本
SELECT (CASE WHEN MAX(version) IS NULL THEN -1 ELSE MAX(version) END) + 1 AS version FROM table_name

3.3.16 其他语法

筛选某列,逗号拼接

select string_agg(bs_org_id,',') as bs_org_ids 
  from bs_org 
  where par_org_id ='100'

日期转换

select to_char(col_name,'yyyyMMDD')-interval '2 day' from table_name
-- -interval '2 day' 表示往前2天

转时间戳

select '2011-01-06 09:57:59'::timestamp;
TO_TIMESTAMP('2011-01-06 09:57:59', 'YYYY-MM-DD HH24:MI:S')

postgresql 获取分组第一条数据 窗口函数

  1. 给数据分组并排名,使用 row_number() over (partition by 分组的字段名 order by 排序规则) as 排名
  2. 从上述第一步中取出,排名为第一的数据,即为第一条数据 select * from 上述第一步 where 排名=1
  3. 获取前N名的数据,将一中第二步的条件换成where 排名 < N+1

distributed key

alter table table_name set distributed by (id);
alter table table_name add primary key (id);

4 ORM框架

4.1 ORM框架比较

一文了解 Python 的三种数据源架构模式

SQLAlchemy 和其他的 ORM 框架

SQLObject

  • 优点:

    采用了易懂的ActiveRecord 模式

    一个相对较小的代码库

  • 缺点:

    方法和类的命名遵循了Java 的小驼峰风格

    不支持数据库session隔离工作单元

Storm

  • 优点:

    清爽轻量的API,短学习曲线和长期可维护性

    不需要特殊的类构造函数,也没有必要的基类

  • 缺点:

    迫使程序员手工写表格创建的DDL语句,而不是从模型类自动派生

    Storm的贡献者必须把他们的贡献的版权给Canonical公司

Django's ORM

  • 优点:

    易用,学习曲线短

    和Django紧密集合,用Django时使用约定俗成的方法去操作数据库

  • 缺点:

    不好处理复杂的查询,强制开发者回到原生SQL

    紧密和Django集成,使得在Django环境外很难使用

peewee

  • 优点:

    Django式的API,使其易用

    轻量实现,很容易和任意web框架集成

  • 缺点:

    不支持自动化 schema 迁移

    多对多查询写起来不直观

SQLAlchemy

  • 优点:

    企业级API,使得代码有健壮性和适应性

    灵活的设计,使得能轻松写复杂查询

  • 缺点:

    工作单元概念不常见

    重量级API,导致长学习曲线

相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。

4.2 SQLAlchemy

SQLAlchemy 1.4 Documentation

sqlalchemy操作数据库

sqlalchemy外键和relationship查询

SQLALlchemy数据查询小集合

SQLAlchemy 的连接池机制

SQLAlchemy 中的 Session、sessionmaker、scoped_session

Contextual/Thread-local Sessions

SQLAlchemy(常用的SQLAlchemy列选项)

查询官网例子Object Relational Tutorial (1.x API)

sqlalchemy外键和relationship查询

4.2.1 session和scoped_session

session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。 通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源 scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值

s1 = Session()
s2 = Session()
s1.add(person)
s1.commit()
# 必须先close,s2才能继续操作person
s1.close()
s2.add(person)

session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用 官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()

Engine Configuration

使用 create_engine创建我们需要的DB starting point

from sqlalchemy import create_engine

scheme = 'mysql+pymysql://root:123456@localhost:3306/dev_shopping?charset=utf8'
engine = create_engine(scheme, pool_size=10 , max_overflow=-1, pool_recycle=1200)

create_engine 函数常用参数:

  • pool_size=10 # 连接池的大小,0表示连接数无限制
  • pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away
  • max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接
  • pool_timeout=30 # 在连接池获取一个空闲连接等待的时间
  • echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout

创建Engine之后,接下来的问题,就是如何使用Engine

在单进程中,建议在在初始化的模块的时候创建Engine, 使Engine成为全局变量, 而不是为每个调用Engine的对象或者函数中创建, Engine不同于connect, connect函数会创建数据库连接的资源,Engine是管理connect创建的连接资源

在多进程中,为每个子进程都创建各自的Engine, 因为进程之间是不能共享Engine

4.2.2 几种操作方式

Working with Engines and Connections

SqlAlchemy的Engine,Connection和Session 区别?适合什么时候用?

Engine方式

Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;

result = engine.execute('SELECT * FROM tablename;') 
conn = engine.connect(close_with_result=True)
result = conn.execute('SELECT * FROM tablename;')
for row in result:
    print(result['columnname']
result.close()

Connection方式

Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;

connection = engine.connect()
trans = connection.begin()
try:
    connection.execute("INSERT INTO films VALUES ('Comedy', '82 minutes');")
    connection.execute("INSERT INTO datalog VALUES ('added a comedy');")
    trans.commit()
except:
    trans.rollback()
    raise

Session方式

Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;

总结: 从应用角度来看,可以把这三类分为两种:

  1. 直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句

  2. 使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式

Copyright © narutohyc.com 2021 all right reserved,powered by Gitbook该文件修订时间: 2024-05-06 07:11:15

results matching ""

    No results matching ""