Skip to content

Commit 83fd453

Browse files
committed
Allow publishing partition changes via ancestors
To control whether partition changes are replicated using their own identity and schema or an ancestor's, add a new parameter that can be set per publication named 'publish_via_partition_root'. This allows replicating a partitioned table into a different partition structure on the subscriber. Author: Amit Langote <[email protected]> Reviewed-by: Rafia Sabih <[email protected]> Reviewed-by: Peter Eisentraut <[email protected]> Reviewed-by: Petr Jelinek <[email protected]> Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
1 parent 1aac32d commit 83fd453

File tree

15 files changed

+724
-174
lines changed

15 files changed

+724
-174
lines changed

Diff for: doc/src/sgml/catalogs.sgml

+10
Original file line numberDiff line numberDiff line change
@@ -5437,6 +5437,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
54375437
<entry>If true, <command>TRUNCATE</command> operations are replicated for
54385438
tables in the publication.</entry>
54395439
</row>
5440+
5441+
<row>
5442+
<entry><structfield>pubviaroot</structfield></entry>
5443+
<entry><type>bool</type></entry>
5444+
<entry></entry>
5445+
<entry>If true, operations on a leaf partition are replicated using the
5446+
identity and schema of its topmost partitioned ancestor mentioned in the
5447+
publication instead of its own.
5448+
</entry>
5449+
</row>
54405450
</tbody>
54415451
</tgroup>
54425452
</table>

Diff for: doc/src/sgml/logical-replication.sgml

+8-4
Original file line numberDiff line numberDiff line change
@@ -411,10 +411,14 @@
411411
<listitem>
412412
<para>
413413
When replicating between partitioned tables, the actual replication
414-
originates from the leaf partitions on the publisher, so partitions on
415-
the publisher must also exist on the subscriber as valid target tables.
416-
(They could either be leaf partitions themselves, or they could be
417-
further subpartitioned, or they could even be independent tables.)
414+
originates, by default, from the leaf partitions on the publisher, so
415+
partitions on the publisher must also exist on the subscriber as valid
416+
target tables. (They could either be leaf partitions themselves, or they
417+
could be further subpartitioned, or they could even be independent
418+
tables.) Publications can also specify that changes are to be replicated
419+
using the identity and schema of the partitioned root table instead of
420+
that of the individual leaf partitions in which the changes actually
421+
originate (see <xref linkend="sql-createpublication"/>).
418422
</para>
419423
</listitem>
420424
</itemizedlist>

Diff for: doc/src/sgml/ref/create_publication.sgml

+20
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,26 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
123123
</para>
124124
</listitem>
125125
</varlistentry>
126+
127+
<varlistentry>
128+
<term><literal>publish_via_partition_root</literal> (<type>boolean</type>)</term>
129+
<listitem>
130+
<para>
131+
This parameter determines whether changes in a partitioned table (or
132+
on its partitions) contained in the publication will be published
133+
using the identity and schema of the partitioned table rather than
134+
that of the individual partitions that are actually changed; the
135+
latter is the default. Enablings this allows the changes to be
136+
replicated into a non-partitioned table or a partitioned table
137+
consisting of a different set of partitions.
138+
</para>
139+
140+
<para>
141+
If this is enabled, <literal>TRUNCATE</literal> operations performed
142+
directly on partitions are not replicated.
143+
</para>
144+
</listitem>
145+
</varlistentry>
126146
</variablelist>
127147

128148
</para>

Diff for: src/backend/catalog/pg_publication.c

+35-35
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
#include "utils/rel.h"
4343
#include "utils/syscache.h"
4444

