C# Client Library
A C# Client Library for the AnalyzeRe REST API
Loading...
Searching...
No Matches
GZipBufferedBytesProducer.cs
Go to the documentation of this file.
1using System;
2using System.Diagnostics;
3using System.IO;
4using System.IO.Compression;
5
7{
10 {
11 #region Private Properties
/// <summary>Upstream buffer supplying the uncompressed chunks that this producer compresses.</summary>
13 private readonly IProducerConsumerBuffer<BufferedBytes> _source;
14
/// <summary>
/// Active GZip compressor writing into <c>_gzipOutput</c>. Under CompressEachChunk a new one is
/// created for every produced chunk; under CompressEntireFile one is created in the constructor
/// and shared by all chunks. Set to null after it has been disposed.
/// </summary>
16 private GZipStream _compressor;
17
/// <summary>Reusable buffer that receives compressed bytes; rewound to position 0 for each produced chunk.</summary>
19 private readonly MemoryStream _gzipOutput;
20
/// <summary>Compression level handed to the GZipStream (not used on NET40 builds).</summary>
23 private readonly CompressionLevel _compressionLevel;
24
/// <summary>Whether each output chunk is a standalone gzip stream or all chunks form one gzip stream.</summary>
28 private readonly CompressionStrategy _compressionStrategy;
29 #endregion Private Properties
30
31 #region Public Properties
/// <summary>Total number of uncompressed bytes taken from the source and written to the compressor.</summary>
/// NOTE(review): this is an int accumulator, so it wraps for inputs larger than int.MaxValue
/// bytes (~2 GiB); consider a long-typed counter if large uploads are expected.
33 public int TotalBytesIn { get; private set; }
34
/// <summary>Total number of compressed bytes emitted in produced chunks.</summary>
36 public int TotalBytesOut { get; private set; }
37
49
50#if NET40
/// <summary>
/// Stand-in for System.IO.Compression.CompressionLevel, which only exists from .NET Framework 4.5.
/// On NET40 builds the value is accepted but not passed to the GZipStream.
/// </summary>
52 public enum CompressionLevel
53 {
/// <summary>Only member; lets the constructor's Fastest default compile on NET40.</summary>
55 Fastest
56 }
57#endif
58 #endregion Public Properties
59
80 int? min_buffer_length = null, int? max_buffer_length = null,
81 CompressionStrategy compressionStrategy = CompressionStrategy.CompressEachChunk,
82 CompressionLevel compressionLevel = CompressionLevel.Fastest)
83 : base(min_buffer_length, max_buffer_length)
84 {
85 _source = source;
// Give the reusable compressed-output buffer an initial capacity of MaximumBufferLength.
86 _gzipOutput = new MemoryStream(MaximumBufferLength);
87 _compressionStrategy = compressionStrategy;
88 _compressionLevel = compressionLevel;
// Under CompressEntireFile every chunk belongs to a single gzip stream, so its compressor is
// created once here; under CompressEachChunk one is created per chunk in TryProduceNext.
89 if (_compressionStrategy == CompressionStrategy.CompressEntireFile)
90 _compressor = GetCompressorForFramework();
91 }
92
/// <summary>
/// Creates a GZip compressor that writes into <c>_gzipOutput</c> and leaves that stream open
/// when the compressor is disposed. NET40 builds use the default compression level because the
/// <c>CompressionLevel</c> overload is unavailable there; other builds use <c>_compressionLevel</c>.
/// </summary>
private GZipStream GetCompressorForFramework() =>
#if NET40
    new GZipStream(_gzipOutput, CompressionMode.Compress, leaveOpen: true);
#else
    new GZipStream(_gzipOutput, _compressionLevel, leaveOpen: true);
#endif
103
/// <summary>Releases the compressor and its output buffer when the producer is disposed.</summary>
protected override void OnDispose()
{
    // Dispose the compressor before the MemoryStream it writes into: GZipStream.Dispose emits any
    // remaining compressed data and the gzip footer to its underlying stream, and writing to an
    // already-disposed MemoryStream throws ObjectDisposedException.
    try { _compressor?.Dispose(); }
    catch (ObjectDisposedException) { }
    finally { _compressor = null; }

    try { _gzipOutput.Dispose(); }
    catch (ObjectDisposedException) { }
}
112
/// <summary>
/// Invoked before the producer thread is started. Also starts the source producer,
/// unless it is already running.
/// </summary>
protected override void OnStart()
{
    base.OnStart();
    // The upstream producer may already have been started elsewhere; don't start it twice.
    if (_source.IsRunning)
        return;
    _source.Start(Cancellation.Token);
}
121
/// <summary>
/// Indicates whether this producer is finished: its source can no longer supply items and
/// no gzip trailer is still waiting to be flushed by <see cref="TryProduceNext"/>.
/// </summary>
protected override bool IsProducerFinished() => !_source.CanTake && !HasPendingTrailer;

/// <summary>
/// True when, under CompressEntireFile, the shared compressor is still open, i.e. its final
/// deflate block and the gzip footer have not yet been written to <c>_gzipOutput</c>.
/// </summary>
private bool HasPendingTrailer =>
    _compressionStrategy == CompressionStrategy.CompressEntireFile && _compressor != null;

/// <summary>Compress the next BufferedBytes.</summary>
/// <param name="next">Receives the next chunk of compressed bytes; only meaningful when the
/// method returns <c>true</c>.</param>
/// <param name="buffer_size">Number of compressed bytes to accumulate before emitting a chunk.
/// A chunk may exceed this, because input is consumed one whole source buffer at a time.</param>
/// <returns><c>true</c> if a chunk of compressed bytes was produced; otherwise <c>false</c>.</returns>
/// <exception cref="EndOfStreamException">The compressor produced no output despite receiving
/// input, or the compressed data could not be read back in full.</exception>
protected override bool TryProduceNext(out BufferedBytes next, int buffer_size)
{
    next = new BufferedBytes();
    // Under CompressEntireFile a previous call may have stopped because buffer_size was reached,
    // leaving the shared compressor open, after which the source ran dry. One more pass is then
    // still required to write the gzip trailer, so only bail out when nothing is pending.
    if (!_source.CanTake && !HasPendingTrailer)
        return false;

    // Used to determine how many bytes were compressed to produce the next buffer
    int previousTotalBytesIn = TotalBytesIn;
    // Set our MemoryStream buffer back to position 0 for re-use between BufferedBytes produced.
    _gzipOutput.Position = 0;
    if (_compressionStrategy == CompressionStrategy.CompressEachChunk)
        _compressor = GetCompressorForFramework();

    // Continue to consume buffered bytes and write to the gzip stream until the
    // gzip stream writes the minimum number of compressed bytes to the output stream.
    while (_source.CanTake)
    {
        BufferedBytes buffer = _source.TryTake(out bool success, Cancellation.Token);
        if (success)
        {
            _compressor.Write(buffer.Bytes, 0, buffer.LengthFilled);
            TotalBytesIn += buffer.LengthFilled;
            if (Utilities.EnableDebugLogging)
                Debug.WriteLine($"GZip got {buffer.LengthFilled} bytes to compress.");
        }
        // Instruct gzip to flush to the output stream so we can see how many bytes have been produced
        _compressor.Flush();
        if (_gzipOutput.Position >= buffer_size)
            break;
    }

    // Sample CanTake once so the EOF log and the finalize decision below always agree.
    bool sourceExhausted = !_source.CanTake;
    if (Utilities.EnableDebugLogging && sourceExhausted)
        Debug.WriteLine("GZip got EOF from its source stream.");

    // Disposing the GZipStream writes its final block and the gzip footer into _gzipOutput
    // (kept open via leaveOpen), completing the current gzip stream.
    if (sourceExhausted || _compressionStrategy == CompressionStrategy.CompressEachChunk)
    {
        _compressor.Dispose();
        _compressor = null;
    }

    int bytesConsumed = TotalBytesIn - previousTotalBytesIn;
    // Determine how many compressed bytes overwrote the buffer by its current position
    int compressed_length = (int)_gzipOutput.Position;
    if (compressed_length == 0)
    {
        if (bytesConsumed > 0)
            throw new EndOfStreamException("GZip compressor produced no compressed bytes " +
                $"despite {bytesConsumed} bytes being written to the compressor!");
        return false;
    }

    // Copy the new compressed data to the next BufferedBytes output
    next.Bytes = new byte[compressed_length];
    _gzipOutput.Position = 0;
    next.LengthFilled += _gzipOutput.Read(next.Bytes, 0, compressed_length);
    next.TotalBytesRead = TotalBytesIn;
    if (next.LengthFilled != compressed_length)
        throw new EndOfStreamException("The GZip Output MemoryStream did not return all compressed data!");

    if (Utilities.EnableDebugLogging)
        Debug.WriteLine($"GZip produced {next.LengthFilled} bytes of compressed " +
            $"data from {bytesConsumed} read bytes.");
    TotalBytesOut += next.LengthFilled;
    return true;
}
191 }
192}
Produces BufferedByte objects of a variable size depending on whether the consumer of this buffer is ...
int MaximumBufferLength
The maximum length of each byte array to generate (default 32 MiB).
CompressionStrategy
Indicates whether the resulting chunks of compressed data should be pieces of a zip file,...
@ CompressEachChunk
Each output chunk will be its own zip file. Required by protocols that decompress each chunk as you u...
@ CompressEntireFile
All output chunks will be part of the same zip file. Required by protocols that only decompress the e...
override bool TryProduceNext(out BufferedBytes next, int buffer_size)
Compress the next BufferedBytes.
GZipBufferedBytesProducer(IProducerConsumerBuffer< BufferedBytes > source, int? min_buffer_length=null, int? max_buffer_length=null, CompressionStrategy compressionStrategy=CompressionStrategy.CompressEachChunk, CompressionLevel compressionLevel=CompressionLevel.Fastest)
Construct a producer of GZip-compressed BufferedBytes from a source buffer of uncompressed BufferedBytes.
override void OnStart()
Invoked before the producer thread is started.
int TotalBytesIn
Returns the total number of input bytes processed by this compressor.
int TotalBytesOut
The total number of output bytes produced after compression.
override bool IsProducerFinished()
Indicates whether the producer is finished and can produce no more items.
CancellationTokenSource Cancellation
Can be used to cancel the buffering process.
Large Data Upload Utilities.
Definition Utilities.cs:11
static bool EnableDebugLogging
When in debug mode, allows detailed debug logging of the data upload process.
Definition Utilities.cs:19
Interface for a class that consumes from a source asynchronously and produces a queue of some other r...
byte[] Bytes
The buffer of bytes filled.
int LengthFilled
The actual length of bytes, which may be shorter.
The structure containing an array of bytes and integer indicating the number of bytes in the array th...