diff --git a/RocksDbSharp/Native.Marshaled.cs b/RocksDbSharp/Native.Marshaled.cs index 503a9d1..3f158bb 100644 --- a/RocksDbSharp/Native.Marshaled.cs +++ b/RocksDbSharp/Native.Marshaled.cs @@ -84,6 +84,83 @@ public void rocksdb_put( } } + public void rocksdb_transaction_put( + IntPtr txn, + string key, + string val, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + unsafe + { + if (encoding == null) + encoding = Encoding.UTF8; + fixed (char* k = key, v = val) + { + var klength = key.Length; + var vlength = val.Length; + var bklength = encoding.GetByteCount(k, klength); + var bvlength = encoding.GetByteCount(v, vlength); + var buffer = Marshal.AllocHGlobal(bklength + bvlength); + var bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + var bv = bk + bklength; + encoding.GetBytes(v, vlength, bv, bvlength); + + if (cf == null) + rocksdb_transaction_put(txn, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + else + rocksdb_transaction_put_cf(txn, cf.Handle, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + +#if DEBUG + Zero(bk, bklength); + Zero(bv, bvlength); +#endif + Marshal.FreeHGlobal(buffer); + } + } + } + + public void rocksdb_transactiondb_put( + IntPtr txn_db, + IntPtr write_options, + string key, + string val, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + unsafe + { + if (encoding == null) + encoding = Encoding.UTF8; + fixed (char* k = key, v = val) + { + var klength = key.Length; + var vlength = val.Length; + var bklength = encoding.GetByteCount(k, klength); + var bvlength = encoding.GetByteCount(v, vlength); + var buffer = Marshal.AllocHGlobal(bklength + bvlength); + var bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + var bv = bk + bklength; + encoding.GetBytes(v, vlength, bv, bvlength); + + if (cf == null) + rocksdb_transactiondb_put(txn_db, write_options, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + else + rocksdb_transactiondb_put_cf(txn_db, write_options, cf.Handle, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + +#if DEBUG + Zero(bk, bklength); + Zero(bv, bvlength); +#endif + Marshal.FreeHGlobal(buffer); + } + } + } + public string rocksdb_get( /*rocksdb_t**/ IntPtr db, /*const rocksdb_readoptions_t**/ IntPtr read_options, @@ -123,6 +200,84 @@ public string rocksdb_get( } } + public string rocksdb_transaction_get( + IntPtr txn, + IntPtr read_options, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + if (encoding == null) + encoding = Encoding.UTF8; + unsafe + { + fixed (char* k = key) + { + var klength = key.Length; + var bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + var bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + + var resultPtr = cf == null + ? rocksdb_transaction_get(txn, read_options, bk, new UIntPtr((ulong)bklength), out UIntPtr bvlength, out errptr) + : rocksdb_transaction_get_cf(txn, read_options, cf.Handle, bk, new UIntPtr((ulong)bklength), out bvlength, out errptr); + +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength.ToUInt64(), encoding); + } + } + } + + public string rocksdb_transactiondb_get( + IntPtr txn_db, + IntPtr read_options, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + if (encoding == null) + encoding = Encoding.UTF8; + unsafe + { + fixed (char* k = key) + { + var klength = key.Length; + var bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + var bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + + var resultPtr = cf == null + ? rocksdb_transactiondb_get(txn_db, read_options, bk, new UIntPtr((ulong)bklength), out UIntPtr bvlength, out errptr) + : rocksdb_transactiondb_get_cf(txn_db, read_options, cf.Handle, bk, new UIntPtr((ulong)bklength), out bvlength, out errptr); + +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength.ToUInt64(), encoding); + } + } + } + private unsafe string MarshalAndFreeRocksDbString(IntPtr resultPtr, long resultLength, Encoding encoding) { var result = CurrentFramework.CreateString((sbyte*)resultPtr.ToPointer(), 0, (int)resultLength, encoding); @@ -156,6 +311,52 @@ public byte[] rocksdb_get( return result; } + public byte[] rocksdb_transaction_get( + IntPtr txn, + IntPtr read_options, + byte[] key, + long keyLength, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + var resultPtr = cf == null + ? rocksdb_transaction_get(txn, read_options, key, new UIntPtr((ulong)keyLength), out UIntPtr valueLength, out errptr) + : rocksdb_transaction_get_cf(txn, read_options, cf.Handle, key, new UIntPtr((ulong)keyLength), out valueLength, out errptr); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + var result = new byte[valueLength.ToUInt64()]; + Marshal.Copy(resultPtr, result, 0, (int)valueLength.ToUInt64()); + rocksdb_free(resultPtr); + return result; + } + + public byte[] rocksdb_transactiondb_get( + IntPtr txn_db, + IntPtr read_options, + byte[] key, + long keyLength, + out IntPtr errptr, + ColumnFamilyHandle cf = null) + { + var resultPtr = cf == null + ? rocksdb_transactiondb_get(txn_db, read_options, key, new UIntPtr((ulong)keyLength), out UIntPtr valueLength, out errptr) + : rocksdb_transactiondb_get_cf(txn_db, read_options, cf.Handle, key, new UIntPtr((ulong)keyLength), out valueLength, out errptr); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + var result = new byte[valueLength.ToUInt64()]; + Marshal.Copy(resultPtr, result, 0, (int)valueLength.ToUInt64()); + rocksdb_free(resultPtr); + return result; + } + /// /// Executes a multi_get with automatic marshalling /// @@ -342,6 +543,37 @@ public void rocksdb_delete( rocksdb_delete_cf(db, writeOptions, cf.Handle, bkey, kLength, out errptr); } + public void rocksdb_transaction_delete( + IntPtr txn, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); + if (cf == null) + rocksdb_transaction_delete(txn, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + else + rocksdb_transaction_delete_cf(txn, cf.Handle, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + } + + + + public void rocksdb_transactiondb_delete( + IntPtr txn_db, + IntPtr write_options, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); + if (cf == null) + rocksdb_transactiondb_delete(txn_db, write_options, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + else + rocksdb_transactiondb_delete_cf(txn_db, cf.Handle, write_options, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + } + public string rocksdb_options_statistics_get_string_marshaled(IntPtr opts) { return MarshalNullTermAsciiStr(rocksdb_options_statistics_get_string(opts)); diff --git a/RocksDbSharp/Native.Wrap.cs b/RocksDbSharp/Native.Wrap.cs index 4643f87..e8d5816 100644 --- a/RocksDbSharp/Native.Wrap.cs +++ b/RocksDbSharp/Native.Wrap.cs @@ -45,6 +45,69 @@ public void rocksdb_put( throw new RocksDbException(errptr); } + public void rocksdb_transaction_put( + IntPtr txn, + string key, + string val, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + rocksdb_transaction_put(txn, key, val, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transaction_put( + IntPtr txn, + byte[] key, + long keyLength, + byte[] value, + long valueLength, + ColumnFamilyHandle cf) + { + IntPtr errptr; + UIntPtr sklength = (UIntPtr)keyLength; + UIntPtr svlength = (UIntPtr)valueLength; + if (cf == null) + rocksdb_transaction_put(txn, key, sklength, value, svlength, out errptr); + else + rocksdb_transaction_put_cf(txn, cf.Handle, key, sklength, value, svlength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_put( + IntPtr txn_db, + IntPtr writeOptions, + string key, + string val, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + rocksdb_transactiondb_put(txn_db, writeOptions, key, val, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_put( + IntPtr txn_db, + IntPtr writeOptions, + byte[] key, + long keyLength, + byte[] value, + long valueLength, + ColumnFamilyHandle cf) + { + IntPtr errptr; + UIntPtr sklength = (UIntPtr)keyLength; + UIntPtr svlength = (UIntPtr)valueLength; + if (cf == null) + rocksdb_transactiondb_put(txn_db, writeOptions, key, sklength, value, svlength, out errptr); + else + rocksdb_transactiondb_put_cf(txn_db, writeOptions, cf.Handle, key, sklength, value, svlength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } public string rocksdb_get( /*rocksdb_t**/ IntPtr db, @@ -89,6 +152,94 @@ public byte[] rocksdb_get( throw new RocksDbException(errptr); return result; } + + public string rocksdb_transaction_get( + /*rocksdb_t**/ IntPtr txn, + /*const rocksdb_readoptions_t**/ IntPtr read_options, + string key, + ColumnFamilyHandle cf, + Encoding encoding = null) + { + var result = rocksdb_transaction_get(txn, read_options, key, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public IntPtr rocksdb_transaction_get( + IntPtr txn, + IntPtr read_options, + byte[] key, + long keyLength, + out long vallen, + ColumnFamilyHandle cf) + { + UIntPtr sklength = (UIntPtr)keyLength; + var result = cf == null + ? rocksdb_transaction_get(txn, read_options, key, sklength, out UIntPtr valLength, out IntPtr errptr) + : rocksdb_transaction_get_cf(txn, read_options, cf.Handle, key, sklength, out valLength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + vallen = (long)valLength; + return result; + } + + public byte[] rocksdb_transaction_get( + IntPtr txn, + IntPtr read_options, + byte[] key, + long keyLength = 0, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transaction_get(txn, read_options, key, keyLength == 0 ? key.Length : keyLength, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public string rocksdb_transactiondb_get( + /*rocksdb_t**/ IntPtr txn_db, + /*const rocksdb_readoptions_t**/ IntPtr read_options, + string key, + ColumnFamilyHandle cf, + Encoding encoding = null) + { + var result = rocksdb_transactiondb_get(txn_db, read_options, key, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public IntPtr rocksdb_transactiondb_get( + IntPtr txn_db, + IntPtr read_options, + byte[] key, + long keyLength, + out long vallen, + ColumnFamilyHandle cf) + { + UIntPtr sklength = (UIntPtr)keyLength; + var result = cf == null + ? rocksdb_transactiondb_get(txn_db, read_options, key, sklength, out UIntPtr valLength, out IntPtr errptr) + : rocksdb_transactiondb_get_cf(txn_db, read_options, cf.Handle, key, sklength, out valLength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + vallen = (long)valLength; + return result; + } + + public byte[] rocksdb_transactiondb_get( + IntPtr txn_db, + IntPtr read_options, + byte[] key, + long keyLength = 0, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transactiondb_get(txn_db, read_options, key, keyLength == 0 ? key.Length : keyLength, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } public System.Collections.Generic.KeyValuePair[] rocksdb_multi_get( IntPtr db, @@ -134,6 +285,27 @@ public void rocksdb_delete( throw new RocksDbException(errptr); } + public void rocksdb_transaction_delete( + IntPtr txn, + string key, + ColumnFamilyHandle cf) + { + rocksdb_transaction_delete(txn, key, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_delete( + IntPtr txn_db, + IntPtr writeOptions, + string key, + ColumnFamilyHandle cf) + { + rocksdb_transactiondb_delete(txn_db, writeOptions, key, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + [Obsolete("Use UIntPtr version instead")] public void rocksdb_delete( /*rocksdb_t**/ IntPtr db, @@ -147,6 +319,29 @@ public void rocksdb_delete( throw new RocksDbException(errptr); } + public void rocksdb_transaction_delete( + IntPtr txn, + byte[] key, + long keylen) + { + UIntPtr sklength = (UIntPtr)keylen; + rocksdb_transaction_delete(txn, key, sklength, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_delete( + IntPtr txn_db, + IntPtr writeOptions, + byte[] key, + long keylen) + { + UIntPtr sklength = (UIntPtr)keylen; + rocksdb_transactiondb_delete(txn_db, writeOptions, key, sklength, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + [Obsolete("Use UIntPtr version instead")] public void rocksdb_delete_cf( /*rocksdb_t**/ IntPtr db, @@ -160,6 +355,29 @@ public void rocksdb_delete_cf( throw new RocksDbException(errptr); } + public void rocksdb_transaction_delete_cf( + IntPtr txn, + byte[] key, + long keylen, + ColumnFamilyHandle cf) + { + rocksdb_transaction_delete_cf(txn, cf.Handle, key, (UIntPtr)keylen, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_delete_cf( + IntPtr txn_db, + IntPtr writeOptions, + byte[] key, + long keylen, + ColumnFamilyHandle cf) + { + rocksdb_transactiondb_delete_cf(txn_db, writeOptions, cf.Handle, key, (UIntPtr)keylen, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + [Obsolete("Use UIntPtr version instead")] public void rocksdb_ingest_external_file(IntPtr db, string[] file_list, ulong list_len, IntPtr opt) { diff --git a/RocksDbSharp/RocksDb.cs b/RocksDbSharp/RocksDb.cs index 770898b..ae711ad 100644 --- a/RocksDbSharp/RocksDb.cs +++ b/RocksDbSharp/RocksDb.cs @@ -239,7 +239,7 @@ public Iterator[] NewIterators(ColumnFamilyHandle[] cfs, ReadOptions[] readOptio public Snapshot CreateSnapshot() { IntPtr snapshotHandle = Native.Instance.rocksdb_create_snapshot(Handle); - return new Snapshot(Handle, snapshotHandle); + return new Snapshot(Handle, snapshotHandle, () => Native.Instance.rocksdb_release_snapshot(Handle, snapshotHandle)); } public static IEnumerable ListColumnFamilies(DbOptions options, string name) diff --git a/RocksDbSharp/Snapshot.cs b/RocksDbSharp/Snapshot.cs index 85af5ff..18f036e 100644 --- a/RocksDbSharp/Snapshot.cs +++ b/RocksDbSharp/Snapshot.cs @@ -13,12 +13,15 @@ namespace RocksDbSharp public class Snapshot : IDisposable { private IntPtr dbHandle; - public IntPtr Handle { get; private set; } + private Action releaseAction; - internal Snapshot(IntPtr dbHandle, IntPtr snapshotHandle) + public IntPtr Handle { get; private set; } + + internal Snapshot(IntPtr dbHandle, IntPtr snapshotHandle, Action doRelease) { this.dbHandle = dbHandle; Handle = snapshotHandle; + releaseAction = doRelease; } public void Dispose() @@ -26,7 +29,7 @@ public void Dispose() if (Handle != IntPtr.Zero) { #if !NODESTROY - Native.Instance.rocksdb_release_snapshot(dbHandle, Handle); + releaseAction?.Invoke(); #endif Handle = IntPtr.Zero; } diff --git a/RocksDbSharp/Transaction.cs b/RocksDbSharp/Transaction.cs new file mode 100644 index 0000000..f4bc502 --- /dev/null +++ b/RocksDbSharp/Transaction.cs @@ -0,0 +1,148 @@ +using System; +using System.Runtime.InteropServices; +using System.Text; +using Transitional; + +namespace RocksDbSharp +{ + public class Transaction : IDisposable + { + internal ReadOptions DefaultReadOptions { get; set; } + internal WriteOptions WriteOptions { get; set; } + internal TransactionOptions Options { get; set; } + internal static Encoding DefaultEncoding => Encoding.UTF8; + + public IntPtr Handle { get; private set; } + + internal Transaction(IntPtr h, WriteOptions wo, TransactionOptions to) + { + Handle = h; + DefaultReadOptions = new ReadOptions(); + WriteOptions = wo; + Options = to; + } + + public void Dispose() + { + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_transaction_destroy(Handle); + Handle = IntPtr.Zero; + } + + public void Commit() + { + IntPtr err; + Native.Instance.rocksdb_transaction_commit(Handle, out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public void Rollback() + { + IntPtr err; + Native.Instance.rocksdb_transaction_rollback(Handle, out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public void Put(string key, string value, ColumnFamilyHandle cf = null, Encoding encoding = null) + { + Native.Instance.rocksdb_transaction_put(Handle, key, value, cf, encoding ?? DefaultEncoding); + } + + public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null) + { + Put(key, key.GetLongLength(0), value, value.GetLongLength(0), cf); + } + + public void Put(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null) + { + Native.Instance.rocksdb_transaction_put(Handle, key, keyLength, value, valueLength, cf); + } + + public string Get(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) + { + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); + } + + public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), cf, readOptions); + } + + public byte[] Get(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), buffer, offset, length, cf, readOptions); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + unsafe + { + var ptr = Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, out long valLength, cf); + if (ptr == IntPtr.Zero) + return -1; + var copyLength = Math.Min(length, valLength); + Marshal.Copy(ptr, buffer, (int)offset, (int)copyLength); + Native.Instance.rocksdb_free(ptr); + return valLength; + } + } + + public void Remove(string key, ColumnFamilyHandle cf = null) + { + Native.Instance.rocksdb_transaction_delete(Handle, key, cf); + } + + public void Remove(byte[] key, ColumnFamilyHandle cf = null) + { + Remove(key, key.Length, cf); + } + + public void Remove(byte[] key, long keyLength, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_transaction_delete(Handle, key, keyLength); + else + Native.Instance.rocksdb_transaction_delete_cf(Handle, key, keyLength, cf); + } + + public Iterator NewIterator(ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + IntPtr iteratorHandle = cf == null + ? Native.Instance.rocksdb_transaction_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle) + : Native.Instance.rocksdb_transaction_create_iterator_cf(Handle, (readOptions ?? DefaultReadOptions).Handle, cf.Handle); + // Note: passing in read options here only to ensure that it is not collected before the iterator + return new Iterator(iteratorHandle, readOptions); + } + } +} diff --git a/RocksDbSharp/TransactionDb.cs b/RocksDbSharp/TransactionDb.cs new file mode 100644 index 0000000..438d4a5 --- /dev/null +++ b/RocksDbSharp/TransactionDb.cs @@ -0,0 +1,201 @@ +using System; +using System.Collections.Generic; +using System.Dynamic; +using System.Runtime.InteropServices; +using System.Text; +using Transitional; + +namespace RocksDbSharp +{ + public class TransactionDb : IDisposable + { + public IntPtr Handle { get; private set; } + internal DbOptions DbOptions { get; private set; } + internal TransactionDbOptions TDbOptions { get; private set; } + public string Name { get; private set; } + internal ReadOptions DefaultReadOptions { get; set; } = new ReadOptions(); + internal WriteOptions DefaultWriteOptions { get; set; } = new WriteOptions(); + internal static Encoding DefaultEncoding => Encoding.UTF8; + private Dictionary columnFamilies; + + // Managed references to unmanaged resources that need to live at least as long as the db + internal dynamic References { get; } = new ExpandoObject(); + + private TransactionDb(IntPtr h, DbOptions db_options, TransactionDbOptions txn_db_options, dynamic cfOptionsRefs, Dictionary columnFamilies = null) + { + Handle = h; + DbOptions = db_options; + TDbOptions = txn_db_options; + References.CfOptions = cfOptionsRefs; + this.columnFamilies = columnFamilies ?? new Dictionary(); + } + + public void Dispose() + { + foreach (var cfh in columnFamilies.Values) + cfh.Dispose(); + + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_transactiondb_close(Handle); + Handle = IntPtr.Zero; + } + + public Transaction BeginTransaction(WriteOptions wo, TransactionOptions to, Transaction prev = null) + { + IntPtr handle = Native.Instance.rocksdb_transaction_begin(Handle, wo.Handle, to.Handle, (prev != null) ? prev.Handle : IntPtr.Zero); + + return new Transaction(handle, wo, to); + } + + public static TransactionDb Open(DbOptions options, TransactionDbOptions txn_db_options, string path) + { + IntPtr db = Native.Instance.rocksdb_transactiondb_open(options.Handle, txn_db_options.Handle, path); + return new TransactionDb(db, options, txn_db_options, null); + } + + /// + /// Usage: + /// + /// + /// + public Checkpoint Checkpoint() + { + var checkpoint = Native.Instance.rocksdb_transactiondb_checkpoint_object_create(Handle); + return new Checkpoint(checkpoint); + } + + public Snapshot CreateSnapshot() + { + IntPtr snapshotHandle = Native.Instance.rocksdb_transactiondb_create_snapshot(Handle); + return new Snapshot(Handle, snapshotHandle, () => Native.Instance.rocksdb_transactiondb_release_snapshot(Handle, snapshotHandle)); + } + + public ColumnFamilyHandle CreateColumnFamily(ColumnFamilyOptions cfOptions, string name) + { + var cfh = Native.Instance.rocksdb_transactiondb_create_column_family(Handle, cfOptions.Handle, name); + var cfhw = new ColumnFamilyHandleInternal(cfh); + columnFamilies.Add(name, cfhw); + return cfhw; + } + + public ColumnFamilyHandle GetDefaultColumnFamily() + { + return GetColumnFamily(ColumnFamilies.DefaultName); + } + + public ColumnFamilyHandle GetColumnFamily(string name) + { + if (columnFamilies == null) + throw new RocksDbSharpException("Database not opened for column families"); + return columnFamilies[name]; + } + + public void Put(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null) + { + Native.Instance.rocksdb_transactiondb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding); + } + + public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Put(key, key.GetLongLength(0), value, value.GetLongLength(0), cf, writeOptions); + } + + public void Put(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_transactiondb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); + } + + public string Get(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) + { + return Native.Instance.rocksdb_transactiondb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); + } + + public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), cf, readOptions); + } + + public byte[] Get(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transactiondb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), buffer, offset, length, cf, readOptions); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + unsafe + { + var ptr = Native.Instance.rocksdb_transactiondb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, out long valLength, cf); + if (ptr == IntPtr.Zero) + return -1; + var copyLength = Math.Min(length, valLength); + Marshal.Copy(ptr, buffer, (int)offset, (int)copyLength); + Native.Instance.rocksdb_free(ptr); + return valLength; + } + } + + public void Write(WriteBatch writeBatch, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_transactiondb_write(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); + } + + public void Remove(string key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_transactiondb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, cf); + } + + public void Remove(byte[] key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Remove(key, key.Length, cf, writeOptions); + } + + public void Remove(byte[] key, long keyLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + if (cf == null) + Native.Instance.rocksdb_transactiondb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength); + else + Native.Instance.rocksdb_transactiondb_delete_cf(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, cf); + } + + public Iterator NewIterator(ReadOptions readOptions = null) + { + IntPtr iteratorHandle = Native.Instance.rocksdb_transactiondb_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle); + // Note: passing in read options here only to ensure that it is not collected before the iterator + return new Iterator(iteratorHandle, readOptions); + } + } +} diff --git a/RocksDbSharp/TransactionDbOptions.cs b/RocksDbSharp/TransactionDbOptions.cs new file mode 100644 index 0000000..f371c61 --- /dev/null +++ b/RocksDbSharp/TransactionDbOptions.cs @@ -0,0 +1,46 @@ +using System; + +namespace RocksDbSharp +{ + public class TransactionDbOptions + { + public IntPtr Handle { get; private set; } + + public TransactionDbOptions() + { + Handle = Native.Instance.rocksdb_transactiondb_options_create(); + } + + ~TransactionDbOptions() + { + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_transactiondb_options_destroy(Handle); + Handle = IntPtr.Zero; + } + + public TransactionDbOptions SetMaxNumLocks(long max_num_locks) + { + Native.Instance.rocksdb_transactiondb_options_set_max_num_locks(Handle, max_num_locks); + return this; + } + + public TransactionDbOptions SetNumStripes(ulong num_stripes) + { + Native.Instance.rocksdb_transactiondb_options_set_num_stripes(Handle, new UIntPtr(num_stripes)); + return this; + } + + public TransactionDbOptions SetTransactionLockTimeout(long txn_lock_timeout) + { + Native.Instance.rocksdb_transactiondb_options_set_transaction_lock_timeout(Handle, txn_lock_timeout); + return this; + } + + public TransactionDbOptions SetDefaultLockTimeout(long default_lock_timeout) + { + Native.Instance.rocksdb_transactiondb_options_set_default_lock_timeout(Handle, default_lock_timeout); + return this; + } + + } +} diff --git a/RocksDbSharp/TransactionOptions.cs b/RocksDbSharp/TransactionOptions.cs new file mode 100644 index 0000000..e2e5bfd --- /dev/null +++ b/RocksDbSharp/TransactionOptions.cs @@ -0,0 +1,57 @@ +using System; + +namespace RocksDbSharp +{ + public class TransactionOptions + { + public IntPtr Handle { get; private set; } + + public TransactionOptions() + { + Handle = Native.Instance.rocksdb_transaction_options_create(); + } + + ~TransactionOptions() + { + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_transaction_options_destroy(Handle); + Handle = IntPtr.Zero; + } + + public TransactionOptions SetSetSnapshot(bool v) + { + Native.Instance.rocksdb_transaction_options_set_set_snapshot(Handle, v); + return this; + } + + public TransactionOptions SetDeadlockDetect(bool v) + { + Native.Instance.rocksdb_transaction_options_set_deadlock_detect(Handle, v); + return this; + } + + public TransactionOptions SetLockTimeout(long timeout) + { + Native.Instance.rocksdb_transaction_options_set_lock_timeout(Handle, timeout); + return this; + } + + public TransactionOptions SetExpiration(long expiration) + { + Native.Instance.rocksdb_transaction_options_set_expiration(Handle, expiration); + return this; + } + + public TransactionOptions SetDeadlockDetectDepth(long depth) + { + Native.Instance.rocksdb_transaction_options_set_deadlock_detect_depth(Handle, depth); + return this; + } + + public TransactionOptions SetMaxWriteBatchSize(ulong size) + { + Native.Instance.rocksdb_transaction_options_set_max_write_batch_size(Handle, new UIntPtr(size)); + return this; + } + } +} diff --git a/tests/RocksDbSharpTest/FunctionalTests.cs b/tests/RocksDbSharpTest/FunctionalTests.cs index d90568e..2a4357b 100644 --- a/tests/RocksDbSharpTest/FunctionalTests.cs +++ b/tests/RocksDbSharpTest/FunctionalTests.cs @@ -465,6 +465,150 @@ public void FunctionalTest() Directory.Delete(dbname, true); } + // TransactionDb tests + var testtxdb = Path.Combine(testdir, "txdb"); + var testtxcp = Path.Combine(testdir, "txcp"); + var txdbpath = Environment.ExpandEnvironmentVariables(testtxdb); + var txcppath = Environment.ExpandEnvironmentVariables(testtxcp); + var txdboptions = new TransactionDbOptions(); + + using (var db = TransactionDb.Open(options, txdboptions, txdbpath)) + { + // With strings + string value = db.Get("key"); + db.Put("key", "value"); + Assert.Equal("value", db.Get("key")); + Assert.Null(db.Get("non-existent-key")); + db.Remove("key"); + Assert.Null(db.Get("value")); + + // With bytes + db.Put(Encoding.UTF8.GetBytes("key"), Encoding.UTF8.GetBytes("value")); + Assert.True(BinaryComparer.Default.Equals(Encoding.UTF8.GetBytes("value"), db.Get(Encoding.UTF8.GetBytes("key")))); + // non-existent kiey + Assert.Null(db.Get(new byte[] { 0, 1, 2 })); + db.Remove(Encoding.UTF8.GetBytes("key")); + Assert.Null(db.Get(Encoding.UTF8.GetBytes("key"))); + + db.Put(Encoding.UTF8.GetBytes("key"), new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 }); + + // With buffers + var buffer = new byte[100]; + long length = db.Get(Encoding.UTF8.GetBytes("key"), buffer, 0, buffer.Length); + Assert.Equal(8, length); + Assert.Equal(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 }, buffer.Take((int)length).ToList()); + + buffer = new byte[5]; + length = db.Get(Encoding.UTF8.GetBytes("key"), buffer, 0, buffer.Length); + Assert.Equal(8, length); + Assert.Equal(new byte[] { 0, 1, 2, 3, 4 }, buffer.Take((int)Math.Min(buffer.Length, length))); + + length = db.Get(Encoding.UTF8.GetBytes("bogus"), buffer, 0, buffer.Length); + Assert.Equal(-1, length); + + // Write batches + // With strings + using (WriteBatch batch = new WriteBatch() + .Put("one", "uno") + .Put("two", "deuce") + .Put("two", "dos") + .Put("three", "tres")) + { + db.Write(batch); + } + Assert.Equal("uno", db.Get("one")); + + // With save point + using (WriteBatch batch = new WriteBatch()) + { + batch + .Put("hearts", "red") + .Put("diamonds", "red"); + batch.SetSavePoint(); + batch + .Put("clubs", "black"); + batch.SetSavePoint(); + batch + .Put("spades", "black"); + batch.RollbackToSavePoint(); + db.Write(batch); + } + Assert.Equal("red", db.Get("diamonds")); + Assert.Equal("black", db.Get("clubs")); + Assert.Null(db.Get("spades")); + + // Save a checkpoint + using (var cp = db.Checkpoint()) + { + cp.Save(txcppath); + } + + // With bytes + var utf8 = Encoding.UTF8; + using (WriteBatch batch = new WriteBatch() + .Put(utf8.GetBytes("four"), new byte[] { 4, 4, 4 }) + .Put(utf8.GetBytes("five"), new byte[] { 5, 5, 5 })) + { + db.Write(batch); + } + Assert.True(BinaryComparer.Default.Equals(new byte[] { 4, 4, 4 }, db.Get(utf8.GetBytes("four")))); + + // Snapshots + using (var snapshot = db.CreateSnapshot()) + { + var before = db.Get("one"); + db.Put("one", "1"); + + var useSnapshot = new ReadOptions() + .SetSnapshot(snapshot); + + // the database value was written + Assert.Equal("1", db.Get("one")); + // but the snapshot still sees the old version + var after = db.Get("one", readOptions: useSnapshot); + Assert.Equal(before, after); + } + + var two = db.Get("two"); + Assert.Equal("dos", two); + + // Iterators + using (var iterator = db.NewIterator( + readOptions: new ReadOptions() + .SetIterateUpperBound("t") + )) + { + iterator.Seek("k"); + Assert.True(iterator.Valid()); + Assert.Equal("key", iterator.StringKey()); + iterator.Next(); + Assert.True(iterator.Valid()); + Assert.Equal("one", iterator.StringKey()); + Assert.Equal("1", iterator.StringValue()); + iterator.Next(); + Assert.False(iterator.Valid()); + } + + // Transaction + using (var transaction = db.BeginTransaction(new WriteOptions(), new TransactionOptions())) + { + Assert.Equal("dos", transaction.Get("two")); + transaction.Put("two", "2"); + Assert.Equal("dos", db.Get("two")); + transaction.Commit(); + } + Assert.Equal("2", db.Get("two")); + } + + // Test reading checkpointed db + using (var cpdb = TransactionDb.Open(options, txdboptions, txcppath)) + { + Assert.Equal("red", cpdb.Get("diamonds")); + Assert.Equal("black", cpdb.Get("clubs")); + Assert.Null(cpdb.Get("spades")); + // Checkpoint occurred before these changes: + Assert.Null(cpdb.Get("four")); + } } class IntegerStringComparator : StringComparatorBase