Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .autover/changes/c27a62e6-91ca-4a59-9406-394866cdfa62.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.RuntimeSupport",
"Type": "Minor",
"ChangelogMessages": [
"(Preview) Add response streaming support"
]
},
{
"Name": "Amazon.Lambda.Core",
"Type": "Minor",
"ChangelogMessages": [
"(Preview) Add response streaming support"
]
}
]
}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*.suo
*.user

**/.kiro/

####################
# Build/Test folders
####################
Expand Down
19 changes: 17 additions & 2 deletions Libraries/Libraries.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31717.71
# Visual Studio Version 18
VisualStudioVersion = 18.3.11512.155 d18.3
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{AAB54E74-20B1-42ED-BC3D-CE9F7BC7FD12}"
EndProject
Expand Down Expand Up @@ -151,6 +151,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestCustomAuthorizerApp.Int
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestCustomAuthorizerApp", "test\TestCustomAuthorizerApp\TestCustomAuthorizerApp.csproj", "{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ResponseStreamingFunctionHandlers", "test\Amazon.Lambda.RuntimeSupport.Tests\ResponseStreamingFunctionHandlers\ResponseStreamingFunctionHandlers.csproj", "{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -941,6 +943,18 @@ Global
{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E}.Release|x64.Build.0 = Release|Any CPU
{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E}.Release|x86.ActiveCfg = Release|Any CPU
{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E}.Release|x86.Build.0 = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|x64.ActiveCfg = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|x64.Build.0 = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|x86.ActiveCfg = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|x86.Build.0 = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|Any CPU.Build.0 = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|x64.ActiveCfg = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|x64.Build.0 = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|x86.ActiveCfg = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1015,6 +1029,7 @@ Global
{8D03BDF3-7078-4B46-A3F1-C73BE6D6CE0D} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{8EEDD576-7FC4-4FAC-A5A2-F58562753A53} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9} = {B5BD0336-7D08-492C-8489-42C987E29B39}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {503678A4-B8D1-4486-8915-405A3E9CF0EB}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#if NET8_0_OR_GREATER
using System.Collections.Generic;
using System.Net;
using System.Runtime.Versioning;
using System.Text.Json;

namespace Amazon.Lambda.Core.ResponseStreaming
{
/// <summary>
/// The HTTP response prelude to be sent as the first chunk of a streaming response when using <see cref="LambdaResponseStreamFactory.CreateHttpStream"/>.
/// </summary>
[RequiresPreviewFeatures(LambdaResponseStreamFactory.ParameterizedPreviewMessage)]
public class HttpResponseStreamPrelude
{
/// <summary>
/// The Http status code to include in the response prelude.
/// </summary>
public HttpStatusCode? StatusCode { get; set; }

/// <summary>
/// The response headers to include in the response prelude. This collection supports setting single value for the same headers.
/// </summary>
public IDictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();

/// <summary>
/// The response headers to include in the response prelude. This collection supports setting multiple values for the same headers.
/// </summary>
public IDictionary<string, IList<string>> MultiValueHeaders { get; set; } = new Dictionary<string, IList<string>>();

/// <summary>
/// The list of cookies to include in the response prelude. This is used for Lambda Function URL responses, which support a separate "cookies" field in the response JSON for setting cookies, rather than requiring cookies to be set via the "Set-Cookie" header.
/// </summary>
public IList<string> Cookies { get; set; } = new List<string>();

internal byte[] ToByteArray()
{
var bufferWriter = new System.Buffers.ArrayBufferWriter<byte>();
using (var writer = new Utf8JsonWriter(bufferWriter))
{
writer.WriteStartObject();

if (StatusCode.HasValue)
writer.WriteNumber("statusCode", (int)StatusCode);

if (Headers?.Count > 0)
{
writer.WriteStartObject("headers");
foreach (var header in Headers)
{
writer.WriteString(header.Key, header.Value);
}
writer.WriteEndObject();
}

if (MultiValueHeaders?.Count > 0)
{
writer.WriteStartObject("multiValueHeaders");
foreach (var header in MultiValueHeaders)
{
writer.WriteStartArray(header.Key);
foreach (var value in header.Value)
{
writer.WriteStringValue(value);
}
writer.WriteEndArray();
}
writer.WriteEndObject();
}

if (Cookies?.Count > 0)
{
writer.WriteStartArray("cookies");
foreach (var cookie in Cookies)
{
writer.WriteStringValue(cookie);
}
writer.WriteEndArray();
}

writer.WriteEndObject();
}

return bufferWriter.WrittenSpan.ToArray();
}
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#if NET8_0_OR_GREATER
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Lambda.Core.ResponseStreaming
{
/// <summary>
/// Interface for writing streaming responses in AWS Lambda functions.
/// Obtained by calling <see cref="LambdaResponseStreamFactory.CreateStream"/> within a handler.
/// </summary>
internal interface ILambdaResponseStream : IDisposable
{
/// <summary>
/// Asynchronously writes a portion of a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array containing data to write.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
/// <param name="count">The number of bytes to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default);


/// <summary>
/// Gets the total number of bytes written to the stream so far.
/// </summary>
long BytesWritten { get; }


/// <summary>
/// Gets whether an error has been reported.
/// </summary>
bool HasError { get; }
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#if NET8_0_OR_GREATER

using System;
using System.IO;
using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Lambda.Core.ResponseStreaming
{
/// <summary>
/// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data
/// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>.
/// Integrates with standard .NET stream consumers such as <see cref="System.IO.StreamWriter"/>.
/// </summary>
[RequiresPreviewFeatures(LambdaResponseStreamFactory.ParameterizedPreviewMessage)]
public class LambdaResponseStream : Stream
{
private readonly ILambdaResponseStream _responseStream;

internal LambdaResponseStream(ILambdaResponseStream responseStream)
{
_responseStream = responseStream;
}

/// <summary>
/// The number of bytes written to the Lambda response stream so far.
/// </summary>
public long BytesWritten => _responseStream.BytesWritten;

/// <summary>
/// Asynchronously writes a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));

await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
}

/// <summary>
/// Asynchronously writes a portion of a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array containing data to write.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
/// <param name="count">The number of bytes to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
{
await _responseStream.WriteAsync(buffer, offset, count, cancellationToken);
}

#region Noop Overrides

/// <summary>Gets a value indicating whether the stream supports reading. Always <c>false</c>.</summary>
public override bool CanRead => false;

/// <summary>Gets a value indicating whether the stream supports seeking. Always <c>false</c>.</summary>
public override bool CanSeek => false;

/// <summary>Gets a value indicating whether the stream supports writing. Always <c>true</c>.</summary>
public override bool CanWrite => true;

/// <summary>
/// Gets the total number of bytes written to the stream so far.
/// Equivalent to <see cref="BytesWritten"/>.
/// </summary>
public override long Length => BytesWritten;

/// <summary>
/// Getting or setting the position is not supported.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override long Position
{
get => throw new NotSupportedException("LambdaResponseStream does not support seeking.");
set => throw new NotSupportedException("LambdaResponseStream does not support seeking.");
}

/// <summary>Not supported.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override long Seek(long offset, SeekOrigin origin)
=> throw new NotImplementedException("LambdaResponseStream does not support seeking.");

/// <summary>Not supported.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotImplementedException("LambdaResponseStream does not support reading.");

/// <summary>Not supported.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotImplementedException("LambdaResponseStream does not support reading.");

/// <summary>
/// Writes a sequence of bytes to the stream. Delegates to the async path synchronously.
/// Prefer <see cref="WriteAsync(byte[], int, int, CancellationToken)"/> to avoid blocking.
/// </summary>
public override void Write(byte[] buffer, int offset, int count)
=> WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();

/// <summary>
/// Flush is a no-op; data is sent to the Runtime API immediately on each write.
/// </summary>
public override void Flush() { }

/// <summary>Not supported.</summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override void SetLength(long value)
=> throw new NotSupportedException("LambdaResponseStream does not support SetLength.");
#endregion
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#if NET8_0_OR_GREATER
using System;
using System.IO;
using System.Runtime.Versioning;

namespace Amazon.Lambda.Core.ResponseStreaming
{
/// <summary>
/// Factory to create Lambda response streams for writing streaming responses in AWS Lambda functions. The created streams are write-only and non-seekable.
/// </summary>
[RequiresPreviewFeatures(LambdaResponseStreamFactory.ParameterizedPreviewMessage)]
public class LambdaResponseStreamFactory
{
internal const string ParameterizedPreviewMessage =
"Response streaming is in preview till a new version of .NET Lambda runtime client that supports response streaming " +
"has been deployed to the .NET Lambda managed runtime. Till deployment has been made the feature can be used by deploying as an " +
"executable including the latest version of Amazon.Lambda.RuntimeSupport and setting the \"EnablePreviewFeatures\" in the Lambda " +
"project file to \"true\"";

private static Func<byte[], ILambdaResponseStream> _streamFactory;

internal static void SetLambdaResponseStream(Func<byte[], ILambdaResponseStream> streamFactory)
{
_streamFactory = streamFactory ?? throw new ArgumentNullException(nameof(streamFactory));
}

/// <summary>
/// Creates a <see cref="Stream"/> that can be used to write streaming responses back to callers of the Lambda function. Once
/// a Lambda function creates a response stream all output must be returned by writing to the stream; the Lambda function's handler
/// return value will be ignored. The stream is write-only and non-seekable.
/// </summary>
/// <returns></returns>
public static Stream CreateStream()
{
var runtimeResponseStream = _streamFactory(Array.Empty<byte>());
return new LambdaResponseStream(runtimeResponseStream);
}

/// <summary>
/// Create a <see cref="Stream"/> for writing streaming responses, with an HTTP response prelude containing status code and headers. This should be used for
/// Lambda functions using response streaming that are invoked via the Lambda Function URLs or API Gateway HTTP APIs, where the response format is expected to be an HTTP response.
/// The prelude will be serialized and sent as the first chunk of the response stream, and should contain any necessary HTTP status code and headers.
/// <para>
/// Once a Lambda function creates a response stream all output must be returned by writing to the stream; the Lambda function's handler
/// return value will be ignored. The stream is write-only and non-seekable.
/// </para>
/// </summary>
/// <param name="prelude">The HTTP response prelude including status code and headers.</param>
/// <returns></returns>
public static Stream CreateHttpStream(HttpResponseStreamPrelude prelude)
{
var runtimeResponseStream = _streamFactory(prelude.ToByteArray());
return new LambdaResponseStream(runtimeResponseStream);
}
}
}
#endif
Loading
Loading