mirror of https://github.com/k3d-io/k3d
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
560 lines
12 KiB
560 lines
12 KiB
1 year ago
|
// Copyright 2023 Google LLC All Rights Reserved.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package remote
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/google/go-containerregistry/pkg/logs"
|
||
|
"github.com/google/go-containerregistry/pkg/name"
|
||
|
v1 "github.com/google/go-containerregistry/pkg/v1"
|
||
|
"github.com/google/go-containerregistry/pkg/v1/partial"
|
||
|
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
|
||
|
"github.com/google/go-containerregistry/pkg/v1/stream"
|
||
|
"github.com/google/go-containerregistry/pkg/v1/types"
|
||
|
"golang.org/x/sync/errgroup"
|
||
|
)
|
||
|
|
||
|
type manifest interface {
|
||
|
Taggable
|
||
|
partial.Describable
|
||
|
}
|
||
|
|
||
|
// key is either v1.Hash or v1.Layer (for stream.Layer)
|
||
|
type workers struct {
|
||
|
// map[v1.Hash|v1.Layer]*sync.Once
|
||
|
onces sync.Map
|
||
|
|
||
|
// map[v1.Hash|v1.Layer]error
|
||
|
errors sync.Map
|
||
|
}
|
||
|
|
||
|
func nop() error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (w *workers) err(digest v1.Hash) error {
|
||
|
v, ok := w.errors.Load(digest)
|
||
|
if !ok || v == nil {
|
||
|
return nil
|
||
|
}
|
||
|
return v.(error)
|
||
|
}
|
||
|
|
||
|
func (w *workers) Do(digest v1.Hash, f func() error) error {
|
||
|
// We don't care if it was loaded or not because the sync.Once will do it for us.
|
||
|
once, _ := w.onces.LoadOrStore(digest, &sync.Once{})
|
||
|
|
||
|
once.(*sync.Once).Do(func() {
|
||
|
w.errors.Store(digest, f())
|
||
|
})
|
||
|
|
||
|
err := w.err(digest)
|
||
|
if err != nil {
|
||
|
// Allow this to be retried by another caller.
|
||
|
w.onces.Delete(digest)
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (w *workers) Stream(layer v1.Layer, f func() error) error {
|
||
|
// We don't care if it was loaded or not because the sync.Once will do it for us.
|
||
|
once, _ := w.onces.LoadOrStore(layer, &sync.Once{})
|
||
|
|
||
|
once.(*sync.Once).Do(func() {
|
||
|
w.errors.Store(layer, f())
|
||
|
})
|
||
|
|
||
|
v, ok := w.errors.Load(layer)
|
||
|
if !ok || v == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
return v.(error)
|
||
|
}
|
||
|
|
||
|
type Pusher struct {
|
||
|
o *options
|
||
|
|
||
|
// map[name.Repository]*repoWriter
|
||
|
writers sync.Map
|
||
|
}
|
||
|
|
||
|
func NewPusher(options ...Option) (*Pusher, error) {
|
||
|
o, err := makeOptions(options...)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return newPusher(o), nil
|
||
|
}
|
||
|
|
||
|
func newPusher(o *options) *Pusher {
|
||
|
if o.pusher != nil {
|
||
|
return o.pusher
|
||
|
}
|
||
|
return &Pusher{
|
||
|
o: o,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (p *Pusher) writer(ctx context.Context, repo name.Repository, o *options) (*repoWriter, error) {
|
||
|
v, _ := p.writers.LoadOrStore(repo, &repoWriter{
|
||
|
repo: repo,
|
||
|
o: o,
|
||
|
})
|
||
|
rw := v.(*repoWriter)
|
||
|
return rw, rw.init(ctx)
|
||
|
}
|
||
|
|
||
|
func (p *Pusher) Push(ctx context.Context, ref name.Reference, t Taggable) error {
|
||
|
w, err := p.writer(ctx, ref.Context(), p.o)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return w.writeManifest(ctx, ref, t)
|
||
|
}
|
||
|
|
||
|
func (p *Pusher) Upload(ctx context.Context, repo name.Repository, l v1.Layer) error {
|
||
|
w, err := p.writer(ctx, repo, p.o)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return w.writeLayer(ctx, l)
|
||
|
}
|
||
|
|
||
|
func (p *Pusher) Delete(ctx context.Context, ref name.Reference) error {
|
||
|
w, err := p.writer(ctx, ref.Context(), p.o)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
u := url.URL{
|
||
|
Scheme: ref.Context().Registry.Scheme(),
|
||
|
Host: ref.Context().RegistryStr(),
|
||
|
Path: fmt.Sprintf("/v2/%s/manifests/%s", ref.Context().RepositoryStr(), ref.Identifier()),
|
||
|
}
|
||
|
|
||
|
req, err := http.NewRequest(http.MethodDelete, u.String(), nil)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
resp, err := w.w.client.Do(req.WithContext(ctx))
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer resp.Body.Close()
|
||
|
|
||
|
return transport.CheckError(resp, http.StatusOK, http.StatusAccepted)
|
||
|
|
||
|
// TODO(jason): If the manifest had a `subject`, and if the registry
|
||
|
// doesn't support Referrers, update the index pointed to by the
|
||
|
// subject's fallback tag to remove the descriptor for this manifest.
|
||
|
}
|
||
|
|
||
|
type repoWriter struct {
|
||
|
repo name.Repository
|
||
|
o *options
|
||
|
once sync.Once
|
||
|
|
||
|
w *writer
|
||
|
err error
|
||
|
|
||
|
work *workers
|
||
|
}
|
||
|
|
||
|
// this will run once per repoWriter instance
|
||
|
func (rw *repoWriter) init(ctx context.Context) error {
|
||
|
rw.once.Do(func() {
|
||
|
rw.work = &workers{}
|
||
|
rw.w, rw.err = makeWriter(ctx, rw.repo, nil, rw.o)
|
||
|
})
|
||
|
return rw.err
|
||
|
}
|
||
|
|
||
|
func (rw *repoWriter) writeDeps(ctx context.Context, m manifest) error {
|
||
|
if img, ok := m.(v1.Image); ok {
|
||
|
return rw.writeLayers(ctx, img)
|
||
|
}
|
||
|
|
||
|
if idx, ok := m.(v1.ImageIndex); ok {
|
||
|
return rw.writeChildren(ctx, idx)
|
||
|
}
|
||
|
|
||
|
// This has no deps, not an error (e.g. something you want to just PUT).
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
type describable struct {
|
||
|
desc v1.Descriptor
|
||
|
}
|
||
|
|
||
|
func (d describable) Digest() (v1.Hash, error) {
|
||
|
return d.desc.Digest, nil
|
||
|
}
|
||
|
|
||
|
func (d describable) Size() (int64, error) {
|
||
|
return d.desc.Size, nil
|
||
|
}
|
||
|
|
||
|
func (d describable) MediaType() (types.MediaType, error) {
|
||
|
return d.desc.MediaType, nil
|
||
|
}
|
||
|
|
||
|
type tagManifest struct {
|
||
|
Taggable
|
||
|
partial.Describable
|
||
|
}
|
||
|
|
||
|
func taggableToManifest(t Taggable) (manifest, error) {
|
||
|
if m, ok := t.(manifest); ok {
|
||
|
return m, nil
|
||
|
}
|
||
|
|
||
|
if d, ok := t.(*Descriptor); ok {
|
||
|
if d.MediaType.IsIndex() {
|
||
|
return d.ImageIndex()
|
||
|
}
|
||
|
|
||
|
if d.MediaType.IsImage() {
|
||
|
return d.Image()
|
||
|
}
|
||
|
|
||
|
if d.MediaType.IsSchema1() {
|
||
|
return d.Schema1()
|
||
|
}
|
||
|
|
||
|
return tagManifest{t, describable{d.toDesc()}}, nil
|
||
|
}
|
||
|
|
||
|
desc := v1.Descriptor{
|
||
|
// A reasonable default if Taggable doesn't implement MediaType.
|
||
|
MediaType: types.DockerManifestSchema2,
|
||
|
}
|
||
|
|
||
|
b, err := t.RawManifest()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if wmt, ok := t.(withMediaType); ok {
|
||
|
desc.MediaType, err = wmt.MediaType()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
desc.Digest, desc.Size, err = v1.SHA256(bytes.NewReader(b))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return tagManifest{t, describable{desc}}, nil
|
||
|
}
|
||
|
|
||
|
func (rw *repoWriter) writeManifest(ctx context.Context, ref name.Reference, t Taggable) error {
|
||
|
m, err := taggableToManifest(t)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
needDeps := true
|
||
|
|
||
|
digest, err := m.Digest()
|
||
|
if errors.Is(err, stream.ErrNotComputed) {
|
||
|
if err := rw.writeDeps(ctx, m); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
needDeps = false
|
||
|
|
||
|
digest, err = m.Digest()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
} else if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// This may be a lazy child where we have no ref until digest is computed.
|
||
|
if ref == nil {
|
||
|
ref = rw.repo.Digest(digest.String())
|
||
|
}
|
||
|
|
||
|
// For tags, we want to do this check outside of our Work.Do closure because
|
||
|
// we don't want to dedupe based on the manifest digest.
|
||
|
_, byTag := ref.(name.Tag)
|
||
|
if byTag {
|
||
|
if exists, err := rw.manifestExists(ctx, ref, t); err != nil {
|
||
|
return err
|
||
|
} else if exists {
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// The following work.Do will get deduped by digest, so it won't happen unless
|
||
|
// this tag happens to be the first commitManifest to run for that digest.
|
||
|
needPut := byTag
|
||
|
|
||
|
if err := rw.work.Do(digest, func() error {
|
||
|
if !byTag {
|
||
|
if exists, err := rw.manifestExists(ctx, ref, t); err != nil {
|
||
|
return err
|
||
|
} else if exists {
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if needDeps {
|
||
|
if err := rw.writeDeps(ctx, m); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
needPut = false
|
||
|
return rw.commitManifest(ctx, ref, m)
|
||
|
}); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if !needPut {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Only runs for tags that got deduped by digest.
|
||
|
return rw.commitManifest(ctx, ref, m)
|
||
|
}
|
||
|
|
||
|
func (rw *repoWriter) writeChildren(ctx context.Context, idx v1.ImageIndex) error {
|
||
|
children, err := partial.Manifests(idx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
g, ctx := errgroup.WithContext(ctx)
|
||
|
g.SetLimit(rw.o.jobs)
|
||
|
|
||
|
for _, child := range children {
|
||
|
child := child
|
||
|
if err := rw.writeChild(ctx, child, g); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return g.Wait()
|
||
|
}
|
||
|
|
||
|
func (rw *repoWriter) writeChild(ctx context.Context, child partial.Describable, g *errgroup.Group) error {
|
||
|
switch child := child.(type) {
|
||
|
case v1.ImageIndex:
|
||
|
// For recursive index, we want to do a depth-first launching of goroutines
|
||
|
// to avoid deadlocking.
|
||
|
//
|
||
|
// Note that this is rare, so the impact of this should be really small.
|
||
|
return rw.writeManifest(ctx, nil, child)
|
||
|
case v1.Image:
|
||
|
g.Go(func() error {
|
||
|
return rw.writeManifest(ctx, nil, child)
|
||
|
})
|
||
|
case v1.Layer:
|
||
|
g.Go(func() error {
|
||
|
return rw.writeLayer(ctx, child)
|
||
|
})
|
||
|
default:
|
||
|
// This can't happen.
|
||
|
return fmt.Errorf("encountered unknown child: %T", child)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// TODO: Consider caching some representation of the tags/digests in the destination
|
||
|
// repository as a hint to avoid this optimistic check in cases where we will most
|
||
|
// likely have to do a PUT anyway, e.g. if we are overwriting a tag we just wrote.
|
||
|
func (rw *repoWriter) manifestExists(ctx context.Context, ref name.Reference, t Taggable) (bool, error) {
|
||
|
f := &fetcher{
|
||
|
target: ref.Context(),
|
||
|
client: rw.w.client,
|
||
|
}
|
||
|
|
||
|
m, err := taggableToManifest(t)
|
||
|
if err != nil {
|
||
|
return false, err
|
||
|
}
|
||
|
|
||
|
digest, err := m.Digest()
|
||
|
if err != nil {
|
||
|
// Possibly due to streaming layers.
|
||
|
return false, nil
|
||
|
}
|
||
|
got, err := f.headManifest(ctx, ref, allManifestMediaTypes)
|
||
|
if err != nil {
|
||
|
var terr *transport.Error
|
||
|
if errors.As(err, &terr) {
|
||
|
if terr.StatusCode == http.StatusNotFound {
|
||
|
return false, nil
|
||
|
}
|
||
|
|
||
|
// We treat a 403 here as non-fatal because this existence check is an optimization and
|
||
|
// some registries will return a 403 instead of a 404 in certain situations.
|
||
|
// E.g. https://jfrog.atlassian.net/browse/RTFACT-13797
|
||
|
if terr.StatusCode == http.StatusForbidden {
|
||
|
logs.Debug.Printf("manifestExists unexpected 403: %v", err)
|
||
|
return false, nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return false, err
|
||
|
}
|
||
|
|
||
|
if digest != got.Digest {
|
||
|
// Mark that we saw this digest in the registry so we don't have to check it again.
|
||
|
rw.work.Do(got.Digest, nop)
|
||
|
|
||
|
return false, nil
|
||
|
}
|
||
|
|
||
|
if tag, ok := ref.(name.Tag); ok {
|
||
|
logs.Progress.Printf("existing manifest: %s@%s", tag.Identifier(), got.Digest)
|
||
|
} else {
|
||
|
logs.Progress.Print("existing manifest: ", got.Digest)
|
||
|
}
|
||
|
|
||
|
return true, nil
|
||
|
}
|
||
|
|
||
|
func (rw *repoWriter) commitManifest(ctx context.Context, ref name.Reference, m manifest) error {
|
||
|
if rw.o.progress != nil {
|
||
|
size, err := m.Size()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
rw.o.progress.total(size)
|
||
|
}
|
||
|
|
||
|
return rw.w.commitManifest(ctx, m, ref)
|
||
|
}
|
||
|
|
||
|
func (rw *repoWriter) writeLayers(pctx context.Context, img v1.Image) error {
|
||
|
ls, err := img.Layers()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
g, ctx := errgroup.WithContext(pctx)
|
||
|
g.SetLimit(rw.o.jobs)
|
||
|
|
||
|
for _, l := range ls {
|
||
|
l := l
|
||
|
|
||
|
g.Go(func() error {
|
||
|
return rw.writeLayer(ctx, l)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
mt, err := img.MediaType()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if mt.IsSchema1() {
|
||
|
return g.Wait()
|
||
|
}
|
||
|
|
||
|
cl, err := partial.ConfigLayer(img)
|
||
|
if errors.Is(err, stream.ErrNotComputed) {
|
||
|
if err := g.Wait(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
cl, err := partial.ConfigLayer(img)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return rw.writeLayer(pctx, cl)
|
||
|
} else if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
g.Go(func() error {
|
||
|
return rw.writeLayer(ctx, cl)
|
||
|
})
|
||
|
|
||
|
return g.Wait()
|
||
|
}
|
||
|
|
||
|
func (rw *repoWriter) writeLayer(ctx context.Context, l v1.Layer) error {
|
||
|
// Skip any non-distributable things.
|
||
|
mt, err := l.MediaType()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if !mt.IsDistributable() && !rw.o.allowNondistributableArtifacts {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
digest, err := l.Digest()
|
||
|
if err != nil {
|
||
|
if errors.Is(err, stream.ErrNotComputed) {
|
||
|
return rw.lazyWriteLayer(ctx, l)
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return rw.work.Do(digest, func() error {
|
||
|
if rw.o.progress != nil {
|
||
|
size, err := l.Size()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
rw.o.progress.total(size)
|
||
|
}
|
||
|
return rw.w.uploadOne(ctx, l)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (rw *repoWriter) lazyWriteLayer(ctx context.Context, l v1.Layer) error {
|
||
|
return rw.work.Stream(l, func() error {
|
||
|
if err := rw.w.uploadOne(ctx, l); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Mark this upload completed.
|
||
|
digest, err := l.Digest()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rw.work.Do(digest, nop)
|
||
|
|
||
|
if rw.o.progress != nil {
|
||
|
size, err := l.Size()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
rw.o.progress.total(size)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
})
|
||
|
}
|