Skip to content

Parallel jobs in Postgres

Parallel jobs

Solution to implement parallel jobs in Postgres

Problem

Common approach for running time consuming or maintenance tasks in databases is using job processes. Two classic extensions that allow to launch scheduled jobs in Postgres are pgAgent and pg_cron. Problem is that if you have database hosted in a cloud they might be unavailable. This happens for Postgres RDS in AWS cloud. One of the unofficial recommendation from AWS team on support forum was to launch EC2 instance and trigger via crontab on that server psql which will execute some script or call an API on database side.

Such approach requires from you to keep (and pay for) separate EC2 instance. It has to be continiously up and running. Another advise was even more complex setup with Lambda functions.

Solution

Fortunately, there is dblink extension exists which allows to perform asynchronous calls to remote database over the database link. After sending such command you can periodically ping spawned remote session whether the call has completed. If you create database link pointed to itself then you obtain a job executed in parallel to current session. This is not fully scheduled approach through. But you can have always running session which will call pg_sleep on required interval and thus will mimic scheduler daemon.

And luckily for us, this extension is available for installation on AWS RDS instance.

Installation

Note: Unless explicitly specified all commands are executed under default superuser postgres account.

dblink extension is available as part of default postgres installation

=> select * from pg_available_extensions where name = 'dblink';
--   name  | default_version | installed_version |                           comment
-- --------+-----------------+-------------------+-------------------------------------------------------------
--  dblink | 1.2             |                   | connect to other PostgreSQL databases from within a database

Good practice is to install extensions into individual schemas. Commands below install extension into the schema named as extension itself.

=> create schema dblink;
CREATE SCHEMA
=> create extension dblink schema dblink;
CREATE EXTENSION

You can observe all installed module functions via psql \df dblink. command. To start using module we need to find out how to create dblink and connect. Methods of interest could be listed as:

=> select distinct proname from pg_proc where pronamespace = 'dblink'::regnamespace and proname like '%connect%';
        proname
------------------------
 dblink_connect
 dblink_connect_u
 dblink_disconnect
 dblink_get_connections

Connection

In order to establish connection to a remote database we should use dblink_connect method. It accepts either fully composed connection string or the name of the foreign server. In order do not specify all connection parameters each time it is a good idea to define foreign server once and use it name instead.

Foreign server is just a metadata object which stores host, port and target database name. Common pattern is to create it under privileged account. Ordinary users should be granted usage access on it.

For each user that will use dblink we also should create a user mapping. This is another metadata object which stores user credentials which will be used to establish remote session. In case of dblink self reference we can use for mapping the same user who originates call over dblink.

=> create server localhost foreign data wrapper dblink_fdw options (host 'localhost', dbname 'postgres', port '5432');
CREATE SERVER

=> create user dbl password 'dbl';
CREATE ROLE
=> create user mapping for dbl server localhost options (user 'dbl', password 'dbl');
CREATE USER MAPPING

=> grant usage on foreign server localhost to dbl;
GRANT
=> grant usage on schema dblink to dbl;
GRANT
=> grant execute on all functions in schema dblink to dbl;
GRANT

Let's create a test table in default public schema.

=> create table t as select id from generate_series(1, 10) t(id);
SELECT 10
=> grant select on t to dbl;
GRANT

Now we are ready to connect under newly created dbl user, create named dblink connection test and select from t table over it.

=> \c - dbl
You are now connected to database "postgres" as user "dbl".

=> select * from dblink.dblink_connect('test', 'localhost');
 dblink_connect
----------------
 OK

=> select * from dblink.dblink('test', 'select * from t') as t(id int);
 id
----
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
(10 rows)

In order this to work you should have server authorization mode set in pg_hba.conf to some value except default trust. In case of trust any non-privileged user will be able to connect over dblink as privileged one. This is security break and such behavior is not allowed. Setting authorization mode to md5 is a common choice.

=> select * from dblink.dblink_connect('test', 'localhost');
-- ERROR:  password is required
-- DETAIL:  Non-superuser cannot connect if the server does not request a password.
-- HINT:  Target server's authentication method must be changed.

Parallel calls

The main feature of dblink extension that makes it possible to use it for parallel jobs creation is dblink_send_query method. It sends query/command to remote server and returns immediately. I.e. it runs asynchronously as Promise or Observable in web development.

Let's create simple test table which will be populated via parallel calls triggered from single main session. It has job_id identificator and ts attribute to record timestamp when job will insert a row into the table.

=> \c - dbl
You are now connected to database "postgres" as user "dbl".

=> create table test(job_id int, ts timestamp default clock_timestamp());
CREATE TABLE

Each process will call function f below. It accepts job id and performs loop with 10 iterations. During each iteration job checks whether it is allowed to insert a row. If mod between iteration number and job id is 0 then job will insert a row. After that process freezes for 1 second via pg_sleep call. Finally it returns notification string value that it completed its execution.

=> create or replace function f(p_job int) returns text as $$
begin
   for i in 1..10 loop
      if mod(i, p_job) = 0 then
         insert into test values (p_job);
      end if;

      perform pg_sleep(1);
   end loop;

   return concat('Job ', p_job, ' finished');

end; $$ language plpgsql;

CREATE FUNCTION

Now we start 3 independent job processes which will call f function. Initial calling loop finishes immediately. While each spawn job process will run for 10 secs.

do $$
begin
   raise info using message = concat('start: ', clock_timestamp());

   for i in 1..3 loop
      perform dblink.dblink_connect('job' || i::text, 'localhost');
      perform dblink.dblink_send_query('job' || i::text, format('select f(%s)', i));
   end loop;

   raise info using message = concat('end: ', clock_timestamp());
