diff --git a/.opencode/agents/code-reviewer.md b/.opencode/agents/code-reviewer.md index a9b78fb..045d087 100644 --- a/.opencode/agents/code-reviewer.md +++ b/.opencode/agents/code-reviewer.md @@ -108,10 +108,11 @@ Commands MUST use `app.Context` for dependencies, NOT create clients directly: // GOOD func runRestore(appCtx *app.Context) error { appCtx.K8sClient // Use injected client - appCtx.ESClient appCtx.Config appCtx.Logger appCtx.Formatter + // Service clients created via factory methods after port-forwarding + esClient, err := appCtx.NewESClient(pf.LocalPort) } // BAD - Direct client creation in command diff --git a/AGENTS.md b/AGENTS.md index 5e5826f..58ad8fc 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -137,9 +137,10 @@ internal/ // GOOD func runRestore(appCtx *app.Context) error { appCtx.K8sClient // Kubernetes client - appCtx.ESClient // Elasticsearch client appCtx.Config // Configuration appCtx.Logger // Structured logger + // Service clients created via factory methods after port-forwarding + esClient, err := appCtx.NewESClient(pf.LocalPort) } ``` diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index a809f66..9883559 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -100,12 +100,13 @@ if err != nil { } // All dependencies available via appCtx -appCtx.K8sClient -appCtx.S3Client -appCtx.ESClient -appCtx.Config -appCtx.Logger -appCtx.Formatter +appCtx.K8sClient // Kubernetes client +appCtx.Config // Configuration +appCtx.Logger // Structured logger +appCtx.Formatter // Output formatter +appCtx.NewESClient(localPort) // Elasticsearch client factory +appCtx.NewS3Client(localPort) // S3/Minio client factory +appCtx.NewCHClient(backupAPIPort, dbPort) // ClickHouse client factory ``` **Dependency Rules**: @@ -226,9 +227,10 @@ func runList(appCtx *app.Context) error { // All dependencies available immediately appCtx.K8sClient appCtx.Config - appCtx.S3Client appCtx.Logger appCtx.Formatter + // Service clients created via factory methods with port-forwarded port + s3Client, err := appCtx.NewS3Client(pf.LocalPort) } ``` @@ -455,9 +457,11 @@ func runListSnapshots(globalFlags *config.CLIGlobalFlags) error { ```go // GOOD func runListSnapshots(appCtx *app.Context) error { - // Dependencies already created + // Direct dependencies appCtx.K8sClient - appCtx.ESClient + appCtx.Config + // Service clients created via factory methods after port-forwarding + esClient, err := appCtx.NewESClient(pf.LocalPort) } ``` diff --git a/README.md b/README.md index 8950270..14b97d9 100644 --- a/README.md +++ b/README.md @@ -319,7 +319,6 @@ elasticsearch: service: name: suse-observability-elasticsearch-master-headless port: 9200 - localPortForwardPort: 9200 restore: repository: sts-backup diff --git a/cmd/clickhouse/check_and_finalize.go b/cmd/clickhouse/check_and_finalize.go index 665a561..ff340e5 100644 --- a/cmd/clickhouse/check_and_finalize.go +++ b/cmd/clickhouse/check_and_finalize.go @@ -7,7 +7,7 @@ import ( "github.com/spf13/cobra" "github.com/stackvista/stackstate-backup-cli/cmd/cmdutils" "github.com/stackvista/stackstate-backup-cli/internal/app" - "github.com/stackvista/stackstate-backup-cli/internal/clients/clickhouse" + ch "github.com/stackvista/stackstate-backup-cli/internal/clients/clickhouse" "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" @@ -45,12 +45,11 @@ It will check the restore status and if complete, execute post-restore tasks and } func runCheckAndFinalize(appCtx *app.Context) error { - // Setup port-forward + // Setup port-forward to ClickHouse Backup API pf, err := portforward.SetupPortForward( appCtx.K8sClient, appCtx.Namespace, appCtx.Config.Clickhouse.BackupService.Name, - appCtx.Config.Clickhouse.BackupService.LocalPortForwardPort, appCtx.Config.Clickhouse.BackupService.Port, appCtx.Logger, ) @@ -58,15 +57,22 @@ func runCheckAndFinalize(appCtx *app.Context) error { return err } defer close(pf.StopChan) - return checkAndFinalize(appCtx, checkOperationID, waitForRestore) + + // Create CH client with backup API port only + chClient, err := appCtx.NewCHClient(pf.LocalPort, 0) + if err != nil { + return fmt.Errorf("failed to create ClickHouse client: %w", err) + } + + return checkAndFinalize(chClient, appCtx, checkOperationID, waitForRestore) } // checkAndFinalize checks restore status and finalizes if complete -func checkAndFinalize(appCtx *app.Context, operationID string, waitForComplete bool) error { +func checkAndFinalize(chClient ch.Interface, appCtx *app.Context, operationID string, waitForComplete bool) error { // Check status appCtx.Logger.Println() appCtx.Logger.Infof("Checking restore status for operation: %s", operationID) - status, err := appCtx.CHClient.GetRestoreStatus(appCtx.Context, operationID) + status, err := chClient.GetRestoreStatus(appCtx.Context, operationID) if err != nil { return err } @@ -87,7 +93,7 @@ func checkAndFinalize(appCtx *app.Context, operationID string, waitForComplete b if waitForComplete { // Still running - wait appCtx.Logger.Infof("Restore is in progress, waiting for completion...") - return waitAndFinalize(appCtx, appCtx.CHClient, operationID) + return waitAndFinalize(appCtx, chClient, operationID) } // Just print status appCtx.Logger.Println() @@ -96,7 +102,7 @@ func checkAndFinalize(appCtx *app.Context, operationID string, waitForComplete b } // waitAndFinalize waits for restore completion and finalizes -func waitAndFinalize(appCtx *app.Context, chClient clickhouse.Interface, operationID string) error { +func waitAndFinalize(appCtx *app.Context, chClient ch.Interface, operationID string) error { restore.PrintAPIWaitingMessage("clickhouse", operationID, appCtx.Namespace, appCtx.Logger) // Wait for restore using shared utility @@ -157,7 +163,6 @@ func executePostRestoreSQL(appCtx *app.Context) error { appCtx.K8sClient, appCtx.Namespace, appCtx.Config.Clickhouse.Service.Name, - appCtx.Config.Clickhouse.Service.LocalPortForwardPort, appCtx.Config.Clickhouse.Service.Port, appCtx.Logger, ) @@ -166,8 +171,14 @@ func executePostRestoreSQL(appCtx *app.Context) error { } defer close(pf.StopChan) + // Create ClickHouse client with DB port only + chDBClient, err := appCtx.NewCHClient(0, pf.LocalPort) + if err != nil { + return fmt.Errorf("failed to create ClickHouse DB client: %w", err) + } + // Create ClickHouse SQL connection - conn, closeConn, err := appCtx.CHClient.Connect() + conn, closeConn, err := chDBClient.Connect() if err != nil { return fmt.Errorf("failed to connect to ClickHouse: %w", err) } diff --git a/cmd/clickhouse/list.go b/cmd/clickhouse/list.go index 05b0e4c..3f7abc2 100644 --- a/cmd/clickhouse/list.go +++ b/cmd/clickhouse/list.go @@ -29,7 +29,6 @@ func runList(appCtx *app.Context) error { appCtx.K8sClient, appCtx.Namespace, appCtx.Config.Clickhouse.BackupService.Name, - appCtx.Config.Clickhouse.BackupService.LocalPortForwardPort, appCtx.Config.Clickhouse.BackupService.Port, appCtx.Logger, ) @@ -38,11 +37,17 @@ func runList(appCtx *app.Context) error { } defer close(pf.StopChan) + // Create CH client with backup API port only + chClient, err := appCtx.NewCHClient(pf.LocalPort, 0) + if err != nil { + return fmt.Errorf("failed to create ClickHouse client: %w", err) + } + // List backups appCtx.Logger.Infof("Listing Clickhouse backups...") appCtx.Logger.Println() - backups, err := appCtx.CHClient.ListBackups(appCtx.Context) + backups, err := chClient.ListBackups(appCtx.Context) if err != nil { return fmt.Errorf("failed to list backups: %w", err) } diff --git a/cmd/clickhouse/restore.go b/cmd/clickhouse/restore.go index 9e6d820..e7549e5 100644 --- a/cmd/clickhouse/restore.go +++ b/cmd/clickhouse/restore.go @@ -96,7 +96,6 @@ func executeRestore(appCtx *app.Context, backupName string, waitForComplete bool appCtx.K8sClient, appCtx.Namespace, appCtx.Config.Clickhouse.BackupService.Name, - appCtx.Config.Clickhouse.BackupService.LocalPortForwardPort, appCtx.Config.Clickhouse.BackupService.Port, appCtx.Logger, ) @@ -105,10 +104,16 @@ func executeRestore(appCtx *app.Context, backupName string, waitForComplete bool } defer close(pf.StopChan) + // Create CH client with backup API port only + chClient, err := appCtx.NewCHClient(pf.LocalPort, 0) + if err != nil { + return fmt.Errorf("failed to create ClickHouse client: %w", err) + } + // Trigger restore appCtx.Logger.Println() appCtx.Logger.Infof("Triggering restore for backup: %s", backupName) - operationID, err := appCtx.CHClient.TriggerRestore(appCtx.Context, backupName) + operationID, err := chClient.TriggerRestore(appCtx.Context, backupName) if err != nil { return fmt.Errorf("failed to trigger restore: %w", err) } @@ -119,7 +124,7 @@ func executeRestore(appCtx *app.Context, backupName string, waitForComplete bool return nil } - return checkAndFinalize(appCtx, operationID, waitForComplete) + return checkAndFinalize(chClient, appCtx, operationID, waitForComplete) } // getLatestBackupForRestore retrieves the most recent backup @@ -129,7 +134,6 @@ func getLatestBackupForRestore(appCtx *app.Context) (string, error) { appCtx.K8sClient, appCtx.Namespace, appCtx.Config.Clickhouse.BackupService.Name, - appCtx.Config.Clickhouse.BackupService.LocalPortForwardPort, appCtx.Config.Clickhouse.BackupService.Port, appCtx.Logger, ) @@ -138,8 +142,14 @@ func getLatestBackupForRestore(appCtx *app.Context) (string, error) { } defer close(pf.StopChan) + // Create CH client with backup API port only + chClient, err := appCtx.NewCHClient(pf.LocalPort, 0) + if err != nil { + return "", fmt.Errorf("failed to create ClickHouse client: %w", err) + } + // List backups - backups, err := appCtx.CHClient.ListBackups(appCtx.Context) + backups, err := chClient.ListBackups(appCtx.Context) if err != nil { return "", fmt.Errorf("failed to list backups: %w", err) } diff --git a/cmd/elasticsearch/check_and_finalize.go b/cmd/elasticsearch/check_and_finalize.go index 6cd028a..d7f6da8 100644 --- a/cmd/elasticsearch/check_and_finalize.go +++ b/cmd/elasticsearch/check_and_finalize.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/cobra" "github.com/stackvista/stackstate-backup-cli/cmd/cmdutils" "github.com/stackvista/stackstate-backup-cli/internal/app" + es "github.com/stackvista/stackstate-backup-cli/internal/clients/elasticsearch" "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" @@ -39,24 +40,29 @@ If the restore is still running and --wait is specified, wait for completion bef func runCheckAndFinalize(appCtx *app.Context) error { // Setup port-forward to Elasticsearch serviceName := appCtx.Config.Elasticsearch.Service.Name - localPort := appCtx.Config.Elasticsearch.Service.LocalPortForwardPort remotePort := appCtx.Config.Elasticsearch.Service.Port - pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger) if err != nil { return err } defer close(pf.StopChan) + // Create ES client with actual port + esClient, err := appCtx.NewESClient(pf.LocalPort) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + repository := appCtx.Config.Elasticsearch.Restore.Repository - return checkAndFinalize(appCtx, repository, checkOperationID, checkWait) + return checkAndFinalize(esClient, appCtx, repository, checkOperationID, checkWait) } -func checkAndFinalize(appCtx *app.Context, repository, snapshotName string, waitForComplete bool) error { +func checkAndFinalize(esClient es.Interface, appCtx *app.Context, repository, snapshotName string, waitForComplete bool) error { // Get restore status appCtx.Logger.Infof("Checking restore status for snapshot: %s", snapshotName) - status, isComplete, err := appCtx.ESClient.GetRestoreStatus(repository, snapshotName) + status, isComplete, err := esClient.GetRestoreStatus(repository, snapshotName) if err != nil { return fmt.Errorf("failed to get restore status: %w", err) } @@ -87,7 +93,7 @@ func checkAndFinalize(appCtx *app.Context, repository, snapshotName string, wait if waitForComplete { appCtx.Logger.Println() - return waitAndFinalize(appCtx, repository, snapshotName) + return waitAndFinalize(esClient, appCtx, repository, snapshotName) } // Not waiting - print status and exit @@ -97,12 +103,12 @@ func checkAndFinalize(appCtx *app.Context, repository, snapshotName string, wait } // waitAndFinalize waits for restore to complete and finalizes (scale up) -func waitAndFinalize(appCtx *app.Context, repository, snapshotName string) error { +func waitAndFinalize(esClient es.Interface, appCtx *app.Context, repository, snapshotName string) error { restore.PrintAPIWaitingMessage("elasticsearch", snapshotName, appCtx.Namespace, appCtx.Logger) // Wait for restore to complete checkStatusFn := func() (string, bool, error) { - return appCtx.ESClient.GetRestoreStatus(repository, snapshotName) + return esClient.GetRestoreStatus(repository, snapshotName) } if err := restore.WaitForAPIRestore(checkStatusFn, 0, appCtx.Logger); err != nil { diff --git a/cmd/elasticsearch/configure.go b/cmd/elasticsearch/configure.go index c56e3c2..07614aa 100644 --- a/cmd/elasticsearch/configure.go +++ b/cmd/elasticsearch/configure.go @@ -29,28 +29,33 @@ func runConfigure(appCtx *app.Context) error { // Setup port-forward to Elasticsearch serviceName := appCtx.Config.Elasticsearch.Service.Name - localPort := appCtx.Config.Elasticsearch.Service.LocalPortForwardPort remotePort := appCtx.Config.Elasticsearch.Service.Port - pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger) if err != nil { return err } defer close(pf.StopChan) + // Create ES client with actual port + esClient, err := appCtx.NewESClient(pf.LocalPort) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + // Configure snapshot repository repo := appCtx.Config.Elasticsearch.SnapshotRepository // Always unregister existing repository to ensure clean state appCtx.Logger.Infof("Unregistering snapshot repository '%s'...", repo.Name) - if err := appCtx.ESClient.DeleteSnapshotRepository(repo.Name); err != nil { + if err := esClient.DeleteSnapshotRepository(repo.Name); err != nil { return fmt.Errorf("failed to unregister snapshot repository: %w", err) } appCtx.Logger.Successf("Snapshot repository unregistered successfully") appCtx.Logger.Infof("Configuring snapshot repository '%s' (bucket: %s)...", repo.Name, repo.Bucket) - err = appCtx.ESClient.ConfigureSnapshotRepository( + err = esClient.ConfigureSnapshotRepository( repo.Name, repo.Bucket, repo.Endpoint, @@ -68,7 +73,7 @@ func runConfigure(appCtx *app.Context) error { slm := appCtx.Config.Elasticsearch.SLM appCtx.Logger.Infof("Configuring SLM policy '%s'...", slm.Name) - err = appCtx.ESClient.ConfigureSLMPolicy( + err = esClient.ConfigureSLMPolicy( slm.Name, slm.Schedule, slm.SnapshotTemplateName, diff --git a/cmd/elasticsearch/configure_test.go b/cmd/elasticsearch/configure_test.go index b294c0e..268c254 100644 --- a/cmd/elasticsearch/configure_test.go +++ b/cmd/elasticsearch/configure_test.go @@ -92,7 +92,7 @@ func (m *mockESClientForConfigure) IndexExists(_ string) (bool, error) { return false, fmt.Errorf("not implemented") } -func (m *mockESClientForConfigure) RestoreSnapshot(_, _, _ string, _ bool) error { +func (m *mockESClientForConfigure) RestoreSnapshot(_, _, _ string) error { return fmt.Errorf("not implemented") } @@ -138,7 +138,6 @@ elasticsearch: service: name: elasticsearch-master port: 9200 - localPortForwardPort: 9200 restore: scaleDownLabelSelector: app=test indexPrefix: sts_ @@ -173,7 +172,6 @@ elasticsearch: service: name: elasticsearch-master port: 9200 - localPortForwardPort: 9200 restore: scaleDownLabelSelector: app=test indexPrefix: sts_ diff --git a/cmd/elasticsearch/list-indices.go b/cmd/elasticsearch/list-indices.go index cc931dc..2c4a8bf 100644 --- a/cmd/elasticsearch/list-indices.go +++ b/cmd/elasticsearch/list-indices.go @@ -24,19 +24,24 @@ func listIndicesCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command { func runListIndices(appCtx *app.Context) error { // Setup port-forward to Elasticsearch serviceName := appCtx.Config.Elasticsearch.Service.Name - localPort := appCtx.Config.Elasticsearch.Service.LocalPortForwardPort remotePort := appCtx.Config.Elasticsearch.Service.Port - pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger) if err != nil { return err } defer close(pf.StopChan) + // Create ES client with actual port + esClient, err := appCtx.NewESClient(pf.LocalPort) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + // List indices with cat API appCtx.Logger.Infof("Fetching Elasticsearch indices...") - indices, err := appCtx.ESClient.ListIndicesDetailed() + indices, err := esClient.ListIndicesDetailed() if err != nil { return fmt.Errorf("failed to list indices: %w", err) } diff --git a/cmd/elasticsearch/list.go b/cmd/elasticsearch/list.go index 0c548c6..bb1961d 100644 --- a/cmd/elasticsearch/list.go +++ b/cmd/elasticsearch/list.go @@ -25,20 +25,25 @@ func listCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command { func runListSnapshots(appCtx *app.Context) error { // Setup port-forward to Elasticsearch serviceName := appCtx.Config.Elasticsearch.Service.Name - localPort := appCtx.Config.Elasticsearch.Service.LocalPortForwardPort remotePort := appCtx.Config.Elasticsearch.Service.Port - pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger) if err != nil { return err } defer close(pf.StopChan) + // Create ES client with actual port + esClient, err := appCtx.NewESClient(pf.LocalPort) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + // List snapshots repository := appCtx.Config.Elasticsearch.Restore.Repository appCtx.Logger.Infof("Fetching snapshots from repository '%s'...", repository) - snapshots, err := appCtx.ESClient.ListSnapshots(repository) + snapshots, err := esClient.ListSnapshots(repository) if err != nil { return fmt.Errorf("failed to list snapshots: %w", err) } diff --git a/cmd/elasticsearch/list_indices_test.go b/cmd/elasticsearch/list_indices_test.go index 6b0bd21..7785e50 100644 --- a/cmd/elasticsearch/list_indices_test.go +++ b/cmd/elasticsearch/list_indices_test.go @@ -53,7 +53,7 @@ func (m *mockESClientForIndices) IndexExists(_ string) (bool, error) { return false, fmt.Errorf("not implemented") } -func (m *mockESClientForIndices) RestoreSnapshot(_, _, _ string, _ bool) error { +func (m *mockESClientForIndices) RestoreSnapshot(_, _, _ string) error { return fmt.Errorf("not implemented") } @@ -105,7 +105,6 @@ elasticsearch: service: name: elasticsearch-master port: 9200 - localPortForwardPort: 9200 restore: scaleDownLabelSelector: app=test indexPrefix: sts_ diff --git a/cmd/elasticsearch/list_test.go b/cmd/elasticsearch/list_test.go index 81684d6..e38f98a 100644 --- a/cmd/elasticsearch/list_test.go +++ b/cmd/elasticsearch/list_test.go @@ -28,7 +28,6 @@ minio: service: name: minio port: 9000 - localPortForwardPort: 9000 accessKey: minioadmin secretKey: minioadmin stackgraph: @@ -95,11 +94,9 @@ clickhouse: service: name: "clickhouse" port: 9000 - localPortForwardPort: 9000 backupService: name: "clickhouse" port: 7171 - localPortForwardPort: 7171 database: "default" username: "default" password: "password" @@ -140,7 +137,7 @@ func (m *mockESClient) IndexExists(_ string) (bool, error) { return false, fmt.Errorf("not implemented") } -func (m *mockESClient) RestoreSnapshot(_, _, _ string, _ bool) error { +func (m *mockESClient) RestoreSnapshot(_, _, _ string) error { return fmt.Errorf("not implemented") } @@ -179,7 +176,6 @@ elasticsearch: service: name: elasticsearch-master port: 9200 - localPortForwardPort: 9200 restore: scaleDownLabelSelector: app=test indexPrefix: sts_ diff --git a/cmd/elasticsearch/restore.go b/cmd/elasticsearch/restore.go index 6b1c88b..1366cf0 100644 --- a/cmd/elasticsearch/restore.go +++ b/cmd/elasticsearch/restore.go @@ -53,22 +53,27 @@ func restoreCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command { func runRestore(appCtx *app.Context) error { // Setup port-forward to Elasticsearch (needed for both snapshot selection and restore) serviceName := appCtx.Config.Elasticsearch.Service.Name - localPort := appCtx.Config.Elasticsearch.Service.LocalPortForwardPort remotePort := appCtx.Config.Elasticsearch.Service.Port - pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger) if err != nil { return err } defer close(pf.StopChan) + // Create ES client with actual port + esClient, err := appCtx.NewESClient(pf.LocalPort) + if err != nil { + return fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + repository := appCtx.Config.Elasticsearch.Restore.Repository // Determine snapshot name (either from flag or latest) selectedSnapshot := snapshotName if useLatest { appCtx.Logger.Infof("Fetching latest snapshot from repository '%s'...", repository) - latestSnapshot, err := getLatestSnapshot(appCtx, repository) + latestSnapshot, err := getLatestSnapshot(esClient, repository) if err != nil { return err } @@ -108,14 +113,14 @@ func runRestore(appCtx *app.Context) error { // Delete all STS indices before restore appCtx.Logger.Println() - if err := deleteAllSTSIndices(appCtx); err != nil { + if err := deleteAllSTSIndices(esClient, appCtx); err != nil { return err } // Trigger async restore appCtx.Logger.Println() appCtx.Logger.Infof("Triggering restore for snapshot: %s", selectedSnapshot) - if err := appCtx.ESClient.RestoreSnapshot(repository, selectedSnapshot, appCtx.Config.Elasticsearch.Restore.IndicesPattern); err != nil { + if err := esClient.RestoreSnapshot(repository, selectedSnapshot, appCtx.Config.Elasticsearch.Restore.IndicesPattern); err != nil { return fmt.Errorf("failed to trigger restore: %w", err) } appCtx.Logger.Successf("Restore triggered successfully") @@ -125,12 +130,12 @@ func runRestore(appCtx *app.Context) error { return nil } - return checkAndFinalize(appCtx, repository, selectedSnapshot, !runBackground) + return checkAndFinalize(esClient, appCtx, repository, selectedSnapshot, !runBackground) } // getLatestSnapshot retrieves the most recent snapshot from the repository -func getLatestSnapshot(appCtx *app.Context, repository string) (string, error) { - snapshots, err := appCtx.ESClient.ListSnapshots(repository) +func getLatestSnapshot(esClient es.Interface, repository string) (string, error) { + snapshots, err := esClient.ListSnapshots(repository) if err != nil { return "", fmt.Errorf("failed to list snapshots: %w", err) } @@ -148,9 +153,9 @@ func getLatestSnapshot(appCtx *app.Context, repository string) (string, error) { } // deleteAllSTSIndices deletes all STS indices including datastream rollover if needed -func deleteAllSTSIndices(appCtx *app.Context) error { +func deleteAllSTSIndices(esClient es.Interface, appCtx *app.Context) error { appCtx.Logger.Infof("Fetching current Elasticsearch indices...") - allIndices, err := appCtx.ESClient.ListIndices("*") + allIndices, err := esClient.ListIndices("*") if err != nil { return fmt.Errorf("failed to list indices: %w", err) } @@ -170,7 +175,7 @@ func deleteAllSTSIndices(appCtx *app.Context) error { // Check for datastream and rollover if needed if hasDatastreamIndices(stsIndices, appCtx.Config.Elasticsearch.Restore.DatastreamIndexPrefix) { appCtx.Logger.Infof("Rolling over datastream '%s'...", appCtx.Config.Elasticsearch.Restore.DatastreamName) - if err := appCtx.ESClient.RolloverDatastream(appCtx.Config.Elasticsearch.Restore.DatastreamName); err != nil { + if err := esClient.RolloverDatastream(appCtx.Config.Elasticsearch.Restore.DatastreamName); err != nil { return fmt.Errorf("failed to rollover datastream: %w", err) } appCtx.Logger.Successf("Datastream rolled over successfully") @@ -179,7 +184,7 @@ func deleteAllSTSIndices(appCtx *app.Context) error { // Delete all indices appCtx.Logger.Infof("Deleting %d index(es)...", len(stsIndices)) for _, index := range stsIndices { - if err := deleteIndexWithVerification(appCtx.ESClient, index, appCtx.Logger); err != nil { + if err := deleteIndexWithVerification(esClient, index, appCtx.Logger); err != nil { return err } } diff --git a/cmd/settings/list.go b/cmd/settings/list.go index 265e0d5..89b99a7 100644 --- a/cmd/settings/list.go +++ b/cmd/settings/list.go @@ -119,15 +119,20 @@ type BackupFileInfo struct { func getBackupListFromS3(appCtx *app.Context) ([]BackupFileInfo, error) { // Setup port-forward to Minio serviceName := appCtx.Config.Minio.Service.Name - localPort := appCtx.Config.Minio.Service.LocalPortForwardPort remotePort := appCtx.Config.Minio.Service.Port - pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger) if err != nil { return nil, err } defer close(pf.StopChan) + // Create S3 client with actual port + s3Client, err := appCtx.NewS3Client(pf.LocalPort) + if err != nil { + return nil, fmt.Errorf("failed to create S3 client: %w", err) + } + // List objects in bucket bucket := appCtx.Config.Settings.Bucket prefix := appCtx.Config.Settings.S3Prefix @@ -139,7 +144,7 @@ func getBackupListFromS3(appCtx *app.Context) ([]BackupFileInfo, error) { Prefix: aws.String(prefix), } - result, err := appCtx.S3Client.ListObjectsV2(context.Background(), input) + result, err := s3Client.ListObjectsV2(context.Background(), input) if err != nil { return nil, fmt.Errorf("failed to list S3 objects: %w", err) } diff --git a/cmd/stackgraph/list.go b/cmd/stackgraph/list.go index 330fa8c..d5ef289 100644 --- a/cmd/stackgraph/list.go +++ b/cmd/stackgraph/list.go @@ -30,15 +30,20 @@ func listCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command { func runList(appCtx *app.Context) error { // Setup port-forward to Minio serviceName := appCtx.Config.Minio.Service.Name - localPort := appCtx.Config.Minio.Service.LocalPortForwardPort remotePort := appCtx.Config.Minio.Service.Port - pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger) if err != nil { return err } defer close(pf.StopChan) + // Create S3 client with actual port + s3Client, err := appCtx.NewS3Client(pf.LocalPort) + if err != nil { + return fmt.Errorf("failed to create S3 client: %w", err) + } + // List objects in bucket bucket := appCtx.Config.Stackgraph.Bucket prefix := appCtx.Config.Stackgraph.S3Prefix @@ -51,7 +56,7 @@ func runList(appCtx *app.Context) error { Prefix: aws.String(prefix), } - result, err := appCtx.S3Client.ListObjectsV2(context.Background(), input) + result, err := s3Client.ListObjectsV2(context.Background(), input) if err != nil { return fmt.Errorf("failed to list S3 objects: %w", err) } diff --git a/cmd/stackgraph/restore.go b/cmd/stackgraph/restore.go index 1f61c42..32c0f3b 100644 --- a/cmd/stackgraph/restore.go +++ b/cmd/stackgraph/restore.go @@ -146,10 +146,9 @@ func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, func getLatestBackup(k8sClient *k8s.Client, namespace string, config *config.Config, log *logger.Logger) (string, error) { // Setup port-forward to Minio serviceName := config.Minio.Service.Name - localPort := config.Minio.Service.LocalPortForwardPort remotePort := config.Minio.Service.Port - pf, err := portforward.SetupPortForward(k8sClient, namespace, serviceName, localPort, remotePort, log) + pf, err := portforward.SetupPortForward(k8sClient, namespace, serviceName, remotePort, log) if err != nil { return "", err } diff --git a/cmd/victoriametrics/list.go b/cmd/victoriametrics/list.go index 839db9e..7110eb6 100644 --- a/cmd/victoriametrics/list.go +++ b/cmd/victoriametrics/list.go @@ -35,15 +35,20 @@ func listCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command { func runList(appCtx *app.Context) error { // Setup port-forward to Minio serviceName := appCtx.Config.Minio.Service.Name - localPort := appCtx.Config.Minio.Service.LocalPortForwardPort remotePort := appCtx.Config.Minio.Service.Port - pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger) if err != nil { return err } defer close(pf.StopChan) + // Create S3 client with actual port + s3Client, err := appCtx.NewS3Client(pf.LocalPort) + if err != nil { + return fmt.Errorf("failed to create S3 client: %w", err) + } + var vmBackups []s3client.Object // List objects in bucket appCtx.Logger.Infof("Listing VictoriaMetrics backups in bucket ...") @@ -63,7 +68,7 @@ func runList(appCtx *app.Context) error { Delimiter: aws.String("/"), } - result, err := appCtx.S3Client.ListObjectsV2(context.Background(), input) + result, err := s3Client.ListObjectsV2(context.Background(), input) if err != nil { return fmt.Errorf("failed to list S3 objects: %w", err) } @@ -71,7 +76,7 @@ func runList(appCtx *app.Context) error { for _, key := range s3client.FilterByCommonPrefix(result.CommonPrefixes) { vmBackups = append(vmBackups, s3client.Object{ Key: fmt.Sprintf("%s/%s", bucket, key.Key), - LastModified: getVMBackupTime(appCtx.S3Client, bucket, key.Key), + LastModified: getVMBackupTime(s3Client, bucket, key.Key), }) } } diff --git a/cmd/victoriametrics/restore.go b/cmd/victoriametrics/restore.go index 231eb81..1c88409 100644 --- a/cmd/victoriametrics/restore.go +++ b/cmd/victoriametrics/restore.go @@ -143,10 +143,9 @@ func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, func getLatestBackup(k8sClient *k8s.Client, namespace string, config *config.Config, log *logger.Logger) (string, error) { // Setup port-forward to Minio serviceName := config.Minio.Service.Name - localPort := config.Minio.Service.LocalPortForwardPort remotePort := config.Minio.Service.Port - pf, err := portforward.SetupPortForward(k8sClient, namespace, serviceName, localPort, remotePort, log) + pf, err := portforward.SetupPortForward(k8sClient, namespace, serviceName, remotePort, log) if err != nil { return "", err } diff --git a/internal/app/app.go b/internal/app/app.go index 7cd1d94..9753b00 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -18,9 +18,6 @@ import ( type Context struct { K8sClient *k8s.Client Namespace string - S3Client s3.Interface - ESClient elasticsearch.Interface - CHClient clickhouse.Interface Config *config.Config Logger *logger.Logger Formatter *output.Formatter @@ -40,30 +37,6 @@ func NewContext(flags *config.CLIGlobalFlags) (*Context, error) { return nil, fmt.Errorf("failed to load configuration: %w", err) } - // Create S3 client - endpoint := fmt.Sprintf("http://localhost:%d", cfg.Minio.Service.LocalPortForwardPort) - s3Client, err := s3.NewClient(endpoint, cfg.Minio.AccessKey, cfg.Minio.SecretKey) - if err != nil { - return nil, err - } - - // Create Elasticsearch client - esClient, err := elasticsearch.NewClient(fmt.Sprintf("http://localhost:%d", cfg.Elasticsearch.Service.LocalPortForwardPort)) - if err != nil { - return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) - } - - // Create ClickHouse client - chClient, err := clickhouse.NewClient( - fmt.Sprintf("http://localhost:%d", cfg.Clickhouse.BackupService.LocalPortForwardPort), - fmt.Sprintf("localhost:%d", cfg.Clickhouse.Service.LocalPortForwardPort), - cfg.Clickhouse.Database, - cfg.Clickhouse.Username, - cfg.Clickhouse.Password) - if err != nil { - return nil, fmt.Errorf("failed to create ClickHouse client: %w", err) - } - // Format and print backups formatter := output.NewFormatter(os.Stdout, flags.OutputFormat) @@ -71,11 +44,39 @@ func NewContext(flags *config.CLIGlobalFlags) (*Context, error) { K8sClient: k8sClient, Namespace: flags.Namespace, Config: cfg, - S3Client: s3Client, - ESClient: esClient, - CHClient: chClient, Logger: logger.New(flags.Quiet, flags.Debug), Formatter: formatter, Context: context.Background(), }, nil } + +// NewS3Client creates an S3 client connecting to the given local port-forwarded port +func (c *Context) NewS3Client(localPort int) (s3.Interface, error) { + endpoint := fmt.Sprintf("http://localhost:%d", localPort) + return s3.NewClient(endpoint, c.Config.Minio.AccessKey, c.Config.Minio.SecretKey) +} + +// NewESClient creates an Elasticsearch client connecting to the given local port-forwarded port +func (c *Context) NewESClient(localPort int) (elasticsearch.Interface, error) { + return elasticsearch.NewClient(fmt.Sprintf("http://localhost:%d", localPort)) +} + +// NewCHClient creates a ClickHouse client. Pass backupAPIPort for backup API access, +// and dbPort for SQL access. Use 0 for either if not needed. +func (c *Context) NewCHClient(backupAPIPort, dbPort int) (clickhouse.Interface, error) { + backupAPIURL := "" + if backupAPIPort > 0 { + backupAPIURL = fmt.Sprintf("http://localhost:%d", backupAPIPort) + } + dbAddr := "" + if dbPort > 0 { + dbAddr = fmt.Sprintf("localhost:%d", dbPort) + } + return clickhouse.NewClient( + backupAPIURL, + dbAddr, + c.Config.Clickhouse.Database, + c.Config.Clickhouse.Username, + c.Config.Clickhouse.Password, + ) +} diff --git a/internal/clients/clickhouse/client.go b/internal/clients/clickhouse/client.go index 1450098..392901f 100644 --- a/internal/clients/clickhouse/client.go +++ b/internal/clients/clickhouse/client.go @@ -54,11 +54,8 @@ type ActionResponse struct { // NewClient creates a new ClickHouse client with both Backup API and SQL support func NewClient(backupAPI, addr, db, username, password string) (*Client, error) { - if backupAPI == "" { - return nil, fmt.Errorf("backupAPIURL cannot be empty") - } - if addr == "" { - return nil, fmt.Errorf("clickhouseAddr cannot be empty") + if backupAPI == "" && addr == "" { + return nil, fmt.Errorf("at least one of backupAPIURL or clickhouseAddr must be provided") } if db == "" { return nil, fmt.Errorf("clickhouseDatabase cannot be empty") diff --git a/internal/clients/clickhouse/client_test.go b/internal/clients/clickhouse/client_test.go index 431ea21..db9be5b 100644 --- a/internal/clients/clickhouse/client_test.go +++ b/internal/clients/clickhouse/client_test.go @@ -22,9 +22,9 @@ func TestNewClient(t *testing.T) { wantError: false, }, { - name: "empty backupAPIURL", + name: "empty backupAPIURL with addr", baseURL: "", - wantError: true, + wantError: false, }, } diff --git a/internal/clients/k8s/client.go b/internal/clients/k8s/client.go index 14ac751..08b2f83 100644 --- a/internal/clients/k8s/client.go +++ b/internal/clients/k8s/client.go @@ -11,6 +11,7 @@ import ( "os" "path" "path/filepath" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -63,14 +64,15 @@ func NewClient(kubeconfigPath string, debug bool) (*Client, error) { }, nil } -// PortForwardService creates a port-forward to a Kubernetes service -func (c *Client) PortForwardService(namespace, serviceName string, localPort, remotePort int) (chan struct{}, chan struct{}, error) { +// PortForwardService creates a port-forward to a Kubernetes service. +// It uses OS dynamic port allocation (port 0) and returns the actual allocated local port. +func (c *Client) PortForwardService(namespace, serviceName string, remotePort int) (chan struct{}, int, error) { ctx := context.Background() // Get service to find pods svc, err := c.clientset.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{}) if err != nil { - return nil, nil, fmt.Errorf("failed to get service: %w", err) + return nil, 0, fmt.Errorf("failed to get service: %w", err) } // Find pod matching service selector @@ -80,11 +82,11 @@ func (c *Client) PortForwardService(namespace, serviceName string, localPort, re }), }) if err != nil { - return nil, nil, fmt.Errorf("failed to list pods: %w", err) + return nil, 0, fmt.Errorf("failed to list pods: %w", err) } if len(podList.Items) == 0 { - return nil, nil, fmt.Errorf("no pods found for service %s", serviceName) + return nil, 0, fmt.Errorf("no pods found for service %s", serviceName) } // Find a running pod @@ -97,26 +99,30 @@ func (c *Client) PortForwardService(namespace, serviceName string, localPort, re } if targetPod == nil { - return nil, nil, fmt.Errorf("no running pods found for service %s", serviceName) + return nil, 0, fmt.Errorf("no running pods found for service %s", serviceName) } // Setup port-forward - return c.PortForwardPod(namespace, targetPod.Name, localPort, remotePort) + return c.PortForwardPod(namespace, targetPod.Name, remotePort) } -// PortForwardPod creates a port-forward to a specific pod -func (c *Client) PortForwardPod(namespace, podName string, localPort, remotePort int) (chan struct{}, chan struct{}, error) { +// portForwardReadyTimeout is the maximum time to wait for a port-forward to become ready. +const portForwardReadyTimeout = 60 * time.Second + +// PortForwardPod creates a port-forward to a specific pod using OS dynamic port allocation. +// It waits for the port-forward to be ready and returns the actual allocated local port. +func (c *Client) PortForwardPod(namespace, podName string, remotePort int) (chan struct{}, int, error) { reqPath := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName) hostIP := c.restConfig.Host pfURL, err := url.Parse(hostIP) if err != nil { - return nil, nil, fmt.Errorf("failed to parse host: %w", err) + return nil, 0, fmt.Errorf("failed to parse host: %w", err) } // Preserve any existing path prefix (e.g. from Rancher/OpenShift proxy URLs) pfURL.Path = path.Join(pfURL.Path, reqPath) transport, upgrader, err := spdy.RoundTripperFor(c.restConfig) if err != nil { - return nil, nil, fmt.Errorf("failed to create round tripper: %w", err) + return nil, 0, fmt.Errorf("failed to create round tripper: %w", err) } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, pfURL) @@ -124,7 +130,8 @@ func (c *Client) PortForwardPod(namespace, podName string, localPort, remotePort stopChan := make(chan struct{}, 1) readyChan := make(chan struct{}) - ports := []string{fmt.Sprintf("%d:%d", localPort, remotePort)} + // Use port 0 so the OS picks a free local port + ports := []string{fmt.Sprintf("0:%d", remotePort)} // Use discard writers if debug is disabled to suppress port-forward output outWriter := io.Discard @@ -136,18 +143,49 @@ func (c *Client) PortForwardPod(namespace, podName string, localPort, remotePort fw, err := portforward.New(dialer, ports, stopChan, readyChan, outWriter, errWriter) if err != nil { - return nil, nil, fmt.Errorf("failed to create port forwarder: %w", err) + return nil, 0, fmt.Errorf("failed to create port forwarder: %w", err) } + errChan := make(chan error, 1) go func() { - if err := fw.ForwardPorts(); err != nil { - if c.debug { - _, _ = fmt.Fprintf(os.Stderr, "Port forward error: %v\n", err) - } + errChan <- fw.ForwardPorts() + }() + + // Wait for port-forward to be ready, fail, or timeout + select { + case <-readyChan: + // Port-forward is ready + case err := <-errChan: + if err != nil { + return nil, 0, fmt.Errorf("port forward failed: %w", err) + } + return nil, 0, fmt.Errorf("port forward closed unexpectedly") + case <-time.After(portForwardReadyTimeout): + close(stopChan) + return nil, 0, fmt.Errorf("timed out waiting for port-forward to become ready after %s", portForwardReadyTimeout) + } + + // Drain errChan in background to avoid silent failures if the port-forward + // dies after becoming ready (e.g., pod eviction, network partition). + go func() { + if err := <-errChan; err != nil { + _, _ = fmt.Fprintf(os.Stderr, "port-forward error: %v\n", err) } }() - return stopChan, readyChan, nil + // Get the actual allocated local port + forwardedPorts, err := fw.GetPorts() + if err != nil { + close(stopChan) + return nil, 0, fmt.Errorf("failed to get forwarded ports: %w", err) + } + if len(forwardedPorts) == 0 { + close(stopChan) + return nil, 0, fmt.Errorf("no ports were forwarded") + } + + actualLocalPort := int(forwardedPorts[0].Local) + return stopChan, actualLocalPort, nil } const ( diff --git a/internal/clients/k8s/client_test.go b/internal/clients/k8s/client_test.go index d9a680f..cefd882 100644 --- a/internal/clients/k8s/client_test.go +++ b/internal/clients/k8s/client_test.go @@ -356,7 +356,7 @@ func TestClient_PortForwardService_ServiceNotFound(t *testing.T) { clientset: fakeClient, } - _, _, err := client.PortForwardService("test-ns", "nonexistent-svc", 8080, 9200) + _, _, err := client.PortForwardService("test-ns", "nonexistent-svc", 9200) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to get service") } @@ -383,7 +383,7 @@ func TestClient_PortForwardService_NoPodsFound(t *testing.T) { clientset: fakeClient, } - _, _, err = client.PortForwardService("test-ns", "test-svc", 8080, 9200) + _, _, err = client.PortForwardService("test-ns", "test-svc", 9200) assert.Error(t, err) assert.Contains(t, err.Error(), "no pods found for service") } @@ -426,7 +426,7 @@ func TestClient_PortForwardService_NoRunningPods(t *testing.T) { clientset: fakeClient, } - _, _, err = client.PortForwardService("test-ns", "test-svc", 8080, 9200) + _, _, err = client.PortForwardService("test-ns", "test-svc", 9200) assert.Error(t, err) assert.Contains(t, err.Error(), "no running pods found for service") } diff --git a/internal/clients/k8s/interface.go b/internal/clients/k8s/interface.go index 2ddc1a4..1a8266c 100644 --- a/internal/clients/k8s/interface.go +++ b/internal/clients/k8s/interface.go @@ -10,7 +10,7 @@ type Interface interface { Clientset() kubernetes.Interface // Port forwarding operations - PortForwardService(namespace, serviceName string, localPort, remotePort int) (stopChan chan struct{}, readyChan chan struct{}, err error) + PortForwardService(namespace, serviceName string, remotePort int) (stopChan chan struct{}, actualLocalPort int, err error) // Deployment scaling operations ScaleDownDeployments(namespace, labelSelector string) ([]AppsScale, error) diff --git a/internal/foundation/config/config.go b/internal/foundation/config/config.go index 64e01d9..b3e5121 100644 --- a/internal/foundation/config/config.go +++ b/internal/foundation/config/config.go @@ -72,9 +72,8 @@ type SLMConfig struct { // ServiceConfig holds service connection details type ServiceConfig struct { - Name string `yaml:"name" validate:"required"` - Port int `yaml:"port" validate:"required,min=1,max=65535"` - LocalPortForwardPort int `yaml:"localPortForwardPort" validate:"required,min=1,max=65535"` + Name string `yaml:"name" validate:"required"` + Port int `yaml:"port" validate:"required,min=1,max=65535"` } // MinioConfig holds Minio-specific configuration diff --git a/internal/foundation/config/config_test.go b/internal/foundation/config/config_test.go index 48a13a1..eff2ba2 100644 --- a/internal/foundation/config/config_test.go +++ b/internal/foundation/config/config_test.go @@ -106,8 +106,6 @@ func TestLoadConfig_CompleteConfiguration(t *testing.T) { // Service config assert.Equal(t, "suse-observability-elasticsearch-master-headless", config.Elasticsearch.Service.Name) assert.Equal(t, 9200, config.Elasticsearch.Service.Port) - assert.Equal(t, 9200, config.Elasticsearch.Service.LocalPortForwardPort) - // Restore config assert.Equal(t, "observability.suse.com/scalable-during-es-restore=true", config.Elasticsearch.Restore.ScaleDownLabelSelector) assert.Equal(t, "sts", config.Elasticsearch.Restore.IndexPrefix) @@ -328,9 +326,8 @@ func TestConfig_StructValidation(t *testing.T) { config: &Config{ Elasticsearch: ElasticsearchConfig{ Service: ServiceConfig{ - Name: "es-master", - Port: 9200, - LocalPortForwardPort: 9200, + Name: "es-master", + Port: 9200, }, Restore: RestoreConfig{ ScaleDownLabelSelector: "app=test", @@ -361,9 +358,8 @@ func TestConfig_StructValidation(t *testing.T) { Minio: MinioConfig{ Enabled: true, Service: ServiceConfig{ - Name: "minio", - Port: 9000, - LocalPortForwardPort: 9000, + Name: "minio", + Port: 9000, }, AccessKey: "minioadmin", SecretKey: "minioadmin", @@ -455,14 +451,12 @@ func TestConfig_StructValidation(t *testing.T) { }, Clickhouse: ClickhouseConfig{ Service: ServiceConfig{ - Name: "clickhouse", - Port: 9000, - LocalPortForwardPort: 9000, + Name: "clickhouse", + Port: 9000, }, BackupService: ServiceConfig{ - Name: "clickhouse", - Port: 7171, - LocalPortForwardPort: 7171, + Name: "clickhouse", + Port: 7171, }, Database: "default", Username: "default", @@ -479,9 +473,8 @@ func TestConfig_StructValidation(t *testing.T) { config: &Config{ Elasticsearch: ElasticsearchConfig{ Service: ServiceConfig{ - Name: "es-master", - Port: 0, // Invalid - LocalPortForwardPort: 9200, + Name: "es-master", + Port: 0, // Invalid }, Restore: RestoreConfig{ ScaleDownLabelSelector: "app=test", @@ -517,9 +510,8 @@ func TestConfig_StructValidation(t *testing.T) { config: &Config{ Elasticsearch: ElasticsearchConfig{ Service: ServiceConfig{ - Name: "es-master", - Port: 9200, - LocalPortForwardPort: 9200, + Name: "es-master", + Port: 9200, }, Restore: RestoreConfig{ ScaleDownLabelSelector: "app=test", diff --git a/internal/foundation/config/testdata/validConfigMapConfig.yaml b/internal/foundation/config/testdata/validConfigMapConfig.yaml index ab505db..76aca9a 100644 --- a/internal/foundation/config/testdata/validConfigMapConfig.yaml +++ b/internal/foundation/config/testdata/validConfigMapConfig.yaml @@ -42,8 +42,6 @@ elasticsearch: name: suse-observability-elasticsearch-master-headless # Port number for Elasticsearch HTTP API port: 9200 - # Local port to use for port-forwarding (can be same as port) - localPortForwardPort: 9200 # Restore operation configuration restore: @@ -68,7 +66,6 @@ minio: service: name: suse-observability-minio port: 9000 - localPortForwardPort: 9000 # Access credentials (typically from Kubernetes secret) accessKey: minioadmin secretKey: minioadmin @@ -168,11 +165,9 @@ clickhouse: service: name: "suse-observability-clickhouse-shard0-0" port: 9000 - localPortForwardPort: 9000 backupService: name: "suse-observability-clickhouse-shard0-0" port: 7171 - localPortForwardPort: 7171 database: "default" username: "default" password: "password" diff --git a/internal/foundation/config/testdata/validConfigMapOnly.yaml b/internal/foundation/config/testdata/validConfigMapOnly.yaml index fd61761..dbf8ea1 100644 --- a/internal/foundation/config/testdata/validConfigMapOnly.yaml +++ b/internal/foundation/config/testdata/validConfigMapOnly.yaml @@ -49,8 +49,6 @@ elasticsearch: name: suse-observability-elasticsearch-master-headless # Port number for Elasticsearch HTTP API port: 9200 - # Local port to use for port-forwarding (can be same as port) - localPortForwardPort: 9200 # Restore operation configuration restore: @@ -74,7 +72,6 @@ minio: service: name: suse-observability-minio port: 9000 - localPortForwardPort: 9000 accessKey: minioadmin secretKey: minioadmin @@ -158,11 +155,9 @@ clickhouse: service: name: "suse-observability-clickhouse-shard0-0" port: 9000 - localPortForwardPort: 9000 backupService: name: "suse-observability-clickhouse-shard0-0" port: 7171 - localPortForwardPort: 7171 database: "default" username: "default" password: "password" diff --git a/internal/orchestration/portforward/portforward.go b/internal/orchestration/portforward/portforward.go index 032361c..3225603 100644 --- a/internal/orchestration/portforward/portforward.go +++ b/internal/orchestration/portforward/portforward.go @@ -10,36 +10,31 @@ import ( // Conn contains the channels needed to manage a port-forward connection type Conn struct { StopChan chan struct{} - ReadyChan <-chan struct{} LocalPort int } // SetupPortForward establishes a port-forward to a Kubernetes service and waits for it to be ready. -// It returns a Conn containing the stop and ready channels, plus the local port. +// It uses OS dynamic port allocation so the local port is determined automatically. +// It returns a Conn containing the stop channel and the actual local port. // The caller is responsible for closing the StopChan when done. func SetupPortForward( k8sClient *k8s.Client, namespace string, serviceName string, - localPort int, remotePort int, log *logger.Logger, ) (*Conn, error) { log.Infof("Setting up port-forward to %s:%d in namespace %s...", serviceName, remotePort, namespace) - stopChan, readyChan, err := k8sClient.PortForwardService(namespace, serviceName, localPort, remotePort) + stopChan, actualLocalPort, err := k8sClient.PortForwardService(namespace, serviceName, remotePort) if err != nil { return nil, fmt.Errorf("failed to setup port-forward: %w", err) } - // Wait for port-forward to be ready - <-readyChan - - log.Successf("Port-forward established successfully") + log.Successf("Port-forward established on localhost:%d", actualLocalPort) return &Conn{ StopChan: stopChan, - ReadyChan: readyChan, - LocalPort: localPort, + LocalPort: actualLocalPort, }, nil } diff --git a/internal/orchestration/portforward/portforward_test.go b/internal/orchestration/portforward/portforward_test.go index c710dd1..af3cffe 100644 --- a/internal/orchestration/portforward/portforward_test.go +++ b/internal/orchestration/portforward/portforward_test.go @@ -16,7 +16,7 @@ func TestSetupPortForward_ServiceNotFound(t *testing.T) { client := k8s.NewTestClient(fakeClientset) log := logger.New(true, false) - _, err := SetupPortForward(client, "default", "nonexistent-service", 8080, 9200, log) + _, err := SetupPortForward(client, "default", "nonexistent-service", 9200, log) if err == nil { t.Fatal("expected error for nonexistent service, got nil") } @@ -39,7 +39,7 @@ func TestSetupPortForward_NoPodsFound(t *testing.T) { client := k8s.NewTestClient(fakeClientset) log := logger.New(true, false) - _, err := SetupPortForward(client, "default", "test-service", 8080, 9200, log) + _, err := SetupPortForward(client, "default", "test-service", 9200, log) if err == nil { t.Fatal("expected error for service with no pods, got nil") } @@ -74,7 +74,7 @@ func TestSetupPortForward_NoRunningPods(t *testing.T) { client := k8s.NewTestClient(fakeClientset) log := logger.New(true, false) - _, err := SetupPortForward(client, "default", "test-service", 8080, 9200, log) + _, err := SetupPortForward(client, "default", "test-service", 9200, log) if err == nil { t.Fatal("expected error for service with no running pods, got nil") } @@ -82,21 +82,16 @@ func TestSetupPortForward_NoRunningPods(t *testing.T) { func TestConn_Structure(t *testing.T) { stopChan := make(chan struct{}) - readyChan := make(chan struct{}) localPort := 8080 result := &Conn{ StopChan: stopChan, - ReadyChan: readyChan, LocalPort: localPort, } if result.StopChan == nil { t.Error("expected StopChan to be set") } - if result.ReadyChan == nil { - t.Error("expected ReadyChan to be set") - } if result.LocalPort != localPort { t.Errorf("expected LocalPort to be %d, got %d", localPort, result.LocalPort) } @@ -104,11 +99,9 @@ func TestConn_Structure(t *testing.T) { func TestConn_ChannelCleanup(t *testing.T) { stopChan := make(chan struct{}) - readyChan := make(chan struct{}) result := &Conn{ StopChan: stopChan, - ReadyChan: readyChan, LocalPort: 8080, }