Fix artifacts v4 backend upload problems (#36805)

* Use base64.RawURLEncoding to avoid equal sign
  * using the nodejs package they seem to get lost
* Support uploads with unspecified length
* Support uploads with a single named blockid
  * without requiring a blockmap

---------

Signed-off-by: wxiaoguang <wxiaoguang@gmail.com>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
ChristopherHX
2026-03-05 16:49:01 +01:00
committed by GitHub
parent 5d87bb3d45
commit 867c4af481
10 changed files with 552 additions and 247 deletions
+50 -26
View File
@@ -5,6 +5,7 @@ package storage
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"net/url" "net/url"
@@ -27,25 +28,32 @@ type LocalStorage struct {
// NewLocalStorage returns a local files // NewLocalStorage returns a local files
func NewLocalStorage(ctx context.Context, config *setting.Storage) (ObjectStorage, error) { func NewLocalStorage(ctx context.Context, config *setting.Storage) (ObjectStorage, error) {
// prepare storage root path
if !filepath.IsAbs(config.Path) { if !filepath.IsAbs(config.Path) {
return nil, fmt.Errorf("LocalStorageConfig.Path should have been prepared by setting/storage.go and should be an absolute path, but not: %q", config.Path) return nil, fmt.Errorf("LocalStorage config.Path should have been prepared by setting/storage.go and should be an absolute path, but not: %q", config.Path)
}
log.Info("Creating new Local Storage at %s", config.Path)
if err := os.MkdirAll(config.Path, os.ModePerm); err != nil {
return nil, err
} }
storageRoot := util.FilePathJoinAbs(config.Path)
if config.TemporaryPath == "" { // prepare storage temporary path
config.TemporaryPath = filepath.Join(config.Path, "tmp") storageTmp := config.TemporaryPath
if storageTmp == "" {
storageTmp = filepath.Join(storageRoot, "tmp")
} }
if !filepath.IsAbs(config.TemporaryPath) { if !filepath.IsAbs(storageTmp) {
return nil, fmt.Errorf("LocalStorageConfig.TemporaryPath should be an absolute path, but not: %q", config.TemporaryPath) return nil, fmt.Errorf("LocalStorage config.TemporaryPath should be an absolute path, but not: %q", config.TemporaryPath)
}
storageTmp = util.FilePathJoinAbs(storageTmp)
// create the storage root if not exist
log.Info("Creating new Local Storage at %s", storageRoot)
if err := os.MkdirAll(storageRoot, os.ModePerm); err != nil {
return nil, err
} }
return &LocalStorage{ return &LocalStorage{
ctx: ctx, ctx: ctx,
dir: config.Path, dir: storageRoot,
tmpdir: config.TemporaryPath, tmpdir: storageTmp,
}, nil }, nil
} }
@@ -108,9 +116,21 @@ func (l *LocalStorage) Stat(path string) (os.FileInfo, error) {
return os.Stat(l.buildLocalPath(path)) return os.Stat(l.buildLocalPath(path))
} }
// Delete delete a file func (l *LocalStorage) deleteEmptyParentDirs(localFullPath string) {
for parent := filepath.Dir(localFullPath); len(parent) > len(l.dir); parent = filepath.Dir(parent) {
if err := os.Remove(parent); err != nil {
// since the target file has been deleted, parent dir error is not related to the file deletion itself.
break
}
}
}
// Delete deletes the file in storage and removes the empty parent directories (if possible)
func (l *LocalStorage) Delete(path string) error { func (l *LocalStorage) Delete(path string) error {
return util.Remove(l.buildLocalPath(path)) localFullPath := l.buildLocalPath(path)
err := util.Remove(localFullPath)
l.deleteEmptyParentDirs(localFullPath)
return err
} }
// URL gets the redirect URL to a file // URL gets the redirect URL to a file
@@ -118,34 +138,38 @@ func (l *LocalStorage) URL(path, name, _ string, reqParams url.Values) (*url.URL
return nil, ErrURLNotSupported return nil, ErrURLNotSupported
} }
func (l *LocalStorage) normalizeWalkError(err error) error {
if errors.Is(err, os.ErrNotExist) {
// ignore it because the file may be deleted during the walk, and we don't care about it
return nil
}
return err
}
// IterateObjects iterates across the objects in the local storage // IterateObjects iterates across the objects in the local storage
func (l *LocalStorage) IterateObjects(dirName string, fn func(path string, obj Object) error) error { func (l *LocalStorage) IterateObjects(dirName string, fn func(path string, obj Object) error) error {
dir := l.buildLocalPath(dirName) dir := l.buildLocalPath(dirName)
return filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error { return filepath.WalkDir(dir, func(path string, d os.DirEntry, errWalk error) error {
if err != nil { if err := l.ctx.Err(); err != nil {
return err return err
} }
select { if errWalk != nil {
case <-l.ctx.Done(): return l.normalizeWalkError(errWalk)
return l.ctx.Err()
default:
} }
if path == l.dir { if path == l.dir || d.IsDir() {
return nil
}
if d.IsDir() {
return nil return nil
} }
relPath, err := filepath.Rel(l.dir, path) relPath, err := filepath.Rel(l.dir, path)
if err != nil { if err != nil {
return err return l.normalizeWalkError(err)
} }
obj, err := os.Open(path) obj, err := os.Open(path)
if err != nil { if err != nil {
return err return l.normalizeWalkError(err)
} }
defer obj.Close() defer obj.Close()
return fn(relPath, obj) return fn(filepath.ToSlash(relPath), obj)
}) })
} }
+46
View File
@@ -4,11 +4,14 @@
package storage package storage
import ( import (
"os"
"strings"
"testing" "testing"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestBuildLocalPath(t *testing.T) { func TestBuildLocalPath(t *testing.T) {
@@ -53,6 +56,49 @@ func TestBuildLocalPath(t *testing.T) {
} }
} }
func TestLocalStorageDelete(t *testing.T) {
rootDir := t.TempDir()
st, err := NewLocalStorage(t.Context(), &setting.Storage{Path: rootDir})
require.NoError(t, err)
assertExists := func(t *testing.T, path string, exists bool) {
_, err = os.Stat(rootDir + "/" + path)
if exists {
require.NoError(t, err)
} else {
require.ErrorIs(t, err, os.ErrNotExist)
}
}
_, err = st.Save("dir/sub1/1-a.txt", strings.NewReader(""), -1)
require.NoError(t, err)
_, err = st.Save("dir/sub1/1-b.txt", strings.NewReader(""), -1)
require.NoError(t, err)
_, err = st.Save("dir/sub2/2-a.txt", strings.NewReader(""), -1)
require.NoError(t, err)
assertExists(t, "dir/sub1/1-a.txt", true)
assertExists(t, "dir/sub1/1-b.txt", true)
assertExists(t, "dir/sub2/2-a.txt", true)
require.NoError(t, st.Delete("dir/sub1/1-a.txt"))
assertExists(t, "dir/sub1", true)
assertExists(t, "dir/sub1/1-a.txt", false)
assertExists(t, "dir/sub1/1-b.txt", true)
assertExists(t, "dir/sub2/2-a.txt", true)
require.NoError(t, st.Delete("dir/sub1/1-b.txt"))
assertExists(t, ".", true)
assertExists(t, "dir/sub1", false)
assertExists(t, "dir/sub1/1-a.txt", false)
assertExists(t, "dir/sub1/1-b.txt", false)
assertExists(t, "dir/sub2/2-a.txt", true)
require.NoError(t, st.Delete("dir/sub2/2-a.txt"))
assertExists(t, ".", true)
assertExists(t, "dir", false)
}
func TestLocalStorageIterator(t *testing.T) { func TestLocalStorageIterator(t *testing.T) {
testStorageIterator(t, setting.LocalStorageType, &setting.Storage{Path: t.TempDir()}) testStorageIterator(t, setting.LocalStorageType, &setting.Storage{Path: t.TempDir()})
} }
+6 -1
View File
@@ -68,7 +68,12 @@ type ObjectStorage interface {
Stat(path string) (os.FileInfo, error) Stat(path string) (os.FileInfo, error)
Delete(path string) error Delete(path string) error
URL(path, name, method string, reqParams url.Values) (*url.URL, error) URL(path, name, method string, reqParams url.Values) (*url.URL, error)
IterateObjects(path string, iterator func(path string, obj Object) error) error
// IterateObjects calls the iterator function for each object in the storage with the given path as prefix
// The "fullPath" argument in callback is the full path in this storage.
// * IterateObjects("", ...): iterate all objects in this storage
// * IterateObjects("sub-path", ...): iterate all objects with "sub-path" as prefix in this storage, the "fullPath" will be like "sub-path/xxx"
IterateObjects(basePath string, iterator func(fullPath string, obj Object) error) error
} }
// Copy copies a file from source ObjectStorage to dest ObjectStorage // Copy copies a file from source ObjectStorage to dest ObjectStorage
+12 -10
View File
@@ -75,19 +75,21 @@ const filepathSeparator = string(os.PathSeparator)
// {`/foo`, ``, `bar`} => `/foo/bar` // {`/foo`, ``, `bar`} => `/foo/bar`
// {`/foo`, `..`, `bar`} => `/foo/bar` // {`/foo`, `..`, `bar`} => `/foo/bar`
func FilePathJoinAbs(base string, sub ...string) string { func FilePathJoinAbs(base string, sub ...string) string {
elems := make([]string, 1, len(sub)+1)
// POSIX filesystem can have `\` in file names. Windows: `\` and `/` are both used for path separators // POSIX filesystem can have `\` in file names. Windows: `\` and `/` are both used for path separators
// to keep the behavior consistent, we do not allow `\` in file names, replace all `\` with `/` // to keep the behavior consistent, we do not allow `\` in file names, replace all `\` with `/`
if isOSWindows() { if !isOSWindows() {
elems[0] = filepath.Clean(base) base = strings.ReplaceAll(base, "\\", filepathSeparator)
} else {
elems[0] = filepath.Clean(strings.ReplaceAll(base, "\\", filepathSeparator))
} }
if !filepath.IsAbs(elems[0]) { if !filepath.IsAbs(base) {
// This shouldn't happen. If there is really necessary to pass in relative path, return the full path with filepath.Abs() instead // This shouldn't happen. If it is really necessary to handle relative paths, use filepath.Abs() to get absolute paths first
panic(fmt.Sprintf("FilePathJoinAbs: %q (for path %v) is not absolute, do not guess a relative path based on current working directory", elems[0], elems)) panic(fmt.Sprintf("FilePathJoinAbs: %q (for path %v) is not absolute, do not guess a relative path based on current working directory", base, sub))
} }
if len(sub) == 0 {
return filepath.Clean(base)
}
elems := make([]string, 1, len(sub)+1)
elems[0] = base
for _, s := range sub { for _, s := range sub {
if s == "" { if s == "" {
continue continue
@@ -98,7 +100,7 @@ func FilePathJoinAbs(base string, sub ...string) string {
elems = append(elems, filepath.Clean(filepathSeparator+strings.ReplaceAll(s, "\\", filepathSeparator))) elems = append(elems, filepath.Clean(filepathSeparator+strings.ReplaceAll(s, "\\", filepathSeparator)))
} }
} }
// the elems[0] must be an absolute path, just join them together // the elems[0] must be an absolute path, just join them together, and Join will also do Clean
return filepath.Join(elems...) return filepath.Join(elems...)
} }
+4 -4
View File
@@ -241,7 +241,7 @@ func (ar artifactRoutes) uploadArtifact(ctx *ArtifactContext) {
} }
// get upload file size // get upload file size
fileRealTotalSize, contentLength := getUploadFileSize(ctx) fileRealTotalSize := getUploadFileSize(ctx)
// get artifact retention days // get artifact retention days
expiredDays := setting.Actions.ArtifactRetentionDays expiredDays := setting.Actions.ArtifactRetentionDays
@@ -265,17 +265,17 @@ func (ar artifactRoutes) uploadArtifact(ctx *ArtifactContext) {
return return
} }
// save chunk to storage, if success, return chunk stotal size // save chunk to storage, if success, return chunks total size
// if artifact is not gzip when uploading, chunksTotalSize == fileRealTotalSize // if artifact is not gzip when uploading, chunksTotalSize == fileRealTotalSize
// if artifact is gzip when uploading, chunksTotalSize < fileRealTotalSize // if artifact is gzip when uploading, chunksTotalSize < fileRealTotalSize
chunksTotalSize, err := saveUploadChunk(ar.fs, ctx, artifact, contentLength, runID) chunksTotalSize, err := saveUploadChunkV3GetTotalSize(ar.fs, ctx, artifact, runID)
if err != nil { if err != nil {
log.Error("Error save upload chunk: %v", err) log.Error("Error save upload chunk: %v", err)
ctx.HTTPError(http.StatusInternalServerError, "Error save upload chunk") ctx.HTTPError(http.StatusInternalServerError, "Error save upload chunk")
return return
} }
// update artifact size if zero or not match, over write artifact size // update artifact size if zero or not match, overwrite artifact size
if artifact.FileSize == 0 || if artifact.FileSize == 0 ||
artifact.FileCompressedSize == 0 || artifact.FileCompressedSize == 0 ||
artifact.FileSize != fileRealTotalSize || artifact.FileSize != fileRealTotalSize ||
+182 -94
View File
@@ -12,7 +12,7 @@ import (
"fmt" "fmt"
"hash" "hash"
"io" "io"
"path/filepath" "path"
"sort" "sort"
"strings" "strings"
"time" "time"
@@ -20,18 +20,73 @@ import (
"code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/storage"
) )
func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext, type saveUploadChunkOptions struct {
artifact *actions.ActionArtifact, start int64
contentSize, runID, start, end, length int64, checkMd5 bool, end *int64
) (int64, error) { checkMd5 bool
}
func makeTmpPathNameV3(runID int64) string {
return fmt.Sprintf("tmp-upload/run-%d", runID)
}
func makeTmpPathNameV4(runID int64) string {
return fmt.Sprintf("tmp-upload/run-%d-v4", runID)
}
func makeChunkFilenameV3(runID, artifactID, start int64, endPtr *int64) string {
var end int64
if endPtr != nil {
end = *endPtr
}
return fmt.Sprintf("%d-%d-%d-%d.chunk", runID, artifactID, start, end)
}
func parseChunkFileItemV3(st storage.ObjectStorage, fpath string) (*chunkFileItem, error) {
baseName := path.Base(fpath)
if !strings.HasSuffix(baseName, ".chunk") {
return nil, errSkipChunkFile
}
var item chunkFileItem
var unusedRunID int64
if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &unusedRunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
return nil, err
}
item.Path = fpath
if item.End == 0 {
fi, err := st.Stat(item.Path)
if err != nil {
return nil, err
}
item.Size = fi.Size()
item.End = item.Start + item.Size - 1
} else {
item.Size = item.End - item.Start + 1
}
return &item, nil
}
func saveUploadChunkV3(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact,
runID int64, opts saveUploadChunkOptions,
) (writtenSize int64, retErr error) {
// build chunk store path // build chunk store path
storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end) storagePath := fmt.Sprintf("%s/%s", makeTmpPathNameV3(runID), makeChunkFilenameV3(runID, artifact.ID, opts.start, opts.end))
// "end" is optional, so "contentSize=-1" means read until EOF
contentSize := int64(-1)
if opts.end != nil {
contentSize = *opts.end - opts.start + 1
}
var r io.Reader = ctx.Req.Body var r io.Reader = ctx.Req.Body
var hasher hash.Hash var hasher hash.Hash
if checkMd5 { if opts.checkMd5 {
// use io.TeeReader to avoid reading all body to md5 sum. // use io.TeeReader to avoid reading all body to md5 sum.
// it writes data to hasher after reading end // it writes data to hasher after reading end
// if hash is not matched, delete the read-end result // if hash is not matched, delete the read-end result
@@ -41,76 +96,81 @@ func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext,
// save chunk to storage // save chunk to storage
writtenSize, err := st.Save(storagePath, r, contentSize) writtenSize, err := st.Save(storagePath, r, contentSize)
if err != nil { if err != nil {
return -1, fmt.Errorf("save chunk to storage error: %v", err) return 0, fmt.Errorf("save chunk to storage error: %v", err)
} }
var checkErr error
if checkMd5 { defer func() {
if retErr != nil {
if err := st.Delete(storagePath); err != nil {
log.Error("Error deleting chunk: %s, %v", storagePath, err)
}
}
}()
if contentSize != -1 && writtenSize != contentSize {
return writtenSize, fmt.Errorf("writtenSize %d does not match contentSize %d", writtenSize, contentSize)
}
if opts.checkMd5 {
// check md5 // check md5
reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header) reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String) log.Debug("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
// if md5 not match, delete the chunk // if md5 not match, delete the chunk
if reqMd5String != chunkMd5String { if reqMd5String != chunkMd5String {
checkErr = errors.New("md5 not match") return writtenSize, errors.New("md5 not match")
} }
} }
if writtenSize != contentSize { log.Debug("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, size: %d", storagePath, writtenSize, artifact.ID, opts.start, contentSize)
checkErr = errors.Join(checkErr, fmt.Errorf("writtenSize %d not match contentSize %d", writtenSize, contentSize)) return writtenSize, nil
}
if checkErr != nil {
if err := st.Delete(storagePath); err != nil {
log.Error("Error deleting chunk: %s, %v", storagePath, err)
}
return -1, checkErr
}
log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
storagePath, contentSize, artifact.ID, start, end)
// return chunk total size
return length, nil
} }
func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext, func saveUploadChunkV3GetTotalSize(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact, runID int64) (totalSize int64, _ error) {
artifact *actions.ActionArtifact,
contentSize, runID int64,
) (int64, error) {
// parse content-range header, format: bytes 0-1023/146515 // parse content-range header, format: bytes 0-1023/146515
contentRange := ctx.Req.Header.Get("Content-Range") contentRange := ctx.Req.Header.Get("Content-Range")
start, end, length := int64(0), int64(0), int64(0) var start, end int64
if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil { if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &totalSize); err != nil {
log.Warn("parse content range error: %v, content-range: %s", err, contentRange) return 0, fmt.Errorf("parse content range error: %v", err)
return -1, fmt.Errorf("parse content range error: %v", err)
} }
return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, length, true) _, err := saveUploadChunkV3(st, ctx, artifact, runID, saveUploadChunkOptions{start: start, end: &end, checkMd5: true})
if err != nil {
return 0, err
}
return totalSize, nil
} }
func appendUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext, // Returns uploaded length
artifact *actions.ActionArtifact, func appendUploadChunkV3(st storage.ObjectStorage, ctx *ArtifactContext, artifact *actions.ActionArtifact, runID, start int64) (int64, error) {
start, contentSize, runID int64, opts := saveUploadChunkOptions{start: start}
) (int64, error) { if ctx.Req.ContentLength > 0 {
end := start + contentSize - 1 end := start + ctx.Req.ContentLength - 1
return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, contentSize, false) opts.end = &end
}
return saveUploadChunkV3(st, ctx, artifact, runID, opts)
} }
type chunkFileItem struct { type chunkFileItem struct {
RunID int64
ArtifactID int64 ArtifactID int64
Start int64
End int64
Path string Path string
// these offset/size related fields might be missing when parsing, they will be filled in the listing functions
Size int64
Start int64
End int64 // inclusive: Size=10, Start=0, End=9
ChunkName string // v4 only
} }
func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) { func listV3UnorderedChunksMapByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
storageDir := fmt.Sprintf("tmp%d", runID) storageDir := makeTmpPathNameV3(runID)
var chunks []*chunkFileItem var chunks []*chunkFileItem
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error { if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
baseName := filepath.Base(fpath) item, err := parseChunkFileItemV3(st, fpath)
// when read chunks from storage, it only contains storage dir and basename, if errors.Is(err, errSkipChunkFile) {
// no matter the subdirectory setting in storage config return nil
item := chunkFileItem{Path: storageDir + "/" + baseName} } else if err != nil {
if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil { return fmt.Errorf("unable to parse chunk name: %v", fpath)
return fmt.Errorf("parse content range error: %v", err)
} }
chunks = append(chunks, &item) chunks = append(chunks, item)
return nil return nil
}); err != nil { }); err != nil {
return nil, err return nil, err
@@ -123,52 +183,78 @@ func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chun
return chunksMap, nil return chunksMap, nil
} }
func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) { func listOrderedChunksForArtifact(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
storageDir := fmt.Sprintf("tmpv4%d", runID) emptyListAsError := func(chunks []*chunkFileItem) ([]*chunkFileItem, error) {
var chunks []*chunkFileItem if len(chunks) == 0 {
chunkMap := map[string]*chunkFileItem{} return nil, fmt.Errorf("no chunk found for artifact id: %d", artifactID)
dummy := &chunkFileItem{} }
for _, name := range blist.Latest { return chunks, nil
chunkMap[name] = dummy
} }
storageDir := makeTmpPathNameV4(runID)
var chunks []*chunkFileItem
var chunkMapV4 map[string]*chunkFileItem
if blist != nil {
// make a dummy map for quick lookup of chunk names, the values are nil now and will be filled after iterating storage objects
chunkMapV4 = map[string]*chunkFileItem{}
for _, name := range blist.Latest {
chunkMapV4[name] = nil
}
}
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error { if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
baseName := filepath.Base(fpath) item, err := parseChunkFileItemV4(st, artifactID, fpath)
if !strings.HasPrefix(baseName, "block-") { if errors.Is(err, errSkipChunkFile) {
return nil return nil
} else if err != nil {
return fmt.Errorf("unable to parse chunk name: %v", fpath)
} }
// when read chunks from storage, it only contains storage dir and basename,
// no matter the subdirectory setting in storage config // Single chunk upload with block id
item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID} if _, ok := chunkMapV4[item.ChunkName]; ok {
var size int64 chunkMapV4[item.ChunkName] = item
var b64chunkName string } else if chunkMapV4 == nil {
if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil { if chunks != nil {
return fmt.Errorf("parse content range error: %v", err) return errors.New("blockmap is required for chunks > 1")
} }
rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName) chunks = []*chunkFileItem{item}
if err != nil {
return fmt.Errorf("failed to parse chunkName: %v", err)
}
chunkName := string(rchunkName)
item.End = item.Start + size - 1
if _, ok := chunkMap[chunkName]; ok {
chunkMap[chunkName] = &item
} }
return nil return nil
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
for i, name := range blist.Latest {
chunk, ok := chunkMap[name] if blist == nil && chunks == nil {
if !ok || chunk.Path == "" { chunkUnorderedItemsMapV3, err := listV3UnorderedChunksMapByRunID(st, runID)
return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name) if err != nil {
return nil, err
} }
chunks = append(chunks, chunk) chunks = chunkUnorderedItemsMapV3[artifactID]
if i > 0 { sort.Slice(chunks, func(i, j int) bool {
chunk.Start = chunkMap[blist.Latest[i-1]].End + 1 return chunks[i].Start < chunks[j].Start
chunk.End += chunk.Start })
return emptyListAsError(chunks)
}
if len(chunks) == 0 && blist != nil {
for i, name := range blist.Latest {
chunk := chunkMapV4[name]
if chunk == nil {
return nil, fmt.Errorf("missing chunk (%d/%d): %s", i, len(blist.Latest), name)
}
chunks = append(chunks, chunk)
} }
} }
return chunks, nil for i, chunk := range chunks {
if i == 0 {
chunk.End += chunk.Size - 1
} else {
chunk.Start = chunkMapV4[blist.Latest[i-1]].End + 1
chunk.End = chunk.Start + chunk.Size - 1
}
}
return emptyListAsError(chunks)
} }
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error { func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
@@ -181,13 +267,13 @@ func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int
return err return err
} }
// read all uploading chunks from storage // read all uploading chunks from storage
chunksMap, err := listChunksByRunID(st, runID) unorderedChunksMap, err := listV3UnorderedChunksMapByRunID(st, runID)
if err != nil { if err != nil {
return err return err
} }
// range db artifacts to merge chunks // range db artifacts to merge chunks
for _, art := range artifacts { for _, art := range artifacts {
chunks, ok := chunksMap[art.ID] chunks, ok := unorderedChunksMap[art.ID]
if !ok { if !ok {
log.Debug("artifact %d chunks not found", art.ID) log.Debug("artifact %d chunks not found", art.ID)
continue continue
@@ -239,12 +325,14 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
} }
mergedReader := io.MultiReader(readers...) mergedReader := io.MultiReader(readers...)
shaPrefix := "sha256:" shaPrefix := "sha256:"
var hash hash.Hash var hashSha256 hash.Hash
if strings.HasPrefix(checksum, shaPrefix) { if strings.HasPrefix(checksum, shaPrefix) {
hash = sha256.New() hashSha256 = sha256.New()
} else if checksum != "" {
setting.PanicInDevOrTesting("unsupported checksum format: %s, will skip the checksum verification", checksum)
} }
if hash != nil { if hashSha256 != nil {
mergedReader = io.TeeReader(mergedReader, hash) mergedReader = io.TeeReader(mergedReader, hashSha256)
} }
// if chunk is gzip, use gz as extension // if chunk is gzip, use gz as extension
@@ -274,8 +362,8 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
} }
}() }()
if hash != nil { if hashSha256 != nil {
rawChecksum := hash.Sum(nil) rawChecksum := hashSha256.Sum(nil)
actualChecksum := hex.EncodeToString(rawChecksum) actualChecksum := hex.EncodeToString(rawChecksum)
if !strings.HasSuffix(checksum, actualChecksum) { if !strings.HasSuffix(checksum, actualChecksum) {
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum) return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
+5 -6
View File
@@ -20,8 +20,8 @@ const (
artifactXActionsResultsMD5Header = "x-actions-results-md5" artifactXActionsResultsMD5Header = "x-actions-results-md5"
) )
// The rules are from https://github.com/actions/toolkit/blob/main/packages/artifact/src/internal/path-and-artifact-name-validation.ts#L32 // The rules are from https://github.com/actions/toolkit/blob/main/packages/artifact/src/internal/upload/path-and-artifact-name-validation.ts
var invalidArtifactNameChars = strings.Join([]string{"\\", "/", "\"", ":", "<", ">", "|", "*", "?", "\r", "\n"}, "") const invalidArtifactNameChars = "\\/\":<>|*?\r\n"
func validateArtifactName(ctx *ArtifactContext, artifactName string) bool { func validateArtifactName(ctx *ArtifactContext, artifactName string) bool {
if strings.ContainsAny(artifactName, invalidArtifactNameChars) { if strings.ContainsAny(artifactName, invalidArtifactNameChars) {
@@ -84,11 +84,10 @@ func parseArtifactItemPath(ctx *ArtifactContext) (string, string, bool) {
// getUploadFileSize returns the size of the file to be uploaded. // getUploadFileSize returns the size of the file to be uploaded.
// The raw size is the size of the file as reported by the header X-TFS-FileLength. // The raw size is the size of the file as reported by the header X-TFS-FileLength.
func getUploadFileSize(ctx *ArtifactContext) (int64, int64) { func getUploadFileSize(ctx *ArtifactContext) int64 {
contentLength := ctx.Req.ContentLength
xTfsLength, _ := strconv.ParseInt(ctx.Req.Header.Get(artifactXTfsFileLengthHeader), 10, 64) xTfsLength, _ := strconv.ParseInt(ctx.Req.Header.Get(artifactXTfsFileLengthHeader), 10, 64)
if xTfsLength > 0 { if xTfsLength > 0 {
return xTfsLength, contentLength return xTfsLength
} }
return contentLength, contentLength return ctx.Req.ContentLength
} }
+115 -70
View File
@@ -90,10 +90,12 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
"encoding/xml" "encoding/xml"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"path"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -109,7 +111,7 @@ import (
"code.gitea.io/gitea/services/context" "code.gitea.io/gitea/services/context"
"google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/encoding/protojson"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
) )
@@ -157,33 +159,81 @@ func ArtifactsV4Routes(prefix string) *web.Router {
return m return m
} }
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID, artifactID int64) []byte { func (r *artifactV4Routes) buildSignature(endpoint, expires, artifactName string, taskID, artifactID int64) []byte {
mac := hmac.New(sha256.New, setting.GetGeneralTokenSigningSecret()) mac := hmac.New(sha256.New, setting.GetGeneralTokenSigningSecret())
mac.Write([]byte(endp)) mac.Write([]byte(endpoint))
mac.Write([]byte(expires)) mac.Write([]byte(expires))
mac.Write([]byte(artifactName)) mac.Write([]byte(artifactName))
fmt.Fprint(mac, taskID) _, _ = fmt.Fprint(mac, taskID)
fmt.Fprint(mac, artifactID) _, _ = fmt.Fprint(mac, artifactID)
return mac.Sum(nil) return mac.Sum(nil)
} }
func (r artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endp, artifactName string, taskID, artifactID int64) string { func (r *artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endpoint, artifactName string, taskID, artifactID int64) string {
expires := time.Now().Add(60 * time.Minute).Format("2006-01-02 15:04:05.999999999 -0700 MST") expires := time.Now().Add(60 * time.Minute).Format("2006-01-02 15:04:05.999999999 -0700 MST")
uploadURL := strings.TrimSuffix(httplib.GuessCurrentAppURL(ctx), "/") + strings.TrimSuffix(r.prefix, "/") + uploadURL := strings.TrimSuffix(httplib.GuessCurrentAppURL(ctx), "/") + strings.TrimSuffix(r.prefix, "/") +
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID, artifactID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + strconv.FormatInt(taskID, 10) + "&artifactID=" + strconv.FormatInt(artifactID, 10) "/" + endpoint +
"?sig=" + base64.RawURLEncoding.EncodeToString(r.buildSignature(endpoint, expires, artifactName, taskID, artifactID)) +
"&expires=" + url.QueryEscape(expires) +
"&artifactName=" + url.QueryEscape(artifactName) +
"&taskID=" + strconv.FormatInt(taskID, 10) +
"&artifactID=" + strconv.FormatInt(artifactID, 10)
return uploadURL return uploadURL
} }
func (r artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (*actions.ActionTask, string, bool) { func makeBlockFilenameV4(runID, artifactID, size int64, blockID string) string {
sizeInName := max(size, 0) // do not use "-1" in filename
return fmt.Sprintf("block-%d-%d-%d-%s", runID, artifactID, sizeInName, base64.URLEncoding.EncodeToString([]byte(blockID)))
}
var errSkipChunkFile = errors.New("skip this chunk file")
func parseChunkFileItemV4(st storage.ObjectStorage, artifactID int64, fpath string) (*chunkFileItem, error) {
baseName := path.Base(fpath)
if !strings.HasPrefix(baseName, "block-") {
return nil, errSkipChunkFile
}
var item chunkFileItem
var unusedRunID int64
var b64chunkName string
_, err := fmt.Sscanf(baseName, "block-%d-%d-%d-%s", &unusedRunID, &item.ArtifactID, &item.Size, &b64chunkName)
if err != nil {
return nil, err
}
if item.ArtifactID != artifactID {
return nil, errSkipChunkFile
}
chunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
if err != nil {
return nil, err
}
item.ChunkName = string(chunkName)
item.Path = fpath
if item.Size <= 0 {
fi, err := st.Stat(item.Path)
if err != nil {
return nil, err
}
item.Size = fi.Size()
}
return &item, nil
}
func (r *artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (*actions.ActionTask, string, bool) {
rawTaskID := ctx.Req.URL.Query().Get("taskID") rawTaskID := ctx.Req.URL.Query().Get("taskID")
rawArtifactID := ctx.Req.URL.Query().Get("artifactID") rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
sig := ctx.Req.URL.Query().Get("sig") sig := ctx.Req.URL.Query().Get("sig")
expires := ctx.Req.URL.Query().Get("expires") expires := ctx.Req.URL.Query().Get("expires")
artifactName := ctx.Req.URL.Query().Get("artifactName") artifactName := ctx.Req.URL.Query().Get("artifactName")
dsig, _ := base64.URLEncoding.DecodeString(sig) dsig, errSig := base64.RawURLEncoding.DecodeString(sig)
taskID, _ := strconv.ParseInt(rawTaskID, 10, 64) taskID, errTask := strconv.ParseInt(rawTaskID, 10, 64)
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64) artifactID, errArtifactID := strconv.ParseInt(rawArtifactID, 10, 64)
err := errors.Join(errSig, errTask, errArtifactID)
if err != nil {
log.Error("Error decoding signature values: %v", err)
ctx.HTTPError(http.StatusBadRequest, "Error decoding signature values")
return nil, "", false
}
expecedsig := r.buildSignature(endp, expires, artifactName, taskID, artifactID) expecedsig := r.buildSignature(endp, expires, artifactName, taskID, artifactID)
if !hmac.Equal(dsig, expecedsig) { if !hmac.Equal(dsig, expecedsig) {
log.Error("Error unauthorized") log.Error("Error unauthorized")
@@ -226,7 +276,7 @@ func (r *artifactV4Routes) getArtifactByName(ctx *ArtifactContext, runID int64,
return &art, nil return &art, nil
} }
func (r *artifactV4Routes) parseProtbufBody(ctx *ArtifactContext, req protoreflect.ProtoMessage) bool { func (r *artifactV4Routes) parseProtobufBody(ctx *ArtifactContext, req protoreflect.ProtoMessage) bool {
body, err := io.ReadAll(ctx.Req.Body) body, err := io.ReadAll(ctx.Req.Body)
if err != nil { if err != nil {
log.Error("Error decode request body: %v", err) log.Error("Error decode request body: %v", err)
@@ -242,7 +292,7 @@ func (r *artifactV4Routes) parseProtbufBody(ctx *ArtifactContext, req protorefle
return true return true
} }
func (r *artifactV4Routes) sendProtbufBody(ctx *ArtifactContext, req protoreflect.ProtoMessage) { func (r *artifactV4Routes) sendProtobufBody(ctx *ArtifactContext, req protoreflect.ProtoMessage) {
resp, err := protojson.Marshal(req) resp, err := protojson.Marshal(req)
if err != nil { if err != nil {
log.Error("Error encode response body: %v", err) log.Error("Error encode response body: %v", err)
@@ -257,7 +307,7 @@ func (r *artifactV4Routes) sendProtbufBody(ctx *ArtifactContext, req protoreflec
func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) { func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
var req CreateArtifactRequest var req CreateArtifactRequest
if ok := r.parseProtbufBody(ctx, &req); !ok { if ok := r.parseProtobufBody(ctx, &req); !ok {
return return
} }
_, _, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId) _, _, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
@@ -291,7 +341,7 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
Ok: true, Ok: true,
SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID), SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID),
} }
r.sendProtbufBody(ctx, &respData) r.sendProtobufBody(ctx, &respData)
} }
func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) { func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
@@ -303,34 +353,34 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
comp := ctx.Req.URL.Query().Get("comp") comp := ctx.Req.URL.Query().Get("comp")
switch comp { switch comp {
case "block", "appendBlock": case "block", "appendBlock":
blockid := ctx.Req.URL.Query().Get("blockid") // get artifact by name
if blockid == "" { artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
// get artifact by name if err != nil {
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName) log.Error("Error artifact not found: %v", err)
ctx.HTTPError(http.StatusNotFound, "Error artifact not found")
return
}
blockID := ctx.Req.URL.Query().Get("blockid")
if blockID == "" {
uploadedLength, err := appendUploadChunkV3(r.fs, ctx, artifact, artifact.RunID, artifact.FileSize)
if err != nil { if err != nil {
log.Error("Error artifact not found: %v", err) log.Error("Error appending chunk %v", err)
ctx.HTTPError(http.StatusNotFound, "Error artifact not found") ctx.HTTPError(http.StatusInternalServerError, "Error appending Chunk")
return return
} }
artifact.FileCompressedSize += uploadedLength
_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID) artifact.FileSize += uploadedLength
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.HTTPError(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
artifact.FileCompressedSize += ctx.Req.ContentLength
artifact.FileSize += ctx.Req.ContentLength
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil { if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err) log.Error("Error UpdateArtifactByID: %v", err)
ctx.HTTPError(http.StatusInternalServerError, "Error UpdateArtifactByID") ctx.HTTPError(http.StatusInternalServerError, "Error UpdateArtifactByID")
return return
} }
} else { } else {
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/block-%d-%d-%s", task.Job.RunID, task.Job.RunID, ctx.Req.ContentLength, base64.URLEncoding.EncodeToString([]byte(blockid))), ctx.Req.Body, -1) blockFilename := makeBlockFilenameV4(task.Job.RunID, artifact.ID, ctx.Req.ContentLength, blockID)
_, err := r.fs.Save(fmt.Sprintf("%s/%s", makeTmpPathNameV4(task.Job.RunID), blockFilename), ctx.Req.Body, ctx.Req.ContentLength)
if err != nil { if err != nil {
log.Error("Error runner api getting task: task is not running") log.Error("Error uploading block blob %v", err)
ctx.HTTPError(http.StatusInternalServerError, "Error runner api getting task: task is not running") ctx.HTTPError(http.StatusInternalServerError, "Error uploading block blob")
return return
} }
} }
@@ -338,10 +388,10 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
case "blocklist": case "blocklist":
rawArtifactID := ctx.Req.URL.Query().Get("artifactID") rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64) artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/%d-%d-blocklist", task.Job.RunID, task.Job.RunID, artifactID), ctx.Req.Body, -1) _, err := r.fs.Save(fmt.Sprintf("%s/%d-%d-blocklist", makeTmpPathNameV4(task.Job.RunID), task.Job.RunID, artifactID), ctx.Req.Body, -1)
if err != nil { if err != nil {
log.Error("Error runner api getting task: task is not running") log.Error("Error uploading blocklist %v", err)
ctx.HTTPError(http.StatusInternalServerError, "Error runner api getting task: task is not running") ctx.HTTPError(http.StatusInternalServerError, "Error uploading blocklist")
return return
} }
ctx.JSON(http.StatusCreated, "created") ctx.JSON(http.StatusCreated, "created")
@@ -357,7 +407,7 @@ type Latest struct {
} }
func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, error) { func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, error) {
blockListName := fmt.Sprintf("tmpv4%d/%d-%d-blocklist", runID, runID, artifactID) blockListName := fmt.Sprintf("%s/%d-%d-blocklist", makeTmpPathNameV4(runID), runID, artifactID)
s, err := r.fs.Open(blockListName) s, err := r.fs.Open(blockListName)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -367,17 +417,22 @@ func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, e
blockList := &BlockList{} blockList := &BlockList{}
err = xdec.Decode(blockList) err = xdec.Decode(blockList)
_ = s.Close()
delerr := r.fs.Delete(blockListName) delerr := r.fs.Delete(blockListName)
if delerr != nil { if delerr != nil {
log.Warn("Failed to delete blockList %s: %v", blockListName, delerr) log.Warn("Failed to delete blockList %s: %v", blockListName, delerr)
} }
return blockList, err if err != nil {
return nil, err
}
return blockList, nil
} }
func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) { func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
var req FinalizeArtifactRequest var req FinalizeArtifactRequest
if ok := r.parseProtbufBody(ctx, &req); !ok { if ok := r.parseProtobufBody(ctx, &req); !ok {
return return
} }
_, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId) _, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
@@ -394,30 +449,20 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
} }
var chunks []*chunkFileItem var chunks []*chunkFileItem
blockList, err := r.readBlockList(runID, artifact.ID) blockList, blockListErr := r.readBlockList(runID, artifact.ID)
chunks, err = listOrderedChunksForArtifact(r.fs, runID, artifact.ID, blockList)
if err != nil { if err != nil {
log.Warn("Failed to read BlockList, fallback to old behavior: %v", err) log.Error("Error list chunks: %v", errors.Join(blockListErr, err))
chunkMap, err := listChunksByRunID(r.fs, runID) ctx.HTTPError(http.StatusInternalServerError, "Error list chunks")
if err != nil { return
log.Error("Error merge chunks: %v", err) }
ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks") artifact.FileSize = chunks[len(chunks)-1].End + 1
return artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
}
chunks, ok = chunkMap[artifact.ID] if req.Size != artifact.FileSize {
if !ok { log.Error("Error merge chunks size mismatch")
log.Error("Error merge chunks") ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks size mismatch")
ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks") return
return
}
} else {
chunks, err = listChunksByRunIDV4(r.fs, runID, artifact.ID, blockList)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.HTTPError(http.StatusInternalServerError, "Error merge chunks")
return
}
artifact.FileSize = chunks[len(chunks)-1].End + 1
artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
} }
checksum := "" checksum := ""
@@ -434,13 +479,13 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
Ok: true, Ok: true,
ArtifactId: artifact.ID, ArtifactId: artifact.ID,
} }
r.sendProtbufBody(ctx, &respData) r.sendProtobufBody(ctx, &respData)
} }
func (r *artifactV4Routes) listArtifacts(ctx *ArtifactContext) { func (r *artifactV4Routes) listArtifacts(ctx *ArtifactContext) {
var req ListArtifactsRequest var req ListArtifactsRequest
if ok := r.parseProtbufBody(ctx, &req); !ok { if ok := r.parseProtobufBody(ctx, &req); !ok {
return return
} }
_, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId) _, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
@@ -485,13 +530,13 @@ func (r *artifactV4Routes) listArtifacts(ctx *ArtifactContext) {
respData := ListArtifactsResponse{ respData := ListArtifactsResponse{
Artifacts: list, Artifacts: list,
} }
r.sendProtbufBody(ctx, &respData) r.sendProtobufBody(ctx, &respData)
} }
func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) { func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) {
var req GetSignedArtifactURLRequest var req GetSignedArtifactURLRequest
if ok := r.parseProtbufBody(ctx, &req); !ok { if ok := r.parseProtobufBody(ctx, &req); !ok {
return return
} }
_, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId) _, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
@@ -525,7 +570,7 @@ func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) {
if respData.SignedUrl == "" { if respData.SignedUrl == "" {
respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID) respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID)
} }
r.sendProtbufBody(ctx, &respData) r.sendProtobufBody(ctx, &respData)
} }
func (r *artifactV4Routes) downloadArtifact(ctx *ArtifactContext) { func (r *artifactV4Routes) downloadArtifact(ctx *ArtifactContext) {
@@ -555,7 +600,7 @@ func (r *artifactV4Routes) downloadArtifact(ctx *ArtifactContext) {
func (r *artifactV4Routes) deleteArtifact(ctx *ArtifactContext) { func (r *artifactV4Routes) deleteArtifact(ctx *ArtifactContext) {
var req DeleteArtifactRequest var req DeleteArtifactRequest
if ok := r.parseProtbufBody(ctx, &req); !ok { if ok := r.parseProtobufBody(ctx, &req); !ok {
return return
} }
_, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId) _, runID, ok := validateRunIDV4(ctx, req.WorkflowRunBackendId)
@@ -582,5 +627,5 @@ func (r *artifactV4Routes) deleteArtifact(ctx *ArtifactContext) {
Ok: true, Ok: true,
ArtifactId: artifact.ID, ArtifactId: artifact.ID,
} }
r.sendProtbufBody(ctx, &respData) r.sendProtobufBody(ctx, &respData)
} }
+128 -36
View File
@@ -6,6 +6,7 @@ package integration
import ( import (
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
@@ -22,6 +23,7 @@ import (
"code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/storage"
api "code.gitea.io/gitea/modules/structs" api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/routers/api/actions" "code.gitea.io/gitea/routers/api/actions"
actions_service "code.gitea.io/gitea/services/actions" actions_service "code.gitea.io/gitea/services/actions"
@@ -45,45 +47,135 @@ func TestActionsArtifactV4UploadSingleFile(t *testing.T) {
token, err := actions_service.CreateAuthorizationToken(48, 792, 193) token, err := actions_service.CreateAuthorizationToken(48, 792, 193)
assert.NoError(t, err) assert.NoError(t, err)
// acquire artifact upload url table := []struct {
req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact", toProtoJSON(&actions.CreateArtifactRequest{ name string
Version: 4, version int32
Name: "artifact", blockID bool
WorkflowRunBackendId: "792", noLength bool
WorkflowJobRunBackendId: "193", append int
})).AddTokenAuth(token) }{
resp := MakeRequest(t, req, http.StatusOK) {
var uploadResp actions.CreateArtifactResponse name: "artifact",
protojson.Unmarshal(resp.Body.Bytes(), &uploadResp) version: 4,
assert.True(t, uploadResp.Ok) },
assert.Contains(t, uploadResp.SignedUploadUrl, "/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact") {
name: "artifact2",
version: 4,
blockID: true,
},
{
name: "artifact3",
version: 4,
noLength: true,
},
{
name: "artifact4",
version: 4,
blockID: true,
noLength: true,
},
{
name: "artifact5",
version: 7,
blockID: true,
},
{
name: "artifact6",
version: 7,
append: 2,
noLength: true,
},
{
name: "artifact7",
version: 7,
append: 3,
blockID: true,
noLength: true,
},
{
name: "artifact8",
version: 7,
append: 4,
blockID: true,
},
}
// get upload url for _, entry := range table {
idx := strings.Index(uploadResp.SignedUploadUrl, "/twirp/") t.Run(entry.name, func(t *testing.T) {
url := uploadResp.SignedUploadUrl[idx:] + "&comp=block" // acquire artifact upload url
req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact", toProtoJSON(&actions.CreateArtifactRequest{
Version: entry.version,
Name: entry.name,
WorkflowRunBackendId: "792",
WorkflowJobRunBackendId: "193",
})).AddTokenAuth(token)
resp := MakeRequest(t, req, http.StatusOK)
var uploadResp actions.CreateArtifactResponse
protojson.Unmarshal(resp.Body.Bytes(), &uploadResp)
assert.True(t, uploadResp.Ok)
assert.Contains(t, uploadResp.SignedUploadUrl, "/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact")
// upload artifact chunk h := sha256.New()
body := strings.Repeat("A", 1024)
req = NewRequestWithBody(t, "PUT", url, strings.NewReader(body))
MakeRequest(t, req, http.StatusCreated)
t.Logf("Create artifact confirm") blocks := make([]string, 0, util.Iif(entry.blockID, entry.append+1, 0))
sha := sha256.Sum256([]byte(body)) // get upload url
idx := strings.Index(uploadResp.SignedUploadUrl, "/twirp/")
for i := range entry.append + 1 {
url := uploadResp.SignedUploadUrl[idx:]
// See https://learn.microsoft.com/en-us/rest/api/storageservices/append-block
// See https://learn.microsoft.com/en-us/rest/api/storageservices/put-block
if entry.blockID {
blockID := base64.RawURLEncoding.EncodeToString(fmt.Append([]byte("SOME_BIG_BLOCK_ID_"), i))
blocks = append(blocks, blockID)
url += "&comp=block&blockid=" + blockID
} else {
url += "&comp=appendBlock"
}
// confirm artifact upload // upload artifact chunk
req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact", toProtoJSON(&actions.FinalizeArtifactRequest{ body := strings.Repeat("A", 1024)
Name: "artifact", _, _ = h.Write([]byte(body))
Size: 1024, var bodyReader io.Reader = strings.NewReader(body)
Hash: wrapperspb.String("sha256:" + hex.EncodeToString(sha[:])), if entry.noLength {
WorkflowRunBackendId: "792", bodyReader = io.MultiReader(bodyReader)
WorkflowJobRunBackendId: "193", }
})). req = NewRequestWithBody(t, "PUT", url, bodyReader)
AddTokenAuth(token) MakeRequest(t, req, http.StatusCreated)
resp = MakeRequest(t, req, http.StatusOK) }
var finalizeResp actions.FinalizeArtifactResponse
protojson.Unmarshal(resp.Body.Bytes(), &finalizeResp) if entry.blockID && entry.append > 0 {
assert.True(t, finalizeResp.Ok) // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list
blockListURL := uploadResp.SignedUploadUrl[idx:] + "&comp=blocklist"
// upload artifact blockList
blockList := &actions.BlockList{
Latest: blocks,
}
rawBlockList, err := xml.Marshal(blockList)
assert.NoError(t, err)
req = NewRequestWithBody(t, "PUT", blockListURL, bytes.NewReader(rawBlockList))
MakeRequest(t, req, http.StatusCreated)
}
sha := h.Sum(nil)
t.Logf("Create artifact confirm")
// confirm artifact upload
req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact", toProtoJSON(&actions.FinalizeArtifactRequest{
Name: entry.name,
Size: int64(entry.append+1) * 1024,
Hash: wrapperspb.String("sha256:" + hex.EncodeToString(sha)),
WorkflowRunBackendId: "792",
WorkflowJobRunBackendId: "193",
})).
AddTokenAuth(token)
resp = MakeRequest(t, req, http.StatusOK)
var finalizeResp actions.FinalizeArtifactResponse
protojson.Unmarshal(resp.Body.Bytes(), &finalizeResp)
assert.True(t, finalizeResp.Ok)
})
}
} }
func TestActionsArtifactV4UploadSingleFileWrongChecksum(t *testing.T) { func TestActionsArtifactV4UploadSingleFileWrongChecksum(t *testing.T) {
@@ -312,7 +404,7 @@ func TestActionsArtifactV4DownloadSingle(t *testing.T) {
token, err := actions_service.CreateAuthorizationToken(48, 792, 193) token, err := actions_service.CreateAuthorizationToken(48, 792, 193)
assert.NoError(t, err) assert.NoError(t, err)
// acquire artifact upload url // list artifacts by name
req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts", toProtoJSON(&actions.ListArtifactsRequest{ req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/ListArtifacts", toProtoJSON(&actions.ListArtifactsRequest{
NameFilter: wrapperspb.String("artifact-v4-download"), NameFilter: wrapperspb.String("artifact-v4-download"),
WorkflowRunBackendId: "792", WorkflowRunBackendId: "792",
@@ -323,7 +415,7 @@ func TestActionsArtifactV4DownloadSingle(t *testing.T) {
protojson.Unmarshal(resp.Body.Bytes(), &listResp) protojson.Unmarshal(resp.Body.Bytes(), &listResp)
assert.Len(t, listResp.Artifacts, 1) assert.Len(t, listResp.Artifacts, 1)
// confirm artifact upload // acquire artifact download url
req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/GetSignedArtifactURL", toProtoJSON(&actions.GetSignedArtifactURLRequest{ req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/GetSignedArtifactURL", toProtoJSON(&actions.GetSignedArtifactURLRequest{
Name: "artifact-v4-download", Name: "artifact-v4-download",
WorkflowRunBackendId: "792", WorkflowRunBackendId: "792",
+4
View File
@@ -341,6 +341,10 @@ func MakeRequest(t testing.TB, rw *RequestWrapper, expectedStatus int) *httptest
if req.RemoteAddr == "" { if req.RemoteAddr == "" {
req.RemoteAddr = "test-mock:12345" req.RemoteAddr = "test-mock:12345"
} }
// Ensure unknown contentLength is seen as -1
if req.Body != nil && req.ContentLength == 0 {
req.ContentLength = -1
}
testWebRoutes.ServeHTTP(recorder, req) testWebRoutes.ServeHTTP(recorder, req)
if expectedStatus != NoExpectedStatus { if expectedStatus != NoExpectedStatus {
if expectedStatus != recorder.Code { if expectedStatus != recorder.Code {