feat(webhook): add workflow_step webhook events for step-level CI notifications

Adds HookEventWorkflowStep event type that fires on every step state
transition (queued -> in_progress -> completed). Follows the same
pattern as the existing workflow_job events.

- New WorkflowStepPayload struct with run/job/step context
- WorkflowStepStatusUpdate notifier interface + dispatch
- Step state change detection in UpdateTask runner endpoint
- Fix: register workflow_step in updateHookEvents API mapping
- Full test coverage mirroring workflow_job tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-06 17:47:14 -05:00
parent a39af1a829
commit bb6faa4c5d
30 changed files with 504 additions and 3 deletions
+24
View File
@@ -119,6 +119,30 @@ func NotifyWorkflowJobsStatusUpdate(ctx context.Context, jobs ...*actions_model.
}
}
// NotifyWorkflowStepStatusUpdate notifies a single step status update when a concrete task is available.
// Use it when a step transitions between queued, in_progress, and completed states.
func NotifyWorkflowStepStatusUpdate(ctx context.Context, job *actions_model.ActionRunJob, task *actions_model.ActionTask, step *actions_model.ActionTaskStep) {
if job.RunAttemptID == 0 {
if err := job.LoadAttributes(ctx); err != nil {
log.Error("job.LoadAttributes: %v", err)
return
}
notify_service.WorkflowStepStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, task, step)
return
}
attempt, err := actions_model.GetRunAttemptByRepoAndID(ctx, job.RepoID, job.RunAttemptID)
if err != nil {
log.Error("GetRunAttemptByRepoAndID: %v", err)
return
}
if err := attempt.LoadAttributes(ctx); err != nil {
log.Error("attempt.LoadAttributes: %v", err)
return
}
notify_service.WorkflowStepStatusUpdate(ctx, attempt.Run.Repo, attempt.TriggerUser, job, task, step)
}
// NotifyWorkflowJobStatusUpdateWithTask notifies a single job status update when a concrete task is available.
// Use it for runner/task lifecycle callbacks so the notification includes the originating task context.
func NotifyWorkflowJobStatusUpdateWithTask(ctx context.Context, job *actions_model.ActionRunJob, task *actions_model.ActionTask) {
+14
View File
@@ -357,6 +357,20 @@ func ToActionsStatus(status actions_model.Status) (string, string) {
return action, conclusion
}
// ToActionWorkflowStep converts an actions_model.ActionTaskStep to an api.ActionWorkflowStep.
// The step index (0-based position) must be passed by the caller because it is not stored on the model.
func ToActionWorkflowStep(step *actions_model.ActionTaskStep, stepIndex int) *api.ActionWorkflowStep {
stepStatus, stepConclusion := ToActionsStatus(step.Status)
return &api.ActionWorkflowStep{
Name: step.Name,
Number: int64(stepIndex),
Status: stepStatus,
Conclusion: stepConclusion,
StartedAt: step.Started.AsTime().UTC(),
CompletedAt: step.Stopped.AsTime().UTC(),
}
}
// ToActionWorkflowJob convert a actions_model.ActionRunJob to an api.ActionWorkflowJob
// task is optional and can be nil
func ToActionWorkflowJob(ctx context.Context, repo *repo_model.Repository, task *actions_model.ActionTask, job *actions_model.ActionRunJob) (*api.ActionWorkflowJob, error) {
+1
View File
@@ -230,6 +230,7 @@ type WebhookForm struct {
Status bool
WorkflowRun bool
WorkflowJob bool
WorkflowStep bool
Active bool
BranchFilter string `binding:"GlobPattern"`
AuthorizationHeader string
+2
View File
@@ -82,4 +82,6 @@ type Notifier interface {
WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun)
WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask)
WorkflowStepStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask, step *actions_model.ActionTaskStep)
}
+7
View File
@@ -416,3 +416,10 @@ func WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, s
notifier.WorkflowJobStatusUpdate(ctx, repo, sender, job, task)
}
}
// WorkflowStepStatusUpdate dispatches a workflow step status change to every registered notifier.
func WorkflowStepStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask, step *actions_model.ActionTaskStep) {
for _, notifier := range notifiers {
notifier.WorkflowStepStatusUpdate(ctx, repo, sender, job, task, step)
}
}
+3
View File
@@ -219,3 +219,6 @@ func (*NullNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *repo_mod
func (*NullNotifier) WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask) {
}
func (*NullNotifier) WorkflowStepStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask, step *actions_model.ActionTaskStep) {
}
+6
View File
@@ -188,6 +188,12 @@ func (dingtalkConvertor) WorkflowJob(p *api.WorkflowJobPayload) (DingtalkPayload
return createDingtalkPayload(text, text, "Workflow Job", p.WorkflowJob.HTMLURL), nil
}
func (dingtalkConvertor) WorkflowStep(p *api.WorkflowStepPayload) (DingtalkPayload, error) {
text, _ := getWorkflowStepPayloadInfo(p, noneLinkFormatter, true)
return createDingtalkPayload(text, text, "Workflow Step", p.WorkflowJob.HTMLURL), nil
}
func createDingtalkPayload(title, text, singleTitle, singleURL string) DingtalkPayload {
return DingtalkPayload{
MsgType: "actionCard",
+6
View File
@@ -290,6 +290,12 @@ func (d discordConvertor) WorkflowJob(p *api.WorkflowJobPayload) (DiscordPayload
return d.createPayload(p.Sender, text, "", p.WorkflowJob.HTMLURL, color), nil
}
func (d discordConvertor) WorkflowStep(p *api.WorkflowStepPayload) (DiscordPayload, error) {
text, color := getWorkflowStepPayloadInfo(p, noneLinkFormatter, false)
return d.createPayload(p.Sender, text, "", p.WorkflowJob.HTMLURL, color), nil
}
func newDiscordRequest(_ context.Context, w *webhook_model.Webhook, t *webhook_model.HookTask) (*http.Request, []byte, error) {
meta := &DiscordMeta{}
if err := json.Unmarshal([]byte(w.Meta), meta); err != nil {
+15
View File
@@ -280,6 +280,21 @@ func TestDiscordPayload(t *testing.T) {
assert.Equal(t, setting.AppURL+p.Sender.UserName, pl.Embeds[0].Author.URL)
assert.Equal(t, p.Sender.AvatarURL, pl.Embeds[0].Author.IconURL)
})
t.Run("WorkflowStep", func(t *testing.T) {
p := workflowStepTestPayload()
pl, err := dc.WorkflowStep(p)
require.NoError(t, err)
assert.Len(t, pl.Embeds, 1)
assert.Equal(t, "Workflow Step completed: build / Run tests:success", pl.Embeds[0].Title)
assert.Empty(t, pl.Embeds[0].Description)
assert.Equal(t, p.WorkflowJob.HTMLURL, pl.Embeds[0].URL)
assert.Equal(t, p.Sender.UserName, pl.Embeds[0].Author.Name)
assert.Equal(t, setting.AppURL+p.Sender.UserName, pl.Embeds[0].Author.URL)
assert.Equal(t, p.Sender.AvatarURL, pl.Embeds[0].Author.IconURL)
})
}
func TestDiscordJSONPayload(t *testing.T) {
+6
View File
@@ -191,6 +191,12 @@ func (feishuConvertor) WorkflowJob(p *api.WorkflowJobPayload) (FeishuPayload, er
return newFeishuTextPayload(text), nil
}
func (feishuConvertor) WorkflowStep(p *api.WorkflowStepPayload) (FeishuPayload, error) {
text, _ := getWorkflowStepPayloadInfo(p, noneLinkFormatter, true)
return newFeishuTextPayload(text), nil
}
// feishuGenSign generates a signature for Feishu webhook
// https://open.feishu.cn/document/client-docs/bot-v3/add-custom-bot
func feishuGenSign(secret string, timestamp int64) string {
+31
View File
@@ -389,6 +389,37 @@ func getWorkflowJobPayloadInfo(p *api.WorkflowJobPayload, linkFormatter linkForm
return text, color
}
func getWorkflowStepPayloadInfo(p *api.WorkflowStepPayload, linkFormatter linkFormatter, withSender bool) (text string, color int) {
description := p.Step.Conclusion
if description == "" {
description = p.Step.Status
}
stepLink := linkFormatter(p.WorkflowJob.HTMLURL, fmt.Sprintf("%s / %s", p.WorkflowJob.Name, p.Step.Name)+":"+description)
text = fmt.Sprintf("Workflow Step %s: %s", p.Action, stepLink)
switch description {
case "waiting":
color = orangeColor
case "queued":
color = orangeColorLight
case "success":
color = greenColor
case "failure":
color = redColor
case "cancelled":
color = yellowColor
case "skipped":
color = purpleColor
default:
color = greyColor
}
if withSender {
text += " by " + linkFormatter(setting.AppURL+url.PathEscape(p.Sender.UserName), p.Sender.UserName)
}
return text, color
}
// ToHook convert models.Webhook to api.Hook
// This function is not part of the convert package to prevent an import cycle
func ToHook(repoLink string, w *webhook_model.Webhook) (*api.Hook, error) {
+119
View File
@@ -610,6 +610,125 @@ func TestGetReleasePayloadInfo(t *testing.T) {
}
}
func workflowJobTestPayload() *api.WorkflowJobPayload {
return &api.WorkflowJobPayload{
Action: "completed",
WorkflowJob: &api.ActionWorkflowJob{
ID: 1,
Name: "build",
Status: "completed",
RunID: 42,
HeadSha: "abc123",
HTMLURL: "http://localhost:3000/test/repo/actions/runs/42/jobs/1",
},
Repo: &api.Repository{
HTMLURL: "http://localhost:3000/test/repo",
Name: "repo",
FullName: "test/repo",
},
Sender: &api.User{
UserName: "user1",
AvatarURL: "http://localhost:3000/user1/avatar",
},
}
}
func workflowStepTestPayload() *api.WorkflowStepPayload {
return &api.WorkflowStepPayload{
Action: "completed",
WorkflowRun: &api.ActionWorkflowRun{
ID: 42,
DisplayTitle: "Push: main",
},
WorkflowJob: &api.ActionWorkflowJob{
ID: 1,
Name: "build",
Status: "completed",
RunID: 42,
HeadSha: "abc123",
HTMLURL: "http://localhost:3000/test/repo/actions/runs/42/jobs/1",
},
Step: &api.ActionWorkflowStep{
Name: "Run tests",
Number: 0,
Status: "completed",
Conclusion: "success",
},
Repo: &api.Repository{
HTMLURL: "http://localhost:3000/test/repo",
Name: "repo",
FullName: "test/repo",
},
Sender: &api.User{
UserName: "user1",
AvatarURL: "http://localhost:3000/user1/avatar",
},
}
}
func TestGetWorkflowStepPayloadInfo(t *testing.T) {
cases := []struct {
action string
status string
conclusion string
expectText string
expectColor int
}{
{
action: "queued",
status: "queued",
conclusion: "",
expectText: "Workflow Step queued: build / Run tests:queued",
expectColor: orangeColorLight,
},
{
action: "in_progress",
status: "in_progress",
conclusion: "",
expectText: "Workflow Step in_progress: build / Run tests:in_progress",
expectColor: greyColor,
},
{
action: "completed",
status: "completed",
conclusion: "success",
expectText: "Workflow Step completed: build / Run tests:success",
expectColor: greenColor,
},
{
action: "completed",
status: "completed",
conclusion: "failure",
expectText: "Workflow Step completed: build / Run tests:failure",
expectColor: redColor,
},
{
action: "completed",
status: "completed",
conclusion: "cancelled",
expectText: "Workflow Step completed: build / Run tests:cancelled",
expectColor: yellowColor,
},
{
action: "completed",
status: "completed",
conclusion: "skipped",
expectText: "Workflow Step completed: build / Run tests:skipped",
expectColor: purpleColor,
},
}
for i, c := range cases {
p := workflowStepTestPayload()
p.Action = c.action
p.Step.Status = c.status
p.Step.Conclusion = c.conclusion
text, color := getWorkflowStepPayloadInfo(p, noneLinkFormatter, false)
assert.Equal(t, c.expectText, text, "case %d", i)
assert.Equal(t, c.expectColor, color, "case %d", i)
}
}
func TestGetIssueCommentPayloadInfo(t *testing.T) {
p := pullRequestCommentTestPayload()
+6
View File
@@ -265,6 +265,12 @@ func (m matrixConvertor) WorkflowJob(p *api.WorkflowJobPayload) (MatrixPayload,
return m.newPayload(text)
}
func (m matrixConvertor) WorkflowStep(p *api.WorkflowStepPayload) (MatrixPayload, error) {
text, _ := getWorkflowStepPayloadInfo(p, htmlLinkFormatter, true)
return m.newPayload(text)
}
var urlRegex = regexp.MustCompile(`<a [^>]*?href="([^">]*?)">(.*?)</a>`)
func getMessageBody(htmlText string) string {
+14
View File
@@ -346,6 +346,20 @@ func (msteamsConvertor) WorkflowJob(p *api.WorkflowJobPayload) (MSTeamsPayload,
), nil
}
func (msteamsConvertor) WorkflowStep(p *api.WorkflowStepPayload) (MSTeamsPayload, error) {
title, color := getWorkflowStepPayloadInfo(p, noneLinkFormatter, false)
return createMSTeamsPayload(
p.Repo,
p.Sender,
title,
"",
p.WorkflowJob.HTMLURL,
color,
&MSTeamsFact{"WorkflowStep:", p.Step.Name},
), nil
}
func createMSTeamsPayload(r *api.Repository, s *api.User, title, text, actionTarget string, color int, fact *MSTeamsFact) MSTeamsPayload {
facts := make([]MSTeamsFact, 0, 2)
if r != nil {
+49
View File
@@ -984,6 +984,55 @@ func notifyPackage(ctx context.Context, sender *user_model.User, pd *packages_mo
}
}
func (*webhookNotifier) WorkflowStepStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask, step *actions_model.ActionTaskStep) {
source := EventSource{
Repository: repo,
Owner: repo.Owner,
}
var org *api.Organization
if repo.Owner.IsOrganization() {
org = convert.ToOrganization(ctx, organization.OrgFromUser(repo.Owner))
}
stepStatus, _ := convert.ToActionsStatus(step.Status)
convertedJob, err := convert.ToActionWorkflowJob(ctx, repo, task, job)
if err != nil {
log.Error("ToActionWorkflowJob: %v", err)
return
}
// Find the step index within the task steps for the Number field.
stepIndex := int(step.Index)
convertedStep := convert.ToActionWorkflowStep(step, stepIndex)
// Build a minimal WorkflowRun for the payload.
if err := job.LoadAttributes(ctx); err != nil {
log.Error("job.LoadAttributes: %v", err)
return
}
convertedRun, err := convert.ToActionWorkflowRun(ctx, job.Run, nil)
if err != nil {
log.Error("ToActionWorkflowRun: %v", err)
return
}
if err := PrepareWebhooks(ctx, source, webhook_module.HookEventWorkflowStep, &api.WorkflowStepPayload{
Action: stepStatus,
WorkflowRun: convertedRun,
WorkflowJob: convertedJob,
Step: convertedStep,
Organization: org,
Repo: convert.ToRepo(ctx, repo, access_model.Permission{AccessMode: perm.AccessModeOwner}),
Sender: convert.ToUser(ctx, sender, nil),
}); err != nil {
log.Error("PrepareWebhooks: %v", err)
}
}
func (*webhookNotifier) WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask) {
source := EventSource{
Repository: repo,
+4
View File
@@ -122,6 +122,10 @@ func (pc packagistConvertor) WorkflowJob(_ *api.WorkflowJobPayload) (PackagistPa
return PackagistPayload{}, nil
}
func (pc packagistConvertor) WorkflowStep(_ *api.WorkflowStepPayload) (PackagistPayload, error) {
return PackagistPayload{}, nil
}
func newPackagistRequest(_ context.Context, w *webhook_model.Webhook, t *webhook_model.HookTask) (*http.Request, []byte, error) {
meta := &PackagistMeta{}
if err := json.Unmarshal([]byte(w.Meta), meta); err != nil {
+3
View File
@@ -31,6 +31,7 @@ type payloadConvertor[T any] interface {
Status(*api.CommitStatusPayload) (T, error)
WorkflowRun(*api.WorkflowRunPayload) (T, error)
WorkflowJob(*api.WorkflowJobPayload) (T, error)
WorkflowStep(*api.WorkflowStepPayload) (T, error)
}
func convertUnmarshalledJSON[T, P any](convert func(P) (T, error), data []byte) (t T, err error) {
@@ -86,6 +87,8 @@ func newPayload[T any](rc payloadConvertor[T], data []byte, event webhook_module
return convertUnmarshalledJSON(rc.WorkflowRun, data)
case webhook_module.HookEventWorkflowJob:
return convertUnmarshalledJSON(rc.WorkflowJob, data)
case webhook_module.HookEventWorkflowStep:
return convertUnmarshalledJSON(rc.WorkflowStep, data)
}
return t, fmt.Errorf("newPayload unsupported event: %s", event)
}
+6
View File
@@ -185,6 +185,12 @@ func (s slackConvertor) WorkflowJob(p *api.WorkflowJobPayload) (SlackPayload, er
return s.createPayload(text, nil), nil
}
func (s slackConvertor) WorkflowStep(p *api.WorkflowStepPayload) (SlackPayload, error) {
text, _ := getWorkflowStepPayloadInfo(p, SlackLinkFormatter, true)
return s.createPayload(text, nil), nil
}
// Push implements payloadConvertor Push method
func (s slackConvertor) Push(p *api.PushPayload) (SlackPayload, error) {
// n new commits
+9
View File
@@ -155,6 +155,15 @@ func TestSlackPayload(t *testing.T) {
assert.Equal(t, "[<http://localhost:3000/test/repo|test/repo>] Release created: <http://localhost:3000/test/repo/releases/tag/v1.0|v1.0> by <https://try.gitea.io/user1|user1>", pl.Text)
})
t.Run("WorkflowStep", func(t *testing.T) {
p := workflowStepTestPayload()
pl, err := sc.WorkflowStep(p)
require.NoError(t, err)
assert.Equal(t, "Workflow Step completed: <http://localhost:3000/test/repo/actions/runs/42/jobs/1|build / Run tests:success> by <https://try.gitea.io/user1|user1>", pl.Text)
})
}
func TestSlackJSONPayload(t *testing.T) {
+6
View File
@@ -192,6 +192,12 @@ func (telegramConvertor) WorkflowJob(p *api.WorkflowJobPayload) (TelegramPayload
return createTelegramPayloadHTML(text), nil
}
func (telegramConvertor) WorkflowStep(p *api.WorkflowStepPayload) (TelegramPayload, error) {
text, _ := getWorkflowStepPayloadInfo(p, htmlLinkFormatter, true)
return createTelegramPayloadHTML(text), nil
}
func createTelegramPayloadHTML(msgHTML string) TelegramPayload {
// https://core.telegram.org/bots/api#formatting-options
return TelegramPayload{
+6
View File
@@ -193,6 +193,12 @@ func (wc wechatworkConvertor) WorkflowJob(p *api.WorkflowJobPayload) (Wechatwork
return newWechatworkMarkdownPayload(text), nil
}
func (wc wechatworkConvertor) WorkflowStep(p *api.WorkflowStepPayload) (WechatworkPayload, error) {
text, _ := getWorkflowStepPayloadInfo(p, noneLinkFormatter, true)
return newWechatworkMarkdownPayload(text), nil
}
func newWechatworkRequest(_ context.Context, w *webhook_model.Webhook, t *webhook_model.HookTask) (*http.Request, []byte, error) {
var pc payloadConvertor[WechatworkPayload] = wechatworkConvertor{}
return newJSONRequest(pc, w, t, true)