Keep X events to make Digest from (#2041)

* Keep X events to make Digest from

Added a configuration option to save a fixed number of events for the Digest. Currently all received events are purged when an event is emitted by this agent. This option allows received events to be remembered and reused for future Digests.

* added validate_options for new config

Added a check to prevent negative numbers, which would cause breakage. Also gave an upper limit at 999, just to set a sane maximum.

* removed unnecessary while loop

As suggested by @dsander here: https://github.com/huginn/huginn/pull/2041#discussion_r123965060

* Updated name of feature and description

* ensure retained_events is an Integer

* typo in check if retained events is int

* specs for digest_agent

* removed validate as integer

* removed check for integer status

* Rspec mostly working

Got rspec mostly working. The #'working?' check for events received in last few days is failing for me, but it was failing for other agents too on my setup. Likely a local problem, so lets see if this passes the automated builds.

* removed comments

ack. forgot about those
This commit is contained in:
nogre 2017-07-28 08:29:25 -04:00 committed by Dominik Sander
parent 4cf58ebbd3
commit 79fc461bd9
2 changed files with 136 additions and 35 deletions

View file

@ -10,6 +10,10 @@ module Agents
The resulting Event will have a payload message of `message`. You can use liquid templating in the `message`, have a look at the [Wiki](https://github.com/cantino/huginn/wiki/Formatting-Events-using-Liquid) for details.
Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.
If `retained_events` is set to 0 (the default), all received events are cleared after a digest is sent. Set `retained_events` to a value larger than 0 to keep a certain number of events around on a rolling basis to re-send in future digests.
For instance, say `retained_events` is set to 3 and the Agent has received Events `5`, `4`, and `3`. When a digest is sent, Events `5`, `4`, and `3` are retained for a future digest. After Event `6` is received, the next digest will contain Events `6`, `5`, and `4`.
MD
event_description <<-MD
@ -24,12 +28,18 @@ module Agents
def default_options
{
"expected_receive_period_in_days" => "2",
"message" => "{{ events | map: 'message' | join: ',' }}"
"message" => "{{ events | map: 'message' | join: ',' }}",
"retained_events" => "0"
}
end
form_configurable :message, type: :text
form_configurable :expected_receive_period_in_days
form_configurable :retained_events
def validate_options
errors.add(:base, 'retained_events must be 0 to 999') unless options['retained_events'].to_i >= 0 && options['retained_events'].to_i < 1000
end
def working?
last_receive_at && last_receive_at > interpolated["expected_receive_period_in_days"].to_i.days.ago && !recent_error_logs?
@ -40,6 +50,9 @@ module Agents
incoming_events.each do |event|
self.memory["queue"] << event.id
end
if interpolated["retained_events"].to_i > 0 && memory["queue"].length > interpolated["retained_events"].to_i
memory["queue"].shift(memory["queue"].length - interpolated["retained_events"].to_i)
end
end
def check
@ -48,7 +61,9 @@ module Agents
payload = { "events" => events.map { |event| event.payload } }
payload["message"] = interpolated(payload)["message"]
create_event :payload => payload
self.memory["queue"] = []
if interpolated["retained_events"].to_i == 0
self.memory["queue"] = []
end
end
end
end

View file

@ -2,7 +2,7 @@ require "rails_helper"
describe Agents::DigestAgent do
before do
@checker = Agents::DigestAgent.new(:name => "something", :options => { :expected_receive_period_in_days => "2", :message => "{{ events | map:'data' | join:';' }}" })
@checker = Agents::DigestAgent.new(:name => "something", :options => { :expected_receive_period_in_days => "2", :retained_events => "0", :message => "{{ events | map:'data' | join:';' }}" })
@checker.user = users(:bob)
@checker.save!
end
@ -15,54 +15,140 @@ describe Agents::DigestAgent do
event.save!
expect(@checker).not_to be_working # no events have ever been received
Agents::DigestAgent.async_receive(@checker.id, [event.id])
@checker.options[:expected_receive_period_in_days] = 2
@checker.save!
Agents::DigestAgent.async_receive @checker.id, [event.id]
expect(@checker.reload).to be_working # Events received
three_days_from_now = 3.days.from_now
stub(Time).now { three_days_from_now }
expect(@checker.reload).not_to be_working # too much time has passed
expect(@checker).not_to be_working # too much time has passed
end
end
describe "validation" do
before do
expect(@checker).to be_valid
end
it "should validate retained_events" do
@checker.options[:retained_events] = ""
expect(@checker).to be_valid
@checker.options[:retained_events] = "0"
expect(@checker).to be_valid
@checker.options[:retained_events] = "10"
expect(@checker).to be_valid
@checker.options[:retained_events] = "10000"
expect(@checker).not_to be_valid
@checker.options[:retained_events] = "-1"
expect(@checker).not_to be_valid
end
end
describe "#receive" do
it "queues any payloads it receives" do
event1 = Event.new
event1.agent = agents(:bob_rain_notifier_agent)
event1.payload = { :data => "event1" }
event1.save!
describe "and retained_events is 0" do
before { @checker.options['retained_events'] = 0 }
it "retained_events any payloads it receives" do
event1 = Event.new
event1.agent = agents(:bob_rain_notifier_agent)
event1.payload = { :data => "event1" }
event1.save!
event2 = Event.new
event2.agent = agents(:bob_weather_agent)
event2.payload = { :data => "event2" }
event2.save!
Agents::DigestAgent.async_receive(@checker.id, [event1.id, event2.id])
expect(@checker.reload.memory[:queue]).to eq([event1.id, event2.id])
event2 = Event.new
event2.agent = agents(:bob_weather_agent)
event2.payload = { :data => "event2" }
event2.save!
@checker.receive([event1])
@checker.receive([event2])
expect(@checker.memory["queue"]).to eq([event1.id, event2.id])
end
end
describe "but retained_events is 1" do
before { @checker.options['retained_events'] = 1 }
it "retained_eventss only 1 event at a time" do
event1 = Event.new
event1.agent = agents(:bob_rain_notifier_agent)
event1.payload = { :data => "event1" }
event1.save!
event2 = Event.new
event2.agent = agents(:bob_weather_agent)
event2.payload = { :data => "event2" }
event2.save!
@checker.receive([event1])
@checker.receive([event2])
expect(@checker.memory['queue']).to eq([event2.id])
end
end
end
describe "#check" do
it "should emit a event" do
expect { Agents::DigestAgent.async_check(@checker.id) }.not_to change { Event.count }
describe "and retained_events is 0" do
before { @checker.options['retained_events'] = 0 }
it "should emit a event" do
expect { Agents::DigestAgent.async_check(@checker.id) }.not_to change { Event.count }
event1 = Event.new
event1.agent = agents(:bob_rain_notifier_agent)
event1.payload = { :data => "event" }
event1.save!
event1 = Event.new
event1.agent = agents(:bob_rain_notifier_agent)
event1.payload = { :data => "event" }
event1.save!
event2 = Event.new
event2.agent = agents(:bob_weather_agent)
event2.payload = { :data => "event" }
event2.save!
event2 = Event.new
event2.agent = agents(:bob_weather_agent)
event2.payload = { :data => "event" }
event2.save!
Agents::DigestAgent.async_receive(@checker.id, [event1.id, event2.id])
@checker.sources << agents(:bob_rain_notifier_agent) << agents(:bob_weather_agent)
@checker.save!
@checker.receive([event1])
@checker.receive([event2])
@checker.sources << agents(:bob_rain_notifier_agent) << agents(:bob_weather_agent)
@checker.save!
expect { Agents::DigestAgent.async_check(@checker.id) }.to change { Event.count }.by(1)
@checker.reload
expect(@checker.most_recent_event.payload["events"]).to eq([event1.payload, event2.payload])
expect(@checker.most_recent_event.payload["message"]).to eq("event;event")
expect(@checker.memory[:queue]).to be_empty
expect { @checker.check }.to change { Event.count }.by(1)
expect(@checker.most_recent_event.payload["events"]).to eq([event1.payload, event2.payload])
expect(@checker.most_recent_event.payload["message"]).to eq("event;event")
expect(@checker.memory['queue']).to be_empty
end
end
describe "but retained_events is 1" do
before { @checker.options['retained_events'] = 1 }
it "should emit a event" do
expect { Agents::DigestAgent.async_check(@checker.id) }.not_to change { Event.count }
event1 = Event.new
event1.agent = agents(:bob_rain_notifier_agent)
event1.payload = { :data => "event" }
event1.save!
event2 = Event.new
event2.agent = agents(:bob_weather_agent)
event2.payload = { :data => "event" }
event2.save!
@checker.receive([event1])
@checker.receive([event2])
@checker.sources << agents(:bob_rain_notifier_agent) << agents(:bob_weather_agent)
@checker.save!
expect { @checker.check }.to change { Event.count }.by(1)
expect(@checker.most_recent_event.payload["events"]).to eq([event2.payload])
expect(@checker.most_recent_event.payload["message"]).to eq("event")
expect(@checker.memory['queue'].length).to eq(1)
end
end
end
end