Вступление

Данная статья является третьей в цикле (1,2), посвященном изучению исходного кода Docker и прямым продолжением предыдущей статьи, в которой мы начали разбирать код первого публичного релиза Docker v0.1.0. В этой части будет рассмотрена реализация практически всех команд, а в конце, мы создадим образ и запустим докер контейнер на его основе. Для удобства я постарался разбить список команд на условные группы: работа с образами, работа с контейнерами, сетевой стек и т.д. 

А теперь, как говорится, “without further ado”, приступим к изучению кода из файла commands.go начиная с команд для управления образами (images).

Управление образами

Import

Команда import позволяет импортировать образ файловой системы из tar архива, подаваемого на stdin, или же загрузить его по url:

CmdImport
func (srv *Server) CmdImport(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
	cmd := rcli.Subcmd(stdout, "import", "[OPTIONS] URL|- [REPOSITORY [TAG]]", "Create a new filesystem image from the contents of a tarball")
	var archive io.Reader
	var resp *http.Response

	if err := cmd.Parse(args); err != nil {
		return nil
	}
	src := cmd.Arg(0)
	if src == "" {
		return errors.New("Not enough arguments")
	} else if src == "-" {
		archive = stdin
	} else {
		u, err := url.Parse(src)
		if err != nil {
			return err
		}
		if u.Scheme == "" {
			u.Scheme = "http"
			u.Host = src
			u.Path = ""
		}
		fmt.Fprintf(stdout, "Downloading from %s\n", u.String())
		// Download with curl (pretty progress bar)
		// If curl is not available, fallback to http.Get()
		resp, err = Download(u.String(), stdout)
		if err != nil {
			return err
		}
		archive = ProgressReader(resp.Body, int(resp.ContentLength), stdout)
	}
	img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src)
	if err != nil {
		return err
	}
	// Optionally register the image at REPO/TAG
	if repository := cmd.Arg(1); repository != "" {
		tag := cmd.Arg(2) // Repository will handle an empty tag properly
		if err := srv.runtime.repositories.Set(repository, tag, img.Id, true); err != nil {
			return err
		}
	}
	fmt.Fprintln(stdout, img.Id)
	return nil
}

После стандартного разбора аргументов функция определяет место откуда нужно импортировать образ: "-" означает stdin, в другом случае аргумент рассматривается, как url. Хелпер методы Download и ProgressReader для загрузки архива по http находятся в файле utils.go:

utils.go
// Request a given URL and return an io.Reader
func Download(url string, stderr io.Writer) (*http.Response, error) {
	var resp *http.Response
	var err error = nil
	if resp, err = http.Get(url); err != nil {
		return nil, err
	}
	if resp.StatusCode >= 400 {
		return nil, errors.New("Got HTTP status code >= 400: " + resp.Status)
	}
	return resp, nil
} 

type progressReader struct {
	reader        io.ReadCloser // Stream to read from
	output        io.Writer     // Where to send progress bar to
	read_total    int           // Expected stream length (bytes)
	read_progress int           // How much has been read so far (bytes)
	last_update   int           // How many bytes read at least update
}

func (r *progressReader) Read(p []byte) (n int, err error) {
	read, err := io.ReadCloser(r.reader).Read(p)
	r.read_progress += read

	// Only update progress for every 1% read
	update_every := int(0.01 * float64(r.read_total))
	if r.read_progress-r.last_update > update_every || r.read_progress == r.read_total {
		fmt.Fprintf(r.output, "%d/%d (%.0f%%)\r",
			r.read_progress,
			r.read_total,
			float64(r.read_progress)/float64(r.read_total)*100)
		r.last_update = r.read_progress
	}
	// Send newline when complete
	if err == io.EOF {
		fmt.Fprintf(r.output, "\n")
	}

	return read, err
}
func (r *progressReader) Close() error {
	return io.ReadCloser(r.reader).Close()
}
func ProgressReader(r io.ReadCloser, size int, output io.Writer) *progressReader {
	return &progressReader{r, output, size, 0, 0}
}

Далее управление переходит в функцию graph.Create из файла graph.go:

graph.Create
func (graph *Graph) Create(layerData Archive, container *Container, comment string) (*Image, error) {
	img := &Image{
		Id:      GenerateId(),
		Comment: comment,
		Created: time.Now(),
	}
	if container != nil {
		img.Parent = container.Image
		img.Container = container.Id
		img.ContainerConfig = *container.Config
	}
	if err := graph.Register(layerData, img); err != nil {
		return nil, err
	}
	return img, nil
}

Здесь генерируется уникальный идентификатор образа и инициализируется структура Image, которая далее вместе с данными архива передается в метод graph.Register. Если дополнительно передан и контейнер, то ссылка на его образ будет сохранена в поле img.Parent - это используется в команде Commit, создающей новый образ из текущего контейнера. Структура Image и функции для генерации Id на основе SHA256 находятся в файле image.go:

image.go
type Image struct {
	Id              string    `json:"id"`
	Parent          string    `json:"parent,omitempty"`
	Comment         string    `json:"comment,omitempty"`
	Created         time.Time `json:"created"`
	Container       string    `json:"container,omitempty"`
	ContainerConfig Config    `json:"container_config,omitempty"`
	graph           *Graph
}

