C# Client Library
A C# Client Library for the AnalyzeRe REST API
Test_LargeDataUpload.cs
1using System;
2using System.Collections.Concurrent;
3using System.Collections.Generic;
4using System.Diagnostics;
5using System.Globalization;
6using System.IO;
7using System.Linq;
8using System.Reflection;
9using System.Text;
10using System.Threading;
11using System.Net;
12
13using AnalyzeRe;
19using AnalyzeRe.Rest;
20
21#if MSTEST
22using Microsoft.VisualStudio.TestTools.UnitTesting;
23#elif NUNIT
24using NUnit.Framework;
25using TestClass = NUnit.Framework.TestFixtureAttribute;
26using TestMethod = NUnit.Framework.TestAttribute;
27using TestCategory = NUnit.Framework.CategoryAttribute;
28#endif
29
31{
32 [TestClass]
34 {
35 private const string TypeName = "LargeDataUpload";
36
37 #region String Test Properties
40
44 #endregion String Test Properties
45
46 #region File Test Properties
48 private const string RootSamplesCsvPath = @"..\..\Samples\csv\";
49
51 private const string SampleCatalogEndPath = RootSamplesCsvPath + @"event_catalog_data.csv";
53
55 private static readonly string SampleCatalogFileUtf8;
56 #endregion File Test Properties
57
59 {
60 try
61 {
62 SampleCatalogFileUtf8 = Encoding.UTF8.GetString(File.ReadAllBytes(SampleCatalogFilePath));
63 }
64 catch (Exception ex)
65 {
66 Console.WriteLine("The Test_LargeDataUpload.SampleCatalogFilePath references a " +
67 "location that does not exist in the current context. Tests that rely on " +
68 "this resource will not be able to run. (" + ex.Message + ")");
69 }
70 }
71
72 #region Helper Methods
74 private static string GetTestFilePath(string samplesPath)
75 {
76 char separator = Path.DirectorySeparatorChar;
77 // Linux has '/' as a directory separator, Windows '\'
78 samplesPath = samplesPath.Replace('\\', separator);
79 string basePath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().CodeBase);
80 basePath = basePath != null ? Path.Combine(basePath, samplesPath) : samplesPath;
81 // On Linux the resolved directory name begins with "file:", which the Uri constructor cannot parse, so strip it.
82#if NUNIT
83 return new Uri(basePath.Replace("#", "%23").Replace("file:","")).LocalPath;
84#elif MSTEST
85 return new Uri(basePath.Replace("#", "%23")).LocalPath;
86#endif
87 }
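        // Usage sketch (hypothetical, not part of the original file): resolve a
        // repo-relative sample path into an absolute, platform-correct one before
        // opening it.
        //
        //   string csvPath = GetTestFilePath(SampleCatalogEndPath);
        //   using (FileStream fs = File.OpenRead(csvPath)) { /* ... */ }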
88
94
98 {
99 string retrieved = null;
100 AssertApi.MethodIsAllowed(() => retrieved = dataEndpoint.Get(), "Get Data");
101 Assert.AreEqual(original, retrieved, "Downloaded data did not match uploaded data." +
102 $"\nOriginal Contents:\n{original}\nRetrieved Contents:\n{retrieved}");
103 Debug.WriteLine($"Asserted that the retrieved file contents matched the original:\n{retrieved}");
104 }
105
108 string data, int patchBytes = 0)
109 {
110 dataEndpoint.Post(data.Length);
111 if (patchBytes > 0)
112 dataEndpoint.Patch(data.Substring(0, patchBytes), 0);
113 }
114 #endregion Helper Methods
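        // A minimal sketch (not part of the original file) of the raw round trip the
        // helpers above drive: POST announces the total size, PATCH sends bytes at an
        // offset, and GET downloads the committed contents for comparison. Names come
        // from this file; `contents` is a stand-in string.
        //
        //   dataEndpoint.Post(contents.Length);      // open an upload session
        //   dataEndpoint.Patch(contents, 0);         // upload all bytes at offset 0
        //   string retrieved = dataEndpoint.Get();   // download what the server holds
        //   Assert.AreEqual(contents, retrieved);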
115
116 #region Simple String Tests
117 // Test uploading a small string as a single segment.
118 [TestMethod, TestCategory(TypeName)]
120 {
122 {
123 data.LargeStreamUpload(SampleCatalogDataStream);
125 });
126 }
127
128 // Test uploading a small string in multiple segments.
129 [TestMethod, TestCategory(TypeName)]
131 {
133 {
134 data.LargeStreamUpload(SampleCatalogDataStream,
135 new Parameters(max_chunk_size: 50, enable_compression: false));
137 });
138 }
139
140 // Test uploading a small string in multiple segments with compression
141 [TestMethod, TestCategory(TypeName)]
143 {
145 {
146 string expectedContents = new StreamReader(SampleCatalogDataStream).ReadToEnd();
147 Debug.WriteLine($"Expected contents are:\n{expectedContents}" +
148 $"\nTotal Content Length: {expectedContents.Length}");
149 data.LargeStreamUpload(SampleCatalogDataStream,
150 new Parameters(max_chunk_size: 10, enable_compression: true));
152 });
153 }
154 #endregion Simple String Tests
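        // A hedged usage sketch of the chunked upload exercised above, outside the
        // test harness. Assumes `catalog` is a POSTed EventCatalog and the CSV path
        // exists; the Parameters names are exactly those used by these tests.
        //
        //   using (Stream csv = File.OpenRead("event_catalog_data.csv"))
        //       catalog.data.LargeStreamUpload(csv,
        //           new Parameters(max_chunk_size: 1 << 20, enable_compression: true));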
155
156 #region Reference Existing Files
157 // Test setting the data to a reference to an existing file on POST
158 [TestMethod, TestCategory(TypeName)]
160 {
162 Assert.Inconclusive("RUN_OFFLINE = true");
165 newCatalog.data_file = other.data_file;
166 newCatalog = newCatalog.Post();
168 Assert.AreEqual(newCatalog.status, TaskStatus.Success);
169 AssertContentsMatch(other.data.Get(), newCatalog.data);
170 }
171
172 // Test setting the data to a reference to an existing file on PUT
173 [TestMethod, TestCategory(TypeName)]
175 {
176 Skip.Indefinitely("ARE-3694 - Setting data reference on PUT does not kick off validation.");
178 GenericTest.POST_ThenDoAction(Samples.EventCatalog.Unposted, posted =>
179 {
180 Assert.IsNull(posted.data_file);
181 posted.data_file = other.data_file;
182 posted = posted.Put();
183 posted = posted.PollUntilReady(Samples.DataPollingOptions);
184 Assert.AreEqual(posted.status, TaskStatus.Success);
185 AssertContentsMatch(other.data.Get(), posted.data);
186 });
187 }
188 #endregion Reference Existing Files
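        // Sketch of the data-file reuse pattern these tests cover: rather than
        // re-uploading bytes, point a new resource's data_file at an existing one.
        // `other` is assumed to be a catalog whose data upload already succeeded.
        //
        //   EventCatalog clone = Samples.EventCatalog.Unposted;
        //   clone.data_file = other.data_file;   // reference the existing file
        //   clone = clone.Post().PollUntilReady(Samples.DataPollingOptions);
        //   Assert.AreEqual(TaskStatus.Success, clone.status);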
189
190 #region Simple File Tests
191 // Test uploading a small file as a single segment.
192 [TestMethod, TestCategory(TypeName)]
194 {
196 {
197 data.LargeFileUpload(SampleCatalogFilePath);
199 });
200 }
201
202 // Test uploading a small file in multiple segments.
203 [TestMethod, TestCategory(TypeName)]
205 {
207 {
208 data.LargeFileUpload(SampleCatalogFilePath,
209 new Parameters(max_chunk_size: 50, enable_compression: false));
211 });
212 }
213
214 // Test the error when supplying a bad file path to the file upload
215 [TestMethod, TestCategory(TypeName)]
217 {
219 () => data.LargeFileUpload("bad path")));
220 }
221
222 // Test being able to open a file in read-only mode if something else is using it.
223 [TestMethod, TestCategory(TypeName)]
225 {
226 // Open the file we plan to test with elsewhere
228 FileMode.Open, FileAccess.ReadWrite, FileShare.Read))
229 {
230 // Try using the already opened file
232 {
233 data.LargeFileUpload(SampleCatalogFilePath);
235 });
236 }
237 }
238
239 // Test that the upload fails when another process holds an exclusive lock on the file.
240 [TestMethod, TestCategory(TypeName)]
242 {
243 // Test this using a completely different file than usual, because
244 // this test might run concurrently with other tests and prevent them
245 // from opening the event catalog too.
246 string fakeCatalogEndPath = RootSamplesCsvPath + @"event_catalog_data_fake.csv";
248
249 using (FileStream temp = File.Create(fakeCatalogFilePath)) { }
250
251 try
252 {
253 // Open the file we plan to test with elsewhere, but lock it
255 FileMode.Open, FileAccess.ReadWrite, FileShare.None))
256 {
257 // Using the locked file will fail. Nothing we can do about that.
258 Test_EventCatalogDataUpload(data => AssertApi.ExceptionThrown<IOException>(
259 () => data.LargeFileUpload(fakeCatalogFilePath)));
260 }
261 }
262 finally
263 {
264 // Clean up after our new file.
266 }
267 }
268 #endregion Simple File Tests
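        // Defensive-upload sketch: per the tests above, a file opened elsewhere with
        // FileShare.Read can still be uploaded, while an exclusive (FileShare.None)
        // lock surfaces as an IOException. Catching it is an assumption about caller
        // policy, not something this suite does.
        //
        //   try { catalog.data.LargeFileUpload(path); }
        //   catch (IOException ex) { Console.WriteLine("Upload failed: " + ex.Message); }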
269
270 #region Upload Conflict Tests
271 #region With Compression
272 // Test raising an error when trying to upload data while an upload is already in progress.
273 [TestMethod, TestCategory(TypeName)]
275 {
277 {
279 AssertApi.ExceptionThrown<UploadInProgressException>(
280 () => data.LargeStreamUpload(SampleCatalogDataStream,
281 new Parameters(enable_compression: true, handle_existing_upload_strategy:
283 ex => Assert.AreEqual("Could not upload to this endpoint, " +
284 "a large upload is already in progress. To attempt to resume or cancel " +
285 "the existing upload session, set the 'handle_existing_upload_strategy' " +
286 "parameter appropriately.", ex.Message));
287 });
288 }
289
290 // Test setting the upload conflict strategy to delete what is there.
291 [TestMethod, TestCategory(TypeName)]
293 {
295 {
297 data.LargeStreamUpload(SampleCatalogDataStream,
298 new Parameters(enable_compression: true, handle_existing_upload_strategy:
301 });
302 }
303
304 // Test setting the upload conflict strategy to resume the upload
305 [TestMethod, TestCategory(TypeName)]
307 {
309 {
311 data.LargeStreamUpload(SampleCatalogDataStream,
312 new Parameters(enable_compression: true, handle_existing_upload_strategy:
315 });
316 }
317
318 // ARE-4982: Excel DTUV2 layer status can get stuck after upload is complete.
319 // Error: The server reported the following error: Data has been uploaded and processed successfully.
320 [TestMethod, TestCategory(TypeName)]
322 {
324 {
325 // Upload some data
327 data.LargeStreamUpload(SampleCatalogDataStream,
328 new Parameters(enable_compression: true, handle_existing_upload_strategy:
330
331 // Try to call Patch on the uploaded resource, confirm API returns MethodNotAllowed
332 AssertApi.ExceptionThrown<APIRequestException>(
333 () => data.Patch("some_data", 1024),
334 e =>
335 {
336 // Get the status from the server, verify processing is complete
337 Assert.AreEqual(data.status.Get().status.IsProcessingComplete(), true);
338 // Verify API returned status code 405 (method not allowed)
339 Assert.AreEqual(e.RestResponse?.StatusCode, HttpStatusCode.MethodNotAllowed);
340 // Verify the error message returned by the API
341 Assert.AreEqual(e.ServerError.message, "Data has been uploaded and processed successfully.");
342 });
343
344 // Call UploadChunk to verify it's handling the exception when calling Patch
345 // Without the fix, this call would throw APIRequestException with HTTP status MethodNotAllowed
347 object[] parameters = new object[] {
348 data,
349 Encoding.ASCII.GetBytes("some_data"),
350 1024,
351 28,
353 typeof( LargeDataUpload_ExtensionMethods ).GetMethod("UploadChunk",
354 BindingFlags.NonPublic | BindingFlags.Static).Invoke(null, parameters);
355 });
356 }
357
358 // Test a resumed upload where no data has been uploaded yet
359 [TestMethod, TestCategory(TypeName)]
361 {
363 {
365 data.LargeStreamUpload(SampleCatalogDataStream,
366 new Parameters(enable_compression: true, handle_existing_upload_strategy:
369 });
370 }
371
372 // Test a resumed upload where all that's left to do is commit.
373 [TestMethod, TestCategory(TypeName)]
375 {
377 {
379 data.LargeStreamUpload(SampleCatalogDataStream,
380 new Parameters(enable_compression: true, handle_existing_upload_strategy:
383 });
384 }
385
386 // Test raising an error when resuming an upload where the files are of different sizes.
387 [TestMethod, TestCategory(TypeName)]
389 {
391 {
392 PartiallyUploadCatalog(data, _sampleCatalogData.Substring(0, _sampleCatalogData.Length - 1));
393 AssertApi.ExceptionThrown<UploadInProgressException>(
394 () => data.LargeStreamUpload(SampleCatalogDataStream,
395 new Parameters(enable_compression: true, handle_existing_upload_strategy:
397 ex => Assert.AreEqual(String.Format("Attempted to resume an existing upload session with a " +
398 "file of different size. Prior file size: {0} This file size: {1}",
400 });
401 }
402 #endregion With Compression
403
404 #region Without Compression
405 // Test raising an error when trying to upload data while an upload is already in progress.
406 [TestMethod, TestCategory(TypeName)]
408 {
410 {
412 AssertApi.ExceptionThrown<UploadInProgressException>(
413 () => data.LargeStreamUpload(SampleCatalogDataStream,
414 new Parameters(enable_compression: false,
415 handle_existing_upload_strategy: HandleUploadInSessionStrategy.RaiseError)),
416 ex => Assert.AreEqual("Could not upload to this endpoint, " +
417 "a large upload is already in progress. To attempt to resume or cancel " +
418 "the existing upload session, set the 'handle_existing_upload_strategy' " +
419 "parameter appropriately.", ex.Message));
420 });
421 }
422
423 // Test setting the upload conflict strategy to delete what is there.
424 [TestMethod, TestCategory(TypeName)]
426 {
428 {
430 data.LargeStreamUpload(SampleCatalogDataStream,
431 new Parameters(enable_compression: false,
432 handle_existing_upload_strategy: HandleUploadInSessionStrategy.CancelPriorUpload));
434 });
435 }
436
437 // Test setting the upload conflict strategy to resume the upload
438 [TestMethod, TestCategory(TypeName)]
440 {
442 {
444 data.LargeStreamUpload(SampleCatalogDataStream,
445 new Parameters(enable_compression: false,
446 handle_existing_upload_strategy: HandleUploadInSessionStrategy.AttemptResume));
448 });
449 }
450
451 // Test a resumed upload where no data has been uploaded yet
452 [TestMethod, TestCategory(TypeName)]
454 {
456 {
458 data.LargeStreamUpload(SampleCatalogDataStream,
459 new Parameters(enable_compression: false,
460 handle_existing_upload_strategy: HandleUploadInSessionStrategy.AttemptResume));
462 });
463 }
464
465 // Test a resumed upload where all that's left to do is commit.
466 [TestMethod, TestCategory(TypeName)]
468 {
470 {
472 data.LargeStreamUpload(SampleCatalogDataStream,
473 new Parameters(enable_compression: false,
474 handle_existing_upload_strategy: HandleUploadInSessionStrategy.AttemptResume));
476 });
477 }
478
479 // Test raising an error when resuming an upload where the files are of different sizes.
480 [TestMethod, TestCategory(TypeName)]
482 {
484 {
485 PartiallyUploadCatalog(data, _sampleCatalogData.Substring(0, _sampleCatalogData.Length - 1));
486 AssertApi.ExceptionThrown<UploadInProgressException>(
487 () => data.LargeStreamUpload(SampleCatalogDataStream,
488 new Parameters(enable_compression: false,
489 handle_existing_upload_strategy: HandleUploadInSessionStrategy.AttemptResume)),
490 ex => Assert.AreEqual(String.Format("Attempted to resume an existing upload session with a " +
491 "file of different size. Prior file size: {0} This file size: {1}",
493 });
494 }
495 #endregion Without Compression
496
497 // Test an upload where a file has already been uploaded and finalized.
498 [TestMethod, TestCategory(TypeName)]
500 {
502 {
503 data.LargeStreamUpload(SampleCatalogDataStream);
504 Assert.AreEqual(TaskStatus.Success, data.status.Get().status);
505 AssertApi.ExceptionThrown<InvalidOperationException>(
506 () => data.LargeStreamUpload(SampleCatalogDataStream),
507 ex => Assert.AreEqual("Cannot upload data for this resource because " +
508 "existing data has already been successfully uploaded.", ex.Message));
509 });
510 }
511
512 // Test an upload where a file has already been uploaded and finalized.
513 [TestMethod, TestCategory(TypeName)]
515 {
517 {
518 data.LargeStreamUpload(SampleCatalogDataStream);
519 Assert.AreEqual(TaskStatus.Success, data.status.Get().status);
520 // This should return successfully, even though the file is already
521 // completely uploaded and finalized.
522 data.LargeStreamUpload(SampleCatalogDataStream,
523 new Parameters(handle_existing_upload_strategy:
525 });
526 }
527 #endregion Upload Conflict Tests
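        // Summary sketch of the conflict strategies exercised above. RaiseError throws
        // UploadInProgressException when a session already exists; CancelPriorUpload
        // deletes the partial session; AttemptResume continues it, but throws if the
        // new file's size differs from the partially uploaded one.
        //
        //   catalog.data.LargeStreamUpload(stream,
        //       new Parameters(handle_existing_upload_strategy:
        //           HandleUploadInSessionStrategy.AttemptResume));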
528
529 #region Asynchronous Upload
530 #region Helper Methods
531 public class AsyncTestState
532 {
533 #region Async Test Properties
536
538 public Exception Error { get; set; }
539
541 public StatusResponse Response { get; set; }
542 #endregion Async Test Properties
543
544 #region Async Test Delegates
547 status => ProgressUpdates.Enqueue(status);
548
550 public Action<Exception> OnError => ex => Error = ex;
551
553 public Action<StatusResponse> OnSuccess =>
554 result => Response = result;
555 #endregion Async Test Delegates
556
558 public AsyncTestState()
559 {
561 Error = null;
562 Response = null;
563 }
564 }
565
569 {
570 return new AsyncParameters(
571 max_chunk_size: 50,
572 // Disable compression when the chunk size is tiny
573 enable_compression: false,
574 commit_polling_options: Samples.DataPollingOptions,
575 onSuccess: async_state.OnSuccess,
576 onError: async_state.OnError,
578 }
579
583 {
584 API.PollUntil(() => async_state.Error != null || async_state.Response != null,
585 new PollingOptions(
586 maxPollInterval: EnvironmentSettings.POLLING_INTERVAL,
588 }
589
592 {
593 // Verify that no error was received.
594 Assert.IsNull(async_state.Error, "An Error was received: " +
595 (async_state.Error?.ToString() ?? ""));
596 // Verify that a successful response was received.
597 Assert.IsNotNull(async_state.Response, "No API response was received.");
598 // Verify that progress updates came through in order.
599 Debug.Write("\nProgress Updates: ");
600 long lastStatusBytes = -1;
601 while (async_state.ProgressUpdates.TryDequeue(out StatusResponse status))
602 {
603 long uploaded = status.bytes_uploaded ?? -1;
604 Debug.Write(uploaded + " (" + (uploaded * 100) / status.total_bytes + "%), ");
605 Assert.IsTrue(uploaded >= lastStatusBytes, "Expected progress update: " +
606 uploaded + " to be >= last progress update: " + lastStatusBytes);
607 // Assert that if there's no change in upload progress, we are done uploading
608 // (i.e. that we are not sending redundant progress updates)
610 Assert.AreNotEqual(status.status, TaskStatus.Uploading);
612 }
613 Debug.WriteLine("");
614 }
615 #endregion Helper Methods
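        // Sketch of the asynchronous call pattern these helpers wrap (a progress
        // callback parameter is elided in the listing above): fire the upload, then
        // poll the shared state until either callback has run.
        //
        //   AsyncTestState state = new AsyncTestState();
        //   catalog.data.LargeStreamUpload_Async(stream,
        //       new AsyncParameters(
        //           commit_polling_options: Samples.DataPollingOptions,
        //           onSuccess: state.OnSuccess,
        //           onError: state.OnError));
        //   API.PollUntil(() => state.Error != null || state.Response != null);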
616
617 #region Asynchronous Upload Tests
618 // Test asynchronously uploading a small file in multiple segments.
619 [TestMethod, TestCategory(TypeName)]
621 {
623 {
625 data.LargeFileUpload_Async(SampleCatalogFilePath,
630 });
631 }
632
633 // Test asynchronously uploading a small string in multiple segments.
634 [TestMethod, TestCategory(TypeName)]
636 {
638 {
640 data.LargeStreamUpload_Async(SampleCatalogDataStream,
645 });
646 }
647
648 // Test that Upload_Async can return in error via Callback
649 [TestMethod, TestCategory(TypeName)]
651 {
653 {
656 data.LargeStreamUpload_Async(SampleCatalogDataStream,
659 // Assert that only an error response was received
660 Assert.IsNull(async_state.Response);
661 Assert.IsNotNull(async_state.Error);
663 });
664 }
665
666 // Assert that if no onError callback is supplied, the upload fails silently.
667 [TestMethod, TestCategory(TypeName)]
669 {
671 {
673 data.LargeFileUpload_Async("bad path");
674 });
675 }
676 #endregion Asynchronous Upload Tests
677 #endregion Asynchronous Upload
678
679 #region Encoding Tests
680 // Test uploading files with different encodings.
681 [TestMethod, TestCategory(TypeName)]
683 {
685 {
686 RootSamplesCsvPath + @"event_catalog_encodings\event_catalog_data_ansi.csv",
687 RootSamplesCsvPath + @"event_catalog_encodings\event_catalog_data_exceldefault.csv",
688 RootSamplesCsvPath + @"event_catalog_encodings\event_catalog_data_notepadutf8.csv",
689 RootSamplesCsvPath + @"event_catalog_encodings\event_catalog_data_win1252.csv"
690 });
691 }
692
693 // Test uploading files with different Unicode encodings.
694 [TestMethod, TestCategory(TypeName), TestCategory("Skipped")]
696 {
697 // TODO: This test appears to work locally but not on Jenkins. It's possible that git
698 // is interfering with the files when committing them or downloading them remotely.
699 Skip.Indefinitely("(No associated ticket)");
701 {
702 RootSamplesCsvPath + @"event_catalog_encodings\event_catalog_data_unicode.csv",
703 RootSamplesCsvPath + @"event_catalog_encodings\event_catalog_data_unicodebigendian.csv"
704 });
705 }
706
707 // Test uploading files with different encodings.
709 {
711 Assert.Inconclusive("RUN_OFFLINE = true");
713 foreach (string file in files)
714 {
715 string path = GetTestFilePath(file);
716 try
717 {
718 Test_EventCatalogDataUpload(data => data.LargeFileUpload(path));
719 }
720 catch (Exception ex)
721 {
722 string encoding = path.Substring(path.LastIndexOf('\\'));
724 Debug.WriteLine("\nUnsupported encoding: " + encoding +
725 "\nError: " + ex + "\n\n");
726 }
727 }
728 if (unsupported.Count > 0)
729 {
730 Assert.AreEqual(0, unsupported.Count,
731 "One or more unsupported encodings encountered:\n" +
732 String.Join("\n", unsupported));
733 }
734 }
735 #endregion Encoding Tests
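        // A hedged workaround sketch for unsupported encodings (an assumption, not
        // part of this suite): decode the file with its known code page and re-encode
        // as UTF-8 before uploading, mirroring how the static constructor reads the
        // sample catalog as UTF-8.
        //
        //   string text = File.ReadAllText(path, Encoding.GetEncoding(1252));
        //   using (MemoryStream utf8 = Utilities.GetStringStream(text))
        //       catalog.data.LargeStreamUpload(utf8);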
736
737 #region Binary Upload
743 public DateTime LogTimeElapsed(DateTime startTime, string actionName = "Action")
744 {
745 DateTime now = DateTime.UtcNow;
746 Console.WriteLine($"{actionName} took {(now - startTime).TotalSeconds} seconds.");
747 return DateTime.UtcNow;
748 }
749
750 // Time uploading a large volume of data.
751 [TestMethod, TestCategory(TypeName)]
752 [Obsolete("ARE-5226 Remove support for Binary YELT Loss Set format. ")]
754 {
756 Assert.Inconclusive("RUN_OFFLINE = true");
757 const int events_per_trial = 125;
758 const int rows = 500000;
759
760 long bytes;
761 double megabytes;
762 TimeSpan elapsed = TimeSpan.MaxValue;
763
764 // Post a contrived event catalog
766 {
767 description = "Minimal Event Catalog",
768 source = "Contrived"
769 }.Post("\"Event Id\",\"Rate\"\n" + String.Join("\n",
770 Enumerable.Range(1, events_per_trial).Select(i => i + "," + "1")));
771 // Post the YELTLossSet Metadata
773 {
774 currency = "USD",
775 description = "Sample Binary YELT",
776 data_type = YELTLossSet.DataType.binary,
777 event_catalogs = new List<IReference<EventCatalog>> { forYELT.ToReference() },
778 trial_count = (int)Math.Ceiling((double)rows / events_per_trial)
779 }.Post();
780
781 // Prepare file upload parameters.
784 new AsyncParameters(
785 commit_polling_options: Samples.DataPollingOptions,
786 onSuccess: async_state.OnSuccess,
787 onError: async_state.OnError,
789
791 // Prepare a large YELT sample file.
792 string temp_file_name = Path.GetTempFileName();
793 Random r = new Random();
794 try
795 {
797 {
798 foreach (int i in Enumerable.Range(0, rows).ToList())
799 writer.Write(
800 (i % events_per_trial) + "," + // Event
801 (i / events_per_trial + 1) + "," + // Trial
802 ((i % events_per_trial) + r.NextDouble()).ToString("0.##") + "," + // Sequence
803 (r.NextDouble() * 100000d).ToString("0.##") + "\n"); // Loss
804 }
806 $"Generating YELT with {rows:#,###} rows.");
807 // Upload the generated file
808 posted.data.LargeFileUpload_Async(temp_file_name, async_params);
809
810 // Monitor the upload, and notice when it changes status
811 bool uploading = true;
813 while (async_state.Error == null && async_state.Response == null &&
814 (DateTime.UtcNow - start).TotalMinutes < 5.0)
815 {
816 while (async_state.ProgressUpdates.Any())
817 async_state.ProgressUpdates.TryDequeue(out lastUpdate);
818 // Capture the switch from uploading to processing
819 if (uploading && lastUpdate != null &&
820 lastUpdate.status != TaskStatus.Uploading)
821 {
822 elapsed = DateTime.UtcNow - start;
823 checkpoint = LogTimeElapsed(checkpoint, "Uploading YELT");
824 uploading = false;
825 }
826 Thread.Sleep(50);
827 }
828 TimeSpan endToEndElapsed = DateTime.UtcNow - start;
829 LogTimeElapsed(checkpoint, "Processing YELT on the server");
831
832 while (async_state.ProgressUpdates.Any())
833 async_state.ProgressUpdates.TryDequeue(out lastUpdate);
834 // Get the upload speed based on the size of the compressed file.
835 bytes = lastUpdate?.total_bytes ?? 0;
836 megabytes = bytes / 1048576d;
837 Console.WriteLine($"\nCompressed File size: {bytes} bytes, {megabytes:0.00}MiB");
838 Console.WriteLine($"Upload Speed: {(int)(bytes / elapsed.TotalSeconds)} bytes/s " +
839 $"({(megabytes / elapsed.TotalSeconds):0.00} MiBps)");
840
841 // Calculate file size
842 bytes = new FileInfo(temp_file_name).Length;
843 megabytes = bytes / 1048576d;
844 Console.WriteLine($"\nOriginal File size: {bytes} bytes, {megabytes:0.00}MiB");
845
846 // Log the throughput for this upload
847 Console.WriteLine($"Effective Throughput: {(int)(bytes / elapsed.TotalSeconds)} " +
848 $"bytes/s ({(megabytes / elapsed.TotalSeconds):0.00} MiBps)\n");
849
850 // Get the throughput counting the server validation time.
851 Console.WriteLine($"Throughput including server time: " +
852 $"{(int)(bytes / endToEndElapsed.TotalSeconds)} bytes/s " +
853 $"({(megabytes / endToEndElapsed.TotalSeconds):0.00} MiBps)\n");
854
855 Assert.IsTrue(async_state.Error == null && !(lastUpdate?.status.IsError() ?? false),
856 $"Async upload appears to have failed: {lastUpdate?.status} - {async_state.Error}");
857 Assert.IsNull(async_state.Error,
858 $"Async upload appears to have never failed: {async_state.Error}");
859 Assert.AreEqual(TaskStatus.Success, lastUpdate?.status,
860 $"Async upload appears to have never completed: {lastUpdate?.status}");
861 }
862 finally
863 {
864 try { File.Delete(temp_file_name); } catch { /* Ignore if cleanup fails. */ }
865 }
866
867 // Now time the upload of the tiniest file possible against a copy of the resource
868 YELTLossSet copy = posted.ShallowCopy().Post();
871 commit_polling_options: Samples.DataPollingOptions,
872 onSuccess: async_state.OnSuccess,
873 onError: async_state.OnError,
875
876 checkpoint = DateTime.UtcNow;
877 copy.data.LargeStreamUpload_Async(
880 TimeSpan overhead = DateTime.UtcNow - checkpoint;
881 Console.WriteLine($"- {overhead.TotalSeconds:0.0000} " +
882 "(Overhead for any upload determined by uploading a teeny tiny file)");
883 elapsed -= overhead;
884 Console.WriteLine($"= {elapsed.TotalSeconds:0.0000} Adjusted Upload-Only Time");
885
886 // Display the adjusted throughput.
887 Console.WriteLine($"\nAdjusted Throughput: {(int)(bytes / elapsed.TotalSeconds)} " +
888 $"bytes/s ({(megabytes / elapsed.TotalSeconds):0.00} MiBps)");
889 Console.WriteLine("Adjusted Throughput is an optimistic measure of the C# " +
890 "client's capacity to push volumes of data to the server by ignoring " +
891 "fixed overhead and subsequent validation performance of the server.\n");
892 }
893
896 [TestMethod, TestCategory(TypeName)]
897 [Obsolete("ARE-5226 Remove support for Binary YELT Loss Set format. ")]
899 {
901 Assert.Inconclusive("RUN_OFFLINE = true");
902 // Post the YELTLossSet Metadata
903 YELTLossSet binaryYelt = Samples.LossSet_YELTLossSet.Unposted
904 .Change(ls => ls.data_type, YELTLossSet.DataType.binary).Post();
905
906 // Create a stream containing a YELT already in the binary format
907 using (MemoryStream ms = new MemoryStream())
908 {
910 new YELTStream(binaryYelt.trial_count, 10, false, new[] { 0, 1, 2 })))
912 {
914 }
915 // Ensure binary conversion is disabled, but compression is enabled.
916 Parameters uploadParams = new Parameters(commit_polling_options: Samples.DataPollingOptions)
917 {
918 binary_yelt_options = Parameters.BinaryYELTUploadOptions.AutomaticCompressionOnly
919 };
920 binaryYelt.data.LargeStreamUpload(ms, uploadParams);
921 }
922
923 // Assert that processing succeeded
924 StatusResponse status = binaryYelt.data.status.Get();
925 Assert.IsTrue(status.commit_error == null,
926 $"Upload appears to have failed: {status.commit_error}");
927 binaryYelt = binaryYelt.Get();
928 Assert.AreEqual(TaskStatus.Success, binaryYelt.status, "Async upload appears to have " +
929 $"not completed successfully: {binaryYelt.status} - {binaryYelt.status_message}");
930 }
931
933 [TestMethod, TestCategory(TypeName)]
934 [Obsolete("ARE-5226 Remove support for Binary YELT Loss Set format. ")]
936 {
938 Assert.Inconclusive("RUN_OFFLINE = true");
939 // Post the YELTLossSet Metadata
940 YELTLossSet binaryYelt = Samples.LossSet_YELTLossSet.Unposted
941 .Change(ls => ls.data_type, YELTLossSet.DataType.binary).Post();
942
943 // Create a stream containing a YELT already in the binary format
944 using (MemoryStream ms = new MemoryStream())
945 {
947 new YELTStream(binaryYelt.trial_count, 10, false, new[] { 0, 1, 2 })))
950 compressionStrategy: GZipBufferedBytesProducer.CompressionStrategy.CompressEntireFile))
951 {
953 }
954 // Ensure binary conversion and compression are disabled in the parameters.
955 Parameters uploadParams = new Parameters(commit_polling_options: Samples.DataPollingOptions)
956 {
957 binary_yelt_options = Parameters.BinaryYELTUploadOptions.None
958 };
959 binaryYelt.data.LargeStreamUpload(ms, uploadParams);
960 }
961
962 // Assert that processing succeeded
963 StatusResponse status = binaryYelt.data.status.Get();
964 Assert.IsTrue(status.commit_error == null,
965 $"Upload appears to have failed: {status.commit_error}");
966 binaryYelt = binaryYelt.Get();
967 Assert.AreEqual(TaskStatus.Success, binaryYelt.status, "Async upload appears to have " +
968 $"not completed successfully: {binaryYelt.status} - {binaryYelt.status_message}");
969 }
970
975 {
976 const int maxFailedReads = 5;
977 int failedReads = 0;
978 producer.Start();
979 while (producer.CanTake && failedReads < maxFailedReads)
980 {
981 BufferedBytes binaryYeltBytes = producer.TryTake(out bool success, null);
982 if (!success)
983 failedReads++;
984 else
985 {
986 failedReads = 0;
987 ms.Write(binaryYeltBytes.Bytes, 0, binaryYeltBytes.LengthFilled);
988 }
989 }
990 Assert.IsTrue(failedReads < maxFailedReads, "The compressor failed to generate any bytes " +
991 "following 5 reads in a row - yet CanTake is still true. Infinite loop avoided.");
992 // Reset the memory stream position to 0 so it can be consumed by the caller
993 ms.Position = 0;
994 }
995
997 [TestMethod, TestCategory(TypeName)]
998 [Obsolete("ARE-5226 Remove support for Binary YELT Loss Set format. ")]
1000 {
1002 Assert.Inconclusive("RUN_OFFLINE = true");
1003 // 1. Post a binary YELTLossSet
1004 YELTLossSet binaryYelt = Samples.LossSet_YELTLossSet.Unposted
1005 .Change(ls => ls.data_type, YELTLossSet.DataType.binary)
1007 Assert.AreEqual(TaskStatus.Success, binaryYelt.status, binaryYelt.status_message);
1008
1009 // 2. Server should allow us to create a copy of this pretty easily by re-using the same data file
1010 YELTLossSet copy = binaryYelt.DeepCopy().Post().PollUntilReady(Samples.DataPollingOptions);
1011 Assert.AreEqual(TaskStatus.Success, copy.status, copy.status_message);
1012
1013 // We should also be able to download the converted binary file and re-use it that way.
1014 // Ensure binary conversion and compression are disabled in the parameters.
1015 Parameters uploadParams = new Parameters(commit_polling_options: Samples.DataPollingOptions)
1016 {
1017 binary_yelt_options = Parameters.BinaryYELTUploadOptions.None
1018 };
1019
1020 // 3. Download the data from the server as stream of bytes, then re-upload
1021 copy = binaryYelt.DeepCopy().Change(r => r.data_file, null).Post();
1022 using (MemoryStream ms = new MemoryStream())
1023 {
1024 byte[] buffer = new byte[4096];
1025 binaryYelt.data.GetStream(downloadStream =>
1026 {
1027 int count;
1028 while ((count = downloadStream.Read(buffer, 0, buffer.Length)) != 0)
1029 ms.Write(buffer, 0, count);
1030 });
1031 // Reset the memory stream position to 0 so it can be consumed by the uploader
1032 ms.Position = 0;
1033 copy.data.LargeStreamUpload(ms, uploadParams);
1034 }
1035 copy = copy.Get();
1036 Assert.AreEqual(TaskStatus.Success, copy.status, copy.status_message);
1037
1038 // 4. We should also be able to stream a download directly into a new upload.
1039 copy = binaryYelt.DeepCopy().Change(r => r.data_file, null).Post();
1040 binaryYelt.data.GetStream(
1041 downloadStream => copy.data.LargeStreamUpload(downloadStream, uploadParams));
1042 copy = copy.Get();
1043 Assert.AreEqual(TaskStatus.Success, copy.status, copy.status_message);
1044
1045 // 5? Note: We *cannot* read the data into a string to re-upload it
1046 // The binary data is corrupted by the UTF-8 Encoding process.
1047 //string binaryData = binaryYelt.data.Get();
1048 //copy = binaryYelt.DeepCopy().Change(r => r.data_file, null)
1049 // .Post(binaryData, null, uploadParams).PollUntilReady();
1050 //Assert.AreEqual(TaskStatus.Success, copy.status, copy.status_message);
1051 }
1052
1053 [TestMethod, TestCategory(TypeName)]
1054 [Obsolete("ARE-5226 Remove support for Binary YELT Loss Set format. ")]
1056 {
1058 Assert.Inconclusive("RUN_OFFLINE = true");
1060 yelt.data_type = YELTLossSet.DataType.binary;
1061 AssertApi.ExceptionThrown<BufferProducerException>(() =>
1062 yelt.Post("1,0,1.5,542345.543\n" +
1063 "2,98765,-123.456,-9999.9999\n" +
1064 "1,0,1.5,1e4"));
1065 }
1066 #endregion Binary Upload
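        // Sketch of the binary_yelt_options matrix used above: when the stream is
        // already binary-encoded, client-side conversion must be disabled; pick
        // AutomaticCompressionOnly for uncompressed binary data, or None when the
        // stream was pre-compressed.
        //
        //   Parameters p = new Parameters(commit_polling_options: Samples.DataPollingOptions)
        //   {
        //       binary_yelt_options = Parameters.BinaryYELTUploadOptions.None
        //   };
        //   binaryYelt.data.LargeStreamUpload(ms, p);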
1067
1068 #region Stream of indeterminate Length
1072 private class UnknownLengthStream : Stream
1073 {
1074 private const int BytesToReturn = 1000000;
1075 private int _bytesReturned = 0;
1076
1077 public override bool CanRead => _bytesReturned < BytesToReturn;
1078 public override bool CanSeek => false;
1079 public override bool CanWrite => false;
1080 public override long Length => throw new NotSupportedException();
1081 public override long Position
1082 {
1083 get => throw new NotSupportedException();
1084 set => throw new NotSupportedException();
1085 }
1086
1087 public override void Flush() { }
1088 public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
1089 public override void SetLength(long value) => throw new NotSupportedException();
1090 public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
1091 public override int Read(byte[] buffer, int offset, int count)
1092 {
1093 int written = 0;
1094 // Due to ARE-7230, we will generate an internal server error unless we include
1095 // line breaks in our data periodically - so we add char(10) every 1000 bytes.
1096 for (int i = offset; i < offset + count; i++)
1097 buffer[i] = i % 1000 == 0 ? (byte)(char)10 : (byte)(char)(44 + (i ^ 13) % (58 - 48));
1099 written = Math.Min(count, BytesToReturn - _bytesReturned);
1101 return written;
1102 }
1103 }
1104
1105 [TestMethod, TestCategory(TypeName)]
1107 {
1109 Assert.Inconclusive("RUN_OFFLINE = true");
1111 AssertApi.ExceptionThrown<CommitFailedException>(() =>
1112 {
1114 {
1115 incomplete.data.LargeStreamUpload(unknownStream);
1116 }
1117 }, ex => Assert.AreEqual("The uploaded file failed validation " +
1118 "with the following message: CSV input error: Event ID column not found.", ex.Message));
1119 }
1120
1121 // A sample stream that produces a YELT CSV stream of unknown length on the fly.
1122 public class YELTStream : Stream
1123 {
1124 private readonly int _eventsPerTrial;
1125 private readonly int _trialCount;
1126 private readonly Random _rng = new Random();
1127 private byte[] _leftOver;
1128 private int _rowsGenerated;
1129 private long _bytesReturned;
1130 private readonly bool _includeHeader;
1131 private readonly int[] _validEventIds;
1132
1133 public bool HeaderGenerated { get; private set; } = false;
1134 public int RowsGenerated => _rowsGenerated;
1135 public long BytesReturned => _bytesReturned;
1136
1137 public override bool CanRead => RowsGenerated < _eventsPerTrial * _trialCount || _leftOver != null;
1138 public override bool CanSeek => false;
1139 public override bool CanWrite => false;
1140 public override long Length => throw new NotSupportedException();
1141
1142 public override long Position
1143 {
1144 get => throw new NotSupportedException();
1145 set => throw new NotSupportedException();
1146 }
1147
1148 public YELTStream(int trial_count, int events_per_trial, bool includeHeader = false,
1150 {
1151 _trialCount = trial_count;
1154 _validEventIds = validEventIds?.ToArray();
1155 }
1156
1157 public override void Flush() { }
1158 public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
1159 public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
1160 public override void SetLength(long value) => throw new NotSupportedException();
1161 public override int Read(byte[] buffer, int offset, int count)
1162 {
1163 if (!CanRead)
1164 return 0;
1165
1166 const string yeltFormat = "{0},{1},{2:f2},{3:f2}\n";
1167 int bytesPrepared = 0;
1168 int currentOffset = offset;
1169 CultureInfo culture = CultureInfo.InvariantCulture;
1170
1171 if (_leftOver != null)
1172 {
1173 int to_copy = Math.Min(count, _leftOver.Length);
1174 Buffer.BlockCopy(_leftOver, 0, buffer, offset, to_copy);
1175 // If we didn't get to use all leftovers, return now
1176 if (count < _leftOver.Length)
1177 {
1178 byte[] leftOverleft = new byte[_leftOver.Length - count];
1179 Buffer.BlockCopy(_leftOver, count, leftOverleft, 0, leftOverleft.Length);
1180 _leftOver = leftOverleft;
1181 _bytesReturned += count;
1182 return count;
1183 }
1186 }
1187 _leftOver = null;
1188
1189 // Keep generating rows until we've reached the number of requested bytes or
1190 // we've reached the number of rows we've been configured to generate.
1192 {
1193 // Produce a random YELT row
1194 string nextRow;
1196 {
1197 nextRow = "Event Id,Trial Id,Day,Loss\n";
1198 HeaderGenerated = true;
1199 }
1200 else
1201 {
1202 int event_id = _validEventIds?[_rng.Next(_validEventIds.Length)] ??
1203 _rng.Next(_eventsPerTrial);
1204 nextRow = String.Format(culture, yeltFormat,
1205 event_id,
1206 RowsGenerated / _eventsPerTrial + 1 /* Trial */,
1207 RowsGenerated % _eventsPerTrial + _rng.NextDouble() /* Day */,
1208 _rng.NextDouble() * 100000d /* Loss */);
1209 }
1210 // Increment the row counter and total bytes counter
1211 // Get the byte encoding for this row
1212 byte[] bytes = Encoding.UTF8.GetBytes(nextRow);
1214 int byte_count = bytes.Length;
1215 int buffer_room = count - bytesPrepared;
1216 // Copy as many bytes as we have or as many as will fit into the current buffer
1218 Buffer.BlockCopy(bytes, 0, buffer, currentOffset, bytes_copied);
1221 // If the whole row didn't fit into the buffer, save what's left for later.
1223 {
1224 _leftOver = new byte[byte_count - bytes_copied];
1225 Buffer.BlockCopy(bytes, bytes_copied, _leftOver, 0, _leftOver.Length);
1226 }
1227 }
1229 // Return the number of bytes we just produced
1230 return bytesPrepared;
1231 }
1232 }
1233
1234 [TestMethod, TestCategory(TypeName)]
1235 [Obsolete("ARE-5226 Remove support for Binary YELT Loss Set format. ")]
1237 {
1239 Assert.Inconclusive("RUN_OFFLINE = true");
1240 const int trialCount = 50000;
1242 incomplete.data_type = YELTLossSet.DataType.binary;
1243 incomplete.trial_count = trialCount;
1244 incomplete = incomplete.Post();
1245 using (YELTStream yeltStream = new YELTStream(trialCount, 4))
1246 {
1247 DateTime start = DateTime.UtcNow;
1248 incomplete.data.LargeStreamUpload(yeltStream);
1249 double elapsedSeconds = (DateTime.UtcNow - start).TotalSeconds;
1250 Console.WriteLine($"Upload of {yeltStream.RowsGenerated} row YELT " +
1251 $"took {elapsedSeconds:0.0} seconds " +
1252 $"({(yeltStream.RowsGenerated / elapsedSeconds):0} rows per second).");
1253 Console.WriteLine($"Total bytes (uncompressed): {yeltStream.BytesReturned}.");
1254 Console.WriteLine("Effective Throughput: " +
1255 $"{(yeltStream.BytesReturned / 1.049e+6 / elapsedSeconds):0.00} MiB per second.");
1256 }
1257 }
1258 #endregion Stream of indeterminate Length
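        // Usage sketch: any forward-only stream (CanSeek == false, Length throwing)
        // can be uploaded the same way - the client simply reads until Read returns 0,
        // as the YELTStream test above demonstrates. `lossSet` is a stand-in for a
        // POSTed binary YELTLossSet.
        //
        //   using (YELTStream src = new YELTStream(trial_count: 1000, events_per_trial: 4))
        //       lossSet.data.LargeStreamUpload(src);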
1259
1260 #region StreamProducers
1261 [TestMethod, TestCategory(TypeName)]
1263 {
1264 const int timeoutMs = 20000; // ms
1265 using (YELTStream data = new YELTStream(10, 10))
1266 {
1267 int bytes_to_read = 666;
1270 bool success = true;
1271 int total_buffered = 0;
1272 producer.Start();
1273 while (producer.CanTake && success)
1274 {
1275 try
1276 {
1278 total_buffered += producer.TryTake(out success, timeout.Token).LengthFilled;
1279 if (total_buffered > bytes_to_read || data.BytesReturned > bytes_to_read)
1280 Assert.Fail(total_buffered + " bytes have been buffered, and "
1281 data.BytesReturned + " bytes requested from the stream, which " +
1282 "exceeds the set number of bytes to read of " + bytes_to_read);
1283 }
1285 {
1286 Assert.Fail("The stream took longer than " + timeoutMs + "ms to respond.");
1287 }
1288 }
1289 Assert.AreEqual(bytes_to_read, total_buffered, "Expected the total number of " +
1290 "bytes buffered to equal the configured bytes_to_read limit.");
1291 Assert.AreEqual(bytes_to_read, data.BytesReturned, "Expected the total number of " +
1292 "bytes requested from the stream to equal the configured bytes_to_read limit.");
1293 }
1294 }
1295 #endregion StreamProducers
1296 }
1297}
Cross-references for symbols used in this file:

- Create a test class that takes care of setting the server URL and cleaning up after each unit test.
- Exposes the various sample CSV files as strings. (Samples.CSV.cs:9)
  - static string Event_Catalog_Data
  - static string YELTLossSet_10Trials_ForBinary
- Exposes sample resource objects, with built-in methods for injecting dependencies. (Samples.cs:14)
  - static readonly PollingOptions DataPollingOptions: polling options to use when uploading a data file.
  - IInjectableResource<YELTLossSet> LossSet_YELTLossSet
  - IInjectableResource<EventCatalog> EventCatalog
- static void MethodIsAllowed(Action request, string methodName, bool methodAllowed = true): wraps a request in a tryGet with some formatting for testing purposes. (AssertApi.cs:98)
- EnvironmentSettings: retrieves settings from environment variables if they exist, or the project settings file otherwise.
  - static bool RUN_OFFLINE: controls whether tests that normally require a connection to the server should be allowed to try to r...
- GenericTest: generic unit test implementations that will test REST methods on arbitrary resources.
- Skip: class used in unit tests to mark tests as skipped by using the Assert.Inconclusive() method. (SkipUntil.cs:14)
  - static void Indefinitely(string ticket = null): skips the specified test. (SkipUntil.cs:54)
- APIRequestException: a custom exception class that includes the RestSharp.IRestResponse that generated the exception, ...
- Describes an endpoint off of some types of resources from which an associated "large data file" can b...
  - SubResource<LargeDataUpload.StatusResponse> status: the endpoint at which the status of the data can be determined.
  - void Patch(string data, long bytesOffset, int? timeout = null, IEnumerable<Parameter> requestParameters = null): performs a PATCH of part of this object's data at this endpoint.
- Describes a collection of resources which can be listed.
  - T Get(IEnumerable<Parameter> parameters = null, int? timeout = null): performs a GET request at this endpoint.
- API: API methods / requests made available to the user.
  - static void PollUntil(Func<bool> request, AnalyzeRe.PollingOptions pollingOptions = null): polls the specified request until it returns true.
- EventCatalog: representation of an event catalog. The event catalog may cover multiple region/perils, ...
- Parameters to be used in a Large Upload operation.
- Converts YELT CSV data into a specified format.
- BufferProducerException: indicates that an exception occurred inside of a buffered producer thread.
- GZipBufferedBytesProducer: produces a BlockingCollection of buffered bytes asynchronously which can be consumed on some other th...
- CommitFailedException: exception raised when the commit process fails.
- Parameters: parameters to be used in a Large Data Upload operation. (Parameters.cs:7)
- StatusResponse: indicates the status of a data endpoint.
  - string commit_error: if the processing of the file failed, the error message reported by the server.
- UploadInProgressException: indicates that a large upload could not take place because an existing large upload session is alread...
- Utilities: Large Data Upload utilities. (Utilities.cs:11)
  - static MemoryStream GetStringStream(string input): generates a byte stream from a string which can be used in the LargeStreamUpload methods. (Utilities.cs:40)
- YELTLossSet: representation of a loss set with an associated year-event-loss table.
  - DataType: the format of the data uploaded against this YELT.
- PollingOptions: determines the behaviour of the API when automatically retrying a request whose result is not yet rea...
- IInjectableResource<T>:
  - T Unposted: the unPOSTed resource definition.
  - T Posted: the posted resource, ready to be referenced.
- Error: indicates that there was an error while determining the authentication status.
- HandleUploadInSessionStrategy: behaviour to use if an existing upload is already in progress for an endpoint.
  - CancelPriorUpload: if any prior upload in progress is encountered, it will be deleted before beginning anew with the cur...
  - RaiseError: if any prior upload in progress is encountered, do not interfere and raise an UploadInProgress except...
  - AttemptResume: if any prior upload in progress is encountered, attempt to resume the upload starting at the offset w...
- BufferedBytes: the structure containing an array of bytes and an integer indicating the number of bytes in the array th...
- currency: reinstatement values represent a fixed dollar amount.
- TaskStatus: the status of a data upload which may be in progress. (TaskStatus.cs:9)