C# Client Library
A C# Client Library for the AnalyzeRe REST API
Loading...
Searching...
No Matches
ProducerConsumerBuffer.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Concurrent;
3using System.Diagnostics;
4using System.Linq;
5using System.Threading;
6using System.Threading.Tasks;
7
9
11{
15 public abstract class ProducerConsumerBuffer<TProduced> : IProducerConsumerBuffer<TProduced>
16 {
17 #region Public Properties
/// <summary>The collection of objects produced.</summary>
/// <remarks>Created in the constructor with a bounded capacity; filled by the producer
/// loop and drained by <c>TryTake</c>.</remarks>
19 protected BlockingCollection<TProduced> Produced { get; }
20
/// <summary>Determines whether the consumer of this buffer is currently waiting
/// for it to produce some data.</summary>
/// <remarks>Set to true inside <c>TryTake</c> and reset to false in its finally block.</remarks>
23 protected bool IsConsumerWaiting { get; private set; }
24
/// <summary>Indicates whether the producer is currently running.</summary>
/// <remarks>Reports true from the moment <c>Start</c> assigns the producer task until
/// <c>Cancel</c> clears it. In the code visible here nothing else resets the task, so this
/// stays true after the producer loop has ended on its own.</remarks>
public bool IsRunning
{
    get { return _runningThread != null; }
}
27
/// <summary>Indicates whether a call to TryTake will ever return another item.</summary>
/// <exception cref="BufferProducerException">Thrown if the producer loop stopped because of
/// an exception; the original exception is passed to the wrapper.</exception>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design",
    "CA1065:DoNotRaiseExceptionsInUnexpectedLocations")]
