Performance optimization for sqlite3 based queue (#21)
Previously, each put/get was done within a transaction. Since sqlite3
can only perform a few dozen transactions per second, it is practical to avoid
a transaction for each put/get.

In this patch, sqlite3 based queues accept the
*auto_commit=[True|False]* parameter at initialization. When auto_commit=False
is set, the user needs to call *queue.task_done()* after a bulk of puts/gets
to commit all the changes made.

``auto_commit=True`` keeps the previous behavior.
peter-wangxu authored Sep 23, 2017
1 parent d47d564 commit 10b8fa0
Showing 8 changed files with 309 additions and 32 deletions.
45 changes: 41 additions & 4 deletions README.rst
@@ -27,7 +27,8 @@ While *queuelib* and *python-pqueue* cannot fulfil all of the above. After some tries,
implementation without huge code change. This is the motivation to start this project.

*persist-queue* use *pickle* object serialization module to support object instances.
To support customized objects, please refer to `Pickling and unpickling extension types(Python2) <https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances>`_
Most built-in types, like `int`, `dict`, `list`, can be persisted by `persist-queue` directly; to support customized objects,
please refer to `Pickling and unpickling extension types(Python2) <https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances>`_
and `Pickling Class Instances(Python3) <https://docs.python.org/3/library/pickle.html#pickling-class-instances>`_

Requirements
@@ -56,6 +57,34 @@ from source code
python setup.py install
Benchmark
---------

Here is the result of writing/reading **10000** items to/from disk, comparing the sqlite3 queue and the file queue.

Environment:
- OS: Windows 10
- Disk: SATA3 SSD
- RAM: 16 GiB

+---------+-----------------------+----------------+----------------------------+---------------------+
| | Transaction write (s) | Bulk write (s) | Transaction write/read (s) | Bulk write/read (s) |
+---------+-----------------------+----------------+----------------------------+---------------------+
| SQLite3 | 64.98 | 0.19 | 142.82 | 63.82 |
+---------+-----------------------+----------------+----------------------------+---------------------+
| File | 89.68 | 85.78 | 101.37 | 85.76 |
+---------+-----------------------+----------------+----------------------------+---------------------+

- **Transaction** refers to committing the change to disk on every write.
- **Bulk** refers to committing the change to disk only on the last write.
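The gap between the two columns can be reproduced with the standard library alone. The sketch below is illustrative (the table name, row count, and temporary paths are arbitrary, not part of persist-queue): it times per-write commits against a single commit at the end.

```python
import os
import sqlite3
import tempfile
import time

def bench(per_write_commit, n=2000):
    """Insert n rows; commit per write or once at the end."""
    path = os.path.join(tempfile.mkdtemp(), 'bench.db')
    conn = sqlite3.connect(path)
    conn.execute('CREATE TABLE q (data TEXT)')
    conn.commit()
    start = time.time()
    for i in range(n):
        conn.execute('INSERT INTO q VALUES (?)', ('bench%d' % i,))
        if per_write_commit:
            conn.commit()  # "Transaction" mode: one commit per write
    if not per_write_commit:
        conn.commit()      # "Bulk" mode: a single commit for the batch
    elapsed = time.time() - start
    conn.close()
    return elapsed

print('transaction: %.2fs  bulk: %.2fs' % (bench(True), bench(False)))
```

Each commit forces the journal to be flushed, which is why the per-write variant dominates the totals above.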

To see the real performance on your host, run the script at `benchmark/run_benchmark.py`:

.. code-block:: console

    python benchmark/run_benchmark.py
Examples
--------

@@ -66,7 +95,7 @@ Example usage with a SQLite3 based queue
.. code-block:: python

    >>> import persistqueue
    >>> q = persistqueue.SQLiteQueue('mypath')
    >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
    >>> q.put('str1')
    >>> q.put('str2')
    >>> q.put('str3')
@@ -80,7 +109,7 @@ Close the console, and then recreate the queue:
.. code-block:: python

    >>> import persistqueue
    >>> q = persistqueue.SQLiteQueue('mypath')
    >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
    >>> q.get()
    'str2'
    >>>
@@ -129,7 +158,7 @@ Example usage with a SQLite3 based dict
>>> q['key1']
123
>>> len(q)
3
2
>>> del q['key1']
>>> q['key1']
Traceback (most recent call last):
@@ -199,6 +228,14 @@ multi-thread usage for **Queue**
Performance impact
------------------

Since persistqueue v0.3.0, a new parameter ``auto_commit`` is introduced to tune
the performance of sqlite3 based queues as needed. When ``auto_commit=False`` is
specified, the user needs to call ``queue.task_done()`` to persist to disk the
changes made since the last ``task_done`` invocation.
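Mirroring the queue examples above, the bulk pattern looks like this (a sketch of the documented usage, not additional API):

.. code-block:: python

    >>> import persistqueue
    >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=False)
    >>> q.put('str1')
    >>> q.put('str2')
    >>> q.task_done()  # commits both puts in a single transaction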

Tests
-----

153 changes: 153 additions & 0 deletions benchmark/run_benchmark.py
@@ -0,0 +1,153 @@
"""This file provides tests to benchmark performance sqlite/file queue
on specific hardware. User can easily evaluate the performance by running this
file directly via `python run_benchmark.py`
"""
from persistqueue import SQLiteQueue
from persistqueue import Queue
import tempfile
import time

BENCHMARK_COUNT = 100


def time_it(func):
    def _exec(*args, **kwargs):
        start = time.time()
        func(*args, **kwargs)
        end = time.time()
        print(
            "\t{} => time used: {:.4f} seconds.".format(
                func.__doc__,
                (end - start)))

    return _exec


class TransactionBench(object):
    """Benchmark transaction write/read."""

    def __init__(self, prefix=None):
        self.path = prefix

    @time_it
    def benchmark_sqlite_write_10000(self):
        """Benchmark sqlite queue by writing <BENCHMARK_COUNT> items."""
        self.path = tempfile.mkdtemp('b_sql_10000')
        q = SQLiteQueue(self.path, auto_commit=True)
        for i in range(BENCHMARK_COUNT):
            q.put('bench%d' % i)

    @time_it
    def benchmark_sqlite_wr_10000(self):
        """Benchmark sqlite queue by writing and reading <BENCHMARK_COUNT> items."""
        self.path = tempfile.mkdtemp('b_sql_10000')
        q = SQLiteQueue(self.path, auto_commit=True)
        for i in range(BENCHMARK_COUNT):
            q.put('bench%d' % i)

        for i in range(BENCHMARK_COUNT):
            q.get()

    @time_it
    def benchmark_file_write_10000(self):
        """Benchmark file queue by writing <BENCHMARK_COUNT> items."""
        self.path = tempfile.mkdtemp('b_file_10000')
        q = Queue(self.path)
        for i in range(BENCHMARK_COUNT):
            q.put('bench%d' % i)
            q.task_done()

    @time_it
    def benchmark_file_wr_10000(self):
        """Benchmark file queue by writing and reading <BENCHMARK_COUNT> items."""
        self.path = tempfile.mkdtemp('b_file_10000')
        q = Queue(self.path)
        for i in range(BENCHMARK_COUNT):
            q.put('bench%d' % i)
            q.task_done()

        for i in range(BENCHMARK_COUNT):
            q.get()

    @classmethod
    def run(cls):
        print(cls.__doc__)
        ins = cls()
        for name in sorted(cls.__dict__):
            if name.startswith('benchmark'):
                func = getattr(ins, name)
                func()

class BulkBench(object):
    """Benchmark bulk write/read."""

    @time_it
    def benchmark_sqlite_write_10000(self):
        """Benchmark sqlite queue by writing <BENCHMARK_COUNT> items."""
        self.path = tempfile.mkdtemp('b_sql_10000')
        q = SQLiteQueue(self.path, auto_commit=False)
        for i in range(BENCHMARK_COUNT):
            q.put('bench%d' % i)
        q.task_done()

    @time_it
    def benchmark_sqlite_wr_10000(self):
        """Benchmark sqlite queue by writing and reading <BENCHMARK_COUNT> items."""
        self.path = tempfile.mkdtemp('b_sql_10000')
        q = SQLiteQueue(self.path, auto_commit=False)
        for i in range(BENCHMARK_COUNT):
            q.put('bench%d' % i)
        q.task_done()

        for i in range(BENCHMARK_COUNT):
            q.get()

    @time_it
    def benchmark_file_write_10000(self):
        """Benchmark file queue by writing <BENCHMARK_COUNT> items."""
        self.path = tempfile.mkdtemp('b_file_10000')
        q = Queue(self.path)
        for i in range(BENCHMARK_COUNT):
            q.put('bench%d' % i)
        q.task_done()

    @time_it
    def benchmark_file_wr_10000(self):
        """Benchmark file queue by writing and reading <BENCHMARK_COUNT> items."""
        self.path = tempfile.mkdtemp('b_file_10000')
        q = Queue(self.path)
        for i in range(BENCHMARK_COUNT):
            q.put('bench%d' % i)
        q.task_done()

        for i in range(BENCHMARK_COUNT):
            q.get()

    @classmethod
    def run(cls):
        print(cls.__doc__)
        ins = cls()
        for name in sorted(cls.__dict__):
            if name.startswith('benchmark'):
                func = getattr(ins, name)
                func()


if __name__ == '__main__':
    import sys

    if len(sys.argv) > 1:
        BENCHMARK_COUNT = int(sys.argv[1])
    print("<BENCHMARK_COUNT> = {}".format(BENCHMARK_COUNT))
    transaction = TransactionBench()
    transaction.run()
    bulk = BulkBench()
    bulk.run()
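As the ``__main__`` block above shows, the default of 100 items can be overridden with a command-line argument, e.g. to match the 10000-item figures quoted in the README:

.. code-block:: console

    python benchmark/run_benchmark.py 10000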
2 changes: 1 addition & 1 deletion persistqueue/__init__.py
@@ -1,7 +1,7 @@
# coding=utf-8
__author__ = 'Peter Wang'
__license__ = 'BSD License'
__version__ = '0.2.3'
__version__ = '0.3.0'

from .exceptions import Empty, Full # noqa
from .pdict import PDict # noqa
4 changes: 3 additions & 1 deletion persistqueue/pdict.py
@@ -18,8 +18,10 @@ class PDict(sqlbase.SQLiteBase, dict):
    _SQL_UPDATE = 'UPDATE {table_name} SET data = ? WHERE {key_column} = ?'

    def __init__(self, path, name, multithreading=False):
        # PDict is always auto_commit=True
        super(PDict, self).__init__(path, name=name,
                                    multithreading=multithreading)
                                    multithreading=multithreading,
                                    auto_commit=True)

    def __iter__(self):
        raise NotImplementedError('Not supported.')
2 changes: 1 addition & 1 deletion persistqueue/queue.py
@@ -1,6 +1,6 @@
# coding=utf-8

"""A single process, persistent multi-producer, multi-consumer queue."""
"""A thread-safe disk based persistent queue in Python."""

import logging
import os
48 changes: 38 additions & 10 deletions persistqueue/sqlbase.py
@@ -5,20 +5,39 @@

sqlite3.enable_callback_tracebacks(True)


log = logging.getLogger(__name__)


def with_transaction(func):
def with_conditional_transaction(func):
    def _execute(obj, *args, **kwargs):
        with obj.tran_lock:
            with obj._putter as tran:
            if obj.auto_commit:
                with obj._putter as tran:
                    stat, param = func(obj, *args, **kwargs)
                    tran.execute(stat, param)
            else:
                stat, param = func(obj, *args, **kwargs)
                tran.execute(stat, param)
                obj._putter.execute(stat, param)
                # commit_ignore_error(obj._putter)

    return _execute
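Stripped of the diff context, the conditional-commit pattern above can be sketched against a bare sqlite3 connection. Everything here (``TinyQueue``, its schema, the ``conn`` attribute) is an illustrative stand-in for the library's internals, not its real API:

```python
import sqlite3

def with_conditional_transaction(func):
    """Commit per call when auto_commit is on; otherwise defer to task_done()."""
    def _execute(obj, *args, **kwargs):
        stat, param = func(obj, *args, **kwargs)
        if obj.auto_commit:
            with obj.conn:                 # connection context manager commits on exit
                obj.conn.execute(stat, param)
        else:
            obj.conn.execute(stat, param)  # left pending until task_done()
    return _execute

class TinyQueue(object):
    """Illustrative stand-in for the sqlite3-backed queue base class."""

    def __init__(self, auto_commit=True):
        self.conn = sqlite3.connect(':memory:')
        self.conn.execute('CREATE TABLE q (data TEXT)')
        self.auto_commit = auto_commit

    @with_conditional_transaction
    def put(self, item):
        # like the library's _insert_into, return (statement, parameters)
        return 'INSERT INTO q VALUES (?)', (item,)

    def task_done(self):
        self.conn.commit()  # a no-op if everything is already committed

    def qsize(self):
        return self.conn.execute('SELECT COUNT(*) FROM q').fetchone()[0]

q = TinyQueue(auto_commit=False)
for i in range(3):
    q.put('item%d' % i)
q.task_done()     # one commit covers all three puts
print(q.qsize())  # -> 3
```

The decorated methods only build the SQL statement; whether the statement runs inside its own transaction is decided in one place, which is the point of the refactoring in this commit.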


def commit_ignore_error(conn):
    """Ignore the error when no transaction is active.

    The transaction may already be committed by the user's task_done call.
    It's safe to ignore all errors of this kind.
    """
    try:
        conn.commit()
    except sqlite3.OperationalError as ex:
        if 'no transaction is active' in str(ex):
            log.warning(
                'Not able to commit the transaction, '
                'may already be committed.')


class SQLiteBase(object):
    """SQLite3 base class."""

@@ -31,17 +50,22 @@ class SQLiteBase(object):
    _MEMORY = ':memory:'  # flag indicating store DB in memory

    def __init__(self, path, name='default', multithreading=False,
                 timeout=10.0):
                 timeout=10.0, auto_commit=False):
        """Initiate a queue in sqlite3 or memory.

        :param path: path for storing DB file
        :param path: path for storing DB file.
        :param multithreading: if set to True, two db connections will be used,
            one for **put** and one for **get**
            one for **put** and one for **get**.
        :param timeout: timeout in seconds waiting for the database lock.
        :param auto_commit: set to True if a commit is required on every
            INSERT/UPDATE action.
        """
        self.path = path
        self.name = name
        self.timeout = timeout
        self.multithreading = multithreading
        self.auto_commit = auto_commit

        self._init()

@@ -73,16 +97,16 @@ def _new_db_connection(self, path, multithreading, timeout):
            timeout=timeout,
            check_same_thread=not multithreading)

    @with_transaction
    @with_conditional_transaction
    def _insert_into(self, *record):
        return self._sql_insert, record

    @with_transaction
    @with_conditional_transaction
    def _update(self, key, *args):
        args = list(args) + [key]
        return self._sql_update, args

    @with_transaction
    @with_conditional_transaction
    def _delete(self, key):
        sql = 'DELETE FROM {} WHERE {} = ?'.format(self._table_name,
                                                   self._key_column)
@@ -97,6 +121,10 @@ def _count(self):
        row = self._putter.execute(sql).fetchone()
        return row[0] if row else 0

    def _task_done(self):
        """Only required if auto-commit is set as False."""
        commit_ignore_error(self._putter)

    @property
    def _table_name(self):
        return '{}_{}'.format(self._TABLE_NAME, self.name)
