# Logstash configuration that continously reads the contents of the example accounts table # every 10 seconds starting at the date of the updatetime and the account of the last query. # Depending on the SQL limit (4 is just for testing purposes) this will take some time # when used to initially rebuild the accounts index, but will then continously update # the index when an entry and its "updatetime" is changed. input { jdbc { jdbc_connection_string => 'jdbc:h2:tcp://localhost/mem:search-menu-ui-example' jdbc_user => 'sa' jdbc_password => 'sa' jdbc_driver_library => '/Users/johannestroppacher/Downloads/Engineering/Java/H2/h2-1.4.200.jar' jdbc_driver_class => 'org.h2.Driver' statement => "SELECT coalesce(year(updatetime), 1970) * 100 * 100 * 100000000000 + coalesce(month(updatetime), 1) * 100 * 100000000000 + coalesce(day(updatetime), 1) * 100000000000 + accountnumber as dateid ,* FROM ACCOUNT WHERE -- initial run where sql_last_value hadn't been set ? < 1900010100000000000 -- updatetime's date is greater/newer than the date of the last updatetime of sql_last_value OR CAST(updatetime AS DATE) > CAST(PARSEDATETIME(SUBSTRING(CAST(? AS CHAR), 0, 8),'yyyyMMdd') AS DATE) -- updatetime's date is equal to the date of the last updatetime of sql_last_value but the accountnumber is higher than the last one OR (CAST(updatetime AS DATE) = CAST(PARSEDATETIME(SUBSTRING(CAST(? AS CHAR), 0, 8),'yyyyMMdd') AS DATE) AND accountnumber > MOD(?, 100000000000) ) ORDER BY updatetime, accountnumber LIMIT 4; " # use a prepared statement with a faster and more secure approach using placeholders use_prepared_statements => true prepared_statement_name => "updated_accounts" # bind the build in "sql_last_value" to the four "?" placeholders prepared_statement_bind_values => [":sql_last_value", ":sql_last_value", ":sql_last_value", ":sql_last_value"] # enable "sql_last_value" to remember the position of the last query use_column_value => true # use the column "dateid" as numeric value to remember the position of the last query tracking_column => "dateid" # run every 10 seconds schedule => "*/10 * * * * *" } } filter { translate { field => "online" destination => "online_tag" dictionary => { "1" => "online" } fallback => "" } translate { field => "deleted" destination => "deleted_tag" dictionary => { "0" => "active" } fallback => "deleted" } translate { field => "longterm" destination => "longterm_tag" dictionary => { "1" => "long-term" } fallback => "" } translate { field => "shortterm" destination => "shortterm_tag" dictionary => { "1" => "short-term" } fallback => "" } translate { field => "bulletbond" destination => "bulletbond_tag" dictionary => { "1" => "bullet bond" } fallback => "" } # Puts all tag values into one "tags" field. Each tag is separated by semicolon mutate { add_field => { "tags" => "%{online_tag};%{deleted_tag};%{longterm_tag};%{shortterm_tag};%{bulletbond_tag}" } } # Converts the tags (semicolon separated string) into an array mutate { gsub => [ # Replaces muliple semicolons in a row by a single one "tags", ";+", ";", # Removes semicolons at the beginning "tags", "^;+", "", # Removes semicolons at the end "tags", ";+$", "" ] # Splits the semicolon separated values into an array without those semicolons split => { "tags" => ";" } } # Remove owner field, if its value is empty prune { blacklist_values => ["owner", "^\s*$"] } # The following ruby code is the best solution i found to get the date value in database format # without without UTC convertion. Reference: # https://discuss.elastic.co/t/converting-a-date-input-as-a-string-into-a-new-format/177498 ruby { code => ' t = event.get("creationdate") event.set("creationdate", Time.at(t.to_f).strftime("%Y-%m-%d")) ' } # The following ruby code is the best solution i found to get the timestamp value in database format # without without UTC convertion. Reference: # https://discuss.elastic.co/t/converting-a-date-input-as-a-string-into-a-new-format/177498 ruby { code => ' t = event.get("updatetime") event.set("updatetime", Time.at(t.to_f).strftime("%Y-%m-%d %H:%M:%S.%3N")) ' } mutate { # convert fields, that might end up as floating point values, into integers convert => { "tenantnumber" => "integer" "customernumber" => "integer" "accountnumber" => "integer" } remove_field => ["dateid", "online", "online_tag", "deleted", "deleted_tag", "longterm", "longterm_tag", "shortterm", "shortterm_tag", "bulletbond", "bulletbond_tag", "@timestamp", "@version"] } } output { # formatted console out for debugging stdout { codec => rubydebug } # elasticsearch import into test index "accountstemp" elasticsearch { action => "index" hosts => ["127.0.0.1:9200"] index => "accounts" document_id => "%{tenantnumber}%{accountnumber}" } }