Skip to content

Commit

Permalink
Short circuit WritingTask and await its completion in dispose(async) (#…
Browse files Browse the repository at this point in the history
…44)

* Short circuit WritingTask and await its completion in dispose(async)

This to ensure there is no ongoing IO when _streamWriter disposes:

```
  Error: System.InvalidOperationException : The stream is currently in use by a previous operation on the stream.

    Failed Elastic.OpenTelemetry.Tests.TransactionIdProcessorTests.TransactionId_IsAddedToTags [564 ms]
    Error Message:
     System.InvalidOperationException : The stream is currently in use by a previous operation on the stream.
    Stack Trace:
       at System.IO.StreamWriter.ThrowAsyncIOInProgress()
     at System.IO.StreamWriter.Dispose(Boolean disposing)
     at System.IO.TextWriter.Dispose()
     at Elastic.OpenTelemetry.Diagnostics.LogFileWriter.Dispose() in /home/runner/work/elastic-otel-dotnet/elastic-otel-dotnet/src/Elastic.OpenTelemetry/Diagnostics/LogFileWriter.cs:line 297
     at Elastic.OpenTelemetry.AgentBuilder.Agent.Dispose() in /home/runner/work/elastic-otel-dotnet/elastic-otel-dotnet/src/Elastic.OpenTelemetry/AgentBuilder.cs:line 270
     at Elastic.OpenTelemetry.Tests.TransactionIdProcessorTests.TransactionId_IsAddedToTags() in /home/runner/work/elastic-otel-dotnet/elastic-otel-dotnet/tests/Elastic.OpenTelemetry.Tests/TransactionIdProcessorTests.cs:line 36
     at System.RuntimeMethodHandle.InvokeMethod(Object target, Void** arguments, Signature sig, Boolean isConstructor)
     at System.Reflection.MethodBaseInvoker.InvokeWithNoArgs(Object obj, BindingFlags invokeAttr)
```

this was highlighted on our CI test runs:

https://github.com/elastic/elastic-otel-dotnet/actions/runs/8020502607/job/21910481682?pr=39

* _syncDisposeWaitHandle should be readonly
  • Loading branch information
Mpdreamz authored Feb 23, 2024
1 parent 4356e95 commit 3284bfc
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions src/Elastic.OpenTelemetry/Diagnostics/LogFileWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ internal sealed class LogFileWriter : IDisposable, IAsyncDisposable
{
public static readonly bool FileLoggingEnabled = IsFileLoggingEnabled();

private bool _disposing;
private readonly ManualResetEventSlim _syncDisposeWaitHandle = new(false);
private readonly StreamWriter _streamWriter;
private readonly Channel<string> _channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1024)
{
Expand Down Expand Up @@ -46,13 +48,11 @@ private LogFileWriter()

WritingTask = Task.Run(async () =>
{
while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
while (_channel.Reader.TryRead(out var logLine))
{
await _streamWriter.WriteLineAsync(logLine).ConfigureAwait(false);
}
}
while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false) && !_disposing)
while (_channel.Reader.TryRead(out var logLine) && !_disposing)
await _streamWriter.WriteLineAsync(logLine).ConfigureAwait(false);

_syncDisposeWaitHandle.Set();
});

var builder = StringBuilderCache.Acquire();
Expand Down Expand Up @@ -258,9 +258,10 @@ public void WriteLogLine(Activity? activity, int managedThreadId, DateTime dateT
}

var spin = new SpinWait();
while (true)
var message = StringBuilderCache.GetStringAndRelease(builder);
while (!_disposing)
{
if (_channel.Writer.TryWrite(StringBuilderCache.GetStringAndRelease(builder)))
if (_channel.Writer.TryWrite(message))
break;
spin.SpinOnce();
}
Expand Down Expand Up @@ -288,23 +289,25 @@ private static bool IsFileLoggingEnabled()

public void Dispose()
{
// We don't wait for the channel to be drained which is probably the correct choice.
// Dispose should be a quick operation with no chance of exceptions.
// We should document methods to wait for the WritingTask, before disposal, if that matters to the
// consumer.

//tag that we are running a dispose this allows running tasks and spin waits to short circuit
_disposing = true;
_channel.Writer.TryComplete();

_syncDisposeWaitHandle.Wait(TimeSpan.FromSeconds(1));

_streamWriter.Dispose();
}

public async ValueTask DisposeAsync()
{
// We don't wait for the channel to be drained which is probably the correct choice.
// Dispose should be a quick operation with no chance of exceptions.
// We should document methods to await the WritingTask, before disposal, if that matters to the
// consumer.
//tag that we are running a dispose this allows running tasks and spin waits to short circuit
_disposing = true;

_channel.Writer.TryComplete();

//Writing task will short circuit once _disposing is flipped to true
await WritingTask.ConfigureAwait(false);

await _streamWriter.DisposeAsync().ConfigureAwait(false);
}
}

0 comments on commit 3284bfc

Please sign in to comment.