Skip to content

Commit

Permalink
Prepare for the release 0.4.0
Browse files Browse the repository at this point in the history
Major changes:

- Add doc for SQLiteAckQueue
- README.rst structure update
- other minor changes
  • Loading branch information
peter-wangxu committed Jun 17, 2018
1 parent f6c2053 commit d228cd4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 18 deletions.
36 changes: 36 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,29 @@ This queue does not allow duplicate items.
2
>>>
Example usage of SQLite3 based ``SQLiteAckQueue``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The core functions:
``get``: get from queue and mark item as unack
``ack``: mark item as acked
``nack``: there might be something wrong with current consumer, so mark item as ready and new consumer will get it
``ack_failed``: there might be something wrong during process, so just mark item as failed.

.. code-block:: python
>>> import persisitqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Do something with the item
>>> ackq.ack(item) # If done with the item
>>> ackq.nack(item) # Else mark item as `nack` so that it can be proceeded again by any worker
>>> ackq.ack_failed() # Or else mark item as `ack_failed` to discard this item
Note: this queue does not support ``auto_commit=True``

Example usage with a file based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down Expand Up @@ -317,6 +340,8 @@ That's saying, the data in ``persistqueue.Queue`` could be in unreadable state w

**DO NOT put any critical data on persistqueue.queue on Windows**.

This issue is under track by issue `Atomic renames on windows <https://github.com/peter-wangxu/persist-queue/issues/48>`_

Contribution
------------

Expand All @@ -329,6 +354,11 @@ License

`BSD <LICENSE>`_

Contributors
------------

`Contributors <https://github.com/peter-wangxu/persist-queue/graphs/contributors>`_

FAQ
---

Expand All @@ -339,3 +369,9 @@ SQLite database is locked until that transaction is committed. The ``timeout``
parameter specifies how long the connection should wait for the lock to go away
until raising an exception. Default time is **10**, increase ``timeout``
when creating the queue if above error occurs.

* sqlite3 based queues are not thread-safe.

The sqlite3 queues are heavily tested under multi-threading environment, if you find it's not thread-safe, please
make sure you set the ``multithreading=True`` when initializing the queue before submitting new issue:).

7 changes: 4 additions & 3 deletions persistqueue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# coding=utf-8
__author__ = 'Peter Wang'
__license__ = 'BSD'
__version__ = '0.3.6'
__version__ = '0.4.0'

from .exceptions import Empty, Full # noqa
from .pdict import PDict # noqa
from .queue import Queue # noqa
from .sqlqueue import SQLiteQueue, FIFOSQLiteQueue, FILOSQLiteQueue, UniqueQ # noqa
from .sqlackqueue import SQLiteAckQueue

__all__ = ["Queue", "SQLiteQueue", "FIFOSQLiteQueue", "FILOSQLiteQueue",
"UniqueQ", "PDict", "Empty", "Full", "__author__", "__license__",
"__version__"]
"UniqueQ", "PDict", "SQLiteAckQueue", "Empty", "Full",
"__author__", "__license__", "__version__"]
30 changes: 15 additions & 15 deletions persistqueue/sqlackqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
TICK_FOR_WAIT = 10


class ACK_STATUS:
class AckStatus(object):
inited = '0'
ready = '1'
unack = '2'
Expand All @@ -40,17 +40,17 @@ class SQLiteAckQueue(sqlbase.SQLiteBase):
'data BLOB, timestamp FLOAT, status INTEGER)')
# SQL to insert a record
_SQL_INSERT = 'INSERT INTO {table_name} (data, timestamp, status)'\
' VALUES (?, ?, %s)' % ACK_STATUS.inited
' VALUES (?, ?, %s)' % AckStatus.inited
# SQL to select a record
_SQL_SELECT = ('SELECT {key_column}, data, status FROM {table_name} '
'WHERE status < %s '
'ORDER BY {key_column} ASC LIMIT 1' % ACK_STATUS.unack)
'ORDER BY {key_column} ASC LIMIT 1' % AckStatus.unack)
_SQL_MARK_ACK_UPDATE = 'UPDATE {table_name} SET status = ?'\
' WHERE {key_column} = ?'
_SQL_SELECT_WHERE = 'SELECT {key_column}, data FROM {table_name}'\
' WHERE status < %s AND' \
' {column} {op} ? ORDER BY {key_column} ASC'\
' LIMIT 1 ' % ACK_STATUS.unack
' LIMIT 1 ' % AckStatus.unack

