1 Elasticsearch数据库
1.1 环境配置
安装环境
pip install elasticsearch==7.6.0
1.2 EsDao包装类
# -- coding: utf-8 --
"""
@version: v1.0
@author: huangyc
@file: EsDao.py
@Description: Es统一操作类
@time: 2020/4/27 10:22
"""
from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch
import pandas as pd
class EsDao(object):
"""
ElasticSearch的数据操作类
"""
# 查询批次大小
DEFAULT_BATCH_SIZE = 1000
# 写入批次大小
BULK_BATCH_SIZE = 10000
def __init__(self, hosts, timeout=3600*24):
self.hosts = hosts
self.timeout = timeout
self.es = Elasticsearch(hosts, timeout=self.timeout)
def save_data_list(self, index_name, data_list):
"""
保存数据列表到es的指定索引中
:param index_name: 索引名称
:param data_list: 数据列表,列表元素代表一行数据,元素类型为dict
:return:
"""
bulk_data_lst = [
data_list[i:i + self.BULK_BATCH_SIZE]
for i in range(0, len(data_list), self.BULK_BATCH_SIZE)
]
if len(data_list) > 0 and '_id' in data_list[0]:
for bulk_data in bulk_data_lst:
actions = [{
"_index": index_name,
"_type": index_name,
"_id": data.pop("_id"),
"_source": data
}
for data in bulk_data
]
bulk(self.es, actions, index=index_name, raise_on_error=True)
else:
for bulk_data in bulk_data_lst:
actions = [{
"_index": index_name,
"_type": index_name,
"_source": data
}
for data in bulk_data
]
bulk(self.es, actions, index=index_name, raise_on_error=True)
def is_index_exists(self, index_name):
"""
判断指定索引是否存在
:param index_name: 索引名称
:return:
"""
return self.es.indices.exists(index=index_name)
def delete_by_query(self, index_name, query_body):
"""
按查询结果删除数据
:param index_name:
:param query_body:
:return:
"""
return self.es.delete_by_query(index_name, query_body)
def clear_index_data(self, index_name):
"""
清空指定索引的数据
:param index_name:
:return:
"""
return self.delete_by_query(
index_name=index_name,
query_body={
"query": {
"match_all": {}
}
}
)
def save_df_data(self, index_name, df):
"""
保存pandas的DataFrame到es的指定索引中
:param index_name: 索引名称
:param df: 要保存的dataframe
:return:
"""
col_lst = df.columns.tolist()
dic_lst = [dict([(c, v) for c, v in zip(col_lst, r)]) for r in df.values.tolist()]
self.save_data_list(index_name=index_name, data_list=dic_lst)
def create_index(self, index_name, mapping_properties):
"""
创建索引
:param index_name: 索引名称
:param mapping_properties: 索引mapping中的属性列表
:return:
"""
if not self.es.indices.exists(index=index_name):
mapping = {
"mappings": {
index_name: {
"properties": mapping_properties
}
}
}
res = self.es.indices.create(index=index_name, body=mapping)
if res is not None and 'acknowledged' in res:
return res.get('acknowledged')
return False
def _search_with_scroll(self, index_name, query_body):
if "size" not in query_body:
query_body["size"] = self.DEFAULT_BATCH_SIZE
response = self.es.search(
index=index_name,
body=query_body,
search_type="dfs_query_then_fetch",
scroll="120m",
timeout="60m"
)
scroll_id = response["_scroll_id"]
while True:
sources = [doc["_source"] for doc in response["hits"]["hits"]]
if len(sources) == 0:
break
yield sources
response = self.es.scroll(scroll_id=scroll_id, scroll="60m")
def query_for_df(self, index_name, query_body):
"""
执行查询并获取pandas.DataFrame格式的返回值
:param index_name: 索引名称
:param query_body: 查询条件
:return:
"""
sources = []
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
sources.extend(sub_source)
return pd.DataFrame(sources)
def query_for_df_with_batch(self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE):
"""
按批次大小查询并返回pandas.DataFrame的generator格式的返回值
:param index_name: 索引名称
:param query_body: 查询条件
:param batch_size: 批次大小
:return:
"""
if "size" not in query_body:
query_body["size"] = batch_size
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
yield pd.DataFrame(sub_source)
def get_first_row_with_df(self, index_name):
"""
获取指定索引的首行数据,格式为pandas.DataFrame
可用于获取索引的元信息
:param index_name: 索引名称
:return:
"""
query_body = {
"size": 1,
"query": {
"match_all": {}
}
}
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
return pd.DataFrame(sub_source)
1.3 使用案例
class TaskMeta:
'''
数据元类
'''
def __init__(self, text, doc_id, sentence_id, reg_lst, flag, has_reg, text_source="primitive"):
self.text = text
self.doc_id = doc_id
self.sentence_id = sentence_id
self.reg_lst = reg_lst
self.flag = flag
self.has_reg = has_reg
self.text_source = text_source
def __repr__(self):
return f'{self.text} {self.doc_id} {self.sentence_id} {self.reg_lst} {self.flag} {self.has_reg} {self.text_source}'
def to_dict(self):
return {"text": self.text,
"doc_id": self.doc_id,
"sentence_id": self.sentence_id,
"reg_lst": self.reg_lst,
"flag": self.flag,
"has_reg": self.has_reg,
"text_source": self.text_source}
def create_index(target_es_dao, index_name, mapping):
'''
创建es索引
:return: 是否创建成功
'''
if not target_es_dao.is_index_exists(index_name):
target_es_dao.create_index(index_name, mapping)
else:
target_es_dao.clear_index_data(index_name)
print(f"索引{index_name}已存在, 已清除数据")
def writer_fun(target_es_dao, target_index, sample_lst):
'''
写数据到es库
'''
df_sample_lst = []
[df_sample_lst.append(sample.to_dict()) for sample in sample_lst]
df_sample_lst = pd.DataFrame(df_sample_lst)
target_es_dao.save_df_data(target_index, df_sample_lst)
print(f'写入数据{len(sample_lst)}条')
def es_cal_test():
# 获取连接
source_es_dao = EsDao(f"http://{aug_config.SOURCE_IP}:{aug_config.SOURCE_PORT}/")
query_condition = {
"query_string": {
"default_field": "has_reg",
"query": "true"
}
}
query_body = {
"query": query_condition
}
# 查询数据
datas = source_es_dao.query_for_df(index_name=aug_config.SOURCE_INDEX, query_body=query_body)
records = datas.to_dict(orient='record')
sample_lst = []
for record in records:
sample_lst.append(
TaskMeta(
text=record["text"],
doc_id=record["doc_id"],
sentence_id=record["sentence_id"],
reg_lst=record["reg_lst"],
flag=record["flag"],
has_reg=record["has_reg"]
)
)
# 创建索引
create_index(target_es_dao, aug_config.TARGET_INDEX, aug_config.MAPPING)
# 写入数据
writer_fun(target_es_dao, aug_config.TARGET_INDEX, sample_lst=sample_lst)
if __name__ == '__main__':
es_cal_test()
2 Oracle数据库
2.1 环境配置
安装库
pip install cx-Oracle
链接库准备,需要将
oci.dll、oraocci11.dll、oraociei11.dll
复制到sitepackages路径下,oracle client下载链接,并配置到系统环境变量,链接中没有的自己去官网(win64、所有平台、linux64)注册一个账号下载对应的版本-- 查看oracle版本 SELECT * FROM v$version;
没有配置会报如下的错:
# Windows下报错 cx_Oracle.DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "D:\software\win_or # Linux下报错 cx_Oracle.DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "libclntsh.so: cannot open shared object file: No such file or directory". See https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html for help
windows下安装完客户端后,配置oracle客户端的环境变量
D:\software\win_oracle_dlls\instantclient_11_2
linux下可以使用rpm安装包安装
sudo rpm -ivh oracle-instantclient11.2-basic-11.2.0.4.0-1.x86_64.rpm
然后将环境变量配置到
/etc/profile
# 配置oracle客户端 export ORACLE_CLIENT_HOME=/lib/oracle/11.2/client64 export TNS_ADMIN=$ORACLE_CLIENT_HOME export LD_LIBRARY_PATH=$ORACLE_CLIENT_HOME/lib export ORABIN=$ORACLE_CLIENT_HOME/bin PATH=$PATH:$ORABIN export PATH export PATH=$ORACLE_HOME:$PATH export PATH=$PATH:$HOME/bin:$ORACLE_CLIENT_HOME/bin
其他类似找不到libclntsh.sod的错误,如果出现这个错误,请进行软连接挂载文件,让系统的路径能正确的获取到该文件,操作如下:
sudo sh -c "/usr/lib/oracle/instantclient_11_1 > /etc/ld.so.conf.d/oracle-instantclient.conf" sudo ldconfig
2.2 sql基础
2.2.1 建表
--blob字段插入实例
create table blob_table_tmp(
id number primary key,
blob_cl blob not null,
clob_cl clob not null
);
insert into blob_table_tmp values(1,rawtohex('11111000011111'),'增加一条记录时,碰到插入blob类型数据出错');
insert into blob_table_tmp values(3,rawtohex('4561888'),'增加一条记录时,碰到插入blob类型数据出错');
insert into blob_table_tmp values(4,rawtohex('增加一条记录时333'),'增加一条记录时,碰到插入blob类型数据出错');
2.2.2 查询
获取连接
FINANCE_DB_HOST = "192.168.x.x"
FINANCE_DB_PORT = 1521
FINANCE_DB_USER = "hyc"
FINANCE_DB_PASSWORD = "123456"
FINANCE_DB_DB = "ORCL"
class OracleConn():
config_path = ''
@staticmethod
def get_conn(conn_name, encoding="UTF-8"):
conn_str = str(eval("%s_DB_USER" % (OracleConn.config_path, conn_name))) + "/" + str(eval("%s.%s_DB_PASSWORD" % (OracleConn.config_path, conn_name)))
conn_str += "@" + str(eval("%s_DB_HOST" % (OracleConn.config_path, conn_name)))
conn_str += ":" + str(eval("%s_DB_PORT" % (OracleConn.config_path, conn_name)))
conn_str += "/" + str(eval("%s_DB_DB" % (OracleConn.config_path, conn_name)))
return ora.connect(conn_str, encoding=encoding, nencoding=encoding)
读写数据库
def oracle_test():
# 获取数据库连接
conn = OracleConn.get_conn("FINANCE")
cur = conn.cursor()
# 查询数据
sql = "select id,blob_cl,clob_cl from FINANCE.blob_table_tmp"
datas = []
r = cur.execute(sql)
# 假设name是clob字段类型
[datas.append((gg[0], gg[1].read().decode('utf-8'), gg[2].read())) for gg in r]
# 写入数据
insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
res = []
[res.append((data[0], data[1])) for data in datas]
cur.executemany(insert_sql, res)
cur.execute('commit')
cur.close()
conn.close()
print("写入结束")
if __name__ == '__main__':
oracle_test()
2.3 相关操作
关于数据库的连接,查询和写入
import cx_Oracle
class Setting:
DB_USER = 'narutohyc'
DB_PASSWORD = 'hyc'
DB_IP = '192.168.0.1'
DB_PORT = ''
DB_SERVICE = 'dataBaseName'
setting = Setting()
def oracle_test():
# 获取数据库连接
conn = cx_Oracle.connect('%s/%s@%s/%s' % (setting.DB_USER, setting.DB_PASSWORD, setting.DB_IP, setting.DB_SERVICE), encoding='utf-8')
cur = conn.cursor()
# 查询数据
sql = "select ID, name from hyc_database"
datas = []
r = cur.execute(sql)
# 假设name是clob字段类型
[datas.append((gg[0], gg[1].read())) for gg in r]
# 写入数据
insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
res = []
[res.append((data[0], data[1])) for data in datas]
cur.executemany(insert_sql, res)
cur.execute('commit')
cur.close()
conn.close()
print("写入结束")
if __name__ == '__main__':
oracle_test()
3 Postgresql数据库
官方文档Documentation → PostgreSQL 16
3.1 离线安装数据库
先从centos7-pg_14.2下载下载rpm包(微云下载centos7.6_PostgreSQL14.2),或者直接官方下载安装教程安装,如果离线安装就下载rpm包
# 离线安装执行以下命令安装
rpm -ivh postgresql14-libs-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-server-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-contrib-14.2-1PGDG.rhel7.x86_64.rpm
出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法
yum install python3-devel
创建数据库data和log文件夹
# 创建数据库data和log文件夹
mkdir -p /home/postgres/pgsql_data
mkdir -p /home/postgres/pgsql_log
# 创建日志文件
touch /home/postgres/pgsql_log/pgsql.log
授权给安装数据时自动创建的postgres用户
chown -R postgres:postgres /home/postgres/pgsql_data
chown -R postgres:postgres /home/postgres/pgsql_log
切换到安装数据时自动创建的postgres用户
su - postgres
初始化数据库到新建数据目录
/usr/pgsql-14/bin/initdb -D /home/postgres/pgsql_data
启动服务器(初始化数据库日志文件)
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log start
# 查看状态
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log status
切换到管理员开启端口并重启防火墙
su root
firewall-cmd --zone=public --add-port=5432/tcp --permanent
firewall-cmd --reload
修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf
# 修改监听地址
listen_addresses = '*'
# 修改最大连接数(按需)
max_connections = 1000
# 修改密码认证
password_encryption = md5
修改可访问的用户IP段
vi /home/pgsql_data/pg_hba.conf(a进入编辑模式,esc退出编辑模式,:wq并按回车保存)
IPV4下修改为或新增
host all all 0.0.0.0/0 trust
postgres用户重启数据库服务
su - postgres
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log restart
数据库安装结束,管理员postgres,默认密码123456
3.2 环境配置
3.2.1 python连接pg
pip install "psycopg[binary,pool]"
使用sqlalchemy
from sqlalchemy import create_engine with engine.connect().execution_options(stream_results=True) as connection: stream = connection.execute(sql) while 1: streams = stream.fetchmany(1000) if not streams: break for idx, row in enumerate(streams): row = row._asdict() print(idx)
使用psycopg
#!/usr/bin/env Python # -- coding: utf-8 -- """ @version: v1.0 @author: huangyc @file: cur_2_test.py @Description: @time: 2024/3/21 17:22 """ import importlib import os from basic_support.comm_funcs.comm_utils import gen_uuid32 from dbutils.pooled_db import PooledDB from psycopg import ServerCursor from functools import partial from psycopg.rows import dict_row os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' # 解决游标的问题, 默认使用的是ClientCursor cls = partial(ServerCursor, name=gen_uuid32()) config = dict(host='192.168.xx.xx', port=5432, user='xx', password='xx', dbname='xx', cursor_factory=cls) dic = dict(keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5) config.update(dic) db_creator = importlib.import_module("psycopg") max_cached = 0 max_connections = 16 max_usage = 0 blocking = True sql = f"SELECT * FROM t_tmp_filter_docs limit 5000" """ max_cached: 池中空闲连接的最大数量。默认为0,即无最大数量限制。(建议默认) max_connections: 被允许的最大连接数。默认为0,无最大数量限制。(视情况而定) max_usage: 连接的最大使用次数。默认0,即无使用次数限制。(建议默认) blocking: 连接数达到最大时,新连接是否可阻塞。默认False,即达到最大连接数时,再取新连接将会报错。 (建议True,达到最大连接数时,新连接阻塞,等待连接数减少再连接) """ pool = PooledDB(db_creator, maxcached=max_cached, maxconnections=max_connections, maxusage=max_usage, blocking=blocking, **config) conn = pool.connection() cur = conn.cursor(row_factory=dict_row) batch_size = 1000 params = None cur.itersize = batch_size cur.execute(sql, params) idx = 0 while records := cur.fetchmany(batch_size): idx += 1 print(idx) try: cur.close() except: pass try: conn.close() except: pass
3.3 sql语法
3.3.1 数据库连接
-- 获取数据库实例连接数
select count(*) from pg_stat_activity;
-- 获取数据库最大连接数
show max_connections;
-- 查询当前连接数详细信息
select * from pg_stat_activity;
-- 查询数据库中各个用户名对应的数据库连接数
select usename, count(*) from pg_stat_activity group by usename;
3.3.2 数据库信息
-- 查询数据库大小
select pg_size_pretty (pg_database_size('pg_fac_stk'));
-- 查询各表磁盘占用
SELECT
table_schema || '.' || table_name AS table_full_name,
pg_size_pretty(pg_total_relation_size('"' || table_schema || '"."' || table_name || '"')) AS size
FROM information_schema.tables where table_name like 'finance_%'
ORDER BY
pg_total_relation_size('"' || table_schema || '"."' || table_name || '"') DESC;
-- 获取各个表中的数据记录数
select relname as TABLE_NAME, reltuples as rowCounts from pg_class where relkind = 'r' order by rowCounts desc;
-- 查看数据库表对应的数据文件
select pg_relation_filepath('product');
-- 查看数据库实例的版本
select version();
-- 分析评估SQL执行情况
EXPLAIN ANALYZE SELECT * FROM t_cfg_opinfo;
-- 获取数据库当前的回滚事务数以及死锁数
select datname,xact_rollback,deadlocks from pg_stat_database;
3.3.3 数据备份与恢复
pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:
- 备份数据可以保持原有的结构和特性,还原时可以保证数据准确性
- 备份文件可以跨平台传输,方便进行远程备份
- 备份文件可以进行压缩,减小文件大小,方便传输和存储
可以新建数据库,建几张表做测试
# 学生表
CREATE TABLE students (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
gender VARCHAR(10) NOT NULL,
age INTEGER NOT NULL,
class VARCHAR(20) NOT NULL
);
# 学科表
CREATE TABLE subjects (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL
);
# 成绩表
CREATE TABLE scores (
id SERIAL PRIMARY KEY,
student_id INTEGER NOT NULL,
subject_id INTEGER NOT NULL,
score INTEGER NOT NULL,
FOREIGN KEY (student_id) REFERENCES students (id),
FOREIGN KEY (subject_id) REFERENCES subjects (id)
);
# 插入一些测试数据
INSERT INTO students (name, gender, age, class)
VALUES
('Alice', 'Female', 18, 'Class A'),
('Bob', 'Male', 17, 'Class B'),
('Charlie', 'Male', 19, 'Class A'),
('Diana', 'Female', 18, 'Class B');
# 插入学科表数据
INSERT INTO subjects (name)
VALUES
('Mathematics'),
('English'),
('Science');
-- Alice 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
(1, 1, 90),
(1, 2, 85),
(1, 3, 92);
-- Bob 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
(2, 1, 78),
(2, 2, 80),
(2, 3, 75);
-- Charlie 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
(3, 1, 88),
(3, 2, 92),
(3, 3, 90);
-- Diana 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
(4, 1, 95),
(4, 2, 88),
(4, 3, 92);
备份
使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可
备份整个数据库
pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -f <备份文件路径> <数据库名称> # 示例 /usr/pgsql-14/bin/pg_dump -h 127.0.0.1 -U postgres -p 5432 -F t -b -v -f build_hyc_test.sql.tar hyc_test
备份指定表或数据
pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -t <表名1> -t <表名2> -f <备份文件路径> <数据库名称> # 示例 -- 备份指定表到sql文件 -- '-c --if-exists' 会生成 'drop table if exist' 命令 -- '--no-owner' 是一个选项,用于指定在导出数据库时不包括拥有者信息 pg_dump --verbose --host=192.168.xx.xx --port=5432 --username=postgres --file /home/huangyc/pg_bak_test/bak_hyc.sql --encoding=UTF-8 -t "public.tushare_wz_index" -t "public.tushare_us_basic" -t "public.dim_fund" -t "public.dim_index" -c --if-exists --no-owner pg_fac_stk
具体参数的含义如下:
- -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
- -p:数据库服务的监听端口,一般为默认端口5432
- -U:连接数据库的用户名
- -F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t
- -b:在备份文件中包含备份的数据库的模式信息
- -v:备份过程中输出详细的信息
- -f:备份文件的保存路径和文件名
- -t:只备份指定的表和数据
-- 备份postgres库并tar打包
pg_dump -h 127.0.0.1 -p 5432 -U postgres -f postgres.sql.tar -Ft;
-- 备份postgres库,转储数据为带列名的INSERT命令
pg_dumpall -d postgres -U postgres -f postgres.sql --column-inserts;
还原
使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可
恢复整个库
pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -d <数据库名称> <备份文件路径> # 示例 /usr/pgsql-14/bin/pg_restore -h 127.0.0.1 -U postgres -p 5432 -d hyc_test_bak build_hyc_test.sql.tar
恢复指定数据
pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -t <表名1> -t <表名2> -d <数据库名称> <备份文件路径> # 示例 -- 对于pg_dump备份出来的sql文件,直接执行sql文件即可恢复 -- 还原指定sql文件到bak_test库(需要自己建库) psql --host=192.168.xx.xx --port=5432 --username=postgres -d bak_test --file /home/huangyc/pg_bak_test/bak_hyc.sql.tar
具体参数的含义如下:
- -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
- -p:数据库服务的监听端口,一般为默认端口5432
- -U:连接数据库的用户名
- -d:恢复数据的目标数据库名称
- -t:只恢复指定的表和数据
命令详解
[postgres@pg01 ~]$ pg_dump --help
用法:
pg_dump [选项]... [数据库名字]
**一般选项**:
-f, --file=FILENAME 输出文件或目录名
-F, --format=c|d|t|p 输出文件格式 (c=custom, d=directory, t=tar,p=plain,plain就是sql纯文本 (默认值))
-j, --jobs=NUM 执行多个并行任务进行备份转储工作
-v, --verbose 详细模式
-V, --version 输出版本信息,然后退出
-Z, --compress=0-9 被压缩格式的压缩级别,0表示不压缩
--lock-wait-timeout=TIMEOUT 在等待表锁超时后操作失败
--no-sync 不用等待变化安全写入磁盘
-?, --help 显示此帮助, 然后退出
**控制输出内容选项(常用)**:
-a, --data-only 只转储数据,不包括模式,只对纯文本输出有意义
-s, --schema-only 只转储模式, 不包括数据
-c, --clean 在重新创建之前,先清除(删除)数据库对象,如drop table。只对纯文本输出有意义
-C, --create 指定输出文件中是否生成create database语句,只对纯文本输出有意义
-n, --schema=PATTERN 指定要导出的schema,不指定则导出所有的非系统schema
-N, --exclude-schema=PATTERN 排除导出哪些schema
-O, --no-owner 在明文格式中, 忽略恢复对象所属者
-t, --table=PATTERN 指定导出的表、视图、序列,可以使用多个-t匹配多个表,使用-t之后,-n和-N就失效了
-T, --exclude-table=PATTERN 排除表
-x, --no-privileges 不要转储权限 (grant/revoke)
--disable-triggers 在只恢复数据的过程中禁用触发器
--exclude-table-data=PATTERN do NOT dump data for the specified table(s)
--if-exists 当删除对象时使用IF EXISTS
--inserts 以INSERT命令,而不是COPY命令的形式转储数据,使用该选项可以把数据加载到非pg数据库,会使恢复非常慢
该选项为每行生成1个单独的insert命令,?在恢复过程中遇到错误,将会丢失1行而不是全部表数据
--column-inserts 以带有列名的INSERT命令形式转储数据,例如insert into table_name(column,...) values(value1,...)
--load-via-partition-root 通过根表加载分区
--no-comments 不转储注释
--no-tablespaces 不转储表空间分配信息
--no-unlogged-table-data 不转储没有日志的表数据
--on-conflict-do-nothing 将ON CONFLICT DO NOTHING添加到INSERT命令
**控制输出内容选项(不常用)**:
-S, --superuser=NAME 指定关闭触发器时需要用到的超级用户名。 它只有在使用了--disable-triggers时才有影响。一般情况下,最好不要输入该参数,而是用 超级用户启动生成的脚本。
-b, --blobs 在转储中包括大对象
-B, --no-blobs 排除转储中的大型对象
-E, --encoding=ENCODING 转储以ENCODING形式编码的数据
--binary-upgrade 只能由升级工具使用
--enable-row-security 启用行安全性(只转储用户能够访问的内容)
--extra-float-digits=NUM 覆盖extra_float_digits的默认设置
--disable-dollar-quoting 取消美元 (符号) 引号, 使用 SQL 标准引号
--no-publications 不转储发布
--no-security-labels 不转储安全标签的分配
--no-subscriptions 不转储订阅
--no-synchronized-snapshots 在并行工作集中不使用同步快照
--quote-all-identifiers 所有标识符加引号,即使不是关键字
--rows-per-insert=NROWS 每个插入的行数;意味着--inserts
--section=SECTION 备份命名的节 (数据前, 数据, 及 数据后)
--serializable-deferrable 等到备份可以无异常运行
--snapshot=SNAPSHOT 为转储使用给定的快照
--strict-names 要求每个表和(或)schema包括模式以匹配至少一个实体
--use-set-session-authorization
使用 SESSION AUTHORIZATION 命令代替
ALTER OWNER 命令来设置所有权
**联接选项**:
-d, --dbname=DBNAME 对数据库 DBNAME备份
-h, --host=主机名 数据库服务器的主机名或套接字目录
-p, --port=端口号 数据库服务器的端口号
-U, --username=名字 以指定的数据库用户联接
-w, --no-password 永远不提示输入口令
-W, --password 强制口令提示 (自动)
--role=ROLENAME 在转储前运行SET ROLE
对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下
[postgres@pg01 pg_backup]$ pg_restore --help
pg_restore 从一个归档中恢复一个由 pg_dump 创建的 PostgreSQL 数据库.
用法:
pg_restore [选项]... [文件名]
一般选项:
-d, --dbname=名字 连接数据库名字
-f, --file=文件名 输出文件名(- 对于stdout)
-F, --format=c|d|t 备份文件格式(应该自动进行)
-l, --list 打印归档文件的 TOC 概述
-v, --verbose 详细模式
-V, --version 输出版本信息, 然后退出
-?, --help 显示此帮助, 然后退出
恢复控制选项:
-a, --data-only 只恢复数据, 不包括模式
-c, --clean 在重新创建之前,先清除(删除)数据库对象
-C, --create 创建目标数据库
-e, --exit-on-error 发生错误退出, 默认为继续
-I, --index=NAME 恢复指定名称的索引
-j, --jobs=NUM 执行多个并行任务进行恢复工作
-L, --use-list=FILENAME 从这个文件中使用指定的内容表排序
输出
-n, --schema=NAME 在这个模式中只恢复对象
-N, --exclude-schema=NAME 不恢复此模式中的对象
-O, --no-owner 不恢复对象所属者
-P, --function=NAME(args) 恢复指定名字的函数
-s, --schema-only 只恢复模式, 不包括数据
-S, --superuser=NAME 使用指定的超级用户来禁用触发器
-t, --table=NAME 恢复命名关系(表、视图等)
-T, --trigger=NAME 恢复指定名字的触发器
-x, --no-privileges 跳过处理权限的恢复 (grant/revoke)
-1, --single-transaction 作为单个事务恢复
--disable-triggers 在只恢复数据的过程中禁用触发器
--enable-row-security 启用行安全性
--if-exists 当删除对象时使用IF EXISTS
--no-comments 不恢复注释
--no-data-for-failed-tables 对那些无法创建的表不进行
数据恢复
--no-publications 不恢复发行
--no-security-labels 不恢复安全标签信息
--no-subscriptions 不恢复订阅
--no-tablespaces 不恢复表空间的分配信息
--section=SECTION 恢复命名节 (数据前、数据及数据后)
--strict-names 要求每个表和(或)schema包括模式以匹配至少一个实体
--use-set-session-authorization
使用 SESSION AUTHORIZATION 命令代替
ALTER OWNER 命令来设置所有权
联接选项:
-h, --host=主机名 数据库服务器的主机名或套接字目录
-p, --port=端口号 数据库服务器的端口号
-U, --username=名字 以指定的数据库用户联接
-w, --no-password 永远不提示输入口令
-W, --password 强制口令提示 (自动)
--role=ROLENAME 在恢复前执行SET ROLE操作
选项 -I, -n, -N, -P, -t, -T, 以及 --section 可以组合使用和指定
多次用于选择多个对象.
如果没有提供输入文件名, 则使用标准输入.
3.3.4 表空间
新建表空间
# 新建表空间目录 t_fac_ts
mkdir /home/huangyc/t_fac_ts
# 修改表空间的用户权限
chown postgres /home/huangyc/t_fac_ts
pg库新建表空间
create tablespace t_fac_ts owner postgres location '/home/huangyc/t_fac_ts';
表空间有关的一些语法
# 删除表空间 (需要先drop表空间所有的表, 或者将该空间下所有的表移除才能drop表空间)
DROP TABLESPACE t_fac_ts;
# 修改具体的表到指定表空间下
ALTER TABLE t_fac_tushare_stock_basic SET TABLESPACE t_fac_ts;
# 修改指定库到指定表空间下
ALTER DATABASE name SET TABLESPACE new_tablespace;
3.3.5 锁表处理
pg锁表解锁
查看被锁的表
select a.locktype,a.database,a.pid,a.mode,a.relation,b.relname from pg_locks a join pg_class b on a.relation = b.oid where relname='t_opt_strhdk_blsj';
杀死被锁的pid
select pg_terminate_backend(pid);
3.3.6 表结构修改
-- 修改表名
alter table "user" rename to "ts_user";
-- 添加新字段
alter table table_name add column col_name varchar(50);
-- 丢弃某列
alter table table_name drop column col_name;
-- 添加主键
alter table table_name add primary key("col_name");
-- 修改字段名
alter table table_name rename column old_col_name to new_col_name;
3.3.7 数据更新和查询
设置某字段的值
-- 设置某字段的值
update table_name set col_name=new_value;
-- 更新某个字段并关联其他表
UPDATE table1
SET field_to_update = table2.new_value
FROM table2
WHERE table1.common_column = table2.common_column;
删除表中重复数据
-- 查询[旧表]数据的重复情况
select col1,col2,count(*) from old_table group by col1,col2;
-- 所有字段都一样的情况
create table bak_table as select distinct * from table_name;
-- 查询[新表]数据的重复情况
select col1,col2,count(*) from bak_table group by col1,col2;
truncate table old_table;
insert into old_table (col1,col2) select col1,col2 from bak_table;
不存在插入,存在更新
insert into ... on conflict(column_name) do update set ...
conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件
column_name必须是主键或者其他具有唯一性的字段(如唯一键或排他键)
insert into user(id,username,address,create_date,create_by)
values('1','刘德华','香港',now(),'system')
on conflict(id)
do update set address='中国',update_date=now(),update_by='system';
# 批量的方式
insert into testunnest(id, age, name) values (unnest(array[1,3]), unnest(array[18,10]), unnest(array['valupdated', 'val3'])) on conflict (id) do update set age = excluded.age, name = excluded.name;
3.3.8 数据和结构复制
-- [复制表和数据] 复制表结构和数据 自动建表,不会复制主键什么的
create table new_table as select * from old_table [WITH NO DATA];
-- [复制数据] 复制数据到 新表 表需要提前建,并且表字段要一致,不会复制主键什么的
insert into new_table (col_0, col_1) select col_0, col_1 from old_table;
3.3.9 视图
普通视图
视图是一个虚拟表,它是根据一个或多个基本表的查询结果动态生成的,每次查询视图时都会执行相应的查询
CREATE VIEW view_name AS
SELECT column1, column2, ...
FROM table_name
WHERE condition;
drop view view_name;
物化视图
物化视图是一个实际存储数据的表,它的数据定期刷新,不像普通视图那样每次查询都重新计算。
CREATE MATERIALIZED VIEW materialized_view_name AS
SELECT column1, column2, ...
FROM table_name
WHERE condition;
drop MATERIALIZED VIEW materialized_view_name
需要注意的是,物化视图需要定期手动或自动刷新以更新数据,你可以使用 REFRESH MATERIALIZED VIEW
命令来进行刷新
3.3.10 分页查询
在PostgreSQL数据库中,进行分页查询通常使用LIMIT和OFFSET子句来实现
LIMIT用于限制返回的行数,OFFSET用于指定从结果集的哪一行开始返回数据,下面是一个简单的分页查询示例:
假设我们有一个名为products的表,其中包含产品的名称和价格,我们想要获取第2页,每页显示10条数据,可以使用以下SQL查询语句:
SELECT product_name, price
FROM products
ORDER BY product_id
LIMIT 10 OFFSET 10;
在这个例子中,假设product_id是产品表中的唯一标识符,通过ORDER BY将结果按照product_id排序,LIMIT 10表示返回10行数据,OFFSET 10表示从结果集中的第11行开始返回数据,即返回第2页的数据
3.3.11 乱序查询
在PostgreSQL数据库中,使用ORDER BY RANDOM()语句可以实现按照随机顺序排序查询结果
这样可以让查询结果以随机的顺序返回,每次执行查询时都会得到不同的排序结果
这在需要随机抽样数据或随机展示数据时非常有用
需要注意的是,使用ORDER BY RANDOM()可能会影响查询的性能,因为需要对结果集进行随机排序操作
假设我们有一个名为products的表,其中包含产品的名称和价格,我们想要以随机顺序返回产品列表,可以使用以下SQL查询语句:
SELECT
product_name,
price
FROM
products
ORDER BY
RANDOM();
这将返回products表中产品名称和价格的数据,并以随机的顺序排序,每次执行该查询,返回的产品列表顺序都会不同
3.3.12 列表查询
列表字段的创建
CREATE TABLE your_table (
id SERIAL PRIMARY KEY,
list_column_name _TEXT
);
在PostgreSQL中,你可以使用以下方式来查询数组字段:
- 查询数组字段中包含特定值的行:
SELECT * FROM your_table WHERE your_array_column = ANY('{value1,value2,value3}');
- 查询数组字段中包含任何查询数组中的值的行:
SELECT * FROM your_table WHERE your_array_column && '{value1,value2,value3}';
- 查询数组字段中包含所有查询数组中的值的行:
SELECT * FROM your_table WHERE your_array_column @> '{value1,value2,value3}';
- 查询数组字段的长度大于1的所有行:
SELECT * FROM your_table WHERE array_length(your_array_column, 1) > 1;
- 查询数组字段中第一个元素等于特定值的行:
SELECT * FROM your_table WHERE your_array_column[1] = 'specific_value';
3.3.13 删除重复记录
-- 初始化数据
create table hyc_tmp_del_test(id int, name varchar(255));
create table hyc_tmp_del_test_bk (like hyc_tmp_del_test);
insert into hyc_tmp_del_test select generate_series(1, 10000), 'huangyc';
insert into hyc_tmp_del_test select generate_series(1, 10000), 'huangyc';
insert into hyc_tmp_del_test_bk select * from hyc_tmp_del_test;
-- 最容易想到的方法就是判断数据是否重复,对于重复的数据只保留ctid最小(或最大)的数据,删除其他的
-- id相同的数据,保留ctid最小的,其他的删除
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid <> (select min(t.ctid) from hyc_tmp_del_test_bk t where a.id=t.id); -- 17.112s
-- group by方法通过分组找到ctid最小的数据,然后删除其他数据
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid not in (select min(ctid) from hyc_tmp_del_test_bk group by id); -- 0.052s
-- 高效删除方法
explain analyze delete from hyc_tmp_del_test_bk a where a.ctid = any(array (select ctid from (select row_number() over (partition by id), ctid from hyc_tmp_del_test_bk) t where t.row_number > 1)); -- 0.055s
第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本
关键
pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid
其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关
ctid表示行版本在表中的物理位置: 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。
引申用法
假设我们有一个表格 products
,包含产品名称和产品类别。我们希望从中找出每个产品类别中的前两个产品
SELECT
*
FROM
(
SELECT
product_name,
category,
row_number() OVER (PARTITION BY SUBSTR(p.category, 0, LENGTH(p.category)-2) ORDER BY product_name) AS ranking
FROM
products p
WHERE
p.category LIKE 'Electronics:%'
) subquery
WHERE
ranking <= 2;
在这个例子中,我们有一个产品表 products
,其中包含 product_name
和 category
列
我们想要找出每个以 'Electronics:' 开头的产品类别中的前两个产品
- 子查询部分:
- 子查询从
products
表中选择了产品名称product_name
和产品类别category
- 我们只关注以 'Electronics:' 开头的产品类别,即
p.category LIKE 'Electronics:%'
- 使用
row_number()
函数,对每个以 'Electronics:' 开头的类别进行编号,按产品名称排序
- 子查询从
- 主查询部分:
- 在主查询中,我们选择了子查询的所有列
*
- 我们进一步筛选了结果,只选择了编号小于等于2的记录,以获取每个类别中的前两个产品
- 在主查询中,我们选择了子查询的所有列
3.3.14 索引
-- 获取数据库表中的索引
select * from pg_indexes where tablename = 't_cfg_opinfo';
-- 创建索引
create index index_name on table_name (col_0, col_1);
-- 查询索引
select * from pg_indexes where tablename='table_name';
-- 删除索引
drop index index_name;
什么情况下要避免使用索引?
虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引
使用索引时,需要考虑下列准则:
- 索引不应该使用在较小的表上
- 索引不应该使用在有频繁的大批量的更新或插入操作的表上
- 索引不应该使用在含有大量的 NULL 值的列上
- 索引不应该使用在频繁操作的列上
3.3.15 实用sql
-- 查询库中的最大版本
SELECT (CASE WHEN MAX(version) IS NULL THEN -1 ELSE MAX(version) END) + 1 AS version FROM table_name
3.3.16 其他语法
筛选某列,逗号拼接
select string_agg(bs_org_id,',') as bs_org_ids
from bs_org
where par_org_id ='100'
日期转换
select to_char(col_name,'yyyyMMDD')-interval '2 day' from table_name
-- -interval '2 day' 表示往前2天
转时间戳
select '2011-01-06 09:57:59'::timestamp;
TO_TIMESTAMP('2011-01-06 09:57:59', 'YYYY-MM-DD HH24:MI:S')
postgresql 获取分组第一条数据 窗口函数
- 给数据分组并排名,使用
row_number() over (partition by 分组的字段名 order by 排序规则) as 排名
- 从上述第一步中取出,排名为第一的数据,即为第一条数据
select * from 上述第一步 where 排名=1
- 获取前N名的数据,将一中第二步的条件换成
where 排名 < N+1
distributed key
alter table table_name set distributed by (id);
alter table table_name add primary key (id);
4 ORM框架
4.1 ORM框架比较
SQLObject
优点:
采用了易懂的ActiveRecord 模式
一个相对较小的代码库
缺点:
方法和类的命名遵循了Java 的小驼峰风格
不支持数据库session隔离工作单元
Storm
优点:
清爽轻量的API,短学习曲线和长期可维护性
不需要特殊的类构造函数,也没有必要的基类
缺点:
迫使程序员手工写表格创建的DDL语句,而不是从模型类自动派生
Storm的贡献者必须把他们的贡献的版权给Canonical公司
Django's ORM
优点:
易用,学习曲线短
和Django紧密集合,用Django时使用约定俗成的方法去操作数据库
缺点:
不好处理复杂的查询,强制开发者回到原生SQL
紧密和Django集成,使得在Django环境外很难使用
peewee
优点:
Django式的API,使其易用
轻量实现,很容易和任意web框架集成
缺点:
不支持自动化 schema 迁移
多对多查询写起来不直观
SQLAlchemy
优点:
企业级API,使得代码有健壮性和适应性
灵活的设计,使得能轻松写复杂查询
缺点:
工作单元概念不常见
重量级API,导致长学习曲线
相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。
4.2 SQLAlchemy
SQLAlchemy 中的 Session、sessionmaker、scoped_session
Contextual/Thread-local Sessions
4.2.1 session和scoped_session
session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。 通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源 scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值
s1 = Session()
s2 = Session()
s1.add(person)
s1.commit()
# 必须先close,s2才能继续操作person
s1.close()
s2.add(person)
session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用 官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()
使用 create_engine
创建我们需要的DB starting point
from sqlalchemy import create_engine
scheme = 'mysql+pymysql://root:123456@localhost:3306/dev_shopping?charset=utf8'
engine = create_engine(scheme, pool_size=10 , max_overflow=-1, pool_recycle=1200)
create_engine
函数常用参数:
- pool_size=10 # 连接池的大小,0表示连接数无限制
- pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away
- max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接
- pool_timeout=30 # 在连接池获取一个空闲连接等待的时间
- echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout
创建Engine
之后,接下来的问题,就是如何使用Engine
在单进程中,建议在在初始化的模块的时候创建Engine
, 使Engine
成为全局变量, 而不是为每个调用Engine
的对象或者函数中创建, Engine
不同于connect
, connect
函数会创建数据库连接的资源,Engine
是管理connect
创建的连接资源
在多进程中,为每个子进程都创建各自的Engine
, 因为进程之间是不能共享Engine
4.2.2 几种操作方式
Working with Engines and Connections
SqlAlchemy的Engine,Connection和Session 区别?适合什么时候用?
Engine方式
Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;
result = engine.execute('SELECT * FROM tablename;')
conn = engine.connect(close_with_result=True)
result = conn.execute('SELECT * FROM tablename;')
for row in result:
print(result['columnname']
result.close()
Connection方式
Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;
connection = engine.connect()
trans = connection.begin()
try:
connection.execute("INSERT INTO films VALUES ('Comedy', '82 minutes');")
connection.execute("INSERT INTO datalog VALUES ('added a comedy');")
trans.commit()
except:
trans.rollback()
raise
Session方式
Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;
总结: 从应用角度来看,可以把这三类分为两种:
直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句
使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式