diff --git a/app/models/agents/mqtt_agent.rb b/app/models/agents/mqtt_agent.rb index 73a889c9..e60c575d 100644 --- a/app/models/agents/mqtt_agent.rb +++ b/app/models/agents/mqtt_agent.rb @@ -115,15 +115,26 @@ module Agents def check + last_message = memory['last_message'] + mqtt_client.connect do |c| begin Timeout.timeout((interpolated['max_read_time'].presence || 15).to_i) { - c.get(interpolated['topic']) do |topic, message| + c.get_packet(interpolated['topic']) do |packet| + topic, payload = message = [packet.topic, packet.payload] - # A lot of services generate JSON. Try that first - payload = JSON.parse(message) rescue message + # Ignore a message if it is previously received + next if (packet.retain || packet.duplicate) && message == last_message - create_event :payload => { + last_message = message + + # A lot of services generate JSON, so try that. + begin + payload = JSON.parse(payload) + rescue + end + + create_event payload: { 'topic' => topic, 'message' => payload, 'time' => Time.now.to_i @@ -133,6 +144,10 @@ module Agents rescue Timeout::Error end end + + # Remember the last original (non-retain, non-duplicate) message + self.memory['last_message'] = last_message + save! end end diff --git a/spec/models/agents/mqtt_agent_spec.rb b/spec/models/agents/mqtt_agent_spec.rb index 0b070102..953e4a9f 100644 --- a/spec/models/agents/mqtt_agent_spec.rb +++ b/spec/models/agents/mqtt_agent_spec.rb @@ -8,7 +8,6 @@ describe Agents::MqttAgent do @error_log = StringIO.new @server = MQTT::FakeServer.new(41234, '127.0.0.1') - @server.just_one = true @server.logger = Logger.new(@error_log) @server.logger.level = Logger::DEBUG @server.start @@ -34,7 +33,14 @@ describe Agents::MqttAgent do end describe "#check" do - it "should check that initial run creates an event" do + it "should create events in the initial run" do + expect { @checker.check }.to change { Event.count }.by(2) + end + + it "should ignore retained messages that are previously received" do + expect { @checker.check }.to change { Event.count }.by(2) + expect { @checker.check }.to change { Event.count }.by(1) + expect { @checker.check }.to change { Event.count }.by(1) expect { @checker.check }.to change { Event.count }.by(2) end end diff --git a/spec/support/fake_mqtt_server.rb b/spec/support/fake_mqtt_server.rb index 39f8db74..71c3ab1f 100644 --- a/spec/support/fake_mqtt_server.rb +++ b/spec/support/fake_mqtt_server.rb @@ -52,7 +52,9 @@ class MQTT::FakeServer @port = @socket.addr[1] @thread ||= Thread.new do logger.info "Started a fake MQTT server on #{@address}:#{@port}" + @times = 0 loop do + @times += 1 # Wait for a client to connect client = @socket.accept @pings_received = 0 @@ -103,16 +105,33 @@ class MQTT::FakeServer :granted_qos => 0 ) topic = packet.topics[0][0] - client.write MQTT::Packet::Publish.new( - :topic => topic, - :payload => "hello #{topic}", - :retain => true - ) - client.write MQTT::Packet::Publish.new( - :topic => topic, - :payload => "did you know about #{topic}", - :retain => true - ) + case @times + when 1, ->x { x >= 3 } + # Deliver retained messages + client.write MQTT::Packet::Publish.new( + :topic => topic, + :payload => "did you know about #{topic}", + :retain => true + ) + client.write MQTT::Packet::Publish.new( + :topic => topic, + :payload => "hello #{topic}", + :retain => true + ) + when 2 + # Deliver a still retained message + client.write MQTT::Packet::Publish.new( + :topic => topic, + :payload => "hello #{topic}", + :retain => true + ) + # Deliver a fresh message + client.write MQTT::Packet::Publish.new( + :topic => topic, + :payload => "did you know about #{topic}", + :retain => false + ) + end when MQTT::Packet::Pingreq client.write MQTT::Packet::Pingresp.new @@ -134,4 +153,4 @@ if __FILE__ == $0 server = MQTT::FakeServer.new(MQTT::DEFAULT_PORT) server.logger.level = Logger::DEBUG server.run -end \ No newline at end of file +end