def __init__(self, *args, **kwargs):
super(SQLiteAckQueue, self).__init__(*args, **kwargs)
Expand All @@ -75,7 +75,7 @@ def _count(self):
sql = 'SELECT COUNT({}) FROM {}'\
' WHERE status < ?'.format(self._key_column,
self._table_name)
row = self._getter.execute(sql, (ACK_STATUS.unack, )).fetchone()
row = self._getter.execute(sql, (AckStatus.unack,)).fetchone()
return row[0] if row else 0

def _ack_count_via_status(self, status):
Expand All @@ -86,16 +86,16 @@ def _ack_count_via_status(self, status):
return row[0] if row else 0

def unack_count(self):
return self._ack_count_via_status(ACK_STATUS.unack)
return self._ack_count_via_status(AckStatus.unack)

def acked_count(self):
return self._ack_count_via_status(ACK_STATUS.acked)
return self._ack_count_via_status(AckStatus.acked)

def ready_count(self):
return self._ack_count_via_status(ACK_STATUS.ready)
return self._ack_count_via_status(AckStatus.ready)

def ack_failed_count(self):
return self._ack_count_via_status(ACK_STATUS.ack_failed)
return self._ack_count_via_status(AckStatus.ack_failed)

@sqlbase.with_conditional_transaction
def _mark_ack_status(self, key, status):
Expand All @@ -111,7 +111,7 @@ def clear_acked_data(self):
)""".format(table_name=self._table_name,
key_column=self._key_column,
max_acked_length=self._MAX_ACKED_LENGTH)
return sql, ACK_STATUS.acked
return sql, AckStatus.acked

@property
def _sql_mark_ack_status(self):
Expand All @@ -124,7 +124,7 @@ def _pop(self):
# Perhaps a sqlite3 bug, sometimes (None, None) is returned
# by select, below can avoid these invalid records.
if row and row[0] is not None:
self._mark_ack_status(row[0], ACK_STATUS.unack)
self._mark_ack_status(row[0], AckStatus.unack)
pickled_data = row[1] # pickled data
item = pickle.loads(pickled_data)
self._unack_cache[row[0]] = item
Expand All @@ -144,23 +144,23 @@ def ack(self, item):
_id = self._find_item_id(item)
if _id is None:
return
self._mark_ack_status(_id, ACK_STATUS.acked)
self._mark_ack_status(_id, AckStatus.acked)
self._unack_cache.pop(_id)

def ack_failed(self, item):
with self.action_lock:
_id = self._find_item_id(item)
if _id is None:
return
self._mark_ack_status(_id, ACK_STATUS.ack_failed)
self._mark_ack_status(_id, AckStatus.ack_failed)
self._unack_cache.pop(_id)

def nack(self, item):
with self.action_lock:
_id = self._find_item_id(item)
if _id is None:
return
self._mark_ack_status(_id, ACK_STATUS.ready)
self._mark_ack_status(_id, AckStatus.ready)
self._unack_cache.pop(_id)
self.total += 1

Expand Down Expand Up @@ -219,7 +219,7 @@ class FILOSQLiteAckQueue(SQLiteAckQueue):
# SQL to select a record
_SQL_SELECT = ('SELECT {key_column}, data FROM {table_name} '
'WHERE status < %s '
'ORDER BY {key_column} DESC LIMIT 1' % ACK_STATUS.unack)
'ORDER BY {key_column} DESC LIMIT 1' % AckStatus.unack)


class UniqueAckQ(SQLiteAckQueue):
Expand Down

0 comments on commit d228cd4

Please sign in to comment.