Skip to content

Commit

Permalink
Move interactive-specific TestStreamService to interactive protos.
Browse files Browse the repository at this point in the history
Notably this allows the protos in model/pipeline to be definitions only
rather than mixing services and definitions.
  • Loading branch information
robertwb committed Feb 4, 2025
1 parent e2e1cb4 commit 0788129
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,16 @@ message TestStreamFileRecord {
// The recorded event from an element stream.
org.apache.beam.model.pipeline.v1.TestStreamPayload.Event recorded_event = 1;
}

// Streams TestStreamPayload.Event messages to a requesting TestStream.
service TestStreamService {
  // Server-streaming RPC through which a TestStream asks for its events.
  // The request names the outputs to read; the response is a stream of
  // TestStreamPayload.Event messages.
  rpc Events(EventsRequest)
      returns (stream org.apache.beam.model.pipeline.v1.TestStreamPayload.Event);
}

// Request message for the TestStreamService.Events RPC.
message EventsRequest {
// The set of PCollections to read from, identified by the local names of
// the PTransform's outputs. These are a subset of the TestStream's outputs.
// Requesting a subset allows Interactive Beam to cache many PCollections
// from a pipeline and then replay only some of them.
repeated string output_ids = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -722,19 +722,6 @@ message TestStreamPayload {
}
}

service TestStreamService {
// A TestStream will request for events using this RPC.
rpc Events(EventsRequest) returns (stream TestStreamPayload.Event) {}
}

message EventsRequest {
// The set of PCollections to read from. These are the PTransform outputs
// local names. These are a subset of the TestStream's outputs. This allows
// Interactive Beam to cache many PCollections from a pipeline then replay a
// subset of them.
repeated string output_ids = 1;
}

// The payload for the special-but-not-primitive WriteFiles transform.
message WriteFilesPayload {

Expand All @@ -750,7 +737,7 @@ message WriteFilesPayload {

map<string, SideInput> side_inputs = 5;

// This is different from runner based sharding. This is done by the runner backend, where as runner_determined_sharding
// This is different from runner based sharding. This is done by the runner backend, whereas runner_determined_sharding
// is done by the runner translator.
bool auto_sharded = 6;
}
Expand Down Expand Up @@ -968,7 +955,7 @@ message StandardCoders {
// 01 - on time
// 10 - late
// 11 - unknown
// * bit 6 is 1 if this is the last pane, 0 otherwise.
// * bit 6 is 1 if this is the last pane, 0 otherwise.
// Commonly set with `byte |= 0x02`
// * bit 7 is 1 if this is the first pane, 0 otherwise.
// Commonly set with `byte |= 0x01`
Expand Down
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/runners/direct/test_stream_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@
from apache_beam import ParDo
from apache_beam import coders
from apache_beam import pvalue
from apache_beam.portability.api import beam_interactive_api_pb2
from apache_beam.portability.api import beam_interactive_api_pb2_grpc
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2_grpc
from apache_beam.testing.test_stream import ElementEvent
from apache_beam.testing.test_stream import ProcessingTimeEvent
from apache_beam.testing.test_stream import WatermarkEvent
Expand Down Expand Up @@ -267,10 +268,10 @@ def _stream_events_from_rpc(endpoint, output_tags, coder, channel, is_alive):
is placed on the channel to signify a successful end.
"""
stub_channel = grpc.insecure_channel(endpoint)
stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(stub_channel)
stub = beam_interactive_api_pb2_grpc.TestStreamServiceStub(stub_channel)

# Request the PCollections that we are looking for from the service.
event_request = beam_runner_api_pb2.EventsRequest(
event_request = beam_interactive_api_pb2.EventsRequest(
output_ids=[str(tag) for tag in output_tags])

event_stream = stub.Events(event_request)
Expand Down
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/testing/test_stream_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

import grpc

from apache_beam.portability.api import beam_runner_api_pb2_grpc
from apache_beam.portability.api import beam_interactive_api_pb2_grpc


class TestStreamServiceController(
beam_runner_api_pb2_grpc.TestStreamServiceServicer):
beam_interactive_api_pb2_grpc.TestStreamServiceServicer):
"""A server that streams TestStreamPayload.Events from a single EventsRequest.
This server is used as a way for TestStreams to receive events from file.
Expand All @@ -42,7 +42,7 @@ def __init__(self, reader, endpoint=None, exception_handler=None):
port = self._server.add_insecure_port('localhost:0')
self.endpoint = 'localhost:{}'.format(port)

beam_runner_api_pb2_grpc.add_TestStreamServiceServicer_to_server(
beam_interactive_api_pb2_grpc.add_TestStreamServiceServicer_to_server(
self, self._server)
self._reader = reader
self._exception_handler = exception_handler
Expand Down
10 changes: 5 additions & 5 deletions sdks/python/apache_beam/testing/test_stream_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import grpc

from apache_beam.portability.api import beam_interactive_api_pb2
from apache_beam.portability.api import beam_interactive_api_pb2_grpc
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2_grpc
from apache_beam.testing.test_stream_service import TestStreamServiceController

# Nose automatically detects tests if they match a regex. Here, it mistakes
Expand Down Expand Up @@ -63,14 +63,14 @@ def setUp(self):
self.controller.start()

channel = grpc.insecure_channel(self.controller.endpoint)
self.stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(channel)
self.stub = beam_interactive_api_pb2_grpc.TestStreamServiceStub(channel)

def tearDown(self):
self.controller.stop()

def test_normal_run(self):
r = self.stub.Events(
beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
events = [e for e in r]
expected_events = [
e for e in EventsReader(
Expand All @@ -81,9 +81,9 @@ def test_normal_run(self):

def test_multiple_sessions(self):
resp_a = self.stub.Events(
beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
resp_b = self.stub.Events(
beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))

events_a = []
events_b = []
Expand Down

0 comments on commit 0788129

Please sign in to comment.