From e7adb2857f2726bb0997efd14fd8edc0054e9f3f Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 4 Jul 2019 20:45:27 +0100 Subject: [PATCH 1/9] consume processor in kernel --- Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs | 8 ++++++-- WorkspaceServer/Kernel/CodeSubmissionProcessors.cs | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs b/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs index ee46fb38b..bd461f797 100644 --- a/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs +++ b/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs @@ -20,6 +20,7 @@ public class ExecuteRequestHandler : IObserver private readonly RenderingEngine _renderingEngine; private readonly ConcurrentDictionary _openRequests = new ConcurrentDictionary(); private int _executionCount; + private readonly CodeSubmissionProcessors _processors; private class OpenRequest { @@ -48,20 +49,23 @@ public ExecuteRequestHandler(IKernel kernel) _renderingEngine.RegisterRenderer(typeof(IDictionary), new DictionaryRenderer()); _renderingEngine.RegisterRenderer(typeof(IList), new ListRenderer()); _renderingEngine.RegisterRenderer(typeof(IEnumerable), new SequenceRenderer()); + _processors = new CodeSubmissionProcessors(); _kernel.KernelEvents.Subscribe(this); } - public Task Handle(JupyterRequestContext context) + public async Task Handle(JupyterRequestContext context) { var executeRequest = context.GetRequestContent() ?? throw new InvalidOperationException($"Request Content must be a not null {typeof(ExecuteRequest).Name}"); context.RequestHandlerStatus.SetAsBusy(); var command = new SubmitCode(executeRequest.Code); + command = await _processors.ProcessAsync(command); var id = command.Id; var transient = new Dictionary { { "display_id", id.ToString() } }; var executionCount = executeRequest.Silent ? _executionCount : Interlocked.Increment(ref _executionCount); _openRequests[id] = new OpenRequest(context, executeRequest, executionCount, id, transient); - return _kernel.SendAsync(command); + + await _kernel.SendAsync(command); } void IObserver.OnCompleted() diff --git a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs index 55b157bd5..1b662f053 100644 --- a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs +++ b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs @@ -21,6 +21,7 @@ public class CodeSubmissionProcessors public CodeSubmissionProcessors() { _rootCommand = new RootCommand(); + _parser = new CommandLineBuilder(_rootCommand).Build(); } public void Register(ICodeSubmissionProcessor processor) From 95ee1c4398486145a9b50cc73dda5287a1202b5b Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 5 Jul 2019 09:50:33 +0100 Subject: [PATCH 2/9] the jupyter kernel and the repl kernel can have different processors --- WorkspaceServer/Kernel/CSharpRepl.cs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/WorkspaceServer/Kernel/CSharpRepl.cs b/WorkspaceServer/Kernel/CSharpRepl.cs index e11cce6f7..024fb4401 100644 --- a/WorkspaceServer/Kernel/CSharpRepl.cs +++ b/WorkspaceServer/Kernel/CSharpRepl.cs @@ -28,12 +28,14 @@ public class CSharpRepl : IKernel protected ScriptOptions ScriptOptions; protected StringBuilder _inputBuffer = new StringBuilder(); + private CodeSubmissionProcessors _processors; public IObservable KernelEvents => _channel; public CSharpRepl() { _channel = new Subject(); + _processors = new CodeSubmissionProcessors(); SetupScriptOptions(); } @@ -53,15 +55,17 @@ private void SetupScriptOptions() typeof(Task<>).GetTypeInfo().Assembly); } - public async Task SendAsync(SubmitCode submitCode, CancellationToken cancellationToken) + public async Task SendAsync(SubmitCode codeSubmission, CancellationToken cancellationToken) { - _channel.OnNext(new CodeSubmissionReceived(submitCode.Id, submitCode.Value)); + _channel.OnNext(new CodeSubmissionReceived(codeSubmission.Id, codeSubmission.Value)); - var (shouldExecute, code) = ComputeFullSubmission(submitCode.Value); + codeSubmission = await _processors.ProcessAsync(codeSubmission); + + var (shouldExecute, code) = ComputeFullSubmission(codeSubmission.Value); if (shouldExecute) { - _channel.OnNext(new CompleteCodeSubmissionReceived(submitCode.Id)); + _channel.OnNext(new CompleteCodeSubmissionReceived(codeSubmission.Id)); Exception exception = null; try { @@ -94,7 +98,7 @@ public async Task SendAsync(SubmitCode submitCode, CancellationToken cancellatio if (hasReturnValue) { - _channel.OnNext(new ValueProduced(submitCode.Id, _scriptState.ReturnValue)); + _channel.OnNext(new ValueProduced(codeSubmission.Id, _scriptState.ReturnValue)); } if (exception != null) { @@ -103,21 +107,21 @@ public async Task SendAsync(SubmitCode submitCode, CancellationToken cancellatio { var message = string.Join("\n", diagnostics.Select(d => d.GetMessage())); - _channel.OnNext(new CodeSubmissionEvaluationFailed(submitCode.Id, exception, message)); + _channel.OnNext(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception, message)); } else { - _channel.OnNext(new CodeSubmissionEvaluationFailed(submitCode.Id, exception)); + _channel.OnNext(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception)); } } else { - _channel.OnNext(new CodeSubmissionEvaluated(submitCode.Id)); + _channel.OnNext(new CodeSubmissionEvaluated(codeSubmission.Id)); } } else { - _channel.OnNext(new IncompleteCodeSubmissionReceived(submitCode.Id)); + _channel.OnNext(new IncompleteCodeSubmissionReceived(codeSubmission.Id)); } } From cb1402379efb3044737d960688c5ade83a11809f Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 5 Jul 2019 10:23:13 +0100 Subject: [PATCH 3/9] rename --- .../ExecuteRequestHandler.cs | 1 - .../Kernel/CodeSubmissionProcessorTests.cs | 12 ++++++------ WorkspaceServer/Kernel/CSharpRepl.cs | 7 ++++++- WorkspaceServer/Kernel/CodeSubmissionProcessors.cs | 2 +- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs b/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs index bd461f797..aecdaee1d 100644 --- a/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs +++ b/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs @@ -5,7 +5,6 @@ using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.Try.Jupyter.Protocol; diff --git a/WorkspaceServer.Tests/Kernel/CodeSubmissionProcessorTests.cs b/WorkspaceServer.Tests/Kernel/CodeSubmissionProcessorTests.cs index 91b7fa5bc..c335f43f3 100644 --- a/WorkspaceServer.Tests/Kernel/CodeSubmissionProcessorTests.cs +++ b/WorkspaceServer.Tests/Kernel/CodeSubmissionProcessorTests.cs @@ -22,7 +22,7 @@ public CodeSubmissionProcessorTests() [Fact] public void can_register_processorHandlers() { - var action = new Action(() => _processors.Register(new ReplaceAllProcessor())); + var action = new Action(() => _processors.Add(new ReplaceAllProcessor())); action.Should().NotThrow(); _processors.ProcessorsCount.Should().BeGreaterThan(0); } @@ -30,7 +30,7 @@ public void can_register_processorHandlers() [Fact] public async Task processing_code_submission_removes_processors() { - _processors.Register(new PassThroughAllProcessor()); + _processors.Add(new PassThroughProcessor()); var submission = new SubmitCode("#pass\nthis should remain"); submission = await _processors.ProcessAsync(submission); submission.Value.Should().NotContain("#pass") @@ -40,7 +40,7 @@ public async Task processing_code_submission_removes_processors() [Fact] public async Task processing_code_submission_leaves_unprocessed_directives() { - _processors.Register(new PassThroughAllProcessor()); + _processors.Add(new PassThroughProcessor()); var submission = new SubmitCode("#pass\n#region code\nthis should remain\n#endregion"); submission = await _processors.ProcessAsync(submission); submission.Value.Should().NotContain("#pass") @@ -51,7 +51,7 @@ public async Task processing_code_submission_leaves_unprocessed_directives() [Fact] public async Task processing_code_submission_respect_directive_order() { - _processors.Register(new AppendProcessor()); + _processors.Add(new AppendProcessor()); var submission = new SubmitCode("#append --value PART1\n#append --value PART2\n#region code\nthis should remain\n#endregion"); submission = await _processors.ProcessAsync(submission); submission.Value.Should().NotContain("#pass") @@ -73,9 +73,9 @@ public Task ProcessAsync(SubmitCode codeSubmission) } } - private class PassThroughAllProcessor : ICodeSubmissionProcessor + private class PassThroughProcessor : ICodeSubmissionProcessor { - public PassThroughAllProcessor() + public PassThroughProcessor() { Command = new Command("#pass", "pass all code"); } diff --git a/WorkspaceServer/Kernel/CSharpRepl.cs b/WorkspaceServer/Kernel/CSharpRepl.cs index 024fb4401..fe403337f 100644 --- a/WorkspaceServer/Kernel/CSharpRepl.cs +++ b/WorkspaceServer/Kernel/CSharpRepl.cs @@ -35,8 +35,13 @@ public class CSharpRepl : IKernel public CSharpRepl() { _channel = new Subject(); - _processors = new CodeSubmissionProcessors(); SetupScriptOptions(); + SetupProcessors(); + } + + private void SetupProcessors() + { + _processors = new CodeSubmissionProcessors(); } private void SetupScriptOptions() diff --git a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs index 1b662f053..d1de52dcc 100644 --- a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs +++ b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs @@ -24,7 +24,7 @@ public CodeSubmissionProcessors() _parser = new CommandLineBuilder(_rootCommand).Build(); } - public void Register(ICodeSubmissionProcessor processor) + public void Add(ICodeSubmissionProcessor processor) { _processors[processor.Command] = processor; _rootCommand.AddCommand(processor.Command); From 050e31c2829eb4b3f1aeaaf389f621be1631d403 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 5 Jul 2019 10:36:50 +0100 Subject: [PATCH 4/9] prototyping the emit for csharp repl --- WorkspaceServer/Kernel/CSharpRepl.cs | 1 + WorkspaceServer/Kernel/EmitProcessors.cs | 64 ++++++++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 WorkspaceServer/Kernel/EmitProcessors.cs diff --git a/WorkspaceServer/Kernel/CSharpRepl.cs b/WorkspaceServer/Kernel/CSharpRepl.cs index fe403337f..3ee999e3c 100644 --- a/WorkspaceServer/Kernel/CSharpRepl.cs +++ b/WorkspaceServer/Kernel/CSharpRepl.cs @@ -42,6 +42,7 @@ public CSharpRepl() private void SetupProcessors() { _processors = new CodeSubmissionProcessors(); + _processors.Add(new EmitProcessors(() => _scriptState)); } private void SetupScriptOptions() diff --git a/WorkspaceServer/Kernel/EmitProcessors.cs b/WorkspaceServer/Kernel/EmitProcessors.cs new file mode 100644 index 000000000..8cd13830f --- /dev/null +++ b/WorkspaceServer/Kernel/EmitProcessors.cs @@ -0,0 +1,64 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.CommandLine; +using System.CommandLine.Invocation; +using System.IO; +using System.Threading.Tasks; +using Microsoft.CodeAnalysis.Scripting; + +namespace WorkspaceServer.Kernel +{ + public class EmitProcessors : ICodeSubmissionProcessor + { + private readonly Func _getScriptState; + + public EmitProcessors(Func getScriptState) + { + _getScriptState = getScriptState; + Command = new Command("emit"); + var outputOption = new Option("--output") + { + Argument = new Argument() + }; + + Command.AddOption(outputOption); + Command.Handler = CommandHandler.Create((options) => + { + if (!options.Output.Exists) + { + options.Output.Create(); + } + + var state = getScriptState(); + + var codeFile = new FileInfo(Path.Combine(options.Output.FullName, "code.cs")); + + using (var destination = codeFile.OpenWrite()) + using (var textWriter = new StreamWriter(destination)) + { + var source = state?.Script?.Code ?? string.Empty; + textWriter.Write($"// generated code\n{source}"); + } + + }); + } + public Task ProcessAsync(SubmitCode codeSubmission) + { + return Task.FromResult(codeSubmission); + } + + public Command Command { get; } + + private class EmitProcessorsOptions + { + public DirectoryInfo Output { get; } + + public EmitProcessorsOptions(DirectoryInfo output) + { + Output = output; + } + } + } +} \ No newline at end of file From 8a440c8f5d222a8a96e22a8f800f0219051f8d8c Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 8 Jul 2019 23:02:56 +0100 Subject: [PATCH 5/9] explicit exception types for rendering and code submission processors --- .../ExecuteRequestHandler.cs | 116 ++++++++++++++---- .../Rendering/DefaultRenderer.cs | 7 -- .../CodeSubmissionProcessorException.cs | 17 +++ .../Kernel/CodeSubmissionProcessors.cs | 42 ++++--- WorkspaceServer/Kernel/RenderingEngine.cs | 21 +++- .../Kernel/RenderingEngineException.cs | 17 +++ 6 files changed, 166 insertions(+), 54 deletions(-) create mode 100644 WorkspaceServer/Kernel/CodeSubmissionProcessorException.cs create mode 100644 WorkspaceServer/Kernel/RenderingEngineException.cs diff --git a/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs b/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs index aecdaee1d..9d843429d 100644 --- a/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs +++ b/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs @@ -57,14 +57,53 @@ public async Task Handle(JupyterRequestContext context) { var executeRequest = context.GetRequestContent() ?? throw new InvalidOperationException($"Request Content must be a not null {typeof(ExecuteRequest).Name}"); context.RequestHandlerStatus.SetAsBusy(); - var command = new SubmitCode(executeRequest.Code); - command = await _processors.ProcessAsync(command); - var id = command.Id; - var transient = new Dictionary { { "display_id", id.ToString() } }; var executionCount = executeRequest.Silent ? _executionCount : Interlocked.Increment(ref _executionCount); - _openRequests[id] = new OpenRequest(context, executeRequest, executionCount, id, transient); - - await _kernel.SendAsync(command); + try + { + var command = new SubmitCode(executeRequest.Code); + command = await _processors.ProcessAsync(command); + var id = command.Id; + var transient = new Dictionary { { "display_id", id.ToString() } }; + + var openRequest = new OpenRequest(context, executeRequest, executionCount, id, transient); + _openRequests[id] = openRequest; + + await _kernel.SendAsync(command); + } + catch (Exception e) + { + var errorContent = new Error( + eName: "Unhandled Exception", + eValue: $"{e.Message}" + ); + + if (!executeRequest.Silent) + { + // send on io + var error = Message.Create( + errorContent, + context.Request.Header); + context.IoPubChannel.Send(error); + + // send on stderr + var stdErr = new StdErrStream(errorContent.EValue); + var stream = Message.Create( + stdErr, + context.Request.Header); + context.IoPubChannel.Send(stream); + } + + // reply Error + var executeReplyPayload = new ExecuteReplyError(errorContent, executionCount: executionCount); + + // send to server + var executeReply = Message.CreateResponse( + executeReplyPayload, + context.Request); + + context.ServerChannel.Send(executeReply); + context.RequestHandlerStatus.SetAsIdle(); + } } void IObserver.OnCompleted() @@ -102,7 +141,7 @@ void IObserver.OnNext(IKernelEvent value) private static void OnCodeSubmissionEvaluatedFailed(CodeSubmissionEvaluationFailed codeSubmissionEvaluationFailed, ConcurrentDictionary openRequests) { var openRequest = openRequests[codeSubmissionEvaluationFailed.ParentId]; - + var errorContent = new Error( eName: "Unhandled Exception", eValue: $"{codeSubmissionEvaluationFailed.Message}" @@ -141,25 +180,50 @@ private static void OnValueProduced(ValueProduced valueProduced, ConcurrentDictionary openRequests, RenderingEngine renderingEngine) { var openRequest = openRequests[valueProduced.ParentId]; - - var rendering = renderingEngine.Render(valueProduced.Value); - - - // executeResult data - var executeResultData = new ExecuteResult( - openRequest.ExecutionCount, - transient: openRequest.Transient, - data: new Dictionary { - { rendering.Mime, rendering.Content} - }); - - if (!openRequest.ExecuteRequest.Silent) + try { - // send on io - var executeResultMessage = Message.Create( - executeResultData, - openRequest.Context.Request.Header); - openRequest.Context.IoPubChannel.Send(executeResultMessage); + var rendering = renderingEngine.Render(valueProduced.Value); + + // executeResult data + var executeResultData = new ExecuteResult( + openRequest.ExecutionCount, + transient: openRequest.Transient, + data: new Dictionary + { + {rendering.Mime, rendering.Content} + }); + + if (!openRequest.ExecuteRequest.Silent) + { + // send on io + var executeResultMessage = Message.Create( + executeResultData, + openRequest.Context.Request.Header); + openRequest.Context.IoPubChannel.Send(executeResultMessage); + } + } + catch (Exception e) + { + var errorContent = new Error( + eName: "Unhandled Exception", + eValue: $"{e.Message}" + ); + + if (!openRequest.ExecuteRequest.Silent) + { + // send on io + var error = Message.Create( + errorContent, + openRequest.Context.Request.Header); + openRequest.Context.IoPubChannel.Send(error); + + // send on stderr + var stdErr = new StdErrStream(errorContent.EValue); + var stream = Message.Create( + stdErr, + openRequest.Context.Request.Header); + openRequest.Context.IoPubChannel.Send(stream); + } } } diff --git a/Microsoft.DotNet.Try.Jupyter/Rendering/DefaultRenderer.cs b/Microsoft.DotNet.Try.Jupyter/Rendering/DefaultRenderer.cs index f6aad9d9a..734207b9e 100644 --- a/Microsoft.DotNet.Try.Jupyter/Rendering/DefaultRenderer.cs +++ b/Microsoft.DotNet.Try.Jupyter/Rendering/DefaultRenderer.cs @@ -28,19 +28,12 @@ public IRendering Render(object source, IRenderingEngine engine = null) public IRendering RenderObject(object source, IRenderingEngine engine = null) { - try - { var rows = CreateRows(source, engine); var table = $@" {rows}
"; return new HtmlRendering(table); - } - catch (Exception) - { - return new PlainTextRendering(source?.ToString()); - } } private string CreateRows(object source, IRenderingEngine engine) diff --git a/WorkspaceServer/Kernel/CodeSubmissionProcessorException.cs b/WorkspaceServer/Kernel/CodeSubmissionProcessorException.cs new file mode 100644 index 000000000..e2594cabf --- /dev/null +++ b/WorkspaceServer/Kernel/CodeSubmissionProcessorException.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; + +namespace WorkspaceServer.Kernel +{ + public class CodeSubmissionProcessorException : Exception + { + public SubmitCode CodeSubmission { get; } + + public CodeSubmissionProcessorException(Exception exception, SubmitCode codeSubmission) : base("CodeSubmission processing failed", exception) + { + CodeSubmission = codeSubmission; + } + } +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs index d1de52dcc..c0372ab09 100644 --- a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs +++ b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs @@ -33,25 +33,37 @@ public void Add(ICodeSubmissionProcessor processor) public async Task ProcessAsync(SubmitCode codeSubmission) { - var lines = new Queue( codeSubmission.Value.Split(new[] {"\r\n", "\n"}, StringSplitOptions.None)); - var unhandledLines = new Queue(); - while (lines.Count > 0) + try { - var currentLine = lines.Dequeue(); - var result = _parser.Parse(currentLine); - - if (result.CommandResult != null && _processors.TryGetValue(result.CommandResult.Command, out var processor)) - { - await _parser.InvokeAsync(result); - var newSubmission = await processor.ProcessAsync(new SubmitCode(string.Join("\n", lines), codeSubmission.Id, codeSubmission.ParentId)); - lines = new Queue(newSubmission.Value.Split(new[] { "\r\n", "\n" }, StringSplitOptions.None)); - } - else + var lines = new Queue(codeSubmission.Value.Split(new[] {"\r\n", "\n"}, + StringSplitOptions.None)); + var unhandledLines = new Queue(); + while (lines.Count > 0) { - unhandledLines.Enqueue(currentLine); + var currentLine = lines.Dequeue(); + var result = _parser.Parse(currentLine); + + if (result.CommandResult != null && + _processors.TryGetValue(result.CommandResult.Command, out var processor)) + { + await _parser.InvokeAsync(result); + var newSubmission = await processor.ProcessAsync(new SubmitCode(string.Join("\n", lines), + codeSubmission.Id, codeSubmission.ParentId)); + lines = new Queue(newSubmission.Value.Split(new[] {"\r\n", "\n"}, + StringSplitOptions.None)); + } + else + { + unhandledLines.Enqueue(currentLine); + } } + + return new SubmitCode(string.Join("\n", unhandledLines), codeSubmission.Id, codeSubmission.ParentId); + } + catch (Exception e) + { + throw new CodeSubmissionProcessorException(e, codeSubmission); } - return new SubmitCode(string.Join("\n", unhandledLines), codeSubmission.Id, codeSubmission.ParentId); } } } \ No newline at end of file diff --git a/WorkspaceServer/Kernel/RenderingEngine.cs b/WorkspaceServer/Kernel/RenderingEngine.cs index efefad776..025a1c5e8 100644 --- a/WorkspaceServer/Kernel/RenderingEngine.cs +++ b/WorkspaceServer/Kernel/RenderingEngine.cs @@ -12,21 +12,30 @@ public class RenderingEngine : IRenderingEngine { private readonly IRenderer _defaultRenderer; private readonly IRendering _nullRendering; - private readonly Dictionary _rendererRegistry = new Dictionary(); + private readonly Dictionary _rendererRegistry = new Dictionary(); public RenderingEngine(IRenderer defaultRenderer, IRendering nullRendering) { _defaultRenderer = defaultRenderer; _nullRendering = nullRendering; } + public IRendering Render(object source) { - if (source == null) + try { - return _nullRendering; + if (source == null) + { + return _nullRendering; + } + + var renderer = FindRenderer(source.GetType()); + return renderer.Render(source, this); + } + catch (Exception e) + { + throw new RenderingEngineException(e, source); } - var renderer = FindRenderer(source.GetType()); - return renderer.Render(source, this); } public IRenderer FindRenderer(Type sourceType) @@ -98,7 +107,7 @@ public IRenderer FindRenderer() public void RegisterRenderer(IRenderer renderer) { - RegisterRenderer(typeof(T), renderer); + RegisterRenderer(typeof(T), renderer); } public void RegisterRenderer(IRenderer renderer) diff --git a/WorkspaceServer/Kernel/RenderingEngineException.cs b/WorkspaceServer/Kernel/RenderingEngineException.cs new file mode 100644 index 000000000..8696f7171 --- /dev/null +++ b/WorkspaceServer/Kernel/RenderingEngineException.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; + +namespace WorkspaceServer.Kernel +{ + public class RenderingEngineException : Exception + { + public object RenderingSource { get; } + + public RenderingEngineException(Exception exception, object renderingRenderingSource) : base("Rendering Failed",exception) + { + RenderingSource = renderingRenderingSource; + } + } +} \ No newline at end of file From 238d7eccf351c041b5aa41365ed12e7a0767aeae Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 9 Jul 2019 14:07:14 +0100 Subject: [PATCH 6/9] change semantic of add --- WorkspaceServer/Kernel/CodeSubmissionProcessors.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs index c0372ab09..b1904a37a 100644 --- a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs +++ b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs @@ -26,7 +26,7 @@ public CodeSubmissionProcessors() public void Add(ICodeSubmissionProcessor processor) { - _processors[processor.Command] = processor; + _processors.Add(processor.Command, processor); _rootCommand.AddCommand(processor.Command); _parser = new CommandLineBuilder(_rootCommand).Build(); } From 65d248511cd8bcd2c22a166e5ccb6cc9b0db53d7 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 10 Jul 2019 00:56:17 +0100 Subject: [PATCH 7/9] middleware design --- .../Kernel/CodeSubmissionProcessorTests.cs | 12 +-- .../Kernel/KernelCommandPipelineTests.cs | 44 ++++++++++ WorkspaceServer/Kernel/CSharpRepl.cs | 81 ++++++++++++++++++- .../Kernel/CodeSubmissionProcessors.cs | 11 +-- WorkspaceServer/Kernel/SubmitCode.cs | 13 ++- 5 files changed, 141 insertions(+), 20 deletions(-) create mode 100644 WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs diff --git a/WorkspaceServer.Tests/Kernel/CodeSubmissionProcessorTests.cs b/WorkspaceServer.Tests/Kernel/CodeSubmissionProcessorTests.cs index c335f43f3..cb06eee01 100644 --- a/WorkspaceServer.Tests/Kernel/CodeSubmissionProcessorTests.cs +++ b/WorkspaceServer.Tests/Kernel/CodeSubmissionProcessorTests.cs @@ -33,7 +33,7 @@ public async Task processing_code_submission_removes_processors() _processors.Add(new PassThroughProcessor()); var submission = new SubmitCode("#pass\nthis should remain"); submission = await _processors.ProcessAsync(submission); - submission.Value.Should().NotContain("#pass") + submission.Code.Should().NotContain("#pass") .And.Contain("this should remain"); } @@ -43,7 +43,7 @@ public async Task processing_code_submission_leaves_unprocessed_directives() _processors.Add(new PassThroughProcessor()); var submission = new SubmitCode("#pass\n#region code\nthis should remain\n#endregion"); submission = await _processors.ProcessAsync(submission); - submission.Value.Should().NotContain("#pass") + submission.Code.Should().NotContain("#pass") .And.Match("*#region code\nthis should remain\n#endregion*"); } @@ -54,7 +54,7 @@ public async Task processing_code_submission_respect_directive_order() _processors.Add(new AppendProcessor()); var submission = new SubmitCode("#append --value PART1\n#append --value PART2\n#region code\nthis should remain\n#endregion"); submission = await _processors.ProcessAsync(submission); - submission.Value.Should().NotContain("#pass") + submission.Code.Should().NotContain("#pass") .And.Match("*#region code\nthis should remain\n#endregion\nPART1\nPART2*"); } @@ -69,7 +69,8 @@ public ReplaceAllProcessor() public Task ProcessAsync(SubmitCode codeSubmission) { - return Task.FromResult(new SubmitCode(string.Empty, codeSubmission.Id, codeSubmission.ParentId)); + codeSubmission.Code = string.Empty; + return Task.FromResult(codeSubmission); } } @@ -122,7 +123,8 @@ public AppendProcessor() public Task ProcessAsync(SubmitCode codeSubmission) { - return Task.FromResult(new SubmitCode(codeSubmission.Value + $"\n{_valueToAppend}" , codeSubmission.Id, codeSubmission.ParentId)); + codeSubmission.Code = codeSubmission.Code + $"\n{_valueToAppend}"; + return Task.FromResult(codeSubmission); } } } diff --git a/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs b/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs new file mode 100644 index 000000000..48e3a2ce4 --- /dev/null +++ b/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs @@ -0,0 +1,44 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using FluentAssertions; +using WorkspaceServer.Kernel; +using Xunit; + +namespace WorkspaceServer.Tests.Kernel +{ + public class KernelCommandPipelineTests + { + [Fact] + public void When_SubmitCode_command_adds_packages_to_fsharp_kernel_then_the_submission_is_passed_to_fsi() + { + throw new NotImplementedException(); + } + + [Fact] + public void When_SubmitCode_command_adds_packages_to_fsharp_kernel_then_PackageAdded_event_is_raised() + { + throw new NotImplementedException(); + } + + [Fact] + public async Task When_SubmitCode_command_adds_packages_to_csharp_kernel_then_the_submission_is_not_passed_to_csharpScript() + { + var kernel = new CompositeKernel(new[] { new CSharpRepl() }); + + var command = new SubmitCode("#r \"nuget:PocketLogger, 1.2.3\" \nvar a = new List();", "csharp"); + await kernel.SendAsync(command); + command.Code.Should().Be("var a = new List();"); + } + + [Fact] + public void When_SubmitCode_command_adds_packages_to_csharp_kernel_then_PackageAdded_event_is_raised() + { + throw new NotImplementedException(); + } + } + +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/CSharpRepl.cs b/WorkspaceServer/Kernel/CSharpRepl.cs index 3ee999e3c..6b3ba53f2 100644 --- a/WorkspaceServer/Kernel/CSharpRepl.cs +++ b/WorkspaceServer/Kernel/CSharpRepl.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reactive.Linq; using System.Reactive.Subjects; using System.Reflection; using System.Text; @@ -13,9 +14,85 @@ using Microsoft.CodeAnalysis.CSharp; using Microsoft.CodeAnalysis.CSharp.Scripting; using Microsoft.CodeAnalysis.Scripting; +using CancellationToken = System.Threading.CancellationToken; namespace WorkspaceServer.Kernel { + public delegate Task KernelCommandPipelineMiddleware( + InvocationContext context, + Func next); + + + public class InvocationContext + { + public IKernelCommand Command { get; } + public CancellationToken CancellationToken { get; } + + public InvocationContext(IKernelCommand command, CancellationToken cancellationToken) + { + Command = command; + CancellationToken = cancellationToken; + } + } + + + public class KernelCommandPipeline { + public Task InvokeAsync(InvocationContext context) + { + throw new NotImplementedException(); + } + + public void AddMiddleware(KernelCommandPipelineMiddleware middleware) + { + throw new NotImplementedException(); + } + } + public abstract class KernelBase: IKernel + { + protected KernelCommandPipeline Pipeline { get; } + + private readonly Subject _channel = new Subject(); + public IObservable KernelEvents => _channel; + + protected KernelBase() + { + Pipeline = new KernelCommandPipeline(); + } + public Task SendAsync(IKernelCommand command, CancellationToken cancellationToken) + { + return Pipeline.InvokeAsync(new InvocationContext(command, cancellationToken)); + } + + public Task SendAsync(IKernelCommand command) + { + return SendAsync(command, CancellationToken.None); + } + + protected void PublishEvent(IKernelEvent kernelEvent) + { + _channel.OnNext(kernelEvent); + } + } + public class CompositeKernel : KernelBase + { + public CompositeKernel(IReadOnlyList kernels) + { + kernels = kernels ?? throw new ArgumentNullException(nameof(kernels)); + kernels.Select(k => k.KernelEvents).Merge().Subscribe(PublishEvent); + Pipeline.AddMiddleware(async (context, next) => { + foreach (var kernel in kernels.OfType()) + { + await kernel.Pipeline.InvokeAsync(context); + if (context.Result != null) + { + return; + } + } + }); + } + + } + public class CSharpRepl : IKernel { private static readonly MethodInfo _hasReturnValueMethod = typeof(Script) @@ -63,11 +140,11 @@ private void SetupScriptOptions() public async Task SendAsync(SubmitCode codeSubmission, CancellationToken cancellationToken) { - _channel.OnNext(new CodeSubmissionReceived(codeSubmission.Id, codeSubmission.Value)); + _channel.OnNext(new CodeSubmissionReceived(codeSubmission.Id, codeSubmission.Code)); codeSubmission = await _processors.ProcessAsync(codeSubmission); - var (shouldExecute, code) = ComputeFullSubmission(codeSubmission.Value); + var (shouldExecute, code) = ComputeFullSubmission(codeSubmission.Code); if (shouldExecute) { diff --git a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs index b1904a37a..da6276e4a 100644 --- a/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs +++ b/WorkspaceServer/Kernel/CodeSubmissionProcessors.cs @@ -35,7 +35,7 @@ public async Task ProcessAsync(SubmitCode codeSubmission) { try { - var lines = new Queue(codeSubmission.Value.Split(new[] {"\r\n", "\n"}, + var lines = new Queue(codeSubmission.Code.Split(new[] {"\r\n", "\n"}, StringSplitOptions.None)); var unhandledLines = new Queue(); while (lines.Count > 0) @@ -47,9 +47,9 @@ public async Task ProcessAsync(SubmitCode codeSubmission) _processors.TryGetValue(result.CommandResult.Command, out var processor)) { await _parser.InvokeAsync(result); - var newSubmission = await processor.ProcessAsync(new SubmitCode(string.Join("\n", lines), - codeSubmission.Id, codeSubmission.ParentId)); - lines = new Queue(newSubmission.Value.Split(new[] {"\r\n", "\n"}, + codeSubmission.Code = string.Join("\n", lines); + var newSubmission = await processor.ProcessAsync(codeSubmission); + lines = new Queue(newSubmission.Code.Split(new[] {"\r\n", "\n"}, StringSplitOptions.None)); } else @@ -58,7 +58,8 @@ public async Task ProcessAsync(SubmitCode codeSubmission) } } - return new SubmitCode(string.Join("\n", unhandledLines), codeSubmission.Id, codeSubmission.ParentId); + codeSubmission.Code = string.Join("\n", unhandledLines); + return codeSubmission; } catch (Exception e) { diff --git a/WorkspaceServer/Kernel/SubmitCode.cs b/WorkspaceServer/Kernel/SubmitCode.cs index 5a17e4974..186d219e8 100644 --- a/WorkspaceServer/Kernel/SubmitCode.cs +++ b/WorkspaceServer/Kernel/SubmitCode.cs @@ -7,16 +7,13 @@ namespace WorkspaceServer.Kernel { public class SubmitCode : KernelCommandBase { - public string Value { get; } + public string Code { get; set; } + public string Language { get; set; } - public SubmitCode(string value) + public SubmitCode(string code, string language = null) { - Value = value ?? throw new ArgumentNullException(nameof(value)); - } - - public SubmitCode(string value, Guid id, Guid parentId) : base(id, parentId) - { - Value = value ?? throw new ArgumentNullException(nameof(value)); + Code = code ?? throw new ArgumentNullException(nameof(code)); + Language = language; } } } \ No newline at end of file From f5afa1c34375b43d36ff8a218c146f2ab9418965 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 10 Jul 2019 20:32:57 +0100 Subject: [PATCH 8/9] use middleware and handler --- .../Kernel/CSharpReplTests.cs | 26 ++-- .../Kernel/KernelCommandPipelineTests.cs | 1 - WorkspaceServer/Kernel/CSharpRepl.cs | 118 +++++++++++------- 3 files changed, 86 insertions(+), 59 deletions(-) diff --git a/WorkspaceServer.Tests/Kernel/CSharpReplTests.cs b/WorkspaceServer.Tests/Kernel/CSharpReplTests.cs index ace30d0ed..4552e42c0 100644 --- a/WorkspaceServer.Tests/Kernel/CSharpReplTests.cs +++ b/WorkspaceServer.Tests/Kernel/CSharpReplTests.cs @@ -31,7 +31,7 @@ public async Task it_returns_the_result_of_a_non_null_expression() { var repl = await CreateKernelAsync(); - await repl.SendAsync(new SubmitCode("123")); + await repl.SendAsync(new SubmitCode("123", "csharp")); KernelEvents.OfType() .Last() @@ -48,8 +48,8 @@ public async Task it_returns_exceptions_thrown_in_user_code() { var repl = await CreateKernelAsync(); - await repl.SendAsync(new SubmitCode("using System;")); - await repl.SendAsync(new SubmitCode("throw new NotImplementedException();")); + await repl.SendAsync(new SubmitCode("using System;", "csharp")); + await repl.SendAsync(new SubmitCode("throw new NotImplementedException();", "csharp")); KernelEvents.Last() .Should() @@ -65,8 +65,8 @@ public async Task it_returns_diagnostics() { var repl = await CreateKernelAsync(); - await repl.SendAsync(new SubmitCode("using System;")); - await repl.SendAsync(new SubmitCode("aaaadd")); + await repl.SendAsync(new SubmitCode("using System;", "csharp")); + await repl.SendAsync(new SubmitCode("aaaadd", "csharp")); KernelEvents.Last() .Should() @@ -82,9 +82,9 @@ public async Task it_notifies_when_submission_is_complete() { var repl = await CreateKernelAsync(); - await repl.SendAsync(new SubmitCode("var a =")); + await repl.SendAsync(new SubmitCode("var a =", "csharp")); - await repl.SendAsync(new SubmitCode("12;")); + await repl.SendAsync(new SubmitCode("12;", "csharp")); KernelEvents.Should() .NotContain(e => e is ValueProduced); @@ -99,7 +99,7 @@ public async Task it_notifies_when_submission_is_incomplete() { var repl = await CreateKernelAsync(); - await repl.SendAsync(new SubmitCode("var a =")); + await repl.SendAsync(new SubmitCode("var a =", "csharp")); KernelEvents.Should() .NotContain(e => e is ValueProduced); @@ -114,7 +114,7 @@ public async Task it_returns_the_result_of_a_null_expression() { var repl = await CreateKernelAsync(); - await repl.SendAsync(new SubmitCode("null")); + await repl.SendAsync(new SubmitCode("null", "csharp")); KernelEvents.OfType() .Last() @@ -131,7 +131,7 @@ public async Task it_does_not_return_a_result_for_a_statement() { var repl = await CreateKernelAsync(); - await repl.SendAsync(new SubmitCode("var x = 1;")); + await repl.SendAsync(new SubmitCode("var x = 1;", "csharp")); KernelEvents .Should() @@ -143,9 +143,9 @@ public async Task it_aggregates_multiple_submissions() { var repl = await CreateKernelAsync(); - await repl.SendAsync(new SubmitCode("var x = new List{1,2};")); - await repl.SendAsync(new SubmitCode("x.Add(3);")); - await repl.SendAsync(new SubmitCode("x.Max()")); + await repl.SendAsync(new SubmitCode("var x = new List{1,2};", "csharp")); + await repl.SendAsync(new SubmitCode("x.Add(3);", "csharp")); + await repl.SendAsync(new SubmitCode("x.Max()", "csharp")); KernelEvents.OfType() .Last() diff --git a/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs b/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs index 48e3a2ce4..18bf6d3ed 100644 --- a/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs +++ b/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs @@ -2,7 +2,6 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; -using System.Collections.Generic; using System.Threading.Tasks; using FluentAssertions; using WorkspaceServer.Kernel; diff --git a/WorkspaceServer/Kernel/CSharpRepl.cs b/WorkspaceServer/Kernel/CSharpRepl.cs index 6b3ba53f2..2707ac06c 100644 --- a/WorkspaceServer/Kernel/CSharpRepl.cs +++ b/WorkspaceServer/Kernel/CSharpRepl.cs @@ -8,7 +8,6 @@ using System.Reactive.Subjects; using System.Reflection; using System.Text; -using System.Threading; using System.Threading.Tasks; using Microsoft.CodeAnalysis; using Microsoft.CodeAnalysis.CSharp; @@ -25,6 +24,7 @@ public delegate Task KernelCommandPipelineMiddleware( public class InvocationContext { + public object Result { get; set; } public IKernelCommand Command { get; } public CancellationToken CancellationToken { get; } @@ -37,26 +37,52 @@ public InvocationContext(IKernelCommand command, CancellationToken cancellationT public class KernelCommandPipeline { - public Task InvokeAsync(InvocationContext context) + private readonly KernelBase _kernel; + + private readonly List _invocations = new List(); + + public KernelCommandPipeline(KernelBase kernel) + { + _kernel = kernel ?? throw new ArgumentNullException(nameof(kernel)); + } + + public async Task InvokeAsync(InvocationContext context) + { + var invocationChain = BuildInvocationChain(); + + await invocationChain(context, invocationContext => Task.CompletedTask); + } + + private KernelCommandPipelineMiddleware BuildInvocationChain() { - throw new NotImplementedException(); + var invocations = new List(_invocations); + + invocations.Add(async (invocationContext, _) => + { + await _kernel.HandleAsync(invocationContext); + }); + + return invocations.Aggregate( + (function, continuation) => + (ctx, next) => + function(ctx, c => continuation(c, next))); } public void AddMiddleware(KernelCommandPipelineMiddleware middleware) { - throw new NotImplementedException(); + _invocations.Add(middleware); } } public abstract class KernelBase: IKernel { - protected KernelCommandPipeline Pipeline { get; } + public KernelCommandPipeline Pipeline { get; } private readonly Subject _channel = new Subject(); public IObservable KernelEvents => _channel; protected KernelBase() { - Pipeline = new KernelCommandPipeline(); + Pipeline = new KernelCommandPipeline(this); } public Task SendAsync(IKernelCommand command, CancellationToken cancellationToken) { @@ -72,54 +98,55 @@ protected void PublishEvent(IKernelEvent kernelEvent) { _channel.OnNext(kernelEvent); } + + protected internal abstract Task HandleAsync(InvocationContext context); } public class CompositeKernel : KernelBase { + private readonly IReadOnlyList _kernels; + public CompositeKernel(IReadOnlyList kernels) { - kernels = kernels ?? throw new ArgumentNullException(nameof(kernels)); + _kernels = kernels ?? throw new ArgumentNullException(nameof(kernels)); kernels.Select(k => k.KernelEvents).Merge().Subscribe(PublishEvent); - Pipeline.AddMiddleware(async (context, next) => { - foreach (var kernel in kernels.OfType()) + } + + protected internal override async Task HandleAsync(InvocationContext context) + { + foreach (var kernel in _kernels.OfType()) + { + await kernel.Pipeline.InvokeAsync(context); + if (context.Result != null) { - await kernel.Pipeline.InvokeAsync(context); - if (context.Result != null) - { - return; - } + return; } - }); + } } - } - public class CSharpRepl : IKernel + public class CSharpRepl : KernelBase { private static readonly MethodInfo _hasReturnValueMethod = typeof(Script) .GetMethod("HasReturnValue", BindingFlags.Instance | BindingFlags.NonPublic); - private readonly Subject _channel; private ScriptState _scriptState; protected CSharpParseOptions ParseOptions = new CSharpParseOptions(LanguageVersion.Latest, kind: SourceCodeKind.Script); protected ScriptOptions ScriptOptions; - protected StringBuilder _inputBuffer = new StringBuilder(); - private CodeSubmissionProcessors _processors; + private StringBuilder _inputBuffer = new StringBuilder(); - public IObservable KernelEvents => _channel; public CSharpRepl() { - _channel = new Subject(); SetupScriptOptions(); - SetupProcessors(); + SetupPipeline(); } - private void SetupProcessors() + private void SetupPipeline() { - _processors = new CodeSubmissionProcessors(); - _processors.Add(new EmitProcessors(() => _scriptState)); + + } private void SetupScriptOptions() @@ -138,17 +165,15 @@ private void SetupScriptOptions() typeof(Task<>).GetTypeInfo().Assembly); } - public async Task SendAsync(SubmitCode codeSubmission, CancellationToken cancellationToken) + private async Task HandleCodeSubmission(SubmitCode codeSubmission, CancellationToken cancellationToken) { - _channel.OnNext(new CodeSubmissionReceived(codeSubmission.Id, codeSubmission.Code)); - - codeSubmission = await _processors.ProcessAsync(codeSubmission); + PublishEvent(new CodeSubmissionReceived(codeSubmission.Id, codeSubmission.Code)); var (shouldExecute, code) = ComputeFullSubmission(codeSubmission.Code); if (shouldExecute) { - _channel.OnNext(new CompleteCodeSubmissionReceived(codeSubmission.Id)); + PublishEvent(new CompleteCodeSubmissionReceived(codeSubmission.Id)); Exception exception = null; try { @@ -181,7 +206,7 @@ public async Task SendAsync(SubmitCode codeSubmission, CancellationToken cancell if (hasReturnValue) { - _channel.OnNext(new ValueProduced(codeSubmission.Id, _scriptState.ReturnValue)); + PublishEvent(new ValueProduced(codeSubmission.Id, _scriptState.ReturnValue)); } if (exception != null) { @@ -190,21 +215,21 @@ public async Task SendAsync(SubmitCode codeSubmission, CancellationToken cancell { var message = string.Join("\n", diagnostics.Select(d => d.GetMessage())); - _channel.OnNext(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception, message)); + PublishEvent(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception, message)); } else { - _channel.OnNext(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception)); + PublishEvent(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception)); } } else { - _channel.OnNext(new CodeSubmissionEvaluated(codeSubmission.Id)); + PublishEvent(new CodeSubmissionEvaluated(codeSubmission.Id)); } } else { - _channel.OnNext(new IncompleteCodeSubmissionReceived(codeSubmission.Id)); + PublishEvent(new IncompleteCodeSubmissionReceived(codeSubmission.Id)); } } @@ -224,20 +249,23 @@ public async Task SendAsync(SubmitCode codeSubmission, CancellationToken cancell return (true, code); } - public Task SendAsync(IKernelCommand command) - { - return SendAsync(command, CancellationToken.None); - } - - public Task SendAsync(IKernelCommand command, CancellationToken cancellationToken) + protected internal override Task HandleAsync(InvocationContext context) { - switch (command) + switch (context.Command) { case SubmitCode submitCode: - return SendAsync(submitCode, cancellationToken); + if (submitCode.Language == "csharp") + { + context.Result = new object(); + return HandleCodeSubmission(submitCode, context.CancellationToken); + } + else + { + return Task.CompletedTask; + } default: - throw new KernelCommandNotSupportedException(command, this); + return Task.CompletedTask; } } } From 383421c49cfc5d19617d117daec34b8ea631ebb7 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 10 Jul 2019 23:18:42 +0100 Subject: [PATCH 9/9] kernel events and kernel result events --- .../ExecuteRequestHandlerTests.cs | 56 +------ .../KernelSimulator.cs | 39 ----- .../ExecuteRequestHandler.cs | 43 +++--- .../Protocol/ExecuteRequest.cs | 1 - .../Kernel/KernelCommandPipelineTests.cs | 8 +- WorkspaceServer/Kernel/CSharpRepl.cs | 142 +++--------------- WorkspaceServer/Kernel/CompositeKernel.cs | 35 +++++ WorkspaceServer/Kernel/IKernel.cs | 5 +- .../Kernel/IKernelCommandResult.cs | 12 ++ WorkspaceServer/Kernel/KernelBase.cs | 60 ++++++++ .../Kernel/KernelCommandContext.cs | 20 +++ .../Kernel/KernelCommandPipeline.cs | 48 ++++++ .../Kernel/KernelCommandPipelineMiddleware.cs | 12 ++ WorkspaceServer/Kernel/KernelCommandResult.cs | 43 ++++++ WorkspaceServer/Kernel/KernelExtensions.cs | 16 ++ 15 files changed, 305 insertions(+), 235 deletions(-) delete mode 100644 Microsoft.DotNet.Try.Jupyter.Tests/KernelSimulator.cs create mode 100644 WorkspaceServer/Kernel/CompositeKernel.cs create mode 100644 WorkspaceServer/Kernel/IKernelCommandResult.cs create mode 100644 WorkspaceServer/Kernel/KernelBase.cs create mode 100644 WorkspaceServer/Kernel/KernelCommandContext.cs create mode 100644 WorkspaceServer/Kernel/KernelCommandPipeline.cs create mode 100644 WorkspaceServer/Kernel/KernelCommandPipelineMiddleware.cs create mode 100644 WorkspaceServer/Kernel/KernelCommandResult.cs create mode 100644 WorkspaceServer/Kernel/KernelExtensions.cs diff --git a/Microsoft.DotNet.Try.Jupyter.Tests/ExecuteRequestHandlerTests.cs b/Microsoft.DotNet.Try.Jupyter.Tests/ExecuteRequestHandlerTests.cs index da19d3c28..7601b448a 100644 --- a/Microsoft.DotNet.Try.Jupyter.Tests/ExecuteRequestHandlerTests.cs +++ b/Microsoft.DotNet.Try.Jupyter.Tests/ExecuteRequestHandlerTests.cs @@ -31,7 +31,8 @@ public ExecuteRequestHandlerTests() [Fact] public void cannot_handle_requests_that_are_not_executeRequest() { - var handler = new ExecuteRequestHandler(new KernelSimulator()); + var kernel = new CSharpRepl(); + var handler = new ExecuteRequestHandler(kernel); var request = Message.Create(new DisplayData(), null); Func messageHandling = () => handler.Handle(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); messageHandling.Should().ThrowExactly(); @@ -40,17 +41,7 @@ public void cannot_handle_requests_that_are_not_executeRequest() [Fact] public async Task handles_executeRequest() { - var kernel = new KernelSimulator((command, channel) => - { - switch (command) - { - case SubmitCode _: - return Task.CompletedTask; - default: - throw new NotImplementedException(); - } - }); - + var kernel = new CSharpRepl(); var handler = new ExecuteRequestHandler(kernel); var request = Message.Create(new ExecuteRequest("var a =12;"), null); await handler.Handle(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); @@ -59,17 +50,7 @@ public async Task handles_executeRequest() [Fact] public async Task sends_executeReply_message_on_codeSubmissionEvaluated() { - var kernel = new KernelSimulator((command, channel) => - { - switch (command) - { - case SubmitCode codeSubmission: - channel.OnNext(new CodeSubmissionEvaluated(codeSubmission.Id)); - return Task.CompletedTask; - default: - throw new NotImplementedException(); - } - }); + var kernel = new CSharpRepl(); var handler = new ExecuteRequestHandler(kernel); var request = Message.Create(new ExecuteRequest("var a =12;"), null); @@ -83,20 +64,10 @@ public async Task sends_executeReply_message_on_codeSubmissionEvaluated() [Fact] public async Task sends_executeReply_with_error_message_on_codeSubmissionEvaluated() { - var kernel = new KernelSimulator((command, channel) => - { - switch (command) - { - case SubmitCode codeSubmission: - channel.OnNext(new CodeSubmissionEvaluationFailed(codeSubmission.Id, new InvalidOperationException("failed"))); - return Task.CompletedTask; - default: - throw new NotImplementedException(); - } - }); + var kernel = new CSharpRepl(); var handler = new ExecuteRequestHandler(kernel); - var request = Message.Create(new ExecuteRequest("var a =12;"), null); + var request = Message.Create(new ExecuteRequest("asdes"), null); await handler.Handle(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); _serverRecordingSocket.DecodedMessages @@ -114,21 +85,10 @@ public async Task sends_executeReply_with_error_message_on_codeSubmissionEvaluat [Fact] public async Task sends_executeResult_message_on_valueProduced() { - var kernel = new KernelSimulator((command, channel) => - { - switch (command) - { - case SubmitCode codeSubmission: - channel.OnNext(new ValueProduced(codeSubmission.Id, new[] { 1, 2, 3 })); - channel.OnNext(new CodeSubmissionEvaluated(codeSubmission.Id)); - return Task.CompletedTask; - default: - throw new NotImplementedException(); - } - }); + var kernel = new CSharpRepl(); var handler = new ExecuteRequestHandler(kernel); - var request = Message.Create(new ExecuteRequest("var a =12;"), null); + var request = Message.Create(new ExecuteRequest("2+2"), null); await handler.Handle(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); _serverRecordingSocket.DecodedMessages diff --git a/Microsoft.DotNet.Try.Jupyter.Tests/KernelSimulator.cs b/Microsoft.DotNet.Try.Jupyter.Tests/KernelSimulator.cs deleted file mode 100644 index 41a239d27..000000000 --- a/Microsoft.DotNet.Try.Jupyter.Tests/KernelSimulator.cs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) .NET Foundation and contributors. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System; -using System.Reactive.Subjects; -using System.Threading; -using System.Threading.Tasks; -using WorkspaceServer.Kernel; - -namespace Microsoft.DotNet.Try.Jupyter.Tests -{ - class KernelSimulator : IKernel - { - private readonly Func, Task> _commandHandler; - private readonly Subject _eventsStream; - public IObservable KernelEvents { get; } - - public KernelSimulator(Func, Task> commandHandler = null) - { - _commandHandler = commandHandler; - _eventsStream = new Subject(); - KernelEvents = _eventsStream; - } - public Task SendAsync(IKernelCommand command, CancellationToken cancellationToken) - { - if(_commandHandler == null) - { - throw new NotImplementedException(); - } - - return _commandHandler(command, _eventsStream); - } - - public Task SendAsync(IKernelCommand command) - { - return SendAsync(command, CancellationToken.None); - } - } -} \ No newline at end of file diff --git a/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs b/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs index 9d843429d..975ed862d 100644 --- a/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs +++ b/Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs @@ -5,6 +5,7 @@ using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Reactive.Disposables; using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.Try.Jupyter.Protocol; @@ -13,16 +14,18 @@ namespace Microsoft.DotNet.Try.Jupyter { - public class ExecuteRequestHandler : IObserver + public class ExecuteRequestHandler : IDisposable { private readonly IKernel _kernel; private readonly RenderingEngine _renderingEngine; private readonly ConcurrentDictionary _openRequests = new ConcurrentDictionary(); private int _executionCount; private readonly CodeSubmissionProcessors _processors; + private readonly CompositeDisposable _disposables = new CompositeDisposable(); - private class OpenRequest + private class OpenRequest : IDisposable { + private readonly CompositeDisposable _disposables = new CompositeDisposable(); public Guid Id { get; } public Dictionary Transient { get; } public JupyterRequestContext Context { get; } @@ -38,6 +41,16 @@ public OpenRequest(JupyterRequestContext context, ExecuteRequest executeRequest, Id = id; Transient = transient; } + + public void AddDisposable(IDisposable disposable) + { + _disposables.Add(disposable); + } + + public void Dispose() + { + _disposables.Dispose(); + } } public ExecuteRequestHandler(IKernel kernel) @@ -49,8 +62,6 @@ public ExecuteRequestHandler(IKernel kernel) _renderingEngine.RegisterRenderer(typeof(IList), new ListRenderer()); _renderingEngine.RegisterRenderer(typeof(IEnumerable), new SequenceRenderer()); _processors = new CodeSubmissionProcessors(); - - _kernel.KernelEvents.Subscribe(this); } public async Task Handle(JupyterRequestContext context) @@ -58,9 +69,10 @@ public async Task Handle(JupyterRequestContext context) var executeRequest = context.GetRequestContent() ?? throw new InvalidOperationException($"Request Content must be a not null {typeof(ExecuteRequest).Name}"); context.RequestHandlerStatus.SetAsBusy(); var executionCount = executeRequest.Silent ? _executionCount : Interlocked.Increment(ref _executionCount); + try { - var command = new SubmitCode(executeRequest.Code); + var command = new SubmitCode(executeRequest.Code, "csharp"); command = await _processors.ProcessAsync(command); var id = command.Id; var transient = new Dictionary { { "display_id", id.ToString() } }; @@ -68,7 +80,8 @@ public async Task Handle(JupyterRequestContext context) var openRequest = new OpenRequest(context, executeRequest, executionCount, id, transient); _openRequests[id] = openRequest; - await _kernel.SendAsync(command); + var kernelResult = await _kernel.SendAsync(command); + openRequest.AddDisposable(kernelResult.Events.Subscribe(OnKernelResultEvent)); } catch (Exception e) { @@ -105,18 +118,9 @@ public async Task Handle(JupyterRequestContext context) context.RequestHandlerStatus.SetAsIdle(); } } + - void IObserver.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver.OnNext(IKernelEvent value) + void OnKernelResultEvent(IKernelEvent value) { switch (value) { @@ -242,5 +246,10 @@ private static void OnCodeSubmissionEvaluated(CodeSubmissionEvaluated codeSubmis openRequest.Context.ServerChannel.Send(executeReply); openRequest.Context.RequestHandlerStatus.SetAsIdle(); } + + public void Dispose() + { + _disposables.Dispose(); + } } } \ No newline at end of file diff --git a/Microsoft.DotNet.Try.Jupyter/Protocol/ExecuteRequest.cs b/Microsoft.DotNet.Try.Jupyter/Protocol/ExecuteRequest.cs index 0f7067d82..1f02b9796 100644 --- a/Microsoft.DotNet.Try.Jupyter/Protocol/ExecuteRequest.cs +++ b/Microsoft.DotNet.Try.Jupyter/Protocol/ExecuteRequest.cs @@ -1,7 +1,6 @@ // Copyright (c) .NET Foundation and contributors. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System; using System.Collections.Generic; using Newtonsoft.Json; diff --git a/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs b/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs index 18bf6d3ed..bebb957e2 100644 --- a/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs +++ b/WorkspaceServer.Tests/Kernel/KernelCommandPipelineTests.cs @@ -11,19 +11,19 @@ namespace WorkspaceServer.Tests.Kernel { public class KernelCommandPipelineTests { - [Fact] + [Fact(Skip = "WIP")] public void When_SubmitCode_command_adds_packages_to_fsharp_kernel_then_the_submission_is_passed_to_fsi() { throw new NotImplementedException(); } - [Fact] + [Fact(Skip = "WIP")] public void When_SubmitCode_command_adds_packages_to_fsharp_kernel_then_PackageAdded_event_is_raised() { throw new NotImplementedException(); } - [Fact] + [Fact(Skip = "WIP")] public async Task When_SubmitCode_command_adds_packages_to_csharp_kernel_then_the_submission_is_not_passed_to_csharpScript() { var kernel = new CompositeKernel(new[] { new CSharpRepl() }); @@ -33,7 +33,7 @@ public async Task When_SubmitCode_command_adds_packages_to_csharp_kernel_then_th command.Code.Should().Be("var a = new List();"); } - [Fact] + [Fact(Skip = "WIP")] public void When_SubmitCode_command_adds_packages_to_csharp_kernel_then_PackageAdded_event_is_raised() { throw new NotImplementedException(); diff --git a/WorkspaceServer/Kernel/CSharpRepl.cs b/WorkspaceServer/Kernel/CSharpRepl.cs index 2707ac06c..de91c40e2 100644 --- a/WorkspaceServer/Kernel/CSharpRepl.cs +++ b/WorkspaceServer/Kernel/CSharpRepl.cs @@ -4,8 +4,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Reactive.Linq; -using System.Reactive.Subjects; using System.Reflection; using System.Text; using System.Threading.Tasks; @@ -13,117 +11,9 @@ using Microsoft.CodeAnalysis.CSharp; using Microsoft.CodeAnalysis.CSharp.Scripting; using Microsoft.CodeAnalysis.Scripting; -using CancellationToken = System.Threading.CancellationToken; namespace WorkspaceServer.Kernel { - public delegate Task KernelCommandPipelineMiddleware( - InvocationContext context, - Func next); - - - public class InvocationContext - { - public object Result { get; set; } - public IKernelCommand Command { get; } - public CancellationToken CancellationToken { get; } - - public InvocationContext(IKernelCommand command, CancellationToken cancellationToken) - { - Command = command; - CancellationToken = cancellationToken; - } - } - - - public class KernelCommandPipeline { - private readonly KernelBase _kernel; - - private readonly List _invocations = new List(); - - public KernelCommandPipeline(KernelBase kernel) - { - _kernel = kernel ?? throw new ArgumentNullException(nameof(kernel)); - } - - public async Task InvokeAsync(InvocationContext context) - { - var invocationChain = BuildInvocationChain(); - - await invocationChain(context, invocationContext => Task.CompletedTask); - } - - private KernelCommandPipelineMiddleware BuildInvocationChain() - { - var invocations = new List(_invocations); - - invocations.Add(async (invocationContext, _) => - { - await _kernel.HandleAsync(invocationContext); - }); - - return invocations.Aggregate( - (function, continuation) => - (ctx, next) => - function(ctx, c => continuation(c, next))); - } - - public void AddMiddleware(KernelCommandPipelineMiddleware middleware) - { - _invocations.Add(middleware); - } - } - public abstract class KernelBase: IKernel - { - public KernelCommandPipeline Pipeline { get; } - - private readonly Subject _channel = new Subject(); - public IObservable KernelEvents => _channel; - - protected KernelBase() - { - Pipeline = new KernelCommandPipeline(this); - } - public Task SendAsync(IKernelCommand command, CancellationToken cancellationToken) - { - return Pipeline.InvokeAsync(new InvocationContext(command, cancellationToken)); - } - - public Task SendAsync(IKernelCommand command) - { - return SendAsync(command, CancellationToken.None); - } - - protected void PublishEvent(IKernelEvent kernelEvent) - { - _channel.OnNext(kernelEvent); - } - - protected internal abstract Task HandleAsync(InvocationContext context); - } - public class CompositeKernel : KernelBase - { - private readonly IReadOnlyList _kernels; - - public CompositeKernel(IReadOnlyList kernels) - { - _kernels = kernels ?? throw new ArgumentNullException(nameof(kernels)); - kernels.Select(k => k.KernelEvents).Merge().Subscribe(PublishEvent); - } - - protected internal override async Task HandleAsync(InvocationContext context) - { - foreach (var kernel in _kernels.OfType()) - { - await kernel.Pipeline.InvokeAsync(context); - if (context.Result != null) - { - return; - } - } - } - } - public class CSharpRepl : KernelBase { private static readonly MethodInfo _hasReturnValueMethod = typeof(Script) @@ -165,15 +55,18 @@ private void SetupScriptOptions() typeof(Task<>).GetTypeInfo().Assembly); } - private async Task HandleCodeSubmission(SubmitCode codeSubmission, CancellationToken cancellationToken) + private async Task HandleCodeSubmission(SubmitCode codeSubmission, KernelCommandContext context) { - PublishEvent(new CodeSubmissionReceived(codeSubmission.Id, codeSubmission.Code)); + var commandResult = new KernelCommandResult(); + commandResult.RelayEventsOn(PublishEvent); + context.Result = commandResult; + commandResult.OnNext(new CodeSubmissionReceived(codeSubmission.Id, codeSubmission.Code)); var (shouldExecute, code) = ComputeFullSubmission(codeSubmission.Code); if (shouldExecute) { - PublishEvent(new CompleteCodeSubmissionReceived(codeSubmission.Id)); + commandResult.OnNext(new CompleteCodeSubmissionReceived(codeSubmission.Id)); Exception exception = null; try { @@ -182,7 +75,7 @@ private async Task HandleCodeSubmission(SubmitCode codeSubmission, CancellationT _scriptState = await CSharpScript.RunAsync( code, ScriptOptions, - cancellationToken: cancellationToken); + cancellationToken: context.CancellationToken); } else { @@ -194,7 +87,7 @@ private async Task HandleCodeSubmission(SubmitCode codeSubmission, CancellationT exception = e; return true; }, - cancellationToken); + context.CancellationToken); } } catch (Exception e) @@ -206,7 +99,7 @@ private async Task HandleCodeSubmission(SubmitCode codeSubmission, CancellationT if (hasReturnValue) { - PublishEvent(new ValueProduced(codeSubmission.Id, _scriptState.ReturnValue)); + commandResult.OnNext(new ValueProduced(codeSubmission.Id, _scriptState.ReturnValue)); } if (exception != null) { @@ -215,21 +108,25 @@ private async Task HandleCodeSubmission(SubmitCode codeSubmission, CancellationT { var message = string.Join("\n", diagnostics.Select(d => d.GetMessage())); - PublishEvent(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception, message)); + commandResult.OnNext(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception, message)); } else { - PublishEvent(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception)); + commandResult.OnNext(new CodeSubmissionEvaluationFailed(codeSubmission.Id, exception)); + } + commandResult.OnError(exception); } else { - PublishEvent(new CodeSubmissionEvaluated(codeSubmission.Id)); + commandResult.OnNext(new CodeSubmissionEvaluated(codeSubmission.Id)); + commandResult.OnCompleted(); } } else { - PublishEvent(new IncompleteCodeSubmissionReceived(codeSubmission.Id)); + commandResult.OnNext(new IncompleteCodeSubmissionReceived(codeSubmission.Id)); + commandResult.OnCompleted(); } } @@ -249,15 +146,14 @@ private async Task HandleCodeSubmission(SubmitCode codeSubmission, CancellationT return (true, code); } - protected internal override Task HandleAsync(InvocationContext context) + protected internal override Task HandleAsync(KernelCommandContext context) { switch (context.Command) { case SubmitCode submitCode: if (submitCode.Language == "csharp") { - context.Result = new object(); - return HandleCodeSubmission(submitCode, context.CancellationToken); + return HandleCodeSubmission(submitCode, context); } else { diff --git a/WorkspaceServer/Kernel/CompositeKernel.cs b/WorkspaceServer/Kernel/CompositeKernel.cs new file mode 100644 index 000000000..55c4c38b6 --- /dev/null +++ b/WorkspaceServer/Kernel/CompositeKernel.cs @@ -0,0 +1,35 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Linq; +using System.Threading.Tasks; + +namespace WorkspaceServer.Kernel +{ + public class CompositeKernel : KernelBase + { + private readonly IReadOnlyList _kernels; + + + public CompositeKernel(IReadOnlyList kernels) + { + _kernels = kernels ?? throw new ArgumentNullException(nameof(kernels)); + AddDisposable( kernels.Select(k => k.KernelEvents).Merge().Subscribe(PublishEvent)); + } + + protected internal override async Task HandleAsync(KernelCommandContext context) + { + foreach (var kernel in _kernels.OfType()) + { + await kernel.Pipeline.InvokeAsync(context); + if (context.Result != null) + { + return; + } + } + } + } +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/IKernel.cs b/WorkspaceServer/Kernel/IKernel.cs index 29fedbd72..6e5ad6875 100644 --- a/WorkspaceServer/Kernel/IKernel.cs +++ b/WorkspaceServer/Kernel/IKernel.cs @@ -7,10 +7,9 @@ namespace WorkspaceServer.Kernel { - public interface IKernel + public interface IKernel: IDisposable { IObservable KernelEvents { get; } - Task SendAsync(IKernelCommand command, CancellationToken cancellationToken); - Task SendAsync(IKernelCommand command); + Task SendAsync(IKernelCommand command, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/WorkspaceServer/Kernel/IKernelCommandResult.cs b/WorkspaceServer/Kernel/IKernelCommandResult.cs new file mode 100644 index 000000000..fae1cfbba --- /dev/null +++ b/WorkspaceServer/Kernel/IKernelCommandResult.cs @@ -0,0 +1,12 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; + +namespace WorkspaceServer.Kernel +{ + public interface IKernelCommandResult + { + IObservable Events { get; } + } +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/KernelBase.cs b/WorkspaceServer/Kernel/KernelBase.cs new file mode 100644 index 000000000..7f6b80536 --- /dev/null +++ b/WorkspaceServer/Kernel/KernelBase.cs @@ -0,0 +1,60 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Reactive.Disposables; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +namespace WorkspaceServer.Kernel +{ + public abstract class KernelBase: IKernel + { + public KernelCommandPipeline Pipeline { get; } + + private readonly Subject _channel = new Subject(); + private readonly CompositeDisposable _disposables; + public IObservable KernelEvents => _channel; + + protected KernelBase() + { + Pipeline = new KernelCommandPipeline(this); + _disposables = new CompositeDisposable(); + } + + public async Task SendAsync(IKernelCommand command, CancellationToken cancellationToken) + { + if (command == null) { + throw new ArgumentNullException(nameof(command)); + + } + var invocationContext = new KernelCommandContext(command, cancellationToken); + await Pipeline.InvokeAsync(invocationContext); + return invocationContext.Result; + } + + protected void PublishEvent(IKernelEvent kernelEvent) + { + if (kernelEvent == null) + { + throw new ArgumentNullException(nameof(kernelEvent)); + } + _channel.OnNext(kernelEvent); + } + protected void AddDisposable(IDisposable disposable) + { + if (disposable == null) + { + throw new ArgumentNullException(nameof(disposable)); + } + _disposables.Add(disposable); + } + + protected internal abstract Task HandleAsync(KernelCommandContext context); + public void Dispose() + { + _disposables.Dispose(); + } + } +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/KernelCommandContext.cs b/WorkspaceServer/Kernel/KernelCommandContext.cs new file mode 100644 index 000000000..cb1d65de2 --- /dev/null +++ b/WorkspaceServer/Kernel/KernelCommandContext.cs @@ -0,0 +1,20 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading; + +namespace WorkspaceServer.Kernel +{ + public class KernelCommandContext + { + public IKernelCommandResult Result { get; set; } + public IKernelCommand Command { get; } + public CancellationToken CancellationToken { get; } + + public KernelCommandContext(IKernelCommand command, CancellationToken cancellationToken) + { + Command = command; + CancellationToken = cancellationToken; + } + } +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/KernelCommandPipeline.cs b/WorkspaceServer/Kernel/KernelCommandPipeline.cs new file mode 100644 index 000000000..1d1db546a --- /dev/null +++ b/WorkspaceServer/Kernel/KernelCommandPipeline.cs @@ -0,0 +1,48 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace WorkspaceServer.Kernel +{ + public class KernelCommandPipeline { + private readonly KernelBase _kernel; + + private readonly List _invocations = new List(); + + public KernelCommandPipeline(KernelBase kernel) + { + _kernel = kernel ?? throw new ArgumentNullException(nameof(kernel)); + } + + public async Task InvokeAsync(KernelCommandContext context) + { + var invocationChain = BuildInvocationChain(); + + await invocationChain(context, invocationContext => Task.CompletedTask); + } + + private KernelCommandPipelineMiddleware BuildInvocationChain() + { + var invocations = new List(_invocations); + + invocations.Add(async (invocationContext, _) => + { + await _kernel.HandleAsync(invocationContext); + }); + + return invocations.Aggregate( + (function, continuation) => + (ctx, next) => + function(ctx, c => continuation(c, next))); + } + + public void AddMiddleware(KernelCommandPipelineMiddleware middleware) + { + _invocations.Add(middleware); + } + } +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/KernelCommandPipelineMiddleware.cs b/WorkspaceServer/Kernel/KernelCommandPipelineMiddleware.cs new file mode 100644 index 000000000..56817974a --- /dev/null +++ b/WorkspaceServer/Kernel/KernelCommandPipelineMiddleware.cs @@ -0,0 +1,12 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Threading.Tasks; + +namespace WorkspaceServer.Kernel +{ + public delegate Task KernelCommandPipelineMiddleware( + KernelCommandContext context, + Func next); +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/KernelCommandResult.cs b/WorkspaceServer/Kernel/KernelCommandResult.cs new file mode 100644 index 000000000..741745893 --- /dev/null +++ b/WorkspaceServer/Kernel/KernelCommandResult.cs @@ -0,0 +1,43 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Reactive.Subjects; + +namespace WorkspaceServer.Kernel +{ + internal class KernelCommandResult : IKernelCommandResult, IObserver + { + private readonly ReplaySubject _events; + private Action _eventRelay; + + public KernelCommandResult() + { + _events = new ReplaySubject(); + } + + public IObservable Events => _events; + + + public void OnCompleted() + { + _events.OnCompleted(); + } + + public void OnError(Exception error) + { + _events.OnError(error); + } + + public void OnNext(IKernelEvent kernelEvent) + { + _events.OnNext(kernelEvent); + _eventRelay?.Invoke(kernelEvent); + } + + public void RelayEventsOn(Action eventRelay) + { + _eventRelay = eventRelay; + } + } +} \ No newline at end of file diff --git a/WorkspaceServer/Kernel/KernelExtensions.cs b/WorkspaceServer/Kernel/KernelExtensions.cs new file mode 100644 index 000000000..10780c2ec --- /dev/null +++ b/WorkspaceServer/Kernel/KernelExtensions.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading; +using System.Threading.Tasks; + +namespace WorkspaceServer.Kernel +{ + public static class KernelExtensions + { + public static Task SendAsync(this IKernel kernel, IKernelCommand command) + { + return kernel.SendAsync(command, CancellationToken.None); + } + } +} \ No newline at end of file