public bool CanTake
{
    get
    {
        if (_producerLoopException != null)
            throw new BufferProducerException(_producerLoopException);
        // Order matters to avoid a race: first confirm that no more items can be added,
        // and only then look at the item count. Count is used instead of LINQ's Any(),
        // which has to enumerate the collection just to test for emptiness.
        bool exhausted = Produced.IsAddingCompleted && Produced.Count == 0;
        return !exhausted;
    }
}
49 #endregion Public Properties
50
51 #region Internal Properties
/// <summary>Can be used to cancel the buffering process.</summary>
/// <remarks>The producer loop polls this source's token and also passes it to the
/// blocking add, so cancelling it stops production.</remarks>
53 protected CancellationTokenSource Cancellation { get; }
54
/// <summary>The task executing the producer loop. Assigned by <c>Start</c> and reset
/// to null by <c>Cancel</c>; a non-null value is what <c>IsRunning</c> reports.</summary>
56 private Task _runningThread;
57
/// <summary>Set at the end of <c>Dispose(bool)</c> so that a repeated dispose is a no-op.</summary>
59 private bool _isDisposed = false;
60
/// <summary>An exception (other than the expected cancellation) caught in the producer loop.
/// <c>CanTake</c> and <c>TryTake</c> rethrow it wrapped in a BufferProducerException.</summary>
61 private Exception _producerLoopException = null;
62 #endregion Internal Properties
63
64 #region Constructor and Destructor
/// <summary>Construct a new buffer with a bounded queue of produced items and a fresh
/// cancellation source. The producer is not started here; call <c>Start</c>.</summary>
/// <param name="max_queue_size">The bounded capacity given to the produced-item queue.</param>
protected ProducerConsumerBuffer(int max_queue_size)
{
    Produced = new BlockingCollection<TProduced>(boundedCapacity: max_queue_size);
    Cancellation = new CancellationTokenSource();
}
74
/// <summary>Cancel the producer thread and dispose of the queue.</summary>
/// <remarks>Runs the disposing path of <c>Dispose(bool)</c>, then tells the garbage
/// collector that finalization is no longer required for this instance.</remarks>
public void Dispose()
{
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
81
84 {
85 Dispose(false);
86 }
87
/// <summary>Cancel the producer thread and dispose of the queue.</summary>
/// <param name="disposing">True when called from <c>Dispose()</c>. When false this
/// method does nothing.</param>
protected virtual void Dispose(bool disposing)
{
    // Nothing is released on the non-disposing path, and a second dispose is a no-op.
    bool nothingToDo = !disposing || _isDisposed;
    if (nothingToDo)
        return;

    // Stop the producer (Cancel waits for the producer task) before tearing anything down.
    Cancel();
    OnDispose();
    Produced.Dispose();
    // NOTE(review): the CancellationTokenSource in Cancellation is not disposed here.
    // Disposing it would make a later Cancel() call throw, so that needs a coordinated change.
    _isDisposed = true;
}
100
/// <summary>Deriving classes' dispose actions.</summary>
/// <remarks>Called by <c>Dispose(bool)</c> after the producer has been cancelled and
/// before the produced-item queue is disposed.</remarks>
protected virtual void OnDispose()
{
    // Intentionally empty: the base class has nothing extra to release.
}
103 #endregion Constructor and Destructor
104
105 #region Public Methods
/// <summary>Start the asynchronous producer.</summary>
/// <param name="cancellationToken">Optional token. When supplied, <c>Cancel</c> is registered
/// as its callback so that cancelling the token stops this producer.</param>
/// <exception cref="InvalidOperationException">Thrown if the producer is already running.</exception>
public void Start(CancellationToken? cancellationToken = null)
{
    string logPrefix = !Utilities.EnableDebugLogging ? null :
        $"{GetType().NiceTypeName()} on thread {Thread.CurrentThread.ManagedThreadId} - ";
    // Only log when debug logging is enabled (logPrefix is null otherwise),
    // matching the guarded logging used in the producer loop.
    if (Utilities.EnableDebugLogging)
        Debug.WriteLine($"{logPrefix} Starting...");

    // Reject a second Start before touching the caller's token. Registering first would
    // leave a rejected call's token wired to Cancel() on the producer that is already running.
    if (_runningThread != null)
        throw new InvalidOperationException("Producer is already running.");
    cancellationToken?.Register(Cancel);
    OnStart();
    _runningThread = new Task(AsyncProducer);
    _runningThread.Start(TaskScheduler.Default);

    if (Utilities.EnableDebugLogging)
        Debug.WriteLine($"{logPrefix} Initialized and new thread started.");
}
127
/// <summary>Attempts to remove an item from the produced list. May block if there is not
/// yet an item ready, because the underlying take waits with an infinite timeout.</summary>
/// <param name="success">Receives the result of the underlying take: true if an item
/// was removed, false otherwise.</param>
/// <param name="cancellation">Optional token passed to the blocking take so that the
/// wait can be cancelled.</param>
/// <returns>The item written by the underlying take; only meaningful when
/// <paramref name="success"/> is true.</returns>
/// <exception cref="InvalidOperationException">Thrown if the producer is not running.</exception>
/// <exception cref="BufferProducerException">Thrown if the producer loop has recorded an
/// exception, checked both before and after waiting.</exception>
public TProduced TryTake(out bool success, CancellationToken? cancellation)
{
    string logPrefix = !Utilities.EnableDebugLogging ? null :
        $"{GetType().NiceTypeName()} on thread {Thread.CurrentThread.ManagedThreadId} - ";

    try
    {
        // The producer is never started implicitly: callers must have invoked Start().
        if (!IsRunning)
            throw new InvalidOperationException("Producer is not running. You must invoke " +
                "the Start() method to spawn the producer before blocking for items.");
        IsConsumerWaiting = true;
        if (_producerLoopException != null)
            throw new BufferProducerException(_producerLoopException);

        // Only log when debug logging is enabled (logPrefix is null otherwise).
        if (Utilities.EnableDebugLogging)
            Debug.WriteLine($"{logPrefix}TryTake running with {Produced.Count} buffers in queue.");

        // Wait without a timeout; the wait ends when an item arrives, adding completes,
        // or the optional token is cancelled.
        TProduced item;
        success = cancellation.HasValue
            ? Produced.TryTake(out item, Timeout.Infinite, cancellation.Value)
            : Produced.TryTake(out item, Timeout.Infinite);
        // The producer may have failed while this call was blocked.
        if (_producerLoopException != null)
            throw new BufferProducerException(_producerLoopException);

        if (Utilities.EnableDebugLogging)
            Debug.WriteLine($"{logPrefix}TryTake Completed. success={success}. " +
                $"{Produced.Count} buffers remain in queue.");
        return item;
    }
    finally { IsConsumerWaiting = false; }
}
168
/// <summary>Stop the producer: signal cancellation, wait for the producer task (if any)
/// to finish, then clear it so that <c>IsRunning</c> reports false.</summary>
public virtual void Cancel()
{
    Cancellation.Cancel();

    // Block until the producer loop has observed the cancellation and exited.
    Task producer = _runningThread;
    if (producer != null)
        producer.Wait();

    _runningThread = null;
}
178 #endregion Public Methods
179
180 #region Private Methods
/// <summary>The producer loop run on the task created by <c>Start</c>. Repeatedly asks the
/// implementer for the next item and adds it to the produced queue until cancellation is
/// requested or the implementer reports it is finished.</summary>
/// <remarks>Cancellation is swallowed; any other exception is stored so that consumers
/// receive it from <c>CanTake</c> / <c>TryTake</c>. The queue is always marked complete
/// for adding on exit.</remarks>
private void AsyncProducer()
{
    string logPrefix = !Utilities.EnableDebugLogging ? null :
        $"{GetType().NiceTypeName()} on thread {Thread.CurrentThread.ManagedThreadId} - ";
    try
    {
        while (!Cancellation.Token.IsCancellationRequested && !IsProducerFinished())
        {
            if (Utilities.EnableDebugLogging)
                Debug.WriteLine($"{logPrefix}Try Produce Next has queued up {Produced.Count} buffers.");
            if (TryProduceNext(out TProduced next))
            {
                if (Utilities.EnableDebugLogging)
                {
                    // Box once so the type test works for any TProduced, and tolerate a null
                    // item so that building a log message can never stop the producer.
                    object boxed = next;
                    string size = boxed is BufferedBytes bytes
                        ? bytes.LengthFilled + " of " + bytes.Bytes.Length
                        : boxed == null ? "null" : boxed.GetType().NiceTypeName();
                    Debug.WriteLine(logPrefix + "Produced (size: " + size + ")");
                }
                // Blocks while the bounded queue is full; the token lets Cancel() interrupt it.
                Produced.Add(next, Cancellation.Token);
            }
            else if (Utilities.EnableDebugLogging)
                Debug.WriteLine($"{logPrefix}Try Produce Next returned nothing new.");
        }
    }
    catch (OperationCanceledException) when (Cancellation.Token.IsCancellationRequested)
    {
        // Expected when Cancel() interrupts a blocked add; not reported as a failure.
        if (Utilities.EnableDebugLogging)
            Debug.WriteLine($"{logPrefix}Caught cancellation.");
    }
    catch (Exception ex)
    {
        if (Utilities.EnableDebugLogging)
            Debug.WriteLine($"{logPrefix}Caught unhandled exception:\n{ex}");
        // Stored rather than rethrown so the consumer sees it on its own thread.
        _producerLoopException = ex;
    }
    finally
    {
        // Always complete adding so that blocked or future takes can return.
        Produced.CompleteAdding();
        if (Utilities.EnableDebugLogging)
            Debug.WriteLine($"{logPrefix}Completed Adding");
    }
}
221 #endregion Private Methods
222
223 #region Abstract and Virtual Methods
/// <summary>Invoked before the asynchronous producer thread is started.</summary>
/// <remarks>Called by <c>Start</c> after the "already running" check and before the
/// producer task is created.</remarks>
protected virtual void OnStart()
{
    // Intentionally empty: the base class needs no preparation of its own.
}
226
/// <summary>Implementer indicates whether production has finished.</summary>
/// <returns>True when no more items will be produced. The producer loop checks this
/// before every attempt and exits as soon as it returns true.</returns>
229 protected abstract bool IsProducerFinished();
230
/// <summary>The method which asynchronously produces items. Called from the producer loop
/// on the producer task.</summary>
/// <param name="next">Receives the produced item when the method returns true.</param>
/// <returns>True if an item was produced and should be added to the queue; false if
/// nothing new is available on this attempt (the loop then simply tries again).</returns>
234 protected abstract bool TryProduceNext(out TProduced next);
235 #endregion Abstract and Virtual Methods
236 }
237}
Indicates that an exception occurred inside of a buffered producer thread.
Base class that consumes from a source asynchronously and produces a queue of some other resource to ...
void Dispose()
Cancel the producer thread and dispose of the queue.
TProduced TryTake(out bool success, CancellationToken? cancellation)
Attempts to remove an item from the produced list. May block if there is not yet an item ready,...
CancellationTokenSource Cancellation
Can be used to cancel the buffering process.
bool IsProducerFinished()
Implementer indicates whether it can produce more items.
ProducerConsumerBuffer(int max_queue_size)
Construct a new ProducerConsumer that will read from the specified source and produce a Queue of buff...
bool IsConsumerWaiting
Determines whether the consumer of this buffer is currently waiting for it to produce some data.
BlockingCollection< TProduced > Produced
The collection of objects produced.
bool IsRunning
Indicates whether the producer is currently running.
virtual void Dispose(bool disposing)
Cancel the producer thread and dispose of the queue.
virtual void Cancel()
Stop the producer from reading any more bytes. Warning: This will leave the stream in its current pos...
bool TryProduceNext(out TProduced next)
The method which asynchronously produces items.
void Start(CancellationToken? cancellationToken=null)
Start the asynchronous producer.
virtual void OnStart()
Invoked before the asynchronous producer thread is started.
virtual void OnDispose()
Deriving classes' dispose actions.
bool CanTake
Indicates whether a call to Take will ever return another item.
Large Data Upload Utilities.
Definition Utilities.cs:11
static bool EnableDebugLogging
When in debug mode, allows detailed debug logging of the data upload process.
Definition Utilities.cs:19
Interface for a class that consumes from a source asynchronously and produces a queue of some other r...
The structure containing an array of bytes and integer indicating the number of bytes in the array th...