From 04792aeb37efeb510b4b1850be66bc3cfaf9deeb Mon Sep 17 00:00:00 2001 From: fucktx <9391575+fucktx@users.noreply.github.com> Date: Mon, 8 Sep 2025 14:13:51 +0800 Subject: [PATCH] =?UTF-8?q?=20=20feat(infra):=20es=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=A4=9A=E8=8A=82=E7=82=B9=E9=85=8D=E7=BD=AE=20(#1995)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/infra/impl/es/es7.go | 18 ++++++--- backend/infra/impl/es/es8.go | 15 ++++++-- backend/infra/impl/es/es_impl.go | 3 +- backend/pkg/parsex/address.go | 66 ++++++++++++++++++++++++++++++++ docker/.env.debug.example | 2 + docker/.env.example | 2 + 6 files changed, 94 insertions(+), 12 deletions(-) create mode 100644 backend/pkg/parsex/address.go diff --git a/backend/infra/impl/es/es7.go b/backend/infra/impl/es/es7.go index 1a0a8930..1e6db76a 100644 --- a/backend/infra/impl/es/es7.go +++ b/backend/infra/impl/es/es7.go @@ -21,12 +21,12 @@ import ( "context" "encoding/json" "fmt" - "io" - "os" - + "github.com/coze-dev/coze-studio/backend/pkg/parsex" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/elastic/go-elasticsearch/v7/esutil" + "io" + "os" "github.com/coze-dev/coze-studio/backend/infra/contract/es" "github.com/coze-dev/coze-studio/backend/pkg/lang/conv" @@ -39,12 +39,14 @@ type es7Client struct { } func newES7() (Client, error) { - esAddr := os.Getenv("ES_ADDR") + addresses, err := parsex.ParseClusterEndpoints(os.Getenv("ES_ADDR")) + if err != nil { + return nil, err + } esUsername := os.Getenv("ES_USERNAME") esPassword := os.Getenv("ES_PASSWORD") - esClient, err := elasticsearch.NewClient(elasticsearch.Config{ - Addresses: []string{esAddr}, + Addresses: addresses, Username: esUsername, Password: esPassword, }) @@ -120,6 +122,10 @@ func (c *es7Client) CreateIndex(ctx context.Context, index string, properties ma "mappings": map[string]any{ "properties": properties, }, + "settings": map[string]any{ + "number_of_shards": 
parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"), + "number_of_replicas": parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"), + }, } body, err := json.Marshal(mapping) diff --git a/backend/infra/impl/es/es8.go b/backend/infra/impl/es/es8.go index 04f88dc7..55fc3e9a 100644 --- a/backend/infra/impl/es/es8.go +++ b/backend/infra/impl/es/es8.go @@ -19,8 +19,7 @@ package es import ( "context" "fmt" - "os" - + "github.com/coze-dev/coze-studio/backend/pkg/parsex" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esutil" "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" @@ -31,6 +30,7 @@ import ( "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/operator" "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/sortorder" "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/textquerytype" + "os" "github.com/coze-dev/coze-studio/backend/infra/contract/es" "github.com/coze-dev/coze-studio/backend/pkg/lang/conv" @@ -51,11 +51,14 @@ type es8BulkIndexer struct { type es8Types struct{} func newES8() (Client, error) { - esAddr := os.Getenv("ES_ADDR") + addresses, err := parsex.ParseClusterEndpoints(os.Getenv("ES_ADDR")) + if err != nil { + return nil, err + } esUsername := os.Getenv("ES_USERNAME") esPassword := os.Getenv("ES_PASSWORD") esClient, err := elasticsearch.NewTypedClient(elasticsearch.Config{ - Addresses: []string{esAddr}, + Addresses: addresses, Username: esUsername, Password: esPassword, }) @@ -239,6 +242,10 @@ func (c *es8Client) CreateIndex(ctx context.Context, index string, properties ma Mappings: &types.TypeMapping{ Properties: propertiesMap, }, + Settings: &types.IndexSettings{ + NumberOfShards: parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_SHARDS", "1"), + NumberOfReplicas: parsex.GetEnvDefaultIntSetting("ES_NUMBER_OF_REPLICAS", "1"), + }, }).Do(ctx); err != nil { return err } diff --git a/backend/infra/impl/es/es_impl.go b/backend/infra/impl/es/es_impl.go index 
5921dc89..5b7bcaf4 100644 --- a/backend/infra/impl/es/es_impl.go +++ b/backend/infra/impl/es/es_impl.go @@ -18,9 +18,8 @@ package es import ( "fmt" - "os" - "github.com/coze-dev/coze-studio/backend/infra/contract/es" + "os" ) type ( diff --git a/backend/pkg/parsex/address.go b/backend/pkg/parsex/address.go new file mode 100644 index 00000000..a772b226 --- /dev/null +++ b/backend/pkg/parsex/address.go @@ -0,0 +1,66 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parsex + +import ( + "fmt" + "github.com/coze-dev/coze-studio/backend/pkg/logs" + "os" + "strconv" + "strings" +) + +// ParseClusterEndpoints 解析 ES /kafka 地址,多个地址用逗号分隔 +func ParseClusterEndpoints(address string) ([]string, error) { + if strings.TrimSpace(address) == "" { + return nil, fmt.Errorf("endpoints environment variable is required") + } + + endpoints := strings.Split(address, ",") + var validEndpoints []string + uniqueEndpoints := make(map[string]bool, len(endpoints)) + + for _, endpoint := range endpoints { + trimmed := strings.TrimSpace(endpoint) + if trimmed == "" { + continue + } + if !uniqueEndpoints[trimmed] { + uniqueEndpoints[trimmed] = true + validEndpoints = append(validEndpoints, trimmed) + } + } + + if len(validEndpoints) == 0 { + return nil, fmt.Errorf("no valid endpoints found in: %s", address) + } + + return validEndpoints, nil +} + +// GetEnvDefaultIntSetting 获取环境变量的值,如果不存在或无效则返回默认值 +func GetEnvDefaultIntSetting(envVar, defaultValue string) string { + value := os.Getenv(envVar) + if value == "" { + return defaultValue + } + if num, err := strconv.Atoi(value); err != nil || num <= 0 { + logs.Warnf("Invalid %s value: %s, using default: %s", envVar, value, defaultValue) + return defaultValue + } + return value +} diff --git a/docker/.env.debug.example b/docker/.env.debug.example index 0efa3829..745117d9 100644 --- a/docker/.env.debug.example +++ b/docker/.env.debug.example @@ -75,6 +75,8 @@ export ES_ADDR="http://127.0.0.1:9200" export ES_VERSION="v8" export ES_USERNAME="" export ES_PASSWORD="" +export ES_NUMBER_OF_SHARDS="1" +export ES_NUMBER_OF_REPLICAS="1" export COZE_MQ_TYPE="nsq" # nsq / kafka / rmq diff --git a/docker/.env.example b/docker/.env.example index c1467213..7517e6fa 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -71,6 +71,8 @@ export ES_ADDR="http://elasticsearch:9200" export ES_VERSION="v8" export ES_USERNAME="" export ES_PASSWORD="" +export ES_NUMBER_OF_SHARDS="1" +export 
ES_NUMBER_OF_REPLICAS="1" export COZE_MQ_TYPE="nsq" # nsq / kafka / rmq