Skip to content

Commit

Permalink
udp_session_filters: move and rename interfaces (#35665)
Browse files Browse the repository at this point in the history
Additional Description: Step 1 in support of ECDS with UDP session
filters. This PR moves UDP session filter interfaces to core code, later
to be consumed by new ConfigProviderManager. Includes required renaming
to interfaces and objects
Risk Level: low
Testing: unit test
Docs Changes: none
Release Notes: none
Platform Specific Features: none

---------

Signed-off-by: Ohad Vano <ohadvano@gmail.com>
  • Loading branch information
ohadvano authored Aug 14, 2024
1 parent ad805fa commit bf27014
Show file tree
Hide file tree
Showing 27 changed files with 322 additions and 338 deletions.
211 changes: 211 additions & 0 deletions envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,217 @@ class UdpListenerFilterManager {
using UdpListenerFilterFactoryCb = std::function<void(
UdpListenerFilterManager& udp_listener_filter_manager, UdpReadFilterCallbacks& callbacks)>;

/**
* Common interface for UdpSessionReadFilterCallbacks and UdpSessionWriteFilterCallbacks.
*/
class UdpSessionFilterCallbacks {
public:
virtual ~UdpSessionFilterCallbacks() = default;

/**
* @return uint64_t the ID of the originating UDP session.
*/
virtual uint64_t sessionId() const PURE;

/**
* @return StreamInfo for logging purposes.
*/
virtual StreamInfo::StreamInfo& streamInfo() PURE;

/**
* Allows a filter to inject a datagram to successive filters in the session filter chain.
* The injected datagram will be iterated as a regular received datagram, and may also be
* stopped by further filters. This can be used, for example, to continue processing previously
* buffered datagrams by a filter after an asynchronous operation ended.
*/
virtual void injectDatagramToFilterChain(Network::UdpRecvData& data) PURE;
};

class UdpSessionReadFilterCallbacks : public UdpSessionFilterCallbacks {
public:
~UdpSessionReadFilterCallbacks() override = default;

/**
* If a read filter stopped filter iteration, continueFilterChain() can be called to continue the
* filter chain. It will have onNewSession() called if it was not previously called.
* @return false if the session is removed and no longer valid, otherwise returns true.
*/
virtual bool continueFilterChain() PURE;
};

class UdpSessionWriteFilterCallbacks : public UdpSessionFilterCallbacks {};

class UdpSessionFilterBase {
public:
virtual ~UdpSessionFilterBase() = default;

/**
* This routine is called before the access log handlers' final log() is called. Filters can use
* this callback to enrich the data passed in to the log handlers.
*/
void onSessionComplete() {
if (!on_session_complete_already_called_) {
onSessionCompleteInternal();
on_session_complete_already_called_ = true;
}
}

protected:
/**
* This routine is called by onSessionComplete to enrich the data passed in to the log handlers.
*/
virtual void onSessionCompleteInternal() { ASSERT(!on_session_complete_already_called_); }

private:
bool on_session_complete_already_called_{false};
};

/**
* Return codes for read filter invocations.
*/
enum class UdpSessionReadFilterStatus {
// Continue to further session filters.
Continue,
// Stop executing further session filters.
StopIteration,
};

/**
* Session read filter interface.
*/
class UdpSessionReadFilter : public virtual UdpSessionFilterBase {
public:
~UdpSessionReadFilter() override = default;

/**
* Called when a new UDP session is first established. Filters should do one time long term
* processing that needs to be done when a session is established. Filter chain iteration
* can be stopped if needed.
* @return status used by the filter manager to manage further filter iteration.
*/
virtual UdpSessionReadFilterStatus onNewSession() PURE;

/**
* Called when UDP datagram is read and matches the session that manages the filter.
* @param data supplies the read data which may be modified.
* @return status used by the filter manager to manage further filter iteration.
*/
virtual UdpSessionReadFilterStatus onData(Network::UdpRecvData& data) PURE;

/**
* Initializes the read filter callbacks used to interact with the filter manager. It will be
* called by the filter manager a single time when the filter is first registered.
*
* IMPORTANT: No outbound networking or complex processing should be done in this function.
* That should be done in the context of onNewSession() if needed.
*
* @param callbacks supplies the callbacks.
*/
virtual void initializeReadFilterCallbacks(UdpSessionReadFilterCallbacks& callbacks) PURE;
};

using UdpSessionReadFilterSharedPtr = std::shared_ptr<UdpSessionReadFilter>;

/**
* Return codes for write filter invocations.
*/
enum class UdpSessionWriteFilterStatus {
// Continue to further session filters.
Continue,
// Stop executing further session filters.
StopIteration,
};

/**
* Session write filter interface.
*/
class UdpSessionWriteFilter : public virtual UdpSessionFilterBase {
public:
~UdpSessionWriteFilter() override = default;

/**
* Called when data is to be written on the UDP session.
* @param data supplies the buffer to be written which may be modified.
* @return status used by the filter manager to manage further filter iteration.
*/
virtual UdpSessionWriteFilterStatus onWrite(Network::UdpRecvData& data) PURE;

/**
* Initializes the write filter callbacks used to interact with the filter manager. It will be
* called by the filter manager a single time when the filter is first registered.
*
* IMPORTANT: No outbound networking or complex processing should be done in this function.
* That should be done in the context of ReadFilter::onNewSession() if needed.
*
* @param callbacks supplies the callbacks.
*/
virtual void initializeWriteFilterCallbacks(UdpSessionWriteFilterCallbacks& callbacks) PURE;
};

using UdpSessionWriteFilterSharedPtr = std::shared_ptr<UdpSessionWriteFilter>;

/**
* A combination read and write filter. This allows a single filter instance to cover
* both the read and write paths.
*/
class UdpSessionFilter : public virtual UdpSessionReadFilter,
public virtual UdpSessionWriteFilter {};
using UdpSessionFilterSharedPtr = std::shared_ptr<UdpSessionFilter>;

/**
* These callbacks are provided by the UDP session manager to the factory so that the factory
* can * build the filter chain in an application specific way.
*/
class UdpSessionFilterChainFactoryCallbacks {
public:
virtual ~UdpSessionFilterChainFactoryCallbacks() = default;

/**
* Add a read filter that is used when reading UDP session data.
* @param filter supplies the filter to add.
*/
virtual void addReadFilter(UdpSessionReadFilterSharedPtr filter) PURE;

/**
* Add a write filter that is used when writing UDP session data.
* @param filter supplies the filter to add.
*/
virtual void addWriteFilter(UdpSessionWriteFilterSharedPtr filter) PURE;

/**
* Add a bidirectional filter that is used when reading and writing UDP session data.
* @param filter supplies the filter to add.
*/
virtual void addFilter(UdpSessionFilterSharedPtr filter) PURE;
};

/**
* This function is used to wrap the creation of a UDP session filter chain for new sessions as they
* come in. Filter factories create the function at configuration initialization time, and then
* they are used at runtime.
* @param callbacks supplies the callbacks for the stream to install filters to. Typically the
* function will install a single filter, but it's technically possibly to install more than one
* if desired.
*/
using UdpSessionFilterFactoryCb =
std::function<void(UdpSessionFilterChainFactoryCallbacks& callbacks)>;

/**
* A UdpSessionFilterChainFactory is used by a UDP session manager to create a UDP session filter
* chain when a new session is created.
*/
class UdpSessionFilterChainFactory {
public:
virtual ~UdpSessionFilterChainFactory() = default;

/**
* Called when a new UDP session is created.
* @param callbacks supplies the "sink" that is used for actually creating the filter chain. @see
* UdpSessionFilterChainFactoryCallbacks.
*/
virtual void createFilterChain(UdpSessionFilterChainFactoryCallbacks& callbacks) const PURE;
};

/**
* Creates a chain of network filters for a new connection.
*/
Expand Down
23 changes: 23 additions & 0 deletions envoy/server/filter_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,29 @@ class NamedUdpListenerFilterConfigFactory : public ListenerFilterConfigFactoryBa
std::string category() const override { return "envoy.filters.udp_listener"; }
};

/**
* Implemented by each UDP session filter and registered via Registry::registerFactory or the
* convenience class RegisterFactory.
*/
class NamedUdpSessionFilterConfigFactory : public Envoy::Config::TypedFactory {
public:
~NamedUdpSessionFilterConfigFactory() override = default;

/**
* Create a particular UDP session filter factory implementation. If the implementation is
* unable to produce a factory with the provided parameters, it should throw an EnvoyException
* in the case of general error. The returned callback should always be initialized.
* @param config supplies the configuration for the filter
* @param context supplies the filter's context.
* @return UdpSessionFilterFactoryCb the factory creation function.
*/
virtual Network::UdpSessionFilterFactoryCb
createFilterFactoryFromProto(const Protobuf::Message& config,
Server::Configuration::FactoryContext& context) PURE;

std::string category() const override { return "envoy.filters.udp.session"; }
};

/**
* Implemented by each QUIC listener filter and registered via Registry::registerFactory()
* or the convenience class RegisterFactory.
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/filters/udp/udp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ envoy_cc_library(
"//source/common/stream_info:stream_info_lib",
"//source/common/upstream:load_balancer_context_base_lib",
"//source/extensions/filters/udp/udp_proxy/router:router_lib",
"//source/extensions/filters/udp/udp_proxy/session_filters:filter_config_interface",
"//source/extensions/filters/udp/udp_proxy/session_filters:filter_interface",
"@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/udp/udp_proxy/v3:pkg_cc_proto",
],
Expand All @@ -64,6 +62,7 @@ envoy_cc_extension(
":udp_proxy_filter_lib",
"//envoy/registry",
"//envoy/server:filter_config_interface",
"//source/common/filter:config_discovery_lib",
"//source/common/formatter:substitution_format_string_lib",
"@envoy_api//envoy/extensions/filters/udp/udp_proxy/v3:pkg_cc_proto",
],
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/filters/udp/udp_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,12 @@ UdpProxyFilterConfigImpl::UdpProxyFilterConfigImpl(
MessageUtil::getJsonStringFromMessageOrError(
static_cast<const Protobuf::Message&>(filter.typed_config()), true));

auto& factory = Config::Utility::getAndCheckFactory<NamedUdpSessionFilterConfigFactory>(filter);
auto& factory = Config::Utility::getAndCheckFactory<
Server::Configuration::NamedUdpSessionFilterConfigFactory>(filter);
ProtobufTypes::MessagePtr message = Envoy::Config::Utility::translateToFactoryConfig(
filter, context.messageValidationVisitor(), factory);
FilterFactoryCb callback = factory.createFilterFactoryFromProto(*message, context);
Network::UdpSessionFilterFactoryCb callback =
factory.createFilterFactoryFromProto(*message, context);
filter_factories_.push_back(callback);
}
}
Expand Down
12 changes: 6 additions & 6 deletions source/extensions/filters/udp/udp_proxy/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class TunnelingConfigImpl : public UdpTunnelingConfig {
};

class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig,
public FilterChainFactory,
public UdpSessionFilterChainFactory,
Logger::Loggable<Logger::Id::config> {
public:
UdpProxyFilterConfigImpl(
Expand Down Expand Up @@ -138,7 +138,7 @@ class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig,
const std::vector<AccessLog::InstanceSharedPtr>& proxyAccessLogs() const override {
return proxy_access_logs_;
}
const FilterChainFactory& sessionFilterFactory() const override { return *this; };
const UdpSessionFilterChainFactory& sessionFilterFactory() const override { return *this; };
bool hasSessionFilters() const override { return !filter_factories_.empty(); }
const UdpTunnelingConfigPtr& tunnelingConfig() const override { return tunneling_config_; };
bool flushAccessLogOnTunnelConnected() const override {
Expand All @@ -149,9 +149,9 @@ class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig,
}
Random::RandomGenerator& randomGenerator() const override { return random_generator_; }

// FilterChainFactory
void createFilterChain(FilterChainFactoryCallbacks& callbacks) const override {
for (const FilterFactoryCb& factory : filter_factories_) {
// UdpSessionFilterChainFactory
void createFilterChain(Network::UdpSessionFilterChainFactoryCallbacks& callbacks) const override {
for (const Network::UdpSessionFilterFactoryCb& factory : filter_factories_) {
factory(callbacks);
}
};
Expand All @@ -178,7 +178,7 @@ class UdpProxyFilterConfigImpl : public UdpProxyFilterConfig,
std::vector<AccessLog::InstanceSharedPtr> session_access_logs_;
std::vector<AccessLog::InstanceSharedPtr> proxy_access_logs_;
UdpTunnelingConfigPtr tunneling_config_;
std::list<SessionFilters::FilterFactoryCb> filter_factories_;
std::list<Network::UdpSessionFilterFactoryCb> filter_factories_;
Random::RandomGenerator& random_generator_;
};

Expand Down
27 changes: 1 addition & 26 deletions source/extensions/filters/udp/udp_proxy/session_filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,17 @@ licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_library(
name = "filter_config_interface",
hdrs = ["filter_config.h"],
deps = [
":filter_interface",
"//envoy/config:typed_config_interface",
"//envoy/server:filter_config_interface",
"//source/common/common:macros",
"//source/common/protobuf:cc_wkt_protos",
],
)

envoy_cc_library(
name = "factory_base_lib",
hdrs = ["factory_base.h"],
visibility = ["//visibility:public"],
deps = [
":filter_config_interface",
"//envoy/server:filter_config_interface",
"//source/common/protobuf:utility_lib",
],
)

envoy_cc_library(
name = "filter_interface",
hdrs = ["filter.h"],
visibility = ["//visibility:public"],
deps = [
"//envoy/network:listener_interface",
"//envoy/stream_info:stream_info_interface",
],
)

envoy_cc_library(
name = "pass_through_filter_lib",
hdrs = ["pass_through_filter.h"],
deps = [
":filter_interface",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ envoy_cc_library(
"//source/common/http:header_utility_lib",
"//source/common/stream_info:uint32_accessor_lib",
"//source/extensions/common/dynamic_forward_proxy:dns_cache_interface",
"//source/extensions/filters/udp/udp_proxy/session_filters:filter_interface",
"@envoy_api//envoy/extensions/filters/udp/udp_proxy/session/dynamic_forward_proxy/v3:pkg_cc_proto",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ FilterFactoryCb DynamicForwardProxyNetworkFilterConfigFactory::createFilterFacto
ProxyFilterConfigSharedPtr filter_config(
std::make_shared<ProxyFilterConfig>(proto_config, cache_manager_factory, context));

return [filter_config](FilterChainFactoryCallbacks& callbacks) -> void {
return [filter_config](Network::UdpSessionFilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addReadFilter(std::make_shared<ProxyFilter>(filter_config));
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace DynamicForwardProxy {

using FilterConfig =
envoy::extensions::filters::udp::udp_proxy::session::dynamic_forward_proxy::v3::FilterConfig;
using FilterFactoryCb = Network::UdpSessionFilterFactoryCb;

/**
* Config registration for the dynamic_forward_proxy filter. @see
Expand Down
Loading

0 comments on commit bf27014

Please sign in to comment.