2using System.Collections.Concurrent;
3using System.Diagnostics;
6using System.Threading.Tasks;
17 #region Public Properties
19 protected BlockingCollection<TProduced>
Produced {
get; }
34 [System.Diagnostics.CodeAnalysis.SuppressMessage(
"Microsoft.Design",
35 "CA1065:DoNotRaiseExceptionsInUnexpectedLocations")]
40 if (_producerLoopException !=
null)
49 #endregion Public Properties
51 #region Internal Properties
56 private Task _runningThread;
59 private bool _isDisposed =
false;
61 private Exception _producerLoopException =
null;
62 #endregion Internal Properties
64 #region Constructor and Destructor
71 Produced =
new BlockingCollection<TProduced>(max_queue_size);
79 GC.SuppressFinalize(
this);
91 protected virtual void Dispose(
bool disposing)
93 if (!disposing)
return;
94 if (_isDisposed)
return;
103 #endregion Constructor and Destructor
105 #region Public Methods
110 public void Start(CancellationToken? cancellationToken =
null)
112 string logPrefix = !Utilities.EnableDebugLogging ? null :
113 $
"{GetType().NiceTypeName()} on thread {Thread.CurrentThread.ManagedThreadId} - ";
115 Debug.WriteLine($
"{logPrefix} Starting...");
117 cancellationToken?.Register(
Cancel);
118 if (_runningThread !=
null)
119 throw new InvalidOperationException(
"Producer is already running.");
121 _runningThread =
new Task(AsyncProducer);
122 _runningThread.Start(TaskScheduler.Default);
125 Debug.WriteLine($
"{logPrefix} Initialized and new thread started.");
138 public TProduced
TryTake(out
bool success, CancellationToken? cancellation)
140 string logPrefix = !Utilities.EnableDebugLogging ? null :
141 $
"{GetType().NiceTypeName()} on thread {Thread.CurrentThread.ManagedThreadId} - ";
147 throw new InvalidOperationException(
"Producer is not running. You must invoke " +
148 "the Start() method to spawn the producer before blocking for items.");
150 if (_producerLoopException !=
null)
154 Debug.WriteLine($
"{logPrefix}TryTake running with {Produced.Count} buffers in queue.");
156 success = !cancellation.HasValue ?
Produced.TryTake(out TProduced item, -1) :
157 Produced.TryTake(out item, -1, cancellation.Value);
158 if (_producerLoopException !=
null)
162 Debug.WriteLine($
"{logPrefix}TryTake Completed. success={success}. " +
163 $
"{Produced.Count} buffers remain in queue.");
175 _runningThread?.Wait();
176 _runningThread =
null;
178 #endregion Public Methods
180 #region Private Methods
181 private void AsyncProducer()
183 string logPrefix = !Utilities.EnableDebugLogging ? null :
184 $
"{GetType().NiceTypeName()} on thread {Thread.CurrentThread.ManagedThreadId} - ";
190 Debug.WriteLine($
"{logPrefix}Try Produce Next has queued up {Produced.Count} buffers.");
194 Debug.WriteLine(logPrefix +
"Produced (size: " +
196 ((
BufferedBytes)(
object)next).Bytes.Length : next.GetType().NiceTypeName()) +
")");
199 else if (Utilities.EnableDebugLogging)
200 Debug.WriteLine($
"{logPrefix}Try Produce Next returned nothing new.");
203 catch (OperationCanceledException) when (
Cancellation.Token.IsCancellationRequested)
205 if (Utilities.EnableDebugLogging)
206 Debug.WriteLine($
"{logPrefix}Caught cancellation.");
210 if (Utilities.EnableDebugLogging)
211 Debug.WriteLine($
"{logPrefix}Caught unhandled exception:\n{ex}");
212 _producerLoopException = ex;
217 if (Utilities.EnableDebugLogging)
218 Debug.WriteLine($
"{logPrefix}Completed Adding");
221 #endregion Private Methods
223 #region Abstract and Virtual Methods
235 #endregion Abstract and Virtual Methods
Indicates that an exception occurred inside of a buffered producer thread.
Base class that consumes from a source asynchronously and produces a queue of some other resource to ...
void Dispose()
Cancel the producer thread and dispose of the queue.
TProduced TryTake(out bool success, CancellationToken? cancellation)
Attempts to remove an item from the produced list. May block if there is not yet an item ready,...
CancellationTokenSource Cancellation
Can be used to cancel the buffering process.
bool IsProducerFinished()
Implementer indicates whether it can produce more items.
ProducerConsumerBuffer(int max_queue_size)
Construct a new ProducerConsumer that will read from the specified source and produce a Queue of buff...
bool IsConsumerWaiting
Determines whether the consumer of this buffer is currently waiting for it to produce some data.
BlockingCollection< TProduced > Produced
The collection of objects produced.
bool IsRunning
Indicates whether the producer is currently running.
virtual void Dispose(bool disposing)
Cancel the producer thread and dispose of the queue.
virtual void Cancel()
Stop the producer from reading any more bytes. Warning: This will leave the stream in its current pos...
bool TryProduceNext(out TProduced next)
The method which asynchronously produces items.
void Start(CancellationToken? cancellationToken=null)
Start the asynchronous producer.
virtual void OnStart()
Invoked before the asynchronous producer thread is started.
virtual void OnDispose()
Deriving classes' dispose actions.
bool CanTake
Indicates whether a call to Take will ever return another item.
Large Data Upload Utilities.
static bool EnableDebugLogging
When in debug mode, allows detailed debug logging of the data upload process.
Interface for a class that consumes from a source asynchronously and produces a queue of some other r...
The structure containing an array of bytes and integer indicating the number of bytes in the array th...