koukiblog

たぶんweb系の話題

kubectl execの仕組みを追える範囲で追ってみた

kubernetesを利用しているとき、 kubectl exec -it "pod_name" /bin/bash で目的のコンテナにbashログインすることができますが、これがなぜ動いているのか理解できていなったので調べてみました。

kubectl execとsshと何が違うのか?(sshと同じセキュリティ対策が必要ではないのか??)と聞かれたときに違うとは思うんだけどどう違うのかうまく答えられなくて気になってしまったのがきっかけです。

概要

調べてみたところ既に質問していた人がいました。

Google グループ

このフォーラムによると、kubectl exec は以下のように流れていくようです。

  1. kube-apiserver(以下apiserver)にPOSTリクエストを投げる( /v1/namespaces/{ns}/pods/{pod}/exec )
  2. このリクエストには SPDYへのアップグレードが含まれている(SPDYによる多重化通信が必要なため)
  3. apiserverはPodに接続するため、kubeletとの通信を確立させる
  4. kubeletは有効期限の短いtokenを発行し、CRIにリダイレクトさせる
  5. CRIハンドラーがリクエストを実行し、docker exec APIコールを行う

kubectl execコマンドは最終的にはPodが存在するNodeでdocker exec コマンドが実行される、と覚えておけばよさそうです。(CRIインタフェースに対応していればコンテナランタイムは問わないので、docker execとは限りませんが)

ソースコードを追ってみる

Google Forumに書かれていることが本当なのかわからなかったので、念のためソースコードを追ってみます。最近仕事でGo使いだしたのできっと読めるはず・・

kubectl

POSTして

kubernetes/exec.go at master · kubernetes/kubernetes · GitHub

SPDY通信してそう。

kubernetes/exec.go at master · kubernetes/kubernetes · GitHub

/v1/namespaces/{ns}/pods/{pod}/execでPOSTしている箇所は見つけることができました

api-server

/v1/namespaces/{ns}/pods/{pod}/execで実行されるコードを見つけることはできませんでした。。

kubernetes/subresources.go at 245f360a45e8ad3e241593b099973cf3ce213371 · kubernetes/kubernetes · GitHub

しかし、おそらくapiserverとkubeletが通信するendpointを作成しているコードを見つけることができました。

kubernetes/strategy.go at 152b09ac550d50deeeff7162093332b4f7f0397d · kubernetes/kubernetes · GitHub

   loc := &url.URL{
        Scheme:   nodeInfo.Scheme,
        Host:     net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
        Path:     fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),
        RawQuery: params.Encode(),
    }

/exec/{namespace}/{name}/{container} というpathでNodeと通信するURLを作成していそうです。Nodeに存在しているのはkubeletのはずなので、次はkubeletで /exec/{namespace}/{name}/{container} というエンドポイントで待ち受けているコードを探してみます。

kubelet

https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/server/server.go#L352

// getExec handles requests to run a command inside a container. なんかそれっぽい!

https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/server/server.go#L707

func (s *Server) getExec(request *restful.Request, response *restful.Response) {
        ~~~
    url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
        ~~~
}

hostのgetExecを読んでいます。Hostに何が入るのかわからないですが、getExecを実装している箇所をさらに検索します。

https://github.com/kubernetes/kubernetes/blob/5bfea15e7b8bde1a9bb891513d8d0c9812ab8358/pkg/kubelet/kubelet_pods.go

hostは省略しますが、

func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
    container, err := kl.findContainer(podFullName, podUID, containerName)
    if err != nil {
        return nil, err
    }
    if container == nil {
        return nil, fmt.Errorf("container not found (%q)", containerName)
    }
    return kl.streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
}

Kubelet.streamingRuntimeのGetExecが呼ばれています。

https://github.com/kubernetes/kubernetes/blob/72be2f40b73d1fbb98a8ed9bc58f0081052105eb/pkg/kubelet/kubelet.go#L665

streamingRuntimeの定義はこれです

   runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
           ~~~
    )
    if err != nil {
        return nil, err
    }
    klet.containerRuntime = runtime
    klet.streamingRuntime = runtime

https://github.com/kubernetes/kubernetes/blob/c2a326fdaa839b6b07e1d94c6fade28f156ae4a8/pkg/kubelet/kuberuntime/kuberuntime_container.go#L755

streamingRuntime.GetExecでは下記のGetExecが実行されることになります

func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
        ~~
    resp, err := m.runtimeService.Exec(req)
        ~~
}

ここでは、runtimeServiceのExec(req) を実行しています。runtimeServiceが何なのか調べます。

https://github.com/kubernetes/kubernetes/blob/c2a326fdaa839b6b07e1d94c6fade28f156ae4a8/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L112

 // gRPC service clients
    runtimeService internalapi.RuntimeService

runtimeService internalapi.RuntimeService とあるので、 internalapi を見てみると

internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri

ついにCRIまでたどり着きました。

