refactor(infra): move parse_address to es package (#2163)

main
Ryo 1 month ago committed by GitHub
parent 61e8331a54
commit c950318a94
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      backend/domain/workflow/entity/node_meta.go
  2. 1
      backend/domain/workflow/entity/vo/node.go
  3. 2
      backend/domain/workflow/internal/canvas/adaptor/to_schema.go
  4. 18
      backend/domain/workflow/internal/compose/workflow_field_mapping_test.go
  5. 2
      backend/domain/workflow/internal/nodes/convert.go
  6. 8
      backend/domain/workflow/service/executable_impl_test.go
  7. 9
      backend/infra/impl/coderunner/direct/runner.go
  8. 12
      backend/infra/impl/es/es7.go
  9. 10
      backend/infra/impl/es/es8.go
  10. 3
      backend/infra/impl/es/es_impl.go
  11. 11
      backend/infra/impl/es/parse_address.go

@ -609,7 +609,7 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{
Category: "", // Not found in cate_list
Desc: "comment_desc", // Placeholder from JSON
Color: "",
SupportBatch: false, // supportBatch: 1
SupportBatch: false, // supportBatch: 1
EnUSName: "Comment",
},
NodeTypeVariableAggregator: {

@ -19,6 +19,7 @@ package vo
import (
"errors"
"fmt"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"

@ -446,7 +446,7 @@ func PruneIsolatedNodes(nodes []*vo.Node, edges []*vo.Edge, parentNode *vo.Node)
func parseBatchMode(n *vo.Node) (
batchN *vo.Node, // the new batch node
enabled bool, // whether the node has enabled batch mode
enabled bool, // whether the node has enabled batch mode
err error) {
if n.Data == nil || n.Data.Inputs == nil {
return nil, false, nil

@ -27,17 +27,17 @@ import (
func TestAddFieldMappingsWithDeduplication(t *testing.T) {
tests := []struct {
name string
name string
initialCarryOvers map[vo.NodeKey][]*compose.FieldMapping
fromNodeKey vo.NodeKey
fieldMappings []*compose.FieldMapping
expectedCount int
description string
fromNodeKey vo.NodeKey
fieldMappings []*compose.FieldMapping
expectedCount int
description string
}{
{
name: "empty_carry_overs",
name: "empty_carry_overs",
initialCarryOvers: make(map[vo.NodeKey][]*compose.FieldMapping),
fromNodeKey: "node1",
fromNodeKey: "node1",
fieldMappings: []*compose.FieldMapping{
compose.MapFieldPaths(compose.FieldPath{"input1"}, compose.FieldPath{"output1"}),
compose.MapFieldPaths(compose.FieldPath{"input2"}, compose.FieldPath{"output2"}),
@ -94,7 +94,7 @@ func TestAddFieldMappingsWithDeduplication(t *testing.T) {
description: "should not add any mappings when all are duplicates",
},
{
name: "new_node_key",
name: "new_node_key",
initialCarryOvers: map[vo.NodeKey][]*compose.FieldMapping{
"node1": {
compose.MapFieldPaths(compose.FieldPath{"input1"}, compose.FieldPath{"output1"}),
@ -109,7 +109,7 @@ func TestAddFieldMappingsWithDeduplication(t *testing.T) {
description: "should add all mappings for new node key",
},
{
name: "empty_field_mappings",
name: "empty_field_mappings",
initialCarryOvers: map[vo.NodeKey][]*compose.FieldMapping{
"node1": {
compose.MapFieldPaths(compose.FieldPath{"input1"}, compose.FieldPath{"output1"}),

@ -20,12 +20,12 @@ import (
"context"
"errors"
"fmt"
workflowModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/workflow"
"net/url"
"path/filepath"
"strconv"
"strings"
workflowModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/workflow"
"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
"github.com/coze-dev/coze-studio/backend/pkg/errorx"
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"

@ -204,11 +204,11 @@ func TestImpl_prefetchChatHistory(t *testing.T) {
crossmessage.SetDefaultSVC(mockMessage)
tests := []struct {
name string
setupMock func(msgSvc *messagemock.MockMessage)
config workflowModel.ExecuteConfig
name string
setupMock func(msgSvc *messagemock.MockMessage)
config workflowModel.ExecuteConfig
historyRounds int64
expectErr bool
expectErr bool
}{
{
name: "SectionID is nil",

@ -74,20 +74,23 @@ func (r *runner) Run(ctx context.Context, request *coderunner.RunRequest) (*code
}
func (r *runner) pythonCmdRun(_ context.Context, code string, params map[string]any) (map[string]any, error) {
bs, _ := sonic.Marshal(params)
bs, err := sonic.Marshal(params)
if err != nil {
return nil, fmt.Errorf("failed to marshal params to json, err: %w", err)
}
cmd := exec.Command(goutil.GetPython3Path(), "-c", fmt.Sprintf(pythonCode, code), string(bs)) // ignore_security_alert RCE
stdout := new(bytes.Buffer)
stderr := new(bytes.Buffer)
cmd.Stdout = stdout
cmd.Stderr = stderr
err := cmd.Run()
err = cmd.Run()
if err != nil {
return nil, fmt.Errorf("failed to run python script err: %s, std err: %s", err.Error(), stderr.String())
}
if stderr.String() != "" {
return nil, fmt.Errorf("failed to run python script err: %s", stderr.String())
}
ret := make(map[string]any)
err = sonic.Unmarshal(stdout.Bytes(), &ret)
if err != nil {

@ -21,12 +21,12 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/coze-dev/coze-studio/backend/pkg/parsex"
"io"
"os"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"
"io"
"os"
"github.com/coze-dev/coze-studio/backend/infra/contract/es"
"github.com/coze-dev/coze-studio/backend/pkg/lang/conv"
@ -39,7 +39,7 @@ type es7Client struct {
}
func newES7() (Client, error) {
addresses, err := parsex.ParseClusterEndpoints(os.Getenv("ES_ADDR"))
addresses, err := parseClusterEndpoints(os.Getenv("ES_ADDR"))
if err != nil {
return nil, err
}
@ -123,8 +123,8 @@ func (c *es7Client) CreateIndex(ctx context.Context, index string, properties ma
"properties": properties,
},
"settings": map[string]any{
"number_of_shards": parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"),
"number_of_replicas": parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"),
"number_of_shards": getEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"),
"number_of_replicas": getEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"),
},
}

@ -19,7 +19,8 @@ package es
import (
"context"
"fmt"
"github.com/coze-dev/coze-studio/backend/pkg/parsex"
"os"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
@ -30,7 +31,6 @@ import (
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/operator"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/sortorder"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/textquerytype"
"os"
"github.com/coze-dev/coze-studio/backend/infra/contract/es"
"github.com/coze-dev/coze-studio/backend/pkg/lang/conv"
@ -51,7 +51,7 @@ type es8BulkIndexer struct {
type es8Types struct{}
func newES8() (Client, error) {
addresses, err := parsex.ParseClusterEndpoints(os.Getenv("ES_ADDR"))
addresses, err := parseClusterEndpoints(os.Getenv("ES_ADDR"))
if err != nil {
return nil, err
}
@ -243,8 +243,8 @@ func (c *es8Client) CreateIndex(ctx context.Context, index string, properties ma
Properties: propertiesMap,
},
Settings: &types.IndexSettings{
NumberOfShards: parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"),
NumberOfReplicas: parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"),
NumberOfShards: getEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"),
NumberOfReplicas: getEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"),
},
}).Do(ctx); err != nil {
return err

@ -18,8 +18,9 @@ package es
import (
"fmt"
"github.com/coze-dev/coze-studio/backend/infra/contract/es"
"os"
"github.com/coze-dev/coze-studio/backend/infra/contract/es"
)
type (

@ -14,18 +14,18 @@
* limitations under the License.
*/
package parsex
package es
import (
"fmt"
"github.com/coze-dev/coze-studio/backend/pkg/logs"
"os"
"strconv"
"strings"
"github.com/coze-dev/coze-studio/backend/pkg/logs"
)
// ParseClusterEndpoints 解析 ES /kafka 地址,多个地址用逗号分隔
func ParseClusterEndpoints(address string) ([]string, error) {
func parseClusterEndpoints(address string) ([]string, error) {
if strings.TrimSpace(address) == "" {
return nil, fmt.Errorf("endpoints environment variable is required")
}
@ -52,8 +52,7 @@ func ParseClusterEndpoints(address string) ([]string, error) {
return validEndpoints, nil
}
// GetEnvDefaultIntSetting 获取环境变量的值,如果不存在或无效则返回默认值
func GetEnvDefaultIntSetting(envVar, defaultValue string) string {
func getEnvDefaultIntSetting(envVar, defaultValue string) string {
value := os.Getenv(envVar)
if value == "" {
return defaultValue
Loading…
Cancel
Save