diff --git a/advance/ctx/context.go b/advance/ctx/context.go index 97fb03830a9f2be72590857da5d38da9ca744cae..dd2ea3f3a6d58a474c5d772908649fa218e9b105 100644 --- a/advance/ctx/context.go +++ b/advance/ctx/context.go @@ -89,6 +89,7 @@ func TestBusinessTimeout(t *testing.T) { timeoutCtx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() end := make(chan struct{}, 1) + // 这个goroutine只要执行了,就算超时也是退出不了的,除非在MyBusiness()里面加个select来判断是否超时然后退出才可以! go func() { MyBusiness() end <- struct{}{} diff --git a/advance/ctx/graceful_shutdown/main.go b/advance/ctx/graceful_shutdown/main.go index 1fdb6b099b04a705f7c6066952e63ffba6bf7d18..e539af36278489e7230f96e55b048008814f87cc 100644 --- a/advance/ctx/graceful_shutdown/main.go +++ b/advance/ctx/graceful_shutdown/main.go @@ -10,11 +10,11 @@ import ( // 注意要从命令行启动,否则不同的 IDE 可能会吞掉关闭信号 func main() { - s1 := service.NewServer("business", "localhost:8080") + s1 := service.NewServer("business", "localhost:1231") s1.Handle("/", http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { _, _ = writer.Write([]byte("hello")) })) - s2 := service.NewServer("admin", "localhost:8081") + s2 := service.NewServer("admin", "localhost:4312") app := service.NewApp([]*service.Server{s1, s2}, service.WithShutdownCallbacks(StoreCacheToDBCallback)) app.StartAndServe() } diff --git a/advance/ctx/graceful_shutdown/service/shutdown.go b/advance/ctx/graceful_shutdown/service/shutdown.go index 673cc9dbf066a03050c6e2283e9e22bbb7a1a6ec..779da75e44095892cb082b84d6c04ec20a18d59a 100644 --- a/advance/ctx/graceful_shutdown/service/shutdown.go +++ b/advance/ctx/graceful_shutdown/service/shutdown.go @@ -4,6 +4,10 @@ import ( "context" "log" "net/http" + "os" + "os/signal" + "sync" + "syscall" "time" ) @@ -17,7 +21,10 @@ type ShutdownCallback func(ctx context.Context) // 你需要实现这个方法 func WithShutdownCallbacks(cbs ...ShutdownCallback) Option { - panic("implement me") + //panic("implement me") + return func(app *App) { + app.cbs = cbs + } } // 这里我已经预先定义好了各种可配置字段 @@ -37,11 +44,21 @@ type App struct { // NewApp 创建 App 实例,注意设置默认值,同时使用这些选项 func NewApp(servers []*Server, opts ...Option) *App { - panic("implement me") + res := &App{ + waitTime: 5 * time.Second, + cbTimeout: 2 * time.Second, + shutdownTimeout: 10 * time.Second, + servers: servers, + } + for _, opt := range opts { + opt(res) + } + return res } // StartAndServe 你主要要实现这个方法 func (app *App) StartAndServe() { + // 这里可以加上启动失败的情况,有一个失败,其他都会失败。 for _, s := range app.servers { srv := s go func() { @@ -57,22 +74,69 @@ func (app *App) StartAndServe() { // 从这里开始优雅退出监听系统信号,强制退出以及超时强制退出。 // 优雅退出的具体步骤在 shutdown 里面实现 // 所以你需要在这里恰当的位置,调用 shutdown + c := make(chan os.Signal, 1) + signals := []os.Signal{ + syscall.SIGKILL, syscall.SIGSTOP, + syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, + syscall.SIGABRT, syscall.SIGSYS, syscall.SIGTERM, + } + signal.Notify(c, signals...) + select { + // 监听信号 + case <-c: + app.shutdown() + go func() { + select { + case <-c: + os.Exit(1) // 再次监听信号 + case <-time.After(app.shutdownTimeout): + log.Println() + os.Exit(1) // 再次监听信号 + } + }() + } } // shutdown 你要设计这里面的执行步骤。 func (app *App) shutdown() { log.Println("开始关闭应用,停止接收新请求") // 你需要在这里让所有的 server 拒绝新请求 - + for _, serv := range app.servers { + serv.rejectReq() + } log.Println("等待正在执行请求完结") // 在这里等待一段时间 + // 这里可以改造为实时统计正在处理的服务的数量,为0则开启下一步。 + time.Sleep(app.waitTime) log.Println("开始关闭服务器") // 并发关闭服务器,同时要注意协调所有的 server 都关闭之后才能步入下一个阶段 - + var wg sync.WaitGroup + wg.Add(len(app.servers)) + for _, srv := range app.servers { + srvCp := srv + go func() { + if err := srvCp.stop(context.Background()); err != nil { + log.Printf("关闭服务失败%s \n", srvCp.name) + } + }() + wg.Done() + } + wg.Wait() log.Println("开始执行自定义回调") + // 这里的回调位置可以随意设置,可以放到关闭服务中间,也可以放在关闭服务之前 + // 如果还要考虑到回调之间的顺序,比如A先回调再B回调,中间件一般不考虑这个情况,让用户自己考虑去吧。 // 并发执行回调,要注意协调所有的回调都执行完才会步入下一个阶段 - + wg.Add(len(app.servers)) + for _, cb := range app.cbs { + c := cb + go func() { + ctx, cancel := context.WithTimeout(context.Background(), app.cbTimeout) + c(ctx) + cancel() + wg.Done() + }() + } // 释放资源 log.Println("开始释放资源") app.close() @@ -95,7 +159,8 @@ type Server struct { // serverMux 既可以看做是装饰器模式,也可以看做委托模式 type serverMux struct { - reject bool + reject bool // 可以不用原子类型 atomic.Value/atomic.Bool 因为它是单向的,没有其他地方会将其改为false + // 使用顺序:atomic(会复杂一些,比如+1 -1等) > 读写锁(使用较为简单) > 写锁 *http.ServeMux } @@ -106,6 +171,7 @@ func (s *serverMux) ServeHTTP(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte("服务已关闭")) return } + // 用上面装饰下面这句 s.ServeMux.ServeHTTP(w, r) } @@ -133,6 +199,7 @@ func (s *Server) rejectReq() { s.mux.reject = true } +// 这里的stop为什么要再封装一层? 因为要在应用层做处理 而不是仅仅在框架层面让其去断掉连接。 func (s *Server) stop(ctx context.Context) error { log.Printf("服务器%s关闭中", s.name) return s.srv.Shutdown(ctx) diff --git a/advance/ctx/graceful_shutdown/service/shutdown_answer.go b/advance/ctx/graceful_shutdown/service/shutdown_answer.go index d027750e307ce891734acc5e3d42aa5daedf081d..b0f23430bf3d5f92b36504ba51641f94c09bf0dc 100644 --- a/advance/ctx/graceful_shutdown/service/shutdown_answer.go +++ b/advance/ctx/graceful_shutdown/service/shutdown_answer.go @@ -1,5 +1,7 @@ //go:build answer +// go build -tags=answer +// 作业讲解在8-24 第五讲 网络编程与SQL编程中 package service import ( @@ -113,9 +115,7 @@ func (app *App) shutdown() { wg.Done() }() } - wg.Wait() - log.Println("开始执行自定义回调") // 执行回调 wg.Add(len(app.cbs))