Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .opencode/agents/code-reviewer.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,11 @@ Commands MUST use `app.Context` for dependencies, NOT create clients directly:
// GOOD
func runRestore(appCtx *app.Context) error {
appCtx.K8sClient // Use injected client
appCtx.ESClient
appCtx.Config
appCtx.Logger
appCtx.Formatter
// Service clients created via factory methods after port-forwarding
esClient, err := appCtx.NewESClient(pf.LocalPort)
}

// BAD - Direct client creation in command
Expand Down
3 changes: 2 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ internal/
// GOOD
func runRestore(appCtx *app.Context) error {
appCtx.K8sClient // Kubernetes client
appCtx.ESClient // Elasticsearch client
appCtx.Config // Configuration
appCtx.Logger // Structured logger
// Service clients created via factory methods after port-forwarding
esClient, err := appCtx.NewESClient(pf.LocalPort)
}
```

Expand Down
22 changes: 13 additions & 9 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,13 @@ if err != nil {
}

// All dependencies available via appCtx
appCtx.K8sClient
appCtx.S3Client
appCtx.ESClient
appCtx.Config
appCtx.Logger
appCtx.Formatter
appCtx.K8sClient // Kubernetes client
appCtx.Config // Configuration
appCtx.Logger // Structured logger
appCtx.Formatter // Output formatter
appCtx.NewESClient(localPort) // Elasticsearch client factory
appCtx.NewS3Client(localPort) // S3/Minio client factory
appCtx.NewCHClient(backupAPIPort, dbPort) // ClickHouse client factory
```

