
Stream Postgres Changes into Elasticsearch with Statetrace

Kyle Hanson · 4 min read

Need to connect Postgres to Elasticsearch? Statetrace makes it easy to stream changes from your application database into the popular search engine.

Postgres is great for a lot of things, however sometimes your data access patterns require a search engine like Elasticsearch. Unfortunately it can be tricky to keep Postgres synchronized with Elasticsearch when data changes. Luckily, Statetrace makes it easy.

In this article we will configure a local Postgres instance with Statetrace and pipe all changes into Elasticsearch for easy querying.

Configure the Databases in Docker-Compose

I will use the standard docker-compose.yaml from the tutorial, with the addition of an Elasticsearch container.

version: '3'

services:
  postgres:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_wal_senders=2"
      - "-c"
      - "max_replication_slots=2"
    ports:
      - "5432:5432"

  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    container_name: es01
    environment:
      - node.name=es01
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200

  # The buffer database
  statetrace_db:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-p"
      - "5433"
    ports:
      - "5433:5433"

volumes:
  data01:
    driver: local

Configure Statetrace

We will use the following as our Statetrace configuration.

STATETRACE_INBOUND_CONFIG
inbounds:
  - name: Postgres DB
    database_type: postgres
    username: postgres
    password: postgres
    database: postgres
    host: postgres
    publication: "statetrace"
    slot: "statetrace"
    port: 5432
    log_new_attributes: true
outbounds:
  - name: Webhook
    handler: webhook
    target_url: "http://es01:9200/logs-statetrace_dump/_bulk"
    middleware: |
      fn
        %{relation_name: tn} when tn in ["my_special_table"] -> :ok
        _ -> :skip
      end
    request_builder: |
      fn rows ->
        payload =
          rows
          |> Enum.flat_map(fn row ->
            [
              Jason.encode_to_iodata!(%{"create" => %{}}),
              "\n",
              Jason.encode_to_iodata!(Map.merge(%{"@timestamp" => row["row_timestamp"]}, row)),
              "\n"
            ]
          end)

        %WebhookRequest{method: "POST", headers: [{"content-type", "application/json"}], payload: payload}
      end

Middleware

An outbound middleware runs on every row and determines whether to include it in the outbound request. Elixir's pattern-matching syntax makes it a breeze to match only the tables we want to send to Elasticsearch.

In this example, we will only put rows from my_special_table into the index.
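To make the filtering rule concrete, here is a minimal Python sketch of the same logic. The dict row shape and field name are illustrative assumptions; Statetrace passes its own structure to the Elixir middleware.

```python
# Sketch of the middleware's behavior: forward rows from watched
# tables, skip everything else. The row dicts are illustrative,
# not Statetrace's actual struct.
WATCHED_TABLES = {"my_special_table"}

def middleware(row: dict) -> str:
    """Return "ok" to forward the row, "skip" to drop it."""
    if row.get("relation_name") in WATCHED_TABLES:
        return "ok"
    return "skip"

rows = [
    {"relation_name": "my_special_table", "id": 1},
    {"relation_name": "other_table", "id": 2},
]
kept = [r for r in rows if middleware(r) == "ok"]
print([r["id"] for r in kept])  # only rows from my_special_table survive
```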

Request Builder

Our request_builder returns a closure that builds a WebhookRequest. The WebhookRequest lets you set the method, headers, and payload of the HTTP request, as well as control which fields are included in the payload. If you need to read an environment variable, do it outside the closure.

In Elasticsearch, bulk data is sent as newline-delimited JSON, so we have to format our payload accordingly.
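To see the shape the request_builder produces, here is a Python sketch of the same transformation: one `create` action line plus one document line per row, every line newline-terminated, including the last. The field names are illustrative.

```python
import json

def build_bulk_payload(rows: list) -> str:
    """Mirror the Elixir request_builder: for each row, emit a
    `create` action line followed by the document itself merged
    with an @timestamp field. The bulk body must end with a
    trailing newline."""
    lines = []
    for row in rows:
        lines.append(json.dumps({"create": {}}))
        lines.append(json.dumps({"@timestamp": row["row_timestamp"], **row}))
    return "\n".join(lines) + "\n"

rows = [{"row_timestamp": "2021-11-20T12:00:00Z",
         "relation_name": "my_special_table", "id": 1}]
payload = build_bulk_payload(rows)
print(payload)
```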

Configure Statetrace in Docker-Compose

Now we will add Statetrace to our docker-compose file.

Get your License

Go to https://www.statetrace.com/statetrace-core to get your free STATETRACE_LICENSE key. No email or sign-up required.

Make a secret

Run the following command to generate a STATETRACE_SECRET_KEY_BASE:

head -c 500 /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1
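If you'd rather not shell out, the same kind of key can be generated with Python's secrets module. This is an equivalent sketch, not a documented Statetrace requirement.

```python
import secrets
import string

# Generate a 32-character alphanumeric secret, mirroring the
# shell pipeline above (random bytes filtered to a-zA-Z0-9).
ALPHABET = string.ascii_letters + string.digits
secret_key = "".join(secrets.choice(ALPHABET) for _ in range(32))
print(secret_key)
```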

Add to YAML

Take our Statetrace license and the STATETRACE_INBOUND_CONFIG above, and tie it all together with the rest of our docker-compose file.

docker-compose.yaml
version: '3'

services:
  postgres:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_wal_senders=2"
      - "-c"
      - "max_replication_slots=2"
    ports:
      - "5432:5432"

  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    container_name: es01
    environment:
      - node.name=es01
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200

  # The buffer database
  statetrace_db:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-p"
      - "5433"
    ports:
      - "5433:5433"

  statetrace:
    image: statetraceofficial/statetrace-beta
    environment:
      STATETRACE_DATABASE_URL: postgres://postgres:postgres@statetrace_db:5433/postgres
      STATETRACE_SECRET_KEY_BASE: "<statetrace_secret_key_base>"
      STATETRACE_LICENSE: "<statetrace_license>"
      STATETRACE_INBOUND_CONFIG: |
        inbounds:
          - name: Postgres DB
            database_type: postgres
            username: postgres
            password: postgres
            database: postgres
            host: postgres
            publication: "statetrace"
            slot: "statetrace"
            port: 5432
            log_new_attributes: true
        outbounds:
          - name: Webhook
            handler: webhook
            target_url: "http://es01:9200/logs-statetrace_dump/_bulk"
            middleware: |
              fn
                %{relation_name: tn} when tn in ["my_special_table"] -> :ok
                _ -> :skip
              end
            request_builder: |
              fn rows ->
                payload =
                  rows
                  |> Enum.flat_map(fn row ->
                    [
                      Jason.encode_to_iodata!(%{"create" => %{}}),
                      "\n",
                      Jason.encode_to_iodata!(Map.merge(%{"@timestamp" => row["row_timestamp"]}, row)),
                      "\n"
                    ]
                  end)

                %WebhookRequest{method: "POST", headers: [{"content-type", "application/json"}], payload: payload}
              end
    depends_on:
      - statetrace_db
      - postgres
      - es01

volumes:
  data01:
    driver: local

Now all changes happening in the postgres database that match the middleware will be streamed into Elasticsearch under the logs-statetrace_dump index.

Conclusion

Keeping Postgres synchronized with Elasticsearch doesn't need to be hard. We configured Statetrace's standard webhook outbound and used it to push data into the search engine with very little setup.

You can expand on this method to customize the data that you choose to index.