如何兼顾性能+实时性处理缓冲数据?
(资料图)
我们经常会遇到这样的数据处理应用场景:我们利用一个组件实时收集外部交付给它的数据,并由它转发给一个外部处理程序进行处理。考虑到性能,它会将数据存储在本地缓冲区,等累积到指定的数量后打包发送;考虑到实时性,数据不能在缓冲区存太长的时间,必须设置一个延时时间,一旦超过这个时间,缓冲的数据必须立即发出去。看似简单的需求,如果需要综合考虑性能、线程安全、内存分配,要实现起来还真有点麻烦,本文提供一种简单的实现方式。
一、实例演示一、实例演示二、待处理的批量数据:Batch
三、感知数据处理的时机:BatchChangeToken 四、接收、缓冲、打包和处理数据:Batcher
我们先来看看最终达成的效果。在如下这段代码中,我们使用一个Batcher
var batcher = new Batcher( processor:Process, batchSize:10, interval: TimeSpan.FromSeconds(5));var random = new Random();while (true){ var count = random.Next(1, 4); for (var i = 0; i < count; i++) { batcher.Add(Guid.NewGuid().ToString()); } await Task.Delay(1000);}static void Process(Batch batch)=> Console.WriteLine($"[{DateTimeOffset.Now}]{batch.Count} items are delivered.");
如上面的代码片段所示,在一个循环中,我们每隔1秒钟随机添加1-3个数据项。从下图中可以看出,Process方法的调用具有两种触发条件,一是累积的数据量达到设置的阈值10,另一个则是当前时间与上一次处理时间间隔超过5秒。
二、待处理的批量数据:Batch除了上面实例涉及的Batcher
public sealed class Batch三、感知数据处理的时机:BatchChangeToken: IEnumerable , IDisposable where T : class{ private bool _isDisposed; private int? _count; private readonly T[] _data; private static readonly ArrayPool _pool = ArrayPool .Create(); public int Count { get { if (_isDisposed) throw new ObjectDisposedException(nameof(Batch )); if(_count.HasValue) return _count.Value; var count = 0; for (int index = 0; index < _data.Length; index++) { if (_data[index] is null) { break; } count++; } return (_count = count).Value; } } public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data)); public void Dispose() { _pool.Return(_data, clearArray: true); _isDisposed = true; } public IEnumerator GetEnumerator() => new Enumerator(this); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize); private void EnsureNotDisposed() { if (_isDisposed) throw new ObjectDisposedException(nameof(Batch )); } private sealed class Enumerator : IEnumerator { private readonly Batch _batch; private readonly T[] _data; private int _index = -1; public Enumerator(Batch batch) { _batch = batch; _data = batch._data; } public T Current { get { _batch.EnsureNotDisposed(); return _data[_index]; } } object IEnumerator.Current => Current; public void Dispose() { } public bool MoveNext() { _batch.EnsureNotDisposed(); return ++_index < _data.Length && _data[_index] is not null; } public void Reset() { _batch.EnsureNotDisposed(); _index = -1; } }}
Batcher具有两个触发数据处理的设置:缓冲的数据量和两次数据处理之间的最长间隔。当累积的数据量或者当前时间与上一次处理的间隔达到阈值,缓冲的数据将自动被处理。.NET Core经常利用一个IChangeToken作为通知的令牌,为此我们定义了如下这个实现了该接口的BatchChangeToken类型。如下面的代码片段所示,上述两个触发条件体现在两个CancellationToken对象上,我们利用它们创建了对应的CancellationChangeToken对象,最后利用这两个CancellationChangeToken创建了一个CompositeChangeToken对象。这个CompositeChangeToken对象最终被用来实现了IChangeToken接口的三个成员。
internal sealed class BatchChangeToken : IChangeToken{ private readonly IChangeToken _innerToken; private readonly int _countThreshold; private readonly CancellationTokenSource _expirationTokenSource; private readonly CancellationTokenSource _countTokenSource; private int _counter; public BatchChangeToken(int countThreshold, TimeSpan timeThreshold) { _countThreshold = countThreshold; _countTokenSource = new CancellationTokenSource(); _expirationTokenSource = new CancellationTokenSource(timeThreshold); var countToken = new CancellationChangeToken(_countTokenSource.Token); var expirationToken = new CancellationChangeToken(_expirationTokenSource.Token); _innerToken = new CompositeChangeToken(new IChangeToken[] { countToken, expirationToken }); } public bool HasChanged => _innerToken.HasChanged; public bool ActiveChangeCallbacks => _innerToken.ActiveChangeCallbacks; public IDisposable RegisterChangeCallback(Action
上述两个CancellationToken来源于对应的CancellationTokenSource,对应的字段为_countTokenSource和_expirationTokenSource。_expirationTokenSource根据设置的数据处理时间间隔创建而成。为了确定缓冲的数据量,我们提供了一个计数器,并利用Increase方法进行计数。在超过设置的数据量时,该方法会调用_expirationTokenSource的Cancel方法。在实现的ActiveChangeCallbacks方法种,我们将针对这两个CancellationTokenSource的释放放在注册的回调中。
四、接收、缓冲、打包和处理数据:Batcher最终用于打包的Batcher类型定义如下。在构造函数中,我们除了提供上述两个阈值外,还提供了一个Action
public sealed class Batcher: IDisposable where T : class{ private readonly Action > _processor; private T[] _data; private BatchChangeToken _changeToken = default!; private readonly int _batchSize; private int _index = -1; private readonly IDisposable _scheduler; public Batcher(Action > processor, int batchSize, TimeSpan interval) { _processor = processor ?? throw new ArgumentNullException(nameof(processor)); _batchSize = batchSize; _data = Batch .CreatePooledArray(batchSize); _scheduler = ChangeToken.OnChange(() => _changeToken = new BatchChangeToken(_batchSize, interval), OnChange); void OnChange() { var data = Interlocked.Exchange(ref _data, Batch .CreatePooledArray(batchSize)); if (data[0] is not null) { Interlocked.Exchange(ref _index, -1); _ = Task.Run(() => _processor.Invoke(new Batch (data))); } } } public void Add(T item) { if (item is null) throw new ArgumentNullException(nameof(item)); var index = Interlocked.Increment(ref _index); if (index >= _batchSize) { SpinWait.SpinUntil(() => _index < _batchSize - 1); Add(item); } _data[index] = item; _changeToken.Increase(); } public void Dispose() => _scheduler.Dispose();}
在构造函数中,我们调用了ChangeToken的静态方法OnChange将数据处理操作绑定到创建的BatchChangeToken对象上,并确保每次发送“数据处理”后将重新创建的BatchChangeToken对象赋值到_changeToken字段上,因为Add放到需要调用它的Increase增加计数。当接收到数据处理通知后,我们会调用Batch