func GenerateId() string {
	// FIXME: don't seed every time
	rand.Seed(time.Now().UTC().UnixNano())
	randomBytes := bytes.NewBuffer([]byte(fmt.Sprintf("%x", rand.Int())))
	id, _ := ComputeId(randomBytes) // can't fail
	return id
}

// ComputeId reads from `content` until EOF, then returns a SHA of what it read, as a string.
func ComputeId(content io.Reader) (string, error) {
	h := sha256.New()
	if _, err := io.Copy(h, content); err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", h.Sum(nil)[:8]), nil
}

Далее взглянем на метод graph.Register:

graph.Register
func (graph *Graph) Register(layerData Archive, img *Image) error {
	if err := ValidateId(img.Id); err != nil {
		return err
	}
	// (This is a convenience to save time. Race conditions are taken care of by os.Rename)
	if graph.Exists(img.Id) {
		return fmt.Errorf("Image %s already exists", img.Id)
	}
	tmp, err := graph.Mktemp(img.Id)
	defer os.RemoveAll(tmp)
	if err != nil {
		return fmt.Errorf("Mktemp failed: %s", err)
	}
	if err := StoreImage(img, layerData, tmp); err != nil {
		return err
	}
	// Commit
	if err := os.Rename(tmp, graph.imageRoot(img.Id)); err != nil {
		return err
	}
	img.graph = graph
	return nil
}

После валидации id на наличие запрещенного символа ":" (так как он является разделителем для тега), создается временная папка для модификаций, а затем вызывается функция StoreImage, в которой происходит создание образа. По завершению временная папка переименовывается в img.Id:

StoreImage
func StoreImage(img *Image, layerData Archive, root string) error {
	// Check that root doesn't already exist
	if _, err := os.Stat(root); err == nil {
		return fmt.Errorf("Image %s already exists", img.Id)
	} else if !os.IsNotExist(err) {
		return err
	}
	// Store the layer
	layer := layerPath(root)
	if err := os.MkdirAll(layer, 0700); err != nil {
		return err
	}
	if err := Untar(layerData, layer); err != nil {
		return err
	}
	// Store the json ball
	jsonData, err := json.Marshal(img)
	if err != nil {
		return err
	}
	if err := ioutil.WriteFile(jsonPath(root), jsonData, 0600); err != nil {
		return err
	}
	return nil
}

func layerPath(root string) string {
	return path.Join(root, "layer")
}

func jsonPath(root string) string {
	return path.Join(root, "json")
}

В StoreImage создается директория layer, в которую помещается распакованный при помощи функции Untar архив файловой системы, после чего структура Image экспортируется в json и сохраняется в соседний файл, как метаданные для образа. Функции Tar и Untar для работы с архивами находятся в файле archive.go и представляют собой лишь удобные обертки над утилитой bsdtar:

archive.go
type Archive io.Reader

type Compression uint32

const (
	Uncompressed Compression = iota
	Bzip2
	Gzip
)

func (compression *Compression) Flag() string {
	switch *compression {
	case Bzip2:
		return "j"
	case Gzip:
		return "z"
	}
	return ""
}

func Tar(path string, compression Compression) (io.Reader, error) {
	cmd := exec.Command("bsdtar", "-f", "-", "-C", path, "-c"+compression.Flag(), ".")
	return CmdStream(cmd)
}

func Untar(archive io.Reader, path string) error {
	cmd := exec.Command("bsdtar", "-f", "-", "-C", path, "-x")
	cmd.Stdin = archive
	output, err := cmd.CombinedOutput()
	if err != nil {
		return errors.New(err.Error() + ": " + string(output))
	}
	return nil
}

func CmdStream(cmd *exec.Cmd) (io.Reader, error) {
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return nil, err
	}
	pipeR, pipeW := io.Pipe()
	go func() {
		_, err := io.Copy(pipeW, stdout)
		if err != nil {
			pipeW.CloseWithError(err)
		}
		errText, e := ioutil.ReadAll(stderr)
		if e != nil {
			errText = []byte("(...couldn't fetch stderr: " + e.Error() + ")")
		}
		if err := cmd.Wait(); err != nil {
			// FIXME: can this block if stderr outputs more than the size of StderrPipe()'s buffer?
			pipeW.CloseWithError(errors.New(err.Error() + ": " + string(errText)))
		} else {
			pipeW.Close()
		}
	}()
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return pipeR, nil
} 

Отметим, что функция import также может сохранять tag образа, принимая его опциональным параметром, но этот функционал мы рассмотрим позже, когда до него дойдет очередь.

Export

Команда export возвращает экспортированный архив файловой системы контейнера:

CmdExport
func (srv *Server) CmdExport(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
	cmd := rcli.Subcmd(stdout,
		"export", "CONTAINER",
		"Export the contents of a filesystem as a tar archive")
	if err := cmd.Parse(args); err != nil {
		return nil
	}
	name := cmd.Arg(0)
	if container := srv.runtime.Get(name); container != nil {
		data, err := container.Export()
		if err != nil {
			return err
		}
		// Stream the entire contents of the container (basically a volatile snapshot)
		if _, err := io.Copy(stdout, data); err != nil {
			return err
		}
		return nil
	}
	return errors.New("No such container: " + name)
}

func (container *Container) Export() (Archive, error) {
	if err := container.EnsureMounted(); err != nil {
		return nil, err
	}
	return Tar(container.RootfsPath(), Uncompressed)
}

