# FlaskBoot **Repository Path**: jeff10/flask-boot ## Basic Information - **Project Name**: FlaskBoot - **Description**: 开箱即用,一款python后端开发脚手架 - **Primary Language**: Python - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 10 - **Forks**: 4 - **Created**: 2024-03-16 - **Last Updated**: 2025-01-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 引言 Flask框架更像是为我们开发者提供了一个Web开发引擎,所需的其它开发工具需要引入相应的插件,当然这也符合Flask轻量的特点——“按需插拔”。 所以开发了该脚手架的定义一套能够拿来即用的Flask工程,无需自己从零组装,可基于FlaskBoot项目模板按需定制化自己的项目。 ## 项目工程目录 + config:配置文件目录 + controller:restful接口文件目录 + enums: 枚举 + model:ORM模型目录 + schema:序列化与反序列化模型目录 + test:测试目录 + filter:中间件过滤器目录 + utils:工具文件目录 + vo:数据模型处理类目录 + data:数据存储目录 ## 开发约定 + 业务逻辑写在service层 ```python class PublicService(BaseService): @staticmethod def get_sys_time(): current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") return Status.OK, {"data": formatted_time} ``` + 引入插件的操作实体类需在工厂函数create_app中实例化,防止循环引用 ```python from flask_sqlalchemy import SQLAlchemy # 防止循环依赖 db = SQLAlchemy() ``` + model文件的每个实体类下建议封装一些执行原生SQL的方法,便于在service层调用,使SQL与逻辑分离便于维护 + controller层的返回值可以使用如下格式: ```python class UploadController: @staticmethod @upload.route('/upload', methods=['POST']) def upload_png(): status, message, info = UploadService.png_service() return api_response(status=status, message=message, info=info) ``` + 路由注册 在app.py中 ```python app.register_blueprint(user_bp) ``` ## 配置文件 + 配置文件位置:根目录下的application.yaml + 主要配置db,线程池,日志等 + 使用示例 ```yaml redis: HOSTNAME: "YOU HOSTNAME" PORT: "YOUR PORT" PASSWORD: "PASSWORD" ``` ## 接口文档 + 接口文档采用swagger + 如何使用? + 在config里面配置一个全局的swagger对象,然后在app.py中注册 + 在controller层的接口方法上添加装饰器,即可生成接口文档 + 项目启动后,生成的接口文档地址:http://127.0.0.1:4000/docs#/ + 接口请求返回参数统一采用验证模型 ```python @siwa.doc(query=TaskVo) def start_task(): # 获取请求数据 request_data = request.get_json() # 交给数据校验模型 task_data = TaskVo(**request_data) status, message, info = TaskService.start_task(task_data) return api_response(status=status, message=message, info=info) ``` ## 日志记录 ## 单元测试 * test目录存放测试代码 ## 鉴权 目前采用单token的方式,token过期,则会退出登录,用户需要重新登陆 ### 系统设计 * api接口controller层:哪个接口需要鉴权,在哪个接口上面添加鉴权注解 ○ 在调用server层之前会先执行require_token这个装饰器 ○ 参数传角色权限,如admin * token生成:使用jwt库生成令牌 ```python class JwtUtils: # Your key, used to sign and verify JWT, should be kept secret __SECRET_KEY = SECRET_KEY __time = TIME __redis_switch = REDIS_SWITCH @classmethod def create_new_token(cls, username, password): """ Create a new JWT and generate jwt (token) :param username: :param password: :return: """ # Set the validity period of JWT expiry_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=cls.__time) payload = { 'username': username, 'password': password, 'exp': expiry_time } # Generate JWT return jwt.encode(payload, cls.__SECRET_KEY, algorithm='HS256') ``` * 解析token,返回用户名,密码等信息 ### 数据库设计 * 两张表必须要有,以及数据初始化 ```json CREATE TABLE Users ( user_id INT AUTO_INCREMENT PRIMARY KEY, user_name VARCHAR(255) NOT NULL UNIQUE, pass_word VARCHAR(255) NOT NULL, email VARCHAR(255), role_id INT, is_deleted TINYINT(1) DEFAULT 0 ); CREATE TABLE roles ( role_id INT AUTO_INCREMENT PRIMARY KEY, role_name VARCHAR(255) NOT NULL, access_route VARCHAR(1024), action VARCHAR(255), is_deleted TINYINT(1) DEFAULT 0 ); // 初始化数据 INSERT INTO users (user_name, pass_word, role_id, is_deleted) VALUES ('admin', 'admin123', 1, 0); INSERT INTO roles (role_id, role_name, is_deleted) VALUES (1, 'admin', 0); ``` ## 异步处理封装 + 在整个项目中,会存在一些长时间的运行的接口,比如数据导出,全用例执行等,把这些接口放到后台执行,前端不用等待,可以提高用户体验。 + 本框架通过调用线程池的方式,实现异步处理 + 统一去线程池中去获取线程,线程数自己配置,通过线程池的submit方法,提交任务 + 配置位置:根目录下的application文件 ```python thread_pool: max_workers: 3 wait_for_workers: 4 timeout: 60 thread_name_prefix: 'MyThreadPool' ``` + 配置文件 ```ini 通过线程池的submit方法,提交任务 ``` ## orm框架封装 + 基于sqlalchemy + 全局只有唯一一个db实例,防止循环引用,在BaseModel中实例化 ```python from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() ``` + 封装数据库日常curd操作 + 增 + 删 + 改 + 查 + 分页 + 事务 ```python class BaseModel(db.Model): __abstract__ = True @classmethod def insert(cls, data): """ 插入数据库实例 :return: """ db.session.add(data) try: db.session.commit() return True except Exception: db.session.rollback() raise ``` ### 分页 + 基于sqlalchemy + 封装分页方法 ```python class BaseModel(db.Model): __abstract__ = True @classmethod def paginate(cls, page, per_page, **kwargs): """ 分页查询 :param page: :param per_page: :param kwargs: :return: """ query = cls.query.filter_by(**kwargs) return query.paginate(page=page, per_page=per_page, error_out=False) ``` ### 接口级事务 ### 三段式事务 + 目前开启三段式事物,有两种方式,一种是接口级事物那里自动开启了三段式事物 + ```python def wrapper(*args, **kwargs): with db.session.begin(nested=True): # 开启事务,嵌套事务,with自动关闭事务,不需要手动调用db.session.close() try: res = func(*args, **kwargs) db.session.commit() # 提交事务,统一在接口级别提交,不需要在每个service中单独提交,也不需要在model层提交 except exc.SQLAlchemyError as e: logger.error(e) db.session.rollback() return Status.REPEAT_MESSAGE except Exception as e: logger.error(e) db.session.rollback() return Status.GENERIC_ERROR # 返回通用的错误信息 return res ``` ``` + 另一种是在service层手动开启三段式事物,或者在model层手动开启三段式事物 ```python class BaseModel(db.Model): __abstract__ = True @classmethod def insert(cls, data): """ 插入数据库实例 :return: """ db.session.add(data) try: db.session.commit() return True except Exception: db.session.rollback() raise ``` ``` ## 任务调度 + 可以通过任务调度调用项目任意位置的方法 + 任务调度的方法需要在app.py中注册Scheduler().init_apscheduler() + 采用单例模式,全局只有一个Scheduler实例 + 时间格式,支持cron表达式 + 后台管理地址:随后台启动,访问根目录即可查看web页面 + web页面包含任务列表,创建任务 + 接口包含:创建任务,删除任务,查询任务 + ## 中间件处理 + before_request:在每个请求被路由之前调用,通常用来进行身份验证、权限管理和日志记录等操作; + after_request:在每个请求被路由之后调用,通常用于修改响应头信息和格式化数据等操作; + teardown_request:在每次请求处理后调用,无论请求成功或是抛出异常,通常用于资源回收和错误处理等操作; + teardown_appcontext:在每个请求上下文被销毁时调用,通常用来清理应用程序状态或垃圾回收等操作; ```python def middleware(app): @app.before_request def before_request(): logger.info('在每个请求被路由之前调用') @app.before_request def after_request(): logger.info('在每个请求被路由之后调用') @app.teardown_request def teardown_request(exception): logger.info('在每次请求处理后调用') @app.teardown_appcontext def shutdown_session(exception=None): logger.info('在每个请求上下文被销毁时调用') ``` ## 缓存 + 背景:在项目中,有些数据是不需要每次都去数据库中查询的,比如:环境列表,项目列表,接口列表等,这些数据是不会经常变动的,所以可以将这些数据缓存起来, 减少数据库的压力 + 目前框架采用flask_caching缓存 + 应用程序级别缓存:适用于应用程序启动后生命周期内所有请求共享的缓存,创建一个 Cache 实例即可用于整个 Flask 应用程序 + 视图函数级别缓存仅应用于同一视图函数内的多个请求,不同视图函数之间的缓存是独立的。可以通过在视图函数中使用 cache.cached() 装饰器来设置视图函数级别的缓存。 缓存装饰器提供了一组有用的参数,例如 timeout(缓存过期时间)、key_prefix(缓存键前缀)和 unless(条件函数,使装饰器返回未缓存的函数调用结果)等,可根据需求进行配置。 + 使用示例: 直接缓存接口数据,建议不要在db操作中使用缓存,因为db返回的数据需要处理序列化 ```python @staticmethod @env_config.route('/list', methods=['GET']) @cache.cached(key_prefix='env_list', timeout=20) def env_list(): """ 环境列表 分页查询 :return: """ status, info = EnvConfigService.env_list_service() return api_response(status=status, info=info) ``` ## 公共处理 ### 统一返回数据 + 调用api_response统一返回数据 往info中传入需要返回的数据 ```python def env_list_service(): """ 环境列表 分页查询 :return: """ try: page = request.args.get('page') size = request.args.get('size') logger.info(f'page:{type(page)},size:{type(size)}') if size: size = int(size) else: size = 10 data = Env.paging_query(page, size) result = [] for env in data: item = { 'id': env.id, 'env_name': env.env_name, 'url': env.url, 'create_time': env.format_create_time() } result.append(item) return Status.OK, {"data": result, "total": data.total} except Exception as e: logger.error(e) return Status.ERROR_SELECT, '查询失败', '' ``` #### 状态码枚举 + 定义通用的状态码枚举 + 使用示范 ```python class Status(Enum): OK = {'code': 2000, 'message': 'OK'} OK_MESSAGE = {'code': 2001} BAD_REQUEST = {'code': 4000, 'message': 'Bad Request'} NOT_FOUND = {'code': 4004, 'message': 'Not Found'} BAD_MESSAGE = {'code': 4001} REPEAT_MESSAGE = {'code': 1000, 'message': 'Repeat Message'} ERROR_SELECT = {'code': 1000, 'message': 'Internal Server Error'} GENERIC_ERROR = {'code': 1001, 'message': 'Service is busy'} ERROR_INSERT = {'code': 1002, 'message': 'Insert Error'} ``` #### 返回体 + 定义通用的返回体 ## 统一异常处理 + 采用装饰器的方式,统一处理异常,flask框架中的异常,都会被捕获,采用flask装饰器errorhandler实现 + errorhandler() 装饰器用于在 Flask 中捕获未捕获的异常。当 Flask 应用程序遇到未处理的异常时,会自动调用被 errorhandler() 装饰的视图函数,以此为异常提供一个自定义响应。 + 需要在flask实例中进行注册 ```python business_exception(app) application_exception(app) ``` #### 系统级异常 ```python def application_exception(app): """ 系统异常, 用于处理应用程序中的异常,使用flask的errorhandler装饰器,可以捕获指定的异常 :param app: :return: """ @app.errorhandler(500) def handle_500_error(error): api_response( status=Status.BAD_MESSAGE, message='服务器内部错误', info=error ) ``` #### 业务级异常 ```python def business_exception(app): """ 业务异常, 用于处理业务逻辑中的异常,使用flask的errorhandler装饰器,可以捕获指定的异常 当出现 ValidationError 异常时,会自动调用 handle_base_error 函数,并将异常实例作为参数传递给该函数, 因此我们可以直接使用异常实例 error 调用其内部的 to_response 方法,返回封装好的 API 响应对象给客户端。 :param app: :return: """ @app.errorhandler(ValidationError) def handle_base_error(error): return error.to_response() ``` ### excel导出 + 使用flask_excel插件进行excel导出 + 使用示例: + sheet_name="test_case" 为sheet页名称 + file_name="test_case" 为文件名称 + 需要在flask实例中进行初始化 ```python @staticmethod @BaseService.transactional def export_test_case_service(): """ 导出测试用例 :return: """ content = [['id', 'case_name', 'path', 'model', 'data', 'create_time', 'is_smoke', 'assertion']] case = TestCase.query.order_by(TestCase.created_time.desc()) if case: for item in case: content.append( [item.id, item.case_name, item.path, item.model_name, item.data, item.created_time, item.is_smoke, item.assertion]) res = excel.make_response_from_array(content, "xlsx", file_name="test_case", sheet_name="test_case") logger.info(f'res:{res}') with open('/Users/jeff/pyproject/api-test/data/case.xlsx', 'wb') as f: f.write(res.data) return Status.OK ``` ### 数据校验 + 使用pydantic进行数据校验 + 先定义一个数据校验模型 ```python class TaskVo(BaseModel): """ 定时任务数据校验模型 """ cron_time: str method_name: str module_name: str @validator('cron_time') def cron_time_length(cls, v): if len(v) < 1 or len(v) > 20: raise ValidationError('cron_time长度必须在1到20之间') return v ``` + 然后再调用这个模型,进行数据校验 ```python @staticmethod @tasks.route('/start', methods=['POST']) def start_task(): # 获取请求数据 request_data = request.get_json() # 交给数据校验模型 task_data = TaskVo(**request_data) status, message, info = TaskService.start_task(task_data) return api_response(status=status, message=message, info=info) ``` ## 线程池 + 采用单例模式,整个程序中,只允许有一个对象 + 线程池作用:统一管理整个项目的线程数,可防可控,当超过线程数后 a. 异步处理都统一采用线程池进行操作 b. 限制并发线程数量,防止过多的线程创建导致内存溢出和 CPU 开销等问题 c. 节省资源:线程池避免了大量创建和销毁线程的资源浪费。 d. 提高程序可靠性:线程池能够限制线程数量和资源使用量,避免出现资源耗尽等问题,从而提高程序的稳定性和可靠性 e. 异步请求超时处理,比如timeout超过60 ```python class ThreadPool(object, metaclass=Singleton): def __init__(self, max_workers=2, wait_for_workers=None, thread_name_prefix='ThreadPoolExecutor', timeout=None): self._executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix=thread_name_prefix) self._wait_for_workers = wait_for_workers self._timeout = timeout def submit(self, task, *args, **kwargs): task = partial(task, *args, **kwargs) if self._wait_for_workers is not None: future = self._executor.submit(task) if self._wait_for_workers > 0 and self._executor._work_queue.qsize() >= self._wait_for_workers: self._executor._work_queue.get(timeout=self._timeout) return future else: return self._executor.submit(task) def shutdown(self, wait=True): self._executor.shutdown(wait=wait) def __del__(self): self._executor.shutdown(wait=False) ``` + 在使用 submit() 方法将任务提交到线程池中时,它们将在可用的线程池线程上异步执行。因为线程可以在任何时候处理任务,所以结果的顺序可能与它们被提交到线程池的顺序不同。 ```python future1 = thread_pool.submit(worker, 'Task A', 3) future2 = thread_pool.submit(worker, 'Task B', 4) ```