From 9779f9f8b000f72ded779a9f2445191cfef53928 Mon Sep 17 00:00:00 2001 From: Peng Zang Date: Tue, 11 Nov 2014 09:30:48 -0500 Subject: [PATCH 01/12] add high level kafka consumer, integrate with zookeeper client to use group id for multiple clients. --- src/kafka-net/HLConsumer.cs | 100 +++++++++++++++++++++++++++++ src/kafka-net/Protocol/Message.cs | 7 ++ src/kafka-net/kafka-net.csproj | 8 +++ src/kafka-net/packages.config | 5 ++ src/kafka-tests/kafka-tests.csproj | 6 ++ src/kafka-tests/packages.config | 2 + 6 files changed, 128 insertions(+) create mode 100644 src/kafka-net/HLConsumer.cs create mode 100644 src/kafka-net/packages.config diff --git a/src/kafka-net/HLConsumer.cs b/src/kafka-net/HLConsumer.cs new file mode 100644 index 00000000..030a04cd --- /dev/null +++ b/src/kafka-net/HLConsumer.cs @@ -0,0 +1,100 @@ +/* + * Created by SharpDevelop. + * User: peng.zang + * Date: 11/10/2014 + * Time: 2:44 PM + * + * To change this template use Tools | Options | Coding | Edit Standard Headers. + */ +using System; +using System.Text; +using System.Linq; +using System.Collections.Generic; + +using KafkaNet.Model; +using KafkaNet.Protocol; + +using ZooKeeperNet; + +namespace KafkaNet +{ + /// + /// Description of HLConsumer. + /// + public class HLConsumer + { + KafkaOptions _options ; + BrokerRouter _router ; + Consumer _consumer; + ZooKeeper _zookeeper; + string _topic; + IWatcher _watcher; + + public HLConsumer(string topic, List brokerList, string zookps, TimeSpan? timeout = null, IWatcher watcher = null) + { + _options = new KafkaOptions(); + _options.KafkaServerUri = brokerList.ConvertAll(x => new Uri(x)); + + _router = new BrokerRouter(_options); + _consumer = new KafkaNet.Consumer(new ConsumerOptions(topic, _router)); + + _topic = topic; + + _zookeeper = new ZooKeeper(zookps, timeout.HasValue ? timeout.Value : new TimeSpan(7,0,0) , watcher); + _watcher = watcher; + } + + public IEnumerable consume(string groupID){ + var path = new StringBuilder( "/consumers" ); //+ groupID + "/offsets/" + this._topic ; + var p = "/consumers/"+groupID+"/offsets/"+this._topic; + try { + if(_zookeeper.Exists(p , _watcher) ==null){ + if(_zookeeper.Exists(path.ToString(), _watcher) ==null){ + _zookeeper.Create(path.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); + + } else if(_zookeeper.Exists(path.Append("/").Append(groupID).ToString(), _watcher) == null){ + _zookeeper.Create(path.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); + + } else if (_zookeeper.Exists(path.Append("/offsets").ToString(), _watcher) == null){ + _zookeeper.Create(path.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); + + } else if (_zookeeper.Exists(path.Append("/").Append(this._topic).ToString(), _watcher) == null){ + _zookeeper.Create(path.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); + } + + var common = new MetadataQueries(_router); + var offsets = common.GetTopicOffsetAsync(groupID).Result; + _consumer.SetOffsetPosition(offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Min())).ToArray()); + offsets.ForEach(off => { + _zookeeper.Create(p + "/" + off.PartitionId.ToString(), + System.Text.Encoding.UTF8.GetBytes(off.Offsets.Min().ToString()), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PersistentSequential); + }); + } + else { + var children = _zookeeper.GetChildren( p, _watcher); //TODO: add watcher and stat support. + var offsets = new List(); + children.ToList().ForEach(x => { + int partition; + if(int.TryParse(x, out partition) ){ + var data = _zookeeper.GetData(p + "/" + partition, _watcher, null); + if(data != null && data.Length >0){ + long offset = 0; + if(long.TryParse(System.Text.Encoding.Default.GetString(data), out offset)){ +// _consumer.SetOffsetPosition(new OffsetPosition(partition, offset)); + offsets.Add(new OffsetPosition(partition, offset)); + } + } + } + }); + _consumer.SetOffsetPosition(offsets.ToArray()); + } + } catch (Exception e) { + //TODO: Log the error, or handle it? + } + + return _consumer.Consume(); + } + } +} diff --git a/src/kafka-net/Protocol/Message.cs b/src/kafka-net/Protocol/Message.cs index f23cb92d..7f5573d0 100644 --- a/src/kafka-net/Protocol/Message.cs +++ b/src/kafka-net/Protocol/Message.cs @@ -181,6 +181,13 @@ public static IEnumerable DecodeMessage(long offset, byte[] payload) throw new NotSupportedException(string.Format("Codec type of {0} is not supported.", codec)); } } + + public override string ToString() + { + return string.Format("[Message Meta={0}, MagicNumber={1}, Attribute={2}, Key={3}, Value={4}]", Meta, MagicNumber, Attribute, + Key==null? "null" : System.Text.Encoding.Default.GetString(Key), Value==null? "null" : System.Text.Encoding.Default.GetString(Value)); + } + } /// diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj index 8e15f79d..97f59e2d 100644 --- a/src/kafka-net/kafka-net.csproj +++ b/src/kafka-net/kafka-net.csproj @@ -33,6 +33,9 @@ false + + ..\packages\log4net.2.0.3\lib\net40-full\log4net.dll + @@ -40,10 +43,14 @@ + + ..\packages\ZooKeeper.Net.3.4.6.0\lib\net40\ZooKeeperNet.dll + + @@ -86,6 +93,7 @@ + diff --git a/src/kafka-net/packages.config b/src/kafka-net/packages.config new file mode 100644 index 00000000..0b9e3efc --- /dev/null +++ b/src/kafka-net/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/kafka-tests/kafka-tests.csproj b/src/kafka-tests/kafka-tests.csproj index 342fe6e3..5c9fc134 100644 --- a/src/kafka-tests/kafka-tests.csproj +++ b/src/kafka-tests/kafka-tests.csproj @@ -32,6 +32,9 @@ 4 + + ..\packages\log4net.2.0.3\lib\net40-full\log4net.dll + ..\packages\Moq.4.0.10827\lib\NET40\Moq.dll @@ -58,6 +61,9 @@ + + ..\packages\ZooKeeper.Net.3.4.6.0\lib\net40\ZooKeeperNet.dll + diff --git a/src/kafka-tests/packages.config b/src/kafka-tests/packages.config index d33abcce..b77bf776 100644 --- a/src/kafka-tests/packages.config +++ b/src/kafka-tests/packages.config @@ -1,9 +1,11 @@  + + \ No newline at end of file From ac999116597b71208f929a39e15d05c619409712 Mon Sep 17 00:00:00 2001 From: Peng Zang Date: Tue, 11 Nov 2014 09:56:31 -0500 Subject: [PATCH 02/12] code refactoring. --- src/kafka-net/HLConsumer.cs | 50 +++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/src/kafka-net/HLConsumer.cs b/src/kafka-net/HLConsumer.cs index 030a04cd..cbe8d371 100644 --- a/src/kafka-net/HLConsumer.cs +++ b/src/kafka-net/HLConsumer.cs @@ -19,7 +19,7 @@ namespace KafkaNet { /// - /// Description of HLConsumer. + /// High level consumer using zookeeper for coordination. /// public class HLConsumer { @@ -45,31 +45,18 @@ public HLConsumer(string topic, List brokerList, string zookps, TimeSpan } public IEnumerable consume(string groupID){ - var path = new StringBuilder( "/consumers" ); //+ groupID + "/offsets/" + this._topic ; var p = "/consumers/"+groupID+"/offsets/"+this._topic; try { if(_zookeeper.Exists(p , _watcher) ==null){ - if(_zookeeper.Exists(path.ToString(), _watcher) ==null){ - _zookeeper.Create(path.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); - - } else if(_zookeeper.Exists(path.Append("/").Append(groupID).ToString(), _watcher) == null){ - _zookeeper.Create(path.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); - - } else if (_zookeeper.Exists(path.Append("/offsets").ToString(), _watcher) == null){ - _zookeeper.Create(path.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); - - } else if (_zookeeper.Exists(path.Append("/").Append(this._topic).ToString(), _watcher) == null){ - _zookeeper.Create(path.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); - } - + CreateZookeeperPath("/consumers","/"+groupID, "/offsets", "/"+this._topic); var common = new MetadataQueries(_router); var offsets = common.GetTopicOffsetAsync(groupID).Result; _consumer.SetOffsetPosition(offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Min())).ToArray()); offsets.ForEach(off => { _zookeeper.Create(p + "/" + off.PartitionId.ToString(), - System.Text.Encoding.UTF8.GetBytes(off.Offsets.Min().ToString()), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PersistentSequential); + System.Text.Encoding.UTF8.GetBytes(off.Offsets.Min().ToString()), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PersistentSequential); }); } else { @@ -82,7 +69,6 @@ public IEnumerable consume(string groupID){ if(data != null && data.Length >0){ long offset = 0; if(long.TryParse(System.Text.Encoding.Default.GetString(data), out offset)){ -// _consumer.SetOffsetPosition(new OffsetPosition(partition, offset)); offsets.Add(new OffsetPosition(partition, offset)); } } @@ -90,11 +76,33 @@ public IEnumerable consume(string groupID){ }); _consumer.SetOffsetPosition(offsets.ToArray()); } - } catch (Exception e) { - //TODO: Log the error, or handle it? + } catch (Exception) { + //TODO: Log the error, or handle it? } return _consumer.Consume(); } + + /// + /// create zookeeper path hierarchically. + /// + /// paths by level, the next path should be append to the previous one to form a valid path + /// + public bool CreateZookeeperPath(params string[] path){ + var sb = new StringBuilder(); + var success = true; + try{ + for (int i = 0; i < path.Length; i++) { + sb.Append(path[i]); + if(_zookeeper.Exists(sb.ToString(), _watcher) == null){ + _zookeeper.Create(sb.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent); + } + } + } catch (Exception){ + success = false; + //TODO: + } + return success; + } } } From 971954e0f9324badab15a81af490bf18ef4bd7b2 Mon Sep 17 00:00:00 2001 From: Peng Zang Date: Tue, 11 Nov 2014 17:18:31 -0500 Subject: [PATCH 03/12] add native high level consumer, which directly get consumer offset from kafka instead of zookeeper; Need to add a check in the RefreshOffsets when offsets in record/zookeeper are out of the bound of actual logs. --- src/kafka-net/Consumer.cs | 26 ++-- src/kafka-net/HLConsumer.cs | 16 ++- src/kafka-net/Interfaces/IMetadataQueries.cs | 2 +- src/kafka-net/NativeHLConsumer.cs | 128 +++++++++++++++++++ src/kafka-net/Protocol/Message.cs | 5 + src/kafka-net/kafka-net.csproj | 1 + 6 files changed, 161 insertions(+), 17 deletions(-) create mode 100644 src/kafka-net/NativeHLConsumer.cs diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 681fcd21..68531614 100755 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -18,17 +18,17 @@ namespace KafkaNet /// public class Consumer : IMetadataQueries, IDisposable { - private readonly ConsumerOptions _options; - private readonly BlockingCollection _fetchResponseQueue; - private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource(); - private readonly ConcurrentDictionary _partitionPollingIndex = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _partitionOffsetIndex = new ConcurrentDictionary(); - private readonly IScheduledTimer _topicPartitionQueryTimer; - private readonly IMetadataQueries _metadataQueries; - - private int _disposeCount; - private int _ensureOneThread; - private Topic _topic; + protected readonly ConsumerOptions _options; + protected readonly BlockingCollection _fetchResponseQueue; + protected readonly CancellationTokenSource _disposeToken = new CancellationTokenSource(); + protected readonly ConcurrentDictionary _partitionPollingIndex = new ConcurrentDictionary(); + protected readonly ConcurrentDictionary _partitionOffsetIndex = new ConcurrentDictionary(); + protected readonly IScheduledTimer _topicPartitionQueryTimer; + protected readonly IMetadataQueries _metadataQueries; + + protected int _disposeCount; + protected int _ensureOneThread; + protected Topic _topic; public Consumer(ConsumerOptions options, params OffsetPosition[] positions) { @@ -87,7 +87,7 @@ public List GetOffsetPosition() return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value }).ToList(); } - private void RefreshTopicPartitions() + protected void RefreshTopicPartitions() { try { @@ -164,13 +164,11 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) foreach (var message in response.Messages) { _fetchResponseQueue.Add(message, _disposeToken.Token); - if (_disposeToken.IsCancellationRequested) return; } var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1; _partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset); - // sleep is not needed if responses were received continue; } diff --git a/src/kafka-net/HLConsumer.cs b/src/kafka-net/HLConsumer.cs index cbe8d371..3966b8fa 100644 --- a/src/kafka-net/HLConsumer.cs +++ b/src/kafka-net/HLConsumer.cs @@ -13,6 +13,7 @@ using KafkaNet.Model; using KafkaNet.Protocol; +using KafkaNet.Common; using ZooKeeperNet; @@ -44,7 +45,13 @@ public HLConsumer(string topic, List brokerList, string zookps, TimeSpan _watcher = watcher; } - public IEnumerable consume(string groupID){ + /// + /// consume topic specified in the constructor with groupID here. Only consume n messages. + /// + /// + /// + /// + public IEnumerable consume(string groupID, int n){ var p = "/consumers/"+groupID+"/offsets/"+this._topic; try { if(_zookeeper.Exists(p , _watcher) ==null){ @@ -80,9 +87,14 @@ public IEnumerable consume(string groupID){ //TODO: Log the error, or handle it? } - return _consumer.Consume(); + return _consumer.Consume().Take(n); } +// public bool CommitOffset(string path, int pid, long offset){ +// path = path + "/" + pid.ToString(); +// _zookeeper.SetData(path, offset.ToBytes(), -1); +// } + /// /// create zookeeper path hierarchically. /// diff --git a/src/kafka-net/Interfaces/IMetadataQueries.cs b/src/kafka-net/Interfaces/IMetadataQueries.cs index c9443915..61b85d21 100644 --- a/src/kafka-net/Interfaces/IMetadataQueries.cs +++ b/src/kafka-net/Interfaces/IMetadataQueries.cs @@ -7,7 +7,7 @@ namespace KafkaNet /// /// Contains common metadata query commands that are used by both a consumer and producer. /// - interface IMetadataQueries : IDisposable + public interface IMetadataQueries : IDisposable { /// /// Get metadata on the given topic. diff --git a/src/kafka-net/NativeHLConsumer.cs b/src/kafka-net/NativeHLConsumer.cs new file mode 100644 index 00000000..bfa35434 --- /dev/null +++ b/src/kafka-net/NativeHLConsumer.cs @@ -0,0 +1,128 @@ +/* + * Created by SharpDevelop. + * User: peng.zang + * Date: 11/11/2014 + * Time: 10:49 AM + * + * To change this template use Tools | Options | Coding | Edit Standard Headers. + */ +using System; +using System.Linq; +using System.Collections.Generic; + +using KafkaNet.Protocol; +using KafkaNet.Model; + +namespace KafkaNet +{ + /// + /// Description of NativeHLConsumer. Should only be used per + /// + public class NativeHLConsumer : Consumer + { + + protected string _consumerGroup; + + public NativeHLConsumer(ConsumerOptions options, string consumerGroup, params OffsetPosition[] positions) : base(options, positions){ + if(_topic == null || _topic.Name != _options.Topic) + _topic = _metadataQueries.GetTopic(_options.Topic); + _consumerGroup = consumerGroup; + RefreshOffsets(); + } + + public void RefreshOffsets(){ + _topic.Partitions.ForEach( + partition => { + var conn = _options.Router.SelectBrokerRoute(_topic.Name, partition.PartitionId); + conn.Connection + .SendAsync(CreateOffsetFetchRequest(_consumerGroup, partition.PartitionId)) + .Result.ForEach( + offsetResp => { + Console.WriteLine("fetch offset: " + offsetResp.ToString()); + _partitionOffsetIndex.AddOrUpdate(partition.PartitionId, i => offsetResp.Offset, (i, l) => offsetResp.Offset); + }); + }); + +// var actualOffsets = _metadataQueries.GetTopicOffsetAsync(_options.Topic).Result; +// var maxGroup = from o in actualOffsets +// group o by o.PartitionId into g +// select new {pid = g.Key, offset = g.Max(x => x.)}; + + } + + public IEnumerable Consume(int num){ +// RefreshOffsets(); + var result = base.Consume(null).Take(num).ToList(); +// var maxgroups = result.GroupBy(x => x.Meta.PartitionId).GroupJoin(; + var maxgroups = from r in result + group r by r.Meta.PartitionId into g + select new {pid = g.Key, offset = g.Max(m => m.Meta.Offset) + 1 }; + + maxgroups.ToList().ForEach(x => Console.WriteLine(x.pid + " : " + x.offset)); + + foreach (var pos in maxgroups) { +// Console.WriteLine("partition: " + pos.Meta.PartitionId + " , offset: "+ pos.Meta.Offset); + + if(!CommitOffset(pos.pid, pos.offset)){ + var mingroup = from r in result + group r by r.Meta.PartitionId into g + select new { pid = g.Key, offset = g.Min(m => m.Meta.Offset)}; + mingroup.ToList().ForEach(x => CommitOffset(x.pid, x.offset)); + return null; + } + } + return result; + } + + protected bool CommitOffset(int pid, long offset){ + Console.WriteLine("*** Committing partition: " + pid + ", offset: " + offset); + var conn = _options.Router.SelectBrokerRoute(_topic.Name, pid); + var resp = conn.Connection + .SendAsync(CreateOffsetCommitRequest(_consumerGroup, pid, offset)).Result.FirstOrDefault(); + if(resp != null && ((int)resp.Error)== (int)ErrorResponseCode.NoError) + return true; + else { + Console.WriteLine(resp.Error + " topic name: "+ _topic.Name); + return false; + } + } + + protected OffsetFetchRequest CreateOffsetFetchRequest(string consumerGroup, int partitionId) + { + var request = new OffsetFetchRequest + { + ConsumerGroup = consumerGroup, + Topics = new List + { + new OffsetFetch + { + PartitionId = partitionId, + Topic = _options.Topic + } + } + }; + + return request; + } + + protected OffsetCommitRequest CreateOffsetCommitRequest(string consumerGroup, int partitionId, long offset, string metadata = null) + { + var commit = new OffsetCommitRequest + { + ConsumerGroup = consumerGroup, + OffsetCommits = new List + { + new OffsetCommit + { + PartitionId = partitionId, + Topic = _topic.Name, + Offset = offset, + Metadata = metadata + } + } + }; + + return commit; + } + } +} diff --git a/src/kafka-net/Protocol/Message.cs b/src/kafka-net/Protocol/Message.cs index 7f5573d0..c9f624d3 100644 --- a/src/kafka-net/Protocol/Message.cs +++ b/src/kafka-net/Protocol/Message.cs @@ -207,5 +207,10 @@ public class MessageMetadata /// The partition id this offset is from. /// public int PartitionId { get; set; } + public override string ToString() + { + return string.Format("[MessageMetadata Offset={0}, PartitionId={1}]", Offset, PartitionId); + } + } } diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj index 97f59e2d..05c8e7dd 100644 --- a/src/kafka-net/kafka-net.csproj +++ b/src/kafka-net/kafka-net.csproj @@ -66,6 +66,7 @@ + From fb9dfd534f70d4f8facf031e8ab0d1b8fb4cfaef Mon Sep 17 00:00:00 2001 From: Peng Zang Date: Thu, 13 Nov 2014 11:33:46 -0500 Subject: [PATCH 04/12] check offset for consumer group if it's in a correct range; if not, correct it by setting it to minimum offset. --- src/kafka-net/NativeHLConsumer.cs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/kafka-net/NativeHLConsumer.cs b/src/kafka-net/NativeHLConsumer.cs index bfa35434..00f6214a 100644 --- a/src/kafka-net/NativeHLConsumer.cs +++ b/src/kafka-net/NativeHLConsumer.cs @@ -31,6 +31,9 @@ public NativeHLConsumer(ConsumerOptions options, string consumerGroup, params Of } public void RefreshOffsets(){ + var actualOffsets = _metadataQueries.GetTopicOffsetAsync(_options.Topic).Result; + var maxminGroups = actualOffsets.Select(x => new {pid = x.PartitionId, min = x.Offsets.Min(), max = x.Offsets.Max()}); + _topic.Partitions.ForEach( partition => { var conn = _options.Router.SelectBrokerRoute(_topic.Name, partition.PartitionId); @@ -39,14 +42,21 @@ public void RefreshOffsets(){ .Result.ForEach( offsetResp => { Console.WriteLine("fetch offset: " + offsetResp.ToString()); + + if(actualOffsets.Any(x => x.PartitionId==partition.PartitionId)){ + var actual = maxminGroups.First(x => x.pid==partition.PartitionId); + if(actual.min > offsetResp.Offset || actual.max < offsetResp.Offset){ + offsetResp.Offset = actual.min; + } + } _partitionOffsetIndex.AddOrUpdate(partition.PartitionId, i => offsetResp.Offset, (i, l) => offsetResp.Offset); }); }); -// var actualOffsets = _metadataQueries.GetTopicOffsetAsync(_options.Topic).Result; -// var maxGroup = from o in actualOffsets -// group o by o.PartitionId into g -// select new {pid = g.Key, offset = g.Max(x => x.)}; + +// group o by o.PartitionId into g +// select new {pid = g.Key, min = g.Min(x => x.Offsets), max = g.Max(x => x.Offsets) }; + } @@ -55,7 +65,7 @@ public IEnumerable Consume(int num){ var result = base.Consume(null).Take(num).ToList(); // var maxgroups = result.GroupBy(x => x.Meta.PartitionId).GroupJoin(; var maxgroups = from r in result - group r by r.Meta.PartitionId into g + group r by r.Meta.PartitionId into g select new {pid = g.Key, offset = g.Max(m => m.Meta.Offset) + 1 }; maxgroups.ToList().ForEach(x => Console.WriteLine(x.pid + " : " + x.offset)); @@ -64,8 +74,8 @@ group r by r.Meta.PartitionId into g // Console.WriteLine("partition: " + pos.Meta.PartitionId + " , offset: "+ pos.Meta.Offset); if(!CommitOffset(pos.pid, pos.offset)){ - var mingroup = from r in result - group r by r.Meta.PartitionId into g + var mingroup = from r in result + group r by r.Meta.PartitionId into g select new { pid = g.Key, offset = g.Min(m => m.Meta.Offset)}; mingroup.ToList().ForEach(x => CommitOffset(x.pid, x.offset)); return null; From acce75277951538f81e7c95170003effaec5aae6 Mon Sep 17 00:00:00 2001 From: Peng Zang Date: Thu, 13 Nov 2014 13:52:54 -0500 Subject: [PATCH 05/12] refactoring --- src/kafka-net/NativeHLConsumer.cs | 4 ---- .../{HLConsumer.cs => ZookeeperHLConsumer.cs} | 13 ++++--------- src/kafka-net/kafka-net.csproj | 5 ++++- 3 files changed, 8 insertions(+), 14 deletions(-) rename src/kafka-net/{HLConsumer.cs => ZookeeperHLConsumer.cs} (90%) diff --git a/src/kafka-net/NativeHLConsumer.cs b/src/kafka-net/NativeHLConsumer.cs index 00f6214a..5e095c96 100644 --- a/src/kafka-net/NativeHLConsumer.cs +++ b/src/kafka-net/NativeHLConsumer.cs @@ -53,10 +53,6 @@ public void RefreshOffsets(){ }); }); - -// group o by o.PartitionId into g -// select new {pid = g.Key, min = g.Min(x => x.Offsets), max = g.Max(x => x.Offsets) }; - } diff --git a/src/kafka-net/HLConsumer.cs b/src/kafka-net/ZookeeperHLConsumer.cs similarity index 90% rename from src/kafka-net/HLConsumer.cs rename to src/kafka-net/ZookeeperHLConsumer.cs index 3966b8fa..988ed960 100644 --- a/src/kafka-net/HLConsumer.cs +++ b/src/kafka-net/ZookeeperHLConsumer.cs @@ -22,7 +22,7 @@ namespace KafkaNet /// /// High level consumer using zookeeper for coordination. /// - public class HLConsumer + public class ZookeeperHLConsumer { KafkaOptions _options ; BrokerRouter _router ; @@ -31,7 +31,7 @@ public class HLConsumer string _topic; IWatcher _watcher; - public HLConsumer(string topic, List brokerList, string zookps, TimeSpan? timeout = null, IWatcher watcher = null) + public ZookeeperHLConsumer(string topic, List brokerList, string zookps, TimeSpan? timeout = null, IWatcher watcher = null) { _options = new KafkaOptions(); _options.KafkaServerUri = brokerList.ConvertAll(x => new Uri(x)); @@ -51,7 +51,7 @@ public HLConsumer(string topic, List brokerList, string zookps, TimeSpan /// /// /// - public IEnumerable consume(string groupID, int n){ + public IEnumerable Consume(string groupID, int n){ var p = "/consumers/"+groupID+"/offsets/"+this._topic; try { if(_zookeeper.Exists(p , _watcher) ==null){ @@ -89,12 +89,7 @@ public IEnumerable consume(string groupID, int n){ return _consumer.Consume().Take(n); } - -// public bool CommitOffset(string path, int pid, long offset){ -// path = path + "/" + pid.ToString(); -// _zookeeper.SetData(path, offset.ToBytes(), -1); -// } - + /// /// create zookeeper path hierarchically. /// diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj index 05c8e7dd..410c94bd 100644 --- a/src/kafka-net/kafka-net.csproj +++ b/src/kafka-net/kafka-net.csproj @@ -12,6 +12,8 @@ v4.5 512 + ..\ + true true @@ -50,7 +52,7 @@ - + @@ -100,6 +102,7 @@ +