Little helper to run CNCF's k3s in Docker
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
k3d/vendor/github.com/containerd/stargz-snapshotter/estargz/estargz.go

1223 lines
32 KiB

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2019 The Go Authors. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.
*/
package estargz
import (
"bufio"
"bytes"
"compress/gzip"
"crypto/sha256"
"errors"
"fmt"
"hash"
"io"
"os"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/containerd/stargz-snapshotter/estargz/errorutil"
digest "github.com/opencontainers/go-digest"
"github.com/vbatts/tar-split/archive/tar"
)
// A Reader permits random access reads from a stargz file.
type Reader struct {
sr *io.SectionReader
toc *JTOC
tocDigest digest.Digest
// m stores all non-chunk entries, keyed by name.
m map[string]*TOCEntry
// chunks stores all TOCEntry values for regular files that
// are split up. For a file with a single chunk, it's only
// stored in m.
chunks map[string][]*TOCEntry
decompressor Decompressor
}
type openOpts struct {
tocOffset int64
decompressors []Decompressor
telemetry *Telemetry
}
// OpenOption is an option used during opening the layer
type OpenOption func(o *openOpts) error
// WithTOCOffset option specifies the offset of TOC
func WithTOCOffset(tocOffset int64) OpenOption {
return func(o *openOpts) error {
o.tocOffset = tocOffset
return nil
}
}
// WithDecompressors option specifies decompressors to use.
// Default is gzip-based decompressor.
func WithDecompressors(decompressors ...Decompressor) OpenOption {
return func(o *openOpts) error {
o.decompressors = decompressors
return nil
}
}
// WithTelemetry option specifies the telemetry hooks
func WithTelemetry(telemetry *Telemetry) OpenOption {
return func(o *openOpts) error {
o.telemetry = telemetry
return nil
}
}
// MeasureLatencyHook is a func which takes start time and records the diff
type MeasureLatencyHook func(time.Time)
// Telemetry is a struct which defines telemetry hooks. By implementing these hooks you should be able to record
// the latency metrics of the respective steps of estargz open operation. To be used with estargz.OpenWithTelemetry(...)
type Telemetry struct {
GetFooterLatency MeasureLatencyHook // measure time to get stargz footer (in milliseconds)
GetTocLatency MeasureLatencyHook // measure time to GET TOC JSON (in milliseconds)
DeserializeTocLatency MeasureLatencyHook // measure time to deserialize TOC JSON (in milliseconds)
}
// Open opens a stargz file for reading.
// The behavior is configurable using options.
//
// Note that each entry name is normalized as the path that is relative to root.
func Open(sr *io.SectionReader, opt ...OpenOption) (*Reader, error) {
var opts openOpts
for _, o := range opt {
if err := o(&opts); err != nil {
return nil, err
}
}
gzipCompressors := []Decompressor{new(GzipDecompressor), new(LegacyGzipDecompressor)}
decompressors := append(gzipCompressors, opts.decompressors...)
// Determine the size to fetch. Try to fetch as many bytes as possible.
fetchSize := maxFooterSize(sr.Size(), decompressors...)
if maybeTocOffset := opts.tocOffset; maybeTocOffset > fetchSize {
if maybeTocOffset > sr.Size() {
return nil, fmt.Errorf("blob size %d is smaller than the toc offset", sr.Size())
}
fetchSize = sr.Size() - maybeTocOffset
}
start := time.Now() // before getting layer footer
footer := make([]byte, fetchSize)
if _, err := sr.ReadAt(footer, sr.Size()-fetchSize); err != nil {
return nil, fmt.Errorf("error reading footer: %v", err)
}
if opts.telemetry != nil && opts.telemetry.GetFooterLatency != nil {
opts.telemetry.GetFooterLatency(start)
}
var allErr []error
var found bool
var r *Reader
for _, d := range decompressors {
fSize := d.FooterSize()
fOffset := positive(int64(len(footer)) - fSize)
maybeTocBytes := footer[:fOffset]
_, tocOffset, tocSize, err := d.ParseFooter(footer[fOffset:])
if err != nil {
allErr = append(allErr, err)
continue
}
if tocOffset >= 0 && tocSize <= 0 {
tocSize = sr.Size() - tocOffset - fSize
}
if tocOffset >= 0 && tocSize < int64(len(maybeTocBytes)) {
maybeTocBytes = maybeTocBytes[:tocSize]
}
r, err = parseTOC(d, sr, tocOffset, tocSize, maybeTocBytes, opts)
if err == nil {
found = true
break
}
allErr = append(allErr, err)
}
if !found {
return nil, errorutil.Aggregate(allErr)
}
if err := r.initFields(); err != nil {
return nil, fmt.Errorf("failed to initialize fields of entries: %v", err)
}
return r, nil
}
// OpenFooter extracts and parses footer from the given blob.
// only supports gzip-based eStargz.
func OpenFooter(sr *io.SectionReader) (tocOffset int64, footerSize int64, rErr error) {
if sr.Size() < FooterSize && sr.Size() < legacyFooterSize {
return 0, 0, fmt.Errorf("blob size %d is smaller than the footer size", sr.Size())
}
var footer [FooterSize]byte
if _, err := sr.ReadAt(footer[:], sr.Size()-FooterSize); err != nil {
return 0, 0, fmt.Errorf("error reading footer: %v", err)
}
var allErr []error
for _, d := range []Decompressor{new(GzipDecompressor), new(LegacyGzipDecompressor)} {
fSize := d.FooterSize()
fOffset := positive(int64(len(footer)) - fSize)
_, tocOffset, _, err := d.ParseFooter(footer[fOffset:])
if err == nil {
return tocOffset, fSize, err
}
allErr = append(allErr, err)
}
return 0, 0, errorutil.Aggregate(allErr)
}
// initFields populates the Reader from r.toc after decoding it from
// JSON.
//
// Unexported fields are populated and TOCEntry fields that were
// implicit in the JSON are populated.
func (r *Reader) initFields() error {
r.m = make(map[string]*TOCEntry, len(r.toc.Entries))
r.chunks = make(map[string][]*TOCEntry)
var lastPath string
uname := map[int]string{}
gname := map[int]string{}
var lastRegEnt *TOCEntry
var chunkTopIndex int
for i, ent := range r.toc.Entries {
ent.Name = cleanEntryName(ent.Name)
switch ent.Type {
case "reg", "chunk":
if ent.Offset != r.toc.Entries[chunkTopIndex].Offset {
chunkTopIndex = i
}
ent.chunkTopIndex = chunkTopIndex
}
if ent.Type == "reg" {
lastRegEnt = ent
}
if ent.Type == "chunk" {
ent.Name = lastPath
r.chunks[ent.Name] = append(r.chunks[ent.Name], ent)
if ent.ChunkSize == 0 && lastRegEnt != nil {
ent.ChunkSize = lastRegEnt.Size - ent.ChunkOffset
}
} else {
lastPath = ent.Name
if ent.Uname != "" {
uname[ent.UID] = ent.Uname
} else {
ent.Uname = uname[ent.UID]
}
if ent.Gname != "" {
gname[ent.GID] = ent.Gname
} else {
ent.Gname = uname[ent.GID]
}
ent.modTime, _ = time.Parse(time.RFC3339, ent.ModTime3339)
if ent.Type == "dir" {
ent.NumLink++ // Parent dir links to this directory
}
r.m[ent.Name] = ent
}
if ent.Type == "reg" && ent.ChunkSize > 0 && ent.ChunkSize < ent.Size {
r.chunks[ent.Name] = make([]*TOCEntry, 0, ent.Size/ent.ChunkSize+1)
r.chunks[ent.Name] = append(r.chunks[ent.Name], ent)
}
if ent.ChunkSize == 0 && ent.Size != 0 {
ent.ChunkSize = ent.Size
}
}
// Populate children, add implicit directories:
for _, ent := range r.toc.Entries {
if ent.Type == "chunk" {
continue
}
// add "foo/":
// add "foo" child to "" (creating "" if necessary)
//
// add "foo/bar/":
// add "bar" child to "foo" (creating "foo" if necessary)
//
// add "foo/bar.txt":
// add "bar.txt" child to "foo" (creating "foo" if necessary)
//
// add "a/b/c/d/e/f.txt":
// create "a/b/c/d/e" node
// add "f.txt" child to "e"
name := ent.Name
pdirName := parentDir(name)
if name == pdirName {
// This entry and its parent are the same.
// Ignore this for avoiding infinite loop of the reference.
// The example case where this can occur is when tar contains the root
// directory itself (e.g. "./", "/").
continue
}
pdir := r.getOrCreateDir(pdirName)
ent.NumLink++ // at least one name(ent.Name) references this entry.
if ent.Type == "hardlink" {
org, err := r.getSource(ent)
if err != nil {
return err
}
org.NumLink++ // original entry is referenced by this ent.Name.
ent = org
}
pdir.addChild(path.Base(name), ent)
}
lastOffset := r.sr.Size()
for i := len(r.toc.Entries) - 1; i >= 0; i-- {
e := r.toc.Entries[i]
if e.isDataType() {
e.nextOffset = lastOffset
}
if e.Offset != 0 && e.InnerOffset == 0 {
lastOffset = e.Offset
}
}
return nil
}
func (r *Reader) getSource(ent *TOCEntry) (_ *TOCEntry, err error) {
if ent.Type == "hardlink" {
org, ok := r.m[cleanEntryName(ent.LinkName)]
if !ok {
return nil, fmt.Errorf("%q is a hardlink but the linkname %q isn't found", ent.Name, ent.LinkName)
}
ent, err = r.getSource(org)
if err != nil {
return nil, err
}
}
return ent, nil
}
func parentDir(p string) string {
dir, _ := path.Split(p)
return strings.TrimSuffix(dir, "/")
}
func (r *Reader) getOrCreateDir(d string) *TOCEntry {
e, ok := r.m[d]
if !ok {
e = &TOCEntry{
Name: d,
Type: "dir",
Mode: 0755,
NumLink: 2, // The directory itself(.) and the parent link to this directory.
}
r.m[d] = e
if d != "" {
pdir := r.getOrCreateDir(parentDir(d))
pdir.addChild(path.Base(d), e)
}
}
return e
}
func (r *Reader) TOCDigest() digest.Digest {
return r.tocDigest
}
// VerifyTOC checks that the TOC JSON in the passed blob matches the
// passed digests and that the TOC JSON contains digests for all chunks
// contained in the blob. If the verification succceeds, this function
// returns TOCEntryVerifier which holds all chunk digests in the stargz blob.
func (r *Reader) VerifyTOC(tocDigest digest.Digest) (TOCEntryVerifier, error) {
// Verify the digest of TOC JSON
if r.tocDigest != tocDigest {
return nil, fmt.Errorf("invalid TOC JSON %q; want %q", r.tocDigest, tocDigest)
}
return r.Verifiers()
}
// Verifiers returns TOCEntryVerifier of this chunk. Use VerifyTOC instead in most cases
// because this doesn't verify TOC.
func (r *Reader) Verifiers() (TOCEntryVerifier, error) {
chunkDigestMap := make(map[int64]digest.Digest) // map from chunk offset to the chunk digest
regDigestMap := make(map[int64]digest.Digest) // map from chunk offset to the reg file digest
var chunkDigestMapIncomplete bool
var regDigestMapIncomplete bool
var containsChunk bool
for _, e := range r.toc.Entries {
if e.Type != "reg" && e.Type != "chunk" {
continue
}
// offset must be unique in stargz blob
_, dOK := chunkDigestMap[e.Offset]
_, rOK := regDigestMap[e.Offset]
if dOK || rOK {
return nil, fmt.Errorf("offset %d found twice", e.Offset)
}
if e.Type == "reg" {
if e.Size == 0 {
continue // ignores empty file
}
// record the digest of regular file payload
if e.Digest != "" {
d, err := digest.Parse(e.Digest)
if err != nil {
return nil, fmt.Errorf("failed to parse regular file digest %q: %w", e.Digest, err)
}
regDigestMap[e.Offset] = d
} else {
regDigestMapIncomplete = true
}
} else {
containsChunk = true // this layer contains "chunk" entries.
}
// "reg" also can contain ChunkDigest (e.g. when "reg" is the first entry of
// chunked file)
if e.ChunkDigest != "" {
d, err := digest.Parse(e.ChunkDigest)
if err != nil {
return nil, fmt.Errorf("failed to parse chunk digest %q: %w", e.ChunkDigest, err)
}
chunkDigestMap[e.Offset] = d
} else {
chunkDigestMapIncomplete = true
}
}
if chunkDigestMapIncomplete {
// Though some chunk digests are not found, if this layer doesn't contain
// "chunk"s and all digest of "reg" files are recorded, we can use them instead.
if !containsChunk && !regDigestMapIncomplete {
return &verifier{digestMap: regDigestMap}, nil
}
return nil, fmt.Errorf("some ChunkDigest not found in TOC JSON")
}
return &verifier{digestMap: chunkDigestMap}, nil
}
// verifier is an implementation of TOCEntryVerifier which holds verifiers keyed by
// offset of the chunk.
type verifier struct {
digestMap map[int64]digest.Digest
digestMapMu sync.Mutex
}
// Verifier returns a content verifier specified by TOCEntry.
func (v *verifier) Verifier(ce *TOCEntry) (digest.Verifier, error) {
v.digestMapMu.Lock()
defer v.digestMapMu.Unlock()
d, ok := v.digestMap[ce.Offset]
if !ok {
return nil, fmt.Errorf("verifier for offset=%d,size=%d hasn't been registered",
ce.Offset, ce.ChunkSize)
}
return d.Verifier(), nil
}
// ChunkEntryForOffset returns the TOCEntry containing the byte of the
// named file at the given offset within the file.
// Name must be absolute path or one that is relative to root.
func (r *Reader) ChunkEntryForOffset(name string, offset int64) (e *TOCEntry, ok bool) {
name = cleanEntryName(name)
e, ok = r.Lookup(name)
if !ok || !e.isDataType() {
return nil, false
}
ents := r.chunks[name]
if len(ents) < 2 {
if offset >= e.ChunkSize {
return nil, false
}
return e, true
}
i := sort.Search(len(ents), func(i int) bool {
e := ents[i]
return e.ChunkOffset >= offset || (offset > e.ChunkOffset && offset < e.ChunkOffset+e.ChunkSize)
})
if i == len(ents) {
return nil, false
}
return ents[i], true
}
// Lookup returns the Table of Contents entry for the given path.
//
// To get the root directory, use the empty string.
// Path must be absolute path or one that is relative to root.
func (r *Reader) Lookup(path string) (e *TOCEntry, ok bool) {
path = cleanEntryName(path)
if r == nil {
return
}
e, ok = r.m[path]
if ok && e.Type == "hardlink" {
var err error
e, err = r.getSource(e)
if err != nil {
return nil, false
}
}
return
}
// OpenFile returns the reader of the specified file payload.
//
// Name must be absolute path or one that is relative to root.
func (r *Reader) OpenFile(name string) (*io.SectionReader, error) {
fr, err := r.newFileReader(name)
if err != nil {
return nil, err
}
return io.NewSectionReader(fr, 0, fr.size), nil
}
func (r *Reader) newFileReader(name string) (*fileReader, error) {
name = cleanEntryName(name)
ent, ok := r.Lookup(name)
if !ok {
// TODO: come up with some error plan. This is lazy:
return nil, &os.PathError{
Path: name,
Op: "OpenFile",
Err: os.ErrNotExist,
}
}
if ent.Type != "reg" {
return nil, &os.PathError{
Path: name,
Op: "OpenFile",
Err: errors.New("not a regular file"),
}
}
return &fileReader{
r: r,
size: ent.Size,
ents: r.getChunks(ent),
}, nil
}
func (r *Reader) OpenFileWithPreReader(name string, preRead func(*TOCEntry, io.Reader) error) (*io.SectionReader, error) {
fr, err := r.newFileReader(name)
if err != nil {
return nil, err
}
fr.preRead = preRead
return io.NewSectionReader(fr, 0, fr.size), nil
}
func (r *Reader) getChunks(ent *TOCEntry) []*TOCEntry {
if ents, ok := r.chunks[ent.Name]; ok {
return ents
}
return []*TOCEntry{ent}
}
type fileReader struct {
r *Reader
size int64
ents []*TOCEntry // 1 or more reg/chunk entries
preRead func(*TOCEntry, io.Reader) error
}
func (fr *fileReader) ReadAt(p []byte, off int64) (n int, err error) {
if off >= fr.size {
return 0, io.EOF
}
if off < 0 {
return 0, errors.New("invalid offset")
}
var i int
if len(fr.ents) > 1 {
i = sort.Search(len(fr.ents), func(i int) bool {
return fr.ents[i].ChunkOffset >= off
})
if i == len(fr.ents) {
i = len(fr.ents) - 1
}
}
ent := fr.ents[i]
if ent.ChunkOffset > off {
if i == 0 {
return 0, errors.New("internal error; first chunk offset is non-zero")
}
ent = fr.ents[i-1]
}
// If ent is a chunk of a large file, adjust the ReadAt
// offset by the chunk's offset.
off -= ent.ChunkOffset
finalEnt := fr.ents[len(fr.ents)-1]
compressedOff := ent.Offset
// compressedBytesRemain is the number of compressed bytes in this
// file remaining, over 1+ chunks.
compressedBytesRemain := finalEnt.NextOffset() - compressedOff
sr := io.NewSectionReader(fr.r.sr, compressedOff, compressedBytesRemain)
const maxRead = 2 << 20
var bufSize = maxRead
if compressedBytesRemain < maxRead {
bufSize = int(compressedBytesRemain)
}
br := bufio.NewReaderSize(sr, bufSize)
if _, err := br.Peek(bufSize); err != nil {
return 0, fmt.Errorf("fileReader.ReadAt.peek: %v", err)
}
dr, err := fr.r.decompressor.Reader(br)
if err != nil {
return 0, fmt.Errorf("fileReader.ReadAt.decompressor.Reader: %v", err)
}
defer dr.Close()
if fr.preRead == nil {
if n, err := io.CopyN(io.Discard, dr, ent.InnerOffset+off); n != ent.InnerOffset+off || err != nil {
return 0, fmt.Errorf("discard of %d bytes != %v, %v", ent.InnerOffset+off, n, err)
}
return io.ReadFull(dr, p)
}
var retN int
var retErr error
var found bool
var nr int64
for _, e := range fr.r.toc.Entries[ent.chunkTopIndex:] {
if !e.isDataType() {
continue
}
if e.Offset != fr.r.toc.Entries[ent.chunkTopIndex].Offset {
break
}
if in, err := io.CopyN(io.Discard, dr, e.InnerOffset-nr); err != nil || in != e.InnerOffset-nr {
return 0, fmt.Errorf("discard of remaining %d bytes != %v, %v", e.InnerOffset-nr, in, err)
}
nr = e.InnerOffset
if e == ent {
found = true
if n, err := io.CopyN(io.Discard, dr, off); n != off || err != nil {
return 0, fmt.Errorf("discard of offset %d bytes != %v, %v", off, n, err)
}
retN, retErr = io.ReadFull(dr, p)
nr += off + int64(retN)
continue
}
cr := &countReader{r: io.LimitReader(dr, e.ChunkSize)}
if err := fr.preRead(e, cr); err != nil {
return 0, fmt.Errorf("failed to pre read: %w", err)
}
nr += cr.n
}
if !found {
return 0, fmt.Errorf("fileReader.ReadAt: target entry not found")
}
return retN, retErr
}
// A Writer writes stargz files.
//
// Use NewWriter to create a new Writer.
type Writer struct {
bw *bufio.Writer
cw *countWriter
toc *JTOC
diffHash hash.Hash // SHA-256 of uncompressed tar
closed bool
gz io.WriteCloser
lastUsername map[int]string
lastGroupname map[int]string
compressor Compressor
uncompressedCounter *countWriteFlusher
// ChunkSize optionally controls the maximum number of bytes
// of data of a regular file that can be written in one gzip
// stream before a new gzip stream is started.
// Zero means to use a default, currently 4 MiB.
ChunkSize int
// MinChunkSize optionally controls the minimum number of bytes
// of data must be written in one gzip stream before a new gzip
// NOTE: This adds a TOC property that stargz snapshotter < v0.13.0 doesn't understand.
MinChunkSize int
needsOpenGzEntries map[string]struct{}
}
// currentCompressionWriter writes to the current w.gz field, which can
// change throughout writing a tar entry.
//
// Additionally, it updates w's SHA-256 of the uncompressed bytes
// of the tar file.
type currentCompressionWriter struct{ w *Writer }
func (ccw currentCompressionWriter) Write(p []byte) (int, error) {
ccw.w.diffHash.Write(p)
if ccw.w.gz == nil {
if err := ccw.w.condOpenGz(); err != nil {
return 0, err
}
}
return ccw.w.gz.Write(p)
}
func (w *Writer) chunkSize() int {
if w.ChunkSize <= 0 {
return 4 << 20
}
return w.ChunkSize
}
// Unpack decompresses the given estargz blob and returns a ReadCloser of the tar blob.
// TOC JSON and footer are removed.
func Unpack(sr *io.SectionReader, c Decompressor) (io.ReadCloser, error) {
footerSize := c.FooterSize()
if sr.Size() < footerSize {
return nil, fmt.Errorf("blob is too small; %d < %d", sr.Size(), footerSize)
}
footerOffset := sr.Size() - footerSize
footer := make([]byte, footerSize)
if _, err := sr.ReadAt(footer, footerOffset); err != nil {
return nil, err
}
blobPayloadSize, _, _, err := c.ParseFooter(footer)
if err != nil {
return nil, fmt.Errorf("failed to parse footer: %w", err)
}
if blobPayloadSize < 0 {
blobPayloadSize = sr.Size()
}
return c.Reader(io.LimitReader(sr, blobPayloadSize))
}
// NewWriter returns a new stargz writer (gzip-based) writing to w.
//
// The writer must be closed to write its trailing table of contents.
func NewWriter(w io.Writer) *Writer {
return NewWriterLevel(w, gzip.BestCompression)
}
// NewWriterLevel returns a new stargz writer (gzip-based) writing to w.
// The compression level is configurable.
//
// The writer must be closed to write its trailing table of contents.
func NewWriterLevel(w io.Writer, compressionLevel int) *Writer {
return NewWriterWithCompressor(w, NewGzipCompressorWithLevel(compressionLevel))
}
// NewWriterWithCompressor returns a new stargz writer writing to w.
// The compression method is configurable.
//
// The writer must be closed to write its trailing table of contents.
func NewWriterWithCompressor(w io.Writer, c Compressor) *Writer {
bw := bufio.NewWriter(w)
cw := &countWriter{w: bw}
return &Writer{
bw: bw,
cw: cw,
toc: &JTOC{Version: 1},
diffHash: sha256.New(),
compressor: c,
uncompressedCounter: &countWriteFlusher{},
}
}
// Close writes the stargz's table of contents and flushes all the
// buffers, returning any error.
func (w *Writer) Close() (digest.Digest, error) {
if w.closed {
return "", nil
}
defer func() { w.closed = true }()
if err := w.closeGz(); err != nil {
return "", err
}
// Write the TOC index and footer.
tocDigest, err := w.compressor.WriteTOCAndFooter(w.cw, w.cw.n, w.toc, w.diffHash)
if err != nil {
return "", err
}
if err := w.bw.Flush(); err != nil {
return "", err
}
return tocDigest, nil
}
func (w *Writer) closeGz() error {
if w.closed {
return errors.New("write on closed Writer")
}
if w.gz != nil {
if err := w.gz.Close(); err != nil {
return err
}
w.gz = nil
}
return nil
}
func (w *Writer) flushGz() error {
if w.closed {
return errors.New("flush on closed Writer")
}
if w.gz != nil {
if f, ok := w.gz.(interface {
Flush() error
}); ok {
return f.Flush()
}
}
return nil
}
// nameIfChanged returns name, unless it was the already the value of (*mp)[id],
// in which case it returns the empty string.
func (w *Writer) nameIfChanged(mp *map[int]string, id int, name string) string {
if name == "" {
return ""
}
if *mp == nil {
*mp = make(map[int]string)
}
if (*mp)[id] == name {
return ""
}
(*mp)[id] = name
return name
}
func (w *Writer) condOpenGz() (err error) {
if w.gz == nil {
w.gz, err = w.compressor.Writer(w.cw)
if w.gz != nil {
w.gz = w.uncompressedCounter.register(w.gz)
}
}
return
}
// AppendTar reads the tar or tar.gz file from r and appends
// each of its contents to w.
//
// The input r can optionally be gzip compressed but the output will
// always be compressed by the specified compressor.
func (w *Writer) AppendTar(r io.Reader) error {
return w.appendTar(r, false)
}
// AppendTarLossLess reads the tar or tar.gz file from r and appends
// each of its contents to w.
//
// The input r can optionally be gzip compressed but the output will
// always be compressed by the specified compressor.
//
// The difference of this func with AppendTar is that this writes
// the input tar stream into w without any modification (e.g. to header bytes).
//
// Note that if the input tar stream already contains TOC JSON, this returns
// error because w cannot overwrite the TOC JSON to the one generated by w without
// lossy modification. To avoid this error, if the input stream is known to be stargz/estargz,
// you shoud decompress it and remove TOC JSON in advance.
func (w *Writer) AppendTarLossLess(r io.Reader) error {
return w.appendTar(r, true)
}
func (w *Writer) appendTar(r io.Reader, lossless bool) error {
var src io.Reader
br := bufio.NewReader(r)
if isGzip(br) {
zr, _ := gzip.NewReader(br)
src = zr
} else {
src = io.Reader(br)
}
dst := currentCompressionWriter{w}
var tw *tar.Writer
if !lossless {
tw = tar.NewWriter(dst) // use tar writer only when this isn't lossless mode.
}
tr := tar.NewReader(src)
if lossless {
tr.RawAccounting = true
}
prevOffset := w.cw.n
var prevOffsetUncompressed int64
for {
h, err := tr.Next()
if err == io.EOF {
if lossless {
if remain := tr.RawBytes(); len(remain) > 0 {
// Collect the remaining null bytes.
// https://github.com/vbatts/tar-split/blob/80a436fd6164c557b131f7c59ed69bd81af69761/concept/main.go#L49-L53
if _, err := dst.Write(remain); err != nil {
return err
}
}
}
break
}
if err != nil {
return fmt.Errorf("error reading from source tar: tar.Reader.Next: %v", err)
}
if cleanEntryName(h.Name) == TOCTarName {
// It is possible for a layer to be "stargzified" twice during the
// distribution lifecycle. So we reserve "TOCTarName" here to avoid
// duplicated entries in the resulting layer.
if lossless {
// We cannot handle this in lossless way.
return fmt.Errorf("existing TOC JSON is not allowed; decompress layer before append")
}
continue
}
xattrs := make(map[string][]byte)
const xattrPAXRecordsPrefix = "SCHILY.xattr."
if h.PAXRecords != nil {
for k, v := range h.PAXRecords {
if strings.HasPrefix(k, xattrPAXRecordsPrefix) {
xattrs[k[len(xattrPAXRecordsPrefix):]] = []byte(v)
}
}
}
ent := &TOCEntry{
Name: h.Name,
Mode: h.Mode,
UID: h.Uid,
GID: h.Gid,
Uname: w.nameIfChanged(&w.lastUsername, h.Uid, h.Uname),
Gname: w.nameIfChanged(&w.lastGroupname, h.Gid, h.Gname),
ModTime3339: formatModtime(h.ModTime),
Xattrs: xattrs,
}
if err := w.condOpenGz(); err != nil {
return err
}
if tw != nil {
if err := tw.WriteHeader(h); err != nil {
return err
}
} else {
if _, err := dst.Write(tr.RawBytes()); err != nil {
return err
}
}
switch h.Typeflag {
case tar.TypeLink:
ent.Type = "hardlink"
ent.LinkName = h.Linkname
case tar.TypeSymlink:
ent.Type = "symlink"
ent.LinkName = h.Linkname
case tar.TypeDir:
ent.Type = "dir"
case tar.TypeReg:
ent.Type = "reg"
ent.Size = h.Size
case tar.TypeChar:
ent.Type = "char"
ent.DevMajor = int(h.Devmajor)
ent.DevMinor = int(h.Devminor)
case tar.TypeBlock:
ent.Type = "block"
ent.DevMajor = int(h.Devmajor)
ent.DevMinor = int(h.Devminor)
case tar.TypeFifo:
ent.Type = "fifo"
default:
return fmt.Errorf("unsupported input tar entry %q", h.Typeflag)
}
// We need to keep a reference to the TOC entry for regular files, so that we
// can fill the digest later.
var regFileEntry *TOCEntry
var payloadDigest digest.Digester
if h.Typeflag == tar.TypeReg {
regFileEntry = ent
payloadDigest = digest.Canonical.Digester()
}
if h.Typeflag == tar.TypeReg && ent.Size > 0 {
var written int64
totalSize := ent.Size // save it before we destroy ent
tee := io.TeeReader(tr, payloadDigest.Hash())
for written < totalSize {
chunkSize := int64(w.chunkSize())
remain := totalSize - written
if remain < chunkSize {
chunkSize = remain
} else {
ent.ChunkSize = chunkSize
}
// We flush the underlying compression writer here to correctly calculate "w.cw.n".
if err := w.flushGz(); err != nil {
return err
}
if w.needsOpenGz(ent) || w.cw.n-prevOffset >= int64(w.MinChunkSize) {
if err := w.closeGz(); err != nil {
return err
}
ent.Offset = w.cw.n
prevOffset = ent.Offset
prevOffsetUncompressed = w.uncompressedCounter.n
} else {
ent.Offset = prevOffset
ent.InnerOffset = w.uncompressedCounter.n - prevOffsetUncompressed
}
ent.ChunkOffset = written
chunkDigest := digest.Canonical.Digester()
if err := w.condOpenGz(); err != nil {
return err
}
teeChunk := io.TeeReader(tee, chunkDigest.Hash())
var out io.Writer
if tw != nil {
out = tw
} else {
out = dst
}
if _, err := io.CopyN(out, teeChunk, chunkSize); err != nil {
return fmt.Errorf("error copying %q: %v", h.Name, err)
}
ent.ChunkDigest = chunkDigest.Digest().String()
w.toc.Entries = append(w.toc.Entries, ent)
written += chunkSize
ent = &TOCEntry{
Name: h.Name,
Type: "chunk",
}
}
} else {
w.toc.Entries = append(w.toc.Entries, ent)
}
if payloadDigest != nil {
regFileEntry.Digest = payloadDigest.Digest().String()
}
if tw != nil {
if err := tw.Flush(); err != nil {
return err
}
}
}
remainDest := io.Discard
if lossless {
remainDest = dst // Preserve the remaining bytes in lossless mode
}
_, err := io.Copy(remainDest, src)
return err
}
func (w *Writer) needsOpenGz(ent *TOCEntry) bool {
if ent.Type != "reg" {
return false
}
if w.needsOpenGzEntries == nil {
return false
}
_, ok := w.needsOpenGzEntries[ent.Name]
return ok
}
// DiffID returns the SHA-256 of the uncompressed tar bytes.
// It is only valid to call DiffID after Close.
func (w *Writer) DiffID() string {
return fmt.Sprintf("sha256:%x", w.diffHash.Sum(nil))
}
func maxFooterSize(blobSize int64, decompressors ...Decompressor) (res int64) {
for _, d := range decompressors {
if s := d.FooterSize(); res < s && s <= blobSize {
res = s
}
}
return
}
func parseTOC(d Decompressor, sr *io.SectionReader, tocOff, tocSize int64, tocBytes []byte, opts openOpts) (*Reader, error) {
if tocOff < 0 {
// This means that TOC isn't contained in the blob.
// We pass nil reader to ParseTOC and expect that ParseTOC acquire TOC from
// the external location.
start := time.Now()
toc, tocDgst, err := d.ParseTOC(nil)
if err != nil {
return nil, err
}
if opts.telemetry != nil && opts.telemetry.GetTocLatency != nil {
opts.telemetry.GetTocLatency(start)
}
if opts.telemetry != nil && opts.telemetry.DeserializeTocLatency != nil {
opts.telemetry.DeserializeTocLatency(start)
}
return &Reader{
sr: sr,
toc: toc,
tocDigest: tocDgst,
decompressor: d,
}, nil
}
if len(tocBytes) > 0 {
start := time.Now()
toc, tocDgst, err := d.ParseTOC(bytes.NewReader(tocBytes))
if err == nil {
if opts.telemetry != nil && opts.telemetry.DeserializeTocLatency != nil {
opts.telemetry.DeserializeTocLatency(start)
}
return &Reader{
sr: sr,
toc: toc,
tocDigest: tocDgst,
decompressor: d,
}, nil
}
}
start := time.Now()
tocBytes = make([]byte, tocSize)
if _, err := sr.ReadAt(tocBytes, tocOff); err != nil {
return nil, fmt.Errorf("error reading %d byte TOC targz: %v", len(tocBytes), err)
}
if opts.telemetry != nil && opts.telemetry.GetTocLatency != nil {
opts.telemetry.GetTocLatency(start)
}
start = time.Now()
toc, tocDgst, err := d.ParseTOC(bytes.NewReader(tocBytes))
if err != nil {
return nil, err
}
if opts.telemetry != nil && opts.telemetry.DeserializeTocLatency != nil {
opts.telemetry.DeserializeTocLatency(start)
}
return &Reader{
sr: sr,
toc: toc,
tocDigest: tocDgst,
decompressor: d,
}, nil
}
func formatModtime(t time.Time) string {
if t.IsZero() || t.Unix() == 0 {
return ""
}
return t.UTC().Round(time.Second).Format(time.RFC3339)
}
func cleanEntryName(name string) string {
// Use path.Clean to consistently deal with path separators across platforms.
return strings.TrimPrefix(path.Clean("/"+name), "/")
}
// countWriter counts how many bytes have been written to its wrapped
// io.Writer.
type countWriter struct {
w io.Writer
n int64
}
func (cw *countWriter) Write(p []byte) (n int, err error) {
n, err = cw.w.Write(p)
cw.n += int64(n)
return
}
type countWriteFlusher struct {
io.WriteCloser
n int64
}
func (wc *countWriteFlusher) register(w io.WriteCloser) io.WriteCloser {
wc.WriteCloser = w
return wc
}
func (wc *countWriteFlusher) Write(p []byte) (n int, err error) {
n, err = wc.WriteCloser.Write(p)
wc.n += int64(n)
return
}
func (wc *countWriteFlusher) Flush() error {
if f, ok := wc.WriteCloser.(interface {
Flush() error
}); ok {
return f.Flush()
}
return nil
}
func (wc *countWriteFlusher) Close() error {
err := wc.WriteCloser.Close()
wc.WriteCloser = nil
return err
}
// isGzip reports whether br is positioned right before an upcoming gzip stream.
// It does not consume any bytes from br.
func isGzip(br *bufio.Reader) bool {
const (
gzipID1 = 0x1f
gzipID2 = 0x8b
gzipDeflate = 8
)
peek, _ := br.Peek(3)
return len(peek) >= 3 && peek[0] == gzipID1 && peek[1] == gzipID2 && peek[2] == gzipDeflate
}
func positive(n int64) int64 {
if n < 0 {
return 0
}
return n
}
type countReader struct {
r io.Reader
n int64
}
func (cr *countReader) Read(p []byte) (n int, err error) {
n, err = cr.r.Read(p)
cr.n += int64(n)
return
}