Add event_data_path option to the WebsiteAgent to handle data directly in incoming Events

This commit is contained in:
Andrew Cantino 2015-11-24 12:34:20 -08:00
parent e624815eb7
commit 5eeca709c7
2 changed files with 240 additions and 120 deletions

View file

@@ -20,7 +20,12 @@ module Agents
`url` can be a single url, or an array of urls (for example, for multiple pages with the exact same structure but different content to scrape)
The WebsiteAgent can also scrape based on incoming events. It will scrape the url contained in the `url` key of the incoming event payload, or if you set `url_from_event` it is used as a Liquid template to generate the url to access. If you specify `merge` as the `mode`, it will retain the old payload and update it with the new values.
The WebsiteAgent can also scrape based on incoming events.
* If the Event contains a `url` key, that URL will be fetched.
* For more control, you can set the `url_from_event` option and it will be used as a Liquid template to generate the url to access based on the Event.
* If you set `event_data_path` to the [JSONPath](http://goessner.net/articles/JsonPath/) of content in the Event, that will be used directly without fetching any URL.
* If you specify `merge` for the `mode` option, Huginn will retain the old payload and update it with the new values.
# Supported Document Types
@@ -140,7 +145,7 @@ module Agents
def validate_options
# Check for required fields
errors.add(:base, "either url or url_from_event is required") unless options['url'].present? || options['url_from_event'].present?
errors.add(:base, "either url, url_from_event, or event_data_path are required") unless options['url'].present? || options['url_from_event'].present? || options['event_data_path'].present?
errors.add(:base, "expected_update_period_in_days is required") unless options['expected_update_period_in_days'].present?
validate_extract_options!
@@ -251,15 +256,15 @@ module Agents
check_urls(interpolated['url'])
end
def check_urls(in_url, payload = {})
def check_urls(in_url, existing_payload = {})
return unless in_url.present?
Array(in_url).each do |url|
check_url(url, payload)
check_url(url, existing_payload)
end
end
def check_url(url, payload = {})
def check_url(url, existing_payload = {})
unless /\Ahttps?:\/\//i === url
error "Ignoring a non-HTTP url: #{url.inspect}"
return
@@ -271,70 +276,91 @@ module Agents
interpolation_context.stack {
interpolation_context['_response_'] = ResponseDrop.new(response)
body = response.body
doc = parse(body)
handle_data(response.body, response.env[:url], existing_payload)
}
rescue => e
error "Error when fetching url: #{e.message}\n#{e.backtrace.join("\n")}"
end
if extract_full_json?
if store_payload!(previous_payloads(1), doc)
log "Storing new result for '#{name}': #{doc.inspect}"
create_event payload: payload.merge(doc)
end
return
def handle_data(body, url, existing_payload)
doc = parse(body)
if extract_full_json?
if store_payload!(previous_payloads(1), doc)
log "Storing new result for '#{name}': #{doc.inspect}"
create_event payload: existing_payload.merge(doc)
end
return
end
output =
case extraction_type
output =
case extraction_type
when 'json'
extract_json(doc)
when 'text'
extract_text(doc)
else
extract_xml(doc)
end
num_unique_lengths = interpolated['extract'].keys.map { |name| output[name].length }.uniq
if num_unique_lengths.length != 1
raise "Got an uneven number of matches for #{interpolated['name']}: #{interpolated['extract'].inspect}"
end
old_events = previous_payloads num_unique_lengths.first
num_unique_lengths.first.times do |index|
result = {}
interpolated['extract'].keys.each do |name|
result[name] = output[name][index]
if name.to_s == 'url'
result[name] = (response.env[:url] + Utils.normalize_uri(result[name])).to_s
end
end
num_unique_lengths = interpolated['extract'].keys.map { |name| output[name].length }.uniq
if store_payload!(old_events, result)
log "Storing new parsed result for '#{name}': #{result.inspect}"
create_event payload: payload.merge(result)
if num_unique_lengths.length != 1
raise "Got an uneven number of matches for #{interpolated['name']}: #{interpolated['extract'].inspect}"
end
old_events = previous_payloads num_unique_lengths.first
num_unique_lengths.first.times do |index|
result = {}
interpolated['extract'].keys.each do |name|
result[name] = output[name][index]
if name.to_s == 'url' && url.present?
result[name] = (url + Utils.normalize_uri(result[name])).to_s
end
end
}
rescue => e
error "Error when fetching url: #{e.message}\n#{e.backtrace.join("\n")}"
if store_payload!(old_events, result)
log "Storing new parsed result for '#{name}': #{result.inspect}"
create_event payload: existing_payload.merge(result)
end
end
end
def receive(incoming_events)
incoming_events.each do |event|
interpolate_with(event) do
url_to_scrape =
if url_template = options['url_from_event'].presence
interpolate_options(url_template)
existing_payload = interpolated['mode'].to_s == "merge" ? event.payload : {}
if event_data_path = options['event_data_path'].presence
data = Utils.value_at(event.payload, interpolate_options(event_data_path))
if data.present?
handle_event_data(data, event, existing_payload)
else
event.payload['url']
error "No data was found in the Event payload at the JSONPath #{interpolate_options(event_data_path)}", inbound_event: event
end
check_urls(url_to_scrape,
interpolated['mode'].to_s == "merge" ? event.payload : {})
else
url_to_scrape =
if event_data_path = options['event_data_path'].presence
interpolate_options(event_data_path)
elsif url_template = options['url_from_event'].presence
interpolate_options(url_template)
else
event.payload['url']
end
check_urls(url_to_scrape, existing_payload)
end
end
end
end
private
# Processes content taken directly from an incoming Event's payload
# (located via the `event_data_path` JSONPath option) instead of fetching
# a URL. Delegates to #handle_data, passing the event's own `url` key (if
# any) as the base URL for resolving extracted relative links.
#
# data             - the raw document (JSON/HTML/XML string) pulled from the event payload
# event            - the inbound Event, used for the base url and for error attribution
# existing_payload - hash merged into each created Event (the inbound payload in "merge" mode, else {})
#
# Any parse/extraction error is logged against the inbound event rather
# than raised, so one bad event does not abort processing of the batch.
def handle_event_data(data, event, existing_payload)
  handle_data(data, event.payload['url'], existing_payload)
