Skip to content

Commit

Permalink
Merge pull request #170 from juha-ylikoski/master
Browse files Browse the repository at this point in the history
Fix FILOSQLiteAckQueue.get(raw=True) throwing index error.
  • Loading branch information
peter-wangxu authored Aug 5, 2021
2 parents cc974b1 + f36dc81 commit 2a5f422
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 34 deletions.
2 changes: 1 addition & 1 deletion persistqueue/sqlackqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ class FILOSQLiteAckQueue(SQLiteAckQueue):
_TABLE_NAME = 'ack_filo_queue'
# SQL to select a record
_SQL_SELECT = (
'SELECT {key_column}, data FROM {table_name} '
'SELECT {key_column}, data, timestamp, status FROM {table_name} '
'WHERE {key_column} < {rowid} and status < %s '
'ORDER BY {key_column} DESC LIMIT 1' % AckStatus.unack
)
Expand Down
116 changes: 83 additions & 33 deletions persistqueue/tests/test_sqlackqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tempfile
import unittest
from threading import Thread
import uuid

from persistqueue.sqlackqueue import (
SQLiteAckQueue,
Expand All @@ -19,12 +20,13 @@ class SQLite3AckQueueTest(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp(suffix='sqlackqueue')
self.auto_commit = True
self.queue_class = SQLiteAckQueue

def tearDown(self):
shutil.rmtree(self.path, ignore_errors=True)

def test_raise_empty(self):
q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
q = self.queue_class(self.path, auto_commit=self.auto_commit)

q.put('first')
d = q.get()
Expand All @@ -37,7 +39,7 @@ def test_raise_empty(self):
self.assertRaises(ValueError, q.get, block=True, timeout=-1.0)

def test_empty(self):
q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
q = self.queue_class(self.path, auto_commit=self.auto_commit)
self.assertEqual(q.empty(), True)

q.put('first')
Expand All @@ -49,23 +51,23 @@ def test_empty(self):
def test_open_close_single(self):
"""Write 1 item, close, reopen checking if same item is there"""

q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
q = self.queue_class(self.path, auto_commit=self.auto_commit)
q.put(b'var1')
del q
q = SQLiteAckQueue(self.path)
q = self.queue_class(self.path)
self.assertEqual(1, q.qsize())
self.assertEqual(b'var1', q.get())

def test_open_close_1000(self):
"""Write 1000 items, close, reopen checking if all items are there"""

q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
q = self.queue_class(self.path, auto_commit=self.auto_commit)
for i in range(1000):
q.put('var%d' % i)

self.assertEqual(1000, q.qsize())
del q
q = SQLiteAckQueue(self.path)
q = self.queue_class(self.path)
self.assertEqual(1000, q.qsize())
for i in range(1000):
data = q.get()
Expand All @@ -78,7 +80,7 @@ def test_open_close_1000(self):
def test_random_read_write(self):
"""Test random read/write"""

q = SQLiteAckQueue(self.path, auto_commit=self.auto_commit)
q = self.queue_class(self.path, auto_commit=self.auto_commit)
n = 0
for _ in range(1000):
if random.random() < 0.5:
Expand All @@ -88,15 +90,17 @@ def test_random_read_write(self):
else:
self.assertRaises(Empty, q.get, block=False)
else:
q.put('var%d' % random.getrandbits(16))
# UniqueQueue will block at get() if this is not unique
# uuid.uuid4() should be unique
q.put('var%d' % uuid.uuid4())
n += 1

def test_multi_threaded_parallel(self):
"""Create consumer and producer threads, check parallelism"""

# self.skipTest("Not supported multi-thread.")

m_queue = SQLiteAckQueue(
m_queue = self.queue_class(
path=self.path, multithreading=True, auto_commit=self.auto_commit
)

Expand All @@ -121,7 +125,7 @@ def consumer():

def test_multi_threaded_multi_producer(self):
"""Test sqlqueue can be used by multiple producers."""
queue = SQLiteAckQueue(
queue = self.queue_class(
path=self.path, multithreading=True, auto_commit=self.auto_commit
)

Expand Down Expand Up @@ -150,7 +154,7 @@ def consumer():
def test_multiple_consumers(self):
"""Test sqlqueue can be used by multiple consumers."""

queue = SQLiteAckQueue(
queue = self.queue_class(
path=self.path, multithreading=True, auto_commit=self.auto_commit
)

Expand Down Expand Up @@ -189,19 +193,19 @@ def consumer(index):

def test_protocol_1(self):
shutil.rmtree(self.path, ignore_errors=True)
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
self.assertEqual(
q._serializer.protocol, 2 if sys.version_info[0] == 2 else 4
)

def test_protocol_2(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
self.assertEqual(
q._serializer.protocol, 2 if sys.version_info[0] == 2 else 4
)

def test_ack_and_clear(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
ret_list = []
for _ in range(100):
q.put("val%s" % _)
Expand All @@ -215,7 +219,7 @@ def test_ack_and_clear(self):
q.shrink_disk_usage()

def test_ack_unknown_item(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put("val1")
val1 = q.get()
q.ack("val2")
Expand All @@ -227,7 +231,7 @@ def test_ack_unknown_item(self):
self.assertEqual(q.unack_count(), 0)

def test_resume_unack(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put("val1")
val1 = q.get()
self.assertEqual(q.empty(), True)
Expand All @@ -236,7 +240,7 @@ def test_resume_unack(self):
self.assertEqual(q.ready_count(), 0)
del q

q = SQLiteAckQueue(path=self.path, auto_resume=False)
q = self.queue_class(path=self.path, auto_resume=False)
self.assertEqual(q.empty(), True)
self.assertEqual(q.qsize(), 0)
self.assertEqual(q.unack_count(), 1)
Expand All @@ -249,15 +253,15 @@ def test_resume_unack(self):
self.assertEqual(val1, q.get())
del q

q = SQLiteAckQueue(path=self.path, auto_resume=True)
q = self.queue_class(path=self.path, auto_resume=True)
self.assertEqual(q.empty(), False)
self.assertEqual(q.qsize(), 1)
self.assertEqual(q.unack_count(), 0)
self.assertEqual(q.ready_count(), 1)
self.assertEqual(val1, q.get())

def test_ack_unack_ack_failed(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put("val1")
q.put("val2")
q.put("val3")
Expand Down Expand Up @@ -291,13 +295,13 @@ def test_ack_unack_ack_failed(self):
self.assertEqual(q.ready_count(), 0)

def test_put_0(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put(0)
d = q.get(block=False)
self.assertIsNotNone(d)

def test_get_id(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put("val1")
val2_id = q.put("val2")
q.put("val3")
Expand All @@ -308,7 +312,7 @@ def test_get_id(self):
self.assertEqual(item, 'val2')

def test_get_next_in_order(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
val1_id = q.put("val1")
q.put("val2")
q.put("val3")
Expand All @@ -319,15 +323,15 @@ def test_get_next_in_order(self):
self.assertEqual(item, 'val2')

def test_get_raw(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put("val1")
item = q.get(raw=True)
# item should get val2
self.assertEqual(True, "pqid" in item)
self.assertEqual(item.get("data"), 'val1')

def test_nack_raw(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put("val1")
item = q.get(raw=True)
# nack a raw return
Expand All @@ -336,7 +340,7 @@ def test_nack_raw(self):
self.assertEqual(q.qsize(), 1)

def test_ack_active_size(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put("val1")
item = q.get(raw=True)
# active_size should be 1 as it hasn't been acked
Expand All @@ -346,7 +350,7 @@ def test_ack_active_size(self):
self.assertEqual(q.active_size(), 0)

def test_queue(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
q.put("val1")
q.put("val2")
q.put("val3")
Expand All @@ -356,7 +360,7 @@ def test_queue(self):
self.assertEqual(d[1].get("data"), "val2")

def test_update(self):
q = SQLiteAckQueue(path=self.path)
q = self.queue_class(path=self.path)
qid = q.put("val1")
q.update(id=qid, item="val2")
item = q.get(id=qid)
Expand All @@ -367,6 +371,7 @@ class SQLite3QueueInMemory(SQLite3AckQueueTest):
def setUp(self):
self.path = ":memory:"
self.auto_commit = True
self.queue_class = SQLiteAckQueue

def test_open_close_1000(self):
self.skipTest('Memory based sqlite is not persistent.')
Expand Down Expand Up @@ -399,23 +404,24 @@ def test_resume_unack(self):
self.skipTest('Memory based sqlite is not persistent.')


class FILOSQLite3AckQueueTest(unittest.TestCase):
class FILOSQLite3AckQueueTest(SQLite3AckQueueTest):
def setUp(self):
self.path = tempfile.mkdtemp(suffix='filo_sqlackqueue')
self.auto_commit = True
self.queue_class = FILOSQLiteAckQueue

def tearDown(self):
shutil.rmtree(self.path, ignore_errors=True)

def test_open_close_1000(self):
"""Write 1000 items, close, reopen checking if all items are there"""

q = FILOSQLiteAckQueue(self.path, auto_commit=self.auto_commit)
q = self.queue_class(self.path, auto_commit=self.auto_commit)
for i in range(1000):
q.put('var%d' % i)
self.assertEqual(1000, q.qsize())
del q
q = FILOSQLiteAckQueue(self.path)
q = self.queue_class(self.path)
self.assertEqual(1000, q.qsize())
for i in range(1000):
data = q.get()
Expand All @@ -425,14 +431,58 @@ def test_open_close_1000(self):
data = q.get()
self.assertEqual('foobar', data)

def test_multi_threaded_parallel(self):
"""Create consumer and producer threads, check parallelism"""

# self.skipTest("Not supported multi-thread.")

m_queue = self.queue_class(
path=self.path, multithreading=True, auto_commit=self.auto_commit
)

def producer():
for i in range(1000):
m_queue.put('var%d' % i)

def consumer():
# We cannot quarantee what next number will be like in FIFO
for _ in range(1000):
x = m_queue.get(block=True)
self.assertTrue('var' in x)

c = Thread(target=consumer)
c.start()
p = Thread(target=producer)
p.start()
p.join()
c.join()
self.assertEqual(0, m_queue.size)
self.assertEqual(0, len(m_queue))
self.assertRaises(Empty, m_queue.get, block=False)

def test_get_next_in_order(self):
q = self.queue_class(path=self.path)
val1_id = q.put("val1")
q.put("val2")
q.put("val3")
item = q.get(id=val1_id, next_in_order=True)
# item id should be 1
self.assertEqual(val1_id, 1)
# item should get val2
self.assertEqual(item, 'val3')


class SQLite3UniqueAckQueueTest(unittest.TestCase):
# Note
# We have to be carefull to avoid test cases from SQLite3AckQueueTest having
# duplicate values in their q.put()'s. This could block the test indefinitely
class SQLite3UniqueAckQueueTest(SQLite3AckQueueTest):
def setUp(self):
self.path = tempfile.mkdtemp(suffix='sqlackqueue')
self.auto_commit = True
self.queue_class = UniqueAckQ

def test_add_duplicate_item(self):
q = UniqueAckQ(self.path)
q = self.queue_class(self.path)
q.put(1111)
self.assertEqual(1, q.size)
# put duplicate item
Expand All @@ -443,5 +493,5 @@ def test_add_duplicate_item(self):
self.assertEqual(2, q.size)

del q
q = UniqueAckQ(self.path)
q = self.queue_class(self.path)
self.assertEqual(2, q.size)

0 comments on commit 2a5f422

Please sign in to comment.