From c950318a944e7c74d0124c0fbcf0bea4b27d64a4 Mon Sep 17 00:00:00 2001 From: Ryo Date: Mon, 15 Sep 2025 20:03:56 +0800 Subject: [PATCH] refactor(infra): move parse_address to es package (#2163) --- backend/domain/workflow/entity/node_meta.go | 2 +- backend/domain/workflow/entity/vo/node.go | 1 + .../internal/canvas/adaptor/to_schema.go | 2 +- .../compose/workflow_field_mapping_test.go | 18 +++++++++--------- .../domain/workflow/internal/nodes/convert.go | 2 +- .../workflow/service/executable_impl_test.go | 8 ++++---- backend/infra/impl/coderunner/direct/runner.go | 9 ++++++--- backend/infra/impl/es/es7.go | 12 ++++++------ backend/infra/impl/es/es8.go | 10 +++++----- backend/infra/impl/es/es_impl.go | 3 ++- .../impl/es/parse_address.go} | 11 +++++------ 11 files changed, 41 insertions(+), 37 deletions(-) rename backend/{pkg/parsex/address.go => infra/impl/es/parse_address.go} (83%) diff --git a/backend/domain/workflow/entity/node_meta.go b/backend/domain/workflow/entity/node_meta.go index d046f2b8..198b1e87 100644 --- a/backend/domain/workflow/entity/node_meta.go +++ b/backend/domain/workflow/entity/node_meta.go @@ -609,7 +609,7 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ Category: "", // Not found in cate_list Desc: "comment_desc", // Placeholder from JSON Color: "", - SupportBatch: false, // supportBatch: 1 + SupportBatch: false, // supportBatch: 1 EnUSName: "Comment", }, NodeTypeVariableAggregator: { diff --git a/backend/domain/workflow/entity/vo/node.go b/backend/domain/workflow/entity/vo/node.go index cce8a333..08520f0b 100644 --- a/backend/domain/workflow/entity/vo/node.go +++ b/backend/domain/workflow/entity/vo/node.go @@ -19,6 +19,7 @@ package vo import ( "errors" "fmt" + "github.com/cloudwego/eino/compose" "github.com/cloudwego/eino/schema" diff --git a/backend/domain/workflow/internal/canvas/adaptor/to_schema.go b/backend/domain/workflow/internal/canvas/adaptor/to_schema.go index d8d87561..63aa4790 100644 --- a/backend/domain/workflow/internal/canvas/adaptor/to_schema.go +++ b/backend/domain/workflow/internal/canvas/adaptor/to_schema.go @@ -446,7 +446,7 @@ func PruneIsolatedNodes(nodes []*vo.Node, edges []*vo.Edge, parentNode *vo.Node) func parseBatchMode(n *vo.Node) ( batchN *vo.Node, // the new batch node - enabled bool, // whether the node has enabled batch mode + enabled bool, // whether the node has enabled batch mode err error) { if n.Data == nil || n.Data.Inputs == nil { return nil, false, nil diff --git a/backend/domain/workflow/internal/compose/workflow_field_mapping_test.go b/backend/domain/workflow/internal/compose/workflow_field_mapping_test.go index 6e4c1ce2..616e756a 100644 --- a/backend/domain/workflow/internal/compose/workflow_field_mapping_test.go +++ b/backend/domain/workflow/internal/compose/workflow_field_mapping_test.go @@ -27,17 +27,17 @@ import ( func TestAddFieldMappingsWithDeduplication(t *testing.T) { tests := []struct { - name string + name string initialCarryOvers map[vo.NodeKey][]*compose.FieldMapping - fromNodeKey vo.NodeKey - fieldMappings []*compose.FieldMapping - expectedCount int - description string + fromNodeKey vo.NodeKey + fieldMappings []*compose.FieldMapping + expectedCount int + description string }{ { - name: "empty_carry_overs", + name: "empty_carry_overs", initialCarryOvers: make(map[vo.NodeKey][]*compose.FieldMapping), - fromNodeKey: "node1", + fromNodeKey: "node1", fieldMappings: []*compose.FieldMapping{ compose.MapFieldPaths(compose.FieldPath{"input1"}, compose.FieldPath{"output1"}), compose.MapFieldPaths(compose.FieldPath{"input2"}, compose.FieldPath{"output2"}), @@ -94,7 +94,7 @@ func TestAddFieldMappingsWithDeduplication(t *testing.T) { description: "should not add any mappings when all are duplicates", }, { - name: "new_node_key", + name: "new_node_key", initialCarryOvers: map[vo.NodeKey][]*compose.FieldMapping{ "node1": { compose.MapFieldPaths(compose.FieldPath{"input1"}, compose.FieldPath{"output1"}), @@ -109,7 +109,7 @@ func TestAddFieldMappingsWithDeduplication(t *testing.T) { description: "should add all mappings for new node key", }, { - name: "empty_field_mappings", + name: "empty_field_mappings", initialCarryOvers: map[vo.NodeKey][]*compose.FieldMapping{ "node1": { compose.MapFieldPaths(compose.FieldPath{"input1"}, compose.FieldPath{"output1"}), diff --git a/backend/domain/workflow/internal/nodes/convert.go b/backend/domain/workflow/internal/nodes/convert.go index 1776bad2..6d549261 100644 --- a/backend/domain/workflow/internal/nodes/convert.go +++ b/backend/domain/workflow/internal/nodes/convert.go @@ -20,12 +20,12 @@ import ( "context" "errors" "fmt" - workflowModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/workflow" "net/url" "path/filepath" "strconv" "strings" + workflowModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/workflow" "github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo" "github.com/coze-dev/coze-studio/backend/pkg/errorx" "github.com/coze-dev/coze-studio/backend/pkg/lang/ptr" diff --git a/backend/domain/workflow/service/executable_impl_test.go b/backend/domain/workflow/service/executable_impl_test.go index e3224d6e..9a4e3ba7 100644 --- a/backend/domain/workflow/service/executable_impl_test.go +++ b/backend/domain/workflow/service/executable_impl_test.go @@ -204,11 +204,11 @@ func TestImpl_prefetchChatHistory(t *testing.T) { crossmessage.SetDefaultSVC(mockMessage) tests := []struct { - name string - setupMock func(msgSvc *messagemock.MockMessage) - config workflowModel.ExecuteConfig + name string + setupMock func(msgSvc *messagemock.MockMessage) + config workflowModel.ExecuteConfig historyRounds int64 - expectErr bool + expectErr bool }{ { name: "SectionID is nil", diff --git a/backend/infra/impl/coderunner/direct/runner.go b/backend/infra/impl/coderunner/direct/runner.go index 76232e41..4b4e7258 100644 --- a/backend/infra/impl/coderunner/direct/runner.go +++ b/backend/infra/impl/coderunner/direct/runner.go @@ -74,20 +74,23 @@ func (r *runner) Run(ctx context.Context, request *coderunner.RunRequest) (*code } func (r *runner) pythonCmdRun(_ context.Context, code string, params map[string]any) (map[string]any, error) { - bs, _ := sonic.Marshal(params) + bs, err := sonic.Marshal(params) + if err != nil { + return nil, fmt.Errorf("failed to marshal params to json, err: %w", err) + } cmd := exec.Command(goutil.GetPython3Path(), "-c", fmt.Sprintf(pythonCode, code), string(bs)) // ignore_security_alert RCE stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) cmd.Stdout = stdout cmd.Stderr = stderr - err := cmd.Run() + err = cmd.Run() if err != nil { return nil, fmt.Errorf("failed to run python script err: %s, std err: %s", err.Error(), stderr.String()) } - if stderr.String() != "" { return nil, fmt.Errorf("failed to run python script err: %s", stderr.String()) } + ret := make(map[string]any) err = sonic.Unmarshal(stdout.Bytes(), &ret) if err != nil { diff --git a/backend/infra/impl/es/es7.go b/backend/infra/impl/es/es7.go index 1e6db76a..a8e00451 100644 --- a/backend/infra/impl/es/es7.go +++ b/backend/infra/impl/es/es7.go @@ -21,12 +21,12 @@ import ( "context" "encoding/json" "fmt" - "github.com/coze-dev/coze-studio/backend/pkg/parsex" + "io" + "os" + "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/elastic/go-elasticsearch/v7/esutil" - "io" - "os" "github.com/coze-dev/coze-studio/backend/infra/contract/es" "github.com/coze-dev/coze-studio/backend/pkg/lang/conv" @@ -39,7 +39,7 @@ type es7Client struct { } func newES7() (Client, error) { - addresses, err := parsex.ParseClusterEndpoints(os.Getenv("ES_ADDR")) + addresses, err := parseClusterEndpoints(os.Getenv("ES_ADDR")) if err != nil { return nil, err } @@ -123,8 +123,8 @@ func (c *es7Client) CreateIndex(ctx context.Context, index string, properties ma "properties": properties, }, "settings": map[string]any{ - "number_of_shards": parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"), - "number_of_replicas": parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"), + "number_of_shards": getEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"), + "number_of_replicas": getEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"), }, } diff --git a/backend/infra/impl/es/es8.go b/backend/infra/impl/es/es8.go index 55fc3e9a..dd5c34ad 100644 --- a/backend/infra/impl/es/es8.go +++ b/backend/infra/impl/es/es8.go @@ -19,7 +19,8 @@ package es import ( "context" "fmt" - "github.com/coze-dev/coze-studio/backend/pkg/parsex" + "os" + "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esutil" "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" @@ -30,7 +31,6 @@ import ( "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/operator" "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/sortorder" "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/textquerytype" - "os" "github.com/coze-dev/coze-studio/backend/infra/contract/es" "github.com/coze-dev/coze-studio/backend/pkg/lang/conv" @@ -51,7 +51,7 @@ type es8BulkIndexer struct { type es8Types struct{} func newES8() (Client, error) { - addresses, err := parsex.ParseClusterEndpoints(os.Getenv("ES_ADDR")) + addresses, err := parseClusterEndpoints(os.Getenv("ES_ADDR")) if err != nil { return nil, err } @@ -243,8 +243,8 @@ func (c *es8Client) CreateIndex(ctx context.Context, index string, properties ma Properties: propertiesMap, }, Settings: &types.IndexSettings{ - NumberOfShards: parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"), - NumberOfReplicas: parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"), + NumberOfShards: getEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"), + NumberOfReplicas: getEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"), }, }).Do(ctx); err != nil { return err diff --git a/backend/infra/impl/es/es_impl.go b/backend/infra/impl/es/es_impl.go index 5b7bcaf4..5921dc89 100644 --- a/backend/infra/impl/es/es_impl.go +++ b/backend/infra/impl/es/es_impl.go @@ -18,8 +18,9 @@ package es import ( "fmt" - "github.com/coze-dev/coze-studio/backend/infra/contract/es" "os" + + "github.com/coze-dev/coze-studio/backend/infra/contract/es" ) type ( diff --git a/backend/pkg/parsex/address.go b/backend/infra/impl/es/parse_address.go similarity index 83% rename from backend/pkg/parsex/address.go rename to backend/infra/impl/es/parse_address.go index a772b226..e8590f64 100644 --- a/backend/pkg/parsex/address.go +++ b/backend/infra/impl/es/parse_address.go @@ -14,18 +14,18 @@ * limitations under the License. */ -package parsex +package es import ( "fmt" - "github.com/coze-dev/coze-studio/backend/pkg/logs" "os" "strconv" "strings" + + "github.com/coze-dev/coze-studio/backend/pkg/logs" ) -// ParseClusterEndpoints 解析 ES /kafka 地址,多个地址用逗号分隔 -func ParseClusterEndpoints(address string) ([]string, error) { +func parseClusterEndpoints(address string) ([]string, error) { if strings.TrimSpace(address) == "" { return nil, fmt.Errorf("endpoints environment variable is required") } @@ -52,8 +52,7 @@ func ParseClusterEndpoints(address string) ([]string, error) { return validEndpoints, nil } -// GetEnvDefaultIntSetting 获取环境变量的值,如果不存在或无效则返回默认值 -func GetEnvDefaultIntSetting(envVar, defaultValue string) string { +func getEnvDefaultIntSetting(envVar, defaultValue string) string { value := os.Getenv(envVar) if value == "" { return defaultValue