Data Ingestion using Logstash: PostgreSQL to Elastic

Publish Date: Jun 26

What is Logstash?

Logstash is an open-source data processing pipeline from Elastic. It is used to ingest, transform, and ship data to various destinations, including Elasticsearch, Kafka, and flat files.

A Logstash pipeline includes 3 different stages (a minimal skeleton is shown after the list):

  • Input: the data source from which records are collected for ingestion.

  • Filter: transforms the data (cleanup, aggregation, etc.) using plugins like Grok, Mutate, and Date.

  • Output: the destination for ingestion (Elasticsearch, flat files, a database, etc.).
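
The three stages map directly onto a pipeline configuration file. Below is a minimal, runnable sketch of that structure (the file path is hypothetical; the actual Postgres pipeline follows later in this post):

input {
    # read lines from a file (hypothetical path)
    file { path => "c:/logstash/data/sample.log" }
}

filter {
    # trivial transformation: tag every event
    mutate { add_tag => ["processed"] }
}

output {
    # print structured events to the console
    stdout { codec => rubydebug }
}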

Below are the prerequisites to send data to Elasticsearch using Logstash:

  1. Logstash installed on the system, along with the JDBC driver for Postgres.
  2. A Postgres database with a table or function to sync.
  3. A running Elasticsearch instance.

Logstash Setup (for Windows):

Below are the steps in brief to install and run Logstash locally.

1. Install Java:
Download a JDK package (Java 8 or later) from the official Oracle website. Once the download is complete, extract the files to the preferred location.
Once the files are extracted, an environment variable needs to be added so the system recognizes Java commands.
Go to Environment Variables, add a new variable named JAVA_HOME pointing to the directory where the Java files are located, and append %JAVA_HOME%\bin to the Path.
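
The same can be done from a command prompt with setx; the JDK path below is only an illustration, so point it at wherever the files were actually extracted (note that setx only affects newly opened command prompts):

setx JAVA_HOME "C:\Program Files\Java\jdk-17"
rem %JAVA_HOME%\bin still needs to be appended to Path (via the dialog or another setx)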
To verify the installation, open a command prompt and run the command below.

java -version

If everything is set up correctly, it will print the Java version.

2. Install Logstash:
To install Logstash, download the package from the official Elastic website and extract it to the preferred location.
To test it locally, open a command prompt, go to the bin folder inside the Logstash folder, and run the command below.

logstash -e "input { stdin {} } output { stdout {} }" 

Logstash Ingestion Pipeline:

1. Install the required JDBC driver:
Download the Postgres JDBC driver from the official PostgreSQL website and place the jar file in an accessible location.
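
For example, with curl (the version number below is illustrative, so check the download page for the current release; the target folder matches the path used in the pipeline that follows):

curl -L -o c:\logstash\jdbc\postgresql.jar https://jdbc.postgresql.org/download/postgresql-42.7.3.jar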

2. Create the Logstash pipeline:
Here is a sample pipeline.

input {
    jdbc {
        jdbc_driver_library => "c:/logstash/jdbc/postgresql.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_connection_string => "${JDBC_HOST}"
        jdbc_user => "${DB_USER}"
        jdbc_password => "${DB_PWD}"
        jdbc_paging_enabled => true
        jdbc_page_size => 1000
        schedule => "* * * * *"  # schedule to run every minute
        statement => "SELECT * FROM employee WHERE updated_dt > :sql_last_value"
        use_column_value => true
        tracking_column => "updated_dt"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "c:/logstash/employee.tracker"
    }
}

filter {
    mutate {
        remove_field => ["date", "@timestamp", "host"]
    }

    # Example of parsing a JSON field if needed
    json {
        source => "first_name"
        target => "name"
    }
}

output {
    stdout { codec => json_lines }
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "my_table_index"
        custom_headers => {
                "Authorization" => "${AUTH_KEY}"
            }
        document_id => "%{id}" # Unique identifier column from the table
        timeout => 120
    }
}


The above pipeline performs incremental ingestion: it tracks the last run and, on each scheduled execution, picks up only the records changed since then.
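
For reference, the pipeline assumes a source table shaped roughly like the sketch below. This DDL is hypothetical, reconstructed from the fields visible in the sample output further down:

CREATE TABLE employee (
    id          integer PRIMARY KEY,              -- used as the Elasticsearch document id
    first_name  text,
    last_name   text,
    updated_dt  timestamp NOT NULL DEFAULT now()  -- drives the incremental sync
);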

Here are the key concepts used:

Input:

  • jdbc_driver_library: location where the JDBC driver file (.jar) is stored.
  • jdbc_driver_class: the driver class being used.
  • jdbc_connection_string: the Postgres database connection string.
  • jdbc_user: the database username.
  • jdbc_password: the database password for the user.

  • paging: In this pipeline, the data is fetched in pages of 1000 rows. This improves the performance of the pipeline and helps track the number of records sent to Elasticsearch.

  • schedule: The above pipeline is scheduled to run every minute.
    Here is the format for schedule.

* * * * *
│ │ │ │ │
│ │ │ │ └─── Day of the week (0 - 7) (Sunday is both 0 and 7)
│ │ │ └───── Month (1 - 12)
│ │ └─────── Day of the month (1 - 31)
│ └───────── Hour (0 - 23)
└─────────── Minute (0 - 59)
  • statement: the SQL statement the pipeline will execute. For complex statements, the query can be saved in a separate .sql file and its path given in statement_filepath instead of statement. It is better to use a view or materialized view instead of a query with complex joins.
  • The last section configures incremental ingestion.

use_column_value => true
tracking_column => "updated_dt"
tracking_column_type => "timestamp"
last_run_metadata_path => "c:/logstash/employee.tracker"

use_column_value is set to true. This tells Logstash to track the actual value of the column named in tracking_column (updated_dt here) instead of the time the query was last run. In this case, :sql_last_value resolves to the last updated_dt value recorded from the previous run.

If it is set to false, Logstash uses the last query execution time for :sql_last_value.

The tracked value is saved in the file given in last_run_metadata_path and is used to resume from where the previous run left off.
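
On each scheduled run, Logstash substitutes that stored value into :sql_last_value, so the statement that actually reaches Postgres looks something like the query below (the timestamp is illustrative). Because paging is enabled, Logstash additionally wraps the query with LIMIT/OFFSET to fetch it in 1000-row pages.

SELECT * FROM employee WHERE updated_dt > '2024-12-12 16:10:57.349'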

Filter

This is an optional section used to manipulate the data before sending it to the destination.

In the above pipeline, the mutate filter removes the date, @timestamp and host fields from the event, and the json filter is included as an example of parsing a JSON-formatted field (first_name) into a name field at the destination.
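
If first_name is just a plain text column and the goal is only to expose it as name, a mutate rename is the simpler option; the json filter is only needed when the column actually contains JSON. A sketch (not part of the pipeline above):

filter {
    mutate {
        rename => { "first_name" => "name" }
    }
}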

Output

This section defines the destination for the data: the Elasticsearch endpoint, an authorization header if required, the target index, and document_id. document_id is the unique identifier of the document in the index. If this field is not set, Elasticsearch automatically assigns a unique identifier to each document.

For incremental ingestion, it is recommended to set this field. During ingestion, Elasticsearch looks up this ID in the index; if it matches an existing document, that document is updated in place.

If the field is not defined, a new document is created on every run, resulting in duplicate records.
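
If the table has no single unique column, one common pattern (again a sketch, not part of the pipeline above, with hypothetical column names) is to derive a deterministic document ID from a stable natural key using the fingerprint filter and reference it from the output:

filter {
    fingerprint {
        source => ["department_id", "employee_number"]  # hypothetical natural-key columns
        target => "[@metadata][doc_id]"
        method => "SHA256"
        concatenate_sources => true
    }
}
output {
    elasticsearch {
        # ...same connection settings as above...
        document_id => "%{[@metadata][doc_id]}"
    }
}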

Run the pipeline

To run this pipeline, open a command prompt, go to the Logstash folder, and run the command below.
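
Since the config references ${JDBC_HOST}, ${DB_USER}, ${DB_PWD} and ${AUTH_KEY}, those environment variables must be set in the same shell first; the values below are purely illustrative:

set JDBC_HOST=jdbc:postgresql://localhost:5432/mydb
set DB_USER=postgres
set DB_PWD=secret
set AUTH_KEY=ApiKey my_base64_api_key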

bin/logstash -f c:/logstash/sample_pipeline.conf

Here is the output of the pipeline.

[Screenshot: Logstash output window]

Output from the Elasticsearch index.
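
The response shown below can be retrieved with a standard _search request against the index (localhost:9200 and the index name testing are taken from this test run):

curl -s "http://localhost:9200/testing/_search?pretty"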

{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "testing",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "name": "James",
                    "id": 1,
                    "last_name": "Smith",
                    "updated_dt": "2024-12-12T16:10:57.349Z",
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.167442600Z"
                }
            },
            {
                "_index": "testing",
                "_id": "2",
                "_score": 1.0,
                "_source": {
                    "name": "John",
                    "id": 2,
                    "last_name": "Doe",
                    "updated_dt": "2024-12-12T16:10:57.349Z",
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.169021400Z"
                }
            },
            {
                "_index": "testing",
                "_id": "3",
                "_score": 1.0,
                "_source": {
                    "name": "Kate",
                    "id": 3,
                    "last_name": "Williams",
                    "updated_dt": "2024-12-12T16:10:57.349Z",                    
                    "@version": "1",
                    "@timestamp": "2025-06-25T20:41:02.170098800Z"
                }
            }
        ]
    }
}

There are a few advantages to this method.

  1. Logstash is an open-source tool and easy to implement.
  2. There are 200+ plugins available for data transformation; using these plugins, data can be parsed and transformed with filters.
  3. It provides a decoupled architecture between the data source and Elasticsearch.
  4. It integrates seamlessly with Elasticsearch.

Although this is an open-source and simple method to implement, there are some disadvantages.

  1. Latency issues: it is not ideal where very low latency or real-time data is required. As the pipeline grows, it takes time to load, transform/filter, and send the data.
  2. Error handling: unless the pipeline is explicitly monitored, errors are difficult to track down and can result in data loss.
  3. It can create duplicates if the pipeline is not defined properly.
  4. It takes longer to start compared to other tools.
  5. Its configuration (the pipeline DSL plus YAML settings files) can become complex and difficult to maintain.
  6. Resource utilization: it can consume significant resources under heavy loads and with complex pipelines.

The above pipeline is a good fit when a robust, centralized data ingestion setup is needed; it is not ideal for real-time data shipping.
