-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
executable file
·152 lines (129 loc) · 4.12 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
* index.js: Island Worker boilerplate
*
*/
var http = require('http');
var Step = require('step');
var _ = require('underscore');
var db = require('mongish');
var zmq = require('zmq');
var start = exports.start = function (opts, cb) {
cb = cb || function(){};
// Router-Dealer pattern for two-way communication.
if (opts.routerPort) {
var frontPort = opts.routerPort;
var backPort = 'ipc:///tmp/queue';
// Create a router and dealer for client messages.
var frontSock = zmq.socket('router');
var backSock = zmq.socket('dealer');
frontSock.identity = 'r' + process.pid;
backSock.identity = 'd' + process.pid;
// Router handling.
frontSock.bindSync(frontPort);
frontSock.on('message', function () {
// Inform server.
backSock.send(Array.prototype.slice.call(arguments));
});
// Dealer handling.
backSock.bindSync(backPort);
backSock.on('message', function (id, del, data) {
// Send message receipt to appropriate client.
frontSock.send(Array.prototype.slice.call(arguments));
});
// Create a queue for client messages.
var queue = zmq.socket('rep');
queue.identity = 'q' + process.pid;
// Queue handling.
queue.on('message', function (data) {
data = JSON.parse(data.toString());
var res = {__cb: data.__cb};
function _cb() {
// Inform dealer.
queue.send(JSON.stringify(res));
}
if (opts.onSocketMessage) {
opts.onSocketMessage(data.msg, function (err, msg) {
if (err) {
res.error = err;
} else if (_.isObject(msg)) {
res.msg = msg;
} else {
res.msg = {result: msg};
}
_cb();
});
} else {
_cb()
}
});
// Connect to router.
queue.connect(backPort, function (err) {
if (err) throw err;
});
}
// XPub-XSub pattern for brokering messages.
if (opts.pubPort && opts.subPort) {
var hwm = 1000;
var verbose = 0;
// The xsub listener is where pubs connect to
var subSock = zmq.socket('xsub');
subSock.identity = 'subscriber' + process.pid;
subSock.bindSync(opts.subPort);
// The xpub listener is where subs connect to
var pubSock = zmq.socket('xpub');
pubSock.identity = 'publisher' + process.pid;
pubSock.setsockopt(zmq.ZMQ_SNDHWM, hwm);
// By default xpub only signals new subscriptions
// Settings it to verbose = 1 , will signal on every new subscribe
pubSock.setsockopt(zmq.ZMQ_XPUB_VERBOSE, verbose);
pubSock.bindSync(opts.pubPort);
// When we receive data on subSock, it means someone is publishing
subSock.on('message', function(data) {
// We just relay it to the pubSock, so subscribers can receive it
pubSock.send(data);
});
// When Pubsock receives a message, it subscribe requests
pubSock.on('message', function(data, bla) {
// The data is a slow Buffer
// The first byte is the subscribe (1) /unsubscribe flag (0)
var type = data[0]===0 ? 'unsubscribe': 'subscribe';
// The channel name is the rest of the buffer
var channel = data.slice(1).toString();
console.log(type + ':' + channel);
// We send it to subSock, so it knows to what channels to listen to
subSock.send(data);
});
}
// Open a route for health check.
http.createServer(function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end();
}).listen(opts.port);
Step(
function () {
// Open DB connection if URI is present in opts.
if (opts.mongoURI) {
new db.Connection(opts.mongoURI, {ensureIndexes: opts.indexDb}, this);
} else {
this();
}
},
function (err, connection) {
if (err) return this(err);
// Init collections.
if (!connection || _.size(opts.collections) === 0) {
return this();
}
_.each(opts.collections, _.bind(function (c, name) {
connection.add(name, c, this.parallel());
}, this));
},
function (err) {
if (err) return cb(err);
cb();
if (opts.onReady) {
opts.onReady(db);
}
}
);
}