Added support for skip indexes and moved skip feature to schema migration #996

Draft · wants to merge 1 commit into base: master
127 changes: 75 additions & 52 deletions accessors/spanner/spanner_accessor.go
@@ -74,6 +74,8 @@ type SpannerAccessor interface {
ValidateDDL(ctx context.Context, dbURI string) error
// UpdateDDLForeignKeys updates the Spanner database with foreign key constraints using ALTER TABLE statements.
UpdateDDLForeignKeys(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string)
// UpdateDDLIndexes updates the Spanner database with indexes using CREATE INDEX statements.
UpdateDDLIndexes(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string)
// Deletes a database.
DropDatabase(ctx context.Context, dbURI string) error
// Runs a query against the provided Spanner database and returns whether the executed DML is valid or not
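
For reference, the intended call pattern for the new method mirrors what this PR wires into cmd/utils.go: after the base schema has been created, indexes are applied first and foreign keys afterwards, each guarded by its own skip flag. A minimal sketch (error handling and setup omitted; spA, conv, dbURI, driver and migrationType are assumed to come from the schema-conversion step):

	// Indexes and foreign keys are applied as separate post-schema steps,
	// so either can be skipped independently via the new flags.
	if !cmd.SkipIndexes {
		spA.UpdateDDLIndexes(ctx, dbURI, conv, driver, migrationType)
	}
	if !cmd.SkipForeignKeys {
		spA.UpdateDDLForeignKeys(ctx, dbURI, conv, driver, migrationType)
	}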
@@ -287,12 +289,7 @@ func (sp *SpannerAccessorImpl) CreateDatabase(ctx context.Context, dbURI string,
req.DatabaseDialect = adminpb.DatabaseDialect_POSTGRESQL
} else {
req.CreateStatement = "CREATE DATABASE `" + dbName + "`"
if migrationType == constants.DATAFLOW_MIGRATION {
req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
} else {
req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
}

req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, Indexes: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
}

op, err := sp.AdminClient.CreateDatabase(ctx, req)
@@ -316,7 +313,7 @@ func (sp *SpannerAccessorImpl) UpdateDatabase(ctx context.Context, dbURI string,
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
// Foreign Keys are set to false since we create them post data migration.
schema := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
schema := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, Indexes: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
req := &adminpb.UpdateDatabaseDdlRequest{
Database: dbURI,
Statements: schema,
@@ -383,35 +380,8 @@ func (sp *SpannerAccessorImpl) ValidateDDL(ctx context.Context, dbURI string) er
return nil
}

// UpdateDDLForeignKeys updates the Spanner database with foreign key
// constraints using ALTER TABLE statements.
func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) {

if conv.SpDialect != constants.DIALECT_POSTGRESQL && migrationType == constants.DATAFLOW_MIGRATION {
//foreign keys were applied as part of CreateDatabase
return
}

// The schema we send to Spanner excludes comments (since Cloud
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
// Sequences will not be passed as they have already been created.
fkStmts := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, make(map[string]ddl.Sequence))
if len(fkStmts) == 0 {
return
}
if len(fkStmts) > 50 {
logger.Log.Warn(`
Warning: Large number of foreign keys detected. Spanner can take a long amount of
time to create foreign keys (over 5 mins per batch of Foreign Keys even with no data).
Spanner migration tool does not have control over a single foreign key creation time. The number
of concurrent Foreign Key Creation Requests sent to spanner can be increased by
tweaking the MaxWorkers variable (https://github.com/GoogleCloudPlatform/spanner-migration-tool/blob/master/conversion/conversion.go#L89).
However, setting it to a very high value might lead to exceeding the admin quota limit. Spanner migration tool tries to stay under the
admin quota limit by spreading the FK creation requests over time.`)
}
msg := fmt.Sprintf("Updating schema of database %s with foreign key constraints ...", dbURI)
conv.Audit.Progress = *internal.NewProgress(int64(len(fkStmts)), msg, internal.Verbose(), true, int(internal.ForeignKeyUpdateInProgress))
func (sp *SpannerAccessorImpl) updateForeignKeyAndIndexes(stmts []string, conv *internal.Conv, msg string, ctx context.Context, dbURI string, progressTracker, completionTracker int) {
conv.Audit.Progress = *internal.NewProgress(int64(len(stmts)), msg, internal.Verbose(), true, progressTracker)

workers := make(chan int, MaxWorkers)
for i := 1; i <= MaxWorkers; i++ {
var progressMutex sync.Mutex
progress := int64(0)

// We dispatch parallel foreign key create requests to ensure the backfill runs in parallel to reduce overall time.
// We dispatch parallel requests to ensure the backfill runs in parallel to reduce overall time.
// This cuts down the time taken to a third (approx) compared to Serial and Batched creation. We also do not want to create
// too many requests and get throttled due to network or hitting catalog memory limits.
// Ensure at most `MaxWorkers` goroutines run in parallel, each updating the DDL with one statement.
for _, fkStmt := range fkStmts {
for _, stmt := range stmts {
workerID := <-workers
go func(fkStmt string, workerID int) {
go func(stmt string, workerID int) {
defer func() {
// Locking the progress reporting otherwise progress results displayed could be in random order.
progressMutex.Lock()
progressMutex.Unlock()
workers <- workerID
}()
internal.VerbosePrintf("Submitting new FK create request: %s\n", fkStmt)
logger.Log.Debug("Submitting new FK create request", zap.String("fkStmt", fkStmt))
internal.VerbosePrintf("Submitting new DDL request: %s\n", stmt)
logger.Log.Debug("Submitting new DDL request", zap.String("stmt", stmt))

op, err := sp.AdminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbURI,
Statements: []string{fkStmt},
Statements: []string{stmt},
})
if err != nil {
logger.Log.Debug("Can't add foreign key with statement:" + fkStmt + "\n due to error:" + err.Error() + " Skipping this foreign key...\n")
conv.Unexpected(fmt.Sprintf("Can't add foreign key with statement %s: %s", fkStmt, err))
logger.Log.Debug("Can't update schema with statement:" + stmt + "\n due to error:" + err.Error() + " Skipping this request...\n")
conv.Unexpected(fmt.Sprintf("Can't update schema with statement %s: %s", stmt, err))
return
}
if err := op.Wait(ctx); err != nil {
logger.Log.Debug("Can't add foreign key with statement:" + fkStmt + "\n due to error:" + err.Error() + " Skipping this foreign key...\n")
conv.Unexpected(fmt.Sprintf("Can't add foreign key with statement %s: %s", fkStmt, err))
logger.Log.Debug("Can't update schema with statement:" + stmt + "\n due to error:" + err.Error() + " Skipping this request...\n")
conv.Unexpected(fmt.Sprintf("Can't update schema with statement %s: %s", stmt, err))
return
}
internal.VerbosePrintln("Updated schema with statement: " + fkStmt)
logger.Log.Debug("Updated schema with statement", zap.String("fkStmt", fkStmt))
}(fkStmt, workerID)
// Send out an FK creation request every second, with total of maxWorkers request being present in a batch.
internal.VerbosePrintln("Updated schema with statement: " + stmt)
logger.Log.Debug("Updated schema with statement", zap.String("stmt", stmt))
}(stmt, workerID)
// Send out a request every second, with a total of maxWorkers requests present in a batch.
time.Sleep(time.Second)
}
// Wait for all the goroutines to finish.
for i := 1; i <= MaxWorkers; i++ {
<-workers
}
conv.Audit.Progress.UpdateProgress("Foreign key update complete.", 100, internal.ForeignKeyUpdateComplete)
conv.Audit.Progress.UpdateProgress("Update complete.", 100, internal.ProgressStatus(completionTracker))
conv.Audit.Progress.Done()
}
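
The helper above keeps the worker-pool-with-pacing pattern from the original foreign key path: a buffered channel holds MaxWorkers tokens, each statement is dispatched to its own goroutine, dispatches are spaced one second apart, and the pool is drained at the end to wait for completion. A self-contained sketch of that pattern (the Spanner admin call is replaced by a stub; names other than the control flow are illustrative):

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// applyStatements bounds in-flight DDL requests to maxWorkers and paces
	// dispatches at one per second, mirroring updateForeignKeyAndIndexes.
	func applyStatements(stmts []string, maxWorkers int) {
		workers := make(chan int, maxWorkers)
		for i := 1; i <= maxWorkers; i++ {
			workers <- i
		}
		var progressMutex sync.Mutex
		progress := 0
		for _, stmt := range stmts {
			workerID := <-workers // blocks once maxWorkers requests are in flight
			go func(stmt string, workerID int) {
				defer func() {
					progressMutex.Lock()
					progress++
					fmt.Printf("applied %d/%d statements\n", progress, len(stmts))
					progressMutex.Unlock()
					workers <- workerID // return the worker token
				}()
				// Stand-in for UpdateDatabaseDdl followed by op.Wait.
				time.Sleep(100 * time.Millisecond)
			}(stmt, workerID)
			time.Sleep(time.Second) // pace requests to stay under admin quota
		}
		// Reclaiming every token guarantees all goroutines have finished.
		for i := 1; i <= maxWorkers; i++ {
			<-workers
		}
	}

	func main() {
		applyStatements([]string{"CREATE INDEX ...", "ALTER TABLE ... ADD FOREIGN KEY ..."}, 2)
	}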

// UpdateDDLForeignKeys updates the Spanner database with foreign key
// constraints using ALTER TABLE statements.
func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) {

// The schema we send to Spanner excludes comments (since Cloud
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
// Sequences will not be passed as they have already been created.
fkStmts := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: true, Indexes: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, make(map[string]ddl.Sequence))
if len(fkStmts) == 0 {
return
}
if len(fkStmts) > 50 {
logger.Log.Warn(`
Warning: Large number of foreign keys detected. Spanner can take a long amount of
time to create foreign keys (over 5 mins per batch of Foreign Keys even with no data).
Spanner migration tool does not have control over a single foreign key creation time. The number
of concurrent requests sent to spanner can be increased by
tweaking the MaxWorkers variable (https://github.com/GoogleCloudPlatform/spanner-migration-tool/blob/master/conversion/conversion.go#L89).
However, setting it to a very high value might lead to exceeding the admin quota limit. Spanner migration tool tries to stay under the
admin quota limit by spreading the requests over time.`)
}
msg := fmt.Sprintf("Updating schema of database %s with foreign key constraints ...", dbURI)
sp.updateForeignKeyAndIndexes(fkStmts, conv, msg, ctx, dbURI, int(internal.ForeignKeyUpdateInProgress), int(internal.ForeignKeyUpdateComplete))

}

// UpdateDDLIndexes updates the Spanner database with indexes using CREATE INDEX statements.
func (sp *SpannerAccessorImpl) UpdateDDLIndexes(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) {

// The schema we send to Spanner excludes comments (since Cloud
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
// Sequences will not be passed as they have already been created.
indexStmts := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: false, Indexes: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, make(map[string]ddl.Sequence))
if len(indexStmts) == 0 {
return
}
if len(indexStmts) > 50 {
logger.Log.Warn(`
Warning: Large number of indexes detected. Spanner can take a long amount of
time to create indexes.
Spanner migration tool does not have control over a single index creation time. The number
of concurrent requests sent to spanner can be increased by
tweaking the MaxWorkers variable (https://github.com/GoogleCloudPlatform/spanner-migration-tool/blob/master/conversion/conversion.go#L89).
However, setting it to a very high value might lead to exceeding the admin quota limit. Spanner migration tool tries to stay under the
admin quota limit by spreading the requests over time.`)
}
msg := fmt.Sprintf("Updating schema of database %s with indexes ...", dbURI)
sp.updateForeignKeyAndIndexes(indexStmts, conv, msg, ctx, dbURI, int(internal.IndexUpdateInProgress), int(internal.IndexUpdateComplete))

}

func (sp *SpannerAccessorImpl) DropDatabase(ctx context.Context, dbURI string) error {

err := sp.AdminClient.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbURI})
@@ -490,6 +513,6 @@ func (sp *SpannerAccessorImpl) ValidateDML(ctx context.Context, query string) (b
}
}

func (sp *SpannerAccessorImpl) Refresh(ctx context.Context, dbURI string) () {
func (sp *SpannerAccessorImpl) Refresh(ctx context.Context, dbURI string) {
sp.SpannerClient.Refresh(ctx, dbURI)
}
2 changes: 1 addition & 1 deletion cmd/data.go
@@ -203,7 +203,7 @@ func (cmd *DataCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface
}

// validateExistingDb validates that the existing spanner schema is in accordance with the one specified in the session file.
func validateExistingDb(ctx context.Context, spDialect, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, conv *internal.Conv) error {
func validateExistingDb(ctx context.Context, spDialect, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, conv *internal.Conv) error {
spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return err
24 changes: 14 additions & 10 deletions cmd/schema.go
@@ -37,16 +37,18 @@ import (

// SchemaCmd struct with flags.
type SchemaCmd struct {
source string
sourceProfile string
target string
targetProfile string
filePrefix string // TODO: move filePrefix to global flags
project string
logLevel string
dryRun bool
validate bool
sessionJSON string
source string
sourceProfile string
target string
targetProfile string
filePrefix string // TODO: move filePrefix to global flags
project string
logLevel string
dryRun bool
validate bool
sessionJSON string
SkipForeignKeys bool
SkipIndexes bool
}

// Name returns the name of operation.
@@ -82,6 +84,8 @@ func (cmd *SchemaCmd) SetFlags(f *flag.FlagSet) {
f.BoolVar(&cmd.dryRun, "dry-run", false, "Flag for generating DDL and schema conversion report without creating a spanner database")
f.BoolVar(&cmd.validate, "validate", false, "Flag for validating if all the required input parameters are present")
f.StringVar(&cmd.sessionJSON, "session", "", "Optional. Specifies the file we restore session state from.")
f.BoolVar(&cmd.SkipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after schema migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.BoolVar(&cmd.SkipIndexes, "skip-indexes", false, "Skip creating indexes as a part of the schema migration (ddl statements for indexes can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
}
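
With the new flags registered, a schema-only run that defers both index and foreign key creation would look roughly like the following (illustrative invocation; the binary name, source value, and profile strings are placeholders based on the existing flag help text):

	spanner-migration-tool schema -source=mysql -source-profile="file=<path>,format=dump" -target-profile="dialect=postgresql" -skip-indexes -skip-foreign-keys

The generated schema.ddl.txt still contains the index and foreign key DDL, so the skipped statements can be applied separately later.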

func (cmd *SchemaCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
4 changes: 3 additions & 1 deletion cmd/schema_and_data.go
@@ -44,6 +44,7 @@ type SchemaAndDataCmd struct {
target string
targetProfile string
SkipForeignKeys bool
SkipIndexes bool
filePrefix string // TODO: move filePrefix to global flags
project string
WriteLimit int64
@@ -80,7 +81,8 @@ func (cmd *SchemaAndDataCmd) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.sourceProfile, "source-profile", "", "Flag for specifying connection profile for source database e.g., \"file=<path>,format=dump\"")
f.StringVar(&cmd.target, "target", "Spanner", "Specifies the target DB, defaults to Spanner (accepted values: `Spanner`)")
f.StringVar(&cmd.targetProfile, "target-profile", "", "Flag for specifying connection profile for target database e.g., \"dialect=postgresql\"")
f.BoolVar(&cmd.SkipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after data migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.BoolVar(&cmd.SkipForeignKeys, "skip-foreign-keys", false, "Skip creating foreign keys after schema migration is complete (ddl statements for foreign keys can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.BoolVar(&cmd.SkipIndexes, "skip-indexes", false, "Skip creating indexes as a part of the schema migration (ddl statements for indexes can still be found in the downloaded schema.ddl.txt file and the same can be applied separately)")
f.StringVar(&cmd.filePrefix, "prefix", "", "File prefix for generated files")
f.StringVar(&cmd.project, "project", "", "Flag specifying default project id for all the generated resources for the migration")
f.Int64Var(&cmd.WriteLimit, "write-limit", DefaultWritersLimit, "Write limit for writes to spanner")
26 changes: 16 additions & 10 deletions cmd/utils.go
@@ -145,7 +145,7 @@ func MigrateDatabase(ctx context.Context, migrationProjectId string, targetProfi
defer client.Close()
switch v := cmd.(type) {
case *SchemaCmd:
err = migrateSchema(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient)
err = migrateSchema(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, v)
case *DataCmd:
bw, err = migrateData(ctx, migrationProjectId, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v)
case *SchemaAndDataCmd:
}

func migrateSchema(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile,
ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient) error {
ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient, cmd *SchemaCmd) error {
spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return err
@@ -171,6 +171,12 @@ func migrateSchema(ctx context.Context, targetProfile profiles.TargetProfile, so
}
metricsPopulation(ctx, sourceProfile.Driver, conv)
conv.Audit.Progress.UpdateProgress("Schema migration complete.", completionPercentage, internal.SchemaMigrationComplete)
if !cmd.SkipIndexes {
spA.UpdateDDLIndexes(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType)
}
if !cmd.SkipForeignKeys {
spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType)
}
return nil
}

@@ -206,11 +212,7 @@ func migrateData(ctx context.Context, migrationProjectId string, targetProfile p
}
conv.Audit.Progress.UpdateProgress("Data migration complete.", completionPercentage, internal.DataMigrationComplete)
if !cmd.SkipForeignKeys {
spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return bw, err
}
spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType)
fmt.Printf("skip-foreign-keys flag has been deprecated in the data migration mode and foreign keys will not be created as a part of data migration. Please use the schema or schema-and-data subcommand for foreign key creation.")
}
return bw, nil
}
Expand All @@ -229,6 +231,13 @@ func migrateSchemaAndData(ctx context.Context, migrationProjectId string, target
metricsPopulation(ctx, sourceProfile.Driver, conv)
conv.Audit.Progress.UpdateProgress("Schema migration complete.", completionPercentage, internal.SchemaMigrationComplete)

if !cmd.SkipIndexes {
spA.UpdateDDLIndexes(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType)
}
if !cmd.SkipForeignKeys {
spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType)
}

// If migration type is Minimal Downtime, validate if required resources can be generated
if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
err := ValidateResourceGenerationHelper(ctx, migrationProjectId, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
@@ -246,9 +255,6 @@ func migrateSchemaAndData(ctx context.Context, migrationProjectId string, target
}

conv.Audit.Progress.UpdateProgress("Data migration complete.", completionPercentage, internal.DataMigrationComplete)
if !cmd.SkipForeignKeys {
spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType)
}
return bw, nil
}
