Skip to content

Commit

Permalink
Merge pull request #99 from FlowFuse/98-qos-2
Browse files Browse the repository at this point in the history
Ensure all pub/sub MQTT operations use QoS 2
  • Loading branch information
knolleary authored Sep 2, 2024
2 parents e8ae1f8 + d27595e commit 330e6e1
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 60 deletions.
4 changes: 2 additions & 2 deletions nodes/project-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ module.exports = function (RED) {
}
/** @type {MQTT.IClientSubscribeOptions} */
const subOptions = Object.assign({}, options)
subOptions.qos = subOptions.qos == null ? 1 : subOptions.qos
subOptions.qos = subOptions.qos == null ? 2 : subOptions.qos
subOptions.properties = Object.assign({}, options.properties)
subOptions.properties.userProperties = subOptions.properties.userProperties || {}
subOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID || ''
Expand Down Expand Up @@ -524,7 +524,7 @@ module.exports = function (RED) {
}
/** @type {MQTT.IClientPublishOptions} */
const pubOptions = Object.assign({}, options)
pubOptions.qos = pubOptions.qos == null ? 1 : pubOptions.qos
pubOptions.qos = pubOptions.qos == null ? 2 : pubOptions.qos
pubOptions.properties = Object.assign({}, options.properties)
pubOptions.properties.userProperties = pubOptions.properties.userProperties || {}
pubOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID || ''
Expand Down
242 changes: 184 additions & 58 deletions test/unit/nodes/project-link_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,78 +7,90 @@ const projectLinkPackage = require('../../../nodes/project-link.js')
const { HttpProxyAgent } = require('http-proxy-agent')
const { HttpsProxyAgent } = require('https-proxy-agent')

