-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support for ALTER/DROP SUSCRIPTION
#253
Conversation
ValueColumns: []string{"connection", "publication"}, | ||
DDL: "name TEXT PRIMARY KEY, connection TEXT, publication TEXT", | ||
KeyColumns: []string{"subname"}, | ||
ValueColumns: []string{"subconninfo", "subpublication", "subskiplsn", "subenabled"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sub
prefix for all column names can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sub
is chosen to maintain consistency with the pg_subscription catalog in PostgreSQL.
pgserver/connection_handler.go
Outdated
@@ -1059,15 +1059,15 @@ func (h *ConnectionHandler) handledPSQLCommands(statement string) (bool, error) | |||
statement = strings.ToLower(statement) | |||
// Command: \l | |||
if statement == "select d.datname as \"name\",\n pg_catalog.pg_get_userbyid(d.datdba) as \"owner\",\n pg_catalog.pg_encoding_to_char(d.encoding) as \"encoding\",\n d.datcollate as \"collate\",\n d.datctype as \"ctype\",\n d.daticulocale as \"icu locale\",\n case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as \"locale provider\",\n pg_catalog.array_to_string(d.datacl, e'\\n') as \"access privileges\"\nfrom pg_catalog.pg_database d\norder by 1;" { | |||
query, err := h.convertQuery(`select d.datname as "Name", 'postgres' as "Owner", 'UTF8' as "Encoding", 'en_US.UTF-8' as "Collate", 'en_US.UTF-8' as "Ctype", 'en-US' as "ICU Locale", case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as "locale provider", '' as "access privileges" from pg_catalog.pg_database d order by 1;`) | |||
query, err := h.convertQuery(`select d.datname as "Subscription", 'postgres' as "Owner", 'UTF8' as "Encoding", 'en_US.UTF-8' as "Collate", 'en_US.UTF-8' as "Ctype", 'en-US' as "ICU Locale", case d.datlocprovider when 'c' then 'libc' when 'i' then 'icu' end as "locale provider", '' as "access privileges" from pg_catalog.pg_database d order by 1;`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accidental changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
Enabled: enabled, | ||
Replicator: nil, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check rows.Err()
after the for loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
pgserver/logrepl/subscription.go
Outdated
var lsnValueColumns = []string{"subskiplsn"} | ||
|
||
var subscriptionMap = sync.Map{} | ||
var mu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary and dangerous global mutex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
pgserver/logrepl/subscription.go
Outdated
} | ||
defer rows.Close() | ||
|
||
if !rows.Next() { | ||
return | ||
var tempMap = make(map[string]*Subscription) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The temp
prefix is unnecessary. It is a local variable, implying it is temporary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
pgserver/subscription_handler.go
Outdated
subscriptionName := subscriptionMatch[1] | ||
connectionString := subscriptionMatch[2] | ||
publicationName := subscriptionMatch[3] | ||
func (h *ConnectionHandler) executeAlterEnable(subscriptionConfig *SubscriptionConfig) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executeAlterEnable
-> executeEnableSubscription(config *SubscriptionConfig)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
pgserver/subscription_handler.go
Outdated
return commitAndUpdate(sqlCtx) | ||
} | ||
|
||
func (h *ConnectionHandler) executeAlterDisable(subscriptionConfig *SubscriptionConfig) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> executeDisableSubscription(config *SubscriptionConfig)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks.
Resolve #246
New Features
ALTER SUBSCRIPTION:
Added support for
ALTER SUBSCRIPTION mysub [ENABLE | DISABLE]
.DISABLE
: Pauses replication from the source PostgreSQL.ENABLE
: Resumes replication from the source PostgreSQL.DROP SUBSCRIPTION:
Added support for
DROP SUBSCRIPTION mysub
.Note: Dropping a subscription does not remove any previously replicated data.
Improvements
subscriptionMap
insubscription.go
to manage all subscriptions centrally.__sys__.pg_replication_lsn
into__sys__.pg_subscription
.