As a longtime elasticsearch/logstash user, one of the things I’ve always planned to do was set up basic rules-based monitoring of elasticsearch for correlation and alerting. I’ve broken ground on that work and am sharing this simple framework for anyone who would like to expand upon it. All of this was put together over two days, having never used ruby before, so it should not be taken as an opportunity to assess the quality of the code. Functionality and security were the first considerations. Beyond that there is an extensive list of future “wants”, for example standard deviations and other analytics.

Installation notes:
#. You must install a few ruby gems to get this working and you must be using a recent ruby. I built this on 2.3.1.
#. Use gem to install elasticsearch, activesupport and clockwork. (gem install elasticsearch, gem install clockwork, etc.)
#. Clockwork is the scheduling platform I’m using. I chose not to use cron because this was more efficient and made the script highly pluggable.
#. If you want to start and stop clockwork as a daemon, I can tell you I used systemd (despite my dislike of the platform). It sufficed, but your own init scripts or other startup mechanism for your linux platform should be able to handle it. To start it using clockwork, the command is “/path/clockwork /path/clock.rb”. To start it using clockworkd, “/path/clockworkd -c /path/clock.rb start”.
#. Each “rule” is written as its own ruby script and stored in the monitors sub-folder. The script will include all .rb files from this folder so make sure you secure it.

The files:

/home/monitor/clock.rb

This is the core scheduler. This file locates the necessary sub-scripts and executes them on a timer. To enable a new rule, simply place its script in the monitors folder; to disable one, move it out to a disabled folder or wherever suits your preference.

#!/usr/local/bin/ruby

require 'elasticsearch'
require 'pp'
require 'date'
require 'net/smtp'
require 'clockwork'

module Clockwork
  # For each scheduled job, call the top-level method whose name matches the
  # job name that the rule script registered with Clockwork.every.
  handler do |job|
    send(job)
  end
end

# Pull in every rule script from the monitors sub-folder.
Dir[File.dirname(__FILE__) + '/monitors/*.rb'].each {|file| require file }

/home/monitor/monitors/foreignlogins.rb

This is a sample of what you can do with your rule scripts. Each rule script follows the same format. First comes a Clockwork statement declaring how often the script should run (2.minutes, 4.hours, etc.; for details of this syntax take a look at the ActiveSupport class docs). Then you define the method, which can run any code you need to run on a scheduled basis. For example, one of my rules checks for the last log received from each system and sends that timestamp to zabbix, so zabbix can verify that each server is transmitting its logs appropriately.
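
Stripped to its skeleton, a rule script looks something like this (the job name here is a placeholder, not one of my actual rules):

# Skeleton of a rule script. The method name must match the job name
# registered with Clockwork.every so the handler in clock.rb can dispatch
# to it via send(job).
Clockwork.every(10.minutes, 'myrule')

def myrule()
  # query elasticsearch, analyze the hits, alert as needed
end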

Now you have a start time and an end time for the query; obviously there is no sense running a query outside of the window of interest. Most of the time the start time calculation will simply match the frequency you set for clockwork, but you may have a use case that needs to look back further (every 30 minutes, check the last hour of events, say).
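
For instance, a rule that fires every 30 minutes but reviews the last hour of events might compute its window like this (a sketch; the variable names mirror the script below):

# Look back a full hour even though clockwork fires the rule every 30 minutes.
endTime = Time.now.to_i * 1000                  # now, in epoch milliseconds
startTime = (Time.now.to_i - (60 * 60)) * 1000  # one hour ago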

Next you have the elasticsearch query to run to find the data you’re interested in reviewing. The elasticsearch client library for ruby makes this very straightforward.

The last section is your analysis and action code. In my case I have it generate an email, but other alert channels (notifymyandroid, for example) are on the future list.

Clockwork.every(2.minutes, 'foreignlogin')

def foreignlogin()
  client = Elasticsearch::Client.new log: false

  # Query window in epoch milliseconds: the last two minutes, matching the
  # schedule above.
  endTime = Time.now.to_i * 1000
  startTime = (Time.now.to_i - (2 * 60)) * 1000

  res = client.search index: 'logstash*', body:
{
  "query": {
    "filtered": {
      "query": {
        "query_string": {
          "query": "mechanisms:login AND result:success AND _exists_:src_geoip.country_code2 AND NOT src_geoip.country_code2:US"
        }
      },
      "filter": {
        "bool": {
          "must": [
            {
              "range": {
                "@timestamp": {
                  "gte": startTime,
                  "lte": endTime,
                  "format": "epoch_millis"
                }
              }
            }
          ],
          "must_not": []
        }
      }
    }
  },

  "size": 500,
  "sort": [
    {
      "@timestamp": {
        "order": "desc",
        "unmapped_type": "boolean"
      }
    }
  ],
  "fields": [
    "*",
    "_source"
  ]
}

  message = <<-MSG
Subject: Foreign Login Detected
To: User <user@domain.com>

MSG
  data = res["hits"]["hits"]
  if data.count > 0
    data.each do |v|
      recv = DateTime.parse(v["_source"]["received_at"])
      recv += Rational(Time.zone_offset(Time.now.getlocal.zone),86400)
      message += "#{recv.strftime("%m/%d/%Y %H:%M:%S")} #{Time.now.getlocal.zone} #{v["_source"]["username"]} via #{v["_source"]["syslog_program"]} from #{v["_source"]["src_ip"]} in #{v["_source"]["src_geoip"]["city_name"]}, #{v["_source"]["src_geoip"]["country_name"]} #{v["_source"]["src_geoip"]["continent_code"]}\n"
    end
    Net::SMTP.start('mx.mailserver.com') do |smtp|
      smtp.send_message message, "monitor@domain.com", "user@domain.com"
    end
  end
end


In closing, please note that this code could use some cleanup (error checking for connection issues or failed queries, for example), but it is a starting point. As I make improvements over time I will post updates. Also, your data will likely look different. I use fields like mechanisms to identify the type of activity the log referenced, and result to indicate whether the action went through, was blocked, or otherwise failed. This means I can do a query like “mechanisms:login AND (resources:system OR resources:application) AND result:success” to get a list of all successful logins to any application/os platform. Your individual use cases may vary, but with a simple ruby script you will likely be able to tackle them.
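
As a sketch, that broader query drops straight into the same client call used in the foreignlogin rule (the index pattern and field names follow my logstash setup, so treat them as placeholders for your own data):

# All successful logins to any application/os platform. Field names
# (mechanisms, resources, result) come from my logstash pipeline.
client = Elasticsearch::Client.new log: false
res = client.search index: 'logstash*', body:
{
  "query": {
    "query_string": {
      "query": "mechanisms:login AND (resources:system OR resources:application) AND result:success"
    }
  },
  "size": 500
}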

Enjoy! Geek on.

Update:

I failed to include one useful item. When testing a new rule script, I created a replacement for clock.rb called test.rb which runs a specific rule script passed on the command line. It overrides the clockwork methods quite sloppily, but it appears to get the job done. I’m sharing it here since someone may have suggestions for improvements, and others might simply find it useful for testing new rule scripts.

To execute it, I simply do “./test.rb ./rule-script.rb”. This will run the selected script once and also set a global $debug value, which can be used to gate debugging output that you don’t want included or executed during a production run.
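
For example, inside a rule script the flag can gate verbose output that you only want during a test run (a sketch using the res variable from the foreignlogin rule):

# Only dump the raw elasticsearch response when run via test.rb.
pp res if $debug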

#!/usr/local/bin/ruby

require 'elasticsearch'
require 'pp'
require 'date'
require 'net/smtp'
require 'active_support/time'

# A minimal stand-in for the real Clockwork module: instead of scheduling the
# job, it just records the job name so the script below can run it once.
module Clockwork
  def self.included(klass)
    klass.send "include", Methods
    klass.extend Methods
  end

  @@myjob = ''

  def self.get
    @@myjob
  end

  def self.set(x)
    @@myjob = x
  end

  module Methods
    # Ignore the period; just remember which job the rule script registered.
    def every(period, job)
      set(job)
    end

    def run()
      exit
    end
  end

  extend Methods
end

# Load the rule script named on the command line, flag this as a debug run,
# then invoke the registered job method once.
value = ARGV.shift
require value
$debug = true

puts "Running #{Clockwork.get}"
send(Clockwork.get)