Функция по переданному имени получает контейнер и вызывает у него метод container.Export, который в свою очередь просто возвращает созданный архив, смонтированной директории Rootfs. Код функции Tar был приведен выше в файле archive.go.

Rmi

Удаляет переданный список образов, вызывая метод graph.Delete:

graph.Delete
func (srv *Server) CmdRmi(stdin io.ReadCloser, stdout io.Writer, args ...string) (err error) {
	cmd := rcli.Subcmd(stdout, "rmimage", "[OPTIONS] IMAGE", "Remove an image")
	if cmd.Parse(args) != nil || cmd.NArg() < 1 {
		cmd.Usage()
		return nil
	}
	for _, name := range cmd.Args() {
		if err := srv.runtime.graph.Delete(name); err != nil {
			return err
		}
	}
	return nil
}

func (graph *Graph) Delete(id string) error {
	garbage, err := graph.Garbage()
	if err != nil {
		return err
	}
	return os.Rename(graph.imageRoot(id), garbage.imageRoot(id))
}

func (graph *Graph) Garbage() (*Graph, error) {
	return NewGraph(path.Join(graph.Root, ":garbage:"))
}

В реальности, graph.Delete перемещает их в папку :garbage:, для возможности последующего восстановления, но данная функция здесь не используется.

Images

Возвращает таблицу со списком имеющихся образов:

CmdImages
func (srv *Server) CmdImages(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
	cmd := rcli.Subcmd(stdout, "images", "[OPTIONS] [NAME]", "List images")
	//limit := cmd.Int("l", 0, "Only show the N most recent versions of each image")
	quiet := cmd.Bool("q", false, "only show numeric IDs")
	fl_a := cmd.Bool("a", false, "show all images")
	if err := cmd.Parse(args); err != nil {
		return nil
	}
	if cmd.NArg() > 1 {
		cmd.Usage()
		return nil
	}
	var nameFilter string
	if cmd.NArg() == 1 {
		nameFilter = cmd.Arg(0)
	}
	w := tabwriter.NewWriter(stdout, 20, 1, 3, ' ', 0)
	if !*quiet {
		fmt.Fprintf(w, "REPOSITORY\tTAG\tID\tCREATED\tPARENT\n")
	}
	var allImages map[string]*Image
	var err error
	if *fl_a {
		allImages, err = srv.runtime.graph.Map()
	} else {
		allImages, err = srv.runtime.graph.Heads()
	}
	if err != nil {
		return err
	}
	for name, repository := range srv.runtime.repositories.Repositories {
		if nameFilter != "" && name != nameFilter {
			continue
		}
		for tag, id := range repository {
			image, err := srv.runtime.graph.Get(id)
			if err != nil {
				log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err)
				continue
			}
			delete(allImages, id)
			if !*quiet {
				for idx, field := range []string{
					/* REPOSITORY */ name,
					/* TAG */ tag,
					/* ID */ id,
					/* CREATED */ HumanDuration(time.Now().Sub(image.Created)) + " ago",
					/* PARENT */ srv.runtime.repositories.ImageName(image.Parent),
				} {
					if idx == 0 {
						w.Write([]byte(field))
					} else {
						w.Write([]byte("\t" + field))
					}
				}
				w.Write([]byte{'\n'})
			} else {
				stdout.Write([]byte(image.Id + "\n"))
			}
		}
	}
	// Display images which aren't part of a
	if nameFilter == "" {
		for id, image := range allImages {
			if !*quiet {
				for idx, field := range []string{
					/* REPOSITORY */ "",
					/* TAG */ "",
					/* ID */ id,
					/* CREATED */ HumanDuration(time.Now().Sub(image.Created)) + " ago",
					/* PARENT */ srv.runtime.repositories.ImageName(image.Parent),
				} {
					if idx == 0 {
						w.Write([]byte(field))
					} else {
						w.Write([]byte("\t" + field))
					}
				}
				w.Write([]byte{'\n'})
			} else {
				stdout.Write([]byte(image.Id + "\n"))
			}
		}
	}
	if !*quiet {
		w.Flush()
	}
	return nil
}

В функции происходит простая итерация по полученным образам, фильтрация на основе имени репозитория и вывод полей в консольную таблицу. Map, Head и вспомогательные к ним функции, формирующие хеш таблицы allImages, находятся в файле graph.go:

graph.go
func (graph *Graph) Map() (map[string]*Image, error) {
	// FIXME: this should replace All()
	all, err := graph.All()
	if err != nil {
		return nil, err
	}
	images := make(map[string]*Image, len(all))
	for _, image := range all {
		images[image.Id] = image
	}
	return images, nil
}

func (graph *Graph) All() ([]*Image, error) {
	var images []*Image
	err := graph.WalkAll(func(image *Image) {
		images = append(images, image)
	})
	return images, err
}

func (graph *Graph) WalkAll(handler func(*Image)) error {
	files, err := ioutil.ReadDir(graph.Root)
	if err != nil {
		return err
	}
	for _, st := range files {
		if img, err := graph.Get(st.Name()); err != nil {
			// Skip image
			continue
		} else if handler != nil {
			handler(img)
		}
	}
	return nil
}

func (graph *Graph) ByParent() (map[string][]*Image, error) {
	byParent := make(map[string][]*Image)
	err := graph.WalkAll(func(image *Image) {
		image, err := graph.Get(image.Parent)
		if err != nil {
			return
		}
		if children, exists := byParent[image.Parent]; exists {
			byParent[image.Parent] = []*Image{image}
		} else {
			byParent[image.Parent] = append(children, image)
		}
	})
	return byParent, err
}

