Преглед изворни кода

Add Task async await check when evaluate await expression (#1882)

* Fix event loop using concurrent queue
* Add continuation task fix
* Enhance continuation task
* Remove wait in event loop

---------

Co-authored-by: Trung Tran <[email protected]>
Co-authored-by: Ngoc.TranThi <[email protected]>
Trung Tran пре 1 година
родитељ
комит
4e82c672eb

+ 71 - 0
Jint.Tests/Runtime/AsyncTests.cs

@@ -1,3 +1,5 @@
+using System.Collections.Concurrent;
+using Jint.Native;
 using Jint.Tests.Runtime.TestClasses;
 
 namespace Jint.Tests.Runtime;
@@ -278,4 +280,73 @@ public class AsyncTests
         var val = result.GetValue("main").Call();
         Assert.Equal(1, val.UnwrapIfPromise().AsInteger());
     }
+
+    [Fact]
+    public async Task ShouldEventLoopBeThreadSafeWhenCalledConcurrently()
+    {
+        const int ParallelCount = 1000;
+
+        // [NOTE] perform 5 runs since the bug does not always happen
+        for (int run = 0; run < 5; run++)
+        {
+            var tasks = new List<TaskCompletionSource<object>>();
+
+            for (int i = 0; i < ParallelCount; i++)
+                tasks.Add(new TaskCompletionSource<object>());
+
+            for (int i = 0; i < ParallelCount; i++)
+            {
+                int taskIdx = i;
+                _ = Task.Factory.StartNew(() =>
+                {
+                    try
+                    {
+                        Engine engine = new(options => options.ExperimentalFeatures = ExperimentalFeature.TaskInterop);
+
+                        const string Script = """
+                        async function main(testObj) {
+                            async function run(i) {
+                                await testObj.Delay(100);
+                                await testObj.Add(`${i}`);
+                            }
+                        
+                            const tasks = [];
+                            for (let i = 0; i < 10; i++) {
+                                tasks.push(run(i));
+                            }
+                            for (let i = 0; i < 10; i++) {
+                                await tasks[i];
+                            }
+                            return 1;
+                        }
+                        """;
+                        var result = engine.Execute(Script);
+                        var testObj = JsValue.FromObject(engine, new TestAsyncClass());
+                        var val = result.GetValue("main").Call(testObj);
+                        tasks[taskIdx].SetResult(null);
+                    }
+                    catch (Exception ex)
+                    {
+                        tasks[taskIdx].SetException(ex);
+                    }
+                }, creationOptions: TaskCreationOptions.LongRunning);
+            }
+
+            await Task.WhenAll(tasks.Select(t => t.Task));
+            await Task.Delay(100);
+        }
+    }
+
+    class TestAsyncClass
+    {
+        private readonly ConcurrentBag<string> _values = new();
+
+        public Task Delay(int ms) => Task.Delay(ms);
+
+        public Task Add(string value)
+        {
+            _values.Add(value);
+            return Task.CompletedTask;
+        }
+    }
 }

+ 3 - 14
Jint/Engine.cs

@@ -1,4 +1,5 @@
 using System.Diagnostics;
+using System.Collections.Concurrent;
 using System.Diagnostics.CodeAnalysis;
 using System.Runtime.CompilerServices;
 using Jint.Native;
@@ -508,25 +509,13 @@ namespace Jint
         internal void RunAvailableContinuations()
         {
             var queue = _eventLoop.Events;
-            if (queue.Count == 0)
-            {
-                return;
-            }
-
             DoProcessEventLoop(queue);
         }
 
-        private static void DoProcessEventLoop(Queue<Action> queue)
+        private static void DoProcessEventLoop(ConcurrentQueue<Action> queue)
         {
-            while (true)
+            while (queue.TryDequeue(out var nextContinuation))
             {
-                if (queue.Count == 0)
-                {
-                    return;
-                }
-
-                var nextContinuation = queue.Dequeue();
-
                 // note that continuation can enqueue new events
                 nextContinuation();
             }

+ 2 - 10
Jint/JsValueExtensions.cs

@@ -650,17 +650,9 @@ public static class JsValueExtensions
         if (value is JsPromise promise)
         {
             var engine = promise.Engine;
-            var task = promise.TaskCompletionSource.Task;
-            engine.RunAvailableContinuations();
-            engine.AddToEventLoop(() =>
-            {
-                if (!task.IsCompleted)
-                {
-                    // Task.Wait has the potential of inlining the task's execution on the current thread; avoid this.
-                    ((IAsyncResult) task).AsyncWaitHandle.WaitOne();
-                }
-            });
+            var completedEvent = promise.CompletedEvent;
             engine.RunAvailableContinuations();
+            completedEvent.Wait();
             switch (promise.State)
             {
                 case PromiseState.Pending:

+ 4 - 3
Jint/Native/JsPromise.cs

@@ -1,3 +1,4 @@
+using System.Threading;
 using Jint.Native.Object;
 using Jint.Native.Promise;
 using Jint.Runtime;
@@ -12,7 +13,7 @@ internal sealed class JsPromise : ObjectInstance
 
     // valid only in settled state (Fulfilled or Rejected)
     internal JsValue Value { get; private set; } = null!;
-    internal TaskCompletionSource<JsPromise> TaskCompletionSource { get; }= new();
+    internal ManualResetEventSlim CompletedEvent { get; } = new();
 
     internal List<PromiseReaction> PromiseRejectReactions = new();
     internal List<PromiseReaction> PromiseFulfillReactions = new();
@@ -127,7 +128,7 @@ internal sealed class JsPromise : ObjectInstance
         var reactions = PromiseRejectReactions;
         PromiseRejectReactions = new List<PromiseReaction>();
         PromiseFulfillReactions.Clear();
-        TaskCompletionSource.SetCanceled();
+        CompletedEvent.Set();
 
         // Note that this part is skipped because there is no tracking yet
         // 7. If promise.[[PromiseIsHandled]] is false, perform HostPromiseRejectionTracker(promise, "reject").
@@ -147,7 +148,7 @@ internal sealed class JsPromise : ObjectInstance
         var reactions = PromiseFulfillReactions;
         PromiseFulfillReactions = new List<PromiseReaction>();
         PromiseRejectReactions.Clear();
-        TaskCompletionSource.SetResult(this);
+        CompletedEvent.Set();
 
         return PromiseOperations.TriggerPromiseReactions(_engine, reactions, result);
     }

+ 3 - 1
Jint/Native/JsValue.cs

@@ -176,7 +176,9 @@ namespace Jint.Native
                         resolve(FromObject(engine, JsValue.Undefined));
                     }
                 }
-            });
+            },
+            // Ensure continuation is completed before unwrapping Promise
+            continuationOptions: TaskContinuationOptions.AttachedToParent | TaskContinuationOptions.ExecuteSynchronously);
 
             return promise;
         }

+ 3 - 1
Jint/Runtime/EventLoop.cs

@@ -1,7 +1,9 @@
+using System.Collections.Concurrent;
+
 namespace Jint.Runtime
 {
     internal sealed record EventLoop
     {
-        internal readonly Queue<Action> Events = new();
+        internal readonly ConcurrentQueue<Action> Events = new();
     }
 }