diff --git a/accessors/spanner/spanner_accessor.go b/accessors/spanner/spanner_accessor.go index 8a924999c6..f92db08c86 100644 --- a/accessors/spanner/spanner_accessor.go +++ b/accessors/spanner/spanner_accessor.go @@ -74,6 +74,8 @@ type SpannerAccessor interface { ValidateDDL(ctx context.Context, dbURI string) error // UpdateDDLForeignKeys updates the Spanner database with foreign key constraints using ALTER TABLE statements. UpdateDDLForeignKeys(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) + // UpdateDDLIndexes updates the Spanner database with indexes using CREATE INDEX statements. + UpdateDDLIndexes(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) // Deletes a database. DropDatabase(ctx context.Context, dbURI string) error //Runs a query against the provided spanner database and returns if the executed DML is validate or not @@ -287,12 +289,7 @@ func (sp *SpannerAccessorImpl) CreateDatabase(ctx context.Context, dbURI string, req.DatabaseDialect = adminpb.DatabaseDialect_POSTGRESQL } else { req.CreateStatement = "CREATE DATABASE `" + dbName + "`" - if migrationType == constants.DATAFLOW_MIGRATION { - req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) - } else { - req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) - } - + req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, Indexes: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) } op, err := sp.AdminClient.CreateDatabase(ctx, req) @@ -316,7 +313,7 @@ func (sp *SpannerAccessorImpl) UpdateDatabase(ctx context.Context, dbURI string, // Spanner DDL doesn't accept them), and protects table and col names // using backticks (to avoid any issues with Spanner reserved words). // Foreign Keys are set to false since we create them post data migration. - schema := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) + schema := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, Indexes: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) req := &adminpb.UpdateDatabaseDdlRequest{ Database: dbURI, Statements: schema, @@ -383,35 +380,8 @@ func (sp *SpannerAccessorImpl) ValidateDDL(ctx context.Context, dbURI string) er return nil } -// UpdateDDLForeignKeys updates the Spanner database with foreign key -// constraints using ALTER TABLE statements. -func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) { - - if conv.SpDialect != constants.DIALECT_POSTGRESQL && migrationType == constants.DATAFLOW_MIGRATION { - //foreign keys were applied as part of CreateDatabase - return - } - - // The schema we send to Spanner excludes comments (since Cloud - // Spanner DDL doesn't accept them), and protects table and col names - // using backticks (to avoid any issues with Spanner reserved words). - // Sequences will not be passed as they have already been created. - fkStmts := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, make(map[string]ddl.Sequence)) - if len(fkStmts) == 0 { - return - } - if len(fkStmts) > 50 { - logger.Log.Warn(` - Warning: Large number of foreign keys detected. Spanner can take a long amount of - time to create foreign keys (over 5 mins per batch of Foreign Keys even with no data). - Spanner migration tool does not have control over a single foreign key creation time. The number - of concurrent Foreign Key Creation Requests sent to spanner can be increased by - tweaking the MaxWorkers variable (https://github.com/GoogleCloudPlatform/spanner-migration-tool/blob/master/conversion/conversion.go#L89). - However, setting it to a very high value might lead to exceeding the admin quota limit. Spanner migration tool tries to stay under the - admin quota limit by spreading the FK creation requests over time.`) - } - msg := fmt.Sprintf("Updating schema of database %s with foreign key constraints ...", dbURI) - conv.Audit.Progress = *internal.NewProgress(int64(len(fkStmts)), msg, internal.Verbose(), true, int(internal.ForeignKeyUpdateInProgress)) +func (sp *SpannerAccessorImpl) updateForeignKeyAndIndexes(stmts []string, conv *internal.Conv, msg string, ctx context.Context, dbURI string, progressTracker, completionTracker int) { + conv.Audit.Progress = *internal.NewProgress(int64(len(stmts)), msg, internal.Verbose(), true, progressTracker) workers := make(chan int, MaxWorkers) for i := 1; i <= MaxWorkers; i++ { @@ -420,13 +390,13 @@ func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, dbURI s var progressMutex sync.Mutex progress := int64(0) - // We dispatch parallel foreign key create requests to ensure the backfill runs in parallel to reduce overall time. + // We dispatch parallel requests to ensure the backfill runs in parallel to reduce overall time. // This cuts down the time taken to a third (approx) compared to Serial and Batched creation. We also do not want to create // too many requests and get throttled due to network or hitting catalog memory limits. // Ensure atmost `MaxWorkers` go routines run in parallel that each update the ddl with one foreign key statement. - for _, fkStmt := range fkStmts { + for _, stmt := range stmts { workerID := <-workers - go func(fkStmt string, workerID int) { + go func(stmt string, workerID int) { defer func() { // Locking the progress reporting otherwise progress results displayed could be in random order. progressMutex.Lock() @@ -435,37 +405,90 @@ func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, dbURI s progressMutex.Unlock() workers <- workerID }() - internal.VerbosePrintf("Submitting new FK create request: %s\n", fkStmt) - logger.Log.Debug("Submitting new FK create request", zap.String("fkStmt", fkStmt)) + internal.VerbosePrintf("Submitting new DDL request: %s\n", stmt) + logger.Log.Debug("Submitting new DDL request", zap.String("stmt", stmt)) op, err := sp.AdminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{ Database: dbURI, - Statements: []string{fkStmt}, + Statements: []string{stmt}, }) if err != nil { - logger.Log.Debug("Can't add foreign key with statement:" + fkStmt + "\n due to error:" + err.Error() + " Skipping this foreign key...\n") - conv.Unexpected(fmt.Sprintf("Can't add foreign key with statement %s: %s", fkStmt, err)) + logger.Log.Debug("Can't update schema with statement:" + stmt + "\n due to error:" + err.Error() + " Skipping this request...\n") + conv.Unexpected(fmt.Sprintf("Can't update schema with statement %s: %s", stmt, err)) return } if err := op.Wait(ctx); err != nil { - logger.Log.Debug("Can't add foreign key with statement:" + fkStmt + "\n due to error:" + err.Error() + " Skipping this foreign key...\n") - conv.Unexpected(fmt.Sprintf("Can't add foreign key with statement %s: %s", fkStmt, err)) + logger.Log.Debug("Can't update schema with statement:" + stmt + "\n due to error:" + err.Error() + " Skipping this request...\n") + conv.Unexpected(fmt.Sprintf("Can't update schema with statement %s: %s", stmt, err)) return } - internal.VerbosePrintln("Updated schema with statement: " + fkStmt) - logger.Log.Debug("Updated schema with statement", zap.String("fkStmt", fkStmt)) - }(fkStmt, workerID) - // Send out an FK creation request every second, with total of maxWorkers request being present in a batch. + internal.VerbosePrintln("Updated schema with statement: " + stmt) + logger.Log.Debug("Updated schema with statement", zap.String("stmt", stmt)) + }(stmt, workerID) + // Send out a request every second, with total of maxWorkers request being present in a batch. time.Sleep(time.Second) } // Wait for all the goroutines to finish. for i := 1; i <= MaxWorkers; i++ { <-workers } - conv.Audit.Progress.UpdateProgress("Foreign key update complete.", 100, internal.ForeignKeyUpdateComplete) + conv.Audit.Progress.UpdateProgress("Update complete.", 100, internal.ProgressStatus(completionTracker)) conv.Audit.Progress.Done() } +// UpdateDDLForeignKeys updates the Spanner database with foreign key +// constraints using ALTER TABLE statements. +func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) { + + // The schema we send to Spanner excludes comments (since Cloud + // Spanner DDL doesn't accept them), and protects table and col names + // using backticks (to avoid any issues with Spanner reserved words). + // Sequences will not be passed as they have already been created. + fkStmts := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: true, Indexes: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, make(map[string]ddl.Sequence)) + if len(fkStmts) == 0 { + return + } + if len(fkStmts) > 50 { + logger.Log.Warn(` + Warning: Large number of foreign keys detected. Spanner can take a long amount of + time to create foreign keys (over 5 mins per batch of Foreign Keys even with no data). + Spanner migration tool does not have control over a single foreign key creation time. The number + of concurrent requests sent to spanner can be increased by + tweaking the MaxWorkers variable (https://github.com/GoogleCloudPlatform/spanner-migration-tool/blob/master/conversion/conversion.go#L89). + However, setting it to a very high value might lead to exceeding the admin quota limit. Spanner migration tool tries to stay under the + admin quota limit by spreading the requests over time.`) + } + msg := fmt.Sprintf("Updating schema of database %s with foreign key constraints ...", dbURI) + sp.updateForeignKeyAndIndexes(fkStmts, conv, msg, ctx, dbURI, int(internal.ForeignKeyUpdateInProgress), int(internal.ForeignKeyUpdateComplete)) + +} + +// UpdateDDLIndexes updates the Spanner database with indexes using CREATE INDEX statements. +func (sp *SpannerAccessorImpl) UpdateDDLIndexes(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) { + + // The schema we send to Spanner excludes comments (since Cloud + // Spanner DDL doesn't accept them), and protects table and col names + // using backticks (to avoid any issues with Spanner reserved words). + // Sequences will not be passed as they have already been created. + indexStmts := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: false, Indexes: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, make(map[string]ddl.Sequence)) + if len(indexStmts) == 0 { + return + } + if len(indexStmts) > 50 { + logger.Log.Warn(` + Warning: Large number of indexes detected. Spanner can take a long amount of + time to create indexes. + Spanner migration tool does not have control over a single index creation time. The number + of concurrent requests sent to spanner can be increased by + tweaking the MaxWorkers variable (https://github.com/GoogleCloudPlatform/spanner-migration-tool/blob/master/conversion/conversion.go#L89). + However, setting it to a very high value might lead to exceeding the admin quota limit. Spanner migration tool tries to stay under the + admin quota limit by spreading the requests over time.`) + } + msg := fmt.Sprintf("Updating schema of database %s with indexes ...", dbURI) + sp.updateForeignKeyAndIndexes(indexStmts, conv, msg, ctx, dbURI, int(internal.IndexUpdateInProgress), int(internal.IndexUpdateComplete)) + +} + func (sp *SpannerAccessorImpl) DropDatabase(ctx context.Context, dbURI string) error { err := sp.AdminClient.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbURI}) @@ -490,6 +513,6 @@ func (sp *SpannerAccessorImpl) ValidateDML(ctx context.Context, query string) (b } } -func (sp *SpannerAccessorImpl) Refresh(ctx context.Context, dbURI string) () { +func (sp *SpannerAccessorImpl) Refresh(ctx context.Context, dbURI string) { sp.SpannerClient.Refresh(ctx, dbURI) } diff --git a/cmd/data.go b/cmd/data.go index 372bacc373..40582a22ee 100644 --- a/cmd/data.go +++ b/cmd/data.go @@ -203,7 +203,7 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface } // validateExistingDb validates that the existing spanner schema is in accordance with the one specified in the session file. -func validateExistingDb(ctx context.Context, spDialect, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, conv *internal.Conv) error { +func validateExistingDb(ctx context.Context, spDialect, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, conv *internal.Conv) error { spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx) if err != nil { return err diff --git a/cmd/schema.go b/cmd/schema.go index 0d605030ef..0a20ff9730 100644 --- a/cmd/schema.go +++ b/cmd/schema.go @@ -37,16 +37,18 @@ import ( // SchemaCmd struct with flags. type SchemaCmd struct { - source string - sourceProfile string - target string - targetProfile string - filePrefix string // TODO: move filePrefix to global flags - project string - logLevel string - dryRun bool - validate bool - sessionJSON string + source string + sourceProfile string + target string + targetProfile string + filePrefix string // TODO: move filePrefix to global flags + project string + logLevel string + dryRun bool + validate bool + sessionJSON string + SkipForeignKeys bool + SkipIndexes bool } // Name returns the name of operation. @@ -82,6 +84,8 @@ func (cmd *SchemaCmd) SetFlags(f *flag.FlagSet) { f.BoolVar(&cmd.dryRun, "dry-run", false, "Flag for generating DDL and schema conversion report without creating a spanner database") f.BoolVar(&cmd.validate, "validate", false, "Flag for validating if all the required input parameters are present") f.StringVar(&cmd.sessionJSON, "session", "", "Optional. Specifies the file we restore session state from.") + f.BoolVar(&cmd.SkipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after schema migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)") + f.BoolVar(&cmd.SkipIndexes, "skip-indexes", false, "Skip creating indexes as a part of the schema migration (ddl statements for indexes can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)") } func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { diff --git a/cmd/schema_and_data.go b/cmd/schema_and_data.go index 1c0d1901bb..25f5bbe5a3 100644 --- a/cmd/schema_and_data.go +++ b/cmd/schema_and_data.go @@ -44,6 +44,7 @@ type SchemaAndDataCmd struct { target string targetProfile string SkipForeignKeys bool + SkipIndexes bool filePrefix string // TODO: move filePrefix to global flags project string WriteLimit int64 @@ -80,7 +81,8 @@ func (cmd *SchemaAndDataCmd) SetFlags(f *flag.FlagSet) { f.StringVar(&cmd.sourceProfile, "source-profile", "", "Flag for specifying connection profile for source database e.g., \"file=,format=dump\"") f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)") f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"") - f.BoolVar(&cmd.SkipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)") + f.BoolVar(&cmd.SkipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after schema migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)") + f.BoolVar(&cmd.SkipIndexes, "skip-indexes", false, "Skip creating indexes as a part of the schema migration (ddl statements for indexes can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)") f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files") f.StringVar(&cmd.project, "project", "", "Flag spcifying default project id for all the generated resources for the migration") f.Int64Var(&cmd.WriteLimit, "write-limit", DefaultWritersLimit, "Write limit for writes to spanner") diff --git a/cmd/utils.go b/cmd/utils.go index 11ad101bbf..5848d034fe 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -145,7 +145,7 @@ func MigrateDatabase(ctx context.Context, migrationProjectId string, targetProfi defer client.Close() switch v := cmd.(type) { case *SchemaCmd: - err = migrateSchema(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient) + err = migrateSchema(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, v) case *DataCmd: bw, err = migrateData(ctx, migrationProjectId, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v) case *SchemaAndDataCmd: @@ -159,7 +159,7 @@ func MigrateDatabase(ctx context.Context, migrationProjectId string, targetProfi } func migrateSchema(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, - ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient) error { + ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient, cmd *SchemaCmd) error { spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx) if err != nil { return err @@ -171,6 +171,12 @@ func migrateSchema(ctx context.Context, targetProfile profiles.TargetProfile, so } metricsPopulation(ctx, sourceProfile.Driver, conv) conv.Audit.Progress.UpdateProgress("Schema migration complete.", completionPercentage, internal.SchemaMigrationComplete) + if !cmd.SkipIndexes { + spA.UpdateDDLIndexes(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType) + } + if !cmd.SkipForeignKeys { + spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType) + } return nil } @@ -206,11 +212,7 @@ func migrateData(ctx context.Context, migrationProjectId string, targetProfile p } conv.Audit.Progress.UpdateProgress("Data migration complete.", completionPercentage, internal.DataMigrationComplete) if !cmd.SkipForeignKeys { - spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx) - if err != nil { - return bw, err - } - spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType) + fmt.Printf("skip-foreign-keys flag has been deprecated from the data migration mode and foreign keys will not be created as a part of data migration. Please use schema or schem-and-data subcommand for foreign key creation.") } return bw, nil } @@ -229,6 +231,13 @@ func migrateSchemaAndData(ctx context.Context, migrationProjectId string, target metricsPopulation(ctx, sourceProfile.Driver, conv) conv.Audit.Progress.UpdateProgress("Schema migration complete.", completionPercentage, internal.SchemaMigrationComplete) + if !cmd.SkipIndexes { + spA.UpdateDDLIndexes(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType) + } + if !cmd.SkipForeignKeys { + spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType) + } + // If migration type is Minimal Downtime, validate if required resources can be generated if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION { err := ValidateResourceGenerationHelper(ctx, migrationProjectId, targetProfile.Conn.Sp.Instance, sourceProfile, conv) @@ -246,9 +255,6 @@ func migrateSchemaAndData(ctx context.Context, migrationProjectId string, target } conv.Audit.Progress.UpdateProgress("Data migration complete.", completionPercentage, internal.DataMigrationComplete) - if !cmd.SkipForeignKeys { - spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType) - } return bw, nil } diff --git a/conversion/store_files.go b/conversion/store_files.go index 23eb6e502f..0c7c8f67e6 100644 --- a/conversion/store_files.go +++ b/conversion/store_files.go @@ -42,7 +42,7 @@ func WriteSchemaFile(conv *internal.Conv, now time.Time, name string, out *os.Fi // and doesn't add backticks around table and column names. This file is // intended for explanatory and documentation purposes, and is not strictly // legal Cloud Spanner DDL (Cloud Spanner doesn't currently support comments). - spDDL := ddl.GetDDL(ddl.Config{Comments: true, ProtectIds: false, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) + spDDL := ddl.GetDDL(ddl.Config{Comments: true, ProtectIds: false, Tables: true, ForeignKeys: true, Indexes: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) if len(spDDL) == 0 { spDDL = []string{"\n-- Schema is empty -- no tables found\n"} } @@ -69,7 +69,7 @@ func WriteSchemaFile(conv *internal.Conv, now time.Time, name string, out *os.Fi // We change 'Comments' to false and 'ProtectIds' to true below to write out a // schema file that is a legal Cloud Spanner DDL. - spDDL = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) + spDDL = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, Indexes: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences) if len(spDDL) == 0 { spDDL = []string{"\n-- Schema is empty -- no tables found\n"} } diff --git a/internal/progress.go b/internal/progress.go index 8035a09317..30bcbed5a5 100644 --- a/internal/progress.go +++ b/internal/progress.go @@ -50,6 +50,8 @@ const ( DataWriteInProgress ForeignKeyUpdateInProgress ForeignKeyUpdateComplete + IndexUpdateInProgress + IndexUpdateComplete ) // NewProgress creates and returns a Progress instance. diff --git a/spanner/ddl/ast.go b/spanner/ddl/ast.go index db2cf02cdb..da7d79aa8c 100644 --- a/spanner/ddl/ast.go +++ b/spanner/ddl/ast.go @@ -195,6 +195,7 @@ type Config struct { ProtectIds bool // If true, table and col names are quoted using backticks (avoids reserved-word issue). Tables bool // If true, print tables ForeignKeys bool // If true, print foreign key constraints. + Indexes bool // If true, print indexes. SpDialect string Source string // SourceDB information for determining case-sensitivity handling for PGSQL } @@ -647,6 +648,11 @@ func GetDDL(c Config, tableSchema Schema, sequenceSchema map[string]Sequence) [] if c.Tables { for _, tableId := range tableIds { ddl = append(ddl, tableSchema[tableId].PrintCreateTable(tableSchema, c)) + } + } + + if c.Indexes { + for _, tableId := range tableIds { for _, index := range tableSchema[tableId].Indexes { ddl = append(ddl, index.PrintCreateIndex(tableSchema[tableId], c)) } diff --git a/testing/accessors/spanner/spanner_accessor_test.go b/testing/accessors/spanner/spanner_accessor_test.go index ef3730bd61..76fdd2c4df 100644 --- a/testing/accessors/spanner/spanner_accessor_test.go +++ b/testing/accessors/spanner/spanner_accessor_test.go @@ -107,7 +107,7 @@ func TestCheckExistingDb(t *testing.T) { t.Fatal(err) } spA := spanneraccessor.SpannerAccessorImpl{AdminClient: adminClientImpl} - dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, "check-db-exists") + dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, "check-db-exists") err = spA.CreateDatabase(ctx, dbURI, internal.MakeConv(), "", constants.BULK_MIGRATION) if err != nil { t.Fatal(err) diff --git a/testing/conversion/conversion_test.go b/testing/conversion/conversion_test.go index 30232f5e00..c0d5b2ef1d 100644 --- a/testing/conversion/conversion_test.go +++ b/testing/conversion/conversion_test.go @@ -36,9 +36,9 @@ import ( "go.uber.org/zap" database "cloud.google.com/go/spanner/admin/database/apiv1" - databasepb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" - spanneraccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/spanner" spanneradmin "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/spanner/admin" + spanneraccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/spanner" + databasepb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" ) var ( diff --git a/ui/src/app/app.constants.ts b/ui/src/app/app.constants.ts index 2b77d10079..489cf7145f 100644 --- a/ui/src/app/app.constants.ts +++ b/ui/src/app/app.constants.ts @@ -68,12 +68,16 @@ export enum MigrationDetails { DataMigrationProgress = 'dataMigrationProgress', SchemaMigrationProgress = 'schemaMigrationProgress', HasForeignKeyUpdateStarted = 'hasForeignKeyUpdateStarted', + HasIndexUpdateStarted = 'hasIndexUpdateStarted', ForeignKeyProgressMessage = 'foreignKeyProgressMessage', ForeignKeyUpdateProgress = 'foreignKeyUpdateProgress', + IndexProgressMessage = 'indexProgressMessage', + IndexUpdateProgress = 'indexUpdateProgress', GeneratingResources = 'generatingResources', NumberOfShards = 'numberOfShards', NumberOfInstances = 'numberOfInstances', isForeignKeySkipped = 'isForeignKeySkipped', + isIndexSkipped = 'isIndexSkipped', IsGcsMetadataPathSet = 'isGcsMetadataPathSet' } @@ -106,7 +110,9 @@ export enum ProgressStatus { DataMigrationComplete = 3, DataWriteInProgress = 4, ForeignKeyUpdateInProgress = 5, - ForeignKeyUpdateComplete = 6 + ForeignKeyUpdateComplete = 6, + IndexUpdateInProgress = 5, + IndexUpdateComplete = 6 } export const DialectList = [ diff --git a/ui/src/app/components/prepare-migration/prepare-migration.component.html b/ui/src/app/components/prepare-migration/prepare-migration.component.html index 397c664e68..bbb28e94e0 100644 --- a/ui/src/app/components/prepare-migration/prepare-migration.component.html +++ b/ui/src/app/components/prepare-migration/prepare-migration.component.html @@ -64,11 +64,20 @@

Source and Target Database definitions (per shard)

info
-
+
Skip Foreign Key Creation: - + + {{ element.displayName }} + + + +
+ + Skip Index Creation: + + {{ element.displayName }} @@ -354,6 +363,11 @@

Source database details:

{{this.dataProgressMessage}}
+
+
+ + {{this.indexProgressMessage}} +

diff --git a/ui/src/app/components/prepare-migration/prepare-migration.component.ts b/ui/src/app/components/prepare-migration/prepare-migration.component.ts index d9de4930f6..55e3eca10e 100644 --- a/ui/src/app/components/prepare-migration/prepare-migration.component.ts +++ b/ui/src/app/components/prepare-migration/prepare-migration.component.ts @@ -23,7 +23,7 @@ import { IShardSessionDetails } from 'src/app/model/db-config' import { ShardedDataflowMigrationDetailsFormComponent } from '../sharded-dataflow-migration-details-form/sharded-dataflow-migration-details-form.component' import { SidenavService } from 'src/app/services/sidenav/sidenav.service' import { downloadSession } from 'src/app/utils/utils' -import {MatPaginator} from '@angular/material/paginator'; +import { MatPaginator } from '@angular/material/paginator'; import { MatTableDataSource } from '@angular/material/table' import { GcsMetadataDetailsFormComponent } from '../gcs-metadata-details-form/gcs-metadata-details-form.component' @Component({ @@ -55,12 +55,14 @@ export class PrepareMigrationComponent implements OnInit { isSourceDetailsSet: boolean = false isTargetDetailSet: boolean = false isForeignKeySkipped: boolean = false + isIndexSkipped: boolean = false isMigrationDetailSet: boolean = false isStreamingSupported: boolean = false isGcsMetadataDetailSet: boolean = false hasDataMigrationStarted: boolean = false hasSchemaMigrationStarted: boolean = false hasForeignKeyUpdateStarted: boolean = false + hasIndexUpdateStarted: boolean = false selectedMigrationMode: string = MigrationModes.schemaAndData connectionType: string = InputType.DirectConnect selectedMigrationType: string = MigrationTypes.lowDowntimeMigration @@ -72,9 +74,11 @@ export class PrepareMigrationComponent implements OnInit { schemaProgressMessage: string = 'Schema migration in progress...' dataProgressMessage: string = 'Data migration in progress...' foreignKeyProgressMessage: string = 'Foreign key update in progress...' + indexProgressMessage: string = 'Index update in progress...' dataMigrationProgress: number = 0 schemaMigrationProgress: number = 0 foreignKeyUpdateProgress: number = 0 + indexUpdateProgress: number = 0 sourceDatabaseName: string = '' sourceDatabaseType: string = '' resourcesGenerated: IGeneratedResources = { @@ -95,18 +99,18 @@ export class PrepareMigrationComponent implements OnInit { DlqPubsubTopicUrl: '', DlqPubsubSubscriptionName: '', DlqPubsubSubscriptionUrl: '', - MonitoringDashboardName:'', - MonitoringDashboardUrl:'', - AggMonitoringDashboardName:'', - AggMonitoringDashboardUrl:'', + MonitoringDashboardName: '', + MonitoringDashboardUrl: '', + AggMonitoringDashboardName: '', + AggMonitoringDashboardUrl: '', DataflowGcloudCmd: '', ShardToShardResourcesMap: new Map(), } generatedResourcesColumns = ['shardId', 'resourceType', 'resourceName', 'resourceUrl'] - + @ViewChild(MatPaginator) paginator!: MatPaginator - + displayedResources: ResourceDetails[] = [] displayedResourcesDataSource: MatTableDataSource = new MatTableDataSource(this.displayedResources) configuredMigrationProfile!: IMigrationProfile @@ -140,7 +144,7 @@ export class PrepareMigrationComponent implements OnInit { ttlInDays: localStorage.getItem(Gcs.TtlInDays) as string, ttlInDaysSet: (localStorage.getItem(Gcs.TtlInDaysSet) as string === 'true') } - + dataflowConfig: IDataflowConfig = { network: localStorage.getItem(Dataflow.Network) as string, subnetwork: localStorage.getItem(Dataflow.Subnetwork) as string, @@ -164,7 +168,7 @@ export class PrepareMigrationComponent implements OnInit { IsMetadataDbCreated: false, IsConfigValid: false } - skipForeignKeyResponseList = [ + skipSchemaUpdateResponseList = [ { value: false, displayName: 'No' }, { value: true, displayName: 'Yes' }, ] @@ -312,6 +316,9 @@ export class PrepareMigrationComponent implements OnInit { if (localStorage.getItem(MigrationDetails.isForeignKeySkipped) != null) { this.isForeignKeySkipped = localStorage.getItem(MigrationDetails.isForeignKeySkipped) === 'true' } + if (localStorage.getItem(MigrationDetails.isIndexSkipped) != null) { + this.isIndexSkipped = localStorage.getItem(MigrationDetails.isIndexSkipped) === 'true' + } if (localStorage.getItem(MigrationDetails.IsMigrationInProgress) != null) { this.isMigrationInProgress = (localStorage.getItem(MigrationDetails.IsMigrationInProgress) as string) === 'true' @@ -391,6 +398,20 @@ export class PrepareMigrationComponent implements OnInit { this.hasForeignKeyUpdateStarted = (localStorage.getItem(MigrationDetails.HasForeignKeyUpdateStarted) as string) === 'true' } + if (localStorage.getItem(MigrationDetails.HasIndexUpdateStarted) != null) { + this.hasIndexUpdateStarted = + (localStorage.getItem(MigrationDetails.HasIndexUpdateStarted) as string) === 'true' + } + if (localStorage.getItem(MigrationDetails.IndexProgressMessage) != null) { + this.indexProgressMessage = localStorage.getItem( + MigrationDetails.IndexProgressMessage + ) as string + } + if (localStorage.getItem(MigrationDetails.IndexUpdateProgress) != null) { + this.indexUpdateProgress = parseInt( + localStorage.getItem(MigrationDetails.IndexUpdateProgress) as string + ) + } if (localStorage.getItem(MigrationDetails.GeneratingResources) != null) { this.generatingResources = (localStorage.getItem(MigrationDetails.GeneratingResources) as string) === 'true' @@ -409,6 +430,7 @@ export class PrepareMigrationComponent implements OnInit { localStorage.removeItem(MigrationDetails.IsTargetDetailSet) localStorage.removeItem(MigrationDetails.IsGcsMetadataPathSet) localStorage.removeItem(MigrationDetails.isForeignKeySkipped) + localStorage.removeItem(MigrationDetails.isIndexSkipped) localStorage.removeItem(MigrationDetails.IsSourceConnectionProfileSet) localStorage.removeItem(MigrationDetails.IsTargetConnectionProfileSet) localStorage.removeItem(MigrationDetails.IsSourceDetailsSet) @@ -441,11 +463,14 @@ export class PrepareMigrationComponent implements OnInit { localStorage.removeItem(MigrationDetails.ForeignKeyProgressMessage) localStorage.removeItem(MigrationDetails.ForeignKeyUpdateProgress) localStorage.removeItem(MigrationDetails.HasForeignKeyUpdateStarted) + localStorage.removeItem(MigrationDetails.HasIndexUpdateStarted) + localStorage.removeItem(MigrationDetails.IndexProgressMessage) + localStorage.removeItem(MigrationDetails.IndexUpdateProgress) localStorage.removeItem(MigrationDetails.GeneratingResources) localStorage.removeItem(MigrationDetails.NumberOfShards) localStorage.removeItem(MigrationDetails.NumberOfInstances) } - + openConnectionProfileForm(isSource: boolean) { let payload: ISetUpConnectionProfile = { IsSource: isSource, @@ -530,21 +555,21 @@ export class PrepareMigrationComponent implements OnInit { ) } - openGcloudPopup(cmd: string){ + openGcloudPopup(cmd: string) { let dialogRef = this.dialog.open(EquivalentGcloudCommandComponent, { - width: '30vw', - minWidth: '400px', - maxWidth: '500px', - data: cmd, - }) + width: '30vw', + minWidth: '400px', + maxWidth: '500px', + data: cmd, + }) } - openTuneDatastreamForm(){ + openTuneDatastreamForm() { let dialogRef = this.dialog.open(TuneDatastreamFormComponent, { width: '4000px', minWidth: '400px', maxWidth: '500px', - data: { sourceType : this.sourceDatabaseType }, + data: { sourceType: this.sourceDatabaseType }, }) dialogRef.afterClosed().subscribe(() => { this.datastreamConfig = { @@ -567,7 +592,7 @@ export class PrepareMigrationComponent implements OnInit { ) } - openTuneGcsForm(){ + openTuneGcsForm() { let dialogRef = this.dialog.open(TuneGcsFormComponent, { width: '4000px', minWidth: '400px', @@ -576,7 +601,7 @@ export class PrepareMigrationComponent implements OnInit { dialogRef.afterClosed().subscribe(() => { this.gcsConfig = { ttlInDays: localStorage.getItem(Gcs.TtlInDays) as string, - ttlInDaysSet: (localStorage.getItem(Gcs.TtlInDaysSet) as string === 'true') + ttlInDaysSet: (localStorage.getItem(Gcs.TtlInDaysSet) as string === 'true') } this.isGcsConfigurationSet = localStorage.getItem(Gcs.IsGcsConfigSet) as string === 'true' // We only call setGcsDetailsForShardedMigrations for sharded flows which is fetched later to create sharding configs for the shards. @@ -766,7 +791,8 @@ export class PrepareMigrationComponent implements OnInit { IsSharded: this.isSharded, MigrationType: this.selectedMigrationType, MigrationMode: this.selectedMigrationMode, - skipForeignKeys: this.isForeignKeySkipped + skipForeignKeys: this.isForeignKeySkipped, + skipIndexes: this.isIndexSkipped } this.fetch.migrate(payload).subscribe({ next: () => { @@ -805,137 +831,297 @@ export class PrepareMigrationComponent implements OnInit { }) } + // subscribeMigrationProgress() { + // var displayStreamingMsg = false + // this.subscription = interval(5000).subscribe((x) => { + // this.fetch.getProgress().subscribe({ + // next: (res: IProgress) => { + // if (res.ErrorMessage == '') { + // // Checking for completion of schema migration + // if (res.ProgressStatus == ProgressStatus.SchemaMigrationComplete) { + // localStorage.setItem(MigrationDetails.SchemaMigrationProgress, '100') + // this.schemaMigrationProgress = parseInt( + // localStorage.getItem(MigrationDetails.SchemaMigrationProgress) as string + // ) + // if (this.isForeignKeySkipped && this.isIndexSkipped) { + // if (this.selectedMigrationMode == MigrationModes.schemaOnly) { + // this.markMigrationComplete() + // } else if (this.selectedMigrationType == MigrationTypes.lowDowntimeMigration) { + // this.markSchemaMigrationComplete() + // this.generatingResources = true + // localStorage.setItem( + // MigrationDetails.GeneratingResources, + // this.generatingResources.toString() + // ) + // if (!displayStreamingMsg) { + // this.snack.openSnackBar('Setting up dataflow and datastream jobs', 'Close') + // displayStreamingMsg = true + // } + // } + // } else { + // if (!this.isIndexSkipped) { + + // } else if (!this.isForeignKeySkipped) { + + // } else { + // this.markSchemaMigrationComplete() + // this.hasDataMigrationStarted = true + // localStorage.setItem( + // MigrationDetails.HasDataMigrationStarted, + // this.hasDataMigrationStarted.toString() + // ) + // } + // } + // } else if (res.ProgressStatus == ProgressStatus.DataMigrationComplete) { + // if (this.selectedMigrationType != MigrationTypes.lowDowntimeMigration) { + // this.hasDataMigrationStarted = true + // localStorage.setItem( + // MigrationDetails.HasDataMigrationStarted, + // this.hasDataMigrationStarted.toString() + // ) + // } + // this.generatingResources = false + // localStorage.setItem( + // MigrationDetails.GeneratingResources, + // this.generatingResources.toString() + // ) + // this.markMigrationComplete() + // } + // // Checking for data migration in progress + // else if (res.ProgressStatus == ProgressStatus.DataWriteInProgress) { + // this.markSchemaMigrationComplete() + // this.hasDataMigrationStarted = true + // localStorage.setItem( + // MigrationDetails.HasDataMigrationStarted, + // this.hasDataMigrationStarted.toString() + // ) + // localStorage.setItem(MigrationDetails.DataMigrationProgress, res.Progress.toString()) + // this.dataMigrationProgress = parseInt( + // localStorage.getItem(MigrationDetails.DataMigrationProgress) as string + // ) + // } else if (res.ProgressStatus == ProgressStatus.ForeignKeyUpdateComplete) { + // this.markMigrationComplete() + // } + // // Checking for foreign key update in progress + // else if (res.ProgressStatus == ProgressStatus.ForeignKeyUpdateInProgress) { + // this.markSchemaMigrationComplete() + // if (this.selectedMigrationType == MigrationTypes.bulkMigration) { + // this.hasDataMigrationStarted = true + // localStorage.setItem( + // MigrationDetails.HasDataMigrationStarted, + // this.hasDataMigrationStarted.toString() + // ) + // } + // this.markForeignKeyUpdateInitiation() + // this.dataMigrationProgress = 100 + // localStorage.setItem( + // MigrationDetails.DataMigrationProgress, + // this.dataMigrationProgress.toString() + // ) + // localStorage.setItem( + // MigrationDetails.ForeignKeyUpdateProgress, + // res.Progress.toString() + // ) + // this.foreignKeyUpdateProgress = parseInt( + // localStorage.getItem(MigrationDetails.ForeignKeyUpdateProgress) as string + // ) + // this.generatingResources = false + // localStorage.setItem( + // MigrationDetails.GeneratingResources, + // this.generatingResources.toString() + // ) + // this.fetchGeneratedResources() + // } + // } else { + // this.errorMessage = res.ErrorMessage + // this.subscription.unsubscribe() + // this.isMigrationInProgress = !this.isMigrationInProgress + // this.snack.openSnackBarWithoutTimeout(this.errorMessage, 'Close') + // this.schemaProgressMessage = 'Schema migration cancelled!' + // this.dataProgressMessage = 'Data migration cancelled!' + // this.foreignKeyProgressMessage = 'Foreign key update cancelled!' + // this.generatingResources = false + // this.isLowDtMigrationRunning = false + // this.clearLocalStorage() + // } + // }, + // error: (err: any) => { + // this.snack.openSnackBar(err.error, 'Close') + // this.isMigrationInProgress = !this.isMigrationInProgress + // this.clearLocalStorage() + // }, + // }) + // }) + // } + subscribeMigrationProgress() { - var displayStreamingMsg = false - this.subscription = interval(5000).subscribe((x) => { + let displayStreamingMsg = false; + this.subscription = interval(5000).subscribe(() => { this.fetch.getProgress().subscribe({ next: (res: IProgress) => { - if (res.ErrorMessage == '') { - // Checking for completion of schema migration - if (res.ProgressStatus == ProgressStatus.SchemaMigrationComplete) { - localStorage.setItem(MigrationDetails.SchemaMigrationProgress, '100') - this.schemaMigrationProgress = parseInt( - localStorage.getItem(MigrationDetails.SchemaMigrationProgress) as string - ) - if (this.selectedMigrationMode == MigrationModes.schemaOnly) { - this.markMigrationComplete() - } else if (this.selectedMigrationType == MigrationTypes.lowDowntimeMigration) { - this.markSchemaMigrationComplete() - this.generatingResources = true - localStorage.setItem( - MigrationDetails.GeneratingResources, - this.generatingResources.toString() - ) - if (!displayStreamingMsg) { - this.snack.openSnackBar('Setting up dataflow and datastream jobs', 'Close') - displayStreamingMsg = true - } - } else { - this.markSchemaMigrationComplete() - this.hasDataMigrationStarted = true - localStorage.setItem( - MigrationDetails.HasDataMigrationStarted, - this.hasDataMigrationStarted.toString() - ) - } - } else if (res.ProgressStatus == ProgressStatus.DataMigrationComplete) { - if (this.selectedMigrationType != MigrationTypes.lowDowntimeMigration) { - this.hasDataMigrationStarted = true - localStorage.setItem( - MigrationDetails.HasDataMigrationStarted, - this.hasDataMigrationStarted.toString() - ) - } - this.generatingResources = false - localStorage.setItem( - MigrationDetails.GeneratingResources, - this.generatingResources.toString() - ) - this.markMigrationComplete() - } - // Checking for data migration in progress - else if (res.ProgressStatus == ProgressStatus.DataWriteInProgress) { - this.markSchemaMigrationComplete() - this.hasDataMigrationStarted = true - localStorage.setItem( - MigrationDetails.HasDataMigrationStarted, - this.hasDataMigrationStarted.toString() - ) - localStorage.setItem(MigrationDetails.DataMigrationProgress, res.Progress.toString()) - this.dataMigrationProgress = parseInt( - localStorage.getItem(MigrationDetails.DataMigrationProgress) as string - ) - } else if (res.ProgressStatus == ProgressStatus.ForeignKeyUpdateComplete) { - this.markMigrationComplete() - } - // Checking for foreign key update in progress - else if (res.ProgressStatus == ProgressStatus.ForeignKeyUpdateInProgress) { - this.markSchemaMigrationComplete() - if (this.selectedMigrationType == MigrationTypes.bulkMigration) { - this.hasDataMigrationStarted = true - localStorage.setItem( - MigrationDetails.HasDataMigrationStarted, - this.hasDataMigrationStarted.toString() - ) - } - this.markForeignKeyUpdateInitiation() - this.dataMigrationProgress = 100 - localStorage.setItem( - MigrationDetails.DataMigrationProgress, - this.dataMigrationProgress.toString() - ) - localStorage.setItem( - MigrationDetails.ForeignKeyUpdateProgress, - res.Progress.toString() - ) - this.foreignKeyUpdateProgress = parseInt( - localStorage.getItem(MigrationDetails.ForeignKeyUpdateProgress) as string - ) - this.generatingResources = false - localStorage.setItem( - MigrationDetails.GeneratingResources, - this.generatingResources.toString() - ) - this.fetchGeneratedResources() + if (res.ErrorMessage === '') { + switch (res.ProgressStatus) { + case ProgressStatus.SchemaMigrationComplete: + this.handleSchemaMigrationComplete(); + break; + + case ProgressStatus.IndexUpdateComplete: + this.handleIndexCreationComplete(); + break; + + case ProgressStatus.ForeignKeyUpdateComplete: + this.handleForeignKeyUpdateComplete(); + break; + + case ProgressStatus.DataMigrationComplete: + this.handleDataMigrationComplete(); + break; + + case ProgressStatus.DataWriteInProgress: + this.updateDataMigrationProgress(res.Progress); + break; + + case ProgressStatus.IndexUpdateInProgress: + this.updateIndexProgress(res.Progress); + break; + + case ProgressStatus.ForeignKeyUpdateInProgress: + this.updateForeignKeyProgress(res.Progress); + break; + + default: + break; } } else { - this.errorMessage = res.ErrorMessage - this.subscription.unsubscribe() - this.isMigrationInProgress = !this.isMigrationInProgress - this.snack.openSnackBarWithoutTimeout(this.errorMessage, 'Close') - this.schemaProgressMessage = 'Schema migration cancelled!' - this.dataProgressMessage = 'Data migration cancelled!' - this.foreignKeyProgressMessage = 'Foreign key update cancelled!' - this.generatingResources = false - this.isLowDtMigrationRunning = false - this.clearLocalStorage() + this.handleMigrationError(res.ErrorMessage); } }, error: (err: any) => { - this.snack.openSnackBar(err.error, 'Close') - this.isMigrationInProgress = !this.isMigrationInProgress - this.clearLocalStorage() + this.snack.openSnackBar(err.error, 'Close'); + this.isMigrationInProgress = false; + this.clearLocalStorage(); }, - }) - }) + }); + }); + } + + private handleSchemaMigrationComplete() { + localStorage.setItem(MigrationDetails.SchemaMigrationProgress, '100'); + this.schemaMigrationProgress = 100; + this.markSchemaMigrationComplete(); + + if (this.isIndexSkipped && this.isForeignKeySkipped) { + // If both are skipped proceed to data migration + this.startDataMigration(); + } else if (!this.isIndexSkipped) { + this.markIndexUpdateInitiation(); + } else if (!this.isForeignKeySkipped) { + this.markForeignKeyUpdateInitiation(); + } + } + + private handleIndexCreationComplete() { + this.markSchemaMigrationComplete() + this.markIndexCreationComplete(); + if (this.isForeignKeySkipped) { + this.startDataMigration(); + } else { + this.markForeignKeyUpdateInitiation(); + } + } + + private handleForeignKeyUpdateComplete() { + this.markSchemaMigrationComplete() + this.markIndexCreationComplete(); + this.markForeignKeyUpdateComplete(); + this.startDataMigration(); + } + + private handleDataMigrationComplete() { + this.generatingResources = false; + localStorage.setItem(MigrationDetails.GeneratingResources, 'false'); + this.markMigrationComplete(); } + + private updateDataMigrationProgress(progress: number) { + this.hasDataMigrationStarted = true; + localStorage.setItem(MigrationDetails.HasDataMigrationStarted, 'true'); + localStorage.setItem(MigrationDetails.DataMigrationProgress, progress.toString()); + this.dataMigrationProgress = progress; + } + + private updateIndexProgress(progress: number) { + this.markSchemaMigrationComplete() + this.markIndexUpdateInitiation() + localStorage.setItem(MigrationDetails.IndexUpdateProgress, progress.toString()); + this.indexUpdateProgress = progress; + } + + private updateForeignKeyProgress(progress: number) { + this.markSchemaMigrationComplete() + this.markIndexCreationComplete() + this.markForeignKeyUpdateInitiation() + localStorage.setItem(MigrationDetails.ForeignKeyUpdateProgress, progress.toString()); + this.foreignKeyUpdateProgress = progress; + } + + private startDataFlowSetup() { + this.generatingResources = true; + localStorage.setItem(MigrationDetails.GeneratingResources, 'true'); + this.snack.openSnackBar('Setting up dataflow and datastream jobs', 'Close'); + } + + private startDataMigration() { + if (this.selectedMigrationMode === MigrationModes.schemaOnly) { + this.markMigrationComplete(); + } else if (this.selectedMigrationType === MigrationTypes.lowDowntimeMigration) { + this.startDataFlowSetup(); + } else { + this.hasDataMigrationStarted = true; + localStorage.setItem(MigrationDetails.HasDataMigrationStarted, 'true'); + } + } + + private handleMigrationError(errorMessage: string) { + this.errorMessage = errorMessage; + this.subscription.unsubscribe(); + this.isMigrationInProgress = false; + + this.snack.openSnackBarWithoutTimeout(this.errorMessage, 'Close'); + this.schemaProgressMessage = 'Schema migration cancelled!'; + this.dataProgressMessage = 'Data migration cancelled!'; + this.foreignKeyProgressMessage = 'Foreign key update cancelled!'; + this.generatingResources = false; + this.isLowDtMigrationRunning = false; + this.clearLocalStorage(); + } + markForeignKeyUpdateInitiation() { - this.dataMigrationProgress = 100 - this.dataProgressMessage = 'Data migration completed successfully!' - localStorage.setItem( - MigrationDetails.DataMigrationProgress, - this.dataMigrationProgress.toString() - ) - localStorage.setItem( - MigrationDetails.DataMigrationProgress, - this.dataMigrationProgress.toString() - ) this.hasForeignKeyUpdateStarted = true this.foreignKeyUpdateProgress = parseInt( localStorage.getItem(MigrationDetails.ForeignKeyUpdateProgress) as string ) } + markIndexUpdateInitiation() { + this.hasIndexUpdateStarted = true + this.indexUpdateProgress = parseInt( + localStorage.getItem(MigrationDetails.IndexUpdateProgress) as string + ) + } + + markForeignKeyUpdateComplete() { + this.foreignKeyUpdateProgress = 100 + this.foreignKeyProgressMessage = 'Foreign key updated successfully!' + localStorage.setItem( + MigrationDetails.ForeignKeyUpdateProgress, + this.foreignKeyUpdateProgress.toString() + ) + localStorage.setItem( + MigrationDetails.ForeignKeyProgressMessage, + this.foreignKeyProgressMessage.toString() + ) + } markSchemaMigrationComplete() { this.schemaMigrationProgress = 100 this.schemaProgressMessage = 'Schema migration completed successfully!' @@ -946,6 +1132,16 @@ export class PrepareMigrationComponent implements OnInit { localStorage.setItem(MigrationDetails.SchemaProgressMessage, this.schemaProgressMessage) } + markIndexCreationComplete() { + this.indexUpdateProgress = 100 + this.indexProgressMessage = 'Index update completed successfully!' + localStorage.setItem( + MigrationDetails.IndexUpdateProgress, + this.indexUpdateProgress.toString() + ) + localStorage.setItem(MigrationDetails.IndexProgressMessage, this.indexProgressMessage) + } + downloadConfiguration() { this.fetch.getSourceProfile().subscribe({ next: (res: IMigrationProfile) => { @@ -1028,10 +1224,10 @@ export class PrepareMigrationComponent implements OnInit { DlqPubsubTopicUrl: '', DlqPubsubSubscriptionName: '', DlqPubsubSubscriptionUrl: '', - MonitoringDashboardName:'', - MonitoringDashboardUrl:'', - AggMonitoringDashboardName:'', - AggMonitoringDashboardUrl:'', + MonitoringDashboardName: '', + MonitoringDashboardUrl: '', + AggMonitoringDashboardName: '', + AggMonitoringDashboardUrl: '', DataflowGcloudCmd: '', ShardToShardResourcesMap: new Map(), } @@ -1041,6 +1237,7 @@ export class PrepareMigrationComponent implements OnInit { localStorage.setItem(MigrationDetails.MigrationMode, this.selectedMigrationMode) localStorage.setItem(MigrationDetails.MigrationType, this.selectedMigrationType) localStorage.setItem(MigrationDetails.isForeignKeySkipped, this.isForeignKeySkipped.toString()) + localStorage.setItem(MigrationDetails.isIndexSkipped, this.isIndexSkipped.toString()) localStorage.setItem( MigrationDetails.IsMigrationInProgress, this.isMigrationInProgress.toString() @@ -1088,10 +1285,10 @@ export class PrepareMigrationComponent implements OnInit { prepareGeneratedResourcesTableData(resourcesGenerated: IGeneratedResources) { for (let [shardId, resourceList] of resourcesGenerated.ShardToShardResourcesMap) { - for (let resource of resourceList) { - resource.DataShardId = shardId - } - this.displayedResources.push(...resourceList) + for (let resource of resourceList) { + resource.DataShardId = shardId + } + this.displayedResources.push(...resourceList) } this.displayedResourcesDataSource = new MatTableDataSource(this.displayedResources) this.displayedResourcesDataSource.paginator = this.paginator diff --git a/ui/src/app/model/migrate.ts b/ui/src/app/model/migrate.ts index 44657073bb..f37ab40522 100644 --- a/ui/src/app/model/migrate.ts +++ b/ui/src/app/model/migrate.ts @@ -10,6 +10,7 @@ export default interface IMigrationDetails { MigrationMode: string IsSharded: boolean skipForeignKeys: boolean + skipIndexes: boolean } export interface IProgress { diff --git a/webv2/api/reports.go b/webv2/api/reports.go index fb28471b4e..c63c6ff3fd 100644 --- a/webv2/api/reports.go +++ b/webv2/api/reports.go @@ -6,17 +6,13 @@ import ( "encoding/json" "fmt" "net/http" - "os" - "path/filepath" "strings" "time" - "github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils" "github.com/GoogleCloudPlatform/spanner-migration-tool/conversion" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal/reports" "github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl" "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/session" - "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/utilities" ) type ReportAPIHandler struct { @@ -24,28 +20,6 @@ type ReportAPIHandler struct { ReportGenerator reports.ReportInterface } -// getReportFile generates report file and returns file path. -func (reportHandler *ReportAPIHandler) GetReportFile(w http.ResponseWriter, r *http.Request) { - ioHelper := &utils.IOStreams{In: os.Stdin, Out: os.Stdout} - var err error - now := time.Now() - filePrefix, err := utilities.GetFilePrefix(now) - if err != nil { - http.Error(w, fmt.Sprintf("Can not get file prefix : %v", err), http.StatusInternalServerError) - } - reportFileName := "frontend/" + filePrefix - sessionState := session.GetSessionState() - sessionState.Conv.ConvLock.Lock() - defer sessionState.Conv.ConvLock.Unlock() - reportHandler.Report.GenerateReport(sessionState.Driver, nil, ioHelper.BytesRead, "", sessionState.Conv, reportFileName, sessionState.DbName, ioHelper.Out) - reportAbsPath, err := filepath.Abs(reportFileName) - if err != nil { - http.Error(w, fmt.Sprintf("Can not create absolute path : %v", err), http.StatusInternalServerError) - } - w.WriteHeader(http.StatusOK) - w.Write([]byte(reportAbsPath)) -} - // generates a downloadable structured report and send it as a JSON response func (reportHandler *ReportAPIHandler) GetDStructuredReport(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() @@ -85,7 +59,7 @@ func GetDSpannerDDL(w http.ResponseWriter, r *http.Request) { defer sessionState.Conv.ConvLock.RUnlock() conv := sessionState.Conv now := time.Now() - spDDL := ddl.GetDDL(ddl.Config{Comments: true, ProtectIds: false, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: sessionState.Driver}, conv.SpSchema, conv.SpSequences) + spDDL := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: false, Tables: true, ForeignKeys: true, Indexes: true, SpDialect: conv.SpDialect, Source: sessionState.Driver}, conv.SpSchema, conv.SpSequences) if len(spDDL) == 0 { spDDL = []string{"\n-- Schema is empty -- no tables found\n"} } diff --git a/webv2/api/reports_test.go b/webv2/api/reports_test.go index 2018763ff5..282edc3e9b 100644 --- a/webv2/api/reports_test.go +++ b/webv2/api/reports_test.go @@ -32,23 +32,6 @@ func (r *GenerateReportMock) GenerateStructuredReport(driverName string, dbName } } -func TestGetReportFile(t *testing.T) { - reportAPIHandler := api.ReportAPIHandler{ - Report: &ReportMock{}, - } - req, err := http.NewRequest("POST", "/report", nil) - if err != nil { - t.Fatal(err) - } - req.Header.Set("Content-Type", "application/json") - rr := httptest.NewRecorder() - handler := http.HandlerFunc(reportAPIHandler.GetReportFile) - handler.ServeHTTP(rr, req) - //API generates a report file and returns a file path - reportFilePath := string(rr.Body.String()) - assert.Contains(t, reportFilePath, "frontend") -} - func TestGetDStructuredReport(t *testing.T) { reportAPIHandler := api.ReportAPIHandler{ ReportGenerator: &GenerateReportMock{}, diff --git a/webv2/routes.go b/webv2/routes.go index 90970c6ca8..3b2f6786d9 100644 --- a/webv2/routes.go +++ b/webv2/routes.go @@ -26,6 +26,7 @@ import ( storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage" "github.com/GoogleCloudPlatform/spanner-migration-tool/conversion" "github.com/GoogleCloudPlatform/spanner-migration-tool/expressions_api" + "github.com/GoogleCloudPlatform/spanner-migration-tool/internal/reports" "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/api" "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/config" "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/primarykey" @@ -41,7 +42,8 @@ func getRoutes() *mux.Router { frontendRoot, _ := fs.Sub(FrontendDir, "ui/dist/ui") frontendStatic := http.FileServer(http.FS(frontendRoot)) reportAPIHandler := api.ReportAPIHandler{ - Report: &conversion.ReportImpl{}, + Report: &conversion.ReportImpl{}, + ReportGenerator: &reports.ReportImpl{}, } ctx := context.Background() ddlVerifier, _ := expressions_api.NewDDLVerifierImpl(ctx, "", "") @@ -71,11 +73,9 @@ func getRoutes() *mux.Router { router.HandleFunc("/seqDdl", api.GetSequenceDDL).Methods("GET") router.HandleFunc("/conversion", api.GetConversionRate).Methods("GET") router.HandleFunc("/typemap", api.GetTypeMap).Methods("GET") - router.HandleFunc("/report", reportAPIHandler.GetReportFile).Methods("GET") router.HandleFunc("/downloadStructuredReport", reportAPIHandler.GetDStructuredReport).Methods("GET") router.HandleFunc("/downloadTextReport", reportAPIHandler.GetDTextReport).Methods("GET") router.HandleFunc("/downloadDDL", api.GetDSpannerDDL).Methods("GET") - router.HandleFunc("/schema", getSchemaFile).Methods("GET") router.HandleFunc("/applyrule", api.ApplyRule).Methods("POST") router.HandleFunc("/dropRule", api.DropRule).Methods("POST") router.HandleFunc("/typemap/table", table.UpdateTableSchema).Methods("POST") diff --git a/webv2/types/types.go b/webv2/types/types.go index 0f03968a7a..234d73ab0f 100644 --- a/webv2/types/types.go +++ b/webv2/types/types.go @@ -66,6 +66,7 @@ type MigrationDetails struct { MigrationType string `json:"MigrationType"` IsSharded bool `json:"IsSharded"` SkipForeignKeys bool `json:"skipForeignKeys"` + SkipIndexes bool `json:"skipIndexes"` } type TargetDetails struct { diff --git a/webv2/web.go b/webv2/web.go index f0b62ebd39..9d7e8307ec 100644 --- a/webv2/web.go +++ b/webv2/web.go @@ -30,7 +30,6 @@ import ( "path/filepath" "strconv" "strings" - "time" instance "cloud.google.com/go/spanner/admin/instance/apiv1" storageclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/storage" @@ -479,29 +478,6 @@ func fetchLastLoadedSessionDetails(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(convm) } -// getSchemaFile generates schema file and returns file path. -func getSchemaFile(w http.ResponseWriter, r *http.Request) { - ioHelper := &utils.IOStreams{In: os.Stdin, Out: os.Stdout} - var err error - now := time.Now() - filePrefix, err := utilities.GetFilePrefix(now) - if err != nil { - http.Error(w, fmt.Sprintf("Can not get file prefix : %v", err), http.StatusInternalServerError) - } - schemaFileName := "frontend/" + filePrefix + "schema.txt" - - sessionState := session.GetSessionState() - sessionState.Conv.ConvLock.RLock() - defer sessionState.Conv.ConvLock.RUnlock() - conversion.WriteSchemaFile(sessionState.Conv, now, schemaFileName, ioHelper.Out, sessionState.Driver) - schemaAbsPath, err := filepath.Abs(schemaFileName) - if err != nil { - http.Error(w, fmt.Sprintf("Can not create absolute path : %v", err), http.StatusInternalServerError) - } - w.WriteHeader(http.StatusOK) - w.Write([]byte(schemaAbsPath)) -} - // getIssueDescription maps IssueDB's Category to corresponding CategoryDescription(if present), // or to the Brief if not present and pass the map to frontend to be used in assessment report UI func getIssueDescription(w http.ResponseWriter, r *http.Request) { @@ -648,13 +624,16 @@ func migrate(w http.ResponseWriter, r *http.Request) { // Set env variable SKIP_METRICS_POPULATION to true in case of dev testing sessionState.Conv.Audit.SkipMetricsPopulation = os.Getenv("SKIP_METRICS_POPULATION") == "true" if details.MigrationMode == helpers.SCHEMA_ONLY { + schemaCmd := &cmd.SchemaCmd{ + SkipForeignKeys: details.SkipForeignKeys, + SkipIndexes: details.SkipIndexes, + } log.Println("Starting schema only migration") sessionState.Conv.Audit.MigrationType = migration.MigrationData_SCHEMA_ONLY.Enum() - go cmd.MigrateDatabase(ctx, migrationProjectId, targetProfile, sourceProfile, dbName, &ioHelper, &cmd.SchemaCmd{}, sessionState.Conv, &sessionState.Error) + go cmd.MigrateDatabase(ctx, migrationProjectId, targetProfile, sourceProfile, dbName, &ioHelper, schemaCmd, sessionState.Conv, &sessionState.Error) } else if details.MigrationMode == helpers.DATA_ONLY { dataCmd := &cmd.DataCmd{ - SkipForeignKeys: details.SkipForeignKeys, - WriteLimit: cmd.DefaultWritersLimit, + WriteLimit: cmd.DefaultWritersLimit, } log.Println("Starting data only migration") sessionState.Conv.Audit.MigrationType = migration.MigrationData_DATA_ONLY.Enum() @@ -663,6 +642,7 @@ func migrate(w http.ResponseWriter, r *http.Request) { schemaAndDataCmd := &cmd.SchemaAndDataCmd{ SkipForeignKeys: details.SkipForeignKeys, WriteLimit: cmd.DefaultWritersLimit, + SkipIndexes: details.SkipIndexes, } log.Println("Starting schema and data migration") sessionState.Conv.Audit.MigrationType = migration.MigrationData_SCHEMA_AND_DATA.Enum()