func (graph *Graph) Heads() (map[string]*Image, error) {
	heads := make(map[string]*Image)
	byParent, err := graph.ByParent()
	if err != nil {
		return nil, err
	}
	err = graph.WalkAll(func(image *Image) {
		// If it's not in the byParent lookup table, then
		// it's not a parent -> so it's a head!
		if _, exists := byParent[image.Id]; !exists {
			heads[image.Id] = image
		}
	})
	return heads, err
}

History

Отображает историю образа:

CmdHistory
func (srv *Server) CmdHistory(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
	cmd := rcli.Subcmd(stdout, "history", "[OPTIONS] IMAGE", "Show the history of an image")
	if cmd.Parse(args) != nil || cmd.NArg() != 1 {
		cmd.Usage()
		return nil
	}
	image, err := srv.runtime.repositories.LookupImage(cmd.Arg(0))
	if err != nil {
		return err
	}
	w := tabwriter.NewWriter(stdout, 20, 1, 3, ' ', 0)
	defer w.Flush()
	fmt.Fprintf(w, "ID\tCREATED\tCREATED BY\n")
	return image.WalkHistory(func(img *Image) error {
		fmt.Fprintf(w, "%s\t%s\t%s\n",
			srv.runtime.repositories.ImageName(img.Id),
			HumanDuration(time.Now().Sub(img.Created))+" ago",
			strings.Join(img.ContainerConfig.Cmd, " "),
		)
		return nil
	})
}

func (img *Image) WalkHistory(handler func(*Image) error) (err error) {
    currentImg := img
    for currentImg != nil {
        if handler != nil {
            if err := handler(currentImg); err != nil {
                return err
            }
        }
        currentImg, err = currentImg.GetParent()
        if err != nil {
            return fmt.Errorf("Error while getting parent image: %v", err)
        }
    }
    return nil
}

func (img *Image) GetParent() (*Image, error) {
    if img.Parent == "" {
        return nil, nil
    }
    if img.graph == nil {
        return nil, fmt.Errorf("Can't lookup parent of unregistered image")
    }
    return img.graph.Get(img.Parent)
} 

После получения структуры образа по переданному имени вызывается метод image.WalkHistory, который по цепочке обходит родительские образы, используя сохраненные ссылки Image.Parent, и выводит информацию в виде таблицы.

Commit

Создает новый образ на основе измененных данных файловой системы контейнера:

CmdCommit
func (srv *Server) CmdCommit(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
	cmd := rcli.Subcmd(stdout,
		"commit", "[OPTIONS] CONTAINER [REPOSITORY [TAG]]",
		"Create a new image from a container's changes")
	if err := cmd.Parse(args); err != nil {
		return nil
	}
	containerName, repository, tag := cmd.Arg(0), cmd.Arg(1), cmd.Arg(2)
	if containerName == "" {
		cmd.Usage()
		return nil
	}
	img, err := srv.runtime.Commit(containerName, repository, tag)
	if err != nil {
		return err
	}
	fmt.Fprintln(stdout, img.Id)
	return nil
}

// Commit creates a new filesystem image from the current state of a container.
// The image can optionally be tagged into a repository
func (runtime *Runtime) Commit(id, repository, tag string) (*Image, error) {
	container := runtime.Get(id)
	if container == nil {
		return nil, fmt.Errorf("No such container: %s", id)
	}
	// FIXME: freeze the container before copying it to avoid data corruption?
	// FIXME: this shouldn't be in commands.
	rwTar, err := container.ExportRw()
	if err != nil {
		return nil, err
	}
	// Create a new image from the container's base layers + a new layer from container changes
	img, err := runtime.graph.Create(rwTar, container, "")
	if err != nil {
		return nil, err
	}
	// Register the image if needed
	if repository != "" {
		if err := runtime.repositories.Set(repository, tag, img.Id, true); err != nil {
			return img, err
		}
	}
	return img, nil
}

Метод Commit получает структуру container по переданному имени, вызывает метод container.ExportRw, который возвращает архив с директорией rw, после чего передает его в метод graph.Create, который мы уже разбирали выше. Если передано имя репозитория и tag, то дополнительно будет создан tag образа. Этот функционал будет разобран ниже в команде Tag.

Tag

Функция создает tag образа в локальном репозитории. Используется при импортировании образа и коммите контейнера:

CmdTag
func (srv *Server) CmdTag(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
	cmd := rcli.Subcmd(stdout, "tag", "[OPTIONS] IMAGE REPOSITORY [TAG]", "Tag an image into a repository")
	force := cmd.Bool("f", false, "Force")
	if err := cmd.Parse(args); err != nil {
		return nil
	}
	if cmd.NArg() < 2 {
		cmd.Usage()
		return nil
	}
	return srv.runtime.repositories.Set(cmd.Arg(1), cmd.Arg(2), cmd.Arg(0), *force)
}

После обработки параметров вызывается метод repositories.Set из файла tags.go. Ниже я приведу содержание файла tags.go, который отвечает за весь этот функционал. Общий принцип работы довольно простой - структура TagStore имеет хеш таблицу(map) для соответствия тегов и образов. При добавлении нового тега он проходит валидацию на запрещенные символы и записывается в хеш таблицу. Далее структура TagStore экспортируется в json и сохраняется в файле на диске. При запуске докер структура загружается из этого файла и в дальнейшем на ее основе осуществляется поиск и фильтрация образов по именам и тегам:

