Parallel jobs in Postgres¶
A solution for implementing parallel jobs in Postgres
Problem¶
A common approach for running time-consuming or maintenance tasks in databases is to use job processes.
Two classic extensions that allow launching scheduled jobs in Postgres are pgAgent and pg_cron.
The problem is that if your database is hosted in a cloud, these extensions might be unavailable. This is the case for Postgres RDS in the AWS cloud.
One unofficial recommendation from the AWS team on a support forum was to launch an EC2 instance and trigger psql via crontab on that server, executing a script or calling an API on the database side.
Such an approach requires you to keep (and pay for) a separate EC2 instance that has to be continuously up and running. Another piece of advice was an even more complex setup with Lambda functions.
Solution¶
Fortunately, there is the dblink extension, which allows performing asynchronous calls to a remote database over a database link.
After sending such a command, you can periodically ping the spawned remote session to check whether the call has completed.
If you create a database link pointing back to the same database, you obtain a job executed in parallel to the current session.
This is not a fully scheduled approach, though.
But you can have an always-running session that calls pg_sleep at the required interval and thus mimics a scheduler daemon, as sketched below.
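A minimal sketch of such a daemon loop, assuming a hypothetical run_jobs() function that encapsulates the actual dispatch logic:
do $$
begin
  loop
    perform run_jobs();   -- hypothetical dispatcher of pending work
    perform pg_sleep(60); -- wake up once a minute
  end loop;
end;
$$;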
And luckily for us, this extension is available for installation on AWS RDS instances.
Installation¶
Note: Unless explicitly specified, all commands are executed under the default superuser postgres account.
The dblink extension is available as part of the default Postgres installation:
=> select * from pg_available_extensions where name = 'dblink';
-- name | default_version | installed_version | comment
-- --------+-----------------+-------------------+-------------------------------------------------------------
-- dblink | 1.2 | | connect to other PostgreSQL databases from within a database
It is good practice to install extensions into individual schemas. The commands below install the extension into a schema named after the extension itself:
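=> create schema dblink;
CREATE SCHEMA
=> create extension dblink schema dblink;
CREATE EXTENSION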
You can observe all installed module functions via the psql \df dblink.* command.
To start using the module, we need to find out how to create a dblink and connect. The methods of interest can be listed as:
=> select distinct proname from pg_proc where pronamespace = 'dblink'::regnamespace and proname like '%connect%';
proname
------------------------
dblink_connect
dblink_connect_u
dblink_disconnect
dblink_get_connections
Connection¶
To establish a connection to a remote database, we use the dblink_connect method.
It accepts either a fully composed connection string or the name of a foreign server.
To avoid specifying all connection parameters each time, it is a good idea to define a foreign server once and use its name instead.
A foreign server is just a metadata object that stores the host, port, and target database name. A common pattern is to create it under a privileged account and grant ordinary users usage access on it.
For each user that will use dblink, we should also create a user mapping. This is another metadata object, storing the credentials that will be used to establish the remote session. In the case of a dblink self-reference, we can map the same user who originates the call over dblink.
=> create server localhost foreign data wrapper dblink_fdw options (host 'localhost', dbname 'postgres', port '5432');
CREATE SERVER
=> create user dbl password 'dbl';
CREATE ROLE
=> create user mapping for dbl server localhost options (user 'dbl', password 'dbl');
CREATE USER MAPPING
=> grant usage on foreign server localhost to dbl;
GRANT
=> grant usage on schema dblink to dbl;
GRANT
=> grant execute on all functions in schema dblink to dbl;
GRANT
Let's create a test table in the default public schema.
=> create table t as select id from generate_series(1, 10) t(id);
SELECT 10
=> grant select on t to dbl;
GRANT
Now we are ready to connect under the newly created dbl user, create a named dblink connection test, and select from the t table over it.
=> \c - dbl
You are now connected to database "postgres" as user "dbl".
=> select * from dblink.dblink_connect('test', 'localhost');
dblink_connect
----------------
OK
=> select * from dblink.dblink('test', 'select * from t') as t(id int);
id
----
1
2
3
4
5
6
7
8
9
10
(10 rows)
For this to work, the server authentication method in pg_hba.conf should be set to some value other than the default trust.
With trust, any non-privileged user would be able to connect over dblink as a privileged one.
This is a security breach, and such behavior is not allowed; with trust in place, a non-superuser connection attempt fails.
Setting the authentication method to md5 is a common choice.
=> select * from dblink.dblink_connect('test', 'localhost');
-- ERROR: password is required
-- DETAIL: Non-superuser cannot connect if the server does not request a password.
-- HINT: Target server's authentication method must be changed.
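For reference, on a self-managed server a pg_hba.conf entry enforcing password authentication for local connections could look like the following (illustrative; adjust the address range to your setup, and note that managed services such as RDS enforce password authentication on their own):
# TYPE  DATABASE  USER  ADDRESS        METHOD
host    all       all   127.0.0.1/32   md5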
Parallel calls¶
The main feature of the dblink extension that makes it suitable for creating parallel jobs is the dblink_send_query method.
It sends a query or command to the remote server and returns immediately, i.e. it runs asynchronously, like a Promise or Observable in web development.
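As a minimal illustration (assuming the test connection created earlier is still open): dblink_send_query returns 1 when the query has been successfully dispatched.
=> select dblink.dblink_send_query('test', 'select pg_sleep(5)');
 dblink_send_query
-------------------
                 1
(1 row)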
Let's create a simple test table that will be populated via parallel calls triggered from a single main session.
It has a job_id identifier and a ts attribute recording the timestamp at which a job inserts a row into the table.
=> \c - dbl
You are now connected to database "postgres" as user "dbl".
=> create table test(job_id int, ts timestamp default clock_timestamp());
CREATE TABLE
Each process will call the function f below.
It accepts a job id and performs a loop of 10 iterations.
During each iteration, the job checks whether it is allowed to insert a row:
if the iteration number modulo the job id is 0, the job inserts a row.
After that, the process sleeps for 1 second via a pg_sleep call.
Finally, it returns a notification string stating that it completed its execution.
=> create or replace function f(p_job int) returns text as $$
begin
for i in 1..10 loop
if mod(i, p_job) = 0 then
insert into test values (p_job);
end if;
perform pg_sleep(1);
end loop;
return concat('Job ', p_job, ' finished');
end; $$ language plpgsql;
CREATE FUNCTION
Now we start 3 independent job processes, each calling the function f.
The initiating loop finishes immediately, while each spawned job process runs for about 10 seconds.
do $$
begin
raise info using message = concat('start: ', clock_timestamp());
for i in 1..3 loop
perform dblink.dblink_connect('job' || i::text, 'localhost');
perform dblink.dblink_send_query('job' || i::text, format('select f(%s)', i));
end loop;
raise info using message = concat('end: ', clock_timestamp());
end;
$$;
-- INFO: start: 2020-06-18 21:30:09.409148+00
-- INFO: end: 2020-06-18 21:30:09.431779+00
After 10 seconds have passed, we can check the test table for the results.
It can be seen that all jobs were working simultaneously, each inserting data at its own interval.
The job with id=1 inserted 10 rows, the job with id=2 inserted 5 rows, and the job with id=3 inserted 3 rows.
The main point is that this happened in parallel and asynchronously to the main session.
=> select * from test order by ts;
job_id | ts
--------+----------------------------
1 | 2020-06-18 21:30:09.424624
1 | 2020-06-18 21:30:10.426445
2 | 2020-06-18 21:30:10.43233
1 | 2020-06-18 21:30:11.427722
3 | 2020-06-18 21:30:11.436677
1 | 2020-06-18 21:30:12.42902
2 | 2020-06-18 21:30:12.434721
1 | 2020-06-18 21:30:13.430361
1 | 2020-06-18 21:30:14.431699
2 | 2020-06-18 21:30:14.436725
3 | 2020-06-18 21:30:14.439847
1 | 2020-06-18 21:30:15.432963
1 | 2020-06-18 21:30:16.433618
2 | 2020-06-18 21:30:16.438628
1 | 2020-06-18 21:30:17.434656
3 | 2020-06-18 21:30:17.442745
1 | 2020-06-18 21:30:18.435624
2 | 2020-06-18 21:30:18.440616
(18 rows)
=> select job_id, count(*) from test group by job_id order by job_id;
job_id | count
--------+-------
1 | 10
2 | 5
3 | 3
Getting results¶
In the previous section we successfully launched 3 sessions in parallel from within a single session. One important missing piece in that example, though, is that we didn't get any result of the remote calls from within the calling code itself; we checked the table manually after the spawned sessions finished execution.
For this purpose, the dblink module provides the dblink_is_busy and dblink_get_result methods.
Let's extend our example a bit and add result retrieval to the code.
=> truncate table test;
TRUNCATE TABLE
do $$
declare
rec record;
result text;
begin
raise info using message = concat('start: ', clock_timestamp());
for i in 1..3 loop
perform dblink.dblink_connect('job' || i::text, 'localhost');
perform dblink.dblink_send_query('job' || i::text, format('select f(%s)', i));
end loop;
while array_length(dblink.dblink_get_connections(), 1) > 0
loop
for rec in
select unnest(dblink.dblink_get_connections()) val
loop
if dblink.dblink_is_busy(rec.val) = 0 then
select t.res
into result
from dblink.dblink_get_result(rec.val) as t(res text)
;
raise info using message = concat(rec.val, ': ', result);
perform dblink.dblink_disconnect(rec.val);
else
raise info using message = 'Job ' || rec.val || ' is busy';
end if;
end loop;
perform pg_sleep(1);
end loop;
raise info using message = concat('end: ', clock_timestamp());
end;
$$;
-- INFO: start: 2020-06-18 22:24:53.205129+00
-- INFO: Job job2 is busy
-- INFO: Job job1 is busy
-- INFO: Job job3 is busy
-- ...
-- INFO: Job job2 is busy
-- INFO: Job job1 is busy
-- INFO: Job job3 is busy
-- INFO: job2: Job 2 finished
-- INFO: job1: Job 1 finished
-- INFO: Job job3 is busy
-- INFO: job3: Job 3 finished
-- INFO: end: 2020-06-18 22:25:05.250809+00
We just added a WHILE loop that waits for all opened connections to be closed.
A connection is closed right after the dblink_is_busy method signals that it is no longer busy.
Right before closing it, we fetch the result of the remote call via the dblink_get_result method.
The whole run now takes the same 10 seconds as an individual job's run time.
In the test table we see the same picture as in the previous example: all three jobs were running simultaneously, each at its own interval.
=> select * from test order by ts;
job_id | ts
--------+----------------------------
1 | 2020-06-18 22:24:53.223376
1 | 2020-06-18 22:24:54.225418
2 | 2020-06-18 22:24:54.232214
1 | 2020-06-18 22:24:55.225772
3 | 2020-06-18 22:24:55.241323
1 | 2020-06-18 22:24:56.227109
2 | 2020-06-18 22:24:56.234673
1 | 2020-06-18 22:24:57.228453
1 | 2020-06-18 22:24:58.229832
2 | 2020-06-18 22:24:58.237117
3 | 2020-06-18 22:24:58.244731
1 | 2020-06-18 22:24:59.231053
1 | 2020-06-18 22:25:00.232278
2 | 2020-06-18 22:25:00.23957
1 | 2020-06-18 22:25:01.233572
3 | 2020-06-18 22:25:01.247735
1 | 2020-06-18 22:25:02.234838
2 | 2020-06-18 22:25:02.242008
(18 rows)
=> select job_id, count(*) from test group by job_id order by job_id;
job_id | count
--------+-------
1 | 10
2 | 5
3 | 3
Summary¶
We have learned how to spawn parallel processing jobs in Postgres via the dblink extension, and we were able to pick up the results of the remote calls from within the parent session.
The major benefit of this approach is that the dblink extension is available on cloud-based AWS RDS Postgres instances, while the standard pgAgent and pg_cron extensions are not.