https://github.com/kubernetes/kubernetes/blob/72be2f40b73d1fbb98a8ed9bc58f0081052105eb/pkg/kubelet/kubelet.go#L617

 switch containerRuntime {
    case kubetypes.DockerContainerRuntime:
        // Create and start the CRI shim running as a grpc server.
        streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
        ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
            &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
        if err != nil {
            return nil, err
        }
        if crOptions.RedirectContainerStreaming {
            klet.criHandler = ds
        }

        // The unix socket for kubelet <-> dockershim communication.
        klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
            remoteRuntimeEndpoint,
            remoteImageEndpoint)
        klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
        server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
        if err := server.Start(); err != nil {
            return nil, err
        }

                ~~~~

criHandlerにDockerServiceが設定されています。

https://github.com/kubernetes/kubernetes/blob/a2a5bd03fdae6ac45509560157f4c24d1c001e86/pkg/kubelet/dockershim/docker_service.go#L284

DockerServiceの定義はこれです。 ここで詰まってしまったのですが、kubeletがexec用にtokenを発行しているという手掛かりを元にコードを検索してみます。

https://github.com/kubernetes/kubernetes/blob/7f23a743e8c23ac6489340bbb34fa6f1d392db9d/pkg/kubelet/server/streaming/server.go#L129

   endpoints := []struct {
        path    string
        handler restful.RouteFunction
    }{
        {"/exec/{token}", s.serveExec},
        {"/attach/{token}", s.serveAttach},
        {"/portforward/{token}", s.servePortForward},
    }

見つかりました。

exec/{token} というURLでserveExec を実行しています。どこかでこの streaming packageのNewServer(config, runtime)が実行されていると思うのですが見つかりませんでした。 GetExecが実行されてからどこかでこのserveExecにたどり着くんだと思います。

https://github.com/kubernetes/kubernetes/blob/7f23a743e8c23ac6489340bbb34fa6f1d392db9d/pkg/kubelet/server/streaming/server.go#L288

   remotecommandserver.ServeExec(
        resp.ResponseWriter,
        req.Request,
        s.runtime,
        "", // unused: podName
        "", // unusued: podUID
        exec.ContainerId,
        exec.Cmd,
        streamOpts,
        s.config.StreamIdleTimeout,
        s.config.StreamCreationTimeout,
        s.config.SupportedRemoteCommandProtocols)

serveExecはremotecommandserverのServeExecを呼び出します。

importに

   remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"

が含まれているので remotecommandを見に行きます。

https://github.com/kubernetes/kubernetes/blob/7f23a743e8c23ac6489340bbb34fa6f1d392db9d/pkg/kubelet/server/remotecommand/exec.go#L52

func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
  ~~
    err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
  ~~
}

executorのExecInContainerが実行されています。

https://github.com/kubernetes/kubernetes/blob/7f23a743e8c23ac6489340bbb34fa6f1d392db9d/pkg/kubelet/server/streaming/server.go#L372

func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    return a.Runtime.Exec(container, cmd, in, out, err, tty, resize)
}

ExecInContainerはRuntimeのExec(container, cmd, in, out, err, tty, resize)を実行しています。 dockershimにExecがあればそれが処理の実態になっていそうです。

dockershim

https://github.com/kubernetes/kubernetes/blob/a3ccea9d8743f2ff82e41b6c2af6dc2c41dc7b10/pkg/kubelet/dockershim/docker_streaming.go#L49

func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
    return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}

// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    container, err := checkContainerStatus(r.client, containerID)
    if err != nil {
        return err
    }
    return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
}

Execから execHandlerのExecInContainerを呼んでいます。

https://github.com/kubernetes/kubernetes/blob/a3ccea9d8743f2ff82e41b6c2af6dc2c41dc7b10/pkg/kubelet/dockershim/exec.go#L61

func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    done := make(chan struct{})
    defer close(done)

    createOpts := dockertypes.ExecConfig{
        Cmd:          cmd,
        AttachStdin:  stdin != nil,
        AttachStdout: stdout != nil,
        AttachStderr: stderr != nil,
        Tty:          tty,
    }
    execObj, err := client.CreateExec(container.ID, createOpts)

exec handler のExecInContainerで docker execしてるコードにたどり着きました。長かった。。ほとんど自己満足ですが、前述のフォーラムの情報は間違ってなさそうだと思えるところまで調べることが出来ました。

CreateExecの中身も気になりましたがさすがに気力なくなりました。。 getExecからServceExecのところは完全に流れを見失ってしまったのでいずれ補完したいです。

Goは書いてると大量のコードを書くことになってつらいですが、読むのは楽ですね。途中詰まってしまいましたが、API通信を挟むところでコードだけでは追いきれなくなってしまうのは言語問わずそうなのでGoの問題ではないと思います。

参考

api-server, kubeletについてはこちら

Kubernetes: 構成コンポーネント一覧 - Qiita

CRIってなんだっけ?というの調べるのにはここを参照しました

コンテナランタイムの動向を整理してみた件 - Qiita