tags.go
const DEFAULT_TAG = "latest"

type TagStore struct {
	path         string
	graph        *Graph
	Repositories map[string]Repository
}

type Repository map[string]string

func NewTagStore(path string, graph *Graph) (*TagStore, error) {
	abspath, err := filepath.Abs(path)
	if err != nil {
		return nil, err
	}
	store := &TagStore{
		path:         abspath,
		graph:        graph,
		Repositories: make(map[string]Repository),
	}
	// Load the json file if it exists, otherwise create it.
	if err := store.Reload(); os.IsNotExist(err) {
		if err := store.Save(); err != nil {
			return nil, err
		}
	} else if err != nil {
		return nil, err
	}
	return store, nil
}

func (store *TagStore) Save() error {
	// Store the json ball
	jsonData, err := json.Marshal(store)
	if err != nil {
		return err
	}
	if err := ioutil.WriteFile(store.path, jsonData, 0600); err != nil {
		return err
	}
	return nil
}

func (store *TagStore) Reload() error {
	jsonData, err := ioutil.ReadFile(store.path)
	if err != nil {
		return err
	}
	if err := json.Unmarshal(jsonData, store); err != nil {
		return err
	}
	return nil
}

func (store *TagStore) LookupImage(name string) (*Image, error) {
	img, err := store.graph.Get(name)
	if err != nil {
		// FIXME: standardize on returning nil when the image doesn't exist, and err for everything else
		// (so we can pass all errors here)
		repoAndTag := strings.SplitN(name, ":", 2)
		if len(repoAndTag) == 1 {
			repoAndTag = append(repoAndTag, DEFAULT_TAG)
		}
		if i, err := store.GetImage(repoAndTag[0], repoAndTag[1]); err != nil {
			return nil, err
		} else if i == nil {
			return nil, fmt.Errorf("No such image: %s", name)
		} else {
			img = i
		}
	}
	return img, nil
}

// Return a reverse-lookup table of all the names which refer to each image
// Eg. {"43b5f19b10584": {"base:latest", "base:v1"}}
func (store *TagStore) ById() map[string][]string {
	byId := make(map[string][]string)
	for repoName, repository := range store.Repositories {
		for tag, id := range repository {
			name := repoName + ":" + tag
			if _, exists := byId[id]; !exists {
				byId[id] = []string{name}
			} else {
				byId[id] = append(byId[id], name)
			}
		}
	}
	return byId
}

func (store *TagStore) ImageName(id string) string {
	if names, exists := store.ById()[id]; exists && len(names) > 0 {
		return names[0]
	}
	return id
}

func (store *TagStore) Set(repoName, tag, imageName string, force bool) error {
	img, err := store.LookupImage(imageName)
	if err != nil {
		return err
	}
	if tag == "" {
		tag = DEFAULT_TAG
	}
	if err := validateRepoName(repoName); err != nil {
		return err
	}
	if err := validateTagName(tag); err != nil {
		return err
	}
	if err := store.Reload(); err != nil {
		return err
	}
	var repo Repository
	if r, exists := store.Repositories[repoName]; exists {
		repo = r
	} else {
		repo = make(map[string]string)
		if old, exists := store.Repositories[repoName]; exists && !force {
			return fmt.Errorf("Tag %s:%s is already set to %s", repoName, tag, old)
		}
		store.Repositories[repoName] = repo
	}
	repo[tag] = img.Id
	return store.Save()
}

func (store *TagStore) Get(repoName string) (Repository, error) {
	if err := store.Reload(); err != nil {
		return nil, err
	}
	if r, exists := store.Repositories[repoName]; exists {
		return r, nil
	}
	return nil, nil
}

func (store *TagStore) GetImage(repoName, tag string) (*Image, error) {
	repo, err := store.Get(repoName)
	if err != nil {
		return nil, err
	} else if repo == nil {
		return nil, nil
	}
	if revision, exists := repo[tag]; exists {
		return store.graph.Get(revision)
	}
	return nil, nil
}

// Validate the name of a repository
func validateRepoName(name string) error {
	if name == "" {
		return fmt.Errorf("Repository name can't be empty")
	}
	if strings.Contains(name, ":") {
		return fmt.Errorf("Illegal repository name: %s", name)
	}
	return nil
}

// Validate the name of a tag
func validateTagName(name string) error {
	if name == "" {
		return fmt.Errorf("Tag name can't be empty")
	}
	if strings.Contains(name, "/") || strings.Contains(name, ":") {
		return fmt.Errorf("Illegal tag name: %s", name)
	}
	return nil
}

Управление контейнерами

Run

Создает и запускает контейнер на основе заданного образа:

