Skip to content

Commit

Permalink
feat(webflux): Support EventStreamQueryHandler for ListQueryEventStre…
Browse files Browse the repository at this point in the history
…amHandlerFunction (#1049)
  • Loading branch information
Ahoo-Wang authored Dec 18, 2024
1 parent 477b4a4 commit 3655c1d
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ import me.ahoo.wow.filter.FilterChainBuilder
import me.ahoo.wow.filter.LogErrorHandler
import me.ahoo.wow.query.event.EventStreamQueryServiceFactory
import me.ahoo.wow.query.event.NoOpEventStreamQueryServiceFactory
import me.ahoo.wow.query.event.filter.DefaultEventStreamQueryHandler
import me.ahoo.wow.query.event.filter.EventStreamQueryContext
import me.ahoo.wow.query.event.filter.EventStreamQueryFilter
import me.ahoo.wow.query.event.filter.EventStreamQueryHandler
import me.ahoo.wow.query.event.filter.MaskingEventStreamQueryFilter
import me.ahoo.wow.query.event.filter.TailEventStreamQueryFilter
import me.ahoo.wow.query.mask.EventStreamDynamicDocumentMasker
import me.ahoo.wow.query.mask.EventStreamMaskerRegistry
import me.ahoo.wow.query.mask.StateDataMaskerRegistry
import me.ahoo.wow.query.mask.StateDynamicDocumentMasker
import me.ahoo.wow.query.snapshot.NoOpSnapshotQueryServiceFactory
Expand Down Expand Up @@ -54,25 +62,48 @@ class QueryAutoConfiguration {
fun stateDataMaskerRegistry(
maskers: List<StateDynamicDocumentMasker>
): StateDataMaskerRegistry {
val stateDataMaskerRegistry = StateDataMaskerRegistry()
val maskerRegistry = StateDataMaskerRegistry()
maskers.forEach {
stateDataMaskerRegistry.register(it)
maskerRegistry.register(it)
}
return stateDataMaskerRegistry
return maskerRegistry
}

@Bean
fun eventStreamMaskerRegistry(
maskers: List<EventStreamDynamicDocumentMasker>
): EventStreamMaskerRegistry {
val maskerRegistry = EventStreamMaskerRegistry()
maskers.forEach {
maskerRegistry.register(it)
}
return maskerRegistry
}

@Bean
fun maskingSnapshotQueryFilter(stateDataMaskerRegistry: StateDataMaskerRegistry): SnapshotQueryFilter {
return MaskingSnapshotQueryFilter(stateDataMaskerRegistry)
}

@Bean
fun maskingEventStreamQueryFilter(eventStreamMaskerRegistry: EventStreamMaskerRegistry): EventStreamQueryFilter {
return MaskingEventStreamQueryFilter(eventStreamMaskerRegistry)
}

@Bean
fun tailSnapshotQueryFilter(
snapshotQueryServiceFactory: ObjectProvider<SnapshotQueryServiceFactory>,
): TailSnapshotQueryFilter<Any> {
return TailSnapshotQueryFilter(snapshotQueryServiceFactory.getOrNoOp())
}

@Bean
fun tailEventStreamQueryFilter(
eventStreamQueryServiceFactory: ObjectProvider<EventStreamQueryServiceFactory>,
): TailEventStreamQueryFilter {
return TailEventStreamQueryFilter(eventStreamQueryServiceFactory.getOrNoOp())
}

@Bean
fun snapshotQueryFilterChain(
filters: List<Filter<SnapshotQueryContext<*, *, *>>>
Expand All @@ -83,12 +114,28 @@ class QueryAutoConfiguration {
.build()
}

@Bean
fun eventStreamQueryFilterChain(
filters: List<Filter<EventStreamQueryContext<*, *, *>>>
): FilterChain<EventStreamQueryContext<*, *, *>> {
return FilterChainBuilder<EventStreamQueryContext<*, *, *>>()
.addFilters(filters)
.filterCondition(EventStreamQueryHandler::class)
.build()
}

@Bean("snapshotQueryErrorHandler")
@ConditionalOnMissingBean(name = ["snapshotQueryErrorHandler"])
fun snapshotQueryErrorHandler(): ErrorHandler<SnapshotQueryContext<*, *, *>> {
return LogErrorHandler()
}

@Bean("eventStreamQueryErrorHandler")
@ConditionalOnMissingBean(name = ["eventStreamQueryErrorHandler"])
fun eventStreamQueryErrorHandler(): ErrorHandler<EventStreamQueryContext<*, *, *>> {
return LogErrorHandler()
}

@Bean
fun snapshotQueryHandler(
@Qualifier("snapshotQueryFilterChain") chain: FilterChain<SnapshotQueryContext<*, *, *>>,
Expand All @@ -97,6 +144,14 @@ class QueryAutoConfiguration {
return DefaultSnapshotQueryHandler(chain, queryErrorHandler)
}

@Bean
fun eventStreamQueryHandler(
@Qualifier("eventStreamQueryFilterChain") chain: FilterChain<EventStreamQueryContext<*, *, *>>,
@Qualifier("eventStreamQueryErrorHandler") queryErrorHandler: ErrorHandler<EventStreamQueryContext<*, *, *>>
): EventStreamQueryHandler {
return DefaultEventStreamQueryHandler(chain, queryErrorHandler)
}

@Bean
@ConditionalOnMissingBean(SnapshotQueryServiceFactory::class)
fun noOpSnapshotQueryServiceFactory(): SnapshotQueryServiceFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import me.ahoo.wow.messaging.compensation.EventCompensateSupporter
import me.ahoo.wow.modeling.state.StateAggregateFactory
import me.ahoo.wow.modeling.state.StateAggregateRepository
import me.ahoo.wow.openapi.RouterSpecs
import me.ahoo.wow.query.event.EventStreamQueryServiceFactory
import me.ahoo.wow.query.event.filter.EventStreamQueryHandler
import me.ahoo.wow.query.snapshot.filter.SnapshotQueryHandler
import me.ahoo.wow.spring.boot.starter.ConditionalOnWowEnabled
import me.ahoo.wow.spring.boot.starter.command.CommandAutoConfiguration
import me.ahoo.wow.spring.boot.starter.kafka.KafkaProperties
import me.ahoo.wow.spring.boot.starter.openapi.OpenAPIAutoConfiguration
import me.ahoo.wow.spring.query.getOrNoOp
import me.ahoo.wow.webflux.exception.DefaultRequestExceptionHandler
import me.ahoo.wow.webflux.exception.GlobalExceptionHandler
import me.ahoo.wow.webflux.exception.RequestExceptionHandler
Expand Down Expand Up @@ -418,11 +417,11 @@ class WebFluxAutoConfiguration {
@Order(Ordered.HIGHEST_PRECEDENCE)
@ConditionalOnMissingBean(name = [LIST_QUERY_EVENT_STREAM_HANDLER_FUNCTION_FACTORY_BEAN_NAME])
fun listQueryEventStreamHandlerFunctionFactory(
eventStreamQueryServiceFactoryProvider: ObjectProvider<EventStreamQueryServiceFactory>,
eventStreamQueryHandler: EventStreamQueryHandler,
exceptionHandler: RequestExceptionHandler
): ListQueryEventStreamHandlerFunctionFactory {
return ListQueryEventStreamHandlerFunctionFactory(
eventStreamQueryServiceFactory = eventStreamQueryServiceFactoryProvider.getOrNoOp(),
eventStreamQueryHandler = eventStreamQueryHandler,
exceptionHandler = exceptionHandler
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package me.ahoo.wow.spring.boot.starter.query

import me.ahoo.wow.filter.ErrorHandler
import me.ahoo.wow.filter.FilterChain
import io.mockk.every
import io.mockk.spyk
import me.ahoo.wow.query.event.filter.EventStreamQueryHandler
import me.ahoo.wow.query.mask.EventStreamDynamicDocumentMasker
import me.ahoo.wow.query.mask.StateDynamicDocumentMasker
import me.ahoo.wow.query.snapshot.filter.MaskingSnapshotQueryFilter
import me.ahoo.wow.query.snapshot.filter.SnapshotQueryHandler
import me.ahoo.wow.query.snapshot.filter.TailSnapshotQueryFilter
import me.ahoo.wow.spring.boot.starter.enableWow
import me.ahoo.wow.tck.mock.MOCK_AGGREGATE_METADATA
import org.assertj.core.api.AssertionsForInterfaceTypes
import org.junit.jupiter.api.Test
import org.springframework.boot.test.context.assertj.AssertableApplicationContext
Expand All @@ -19,14 +23,27 @@ class QueryAutoConfigurationTest {
contextRunner
.enableWow()
.withUserConfiguration(QueryAutoConfiguration::class.java)
.withBean(StateDynamicDocumentMasker::class.java, {
spyk<StateDynamicDocumentMasker> {
every { namedAggregate } returns MOCK_AGGREGATE_METADATA
}
})
.withBean(EventStreamDynamicDocumentMasker::class.java, {
spyk<EventStreamDynamicDocumentMasker> {
every { namedAggregate } returns MOCK_AGGREGATE_METADATA
}
})
.run { context: AssertableApplicationContext ->
AssertionsForInterfaceTypes.assertThat(context)
.hasBean(ExistsBeanName.SNAPSHOT_QUERY_SERVICE)
.hasSingleBean(MaskingSnapshotQueryFilter::class.java)
.hasSingleBean(TailSnapshotQueryFilter::class.java)
.hasSingleBean(FilterChain::class.java)
.hasSingleBean(ErrorHandler::class.java)
.hasBean("snapshotQueryFilterChain")
.hasBean("eventStreamQueryFilterChain")
.hasBean("snapshotQueryErrorHandler")
.hasBean("eventStreamQueryErrorHandler")
.hasSingleBean(SnapshotQueryHandler::class.java)
.hasSingleBean(EventStreamQueryHandler::class.java)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import me.ahoo.wow.eventsourcing.snapshot.SnapshotRepository
import me.ahoo.wow.messaging.compensation.EventCompensateSupporter
import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory
import me.ahoo.wow.modeling.state.StateAggregateFactory
import me.ahoo.wow.query.event.filter.EventStreamQueryHandler
import me.ahoo.wow.query.snapshot.filter.SnapshotQueryHandler
import me.ahoo.wow.spring.boot.starter.command.CommandAutoConfiguration
import me.ahoo.wow.spring.boot.starter.command.CommandGatewayAutoConfiguration
Expand Down Expand Up @@ -62,6 +63,7 @@ internal class WebFluxAutoConfigurationTest {
.withBean(StateEventCompensator::class.java, { mockk() })
.withBean(EventCompensateSupporter::class.java, { mockk() })
.withBean(SnapshotQueryHandler::class.java, { spyk<SnapshotQueryHandler>() })
.withBean(EventStreamQueryHandler::class.java, { spyk<EventStreamQueryHandler>() })
.withBean(HostAddressSupplier::class.java, { LocalHostAddressSupplier.INSTANCE })
.withUserConfiguration(
CommandAutoConfiguration::class.java,
Expand Down Expand Up @@ -95,6 +97,7 @@ internal class WebFluxAutoConfigurationTest {
.withBean(StateEventCompensator::class.java, { mockk() })
.withBean(EventCompensateSupporter::class.java, { mockk() })
.withBean(SnapshotQueryHandler::class.java, { spyk<SnapshotQueryHandler>() })
.withBean(EventStreamQueryHandler::class.java, { spyk<EventStreamQueryHandler>() })
.withBean(KafkaProperties::class.java, {
KafkaProperties(bootstrapServers = listOf("localhost:9092"))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import me.ahoo.wow.api.query.ListQuery
import me.ahoo.wow.modeling.matedata.AggregateMetadata
import me.ahoo.wow.openapi.event.ListQueryEventStreamRouteSpec
import me.ahoo.wow.query.context.Contexts.writeRawRequest
import me.ahoo.wow.query.event.EventStreamQueryService
import me.ahoo.wow.query.event.EventStreamQueryServiceFactory
import me.ahoo.wow.query.event.filter.EventStreamQueryHandler
import me.ahoo.wow.webflux.exception.RequestExceptionHandler
import me.ahoo.wow.webflux.exception.toServerResponse
import me.ahoo.wow.webflux.route.RouteHandlerFunctionFactory
Expand All @@ -30,7 +29,7 @@ import reactor.core.publisher.Mono

class ListQueryEventStreamHandlerFunction(
private val aggregateMetadata: AggregateMetadata<*, *>,
private val eventStreamQueryService: EventStreamQueryService,
private val eventStreamQueryHandler: EventStreamQueryHandler,
private val exceptionHandler: RequestExceptionHandler
) : HandlerFunction<ServerResponse> {

Expand All @@ -39,28 +38,25 @@ class ListQueryEventStreamHandlerFunction(
return request.bodyToMono(ListQuery::class.java)
.flatMap {
val query = if (tenantId == null) it else it.appendTenantId(tenantId)
eventStreamQueryService.dynamicList(query)
eventStreamQueryHandler.dynamicList(aggregateMetadata, query)
.collectList()
.writeRawRequest(request)
}.toServerResponse(request, exceptionHandler)
}
}

class ListQueryEventStreamHandlerFunctionFactory(
private val eventStreamQueryServiceFactory: EventStreamQueryServiceFactory,
private val eventStreamQueryHandler: EventStreamQueryHandler,
private val exceptionHandler: RequestExceptionHandler
) : RouteHandlerFunctionFactory<ListQueryEventStreamRouteSpec> {
override val supportedSpec: Class<ListQueryEventStreamRouteSpec>
get() = ListQueryEventStreamRouteSpec::class.java

override fun create(spec: ListQueryEventStreamRouteSpec): HandlerFunction<ServerResponse> {
eventStreamQueryServiceFactory.create(spec.aggregateMetadata)
.let { eventStreamQueryService ->
return ListQueryEventStreamHandlerFunction(
spec.aggregateMetadata,
eventStreamQueryService,
exceptionHandler
)
}
return ListQueryEventStreamHandlerFunction(
spec.aggregateMetadata,
eventStreamQueryHandler,
exceptionHandler
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ import io.mockk.every
import io.mockk.mockk
import me.ahoo.wow.api.query.Condition
import me.ahoo.wow.api.query.ListQuery
import me.ahoo.wow.filter.FilterChainBuilder
import me.ahoo.wow.filter.LogErrorHandler
import me.ahoo.wow.openapi.command.CommandHeaders
import me.ahoo.wow.openapi.event.ListQueryEventStreamRouteSpec
import me.ahoo.wow.query.event.NoOpEventStreamQueryServiceFactory
import me.ahoo.wow.query.event.filter.DefaultEventStreamQueryHandler
import me.ahoo.wow.query.event.filter.EventStreamQueryContext
import me.ahoo.wow.query.event.filter.EventStreamQueryHandler
import me.ahoo.wow.query.event.filter.TailEventStreamQueryFilter
import me.ahoo.wow.serialization.MessageRecords
import me.ahoo.wow.tck.mock.MOCK_AGGREGATE_METADATA
import me.ahoo.wow.webflux.exception.DefaultRequestExceptionHandler
Expand All @@ -19,11 +25,21 @@ import reactor.kotlin.core.publisher.toMono
import reactor.kotlin.test.test

class ListQueryEventStreamHandlerFunctionTest {
private val tailSnapshotQueryFilter = TailEventStreamQueryFilter(NoOpEventStreamQueryServiceFactory)
private val queryFilterChain = FilterChainBuilder<EventStreamQueryContext<*, *, *>>()
.addFilters(listOf(tailSnapshotQueryFilter))
.filterCondition(EventStreamQueryHandler::class)
.build()
private val queryHandler = DefaultEventStreamQueryHandler(
queryFilterChain,
LogErrorHandler()
)

@Test
fun handle() {
val handlerFunction =
ListQueryEventStreamHandlerFunctionFactory(
NoOpEventStreamQueryServiceFactory,
queryHandler,
DefaultRequestExceptionHandler
)
.create(
Expand Down

0 comments on commit 3655c1d

Please sign in to comment.