Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
aritchie committed Jan 25, 2025
2 parents ff79946 + 8a3ac43 commit 536dd00
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ [EnumeratorCancellation] CancellationToken ct

if (cache != null)
{
// TODO: force refresh?
var item = await cache.Get<TResult>(requestKey).ConfigureAwait(false);
if (item == null)
{
Expand Down
14 changes: 9 additions & 5 deletions src/Shiny.Mediator.DapperRequests/Contracts.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
namespace Shiny.Mediator;
using System.Data;

namespace Shiny.Mediator;


public record DapperRequest<TResult>(
FormattableString Sql,
int? CommandTimeout = null,
string? RequestKey = null
) : IRequest<TResult>, IRequestKey
{
Expand All @@ -19,29 +20,32 @@ public string GetKey()
public record DapperQuery<TResult>(
FormattableString Sql,
int? CommandTimeout = null,
IDbTransaction? Transaction = null,
CommandType? CommandType = null,
string? RequestKey = null
) : DapperRequest<IEnumerable<TResult>>(
Sql,
CommandTimeout,
RequestKey
);

public record DapperFirstQuery<TResult>(
FormattableString Sql,
int? CommandTimeout = null,
IDbTransaction? Transaction = null,
CommandType? CommandType = null,
string? RequestKey = null
) : DapperRequest<TResult>(
Sql,
CommandTimeout,
RequestKey
);

public record DapperScalar(
FormattableString Sql,
int? CommandTimeout = null,
IDbTransaction? Transaction = null,
CommandType? CommandType = null,
string? RequestKey = null
) : DapperRequest<object>(
Sql,
CommandTimeout,
RequestKey
);
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ CancellationToken cancellationToken
.Create(request)
.QueryBuilder(request.Sql)
.QueryFirstOrDefaultAsync<TResult>(
commandTimeout: request.CommandTimeout,
cancellationToken: cancellationToken
request.Transaction,
request.CommandTimeout,
request.CommandType,
cancellationToken
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ CancellationToken cancellationToken
.Create(request)
.QueryBuilder(request.Sql)
.QueryAsync<TResult>(
null,
request.Transaction,
request.CommandTimeout,
cancellationToken: cancellationToken
request.CommandType,
cancellationToken
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,10 @@ public Task<object> Handle(DapperScalar request, RequestContext<DapperScalar> co
=> connectionProvider
.Create(request)
.QueryBuilder(request.Sql)
.ExecuteScalarAsync(null, request.CommandTimeout, cancellationToken: cancellationToken);
.ExecuteScalarAsync(
request.Transaction,
request.CommandTimeout,
request.CommandType,
cancellationToken
);
}
5 changes: 0 additions & 5 deletions src/Shiny.Mediator/Caching/CacheExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ namespace Shiny.Mediator;

public static class CacheExtensions
{
public static readonly (string Key, bool Value) ForceCacheRefreshHeader = ("ForceCacheRefresh", true);

public static bool HasForceCacheRefresh(this RequestContext context)
=> context.Values.ContainsKey(ForceCacheRefreshHeader.Key);

public static ShinyConfigurator AddCaching<TCache>(this ShinyConfigurator cfg) where TCache : class, ICacheService
{
cfg.Services.AddSingletonAsImplementedInterfaces<TCache>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ CancellationToken cancellationToken
};
}

var config = this.GetItemConfig(attribute, context.Request);
var config = this.GetItemConfig(context, attribute, context.Request);
if (config == null)
return await next().ConfigureAwait(false);

Expand Down Expand Up @@ -71,8 +71,12 @@ CancellationToken cancellationToken
}


protected virtual CacheItemConfig? GetItemConfig(CacheAttribute? attribute, TRequest request)
protected virtual CacheItemConfig? GetItemConfig(RequestContext context, CacheAttribute? attribute, TRequest request)
{
var cache = context.TryGetCacheConfig();
if (cache != null)
return cache;

if (request is ICacheControl control)
{
return new CacheItemConfig(
Expand Down
39 changes: 39 additions & 0 deletions src/Shiny.Mediator/Headers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Shiny.Mediator.Infrastructure;

namespace Shiny.Mediator;

public static class Headers
{
const string TimerRefreshHeader = nameof(TimerRefreshHeader);
public static (string Key, int Value) TimerRefresh(int timerRefreshSeconds)
=> (TimerRefreshHeader, timerRefreshSeconds);

public static int? TryGetTimerRefresh(this RequestContext context)
=> context.TryGetValue<int>(TimerRefreshHeader);


const string CommandScheduleHeader = nameof(CommandScheduleHeader);
public static (string Key, object Value) SetCommandSchedule(DateTimeOffset dueAt)
=> (CommandScheduleHeader, dueAt);

public static DateTimeOffset? TryGetCommandSchedule(this CommandContext context)
=> context.TryGetValue<DateTimeOffset>(CommandScheduleHeader);

#region Caching

const string CacheConfigHeader = nameof(CacheConfigHeader);
public static CacheItemConfig? TryGetCacheConfig(this RequestContext context)
=> context.TryGetValue<CacheItemConfig>(CacheConfigHeader);

public static (string Key, object Value) SetCacheConfig(this RequestContext context, CacheItemConfig cfg)
=> (CacheConfigHeader, cfg);


const string ForceCacheRefreshHeader = nameof(ForceCacheRefreshHeader);
public static (string Key, bool Value) ForceCacheRefresh { get; } = (ForceCacheRefreshHeader, true);

public static bool HasForceCacheRefresh(this RequestContext context)
=> context.Values.ContainsKey(ForceCacheRefreshHeader);

#endregion
}
15 changes: 14 additions & 1 deletion src/Shiny.Mediator/Http/HttpRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ await decorator
};


protected virtual HttpRequestMessage ContractToHttpRequest(TRequest request, HttpAttribute attribute, string baseUri)
protected virtual HttpRequestMessage ContractToHttpRequest(
TRequest request,
HttpAttribute attribute,
string baseUri
)
{
var httpMethod = ToMethod(attribute.Verb);
logger.LogDebug("HTTP Method: " + httpMethod);
Expand All @@ -109,6 +113,15 @@ protected virtual HttpRequestMessage ContractToHttpRequest(TRequest request, Htt
.GetProperties(BindingFlags.Public | BindingFlags.Instance)
.ToList();

// TODO: headers to http headers - think on this one
// foreach (var header in context.Values)
// {
// if (header.Value is string)
// {
//
// }
// }

var uri = baseUri + attribute.Route;
foreach (var property in properties)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Shiny.Mediator/Infrastructure/ICommandScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ public interface ICommandScheduler
/// Schedules and executes command
/// </summary>
/// <param name="context">The context containing the headers and contract</param>
/// <param name="dueAt">The schedule date</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<bool> Schedule(
CommandContext context,
DateTimeOffset dueAt,
CancellationToken cancellationToken
);
}
20 changes: 9 additions & 11 deletions src/Shiny.Mediator/Infrastructure/Impl/InMemoryCommandScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@ public class InMemoryCommandScheduler(
TimeProvider timeProvider
) : ICommandScheduler
{
readonly List<CommandContext> commands = new();
readonly List<(DateTimeOffset DueAt, CommandContext Context)> commands = new();
ITimer? timer;


public Task<bool> Schedule(CommandContext command, CancellationToken cancellationToken)
public Task<bool> Schedule(CommandContext command, DateTimeOffset dueAt, CancellationToken cancellationToken)
{
var scheduled = false;
if (command.Command is not IScheduledCommand scheduledCommand)
throw new InvalidCastException($"Command {command.Command} is not of IScheduledCommand");

var now = timeProvider.GetUtcNow();
if (scheduledCommand.DueAt > now)

if (dueAt > now)
{
lock (this.commands)
this.commands.Add(command);
this.commands.Add((dueAt, command));

scheduled = true;
this.timer ??= timeProvider.CreateTimer(_ => this.OnTimerElapsed(), null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
Expand All @@ -36,25 +34,25 @@ protected virtual async void OnTimerElapsed()
{
this.timer!.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); // stop

List<CommandContext> items = null!;
List<(DateTimeOffset DueAt, CommandContext Context)> items = null!;
lock (this.commands)
items = this.commands.ToList();

foreach (var item in items)
{
var command = (IScheduledCommand)item.Command;
var time = timeProvider.GetUtcNow();
if (command.DueAt < time)
if (item.DueAt < time)
{
var headers = item
.Context
.Values
.Select(x => (Key: x.Key, Value: x.Value))
.ToList();

try
{
await mediator
.Send(command, CancellationToken.None, headers)
.Send(item.Context.Command, CancellationToken.None, headers)
.ConfigureAwait(false);
}
catch (Exception ex)
Expand Down
11 changes: 6 additions & 5 deletions src/Shiny.Mediator/Middleware/ScheduledCommandMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,26 @@ public class ScheduledCommandMiddleware<TCommand>(
ILogger<ScheduledCommandMiddleware<TCommand>> logger,
TimeProvider timeProvider,
ICommandScheduler scheduler
) : ICommandMiddleware<TCommand> where TCommand : IScheduledCommand
) : ICommandMiddleware<TCommand> where TCommand : ICommand
{
public async Task Process(
CommandContext<TCommand> context,
CommandHandlerDelegate next,
CancellationToken cancellationToken
)
{
var dueAt = context.TryGetCommandSchedule() ?? (context.Command as IScheduledCommand)?.DueAt;
var now = timeProvider.GetUtcNow();
if (context.Command.DueAt < now)
if (dueAt == null || dueAt < now)
{
logger.LogWarning($"Executing Scheduled Command '{context.Command}' that was due at {context.Command.DueAt}");
logger.LogWarning($"Executing Scheduled Command '{context.Command}' that was due at {dueAt}");
await next().ConfigureAwait(false);
}
else
{
logger.LogInformation($"Command '{context.Command}' scheduled for {context.Command.DueAt}");
logger.LogInformation($"Command '{context.Command}' scheduled for {dueAt}");
await scheduler
.Schedule(context, cancellationToken)
.Schedule(context, dueAt.Value, cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ CancellationToken cancellationToken
)
{
var interval = 0;
var section = configuration.GetHandlerSection("TimerRefresh", context.Request, context.RequestHandler);
if (section != null)

var header = context.TryGetTimerRefresh();
if (header != null)
{
interval = section.GetValue("IntervalSeconds", 0);
interval = header.Value;
}
else
{
var attribute = context.RequestHandler.GetHandlerHandleMethodAttribute<TRequest, TimerRefreshAttribute>();
if (attribute != null)
interval = attribute.IntervalSeconds;
var section = configuration.GetHandlerSection("TimerRefresh", context.Request, context.RequestHandler);
if (section != null)
{
interval = section.GetValue("IntervalSeconds", 0);
}
else
{
var attribute = context.RequestHandler.GetHandlerHandleMethodAttribute<TRequest, TimerRefreshAttribute>();
if (attribute != null)
interval = attribute.IntervalSeconds;
}
}

if (interval <= 0)
Expand Down
30 changes: 17 additions & 13 deletions tests/Shiny.Mediator.Tests/DapperTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.ComponentModel;
using Dapper;
using DryIoc.Microsoft.DependencyInjection;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Hosting;
Expand All @@ -11,24 +12,28 @@ namespace Shiny.Mediator.Tests;

public class DapperTests(ITestOutputHelper output)
{
// const string CONN_STRING = "Host=localhost;Database=;Port=5432;Username=;Password=";
const string CONN_STRING = "Data Source=:memory:";

[Fact]
public async Task Scrutor_EndToEnd()
{
var services = new ServiceCollection();
services.AddShinyMediator(cfg => cfg
.AddDapper<SqliteConnection>(CONN_STRING)
// .AddDapper<NpgsqlConnection>(CONN_STRING)
);

await RunDbHits(services.BuildServiceProvider());
}
// [Fact]
// public async Task Scrutor_EndToEnd()
// {
// var services = new ServiceCollection();
// services.AddShinyMediator(cfg => cfg
// // .AddDapper<SqliteConnection>(CONN_STRING)
// .AddDapper<NpgsqlConnection>(CONN_STRING)
// );
//
// await RunDbHits(services.BuildServiceProvider());
// }

[Fact]
public async Task DryIoc_EndToEnd()
{
using (var conn = new SqliteConnection(CONN_STRING))
{
conn.Open();
conn.Execute("CREATE TABLE Users(Id INTEGER PRIMARY KEY, Email TEXT)");
}
var host = Host
.CreateDefaultBuilder()
.ConfigureLogging(x => x.AddXUnit(output))
Expand All @@ -37,7 +42,6 @@ public async Task DryIoc_EndToEnd()
{
services.AddShinyMediator(cfg => cfg
.AddDapper<SqliteConnection>(CONN_STRING)
// .AddDapper<NpgsqlConnection>(CONN_STRING)
);
})
.Build();
Expand Down

0 comments on commit 536dd00

Please sign in to comment.