rescue => e
  error "Error when handling event data: #{e.message}\n#{e.backtrace.join("\n")}", inbound_event: event
end
# This method returns true if the result should be stored as a new event.
# If mode is set to 'on_change', this method may return false and update an existing
# event to expire further in the future.

View file

@@ -763,92 +763,186 @@ fire: hot
end
describe "#receive" do
before do
@event = Event.new
@event.agent = agents(:bob_rain_notifier_agent)
@event.payload = {
'url' => 'http://xkcd.com',
'link' => 'Random',
}
end
it "should scrape from the url element in incoming event payload" do
expect {
@checker.options = @valid_options
@checker.receive([@event])
}.to change { Event.count }.by(1)
end
it "should use url_from_event as url to scrape if it exists when receiving an event" do
stub = stub_request(:any, 'http://example.org/?url=http%3A%2F%2Fxkcd.com')
@checker.options = @valid_options.merge(
'url_from_event' => 'http://example.org/?url={{url | uri_escape}}'
)
@checker.receive([@event])
expect(stub).to have_been_requested
end
it "should allow url_from_event to be an array of urls" do
stub1 = stub_request(:any, 'http://example.org/?url=http%3A%2F%2Fxkcd.com')
stub2 = stub_request(:any, 'http://google.org/?url=http%3A%2F%2Fxkcd.com')
@checker.options = @valid_options.merge(
'url_from_event' => ['http://example.org/?url={{url | uri_escape}}', 'http://google.org/?url={{url | uri_escape}}']
)
@checker.receive([@event])
expect(stub1).to have_been_requested
expect(stub2).to have_been_requested
end
it "should interpolate values from incoming event payload" do
expect {
@valid_options['extract'] = {
'from' => {
'xpath' => '*[1]',
'value' => '{{url | to_xpath}}'
},
'to' => {
'xpath' => '(//a[@href and text()={{link | to_xpath}}])[1]',
'value' => '@href'
},
describe "with a url or url_from_event" do
before do
@event = Event.new
@event.agent = agents(:bob_rain_notifier_agent)
@event.payload = {
'url' => 'http://xkcd.com',
'link' => 'Random',
}
@checker.options = @valid_options
@checker.receive([@event])
}.to change { Event.count }.by(1)
end
expect(Event.last.payload).to eq({
'from' => 'http://xkcd.com',
'to' => 'http://dynamic.xkcd.com/random/comic/',
})
it "should scrape from the url element in incoming event payload" do
expect {
@checker.options = @valid_options
@checker.receive([@event])
}.to change { Event.count }.by(1)
end
it "should use url_from_event as url to scrape if it exists when receiving an event" do
stub = stub_request(:any, 'http://example.org/?url=http%3A%2F%2Fxkcd.com')
@checker.options = @valid_options.merge(
'url_from_event' => 'http://example.org/?url={{url | uri_escape}}'
)
@checker.receive([@event])
expect(stub).to have_been_requested
end
it "should allow url_from_event to be an array of urls" do
stub1 = stub_request(:any, 'http://example.org/?url=http%3A%2F%2Fxkcd.com')
stub2 = stub_request(:any, 'http://google.org/?url=http%3A%2F%2Fxkcd.com')
@checker.options = @valid_options.merge(
'url_from_event' => ['http://example.org/?url={{url | uri_escape}}', 'http://google.org/?url={{url | uri_escape}}']
)
@checker.receive([@event])
expect(stub1).to have_been_requested
expect(stub2).to have_been_requested
end
it "should interpolate values from incoming event payload" do
expect {
@valid_options['extract'] = {
'from' => {
'xpath' => '*[1]',
'value' => '{{url | to_xpath}}'
},
'to' => {
'xpath' => '(//a[@href and text()={{link | to_xpath}}])[1]',
'value' => '@href'
},
}
@checker.options = @valid_options
@checker.receive([@event])
}.to change { Event.count }.by(1)
expect(Event.last.payload).to eq({
'from' => 'http://xkcd.com',
'to' => 'http://dynamic.xkcd.com/random/comic/',
})
end
it "should interpolate values from incoming event payload and _response_" do
@event.payload['title'] = 'XKCD'
expect {
@valid_options['extract'] = {
'response_info' => @valid_options['extract']['url'].merge(
'value' => '{% capture sentence %}The reponse from {{title}} was {{_response_.status}} {{_response_.headers.X-Status-Message}}.{% endcapture %}{{sentence | to_xpath}}'
)
}
@checker.options = @valid_options
@checker.receive([@event])
}.to change { Event.count }.by(1)
expect(Event.last.payload['response_info']).to eq('The reponse from XKCD was 200 OK.')
end
it "should support merging of events" do
expect {
@checker.options = @valid_options
@checker.options[:mode] = "merge"
@checker.receive([@event])
}.to change { Event.count }.by(1)
last_payload = Event.last.payload
expect(last_payload['link']).to eq('Random')
end
end
it "should interpolate values from incoming event payload and _response_" do
@event.payload['title'] = 'XKCD'
describe "with a event_data_path" do
describe "with json data" do
before do
@event = Event.new
@event.agent = agents(:bob_rain_notifier_agent)
@event.payload = {
'something' => 'some value',
'some_object' => {
'some_data' => { hello: 'world' }.to_json
}
}
@event.save!
expect {
@valid_options['extract'] = {
'response_info' => @valid_options['extract']['url'].merge(
'value' => '{% capture sentence %}The reponse from {{title}} was {{_response_.status}} {{_response_.headers.X-Status-Message}}.{% endcapture %}{{sentence | to_xpath}}'
@checker.options = @valid_options.merge(
'type' => 'json',
'event_data_path' => 'some_object.some_data',
'extract' => {
'value' => { 'path' => 'hello' }
}
)
}
@checker.options = @valid_options
@checker.receive([@event])
}.to change { Event.count }.by(1)
end
expect(Event.last.payload['response_info']).to eq('The reponse from XKCD was 200 OK.')
end
it "should extract from the event data in the incoming event payload" do
expect {
@checker.receive([@event])
}.to change { Event.count }.by(1)
expect(@checker.events.last.payload).to eq({ 'value' => 'world' })
end
it "should support merging of events" do
expect {
@checker.options = @valid_options
@checker.options[:mode] = "merge"
@checker.receive([@event])
}.to change { Event.count }.by(1)
last_payload = Event.last.payload
expect(last_payload['link']).to eq('Random')
it "should support merge mode" do
@checker.options['mode'] = "merge"
expect {
@checker.receive([@event])
}.to change { Event.count }.by(1)
expect(@checker.events.last.payload).to eq(@event.payload.merge('value' => 'world'))
end
it "should output an error when nothing can be found at the path" do
@checker.options = @checker.options.merge(
'event_data_path' => 'some_object.mistake'
)
expect {
@checker.receive([@event])
}.to_not change { Event.count }
expect(@checker.logs.last.message).to match(/No data was found in the Event payload at the JSONPath some_object.mistake/)
end
it "should output an error when the data cannot be parsed" do
@event.update_attribute :payload, @event.payload.merge('some_object' => { 'some_data' => '{invalid json' })
expect {
@checker.receive([@event])
}.to_not change { Event.count }
expect(@checker.logs.last.message).to match(/Error when handling event data:/)
end
end
describe "with HTML data" do
before do
@event = Event.new
@event.agent = agents(:bob_rain_notifier_agent)
@event.payload = {
'url' => 'http://xkcd.com',
'some_object' => {
'some_data' => "<div><span class='title'>Title!</span><span class='body'>Body!</span></div>"
}
}
@event.save!
@checker.options = @valid_options.merge(
'type' => 'html',
'event_data_path' => 'some_object.some_data',
'extract' => {
'title' => { 'css' => ".title", 'value' => ".//text()" },
'body' => { 'css' => "div span.body", 'value' => ".//text()" }
}
)
end
it "should extract from the event data in the incoming event payload" do
expect {
@checker.receive([@event])
}.to change { Event.count }.by(1)
expect(@checker.events.last.payload).to eq({ 'title' => 'Title!', 'body' => 'Body!' })
end
end
end
end
end