Asynchronous PostgreSQL

Sophisticated

Delayed Processing

For safe signal handling, Perl now manages a number of flags internally. If a script installs a handler for a signal, the interpreter does not call the Perl function at the moment the signal arrives. Instead, the actual signal handler is an internal C function that only sets one of the flags. At appropriate points, the Perl interpreter then uses the PERL_ASYNC_CHECK macro to check whether any of the flags are set and, if so, calls the signal handler written in Perl. Figure 5 represents this schematically. Because the C-level handler only changes its own data structures, it can no longer corrupt the interpreter's state and cause unexpected crashes. In this sense, signal handling is safe.

Figure 5: Safe signals: The C signal handler only sets the flags. The Perl signal handler then runs separately later to avoid conflicts when accessing global variables.
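The mechanism can be pictured with a toy model in plain Perl. This is only a sketch of the idea, not the interpreter's actual code: the names c_level_handler, async_check, %pending, and %perl_handler are made up for illustration.

```perl
use strict;
use warnings;

my %pending;        # one flag per signal, set by the low-level handler
my %perl_handler;   # stand-in for what a script installs through %SIG
my @log;

$perl_handler{TERM} = sub { push @log, "Perl handler for $_[0]" };

# Stand-in for the internal C handler: it only records that the signal arrived.
sub c_level_handler {
    my ($sig) = @_;
    $pending{$sig} = 1;
}

# Stand-in for PERL_ASYNC_CHECK: called only at points where the
# interpreter's data structures are in a consistent state.
sub async_check {
    for my $sig (sort keys %pending) {
        delete $pending{$sig};
        $perl_handler{$sig}->($sig) if $perl_handler{$sig};
    }
}

c_level_handler('TERM');                  # signal arrives mid-operation
push @log, 'current operation finishes';  # nothing is interrupted
async_check();                            # only now does the Perl handler run

print "$_\n" for @log;
```

The log shows the running operation completing before the Perl handler is called, which is exactly the delay that makes the handling safe.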

Unfortunately, signals can still get stuck in the pipeline instead of being delivered promptly. The problem results from a race condition. At some point, the can_read call in line 23 invokes Perl's select function. Within this function, what happens at the C level is approximately this:

PERL_ASYNC_CHECK;
/* Some other C commands */
select(...);       /* system call: the process blocks here */

First, a check is performed to see whether signals are pending. This check is followed by a series of commands that convert the parameters of the Perl select function into parameters that the kernel select call understands. Finally, the kernel is called, and the process blocks. But what happens if the signal arrives between PERL_ASYNC_CHECK and the kernel call? In this case, the C-level signal handler sets its flag as usual. However, it cannot prevent the process from blocking in the system call. The signal is therefore not delivered to the Perl handler until the system call returns. To the CGI program, it looks as if the SIGTERM has disappeared. Apache then sends SIGKILL shortly thereafter, and the CGI program is killed, but the database process continues.
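The race window itself is too narrow to reproduce reliably, but the harmless case is easy to observe. In the following sketch, which assumes a Unix-like system and uses SIGALRM as a stand-in for SIGTERM, the signal arrives while the process is already blocked in select. The system call is interrupted, and the Perl handler runs right after it returns:

```perl
use strict;
use warnings;
use Time::HiRes qw(alarm time);   # core module; allows fractional seconds

my $handled = 0;
$SIG{ALRM} = sub { $handled++ };  # Perl-level handler, dispatched later

alarm 0.2;                        # signal will arrive during the select call
my $start = time;

# Four-argument select used as a plain sleep: without a signal it would
# block for the full five seconds. A signal interrupts it, and it returns -1.
my $ready = select(undef, undef, undef, 5);

my $elapsed = time - $start;
printf "select returned %d after %.1f s, handler calls: %d\n",
    $ready, $elapsed, $handled;
```

Had the signal arrived just before the system call instead, the flag would already be set, but select would still sleep for the full five seconds before the Perl handler got its turn.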

Correct Signal Handling

The problem is that the program waits for two things at the same time: signals and data from the socket. Three possible approaches could solve this problem:

  • The kernel is told to send a signal when the socket becomes ready to be read [3].
  • The signal is mapped to a kind of file descriptor that becomes readable when a signal is received.
  • The kernel implements a system call, to which the expected signals and file descriptors are passed; it works much like select [4].

All three approaches are implemented on Linux and, thanks to additional modules from CPAN, can also be used with Perl. In the following discussion, I will focus on the second approach.

A fairly portable method of implementing this on a Unix-like system is known as the "self-pipe trick" [5]. First, you create a pipe for each expected signal. Both ends of the pipe are switched to non-blocking mode. The signal handler, which has to be implemented in C, then writes an arbitrary byte to the pipe. This write can fail if the pipe is full, but it will not block the process because the file descriptor is non-blocking. Now the read end of the pipe can be monitored together with other file descriptors using an ordinary select, poll, or epoll call.
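The structure of the trick can be sketched in plain Perl with core modules. Note the important limitation: in this sketch the byte is written by a Perl-level %SIG handler, which is itself dispatched late, so it only illustrates the plumbing and does not close the race. The real trick needs the write to happen in the C-level handler. SIGUSR1 and the variable names are arbitrary choices for the example.

```perl
use strict;
use warnings;
use IO::Handle;
use IO::Select;

pipe(my $rd, my $wr) or die "pipe: $!";
$_->blocking(0) for $rd, $wr;     # neither end may ever block the process

# Stand-in for the C handler: write one arbitrary byte. If the pipe is
# full, the write simply fails instead of blocking.
$SIG{USR1} = sub { syswrite($wr, "\0") };

kill USR1 => $$;                  # simulate an incoming signal

# The read end is monitored like any other descriptor; a database
# socket could be added to the same IO::Select object.
my $sel = IO::Select->new($rd);
my $signalled = 0;
for my $fh ($sel->can_read(5)) {
    next unless $fh == $rd;
    my $buf;
    1 while sysread($rd, $buf, 64);   # drain the pipe until it is empty
    $signalled = 1;
}

print "signal seen through the pipe: $signalled\n";
```

Because the signal now shows up as a readable file descriptor, a single can_read call waits for both kinds of events.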

Economical

This method is very wasteful, however. To manage one bit of information, the kernel creates a buffer of several kilobytes. The Linux developers have implemented a remedy for this in the form of the system calls signalfd and eventfd. The CPAN Linux::FD module [6] supports both.

For Perl programmers, the AnyEvent module [7] with libev [8] as the back end is more convenient. This uses eventfd if the call is available; otherwise, a self-pipe is created.

Listing 6 shows the program. The interesting part is the query function. If you're not familiar with AnyEvent, see the "AnyEvent – A Brief Overview" box.

AnyEvent – A Brief Overview

AnyEvent provides a framework for event-based programming in Perl. The approach is somewhat reminiscent of old Windows programs or of JavaScript programs in the browser. The main idea is that precisely one point in the program, the event loop, waits for external events. When an event arrives, it is processed without blocking the process. The program then returns to the event loop and waits for the next event.

Applied strictly, this concept is very difficult to implement. For example, you could not use the pg_cancel function, because it blocks in several places (e.g., in connect and poll).

The central part of an event-based program is the event loop. In AnyEvent, it is generated with a condition variable. Line 14 in Listing 6 creates one. The call to $done->wait in line 29 then represents the actual event loop.

The loop is infinite, so there must be a way to cancel it. In AnyEvent, the send method handles this (e.g., in line 17). It sets a flag that causes the event loop to exit after it's done processing the current event. The code after the wait call (line 31) is thus only reached if $done->send is called in line 17 or line 20.

Other important ingredients in event-based programming are the events themselves. JavaScript programmers will probably immediately think of addEventListener here or recall the various onXYZ attributes in HTML. AnyEvent uses watchers. The program in Listing 6 uses two types of watchers: watchers for signals and watchers for I/O. The AE::io call in line 19 generates a watcher that calls the function passed in as the last parameter once the file descriptor $dbh->{pg_socket} becomes readable. In JavaScript, that would be comparable to the readystatechange event of the XMLHttpRequest object.
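The interplay of condition variable and watchers can be tried out without a database. The following is a minimal sketch that assumes AnyEvent is installed from CPAN; the timer watchers and SIGUSR1 are used purely for illustration and are not part of Listing 6. It calls recv, the name used in the current AnyEvent documentation, where the listing calls wait.

```perl
use strict;
use warnings;
use AnyEvent;

my $done = AE::cv;                # condition variable that drives the loop

# Watchers stay active only as long as the variables holding them exist.
my $sig_w     = AE::signal USR1 => sub { $done->send('signal') };
my $timeout_w = AE::timer 5, 0, sub { $done->send('timeout') };

# Simulate an external event: send ourselves a signal after 0.1 seconds.
my $kick_w = AE::timer 0.1, 0, sub { kill USR1 => $$ };

my $reason = $done->recv;         # event loop: runs until send is called
print "event loop left because of: $reason\n";
```

Whatever is passed to send is returned by recv, which makes it easy to tell afterward which watcher ended the loop.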

Listing 6

burn3.pl

01 #!/usr/bin/perl
02
03 use common::sense;
04 use DBI;
05 use DBD::Pg qw/:async/;
06 use AnyEvent;
07 use POSIX qw/SIGTERM SIGINT SIG_BLOCK SIG_UNBLOCK/;
08
09 sub query {
10   my $sql=pop;
11   state $dbh||=DBI->connect('dbi:Pg:dbname=r2', 'ipp', undef, {RaiseError=>1});
12   my $stmt=$dbh->prepare($sql, {pg_async=>PG_ASYNC});
13
14   my $done=AE::cv;                        # condition variable driving the event loop
15   my $cancel=sub {                        # signal callback: cancel a running query
16     $dbh->pg_cancel if $dbh->{pg_async_status}==1;
17     $done->send;
18   };
19   my $pg_w=AE::io $dbh->{pg_socket}, 0, sub {   # 0 = watch for readability
20     $dbh->pg_ready and $done->send;
21   };
22
23   my $sigblock=POSIX::SigSet->new(SIGTERM, SIGINT);
24   POSIX::sigprocmask SIG_BLOCK, $sigblock;      # hold signals back ...
25   my @sig_w=map {AE::signal $_, $cancel} qw/TERM INT/;
26   $stmt->execute(@_);
27   POSIX::sigprocmask SIG_UNBLOCK, $sigblock;    # ... until the query has been sent
28
29   $done->wait;                            # the actual event loop
30
31   return $dbh->{pg_async_status}==1 ? ($dbh->pg_result, $stmt) : ();
32 }
33
34 print "Status: 200\nContent-Type: text/plain\n\n";
35 $|=1; $|=0;                     # flush
36
37 my ($rc, $sth)=query($ENV{QUERY_STRING} || '1s', 'select burncpu(?)');
38
39 if( defined $rc ) {
40   while( my $row=$sth->fetchrow_arrayref ) {
41     print "@$row\n";
42   }
43 } else {
44   warn "query cancelled\n";
45 }
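Lines 23 to 27 of the listing bracket the watcher setup and the execute call with sigprocmask. What blocking does can be seen in isolation in the following sketch, which uses only the core POSIX module and SIGUSR1 as an arbitrary example signal: a blocked signal stays pending in the kernel and reaches the process only after it is unblocked.

```perl
use strict;
use warnings;
use POSIX qw(SIGUSR1 SIG_BLOCK SIG_UNBLOCK);

my $calls = 0;
$SIG{USR1} = sub { $calls++ };

my $set = POSIX::SigSet->new(SIGUSR1);
POSIX::sigprocmask(SIG_BLOCK, $set) or die "sigprocmask: $!";

kill USR1 => $$;                  # the signal stays pending in the kernel
my $while_blocked = $calls;       # handler has not run yet

POSIX::sigprocmask(SIG_UNBLOCK, $set) or die "sigprocmask: $!";
my $after_unblock = $calls;       # pending signal has now been delivered

print "while blocked: $while_blocked, after unblock: $after_unblock\n";
```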