CmdRun
func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
	config, err := ParseRun(args)
	if err != nil {
		return err
	}
	if config.Image == "" {
		return fmt.Errorf("Image not specified")
	}
	if len(config.Cmd) == 0 {
		return fmt.Errorf("Command not specified")
	}
	// Create new container
	container, err := srv.runtime.Create(config)
	if err != nil {
		return errors.New("Error creating container: " + err.Error())
	}
	if config.OpenStdin {
		cmd_stdin, err := container.StdinPipe()
		if err != nil {
			return err
		}
		if !config.Detach {
			Go(func() error {
				_, err := io.Copy(cmd_stdin, stdin)
				cmd_stdin.Close()
				return err
			})
		}
	}
	// Run the container
	if !config.Detach {
		cmd_stderr, err := container.StderrPipe()
		if err != nil {
			return err
		}
		cmd_stdout, err := container.StdoutPipe()
		if err != nil {
			return err
		}
		if err := container.Start(); err != nil {
			return err
		}
		sending_stdout := Go(func() error {
			_, err := io.Copy(stdout, cmd_stdout)
			return err
		})
		sending_stderr := Go(func() error {
			_, err := io.Copy(stdout, cmd_stderr)
			return err
		})
		err_sending_stdout := <-sending_stdout
		err_sending_stderr := <-sending_stderr
		if err_sending_stdout != nil {
			return err_sending_stdout
		}
		if err_sending_stderr != nil {
			return err_sending_stderr
		}
		container.Wait()
	} else {
		if err := container.Start(); err != nil {
			return err
		}
		fmt.Fprintln(stdout, container.Id)
	}
	return nil
}

В начале функция ParseRun производит разбор параметров и инициализацию структуры Config:

ParseRun
func ParseRun(args []string) (*Config, error) {
	cmd := flag.NewFlagSet("", flag.ContinueOnError)
	cmd.SetOutput(ioutil.Discard)
	fl_user := cmd.String("u", "", "Username or UID")
	fl_detach := cmd.Bool("d", false, "Detached mode: leave the container running in the background")
	fl_stdin := cmd.Bool("i", false, "Keep stdin open even if not attached")
	fl_tty := cmd.Bool("t", false, "Allocate a pseudo-tty")
	fl_memory := cmd.Int64("m", 0, "Memory limit (in bytes)")
	var fl_ports ports

	cmd.Var(&fl_ports, "p", "Map a network port to the container")
	var fl_env ListOpts
	cmd.Var(&fl_env, "e", "Set environment variables")
	if err := cmd.Parse(args); err != nil {
		return nil, err
	}
	config := &Config{
		Ports:     fl_ports,
		User:      *fl_user,
		Tty:       *fl_tty,
		OpenStdin: *fl_stdin,
		Memory:    *fl_memory,
		Detach:    *fl_detach,
		Env:       fl_env,
		Cmd:       cmd.Args()[1:],
		Image:     cmd.Arg(0),
	}
	return config, nil
}

На основании Config функция runtime.Create создает и возвращает новый контейнер, после чего при помощи пайпов и каналов перенаправляет потоки stdin, stdout, stderr, а затем запускает созданный контейнер методом container.Start.

runtime.Create
func (runtime *Runtime) Create(config *Config) (*Container, error) {
	// Lookup image
	img, err := runtime.repositories.LookupImage(config.Image)
	if err != nil {
		return nil, err
	}
	container := &Container{
		// FIXME: we should generate the ID here instead of receiving it as an argument
		Id:              GenerateId(),
		Created:         time.Now(),
		Path:            config.Cmd[0],
		Args:            config.Cmd[1:], //FIXME: de-duplicate from config
		Config:          config,
		Image:           img.Id, // Always use the resolved image id
		NetworkSettings: &NetworkSettings{},
		// FIXME: do we need to store this in the container?
		SysInitPath: sysInitPath,
	}
	container.root = runtime.containerRoot(container.Id)
	// Step 1: create the container directory.
	// This doubles as a barrier to avoid race conditions.
	if err := os.Mkdir(container.root, 0700); err != nil {
		return nil, err
	}
	// Step 2: save the container json
	if err := container.ToDisk(); err != nil {
		return nil, err
	}
	// Step 3: register the container
	if err := runtime.Register(container); err != nil {
		return nil, err
	}
	return container, nil
}

Здесь происходит инициализация структуры Container, создание рабочей директории контейнера и экспорт структуры в json формате. Завершает это вызов метода runtime.Register, код которого мы разбирали в прошлой статье. Функция GenerateId была рассмотрена ранее в разделе по команде import. Теперь перейдем к методу container.Start:

container.Start
func (container *Container) Start() error {
	if err := container.EnsureMounted(); err != nil {
		return err
	}
	if err := container.allocateNetwork(); err != nil {
		return err
	}
	if err := container.generateLXCConfig(); err != nil {
		return err
	}
	params := []string{
		"-n", container.Id,
		"-f", container.lxcConfigPath(),
		"--",
		"/sbin/init",
	}

	// Networking
	params = append(params, "-g", container.network.Gateway.String())

	// User
	if container.Config.User != "" {
		params = append(params, "-u", container.Config.User)
	}

	// Program
	params = append(params, "--", container.Path)
	params = append(params, container.Args...)

	container.cmd = exec.Command("/usr/bin/lxc-start", params...)

	// Setup environment
	container.cmd.Env = append(
		[]string{
			"HOME=/",
			"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
		},
		container.Config.Env...,
	)

	var err error
	if container.Config.Tty {
		err = container.startPty()
	} else {
		err = container.start()
	}
	if err != nil {
		return err
	}
	// FIXME: save state on disk *first*, then converge
	// this way disk state is used as a journal, eg. we can restore after crash etc.
	container.State.setRunning(container.cmd.Process.Pid)
	container.ToDisk()
	go container.monitor()
	return nil
}

