Skip to content

Commit 5f8a57c

Browse files
authored
Make data-branch keywords context-aware (#23197)
Make data-branch keywords context-aware to avoid conflict with the user's input SQLs. Approved by: @XuPeng-SH, @iamlinjunhong
1 parent 47ccc48 commit 5f8a57c

File tree

3 files changed

+9961
-9861
lines changed

3 files changed

+9961
-9861
lines changed

pkg/frontend/data_branch.go

Lines changed: 85 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,9 @@ func tryFlushDeletesOrReplace(
577577
ses *Session,
578578
bh BackgroundExec,
579579
tblStuff tableStuff,
580-
newWrapped *batchWithKind,
580+
newKind string,
581+
newValsLen int,
582+
newRowCnt int,
581583
deleteCnt *int,
582584
deletesBuf *bytes.Buffer,
583585
replaceCnt *int,
@@ -611,14 +613,15 @@ func tryFlushDeletesOrReplace(
611613
}
612614

613615
// if wrapped is nil, means force flush
614-
if newWrapped != nil {
615-
newSize := newWrapped.batch.Size()
616-
if newWrapped.kind == diffDelete {
617-
if deletesBuf.Len()+newSize >= maxSqlBatchSize || *deleteCnt >= maxSqlBatchCnt {
616+
if newKind != "" {
617+
if newKind == diffDelete {
618+
if deletesBuf.Len()+newValsLen >= maxSqlBatchSize ||
619+
*deleteCnt+newRowCnt >= maxSqlBatchCnt {
618620
return flushDeletes()
619621
}
620622
} else {
621-
if replaceBuf.Len()+newSize >= maxSqlBatchSize || *replaceCnt >= maxSqlBatchCnt {
623+
if replaceBuf.Len()+newValsLen >= maxSqlBatchSize ||
624+
*replaceCnt+newRowCnt >= maxSqlBatchCnt {
622625
return flushReplace()
623626
}
624627
}
@@ -658,11 +661,13 @@ func mergeDiffs(
658661
replaceIntoVals = acquireBuffer()
659662
deleteFromVals = acquireBuffer()
660663
firstErr error
664+
tmpValsBuffer = acquireBuffer()
661665
)
662666

663667
defer func() {
664668
releaseBuffer(replaceIntoVals)
665669
releaseBuffer(deleteFromVals)
670+
releaseBuffer(tmpValsBuffer)
666671
}()
667672

668673
defer func() {
@@ -682,18 +687,36 @@ func mergeDiffs(
682687
continue
683688
}
684689

685-
if err = tryFlushDeletesOrReplace(
686-
ctx, ses, bh, tblStuff, &wrapped,
687-
&deleteCnt, deleteFromVals, &replaceCnt, replaceIntoVals, nil,
690+
tmpValsBuffer.Reset()
691+
692+
if err = constructValsFromBatch(
693+
ctx, ses, tblStuff, wrapped, tmpValsBuffer, tmpValsBuffer,
688694
); err != nil {
689695
firstErr = err
690696
cancel()
691697
tblStuff.retPool.releaseRetBatch(wrapped.batch, false)
692698
continue
693699
}
694700

695-
if err = constructValsFromBatch(
696-
ctx, ses, tblStuff, wrapped, deleteFromVals, replaceIntoVals,
701+
if tmpValsBuffer.Len() == 0 {
702+
tblStuff.retPool.releaseRetBatch(wrapped.batch, false)
703+
continue
704+
}
705+
706+
newRowCnt := wrapped.batch.RowCount()
707+
newValsLen := tmpValsBuffer.Len()
708+
if wrapped.kind == diffDelete {
709+
if deleteFromVals.Len() > 0 {
710+
newValsLen++
711+
}
712+
} else {
713+
if replaceIntoVals.Len() > 0 {
714+
newValsLen++
715+
}
716+
}
717+
if err = tryFlushDeletesOrReplace(
718+
ctx, ses, bh, tblStuff, wrapped.kind, newValsLen, newRowCnt,
719+
&deleteCnt, deleteFromVals, &replaceCnt, replaceIntoVals, nil,
697720
); err != nil {
698721
firstErr = err
699722
cancel()
@@ -702,17 +725,25 @@ func mergeDiffs(
702725
}
703726

704727
if wrapped.kind == diffDelete {
705-
deleteCnt += wrapped.batch.RowCount()
728+
if deleteFromVals.Len() > 0 {
729+
deleteFromVals.WriteString(",")
730+
}
731+
deleteFromVals.Write(tmpValsBuffer.Bytes())
732+
deleteCnt += newRowCnt
706733
} else {
707-
replaceCnt += wrapped.batch.RowCount()
734+
if replaceIntoVals.Len() > 0 {
735+
replaceIntoVals.WriteString(",")
736+
}
737+
replaceIntoVals.Write(tmpValsBuffer.Bytes())
738+
replaceCnt += newRowCnt
708739
}
709740

710741
tblStuff.retPool.releaseRetBatch(wrapped.batch, false)
711742
}
712743

713744
if err = tryFlushDeletesOrReplace(
714-
ctx, ses, bh, tblStuff, nil,
715-
&deleteCnt, deleteFromVals, &replaceCnt, replaceIntoVals, nil,
745+
ctx, ses, bh, tblStuff, "",
746+
0, 0, &deleteCnt, deleteFromVals, &replaceCnt, replaceIntoVals, nil,
716747
); err != nil {
717748
if firstErr == nil {
718749
firstErr = err
@@ -836,6 +867,7 @@ func satisfyDiffOutputOpt(
836867

837868
deleteFromValsBuffer = acquireBuffer()
838869
replaceIntoValsBuffer = acquireBuffer()
870+
tmpValsBuffer = acquireBuffer()
839871

840872
fileHint string
841873
fullFilePath string
@@ -854,6 +886,7 @@ func satisfyDiffOutputOpt(
854886
}
855887
releaseBuffer(deleteFromValsBuffer)
856888
releaseBuffer(replaceIntoValsBuffer)
889+
releaseBuffer(tmpValsBuffer)
857890
}()
858891

859892
if fullFilePath, fileHint, writeFile, release, cleanup, err = prepareFSForDiffAsFile(
@@ -881,29 +914,56 @@ func satisfyDiffOutputOpt(
881914
}
882915

883916
if wrapped.name == tblStuff.tarRel.GetTableName() {
884-
if err = tryFlushDeletesOrReplace(
885-
ctx, ses, bh, tblStuff, &wrapped,
886-
&deleteCnt, deleteFromValsBuffer, &replaceCnt, replaceIntoValsBuffer,
887-
writeFile,
917+
tmpValsBuffer.Reset()
918+
919+
if err = constructValsFromBatch(
920+
ctx, ses, tblStuff, wrapped, tmpValsBuffer, tmpValsBuffer,
888921
); err != nil {
889922
first = err
890923
cancel()
891924
tblStuff.retPool.releaseRetBatch(wrapped.batch, false)
892925
continue
893926
}
894927

895-
if err = constructValsFromBatch(
896-
ctx, ses, tblStuff, wrapped, deleteFromValsBuffer, replaceIntoValsBuffer,
928+
if tmpValsBuffer.Len() == 0 {
929+
tblStuff.retPool.releaseRetBatch(wrapped.batch, false)
930+
continue
931+
}
932+
933+
newRowCnt := wrapped.batch.RowCount()
934+
newValsLen := tmpValsBuffer.Len()
935+
if wrapped.kind == diffDelete {
936+
if deleteFromValsBuffer.Len() > 0 {
937+
newValsLen++
938+
}
939+
} else {
940+
if replaceIntoValsBuffer.Len() > 0 {
941+
newValsLen++
942+
}
943+
}
944+
if err = tryFlushDeletesOrReplace(
945+
ctx, ses, bh, tblStuff, wrapped.kind, newValsLen, newRowCnt,
946+
&deleteCnt, deleteFromValsBuffer, &replaceCnt, replaceIntoValsBuffer,
947+
writeFile,
897948
); err != nil {
898949
first = err
899950
cancel()
900951
tblStuff.retPool.releaseRetBatch(wrapped.batch, false)
901952
continue
902953
}
954+
903955
if wrapped.kind == diffDelete {
904-
deleteCnt += int(wrapped.batch.RowCount())
956+
if deleteFromValsBuffer.Len() > 0 {
957+
deleteFromValsBuffer.WriteString(",")
958+
}
959+
deleteFromValsBuffer.Write(tmpValsBuffer.Bytes())
960+
deleteCnt += newRowCnt
905961
} else {
906-
replaceCnt += int(wrapped.batch.RowCount())
962+
if replaceIntoValsBuffer.Len() > 0 {
963+
replaceIntoValsBuffer.WriteString(",")
964+
}
965+
replaceIntoValsBuffer.Write(tmpValsBuffer.Bytes())
966+
replaceCnt += newRowCnt
907967
}
908968
}
909969

@@ -915,8 +975,8 @@ func satisfyDiffOutputOpt(
915975
}
916976

917977
if err = tryFlushDeletesOrReplace(
918-
ctx, ses, bh, tblStuff, nil,
919-
&deleteCnt, deleteFromValsBuffer, &replaceCnt, replaceIntoValsBuffer,
978+
ctx, ses, bh, tblStuff, "",
979+
0, 0, &deleteCnt, deleteFromValsBuffer, &replaceCnt, replaceIntoValsBuffer,
920980
writeFile,
921981
); err != nil {
922982
first = err

0 commit comments

Comments
 (0)