**Dependency Rules**:
Expand Down Expand Up @@ -226,9 +227,10 @@ func runList(appCtx *app.Context) error {
// All dependencies available immediately
appCtx.K8sClient
appCtx.Config
appCtx.S3Client
appCtx.Logger
appCtx.Formatter
// Service clients created via factory methods with port-forwarded port
s3Client, err := appCtx.NewS3Client(pf.LocalPort)
}
```

Expand Down Expand Up @@ -455,9 +457,11 @@ func runListSnapshots(globalFlags *config.CLIGlobalFlags) error {
```go
// GOOD
func runListSnapshots(appCtx *app.Context) error {
// Dependencies already created
// Direct dependencies
appCtx.K8sClient
appCtx.ESClient
appCtx.Config
// Service clients created via factory methods after port-forwarding
esClient, err := appCtx.NewESClient(pf.LocalPort)
}
```

Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ elasticsearch:
service:
name: suse-observability-elasticsearch-master-headless
port: 9200
localPortForwardPort: 9200

restore:
repository: sts-backup
Expand Down
31 changes: 21 additions & 10 deletions cmd/clickhouse/check_and_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/spf13/cobra"
"github.com/stackvista/stackstate-backup-cli/cmd/cmdutils"
"github.com/stackvista/stackstate-backup-cli/internal/app"
"github.com/stackvista/stackstate-backup-cli/internal/clients/clickhouse"
ch "github.com/stackvista/stackstate-backup-cli/internal/clients/clickhouse"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore"
Expand Down Expand Up @@ -45,28 +45,34 @@ It will check the restore status and if complete, execute post-restore tasks and
}

func runCheckAndFinalize(appCtx *app.Context) error {
// Setup port-forward
// Setup port-forward to ClickHouse Backup API
pf, err := portforward.SetupPortForward(
appCtx.K8sClient,
appCtx.Namespace,
appCtx.Config.Clickhouse.BackupService.Name,
appCtx.Config.Clickhouse.BackupService.LocalPortForwardPort,
appCtx.Config.Clickhouse.BackupService.Port,
appCtx.Logger,
)
if err != nil {
return err
}
defer close(pf.StopChan)
return checkAndFinalize(appCtx, checkOperationID, waitForRestore)

// Create CH client with backup API port only
chClient, err := appCtx.NewCHClient(pf.LocalPort, 0)
if err != nil {
return fmt.Errorf("failed to create ClickHouse client: %w", err)
}

return checkAndFinalize(chClient, appCtx, checkOperationID, waitForRestore)
}

// checkAndFinalize checks restore status and finalizes if complete
func checkAndFinalize(appCtx *app.Context, operationID string, waitForComplete bool) error {
func checkAndFinalize(chClient ch.Interface, appCtx *app.Context, operationID string, waitForComplete bool) error {
// Check status
appCtx.Logger.Println()
appCtx.Logger.Infof("Checking restore status for operation: %s", operationID)
status, err := appCtx.CHClient.GetRestoreStatus(appCtx.Context, operationID)
status, err := chClient.GetRestoreStatus(appCtx.Context, operationID)
if err != nil {
return err
}
Expand All @@ -87,7 +93,7 @@ func checkAndFinalize(appCtx *app.Context, operationID string, waitForComplete b
if waitForComplete {
// Still running - wait
appCtx.Logger.Infof("Restore is in progress, waiting for completion...")
return waitAndFinalize(appCtx, appCtx.CHClient, operationID)
return waitAndFinalize(appCtx, chClient, operationID)
}
// Just print status
appCtx.Logger.Println()
Expand All @@ -96,7 +102,7 @@ func checkAndFinalize(appCtx *app.Context, operationID string, waitForComplete b
}

// waitAndFinalize waits for restore completion and finalizes
func waitAndFinalize(appCtx *app.Context, chClient clickhouse.Interface, operationID string) error {
func waitAndFinalize(appCtx *app.Context, chClient ch.Interface, operationID string) error {
restore.PrintAPIWaitingMessage("clickhouse", operationID, appCtx.Namespace, appCtx.Logger)

// Wait for restore using shared utility
Expand Down Expand Up @@ -157,7 +163,6 @@ func executePostRestoreSQL(appCtx *app.Context) error {
appCtx.K8sClient,
appCtx.Namespace,
appCtx.Config.Clickhouse.Service.Name,
appCtx.Config.Clickhouse.Service.LocalPortForwardPort,
appCtx.Config.Clickhouse.Service.Port,
appCtx.Logger,
)
Expand All @@ -166,8 +171,14 @@ func executePostRestoreSQL(appCtx *app.Context) error {
}
defer close(pf.StopChan)

// Create ClickHouse client with DB port only
chDBClient, err := appCtx.NewCHClient(0, pf.LocalPort)
if err != nil {
return fmt.Errorf("failed to create ClickHouse DB client: %w", err)
}

// Create ClickHouse SQL connection
conn, closeConn, err := appCtx.CHClient.Connect()
conn, closeConn, err := chDBClient.Connect()
if err != nil {
return fmt.Errorf("failed to connect to ClickHouse: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/clickhouse/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func runList(appCtx *app.Context) error {
appCtx.K8sClient,
appCtx.Namespace,
appCtx.Config.Clickhouse.BackupService.Name,
appCtx.Config.Clickhouse.BackupService.LocalPortForwardPort,
appCtx.Config.Clickhouse.BackupService.Port,
appCtx.Logger,
)
Expand All @@ -38,11 +37,17 @@ func runList(appCtx *app.Context) error {
}
defer close(pf.StopChan)

// Create CH client with backup API port only
chClient, err := appCtx.NewCHClient(pf.LocalPort, 0)
if err != nil {
return fmt.Errorf("failed to create ClickHouse client: %w", err)
}

// List backups
appCtx.Logger.Infof("Listing Clickhouse backups...")
appCtx.Logger.Println()

backups, err := appCtx.CHClient.ListBackups(appCtx.Context)
backups, err := chClient.ListBackups(appCtx.Context)
if err != nil {
return fmt.Errorf("failed to list backups: %w", err)
}
Expand Down
20 changes: 15 additions & 5 deletions cmd/clickhouse/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func executeRestore(appCtx *app.Context, backupName string, waitForComplete bool
appCtx.K8sClient,
appCtx.Namespace,
appCtx.Config.Clickhouse.BackupService.Name,
appCtx.Config.Clickhouse.BackupService.LocalPortForwardPort,
appCtx.Config.Clickhouse.BackupService.Port,
appCtx.Logger,
)
Expand All @@ -105,10 +104,16 @@ func executeRestore(appCtx *app.Context, backupName string, waitForComplete bool
}
defer close(pf.StopChan)

// Create CH client with backup API port only
chClient, err := appCtx.NewCHClient(pf.LocalPort, 0)
if err != nil {
return fmt.Errorf("failed to create ClickHouse client: %w", err)
}

// Trigger restore
appCtx.Logger.Println()
appCtx.Logger.Infof("Triggering restore for backup: %s", backupName)
operationID, err := appCtx.CHClient.TriggerRestore(appCtx.Context, backupName)
operationID, err := chClient.TriggerRestore(appCtx.Context, backupName)
if err != nil {
return fmt.Errorf("failed to trigger restore: %w", err)
}
Expand All @@ -119,7 +124,7 @@ func executeRestore(appCtx *app.Context, backupName string, waitForComplete bool
return nil
}

return checkAndFinalize(appCtx, operationID, waitForComplete)
return checkAndFinalize(chClient, appCtx, operationID, waitForComplete)
}

// getLatestBackupForRestore retrieves the most recent backup
Expand All @@ -129,7 +134,6 @@ func getLatestBackupForRestore(appCtx *app.Context) (string, error) {
appCtx.K8sClient,
appCtx.Namespace,
appCtx.Config.Clickhouse.BackupService.Name,
appCtx.Config.Clickhouse.BackupService.LocalPortForwardPort,
appCtx.Config.Clickhouse.BackupService.Port,
appCtx.Logger,
)
Expand All @@ -138,8 +142,14 @@ func getLatestBackupForRestore(appCtx *app.Context) (string, error) {
}
defer close(pf.StopChan)

// Create CH client with backup API port only
chClient, err := appCtx.NewCHClient(pf.LocalPort, 0)
if err != nil {
return "", fmt.Errorf("failed to create ClickHouse client: %w", err)
}

// List backups
backups, err := appCtx.CHClient.ListBackups(appCtx.Context)
backups, err := chClient.ListBackups(appCtx.Context)
if err != nil {
return "", fmt.Errorf("failed to list backups: %w", err)
}
Expand Down
22 changes: 14 additions & 8 deletions cmd/elasticsearch/check_and_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/cobra"
"github.com/stackvista/stackstate-backup-cli/cmd/cmdutils"
"github.com/stackvista/stackstate-backup-cli/internal/app"
es "github.com/stackvista/stackstate-backup-cli/internal/clients/elasticsearch"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore"
Expand Down Expand Up @@ -39,24 +40,29 @@ If the restore is still running and --wait is specified, wait for completion bef
func runCheckAndFinalize(appCtx *app.Context) error {
// Setup port-forward to Elasticsearch
serviceName := appCtx.Config.Elasticsearch.Service.Name
localPort := appCtx.Config.Elasticsearch.Service.LocalPortForwardPort
remotePort := appCtx.Config.Elasticsearch.Service.Port

pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger)
pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger)
if err != nil {
return err
}
defer close(pf.StopChan)

// Create ES client with actual port
esClient, err := appCtx.NewESClient(pf.LocalPort)
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

repository := appCtx.Config.Elasticsearch.Restore.Repository

return checkAndFinalize(appCtx, repository, checkOperationID, checkWait)
return checkAndFinalize(esClient, appCtx, repository, checkOperationID, checkWait)
}

func checkAndFinalize(appCtx *app.Context, repository, snapshotName string, waitForComplete bool) error {
func checkAndFinalize(esClient es.Interface, appCtx *app.Context, repository, snapshotName string, waitForComplete bool) error {
// Get restore status
appCtx.Logger.Infof("Checking restore status for snapshot: %s", snapshotName)
status, isComplete, err := appCtx.ESClient.GetRestoreStatus(repository, snapshotName)
status, isComplete, err := esClient.GetRestoreStatus(repository, snapshotName)
if err != nil {
return fmt.Errorf("failed to get restore status: %w", err)
}
Expand Down Expand Up @@ -87,7 +93,7 @@ func checkAndFinalize(appCtx *app.Context, repository, snapshotName string, wait

if waitForComplete {
appCtx.Logger.Println()
return waitAndFinalize(appCtx, repository, snapshotName)
return waitAndFinalize(esClient, appCtx, repository, snapshotName)
}

// Not waiting - print status and exit
Expand All @@ -97,12 +103,12 @@ func checkAndFinalize(appCtx *app.Context, repository, snapshotName string, wait
}

// waitAndFinalize waits for restore to complete and finalizes (scale up)
func waitAndFinalize(appCtx *app.Context, repository, snapshotName string) error {
func waitAndFinalize(esClient es.Interface, appCtx *app.Context, repository, snapshotName string) error {
restore.PrintAPIWaitingMessage("elasticsearch", snapshotName, appCtx.Namespace, appCtx.Logger)

// Wait for restore to complete
checkStatusFn := func() (string, bool, error) {
return appCtx.ESClient.GetRestoreStatus(repository, snapshotName)
return esClient.GetRestoreStatus(repository, snapshotName)
}

if err := restore.WaitForAPIRestore(checkStatusFn, 0, appCtx.Logger); err != nil {
Expand Down
15 changes: 10 additions & 5 deletions cmd/elasticsearch/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,33 @@ func runConfigure(appCtx *app.Context) error {

// Setup port-forward to Elasticsearch
serviceName := appCtx.Config.Elasticsearch.Service.Name
localPort := appCtx.Config.Elasticsearch.Service.LocalPortForwardPort
remotePort := appCtx.Config.Elasticsearch.Service.Port

pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger)
pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, remotePort, appCtx.Logger)
if err != nil {
return err
}
defer close(pf.StopChan)

// Create ES client with actual port
esClient, err := appCtx.NewESClient(pf.LocalPort)
if err != nil {
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}

// Configure snapshot repository
repo := appCtx.Config.Elasticsearch.SnapshotRepository

// Always unregister existing repository to ensure clean state
appCtx.Logger.Infof("Unregistering snapshot repository '%s'...", repo.Name)
if err := appCtx.ESClient.DeleteSnapshotRepository(repo.Name); err != nil {
if err := esClient.DeleteSnapshotRepository(repo.Name); err != nil {
return fmt.Errorf("failed to unregister snapshot repository: %w", err)
}
appCtx.Logger.Successf("Snapshot repository unregistered successfully")

appCtx.Logger.Infof("Configuring snapshot repository '%s' (bucket: %s)...", repo.Name, repo.Bucket)

err = appCtx.ESClient.ConfigureSnapshotRepository(
err = esClient.ConfigureSnapshotRepository(
repo.Name,
repo.Bucket,
repo.Endpoint,
Expand All @@ -68,7 +73,7 @@ func runConfigure(appCtx *app.Context) error {
slm := appCtx.Config.Elasticsearch.SLM
appCtx.Logger.Infof("Configuring SLM policy '%s'...", slm.Name)

err = appCtx.ESClient.ConfigureSLMPolicy(
err = esClient.ConfigureSLMPolicy(
slm.Name,
slm.Schedule,
slm.SnapshotTemplateName,
Expand Down
4 changes: 1 addition & 3 deletions cmd/elasticsearch/configure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *mockESClientForConfigure) IndexExists(_ string) (bool, error) {
return false, fmt.Errorf("not implemented")
}

func (m *mockESClientForConfigure) RestoreSnapshot(_, _, _ string, _ bool) error {
func (m *mockESClientForConfigure) RestoreSnapshot(_, _, _ string) error {
return fmt.Errorf("not implemented")
}

Expand Down Expand Up @@ -138,7 +138,6 @@ elasticsearch:
service:
name: elasticsearch-master
port: 9200
localPortForwardPort: 9200
restore:
scaleDownLabelSelector: app=test
indexPrefix: sts_
Expand Down Expand Up @@ -173,7 +172,6 @@ elasticsearch:
service:
name: elasticsearch-master
port: 9200
localPortForwardPort: 9200
restore:
scaleDownLabelSelector: app=test
indexPrefix: sts_
Expand Down
Loading