这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 8 additions & 48 deletions Microsoft.DotNet.Try.Jupyter.Tests/ExecuteRequestHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public ExecuteRequestHandlerTests()
[Fact]
public void cannot_handle_requests_that_are_not_executeRequest()
{
var handler = new ExecuteRequestHandler(new KernelSimulator());
var kernel = new CSharpRepl();
var handler = new ExecuteRequestHandler(kernel);
var request = Message.Create(new DisplayData(), null);
Func<Task> messageHandling = () => handler.Handle(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus));
messageHandling.Should().ThrowExactly<InvalidOperationException>();
Expand All @@ -40,17 +41,7 @@ public void cannot_handle_requests_that_are_not_executeRequest()
[Fact]
public async Task handles_executeRequest()
{
var kernel = new KernelSimulator((command, channel) =>
{
switch (command)
{
case SubmitCode _:
return Task.CompletedTask;
default:
throw new NotImplementedException();
}
});

var kernel = new CSharpRepl();
var handler = new ExecuteRequestHandler(kernel);
var request = Message.Create(new ExecuteRequest("var a =12;"), null);
await handler.Handle(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus));
Expand All @@ -59,17 +50,7 @@ public async Task handles_executeRequest()
[Fact]
public async Task sends_executeReply_message_on_codeSubmissionEvaluated()
{
var kernel = new KernelSimulator((command, channel) =>
{
switch (command)
{
case SubmitCode codeSubmission:
channel.OnNext(new CodeSubmissionEvaluated(codeSubmission.Id));
return Task.CompletedTask;
default:
throw new NotImplementedException();
}
});
var kernel = new CSharpRepl();

var handler = new ExecuteRequestHandler(kernel);
var request = Message.Create(new ExecuteRequest("var a =12;"), null);
Expand All @@ -83,20 +64,10 @@ public async Task sends_executeReply_message_on_codeSubmissionEvaluated()
[Fact]
public async Task sends_executeReply_with_error_message_on_codeSubmissionEvaluated()
{
var kernel = new KernelSimulator((command, channel) =>
{
switch (command)
{
case SubmitCode codeSubmission:
channel.OnNext(new CodeSubmissionEvaluationFailed(codeSubmission.Id, new InvalidOperationException("failed")));
return Task.CompletedTask;
default:
throw new NotImplementedException();
}
});
var kernel = new CSharpRepl();

var handler = new ExecuteRequestHandler(kernel);
var request = Message.Create(new ExecuteRequest("var a =12;"), null);
var request = Message.Create(new ExecuteRequest("asdes"), null);
await handler.Handle(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus));

_serverRecordingSocket.DecodedMessages
Expand All @@ -114,21 +85,10 @@ public async Task sends_executeReply_with_error_message_on_codeSubmissionEvaluat
[Fact]
public async Task sends_executeResult_message_on_valueProduced()
{
var kernel = new KernelSimulator((command, channel) =>
{
switch (command)
{
case SubmitCode codeSubmission:
channel.OnNext(new ValueProduced(codeSubmission.Id, new[] { 1, 2, 3 }));
channel.OnNext(new CodeSubmissionEvaluated(codeSubmission.Id));
return Task.CompletedTask;
default:
throw new NotImplementedException();
}
});
var kernel = new CSharpRepl();

var handler = new ExecuteRequestHandler(kernel);
var request = Message.Create(new ExecuteRequest("var a =12;"), null);
var request = Message.Create(new ExecuteRequest("2+2"), null);
await handler.Handle(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus));

_serverRecordingSocket.DecodedMessages
Expand Down
39 changes: 0 additions & 39 deletions Microsoft.DotNet.Try.Jupyter.Tests/KernelSimulator.cs

This file was deleted.

158 changes: 117 additions & 41 deletions Microsoft.DotNet.Try.Jupyter/ExecuteRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.DotNet.Try.Jupyter.Protocol;
Expand All @@ -14,15 +14,18 @@

namespace Microsoft.DotNet.Try.Jupyter
{
public class ExecuteRequestHandler : IObserver<IKernelEvent>
public class ExecuteRequestHandler : IDisposable
{
private readonly IKernel _kernel;
private readonly RenderingEngine _renderingEngine;
private readonly ConcurrentDictionary<Guid, OpenRequest> _openRequests = new ConcurrentDictionary<Guid, OpenRequest>();
private int _executionCount;
private readonly CodeSubmissionProcessors _processors;
private readonly CompositeDisposable _disposables = new CompositeDisposable();

private class OpenRequest
private class OpenRequest : IDisposable
{
private readonly CompositeDisposable _disposables = new CompositeDisposable();
public Guid Id { get; }
public Dictionary<string, object> Transient { get; }
public JupyterRequestContext Context { get; }
Expand All @@ -38,6 +41,16 @@ public OpenRequest(JupyterRequestContext context, ExecuteRequest executeRequest,
Id = id;
Transient = transient;
}

public void AddDisposable(IDisposable disposable)
{
_disposables.Add(disposable);
}

public void Dispose()
{
_disposables.Dispose();
}
}

public ExecuteRequestHandler(IKernel kernel)
Expand All @@ -48,33 +61,66 @@ public ExecuteRequestHandler(IKernel kernel)
_renderingEngine.RegisterRenderer(typeof(IDictionary), new DictionaryRenderer());
_renderingEngine.RegisterRenderer(typeof(IList), new ListRenderer());
_renderingEngine.RegisterRenderer(typeof(IEnumerable), new SequenceRenderer());

_kernel.KernelEvents.Subscribe(this);
_processors = new CodeSubmissionProcessors();
}

public Task Handle(JupyterRequestContext context)
public async Task Handle(JupyterRequestContext context)
{
var executeRequest = context.GetRequestContent<ExecuteRequest>() ?? throw new InvalidOperationException($"Request Content must be a not null {typeof(ExecuteRequest).Name}");
context.RequestHandlerStatus.SetAsBusy();
var command = new SubmitCode(executeRequest.Code);
var id = command.Id;
var transient = new Dictionary<string, object> { { "display_id", id.ToString() } };
var executionCount = executeRequest.Silent ? _executionCount : Interlocked.Increment(ref _executionCount);
_openRequests[id] = new OpenRequest(context, executeRequest, executionCount, id, transient);
return _kernel.SendAsync(command);
}

void IObserver<IKernelEvent>.OnCompleted()
{
throw new NotImplementedException();
}

void IObserver<IKernelEvent>.OnError(Exception error)
{
throw new NotImplementedException();

try
{
var command = new SubmitCode(executeRequest.Code, "csharp");
command = await _processors.ProcessAsync(command);
var id = command.Id;
var transient = new Dictionary<string, object> { { "display_id", id.ToString() } };

var openRequest = new OpenRequest(context, executeRequest, executionCount, id, transient);
_openRequests[id] = openRequest;

var kernelResult = await _kernel.SendAsync(command);
openRequest.AddDisposable(kernelResult.Events.Subscribe(OnKernelResultEvent));
}
catch (Exception e)
{
var errorContent = new Error(
eName: "Unhandled Exception",
eValue: $"{e.Message}"
);

if (!executeRequest.Silent)
{
// send on io
var error = Message.Create(
errorContent,
context.Request.Header);
context.IoPubChannel.Send(error);

// send on stderr
var stdErr = new StdErrStream(errorContent.EValue);
var stream = Message.Create(
stdErr,
context.Request.Header);
context.IoPubChannel.Send(stream);
}

// reply Error
var executeReplyPayload = new ExecuteReplyError(errorContent, executionCount: executionCount);

// send to server
var executeReply = Message.CreateResponse(
executeReplyPayload,
context.Request);

context.ServerChannel.Send(executeReply);
context.RequestHandlerStatus.SetAsIdle();
}
}


void IObserver<IKernelEvent>.OnNext(IKernelEvent value)
void OnKernelResultEvent(IKernelEvent value)
{
switch (value)
{
Expand All @@ -99,7 +145,7 @@ void IObserver<IKernelEvent>.OnNext(IKernelEvent value)
private static void OnCodeSubmissionEvaluatedFailed(CodeSubmissionEvaluationFailed codeSubmissionEvaluationFailed, ConcurrentDictionary<Guid, OpenRequest> openRequests)
{
var openRequest = openRequests[codeSubmissionEvaluationFailed.ParentId];

var errorContent = new Error(
eName: "Unhandled Exception",
eValue: $"{codeSubmissionEvaluationFailed.Message}"
Expand Down Expand Up @@ -138,25 +184,50 @@ private static void OnValueProduced(ValueProduced valueProduced,
ConcurrentDictionary<Guid, OpenRequest> openRequests, RenderingEngine renderingEngine)
{
var openRequest = openRequests[valueProduced.ParentId];

var rendering = renderingEngine.Render(valueProduced.Value);


// executeResult data
var executeResultData = new ExecuteResult(
openRequest.ExecutionCount,
transient: openRequest.Transient,
data: new Dictionary<string, object> {
{ rendering.Mime, rendering.Content}
});

if (!openRequest.ExecuteRequest.Silent)
try
{
// send on io
var executeResultMessage = Message.Create(
executeResultData,
openRequest.Context.Request.Header);
openRequest.Context.IoPubChannel.Send(executeResultMessage);
var rendering = renderingEngine.Render(valueProduced.Value);

// executeResult data
var executeResultData = new ExecuteResult(
openRequest.ExecutionCount,
transient: openRequest.Transient,
data: new Dictionary<string, object>
{
{rendering.Mime, rendering.Content}
});

if (!openRequest.ExecuteRequest.Silent)
{
// send on io
var executeResultMessage = Message.Create(
executeResultData,
openRequest.Context.Request.Header);
openRequest.Context.IoPubChannel.Send(executeResultMessage);
}
}
catch (Exception e)
{
var errorContent = new Error(
eName: "Unhandled Exception",
eValue: $"{e.Message}"
);

if (!openRequest.ExecuteRequest.Silent)
{
// send on io
var error = Message.Create(
errorContent,
openRequest.Context.Request.Header);
openRequest.Context.IoPubChannel.Send(error);

// send on stderr
var stdErr = new StdErrStream(errorContent.EValue);
var stream = Message.Create(
stdErr,
openRequest.Context.Request.Header);
openRequest.Context.IoPubChannel.Send(stream);
}
}
}

Expand All @@ -175,5 +246,10 @@ private static void OnCodeSubmissionEvaluated(CodeSubmissionEvaluated codeSubmis
openRequest.Context.ServerChannel.Send(executeReply);
openRequest.Context.RequestHandlerStatus.SetAsIdle();
}

public void Dispose()
{
_disposables.Dispose();
}
}
}
1 change: 0 additions & 1 deletion Microsoft.DotNet.Try.Jupyter/Protocol/ExecuteRequest.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using Newtonsoft.Json;

Expand Down
7 changes: 0 additions & 7 deletions Microsoft.DotNet.Try.Jupyter/Rendering/DefaultRenderer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,12 @@ public IRendering Render(object source, IRenderingEngine engine = null)

public IRendering RenderObject(object source, IRenderingEngine engine = null)
{
try
{
var rows = CreateRows(source, engine);
var table = $@"<table>
{rows}
</table>";

return new HtmlRendering(table);
}
catch (Exception)
{
return new PlainTextRendering(source?.ToString());
}
}

private string CreateRows(object source, IRenderingEngine engine)
Expand Down
Loading