docker1.12-containerd源码分析
从原OpenStack转型至Docker已有一段时间。为了更稳定地使用Docker、了解Docker的各个流程,本文从源代码层面分析containerd。本文基于Docker 1.12版本;从1.11开始,Docker已将容器运行时相关功能从docker daemon中拆分为containerd、containerd-shim和runc等独立组件。
containerd源码流程图
源码接口调用详情
A)第一步从ctr入口至API接口
checkpoint(用于快照,docker目前该功能不完善)
|
1
2
3
|
list -->
/types
.API
/ListCheckpoint
create -->
/types
.API
/CreateCheckpoint
delete -->
/types
.API
/DeleteCheckpoint
|
containers
|
1
2
3
4
5
6
7
8
9
|
list、state -->
/types
.API
/State
pause、resume、update -->
/types
.API
/UpdateContainer
create -->
/types
.API
/CreateContainer
stats -->
/types
.API
/Stats
watch
-->
/types
.API
/State
、
/types
.API
/Events
exec
-->
/types
.API
/Events
、
/types
.API
/AddProcess
、
/types
.API
/UpdateProcess
kill
-->
/types
.API
/Signal
start -->
/types
.API
/Events
、
/types
.API
/CreateContainer
、
/types
.API
/UpdateProcess
update -->
/types
.API
/UpdateContainer
|
events
|
1
|
/types
.API
/Events
|
state
|
1
|
/types
.API
/State
|
version
|
1
|
/types
.API
/GetServerVersion
--
return
result
|
B)第二步从API接口至supervisor任务处理
|
1
|
注:API -- server.go --> daemon -- supervisor.go(handleTask func)
|
checkpoint
|
1
2
3
|
/types
.API
/ListCheckpoint
(supervisor.GetContainersTask)--> getContainers
/types
.API
/CreateCheckpoint
--> createCheckpoint
/types
.API
/DeleteCheckpoint
--> deleteCheckpoint
|
containers
|
1
2
3
4
5
6
7
|
/types
.API
/State
/types
.API
/Stats
(supervisor.GetContainersTask)--> getContainers
/types
.API
/UpdateContainer
(supervisor.UpdateTask)-->updateContainer
/types
.API
/CreateContainer
(supervisor.StartTask)-->start
/types
.API
/Events
--> Events --
return
result
/types
.API
/AddProcess
-->addProcess
/types
.API
/UpdateProcess
-->updateProcess
/types
.API
/Signal
-->signal
|
C)第三步从任务队列至runtime至runc
checkpoint
|
1
2
3
|
getContainers --
return
result
createCheckpoint -->(runtime)CheckPoint -->
exec
.Command(c.runtime,arg....)
deleteCheckpoint -->(runtime)DeleteCheckpoint --
return
result
|
containers
|
1
2
3
4
5
6
|
getContainers --
return
result
updateContainer -->(runtime)Resume Pause UpdateResources-->
exec
.Command(c.runtime,arg....)
start -->(runtime supervisor
/worker
.go) Start -->
exec
.Command(c.shim,c.
id
,c.bundle,c.runtime)
addProcess -->(runtime)
exec
-->
exec
.Command(c.shim,c.
id
,c.bundle,c.runtime)
updateProcess -->
return
result
signal -->
return
result
|
createContainer示例
daemon启动监听tasks及startTasks进程
a)进入main.go main方法调用daemon方法
|
1
2
3
4
5
6
7
|
app.Action = func(context *cli.Context) {
if
err := daemon(context); err != nil {
logrus.Fatal(err)
}
}
|
b)进入main.go daemon方法
|
1
2
3
4
5
6
7
8
|
for
i := 0; i < 10; i++ {
wg.Add(1)
w := supervisor.NewWorker(sv, wg)
go w.Start()
}
if
err := sv.Start(); err != nil {
return
err
}
|
c)通过supervisor/worker.go的NewWorker初始化worker,并启动其对startTasks的监听与处理
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func NewWorker(s *Supervisor, wg *
sync
.WaitGroup) Worker {
return
&worker{
s: s,
wg: wg,
}
}
func (w *worker) Start() {
defer w.wg.Done()
for
t := range w.s.startTasks {
started :=
time
.Now()
process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if
err != nil {
logrus.WithFields(logrus.Fields{
"error"
: err,
"id"
: t.Container.ID(),
}).Error(
"containerd: start container"
)
t.Err <- err
evt := &DeleteTask{
ID: t.Container.ID(),
NoEvent:
true
,
Process: process,
}
w.s.SendTask(evt)
continue
}
|
d)调用supervisor/supervisor.go的Start方法,监听tasks并交由handleTask处理
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func (s *Supervisor) Start() error {
logrus.WithFields(logrus.Fields{
"stateDir"
: s.stateDir,
"runtime"
: s.runtime,
"runtimeArgs"
: s.runtimeArgs,
"memory"
: s.machine.Memory,
"cpus"
: s.machine.Cpus,
}).Debug(
"containerd: supervisor running"
)
go func() {
for
i := range s.tasks {
s.handleTask(i)
}
|
containers容器创建示例
ctr控制台命令入口
ctr/main.go containersCommand
|
1
2
3
4
5
6
7
8
9
10
11
|
execCommand,
killCommand,
listCommand,
pauseCommand,
resumeCommand,
startCommand,
stateCommand,
statsCommand,
watchCommand,
updateCommand,
|
ctr/container.go
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
var startCommand = cli.Command{
Name:
"start"
,
Usage:
"start a container"
,
ArgsUsage: "ID BundlePath", // ……(此处省略部分代码)
events, err := c.Events(netcontext.Background(), &types.EventsRequest{})/*事件创建*/
if
err != nil {
fatal(err.Error(), 1)
}
if
_, err := c.CreateContainer(netcontext.Background(), r); err != nil {/*容器创建*/
fatal(err.Error(), 1)
}
if
context.Bool(
"attach"
) {
go func() {
io.Copy(stdin, os.Stdin)
if
_, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{/*更新进程*/
Id:
id
,
Pid:
"init"
,
CloseStdin:
true
,
}); err != nil {
fatal(err.Error(), 1)
}
restoreAndCloseStdin()
}()
if
tty
{
resize(
id
,
"init"
, c)
go func() {
s :=
make
(chan os.Signal, 64)
signal.Notify(s, syscall.SIGWINCH)
for
range s {
if
err := resize(
id
,
"init"
, c); err != nil {
log.Println(err)
}
}
}()
}
waitForExit(c, events,
id
,
"init"
, restoreAndCloseStdin)
}
},
|
api处理
api/grpc/types/api.pb.go
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
func (c *aPIClient) Events(ctx context.Context,
in
*EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) {
stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams[0], c.cc,
"/types.API/Events"
, opts...)
if
err != nil {
return
nil, err
}
x := &aPIEventsClient{stream}
if
err := x.ClientStream.SendMsg(
in
); err != nil {
return
nil, err
}
if
err := x.ClientStream.CloseSend(); err != nil {
return
nil, err
}
return
x, nil
}
func (c *aPIClient) CreateContainer(ctx context.Context,
in
*CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
out := new(CreateContainerResponse)
err := grpc.Invoke(ctx,
"/types.API/CreateContainer"
,
in
, out, c.cc, opts...)
if
err != nil {
return
nil, err
}
return
out, nil
}
func (c *aPIClient) UpdateProcess(ctx context.Context,
in
*UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) {
out := new(UpdateProcessResponse)
err := grpc.Invoke(ctx,
"/types.API/UpdateProcess"
,
in
, out, c.cc, opts...)
if
err != nil {
return
nil, err
}
return
out, nil
}
|
api/grpc/types/api.pb.go
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(EventsRequest)
if
err := stream.RecvMsg(m); err != nil {
return
err
}
return
srv.(APIServer).Events(m, &aPIEventsServer{stream})
}
func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in
:= new(CreateContainerRequest)
if
err := dec(
in
); err != nil {
return
nil, err
}
if
interceptor == nil {
return
srv.(APIServer).CreateContainer(ctx,
in
)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod:
"/types.API/CreateContainer"
,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return
srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
}
return
interceptor(ctx,
in
, info, handler)
}
func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in
:= new(UpdateProcessRequest)
if
err := dec(
in
); err != nil {
return
nil, err
}
if
interceptor == nil {
return
srv.(APIServer).UpdateProcess(ctx,
in
)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod:
"/types.API/UpdateProcess"
,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return
srv.(APIServer).UpdateProcess(ctx, req.(*UpdateProcessRequest))
}
return
interceptor(ctx,
in
, info, handler)
|
api/grpc/server/server.go 通过SendTask进入前文daemon启动时监听的tasks及startTasks处理队列
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
events := s.sv.Events(t, r.StoredOnly, r.Id)
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
s.sv.SendTask(e)
apiC, err := createAPIContainer(r.Container,
false
)
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
e := &supervisor.UpdateProcessTask{}
e.ID = r.Id
e.PID = r.Pid
e.Height = int(r.Height)
e.Width = int(r.Width)
e.CloseStdin = r.CloseStdin
s.sv.SendTask(e)
if
err := <-e.ErrorCh(); err != nil {
return
nil, err
}
return
&types.UpdateProcessResponse{}, nil
}
|
supervisor/create.go
|
1
2
|
func (s *Supervisor) start(t *StartTask) error {
s.startTasks <- task
|
supervisor/worker.go
|
1
2
3
|
func (w *worker) Start() {
defer w.wg.Done()
for
t := range w.s.startTasks {
|
runtime/container.go
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
processRoot := filepath.Join(c.root, c.
id
, InitProcessID)
if
err := os.Mkdir(processRoot, 0755); err != nil {
return
nil, err
}
cmd :=
exec
.Command(c.shim,
c.
id
, c.bundle, c.runtime,
) // 执行docker-containerd-shim命令
cmd.Dir = processRoot
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid:
true
,
}
spec, err := c.readSpec()
if
err != nil {
return
nil, err
}
config := &processConfig{
checkpoint: checkpointPath,
root: processRoot,
id
: InitProcessID,
c: c,
stdio: s,
spec: spec,
processSpec: specs.ProcessSpec(spec.Process),
}
p, err := newProcess(config)
if
err != nil {
return
nil, err
}
if
err := c.createCmd(InitProcessID, cmd, p); err != nil {
return
nil, err
}
return
p, nil
}
|
containerd-shim接收后处理
containerd-shim/main.go
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func start(log *os.File) error {
p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
if
err != nil {
return
err
}
defer func() {
if
err := p.Close(); err != nil {
writeMessage(log,
"warn"
, err)
}
}()
if
err := p.create(); err != nil {
p.delete()
return
err
}
|
containerd-shim/process.go跳转执行runc命令
|
1
2
3
|
func (p *process) create() error {
cmd :=
exec
.Command(p.runtime, args...)
|