Основная логика его работы практически не изменилась со времен первого коммита, который мы разбирали в первой статье. Изменения коснулись части отвечающей за монтирование файловой системы, а также был добавлен код для сетевого стека и новый метод запуска процесса в контейнере. Разберем все по порядку, начиная с монтирования файловой системы:

EnsureMounted
func (container *Container) EnsureMounted() error {
	if mounted, err := container.Mounted(); err != nil {
		return err
	} else if mounted {
		return nil
	}
	return container.Mount()
} 

func (container *Container) Mounted() (bool, error) {
	return Mounted(container.RootfsPath())
} 

func Mounted(mountpoint string) (bool, error) {
	mntpoint, err := os.Stat(mountpoint)
	if err != nil {
		if os.IsNotExist(err) {
			return false, nil
		}
		return false, err
	}
	parent, err := os.Stat(filepath.Join(mountpoint, ".."))
	if err != nil {
		return false, err
	}
	mntpointSt := mntpoint.Sys().(*syscall.Stat_t)
	parentSt := parent.Sys().(*syscall.Stat_t)
	return mntpointSt.Dev != parentSt.Dev, nil
}

Метод container.EnsureMounted проверяет была ли смонтирована файловая система, в противном случае - выполняет монтирование вызовом container.Mount():

container.Mount
func (container *Container) Mount() error {
	image, err := container.GetImage()
	if err != nil {
		return err
	}
	return image.Mount(container.RootfsPath(), container.rwPath())
}

func (image *Image) Mount(root, rw string) error {
	if mounted, err := Mounted(root); err != nil {
		return err
	} else if mounted {
		return fmt.Errorf("%s is already mounted", root)
	}
	layers, err := image.layers()
	if err != nil {
		return err
	}
	// Create the target directories if they don't exist
	if err := os.Mkdir(root, 0755); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.Mkdir(rw, 0755); err != nil && !os.IsExist(err) {
		return err
	}
	// FIXME: @creack shouldn't we do this after going over changes?
	if err := MountAUFS(layers, rw, root); err != nil {
		return err
	}
	// FIXME: Create tests for deletion
	// FIXME: move this part to change.go
	// Retrieve the changeset from the parent and apply it to the container
	//  - Retrieve the changes
	changes, err := Changes(layers, layers[0])
	if err != nil {
		return err
	}
	// Iterate on changes
	for _, c := range changes {
		// If there is a delete
		if c.Kind == ChangeDelete {
			// Make sure the directory exists
			file_path, file_name := path.Dir(c.Path), path.Base(c.Path)
			if err := os.MkdirAll(path.Join(rw, file_path), 0755); err != nil {
				return err
			}
			// And create the whiteout (we just need to create empty file, discard the return)
			if _, err := os.Create(path.Join(path.Join(rw, file_path),
				".wh."+path.Base(file_name))); err != nil {
				return err
			}
		}
	}
	return nil
}

Подготовка параметров для монтирования производится функцией MountAUFS, логика которой аналогична первой версии, только теперь вместо утилиты mount монтирование производится системным вызовом:

MountAUFS
func MountAUFS(ro []string, rw string, target string) error {
	// FIXME: Now mount the layers
	rwBranch := fmt.Sprintf("%v=rw", rw)
	roBranches := ""
	for _, layer := range ro {
		roBranches += fmt.Sprintf("%v=ro:", layer)
	}
	branches := fmt.Sprintf("br:%v:%v", rwBranch, roBranches)
	return mount("none", target, "aufs", 0, branches)
}

func mount(source string, target string, fstype string, flags uintptr, data string) (err error) {
	return syscall.Mount(source, target, fstype, flags, data)
}

После монтирования функция получает изменения файловой системы с помощью команды Changes и производит создание файлов c .wh. префиксом в папке rw, если они были удалены в верхнем слое. Алгоритм работы этой функции будет рассмотрен в разделе команды Diff, вычисляющей изменения в файловой системе. Теперь перейдем к инициализации сети вызовом container.allocateNetwork:

container.allocateNetwork
func (container *Container) allocateNetwork() error {
	iface, err := container.runtime.networkManager.Allocate()
	if err != nil {
		return err
	}
	container.NetworkSettings.PortMapping = make(map[string]string)
	for _, port := range container.Config.Ports {
		if extPort, err := iface.AllocatePort(port); err != nil {
			iface.Release()
			return err
		} else {
			container.NetworkSettings.PortMapping[strconv.Itoa(port)] = strconv.Itoa(extPort)
		}
	}
	container.network = iface
	container.NetworkSettings.IpAddress = iface.IPNet.IP.String()
	container.NetworkSettings.IpPrefixLen, _ = iface.IPNet.Mask.Size()
	container.NetworkSettings.Gateway = iface.Gateway.String()
	return nil
}

В нем происходит настройка сетевого интерфейса, присвоение ip адреса, маски и шлюза, а также проброс портов. Я подробно разберу весь этот функционал ниже, в отдельной части по работе с сетевым стеком. Далее идет вызов метода generateLXCConfig. Он был подробно разобран в первой статье и остался практически без изменений. Стоит лишь отметить, что теперь в lxc_template.go добавлены настройки сети, монтирование /etc/resolv.conf для работы dns и, главное, монтирование исполняемого файла docker в точку /sbin/init, так как теперь выполнение процесса будет начинаться с него. Я уже обращал на это внимание в части 2.1. Ниже приведены изменения в lxc_template:

lxc_template.go
# network configuration
lxc.network.type = veth
lxc.network.flags = up
lxc.network.link = lxcbr0
lxc.network.name = eth0
lxc.network.mtu = 1500
lxc.network.ipv4 = {{.NetworkSettings.IpAddress}}/{{.NetworkSettings.IpPrefixLen}}

# Inject docker-init
lxc.mount.entry = {{.SysInitPath}} {{$ROOTFS}}/sbin/init none bind,ro 0 0

# In order to get a working DNS environment, mount bind (ro) the host's /etc/resolv.conf into the container
lxc.mount.entry = /etc/resolv.conf {{$ROOTFS}}/etc/resolv.conf none bind,ro 0 0 

Теперь осталось разобрать файл sysinit.go, с которого стартует созданный lxc контейнер:

sysinit.go
// Setup networking
func setupNetworking(gw string) {
	if gw == "" {
		return
	}
	cmd := exec.Command("/sbin/route", "add", "default", "gw", gw)
	if err := cmd.Run(); err != nil {
		log.Fatalf("Unable to set up networking: %v", err)
	}
}

// Takes care of dropping privileges to the desired user
func changeUser(u string) {
	if u == "" {
		return
	}
	userent, err := user.LookupId(u)
	if err != nil {
		userent, err = user.Lookup(u)
	}
	if err != nil {
		log.Fatalf("Unable to find user %v: %v", u, err)
	}

	uid, err := strconv.Atoi(userent.Uid)
	if err != nil {
		log.Fatalf("Invalid uid: %v", userent.Uid)
	}
	gid, err := strconv.Atoi(userent.Gid)
	if err != nil {
		log.Fatalf("Invalid gid: %v", userent.Gid)
	}

	if err := syscall.Setgid(gid); err != nil {
		log.Fatalf("setgid failed: %v", err)
	}
	if err := syscall.Setuid(uid); err != nil {
		log.Fatalf("setuid failed: %v", err)
	}
}

func executeProgram(name string, args []string) {
	path, err := exec.LookPath(name)
	if err != nil {
		log.Printf("Unable to locate %v", name)
		os.Exit(127)
	}

	if err := syscall.Exec(path, args, os.Environ()); err != nil {
		panic(err)
	}
}

// Sys Init code
// This code is run INSIDE the container and is responsible for setting
// up the environment before running the actual process
func SysInit() {
	if len(os.Args) <= 1 {
		fmt.Println("You should not invoke docker-init manually")
		os.Exit(1)
	}
	var u = flag.String("u", "", "username or uid")
	var gw = flag.String("g", "", "gateway address")

	flag.Parse()

	setupNetworking(*gw)
	changeUser(*u)
	executeProgram(flag.Arg(0), flag.Args())
}

Как видим, в SysInit происходит настройка окружения перед запуском процесса. Добавление default gateway в таблицу маршрутизации, настройка пользователя и группы, под которыми будет выполняться процесс, и собственно запуск процесса стандартным методом Exec.

После запуска процесса в методе Start идет перенаправление стандартных потоков и запуск горутины container.monitor, работу которой мы разбирали в первой статье. Можно лишь добавить, что теперь в ней происходит освобождение назначенного ip адреса, проброшенных портов и размонтирование файловой системы.

container.monitor
func (container *Container) monitor() {
	// Wait for the program to exit
	container.cmd.Wait()
	exitCode := container.cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()

	// Cleanup
	if err := container.releaseNetwork(); err != nil {
		log.Printf("%v: Failed to release network: %v", container.Id, err)
	}
	container.stdout.Close()
	container.stderr.Close()
	if err := container.Unmount(); err != nil {
		log.Printf("%v: Failed to umount filesystem: %v", container.Id, err)
	}

	// Re-create a brand new stdin pipe once the container exited
	if container.Config.OpenStdin {
		container.stdin, container.stdinPipe = io.Pipe()
	}

	// Report status back
	container.State.setStopped(exitCode)
	container.ToDisk()
}

func (container *Container) releaseNetwork() error {
	err := container.network.Release()
	container.network = nil
	container.NetworkSettings = &NetworkSettings{}
	return err
}

func (container *Container) Unmount() error {
	return Unmount(container.RootfsPath())
}

func Unmount(target string) error {
	if err := syscall.Unmount(target, 0); err != nil {
		return err
	}
	// Even though we just unmounted the filesystem, AUFS will prevent deleting the mntpoint
	// for some time. We'll just keep retrying until it succeeds.
	for retries := 0; retries < 1000; retries++ {
		err := os.Remove(target)
		if err == nil {
			// rm mntpoint succeeded
			return nil
		}
		if os.IsNotExist(err) {
			// mntpoint doesn't exist anymore. Success.
			return nil
		}
		// fmt.Printf("(%v) Remove %v returned: %v\n", retries, target, err)
		time.Sleep(10 * time.Millisecond)
	}
	return fmt.Errorf("Umount: Failed to umount %v", target)
}

Неожиданное завершение

Ввиду того, что редактор хабра начал сильно тормозить, а местами и полностью зависать (видимо сказывается большое количество снипетов кода), я вынужден завершить эту часть, но продолжу в следующей, прямо с этого места.

Комментарии (0)