k8s graceful shutdown

How k8s keeps services robust: implementing graceful shutdown

Why it matters

  1. Without a graceful shutdown, in-flight requests and connections are cut off and callers see connection errors;
  2. Monitoring and metrics show a burst of errors in a short window on every deploy;

Implementation

Two kinds of signals

SIGTERM: a signal that asks the process to shut down gracefully; it can be caught and handled;

SIGKILL: hard termination; it cannot be caught or ignored;

Pod termination flow in k8s

https://kubernetes.io/docs/concepts/workloads/pods/pod/#termination-of-pods

In short (a minimal pod-side sketch follows the list):

  1. The pod is first marked Terminating, at which point the Service removes it from its endpoints;
  2. SIGTERM is sent to all containers in the pod;
  3. The pod waits until the grace period ends, or less if it finishes handling SIGTERM earlier;
  4. If the grace period expires, SIGKILL is sent to all containers;
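
The pod-side view of steps 2-4, as a minimal Go sketch (the 25-second budget and the drain helper are illustrative assumptions, not anything k8s provides): catch SIGTERM, stop taking new work, and finish before SIGKILL arrives.

package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// drain is a hypothetical placeholder for real cleanup: close listeners,
// finish in-flight requests, flush buffers, deregister from discovery, etc.
func drain(ctx context.Context) {
    select {
    case <-ctx.Done():
    case <-time.After(time.Second):
    }
}

func main() {
    // Step 2 of the flow: the kubelet sends SIGTERM to PID 1 of each container.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGTERM)
    <-sigCh

    // Step 3: we now have terminationGracePeriodSeconds (30s by default) to
    // finish up; leave some headroom, because after that comes step 4:
    // SIGKILL, which cannot be caught.
    ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
    defer cancel()
    drain(ctx)
}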

Make sure signals actually reach the process

The CMD pitfall (shell form)

CMD myapp

is equivalent to:

/bin/sh -c myapp

With this form, the process that receives signals inside the container is /bin/sh, not myapp. Whether the signal ever reaches myapp then depends on the shell actually in use: some shells do not forward signals to their child processes. For example, the default shell in an Alpine Linux base image does not, while bash does.

Exec form

CMD ["myapp"]

This form runs myapp directly, so myapp becomes PID 1 and receives SIGTERM itself. The downside is that no shell is involved, so environment variables are not expanded into the arguments.

Exec form via bash

CMD ["/bin/bash", "-c", "myapp --arg=$ENV_VAR"]

This solves both problems: bash expands $ENV_VAR, and bash forwards signals to its child. If you want myapp itself to be PID 1, prefix the command with exec, e.g. CMD ["/bin/bash", "-c", "exec myapp --arg=$ENV_VAR"].

Mechanisms k8s provides

1. Set it in the pod spec YAML

terminationGracePeriodSeconds: 60

2. The delete command

kubectl delete pod <pod-name> --grace-period=60

3. preStop hook

lifecycle:
  preStop:
    exec:
      # SIGTERM triggers a quick exit; gracefully terminate instead
      command: ["/usr/sbin/nginx","-s","quit"]

4. Validating webhook

The idea is a validating webhook that intercepts the pod's deletion and only admits it once resource cleanup has finished, returning false until then. That way the pod is guaranteed to exit only after cleanup, instead of being force-killed when the grace period expires.
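
The post doesn't include code for this option; below is a rough sketch of what such a webhook handler could look like in Go, using the standard admission/v1 types. The cleanupFinished check, the /validate path, and the certificate paths are hypothetical placeholders.

package main

import (
    "encoding/json"
    "net/http"

    admissionv1 "k8s.io/api/admission/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// cleanupFinished is a hypothetical check: has this pod released all of its
// external resources (queues drained, locks released, ...)?
func cleanupFinished(podName, namespace string) bool { return false }

func handleValidate(w http.ResponseWriter, r *http.Request) {
    var review admissionv1.AdmissionReview
    if err := json.NewDecoder(r.Body).Decode(&review); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    if review.Request == nil {
        http.Error(w, "empty admission request", http.StatusBadRequest)
        return
    }
    req := review.Request

    // Admit the DELETE only once cleanup is done; otherwise deny it so the
    // pod is not removed yet and the caller has to retry.
    allowed := cleanupFinished(req.Name, req.Namespace)
    review.Response = &admissionv1.AdmissionResponse{
        UID:     req.UID,
        Allowed: allowed,
    }
    if !allowed {
        review.Response.Result = &metav1.Status{
            Message: "pod is still cleaning up, retry the delete later",
        }
    }
    json.NewEncoder(w).Encode(&review)
}

func main() {
    http.HandleFunc("/validate", handleValidate)
    // Admission webhooks must be served over TLS; cert paths are illustrative.
    http.ListenAndServeTLS(":8443", "/etc/webhook/tls.crt", "/etc/webhook/tls.key", nil)
}

For this to take effect, a ValidatingWebhookConfiguration has to route DELETE operations on pods to the endpoint, and failurePolicy needs care: a broken webhook of this kind can block every pod deletion in its scope.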

Application-level support (the most important part)

Approach

  1. First close all listeners so no new connections are accepted; if the service registers itself in a service registry, deregister it as well;
  2. Then close all idle connections;
  3. Then wait indefinitely for active connections to become idle, and close them;
  4. If a Context with a timeout is supplied, return the Context's error if it expires before shutdown completes;

http

Use Go's http server.Shutdown, which covers steps 1-4 above (deregistering from service discovery is still up to you).

If your service is called by other services that hold keep-alive connections to it, shutdown takes a bit more coordination (a sketch follows the list):

  1. Agree with callers on a keep-alive timeout, 30 seconds by default
  2. When shutting down, first disable keep-alive on the server
  3. Then wait those 30 seconds so callers stop reusing pooled connections
  4. Then call server.Shutdown
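
A minimal sketch of those steps, assuming a plain net/http server and the 30-second keep-alive agreement from the list (both durations are whatever you have agreed with callers, not fixed values):

package graceful

import (
    "context"
    "net/http"
    "time"
)

// shutdownHTTP drains a running *http.Server following the steps above.
func shutdownHTTP(srv *http.Server) error {
    // Step 2: stop advertising keep-alive; responses now carry "Connection: close",
    // so callers stop reusing pooled connections to this instance.
    srv.SetKeepAlivesEnabled(false)

    // Step 3: wait out the agreed keep-alive window.
    time.Sleep(30 * time.Second)

    // Step 4: close listeners and wait for in-flight requests, bounded by a timeout.
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    return srv.Shutdown(ctx)
}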

grpc

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "net"
    "net/http"
    _ "net/http/pprof"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/blademainer/commons/pkg/logger"

    "golang.org/x/sync/errgroup"
    "google.golang.org/grpc"
    pb "google.golang.org/grpc/examples/helloworld/helloworld"
    "google.golang.org/grpc/health"
    healthpb "google.golang.org/grpc/health/grpc_health_v1"
    "google.golang.org/grpc/keepalive"
)

const serviceName = "myserviced"

var (
    version = "no version"

    debugPort  = flag.Int("debugPort", 16161, "debug port")
    httpPort   = flag.Int("httpPort", 8888, "http port")
    grpcPort   = flag.Int("grpcPort", 9200, "grpc port")
    healthPort = flag.Int("healthPort", 6666, "grpc health port")
)

type server struct{}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received: %v", in.GetName())
    return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func main() {
    flag.Parse()

    logger.Infof("Starting app, version: %v", version)

    // shutdown functions, run in order once a termination signal arrives;
    // guarded by a mutex because the errgroup goroutines below append to it concurrently
    var shutdownMutex sync.Mutex
    shutdownFunctions := make([]func(context.Context), 0)

    ctx, cancel := context.WithCancel(context.Background())
    shutdownFunctions = append(shutdownFunctions, func(ctx context.Context) {
        cancel()
    })
    defer cancel()

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
    defer signal.Stop(interrupt)

    g, ctx := errgroup.WithContext(ctx)

    // pprof / debug server
    g.Go(func() error {
        httpServer := &http.Server{
            Addr:         fmt.Sprintf(":%d", *debugPort),
            ReadTimeout:  10 * time.Second,
            WriteTimeout: 10 * time.Second,
            Handler:      nil,
        }
        shutdownMutex.Lock()
        shutdownFunctions = append(shutdownFunctions, func(ctx context.Context) {
            err := httpServer.Shutdown(ctx)
            if err != nil {
                logger.Errorf("failed to shutdown pprof server! error: %v", err.Error())
            }
        })
        shutdownMutex.Unlock()

        logger.Infof("pprof server serving at :%d", *debugPort)

        if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            logger.Errorf("failed to listen: %v", err.Error())
            return err
        }
        return nil
    })

    // web server metrics
    g.Go(func() error {
        httpServer := &http.Server{
            Addr:         fmt.Sprintf(":%d", *httpPort),
            ReadTimeout:  10 * time.Second,
            WriteTimeout: 10 * time.Second,
        }
        shutdownMutex.Lock()
        shutdownFunctions = append(shutdownFunctions, func(ctx context.Context) {
            err := httpServer.Shutdown(ctx)
            if err != nil {
                logger.Errorf("failed to shutdown metrics server! error: %v", err.Error())
            }
        })
        shutdownMutex.Unlock()
        logger.Infof("HTTP Metrics server serving at :%d", *httpPort)

        if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
            return err
        }

        return nil
    })

    // gRPC Health Server
    healthServer := health.NewServer()
    g.Go(func() error {
        grpcHealthServer := grpc.NewServer()

        shutdownMutex.Lock()
        shutdownFunctions = append(shutdownFunctions, func(ctx context.Context) {
            healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_NOT_SERVING)
            grpcHealthServer.GracefulStop()
        })
        shutdownMutex.Unlock()

        healthpb.RegisterHealthServer(grpcHealthServer, healthServer)

        haddr := fmt.Sprintf(":%d", *healthPort)
        hln, err := net.Listen("tcp", haddr)
        if err != nil {
            logger.Errorf("gRPC Health server: failed to listen, error: %v", err)
            os.Exit(2)
        }
        logger.Infof("gRPC health server serving at %s", haddr)
        return grpcHealthServer.Serve(hln)
    })

    // gRPC server
    g.Go(func() error {
        addr := fmt.Sprintf(":%d", *grpcPort)
        ln, err := net.Listen("tcp", addr)
        if err != nil {
            logger.Errorf("gRPC server: failed to listen, error: %v", err)
            os.Exit(2)
        }

        server := &server{}
        grpcServer := grpc.NewServer(
            // MaxConnectionAge avoids long-lived connections, to facilitate load balancing;
            // MaxConnectionAgeGrace forcibly closes them after the grace period, default infinity
            grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: 2 * time.Minute}),
        )
        pb.RegisterGreeterServer(grpcServer, server)
        shutdownMutex.Lock()
        shutdownFunctions = append(shutdownFunctions, func(ctx context.Context) {
            healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_NOT_SERVING)
            grpcServer.GracefulStop()
        })
        shutdownMutex.Unlock()

        logger.Infof("gRPC server serving at %s", addr)

        healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_SERVING)

        return grpcServer.Serve(ln)
    })

    select {
    case <-interrupt:
        break
    case <-ctx.Done():
        break
    }

    logger.Warnf("received shutdown signal")

    // Create a fresh Context with its own timeout and let every server release its resources
    timeout, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancelFunc()
    shutdownMutex.Lock()
    for _, shutdown := range shutdownFunctions {
        shutdown(timeout)
    }
    shutdownMutex.Unlock()

    err := g.Wait()
    if err != nil {
        logger.Errorf("server returning an error, error: %v", err)
        os.Exit(2)
    }
}
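
Two things worth noting in this example: every goroutine registers its shutdown function before it starts serving, and main only runs those functions after a signal (or a failing goroutine) breaks out of the select; it then still waits on g.Wait(), so the process exits only after every server has actually drained.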

tcp

// Resolve and listen; the error from ResolveTCPAddr should not be ignored
cmdAddr, err := net.ResolveTCPAddr("tcp", n.cfg.Addr)
if err != nil {
    log.Fatalln(err)
}
lcmd, err := net.ListenTCP("tcp", cmdAddr)
if err != nil {
    log.Fatalln(err)
}
defer lcmd.Close()

quitChan := make(chan os.Signal, 1)
// SIGKILL cannot be caught, so there is no point notifying on os.Kill;
// SIGINT and SIGTERM are what we react to
signal.Notify(quitChan, os.Interrupt, syscall.SIGTERM)

wg := sync.WaitGroup{}
for {
    select {
    case <-quitChan:
        // Stop accepting, then wait for in-flight requests to finish
        lcmd.Close()
        wg.Wait()
        return
    default:
    }
    // Short accept deadline so the loop regularly re-checks the quit channel
    lcmd.SetDeadline(time.Now().Add(time.Second))
    conn, err := lcmd.AcceptTCP()
    if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
        continue
    }
    if err != nil {
        log.WithError(err).Errorln("Listener accept")
        continue
    }
    wg.Add(1)
    go func() {
        defer wg.Done()
        n.handleRequest(conn)
    }()
}
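
The one-second accept deadline is what keeps this loop responsive: without it, AcceptTCP would block indefinitely and the quit signal would only be noticed after the next incoming connection.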