-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconnection.h
144 lines (116 loc) · 4.66 KB
/
connection.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#pragma once
#include <functional>
#include <vector>
#include <one/arcus/error.h>
#include <one/arcus/c_platform.h>
#include <one/arcus/internal/accumulator.h>
#include <one/arcus/internal/health.h>
#include <one/arcus/internal/ring.h>
#include <one/arcus/internal/time.h>
namespace i3d {
namespace one {
namespace codec {
struct Header;
}
class Socket;
class Message;
template <typename T>
class RingBuffer;
namespace connection {
// Stream buffers sizes used to pump pending data from/to the connection's
// socket.
constexpr size_t stream_send_buffer_size() {
return 1024 * 128;
}
constexpr size_t stream_receive_buffer_size() {
return 1024 * 128;
}
} // namespace connection
// Connection manages Arcus protocol communication between two TCP sockets.
class Connection final {
public:
// A default that can be used for production.
static constexpr size_t max_message_default = 48;
static constexpr int handshake_timeout_seconds = 1;
// Connection must be given an active socket. Socket errors encountered
// during processing will be returned as errors, and it is the caller's
// responsibilty to either destroy the Connection, or restore the Socket's
// state for communication.
// Creating the conneciton starts the handshake timeout.
Connection(size_t max_messages_in, size_t max_messages_out);
~Connection() = default;
// Init the connection with the given socket. The given socket should be
// active. Must be called after construction and shutdown. Handshaking timers
// start when init is called.
void init(Socket &socket);
// Clears Connection to construction state. Erases all pending incoming
// and outgoing data. Unassigns the socket.
void shutdown();
// Marks this side of the connection as responsible for initiating the
// handshaking process. Must be called from one side of the connection
// only. Attempting to send a Message or any other data to other side of
// the Connection before handshaking is complete results in an error.
// Must be called after init, before updating.
OneError initiate_handshake();
// Update process incoming and outgoing messges. It attempts to read
// all incoming messages that are available. It attempts to send all
// queued outgoing messages. Must be called after init.
OneError update();
enum class Status {
uninitialized,
handshake_not_started,
handshake_hello_received,
handshake_hello_scheduled,
handshake_hello_sent,
ready,
error
};
Status status() const;
// Adds a Message to the outgoing message queue, and passes the message
// back in a modifier function that allows the caller to configure the
// queued message. If the outgoing message queue is full, then the
// call fails with ONE_ERROR_INSUFFICIENT_SPACE and the queue is not
// modified. Must be called after init.
OneError add_outgoing(const Message &message);
// The number of incoming messages available for pop. Must be called after
// init.
OneError incoming_count(unsigned int &count) const;
// Removes a message from the incoming message queue, but before doing so
// passes the message into the given callback for reading.
// Returns ONE_ERROR_EMPTY if the incoming_count is
// zero and there is no message to pop.
// Note that some messages are internally consumed and do not show up in
// the incoming count or here.
// Must be called after init.
OneError remove_incoming(
std::function<OneError(const Message &message)> read_callback);
private:
Connection() = delete;
OneError process_handshake();
// Reads all available incoming messages from the socket and stores them in
// the incoming message queue.
OneError process_incoming_messages();
// Sends all outgoing messages in the queue as long as the socket is ready
// for sending.
OneError process_outgoing_messages();
OneError process_health();
// Message helpers.
OneError try_read_data_into_in_stream();
OneError try_read_message_from_in_stream(codec::Header &header, Message &message);
// Handshake helpers.
OneError ensure_nothing_received();
OneError try_send_hello(); // The initial hello type.
OneError try_receive_hello();
OneError try_send_hello_message(); // Hello as a Message with opcode.
OneError try_receive_hello_message();
Socket *_socket;
Status _status;
Accumulator _in_stream;
Accumulator _out_stream;
Ring<Message> _incoming_messages;
Ring<Message> _outgoing_messages;
IntervalTimer _handshake_timer;
HealthChecker _health_checker;
};
} // namespace one
} // namespace i3d