# pgmanage **Repository Path**: manjim/pgmanage ## Basic Information - **Project Name**: pgmanage - **Description**: pgsql数据库工具 - **Primary Language**: Python - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-12-21 - **Last Updated**: 2025-04-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: 自建-包项目, Pip ## README # 使用说明 本程序包用到的库:psycopg3 官方文档:https://www.psycopg.org/psycopg3/docs/advanced/pool.html#other-ways-to-create-a-pool 开源地址:https://gitee.com/manjim/pgmanage ```python # 依赖模块 psycopg[binary,pool] # pandas df 转列表字典 data = dfbb.to_dict(orient='records') ``` ## 1. 下载安装 ```python # 安装 pip install pgmanage # 升级 pip install --upgrade pgmanage # 卸载 pip uninstall pgmanage ``` ## 2. 导入包模块 ```python from pgmanage import PgPool # 数据库连接池模块 from pgmanage import PgExec # 全功能模块 包含连接池 from pgmanage import AsyncPgPool # 异步数据库连接池模块 from pgmanage import AsyncPgExec # 异步全功能模块 包含连接池 ``` ## 数据库连接字符串 根据环境变量配置的数据库字符串,自动链接数据库。也可传入连接字符串参数。 ``` 环境变量名称:PGLINK_URL 链接字符串示例:'dbname=shop_data user=postgres password=1116666688 host=127.0.0.1 port=5432' 环境变量写法示例:PGLINK_URL='dbname=shop_data user=postgres password=1116666688 host=127.0.0.1 port=5432' ``` ## 3.使用示例 ### 同步模式 - 示例1——PgPool ```python from pgmanage import PgPool dbm = PgPool() with dbm.pool.connection() as conn: with conn.cursor() as cur: cur.execute("SELECT 1") data = cur.fetchone() print("查询结果:",data) # 使用完成后会自动关闭连接池 ``` - 示例2-PgExec ```python # 设置日志 import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) from pgmanage import PgExec def main(): # 生成10万行的测试数据 data_to_bulk_insert = [] for i in range(100000): data_to_bulk_insert.append({'id': f'{i}', 'name': f'name{i}88','name2': f'name{i}88', 'age': i % 100, 'email': f'email{i}@example.com','fasle': True}) # 创建数据库管理器实例 db_mgr = PgExec() # 写入数据,主键冲突时更新所有非主键列 upsert_data rows_inserted = db_mgr.upsert_data( schema='data_test', table='table_test', data=data_to_bulk_insert, batch_size=10000, update=True, create=True, add_columns=True, max_workers=10, ) logger.info(f"总共插入 {rows_inserted} 行") # 执行 SQL 查询 query = "SELECT * FROM data_test.table_test LIMIT 10;" result = db_mgr.exec_query(query) for row in result: logger.info(row) # 删除数据 query = "delete FROM data_test.table_test;" result = db_mgr.exec_query_rowcount(query) logger.info(f"删除结果:{result}") if __name__ == '__main__': main() ``` ### 异步模式 - 示例1——AsyncPgPool ```python import asyncio from pgmanage import AsyncPgPool async def main(): try: # 初始化连接池 pool = await AsyncPgPool.initialize() # 使用连接池执行数据库操作 async with pool.connection() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 1") rows = await cur.fetchall() for row in rows: print("查询结果:",row) except Exception as e: logger.error(f"发生错误: {e}") raise if __name__ == "__main__": asyncio.run(main()) ``` - 示例2-AsyncPgExec ```python import logging import asyncio from pgmanage import AsyncPgExec # 设置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 使用示例 async def main(): import os DBURI = os.environ.get("PGLINK_URL") # 方式1:使用上下文管理器(推荐) async with AsyncPgExec(DBURI) as dbmange: # 生成测试数据 data_to_bulk_insert = [ {'id': f'{i}', 'name': f'name{i}88', 'age': i % 100, 'email': f'email{i}@example.com', 'fasle': True} for i in range(100000) ] result = await dbmange.upsert_data( 'data_test', 'table_test', data_to_bulk_insert, batch_size=10000, update=True, create=True, add_columns=True ) logger.info(f"总共成功插入或更新了 {result} 条记录") # 方式2:直接使用(程序结束时会自动关闭连接池) # 生成测试数据 data_to_bulk_insert = [ {'id': f'{i}', 'name': f'name{i}88', 'age': i % 100, 'email': f'email{i}@example.com', 'fasle': True} for i in range(100000) ] db_exec = AsyncPgExec(DBURI) db_exec.pool = await db_exec.async_pool.initialize() result = await db_exec.upsert_data( 'data_test', 'table_test', data_to_bulk_insert, batch_size=10000, update=True, create=True, add_columns=True ) logger.info(f"总共成功插入或更新了 {result} 条记录") # 不需要手动关闭,atexit 会处理 if __name__ == "__main__": asyncio.run(main()) ```