45-
static List *get_rel_publications(Oid relid);
46-
4745
/*
4846
* Check if relation can be in given publication and throws appropriate
4947
* error if not.
@@ -216,37 +214,9 @@ publication_add_relation(Oid pubid, Relation targetrel,
216214
return myself;
217215
}
218216

219-
220-
/*
221-
* Gets list of publication oids for a relation, plus those of ancestors,
222-
* if any, if the relation is a partition.
223-
*/
217+
/* Gets list of publication oids for a relation */
224218
List *
225219
GetRelationPublications(Oid relid)
226-
{
227-
List *result = NIL;
228-
229-
result = get_rel_publications(relid);
230-
if (get_rel_relispartition(relid))
231-
{
232-
List *ancestors = get_partition_ancestors(relid);
233-
ListCell *lc;
234-
235-
foreach(lc, ancestors)
236-
{
237-
Oid ancestor = lfirst_oid(lc);
238-
List *ancestor_pubs = get_rel_publications(ancestor);
239-
240-
result = list_concat(result, ancestor_pubs);
241-
}
242-
}
243-
244-
return result;
245-
}
246-
247-
/* Workhorse of GetRelationPublications() */
248-
static List *
249-
get_rel_publications(Oid relid)
250220
{
251221
List *result = NIL;
252222
CatCList *pubrellist;
@@ -373,9 +343,13 @@ GetAllTablesPublications(void)
373343

374344
/*
375345
* Gets list of all relation published by FOR ALL TABLES publication(s).
346+
*
347+
* If the publication publishes partition changes via their respective root
348+
* partitioned tables, we must exclude partitions in favor of including the
349+
* root partitioned tables.
376350
*/
377351
List *
378-
GetAllTablesPublicationRelations(void)
352+
GetAllTablesPublicationRelations(bool pubviaroot)
379353
{
380354
Relation classRel;
381355
ScanKeyData key[1];
@@ -397,12 +371,35 @@ GetAllTablesPublicationRelations(void)
397371
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
398372
Oid relid = relForm->oid;
399373

400-
if (is_publishable_class(relid, relForm))
374+
if (is_publishable_class(relid, relForm) &&
375+
!(relForm->relispartition && pubviaroot))
401376
result = lappend_oid(result, relid);
402377
}
403378

404379
table_endscan(scan);
405-
table_close(classRel, AccessShareLock);
380+
381+
if (pubviaroot)
382+
{
383+
ScanKeyInit(&key[0],
384+
Anum_pg_class_relkind,
385+
BTEqualStrategyNumber, F_CHAREQ,
386+
CharGetDatum(RELKIND_PARTITIONED_TABLE));
387+
388+
scan = table_beginscan_catalog(classRel, 1, key);
389+
390+
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
391+
{
392+
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
393+
Oid relid = relForm->oid;
394+
395+
if (is_publishable_class(relid, relForm) &&
396+
!relForm->relispartition)
397+
result = lappend_oid(result, relid);
398+
}
399+
400+
table_endscan(scan);
401+
table_close(classRel, AccessShareLock);
402+
}
406403

407404
return result;
408405
}
@@ -433,6 +430,7 @@ GetPublication(Oid pubid)
433430
pub->pubactions.pubupdate = pubform->pubupdate;
434431
pub->pubactions.pubdelete = pubform->pubdelete;
435432
pub->pubactions.pubtruncate = pubform->pubtruncate;
433+
pub->pubviaroot = pubform->pubviaroot;
436434

437435
ReleaseSysCache(tup);
438436

@@ -533,9 +531,11 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
533531
* need those.
534532
*/
535533
if (publication->alltables)
536-
tables = GetAllTablesPublicationRelations();
534+
tables = GetAllTablesPublicationRelations(publication->pubviaroot);
537535
else
538536
tables = GetPublicationRelations(publication->oid,
537+
publication->pubviaroot ?
538+
PUBLICATION_PART_ROOT :
539539
PUBLICATION_PART_LEAF);
540540
funcctx->user_fctx = (void *) tables;
541541

Diff for: src/backend/commands/publicationcmds.c

