diff --git a/backend/infra/impl/es/es7.go b/backend/infra/impl/es/es7.go index 1a0a8930..1e6db76a 100644 --- a/backend/infra/impl/es/es7.go +++ b/backend/infra/impl/es/es7.go @@ -21,12 +21,12 @@ import ( "context" "encoding/json" "fmt" - "io" - "os" - + "github.com/coze-dev/coze-studio/backend/pkg/parsex" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/elastic/go-elasticsearch/v7/esutil" + "io" + "os" "github.com/coze-dev/coze-studio/backend/infra/contract/es" "github.com/coze-dev/coze-studio/backend/pkg/lang/conv" @@ -39,12 +39,14 @@ type es7Client struct { } func newES7() (Client, error) { - esAddr := os.Getenv("ES_ADDR") + addresses, err := parsex.ParseClusterEndpoints(os.Getenv("ES_ADDR")) + if err != nil { + return nil, err + } esUsername := os.Getenv("ES_USERNAME") esPassword := os.Getenv("ES_PASSWORD") - esClient, err := elasticsearch.NewClient(elasticsearch.Config{ - Addresses: []string{esAddr}, + Addresses: addresses, Username: esUsername, Password: esPassword, }) @@ -120,6 +122,10 @@ func (c *es7Client) CreateIndex(ctx context.Context, index string, properties ma "mappings": map[string]any{ "properties": properties, }, + "settings": map[string]any{ + "number_of_shards": parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"), + "number_of_replicas": parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"), + }, } body, err := json.Marshal(mapping) diff --git a/backend/infra/impl/es/es8.go b/backend/infra/impl/es/es8.go index 04f88dc7..55fc3e9a 100644 --- a/backend/infra/impl/es/es8.go +++ b/backend/infra/impl/es/es8.go @@ -19,8 +19,7 @@ package es import ( "context" "fmt" - "os" - + "github.com/coze-dev/coze-studio/backend/pkg/parsex" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esutil" "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" @@ -31,6 +30,7 @@ import ( "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/operator" "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/sortorder" "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/textquerytype" + "os" "github.com/coze-dev/coze-studio/backend/infra/contract/es" "github.com/coze-dev/coze-studio/backend/pkg/lang/conv" @@ -51,11 +51,14 @@ type es8BulkIndexer struct { type es8Types struct{} func newES8() (Client, error) { - esAddr := os.Getenv("ES_ADDR") + addresses, err := parsex.ParseClusterEndpoints(os.Getenv("ES_ADDR")) + if err != nil { + return nil, err + } esUsername := os.Getenv("ES_USERNAME") esPassword := os.Getenv("ES_PASSWORD") esClient, err := elasticsearch.NewTypedClient(elasticsearch.Config{ - Addresses: []string{esAddr}, + Addresses: addresses, Username: esUsername, Password: esPassword, }) @@ -239,6 +242,10 @@ func (c *es8Client) CreateIndex(ctx context.Context, index string, properties ma Mappings: &types.TypeMapping{ Properties: propertiesMap, }, + Settings: &types.IndexSettings{ + NumberOfShards: parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"), + NumberOfReplicas: parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"), + }, }).Do(ctx); err != nil { return err } diff --git a/backend/infra/impl/es/es_impl.go b/backend/infra/impl/es/es_impl.go index 5921dc89..5b7bcaf4 100644 --- a/backend/infra/impl/es/es_impl.go +++ b/backend/infra/impl/es/es_impl.go @@ -18,9 +18,8 @@ package es import ( "fmt" - "os" - "github.com/coze-dev/coze-studio/backend/infra/contract/es" + "os" ) type ( diff --git a/backend/pkg/parsex/address.go b/backend/pkg/parsex/address.go new file mode 100644 index 00000000..a772b226 --- /dev/null +++ b/backend/pkg/parsex/address.go @@ -0,0 +1,66 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parsex + +import ( + "fmt" + "github.com/coze-dev/coze-studio/backend/pkg/logs" + "os" + "strconv" + "strings" +) + +// ParseClusterEndpoints 解析 ES /kafka 地址,多个地址用逗号分隔 +func ParseClusterEndpoints(address string) ([]string, error) { + if strings.TrimSpace(address) == "" { + return nil, fmt.Errorf("endpoints environment variable is required") + } + + endpoints := strings.Split(address, ",") + var validEndpoints []string + uniqueEndpoints := make(map[string]bool, len(endpoints)) + + for _, endpoint := range endpoints { + trimmed := strings.TrimSpace(endpoint) + if trimmed == "" { + continue + } + if !uniqueEndpoints[trimmed] { + uniqueEndpoints[trimmed] = true + validEndpoints = append(validEndpoints, trimmed) + } + } + + if len(validEndpoints) == 0 { + return nil, fmt.Errorf("no valid endpoints found in: %s", address) + } + + return validEndpoints, nil +} + +// GetEnvDefaultIntSetting 获取环境变量的值,如果不存在或无效则返回默认值 +func GetEnvDefaultIntSetting(envVar, defaultValue string) string { + value := os.Getenv(envVar) + if value == "" { + return defaultValue + } + if num, err := strconv.Atoi(value); err != nil || num <= 0 { + logs.Warnf("Invalid %s value: %s, using default: %s", envVar, value, defaultValue) + return defaultValue + } + return value +} diff --git a/docker/.env.debug.example b/docker/.env.debug.example index 0efa3829..745117d9 100644 --- a/docker/.env.debug.example +++ b/docker/.env.debug.example @@ -75,6 +75,8 @@ export ES_ADDR="http://127.0.0.1:9200" export ES_VERSION="v8" export ES_USERNAME="" export ES_PASSWORD="" +export ES_NUMBER_OF_SHARDS = "1" +export ES_NUMBER_OF_REPLICAS = "1" export COZE_MQ_TYPE="nsq" # nsq / kafka / rmq diff --git a/docker/.env.example b/docker/.env.example index c1467213..7517e6fa 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -71,6 +71,8 @@ export ES_ADDR="http://elasticsearch:9200" export ES_VERSION="v8" export ES_USERNAME="" export ES_PASSWORD="" +export ES_NUMBER_OF_SHARDS = "1" +export ES_NUMBER_OF_REPLICAS = "1" export COZE_MQ_TYPE="nsq" # nsq / kafka / rmq