Skip to content

Some optimizations and fixes #102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 37 additions & 28 deletions mysql_ch_replicator/clickhouse_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,46 +179,55 @@ def create_table(self, structure: TableStructure, additional_indexes: list | Non
def insert(self, table_name, records, table_structure: TableStructure = None):
current_version = self.get_last_used_version(table_name) + 1

records_to_insert = []
for record in records:
new_record = []
for i, e in enumerate(record):
if not records:
return

if not isinstance(records, list):
records = list(records)

process_indices = []
if table_structure:
for i, field in enumerate(table_structure.fields):
if (("DateTime" in field.field_type or "Date32" in field.field_type) and "Nullable" not in field.field_type):
process_indices.append(i)
else:
first_record = records[0]
for i, value in enumerate(first_record):
if isinstance(value, (datetime.date, datetime.datetime)):
process_indices.append(i)

for j, record in enumerate(records):
if not isinstance(record, list):
record = list(record)
records[j] = record

for i in process_indices:
e = record[i]

if isinstance(e, datetime.date) and not isinstance(e, datetime.datetime):
try:
e = datetime.datetime.combine(e, datetime.time())
record[i] = datetime.datetime.combine(e, datetime.time())
except ValueError:
e = datetime.datetime(1970, 1, 1)
if isinstance(e, datetime.datetime):
record[i] = datetime.datetime(1970, 1, 1)
elif isinstance(e, datetime.datetime):
try:
e.timestamp()
except ValueError:
e = datetime.datetime(1970, 1, 1)
if table_structure is not None:
field: TableField = table_structure.fields[i]
is_datetime = (
('DateTime' in field.field_type) or
('Date32' in field.field_type)
)
if is_datetime and 'Nullable' not in field.field_type:
try:
e.timestamp()
except (ValueError, AttributeError):
e = datetime.datetime(1970, 1, 1)
new_record.append(e)
record = new_record

records_to_insert.append(tuple(record) + (current_version,))
except (ValueError, AttributeError):
record[i] = datetime.datetime(1970, 1, 1)

record.append(current_version)
current_version += 1

full_table_name = f'`table_name`'
if '.' not in full_table_name:
if '.' in table_name:
full_table_name = f'`{table_name}`'
else:
full_table_name = f'`{self.database}`.`{table_name}`'

duration = 0.0
for attempt in range(ClickhouseApi.MAX_RETRIES):
try:
t1 = time.time()
self.client.insert(table=full_table_name, data=records_to_insert)
self.client.insert(table=full_table_name, data=records)
t2 = time.time()
duration += (t2 - t1)
break
Expand All @@ -232,7 +241,7 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
table_name=table_name,
duration=duration,
is_insert=True,
records=len(records_to_insert),
records=len(records),
)

self.set_last_used_version(table_name, current_version)
Expand Down
35 changes: 29 additions & 6 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from enum import Enum
from dataclasses import dataclass
from collections import defaultdict
from datetime import date

from .config import Settings, MysqlSettings, ClickhouseSettings
from .mysql_api import MySQLApi
Expand Down Expand Up @@ -268,6 +269,30 @@ def perform_initial_replication(self):
self.clickhouse_api.database = self.target_database
logger.info(f'initial replication - done')

def to_date_if_str(self, value):
if not isinstance(value, str):
return value

if len(value) == 10 and value[4] == '-' and value[7] == '-':
try:
year = int(value[0:4])
month = int(value[5:7])
day = int(value[8:10])
return date(year, month, day)
except ValueError:
return value

if len(value) == 12 and value[5] == '-' and value[8] == '-' and ((value[0] == '\'' and value[11] == '\'') or (value[0] == '"' and value[11] == '"')):
try:
year = int(value[1:5])
month = int(value[6:8])
day = int(value[9:11])
return date(year, month, day)
except ValueError:
return value

return value

def perform_initial_replication_table(self, table_name):
logger.info(f'running initial replication for table {table_name}')

Expand Down Expand Up @@ -329,13 +354,11 @@ def perform_initial_replication_table(self, table_name):

if not records:
break

self.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure)
for record in records:
record_primary_key = [record[key_idx] for key_idx in primary_key_ids]
if max_primary_key is None:
max_primary_key = record_primary_key
else:
max_primary_key = max(max_primary_key, record_primary_key)

last_record = records[-1]
max_primary_key = [self.to_date_if_str(last_record[key_idx]) for key_idx in primary_key_ids]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only attempt to convert for datetime or date types, not for arbitrary strings. Please do the following:

  1. Get MySQL field types for primary keys (add it after line 300):

    field_types_mysql = [field.field_type for field in mysql_table_structure.fields]
    primary_key_types_mysql = [field_types[key_idx] for key_idx in primary_key_ids]
    
  2. Check if the type is datetime and only in this case try to convert it to dates.


self.state.initial_replication_max_primary_key = max_primary_key
self.save_state_if_required()
Expand Down
26 changes: 22 additions & 4 deletions mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,30 @@ def get_table_create_statement(self, table_name) -> str:

def get_records(self, table_name, order_by, limit, start_value=None):
self.reconnect_if_required()
order_by = ','.join(order_by)
order_by_str = ','.join(order_by)
where = ''

if start_value is not None:
start_value = ','.join(map(str, start_value))
where = f'WHERE ({order_by}) > ({start_value}) '
query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by} LIMIT {limit}'
or_clauses = []

for i in range(len(order_by)):
eq_parts = []

for j in range(i):
eq_parts.append(f"{order_by[j]} = {start_value[j]}")

gt_part = f"{order_by[i]} > {start_value[i]}"

if eq_parts:
clause = f"({' AND '.join(eq_parts)} AND {gt_part})"
else:
clause = f"({gt_part})"

or_clauses.append(clause)
where = f"WHERE {' OR '.join(or_clauses)} "

query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}'

self.cursor.execute(query)
res = self.cursor.fetchall()
records = [x for x in res]
Expand Down
Loading