Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ OUTPUT=type=docker,dest=$(HOME)/tmp/mcai_server.tar
GOCACHE=/root/.cache/go-build
GOMODCACHE?=/go/pkg/mod
REGISTRY=chaitin-registry.cn-hangzhou.cr.aliyuncs.com/monkeycode

# make server PLATFORM= TAG= OUTPUT_SERVER= GOCACHE=
image:
docker buildx build \
Expand Down
20 changes: 20 additions & 0 deletions backend/biz/host/handler/v1/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
type InternalHostHandler struct {
logger *slog.Logger
repo domain.HostRepo
taskRepo taskLogStoreRepo
teamRepo domain.TeamHostRepo
redis *redis.Client
getAgentToken agentTokenGetter
Expand All @@ -46,6 +47,10 @@ type InternalHostHandler struct {
tokenProvider *gituc.TokenProvider
}

type taskLogStoreRepo interface {
GetLogStore(ctx context.Context, id uuid.UUID) (consts.LogStore, error)
}

func NewInternalHostHandler(i *do.Injector) (*InternalHostHandler, error) {
w := do.MustInvoke[*web.Web](i)
tf := do.MustInvoke[taskflow.Clienter](i)
Expand All @@ -54,6 +59,7 @@ func NewInternalHostHandler(i *do.Injector) (*InternalHostHandler, error) {
h := &InternalHostHandler{
logger: do.MustInvoke[*slog.Logger](i).With("module", "InternalHostHandler"),
repo: do.MustInvoke[domain.HostRepo](i),
taskRepo: do.MustInvoke[domain.TaskRepo](i),
teamRepo: do.MustInvoke[domain.TeamHostRepo](i),
redis: rdb,
getAgentToken: defaultAgentTokenGetter(rdb),
Expand All @@ -78,6 +84,7 @@ func NewInternalHostHandler(i *do.Injector) (*InternalHostHandler, error) {
g.POST("/coding-config", web.BindHandler(h.GetCodingConfig))
g.POST("/git-credential", web.BindHandler(h.GitCredential))
g.GET("/vm/list", web.BaseHandler(h.VMList))
g.POST("/task-log-store", web.BindHandler(h.GetTaskLogStore))
g.POST("/task-stream-ips", web.BindHandler(h.GetTaskStreamIPs))

return h, nil
Expand Down Expand Up @@ -186,6 +193,19 @@ func (h *InternalHostHandler) CheckToken(c *web.Context, req taskflow.CheckToken
return c.Success(tk)
}

func (h *InternalHostHandler) GetTaskLogStore(c *web.Context, req taskflow.GetTaskLogStoreReq) error {
store, err := h.taskRepo.GetLogStore(c.Request().Context(), req.TaskID)
if err != nil {
return err
}
if store == "" {
store = consts.LogStoreLoki
}
return c.Success(taskflow.GetTaskLogStoreResp{
LogStore: string(store),
})
}

func (h *InternalHostHandler) agentAuth(ctx context.Context, token, mid string) (*taskflow.Token, error) {
// 1) 优先从 Redis 读取一次性 agent token,并清除
key := fmt.Sprintf("agent:token:%s", token)
Expand Down
2 changes: 1 addition & 1 deletion backend/biz/host/handler/v1/internal_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestAgentAuthSoftDeletedRecycledVMStillTriggersDelete(t *testing.T) {
type internalHostRepoStub struct {
vm *db.VirtualMachine
assertSkipMarker bool
skipMarkerKey interface{}
skipMarkerKey any
skipMarkerValue string
}

Expand Down
114 changes: 114 additions & 0 deletions backend/biz/host/handler/v1/internal_logstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package v1

import (
"context"
"encoding/json"
"errors"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/GoYoko/web"
"github.com/google/uuid"

"github.com/chaitin/MonkeyCode/backend/consts"
taskflowpkg "github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
)

type taskLogStoreRepoStub struct {
store consts.LogStore
err error
}

func (s *taskLogStoreRepoStub) GetLogStore(context.Context, uuid.UUID) (consts.LogStore, error) {
return s.store, s.err
}

func TestInternalHostHandler_GetTaskLogStore_EmptyMeansLoki(t *testing.T) {
h := &InternalHostHandler{
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
taskRepo: &taskLogStoreRepoStub{},
}
req := taskflowpkg.GetTaskLogStoreReq{TaskID: uuid.New()}
resp := callGetTaskLogStore(t, h, req)
if resp.LogStore != string(consts.LogStoreLoki) {
t.Fatalf("log_store = %q, want %q", resp.LogStore, consts.LogStoreLoki)
}
}

func TestInternalHostHandler_GetTaskLogStore_ClickHousePassthrough(t *testing.T) {
h := &InternalHostHandler{
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
taskRepo: &taskLogStoreRepoStub{store: consts.LogStoreClickHouse},
}
req := taskflowpkg.GetTaskLogStoreReq{TaskID: uuid.New()}
resp := callGetTaskLogStore(t, h, req)
if resp.LogStore != string(consts.LogStoreClickHouse) {
t.Fatalf("log_store = %q, want %q", resp.LogStore, consts.LogStoreClickHouse)
}
}

func TestInternalHostHandler_GetTaskLogStore_RepoError(t *testing.T) {
h := &InternalHostHandler{
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
taskRepo: &taskLogStoreRepoStub{err: errors.New("boom")},
}
req := taskflowpkg.GetTaskLogStoreReq{TaskID: uuid.New()}
body, err := json.Marshal(req)
if err != nil {
t.Fatal(err)
}
w := web.New()
w.POST("/internal/task-log-store", web.BindHandler(h.GetTaskLogStore))
rec := httptest.NewRecorder()
httpReq := httptest.NewRequest(http.MethodPost, "/internal/task-log-store", strings.NewReader(string(body)))
httpReq.Header.Set("Content-Type", "application/json")
w.Echo().ServeHTTP(rec, httpReq)

var resp web.Resp
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal resp: %v", err)
}
if resp.Code == 0 {
t.Fatalf("response = %+v, want error", resp)
}
}

func callGetTaskLogStore(t *testing.T, h *InternalHostHandler, req taskflowpkg.GetTaskLogStoreReq) taskflowpkg.GetTaskLogStoreResp {
t.Helper()

body, err := json.Marshal(req)
if err != nil {
t.Fatal(err)
}

w := web.New()
w.POST("/internal/task-log-store", web.BindHandler(h.GetTaskLogStore))

rec := httptest.NewRecorder()
httpReq := httptest.NewRequest(http.MethodPost, "/internal/task-log-store", strings.NewReader(string(body)))
httpReq.Header.Set("Content-Type", "application/json")
w.Echo().ServeHTTP(rec, httpReq)

if rec.Code != http.StatusOK {
t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String())
}

var resp web.Resp
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal resp: %v", err)
}
data, err := json.Marshal(resp.Data)
if err != nil {
t.Fatalf("marshal resp data: %v", err)
}

var out taskflowpkg.GetTaskLogStoreResp
if err := json.Unmarshal(data, &out); err != nil {
t.Fatalf("unmarshal typed resp: %v", err)
}
return out
}
11 changes: 11 additions & 0 deletions backend/biz/host/usecase/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ func (s *hostTaskRepoStub) GetByID(ctx context.Context, id uuid.UUID) (*db.Task,
return s.client.Task.Get(ctx, id)
}

func (s *hostTaskRepoStub) GetLogStore(ctx context.Context, id uuid.UUID) (consts.LogStore, error) {
tk, err := s.client.Task.Get(ctx, id)
if err != nil {
return "", err
}
if tk.LogStore == nil {
return "", nil
}
return *tk.LogStore, nil
}

func (s *hostTaskRepoStub) Stat(context.Context, uuid.UUID) (*domain.TaskStats, error) {
panic("unexpected call to Stat")
}
Expand Down
Loading