const TEAM_ID = 'ABCD5678'
const PROJECT_ID = '1234-5678-9012-9876abcdef12'
describe('project-link node', function () {
afterEach(function () {
delete process.env.http_proxy
delete process.env.https_proxy
sinon.restore()
})

describe('proxy', function () {
function setup (httpProxy, httpsProxy, allProxy, noProxy, forgeUrl, mqttUrl) {
const mqttConnectStub = sinon.stub(MQTT, 'connect').returns({
on: sinon.fake(),
subscribe: sinon.fake(),
unsubscribe: sinon.fake(),
publish: sinon.fake(),
removeAllListeners: sinon.fake(),
end: sinon.fake()
})
process.env.http_proxy = httpProxy || ''
process.env.https_proxy = httpsProxy || ''
process.env.all_proxy = allProxy || ''
process.env.no_proxy = noProxy || ''
const nodes = {}
const RED = {
settings: {
flowforge: {
forgeURL: forgeUrl || 'https://local.testfuse.com',
projectID: '1234',
teamID: '5678',
projectLink: {
featureEnabled: true,
broker: {
url: mqttUrl || 'ws://localhost',
clientId: 'nr-project-link'
}
function setup (httpProxy, httpsProxy, allProxy, noProxy, forgeUrl, mqttUrl) {
const mqttStub = {
on: sinon.fake(),
subscribe: sinon.fake(),
unsubscribe: sinon.fake(),
publish: sinon.fake(function (topic, message, options, callback) {
const pkt = { topic, message, options }
callback(null, pkt)
}),
removeAllListeners: sinon.fake(),
end: sinon.fake()
}
const mqttConnectStub = sinon.stub(MQTT, 'connect').returns(mqttStub)
process.env.http_proxy = httpProxy || ''
process.env.https_proxy = httpsProxy || ''
process.env.all_proxy = allProxy || ''
process.env.no_proxy = noProxy || ''
const nodes = {}
const RED = {
settings: {
flowforge: {
forgeURL: forgeUrl || 'https://local.testfuse.com',
projectID: PROJECT_ID,
teamID: TEAM_ID,
projectLink: {
featureEnabled: true,
broker: {
url: mqttUrl || 'ws://localhost',
clientId: 'nr-project-link'
}
}
}
},
log: {
error: sinon.fake(() => { console.error(...arguments) }),
debug: sinon.fake(() => { console.debug(...arguments) }),
log: sinon.fake(() => { console.log(...arguments) }),
warn: sinon.fake(() => { console.warn(...arguments) }),
info: sinon.fake(() => { console.info(...arguments) }),
trace: sinon.fake(() => { console.trace(...arguments) })
},
nodes: {
createNode: function (node, config) {
return node
},
log: {
error: sinon.fake(() => { console.error(...arguments) }),
debug: sinon.fake(() => { console.debug(...arguments) }),
log: sinon.fake(() => { console.log(...arguments) }),
warn: sinon.fake(() => { console.warn(...arguments) }),
info: sinon.fake(() => { console.info(...arguments) }),
trace: sinon.fake(() => { console.trace(...arguments) })
},
nodes: {
createNode: function (node, config) {
return node
},
// RED.nodes.registerType('project link in', ProjectLinkInNode, {
registerType: sinon.fake((name, NodeConstructor, options) => {
nodes[name] = {
name,
NodeConstructor,
options
}
})
},
httpAdmin: {
get: sinon.stub()
},
auth: {
needsPermission: sinon.stub()
// RED.nodes.registerType('project link in', ProjectLinkInNode, {
registerType: sinon.fake((name, NodeConstructor, options) => {
nodes[name] = {
name,
NodeConstructor,
options
}
})
},
httpAdmin: {
get: sinon.stub()
},
auth: {
needsPermission: sinon.stub()
},
util: {
cloneMessage (msg) {
return JSON.parse(JSON.stringify(msg))
}
}
return {
RED,
nodes,
mqttConnectStub
}
}
return {
RED,
nodes,
mqttStub,
mqttConnectStub
}
}

describe('proxy', function () {
afterEach(function () {
sinon.restore()
})
Expand Down Expand Up @@ -255,4 +267,118 @@ describe('project-link node', function () {
})
})
})
describe('Nodes', function () {
it('project link in should subscribe using QoS 2', function () {
const env = setup()
const inNode = {
on: sinon.fake(),
type: 'project link in'
}
const topic = 'cloud/project-nodes-test/xxx'
const RED = env.RED
projectLinkPackage(RED)
const NodeConstructor = env.nodes['project link in'].NodeConstructor
NodeConstructor.call(inNode, { topic, project: PROJECT_ID })

env.mqttStub.subscribe.calledOnce.should.be.true()
should(env.mqttStub.subscribe.args[0][0]).equal(`ff/v1/${TEAM_ID}/p/${PROJECT_ID}/in/${topic}`)
const options = env.mqttStub.subscribe.args[0][1]
should(options).be.an.Object()
options.should.have.property('qos').and.equal(2)
options.should.have.property('properties').and.be.an.Object()
options.properties.should.have.property('subscriptionIdentifier').and.be.a.Number()
})
it('project link call should publish and subscribe using QoS 2', async function () {
const env = setup()
const RED = env.RED
const nodeEvents = {}
const callNode = {
on: (event, cb) => {
nodeEvents[event] = cb
},
error: sinon.fake(),
type: 'project link call'
}
const topic = 'cloud/project-nodes-test/call'
const expectedPubTopic = `ff/v1/${TEAM_ID}/p/${PROJECT_ID}/in/${topic}`
const expectedSubTopic = `ff/v1/${TEAM_ID}/p/${PROJECT_ID}/res/${topic}`
projectLinkPackage(RED)
const NodeConstructor = env.nodes['project link call'].NodeConstructor
NodeConstructor.call(callNode, { topic, project: PROJECT_ID })
callNode.should.have.property('topic', expectedPubTopic)
callNode.should.have.property('responseTopic', expectedSubTopic)
env.mqttStub.subscribe.calledOnce.should.be.true()
should(env.mqttStub.subscribe.args[0][0]).equal(expectedSubTopic)
const options = env.mqttStub.subscribe.args[0][1]
should(options).be.an.Object()
options.should.have.property('qos').and.equal(2)

// send a message to the node so that it can publish
await nodeEvents.input({ payload: 'test' }, sinon.fake(), sinon.fake())

// ensure qos 2 on the publish
env.mqttStub.publish.calledOnce.should.be.true()
const pubTopic = env.mqttStub.publish.args[0][0]
should(pubTopic).equal(expectedPubTopic)
const pubMessageStr = env.mqttStub.publish.args[0][1]
const pubMessage = JSON.parse(pubMessageStr)
should(pubMessage).be.an.Object()
pubMessage.should.have.property('payload').and.equal('test')
pubMessage.should.have.property('projectLink').and.be.an.Object()
pubMessage.projectLink.should.have.property('callStack').and.be.an.Array()
pubMessage.projectLink.callStack.should.have.length(1)
pubMessage.projectLink.callStack[0].should.have.property('topic').and.equal(topic)
pubMessage.projectLink.callStack[0].should.have.property('ts').and.be.a.Number()
pubMessage.projectLink.callStack[0].should.have.property('response').and.equal('res')
pubMessage.projectLink.callStack[0].should.have.property('application')
pubMessage.projectLink.callStack[0].should.have.property('instance')
pubMessage.projectLink.callStack[0].should.have.property('node')
pubMessage.projectLink.callStack[0].should.have.property('project').and.equal(PROJECT_ID)
pubMessage.projectLink.callStack[0].should.have.property('eventId').and.be.a.String()

const pubOptions = env.mqttStub.publish.args[0][2]
should(pubOptions).be.an.Object()
pubOptions.should.have.property('qos').and.equal(2)
})
it('project link out should publish using QoS 2', async function () {
const env = setup()
const RED = env.RED
const nodeEvents = {}
const outNode = {
on: (event, cb) => {
nodeEvents[event] = cb
},
error: sinon.fake(),
type: 'project link call',
mode: 'link'
}
const topic = 'cloud/project-nodes-test/xxx'
const expectedPubTopic = `ff/v1/${TEAM_ID}/p/${PROJECT_ID}/in/${topic}`
projectLinkPackage(RED)
const NodeConstructor = env.nodes['project link out'].NodeConstructor
NodeConstructor.call(outNode, { topic, project: PROJECT_ID })
outNode.should.have.property('subTopic', topic)

// send a message to the node so that it can publish
await nodeEvents.input({ payload: 'test' }, sinon.fake(), sinon.fake())

env.mqttStub.publish.calledOnce.should.be.true()
const pubTopic = env.mqttStub.publish.args[0][0]
should(pubTopic).equal(expectedPubTopic)
const pubMessageStr = env.mqttStub.publish.args[0][1]
const pubMessage = JSON.parse(pubMessageStr)
should(pubMessage).be.an.Object()
pubMessage.should.have.property('payload').and.equal('test')
const pubOptions = env.mqttStub.publish.args[0][2]
should(pubOptions).be.an.Object()
pubOptions.should.have.property('qos').and.equal(2)
pubOptions.should.have.property('properties').and.be.an.Object()
pubOptions.properties.should.have.property('contentType', 'application/json')
pubOptions.properties.should.have.property('userProperties').and.be.an.Object()
pubOptions.properties.userProperties.should.have.property('_nodeID')
pubOptions.properties.userProperties.should.have.property('_projectID', PROJECT_ID)
pubOptions.properties.userProperties.should.have.property('_applicationID')
pubOptions.properties.userProperties.should.have.property('_publishTime').and.be.a.Number()
})
})
})

0 comments on commit 330e6e1

Please sign in to comment.