end;
$$;

-- INFO:  start: 2020-06-18 21:30:09.409148+00
-- INFO:  end: 2020-06-18 21:30:09.431779+00

After 10 seconds passed we can check test table for the results. It can be seen that all jobs were working simultaneously and inserting data on individual interval. Job with id=1 inserted 10 rows, with id=2 - 5 rows and with id=3 - 3 rows. Main thing is that it was happenning in parallel and asynchronously to main session.

=> select * from test order by ts;
 job_id |             ts
--------+----------------------------
      1 | 2020-06-18 21:30:09.424624
      1 | 2020-06-18 21:30:10.426445
      2 | 2020-06-18 21:30:10.43233
      1 | 2020-06-18 21:30:11.427722
      3 | 2020-06-18 21:30:11.436677
      1 | 2020-06-18 21:30:12.42902
      2 | 2020-06-18 21:30:12.434721
      1 | 2020-06-18 21:30:13.430361
      1 | 2020-06-18 21:30:14.431699
      2 | 2020-06-18 21:30:14.436725
      3 | 2020-06-18 21:30:14.439847
      1 | 2020-06-18 21:30:15.432963
      1 | 2020-06-18 21:30:16.433618
      2 | 2020-06-18 21:30:16.438628
      1 | 2020-06-18 21:30:17.434656
      3 | 2020-06-18 21:30:17.442745
      1 | 2020-06-18 21:30:18.435624
      2 | 2020-06-18 21:30:18.440616
(18 rows)

=> select job_id, count(*) from test group by job_id order by job_id;
 job_id | count
--------+-------
      1 |    10
      2 |     5
      3 |     3

Getting results

In previous section we successfully launched 3 sessions in parallel from within single session. One important missed piece through in provided example is that we didn't get any result of the remote calls from within the calling code itself. We checked it manually after spawned sessions finished execution.

For such purpose dblink module provides dblink_is_busy and dblink_get_result methods. Lets extend our example a bit and add results getting functionality into the code.

=> truncate table test;
TRUNCATE TABLE

do $$
declare
   rec      record;
   result   text;
begin
   raise info using message = concat('start: ', clock_timestamp());

   for i in 1..3 loop
      perform dblink.dblink_connect('job' || i::text, 'localhost');
      perform dblink.dblink_send_query('job' || i::text, format('select f(%s)', i));
   end loop;

   while array_length(dblink.dblink_get_connections(), 1) > 0
   loop
      for rec in
         select unnest(dblink.dblink_get_connections()) val
      loop
         if dblink.dblink_is_busy(rec.val) = 0 then
            select t.res
              into result
              from dblink.dblink_get_result(rec.val) as t(res text)
            ;
            raise info using message = concat(rec.val, ': ', result);
            perform dblink.dblink_disconnect(rec.val);
         else
            raise info using message = 'Job ' || rec.val || ' is busy';
         end if;
      end loop;

      perform pg_sleep(1);
   end loop;

   raise info using message = concat('end: ', clock_timestamp());
end;
$$;

-- INFO:  start: 2020-06-18 22:24:53.205129+00
-- INFO:  Job job2 is busy
-- INFO:  Job job1 is busy
-- INFO:  Job job3 is busy
-- ...
-- INFO:  Job job2 is busy
-- INFO:  Job job1 is busy
-- INFO:  Job job3 is busy
-- INFO:  job2: Job 2 finished
-- INFO:  job1: Job 1 finished
-- INFO:  Job job3 is busy
-- INFO:  job3: Job 3 finished
-- INFO:  end: 2020-06-18 22:25:05.250809+00

We just added a WHILE loop which was waiting for all opened connections to be closed. Connection is closed right after getting signal from dblink_is_busy method that it is not busy anymore. Right before closure we fetch result of a remote call via dblink_get_result method. Whole run duration now takes the same 10 seconds as individual job run time.

In test table we see the same picture as in previous example. All three jobs were running simultaneously with individual intervals.

=> select * from test order by ts;
 job_id |             ts
--------+----------------------------
      1 | 2020-06-18 22:24:53.223376
      1 | 2020-06-18 22:24:54.225418
      2 | 2020-06-18 22:24:54.232214
      1 | 2020-06-18 22:24:55.225772
      3 | 2020-06-18 22:24:55.241323
      1 | 2020-06-18 22:24:56.227109
      2 | 2020-06-18 22:24:56.234673
      1 | 2020-06-18 22:24:57.228453
      1 | 2020-06-18 22:24:58.229832
      2 | 2020-06-18 22:24:58.237117
      3 | 2020-06-18 22:24:58.244731
      1 | 2020-06-18 22:24:59.231053
      1 | 2020-06-18 22:25:00.232278
      2 | 2020-06-18 22:25:00.23957
      1 | 2020-06-18 22:25:01.233572
      3 | 2020-06-18 22:25:01.247735
      1 | 2020-06-18 22:25:02.234838
      2 | 2020-06-18 22:25:02.242008
(18 rows)

=> select job_id, count(*) from test group by job_id order by job_id;
 job_id | count
--------+-------
      1 |    10
      2 |     5
      3 |     3

Summary

We have learned how to spawn parallel processing jobs in Postgres via dblink extension. Also were able to pick-up results from remote calls from within parent session. Major benefit of such approach is that dblink extension is available on cloud based AWS RDS Postgres instance. While standard pgAgent and pg_cron extensions are forbidden.

Comments