Need to connect Postgres to Elasticsearch? Statetrace makes it easy to stream changes from your application database into the popular search engine.
Postgres is great for a lot of things, but sometimes your data access patterns call for a search engine like Elasticsearch. Unfortunately, it can be tricky to keep Postgres synchronized with Elasticsearch as data changes. Luckily, Statetrace makes it easy.
In this article we will configure a local Postgres instance with Statetrace and pipe all changes into Elasticsearch for easy querying.
Configure the Databases in Docker-Compose
I will use the standard docker-compose.yaml from the tutorial, with the addition of an Elasticsearch container.
version: '3'
services:
  postgres:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_wal_senders=2"
      - "-c"
      - "max_replication_slots=2"
    ports:
      - "5432:5432"
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    container_name: es01
    environment:
      - node.name=es01
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
  # The buffer database
  statetrace_db:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-p"
      - "5433"
    ports:
      - "5433:5433"
volumes:
  data01:
    driver: local
Configure Statetrace
We will use the following as our configuration for Statetrace. Note that host is the postgres service name from the docker-compose file, since Statetrace will run inside the same compose network.
inbounds:
  - name: Postgres DB
    database_type: postgres
    username: postgres
    password: postgres
    database: postgres
    host: postgres
    publication: "statetrace"
    slot: "statetrace"
    port: 5432
    log_new_attributes: true
outbounds:
  - name: Webhook
    handler: webhook
    target_url: "http://es01:9200/logs-statetrace_dump/_bulk"
    middleware: |
      fn
        %{relation_name: tn} when tn in ["my_special_table"] -> :ok
        _ -> :skip
      end
    request_builder: |
      fn rows ->
        payload =
          rows
          |> Enum.flat_map(fn row ->
            [
              Jason.encode_to_iodata!(%{"create" => %{}}),
              "\n",
              Jason.encode_to_iodata!(Map.merge(%{"@timestamp" => row["row_timestamp"]}, row)),
              "\n"
            ]
          end)

        %WebhookRequest{method: "POST", headers: [{"content-type", "application/json"}], payload: payload}
      end
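A note on the publication and slot settings: they name the Postgres logical replication objects Statetrace reads from. Depending on your Statetrace version they may be created for you on first connect; if your setup expects them to exist already (check the Statetrace docs), you can create them by hand once the postgres container is running, for example:

docker-compose exec postgres psql -U postgres -c "CREATE PUBLICATION statetrace FOR ALL TABLES;"
docker-compose exec postgres psql -U postgres -c "SELECT pg_create_logical_replication_slot('statetrace', 'pgoutput');"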
Middleware
An outbound middleware runs on every row and decides whether or not to include it in the outbound request. Elixir's pattern-matching syntax makes it a breeze to match the tables we want to send to Elasticsearch.
In this example, we will only put rows from my_special_table into the index.
Request Builder
The request_builder is a closure that builds our WebhookRequest. The WebhookRequest lets you set the method, headers, and payload of your HTTP request, as well as control which fields are included in the payload. If you want to grab an environment variable, do it outside of the closure.
Elasticsearch's bulk API expects newline-delimited JSON, so we have to format our payload accordingly.
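To get a feel for the format, here is an equivalent bulk request sent by hand with curl (the document fields are made up, bash's $'...' quoting embeds real newlines, and Elasticsearch is assumed to be reachable on localhost:9200):

# Each document is an action line followed by a source line, and the body ends with a newline.
curl -s -X POST 'http://localhost:9200/logs-statetrace_dump/_bulk' \
  -H 'Content-Type: application/json' \
  --data-binary $'{"create":{}}\n{"@timestamp":"2021-11-16T12:00:00Z","relation_name":"my_special_table","action":"INSERT"}\n'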
Configure Statetrace in Docker-Compose
Now we will add Statetrace to our docker-compose file.
Get your License
Go to https://www.statetrace.com/statetrace-core to get your free STATETRACE_LICENSE
key. No email or sign-up required.
Make a secret
Run the following command to generate a STATETRACE_SECRET_KEY_BASE:
head -c 500 /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1
Add to YAML
Now we take our Statetrace license and the STATETRACE_INBOUND_CONFIG from above and tie them together with the rest of our docker-compose file.
version: '3'
services:
  postgres:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_wal_senders=2"
      - "-c"
      - "max_replication_slots=2"
    ports:
      - "5432:5432"
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    container_name: es01
    environment:
      - node.name=es01
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
  # The buffer database
  statetrace_db:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-p"
      - "5433"
    ports:
      - "5433:5433"
  statetrace:
    image: statetraceofficial/statetrace-beta
    environment:
      STATETRACE_DATABASE_URL: postgres://postgres:postgres@statetrace_db:5433/postgres
      STATETRACE_SECRET_KEY_BASE: "<statetrace_secret_key_base>"
      STATETRACE_LICENSE: "<statetrace_license>"
      STATETRACE_INBOUND_CONFIG: |
        inbounds:
          - name: Postgres DB
            database_type: postgres
            username: postgres
            password: postgres
            database: postgres
            host: postgres
            publication: "statetrace"
            slot: "statetrace"
            port: 5432
            log_new_attributes: true
        outbounds:
          - name: Webhook
            handler: webhook
            target_url: "http://es01:9200/logs-statetrace_dump/_bulk"
            middleware: |
              fn
                %{relation_name: tn} when tn in ["my_special_table"] -> :ok
                _ -> :skip
              end
            request_builder: |
              fn rows ->
                payload =
                  rows
                  |> Enum.flat_map(fn row ->
                    [
                      Jason.encode_to_iodata!(%{"create" => %{}}),
                      "\n",
                      Jason.encode_to_iodata!(Map.merge(%{"@timestamp" => row["row_timestamp"]}, row)),
                      "\n"
                    ]
                  end)

                %WebhookRequest{method: "POST", headers: [{"content-type", "application/json"}], payload: payload}
              end
    depends_on:
      - statetrace_db
      - postgres
      - es01
volumes:
  data01:
    driver: local
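With all of the pieces in place, start the stack (again assuming the file is saved as docker-compose.yaml):

docker-compose up -d
# Tail the Statetrace container to confirm it connects to both databases.
docker-compose logs -f statetrace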
Now all changes happening in the postgres database that match the middleware will be streamed into Elasticsearch under the logs-statetrace_dump
index.
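To see it end to end, create a table that the middleware matches, change some data, and then search the index. The table and column below are only an example:

docker-compose exec postgres psql -U postgres -c "CREATE TABLE my_special_table (id serial PRIMARY KEY, note text);"
docker-compose exec postgres psql -U postgres -c "INSERT INTO my_special_table (note) VALUES ('hello elasticsearch');"
# Give Statetrace a moment to pick up the change, then query Elasticsearch.
curl -s 'http://localhost:9200/logs-statetrace_dump/_search?pretty'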
Conclusion
Keeping Postgres synchronized with Elasticsearch doesn't need to be hard. We have shown how Statetrace's standard webhook outbound can push changes into the search engine with very little configuration.
You can expand on this method to customize the data that you choose to index.