+56-39
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "catalog/namespace.h"
2424
#include "catalog/objectaccess.h"
2525
#include "catalog/objectaddress.h"
26+
#include "catalog/partition.h"
2627
#include "catalog/pg_inherits.h"
2728
#include "catalog/pg_publication.h"
2829
#include "catalog/pg_publication_rel.h"
@@ -56,20 +57,21 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
5657
static void
5758
parse_publication_options(List *options,
5859
bool *publish_given,
59-
bool *publish_insert,
60-
bool *publish_update,
61-
bool *publish_delete,
62-
bool *publish_truncate)
60+
PublicationActions *pubactions,
61+
bool *publish_via_partition_root_given,
62+
bool *publish_via_partition_root)
6363
{
6464
ListCell *lc;
6565

6666
*publish_given = false;
67+
*publish_via_partition_root_given = false;
6768

68-
/* Defaults are true */
69-
*publish_insert = true;
70-
*publish_update = true;
71-
*publish_delete = true;
72-
*publish_truncate = true;
69+
/* defaults */
70+
pubactions->pubinsert = true;
71+
pubactions->pubupdate = true;
72+
pubactions->pubdelete = true;
73+
pubactions->pubtruncate = true;
74+
*publish_via_partition_root = false;
7375

7476
/* Parse options */
7577
foreach(lc, options)
@@ -91,10 +93,10 @@ parse_publication_options(List *options,
9193
* If publish option was given only the explicitly listed actions
9294
* should be published.
9395
*/
94-
*publish_insert = false;
95-
*publish_update = false;
96-
*publish_delete = false;
97-
*publish_truncate = false;
96+
pubactions->pubinsert = false;
97+
pubactions->pubupdate = false;
98+
pubactions->pubdelete = false;
99+
pubactions->pubtruncate = false;
98100

99101
*publish_given = true;
100102
publish = defGetString(defel);
@@ -110,19 +112,28 @@ parse_publication_options(List *options,
110112
char *publish_opt = (char *) lfirst(lc);
111113

112114
if (strcmp(publish_opt, "insert") == 0)
113-
*publish_insert = true;
115+
pubactions->pubinsert = true;
114116
else if (strcmp(publish_opt, "update") == 0)
115-
*publish_update = true;
117+
pubactions->pubupdate = true;
116118
else if (strcmp(publish_opt, "delete") == 0)
117-
*publish_delete = true;
119+
pubactions->pubdelete = true;
118120
else if (strcmp(publish_opt, "truncate") == 0)
119-
*publish_truncate = true;
121+
pubactions->pubtruncate = true;
120122
else
121123
ereport(ERROR,
122124
(errcode(ERRCODE_SYNTAX_ERROR),
123125
errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
124126
}
125127
}
128+
else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
129+
{
130+
if (*publish_via_partition_root_given)
131+
ereport(ERROR,
132+
(errcode(ERRCODE_SYNTAX_ERROR),
133+
errmsg("conflicting or redundant options")));
134+
*publish_via_partition_root_given = true;
135+
*publish_via_partition_root = defGetBoolean(defel);
136+
}
126137
else
127138
ereport(ERROR,
128139
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -143,10 +154,9 @@ CreatePublication(CreatePublicationStmt *stmt)
143154
Datum values[Natts_pg_publication];
144155
HeapTuple tup;
145156
bool publish_given;
146-
bool publish_insert;
147-
bool publish_update;
148-
bool publish_delete;
149-
bool publish_truncate;
157+
PublicationActions pubactions;
158+
bool publish_via_partition_root_given;
159+
bool publish_via_partition_root;
150160
AclResult aclresult;
151161

152162
/* must have CREATE privilege on database */
@@ -183,23 +193,25 @@ CreatePublication(CreatePublicationStmt *stmt)
183193
values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
184194

185195
parse_publication_options(stmt->options,
186-
&publish_given, &publish_insert,
187-
&publish_update, &publish_delete,
188-
&publish_truncate);
196+
&publish_given, &pubactions,
197+
&publish_via_partition_root_given,
198+
&publish_via_partition_root);
189199

190200
puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
191201
Anum_pg_publication_oid);
192202
values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
193203
values[Anum_pg_publication_puballtables - 1] =
194204
BoolGetDatum(stmt->for_all_tables);
195205
values[Anum_pg_publication_pubinsert - 1] =
196-
BoolGetDatum(publish_insert);
206+
BoolGetDatum(pubactions.pubinsert);
197207
values[Anum_pg_publication_pubupdate - 1] =
198-
BoolGetDatum(publish_update);
208+
BoolGetDatum(pubactions.pubupdate);
199209
values[Anum_pg_publication_pubdelete - 1] =
200-
BoolGetDatum(publish_delete);
210+
BoolGetDatum(pubactions.pubdelete);
201211
values[Anum_pg_publication_pubtruncate - 1] =
202-
BoolGetDatum(publish_truncate);
212+
BoolGetDatum(pubactions.pubtruncate);
213+
values[Anum_pg_publication_pubviaroot - 1] =
214+
BoolGetDatum(publish_via_partition_root);
203215

204216
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
205217

@@ -251,17 +263,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
251263
bool replaces[Natts_pg_publication];
252264
Datum values[Natts_pg_publication];
253265
bool publish_given;
254-
bool publish_insert;
255-
bool publish_update;
256-
bool publish_delete;
257-
bool publish_truncate;
266+
PublicationActions pubactions;
267+
bool publish_via_partition_root_given;
268+
bool publish_via_partition_root;
258269
ObjectAddress obj;
259270
Form_pg_publication pubform;
260271

261272
parse_publication_options(stmt->options,
262-
&publish_given, &publish_insert,
263-
&publish_update, &publish_delete,
264-
&publish_truncate);
273+
&publish_given, &pubactions,
274+
&publish_via_partition_root_given,
275+
&publish_via_partition_root);
265276

266277
/* Everything ok, form a new tuple. */
267278
memset(values, 0, sizeof(values));
@@ -270,19 +281,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
270281

271282
if (publish_given)
272283
{
273-
values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
284+
values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
274285
replaces[Anum_pg_publication_pubinsert - 1] = true;
275286

276-
values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
287+
values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
277288
replaces[Anum_pg_publication_pubupdate - 1] = true;
278289

279-
values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
290+
values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
280291
replaces[Anum_pg_publication_pubdelete - 1] = true;
281292

282-
values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
293+
values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
283294
replaces[Anum_pg_publication_pubtruncate - 1] = true;
284295
}
285296

297+
if (publish_via_partition_root_given)
298+
{
299+
values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
300+
replaces[Anum_pg_publication_pubviaroot - 1] = true;
301+
}
302+
286303
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
287304
replaces);
288305

0 commit comments

Comments
 (0)