You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1353 lines
42 KiB

#
# $Id: dm.pm,v 1.1.1.1 2004/01/09 09:22:07 jeffo Exp $
#
# $Source: /usr/local/cvsroot/Testmanager/cgi-bin/dm.pm,v $
#
# Routines for manipulating (ie., querying, updating) a data model.
#
#############################################################################
# #
# COPYRIGHT NOTICE #
# #
# This code was written by Ed Loehr, all rights reserved, 1999. You are #
# hereby granted a non-exclusive license to do whatever you want with #
# this code, including modification and redistribution, so long as you #
# retain this copyright notice in the code and do not attempt to prevent #
# anyone else from the same privileges. #
# #
# ALL SOFTWARE HAS BUGS. THIS CODE COMES "AS-IS" WITH ABSOLUTELY NO #
# WARRANTY OR GUARANTEE OF ITS FITNESS FOR ANY PURPOSE WHATSOEVER. #
# BY INCORPORATING THIS CODE, YOU ASSUME COMPLETE RISK FOR ANY #
# CONSEQUENCES AND AGREE NOT TO BRING ANY GRIEVANCE OR LIABILITY #
# CLAIMS WHATSOEVER AGAINST Ed Loehr. #
# #
#############################################################################
package dm;
use DBI;
use logger;
use toolbox;
#
# Connection options recognized by GetDBH(); also the set of keys merged
# between the caller's options hash and the dm::dbDefault values below.
#
@dm::initOptions = qw( dbkind dbhost dbname dbport dbuser dbpasswd dbhname );
# Connection defaults: PG* environment variables win, then hard fallbacks.
BEGIN {
$dm::default_dbhname = 'tmdbh';
# Separator used when joining bind-param lists into log messages.
$dm::dbDefault{paramsep} = ' | ';
$dm::dbDefault{dbkind} = "Pg";
$dm::dbDefault{dbhost} = $ENV{PGHOST} || "localhost";
# `whoami` output carries a trailing newline, hence the chomps below.
$dm::dbDefault{dbname} = $ENV{PGDATABASE} || `whoami`;
chomp $dm::dbDefault{dbname};
$dm::dbDefault{dbport} = $ENV{PGPORT} || 5432;
$dm::dbDefault{dbuser} = $ENV{PGUSER} || `whoami`;
chomp $dm::dbDefault{dbuser};
$dm::dbDefault{dbpasswd} = undef;
$dm::dbDefault{dbhname} = $dm::default_dbhname;
}
# Filesystem touchfiles: creating/removing these files toggles runtime
# behavior (logging, debugging, profiling, EXPLAIN, result dumps) without
# restarting anything. See SetDBGSettings().
$dm::app_abbreviation = 'tm';
$dm::run_dir = "/tmp/".$dm::app_abbreviation."/run";
# Presence of this file means "database is down" (see systemDown()).
$dm::down_file = "$dm::run_dir/dbdown_$dm::app_abbreviation";
$dm::touchfile_dir = $dm::run_dir;
$dm::log_touchfile = "$dm::touchfile_dir/log_$dm::app_abbreviation";
$dm::debug_touchfile = "$dm::touchfile_dir/debug_$dm::app_abbreviation";
$dm::profile_touchfile = "$dm::touchfile_dir/profile_$dm::app_abbreviation";
$dm::explain_touchfile = "$dm::touchfile_dir/explain_$dm::app_abbreviation";
$dm::showdata_touchfile = "$dm::touchfile_dir/showdata_$dm::app_abbreviation";
$dm::cleanup_interval = 100; # cleanup (vacuum) after this many transactions...
$dm::query_count = 0;
# Retry budgets for on-the-fly recovery (see Reconnect() and _SelectData()).
$dm::reconnect_tries = 0;
$dm::reconnect_limit = 2;
$dm::vacuum_tries = 0;
$dm::vacuum_limit = 1;
#############################################################################
## Query Cache NOTES... ############################################
#############################################################################
# This is a poor-man's in-process query cache: results are keyed by the
# SQL text plus bind parameters, with optional expiry, transaction-id
# cutoff, and (currently disabled) table-dependency invalidation. See
# QueryResultIsCached() and CacheResults() below.
#############################################################################
## Query Cache Settings... ############################################
#############################################################################
# FIXME: Turning the query cache off for now because the race condition
# FIXME: is too nasty in the problems it leads to. I think we really must
# FIXME: push any query caching of dynamic data down into the DB.
# NOTE(review): despite the FIXME above, the cache is ENABLED here (1) --
# confirm which setting is actually intended.
$dm::query_cache_enabled = 1;
$dm::log_cache_actions = 0;
$dm::log_cache_savings = 0;
# When set, cache every query even without $opts->{cache}.
$dm::cache_everything = 0;
# I dropped the supporting triggers for this table dependency
# functionality. They must be present for this to work!!! -efl, May2001
$dm::use_dependent_table_strategy = 0;
$dm::derive_unstated_table_dependencies = 0;
%dm::query_cache = ();
$dm::query_cache_size = 0; # current estimated cache footprint, in bytes
$dm::query_cache_hits = 0;
$dm::query_cache_misses = 0;
$dm::show_query_cache_hitmiss = 1;
$dm::max_cache_size = 2000000; # bytes
$dm::last_table_status_check_time = 1;
%dm::table_status = (); # storage of last update time for tables ...
$dm::tolerable_cache_race_window = 0.51; # seconds
#############################################################################
$dm::pending_queries = 0;
$dm::dberrstr = undef; # For forwarding to client upon request
#
# Postgres doesn't return error codes, only msgs, so we have to handle them...
#
$dm::PGIDX_SCAN_ERR = "ExecInitIndexScan";
$dm::PGIDX_SCAN_ERR_LEN = length($dm::PGIDX_SCAN_ERR);
#
# Symbolic names for the database back ends this layer knows about.
use constant PGSQL => 1;
use constant ORACLE => 2;
use constant SQLFLEX => 3;
use constant SYBASE => 4;
$dm::dbtype = dm::PGSQL;
$dm::time_to_die = 0;
# Accessor: true once SetTimeToDie() has flagged the process for shutdown.
sub TimeToDie {
    return $dm::time_to_die;
}
# Flag the process for shutdown; TimeToDie() reports this state.
sub SetTimeToDie {
    $dm::time_to_die = 1;
}
#
# Set the query-cache race-condition tolerance window (seconds).
# An undefined argument resets the window to the 0.26s default.
# BUGFIX: the old code returned the caller's argument, so passing undef
# applied the default but returned undef; now we always return the value
# that was actually applied.
#
sub SetCacheRaceTolerance {
    my ($tol) = @_;
    $dm::tolerable_cache_race_window = (defined($tol) ? $tol : 0.26);
    return $dm::tolerable_cache_race_window;
}
# Path of the flag file whose presence marks the database as down.
sub getDownFileName {
    return $dm::down_file;
}
# True (1) when the administrative down-file exists, 0 otherwise.
sub systemDown {
    return ( -f getDownFileName() ) ? 1 : 0;
}
# Timestamped error string for forwarding to clients; undef when clear.
sub getDBErrStr {
    return $dm::dberrstr;
}
# Raw (un-timestamped) error string accumulator; undef when clear.
sub getRawDBErrStr {
    return $dm::rawdberrstr;
}
# Store the raw error string; an empty or undefined value clears it.
sub setRawDBErrStr {
    my ($str) = @_;
    $dm::rawdberrstr = ( defined($str) && length($str) ) ? $str : undef;
}
# Record an error: keep the raw text via setRawDBErrStr() and store a
# timestamped copy for clients. Empty/undef input clears both.
sub setDBErrStr {
    my ($msg) = @_;
    unless ( defined($msg) && length($msg) ) {
        $dm::dberrstr = undef;
        return;
    }
    setRawDBErrStr($msg);
    $dm::dberrstr = logger::timestamp()." $msg";
}
# Append a fragment to the raw error string; empty/undef input is ignored.
sub addRawDBErrStr {
    my ($frag) = @_;
    return unless defined($frag) && length($frag);
    $dm::rawdberrstr .= $frag;
}
# Append to the raw error string and refresh the timestamped copy.
# NOTE: mirrors the original behavior -- the timestamped string is
# REPLACED (not appended), while the raw copy accumulates.
sub addDBErrStr {
    my ($msg) = @_;
    return unless defined($msg) && length($msg);
    addRawDBErrStr($msg);
    $dm::dberrstr = logger::timestamp()." $msg";
}
#
# I wanted a really easy way to flip on debugging during development,
# so I made this rather crude file system switch that allows me to
# turn on debugging output without restarting any systems. This may
# not be too desirable in a hi-perf situation, but it's worked well
# so far for my development purposes.
#
#
# Refresh the debug/log flags. Each flag turns on when its touchfile
# exists on disk OR the caller explicitly requests it in $opts; the
# touchfiles let debugging be toggled at runtime without any restarts.
#
sub SetDBGSettings {
    my ($opts) = @_;
    $dm::log      = ( -f $dm::log_touchfile      || $opts->{log}      ) ? 1 : 0;
    $dm::debug    = ( -f $dm::debug_touchfile    || $opts->{debug}    ) ? 1 : 0;
    $dm::showdata = ( -f $dm::showdata_touchfile || $opts->{showdata} ) ? 1 : 0;
    $dm::profile  = ( -f $dm::profile_touchfile  || $opts->{profile}  ) ? 1 : 0;
    $dm::explain  = ( -f $dm::explain_touchfile  || $opts->{explain}  ) ? 1 : 0;
}
#
# Public accessor for a cached database handle. Traps any exception
# thrown by the connect path; callers simply see undef on failure.
#
sub GetDBH {
    my ($opts) = @_;
    my $dbh = eval { _GetDBH($opts) };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }
    return $dbh;
}
#
# Resolve connection settings (caller $opts over dm::dbDefault, cached in
# %dm::cfg), then return the per-dbhname cached DBI handle, connecting if
# one does not exist yet. Exceptions are caught by the GetDBH() wrapper.
#
sub _GetDBH {
my ($opts) = @_;
# Merge caller-supplied options over the package defaults. Note the ||=
# also writes the resolved value back into the caller's hash, and
# $opts->{loadme} forces the resolved dm::cfg value back out as well.
foreach my $o ( @initOptions ) {
$dm::cfg{$o} = ($opts->{$o} ||= $dm::dbDefault{$o});
$opts->{$o} = $dm::cfg{$o} if ( $opts->{loadme} );
logger::loginfo("dm::cfg{$o} = '$dm::cfg{$o}'") if ( $dm::debug );
}
my $options = ""; # NOTE(review): unused -- candidate for removal
my $dsn = "dbi:$dm::cfg{dbkind}:dbname=$dm::cfg{dbname};host=$dm::cfg{dbhost};port=$dm::cfg{dbport}";
# AutoCommit defaults OFF: callers are expected to commit explicitly.
$dm::cfg{dbopts} = {
ChopBlanks => defined($dm::cfg{ChopBlanks}) ? $dm::cfg{ChopBlanks} : 1,
AutoCommit => defined($dm::cfg{AutoCommit}) ? $dm::cfg{AutoCommit} : 0,
};
logger::loginfo("Retrieving DBH for '$dm::cfg{dbhname}' (default '$dm::dbDefault{dbhname}')...") if ( $dm::debug );
# %dm::dbhCache holds one connected handle per dbhname.
if ( ! defined($dm::dbhCache{$dm::cfg{dbhname}}) ) {
logger::loginfo("Connecting to $dsn as user $dm::cfg{dbuser}") if ( $dm::log );
my $t0 = toolbox::TimeNow();
logger::loginfo("DBI->connect($dsn, $dm::cfg{dbuser}, $dm::cfg{dbpasswd})") if ( $dm::log );
$dm::dbhCache{$dm::cfg{dbhname}} = DBI->connect($dsn, $dm::cfg{dbuser}, $dm::cfg{dbpasswd}, $dm::cfg{dbopts});
if ( ! $dm::dbhCache{$dm::cfg{dbhname}} ) {
# NOTE(review): join() here is handed the dbopts HASH REF, so the log
# shows a ref address rather than the option values -- confirm intent.
logger::logerr("DBI->connect($dsn, $dm::cfg{dbuser}, $dm::cfg{dbpasswd}, { ".join($dm::cfg{paramsep}, $dm::cfg{dbopts})." }) FAILED; DBI::errstr = [$DBI::errstr]");
} elsif ($dm::profile) {
logger::loginfo("Successfully connected to database $dsn");
logger::loginfo(toolbox::ElapsedTimeString("DBI connect: %.3f secs\n",$t0));
}
}
return $dm::dbhCache{$dm::cfg{dbhname}};
}
#
# Disconnect and drop the cached handle named in $opts->{dbhname},
# defaulting to the most recently configured handle name. A no-op when
# no such handle is cached.
#
sub CloseDB {
    my ($opts) = @_;
    my $name = $opts->{dbhname} || $dm::cfg{dbhname};
    return unless defined $dm::dbhCache{$name};
    logger::loginfo("Nuking db handle for $name ...") if ( $dm::log );
    $dm::dbhCache{$name}->disconnect;
    $dm::dbhCache{$name} = undef;
}
# Initialize the debug/log flags from the touchfiles at module-load time.
SetDBGSettings;
#
# Return the number of queries executed so far; when $reset is true,
# also zero the counter and discard the accumulated query profile.
#
sub QueryCount( $ ) {
    my ($reset) = @_;
    my $snapshot = $dm::query_count;
    if ( $reset ) {
        $dm::query_count = 0;
        %dm::query_profile = ();
    }
    return $snapshot;
}
#
# Record one execution of $sql taking $elapsed seconds in the profile,
# and return the new global query count.
#
sub IncQueryCount {
    my ($sql, $elapsed) = @_;
    my $prof = $dm::query_profile{$sql} ||= {};
    $prof->{'count'}++;
    $prof->{'et'} += $elapsed;
    return ++$dm::query_count;
}
#
# Dump per-query profiling statistics (collected by IncQueryCount) to
# STDERR, ordered by total elapsed time, then by invocation count.
#
sub DumpQueryProfile {
    #
    # Sort first by total time spent in the query, then by count...
    #
    my $sort_func = sub {
        ( $dm::query_profile{$a}{'et'} <=>
          $dm::query_profile{$b}{'et'} ) ||
        ( $dm::query_profile{$a}{'count'} <=>
          $dm::query_profile{$b}{'count'} )
    };
    foreach my $q ( sort $sort_func keys %dm::query_profile ) {
        print STDERR "##############################################\n";
        print STDERR "##############################################\n";
        print STDERR "##############################################\n";
        print STDERR "QUERY: [$q]\n";
        # BUGFIX: these lines used "printf STDERR sprintf(...)", which
        # formats twice -- any literal '%' surviving the inner sprintf
        # would be reinterpreted by printf. Use print on the formatted
        # string instead.
        print STDERR sprintf("TOT ET: %.3f secs\n",
                             $dm::query_profile{$q}{'et'});
        print STDERR "COUNT: [".$dm::query_profile{$q}{'count'}."]\n";
        # Guard against division by zero for entries with no count yet.
        my $avg = ($dm::query_profile{$q}{'count'} ?
                   ($dm::query_profile{$q}{'et'} /
                    $dm::query_profile{$q}{'count'}) : 0);
        print STDERR sprintf("AVG ET: %.3f secs\n", $avg);
        print STDERR "CACHE HITS: ".($dm::query_profile{$q}{'hits'}||0)."\n";
        print STDERR "CACHE SAVINGS: ".sprintf("%.3f secs\n", (($dm::query_profile{$q}{'hits'}||0)*($avg)));
    }
}
#
# Return the cached data, or undef if not present...
#
# FIXME: To use multiple DBs, we'll need to cache by dbhname here...
#
# Look up a cached result set for $key. Returns a two-element list:
# (1, $data) on a hit, (undef, undef) on any kind of miss. A miss also
# evicts entries that are expired, dependency-stale, or past their
# transaction-id cutoff.
#
sub QueryResultIsCached {
    my ($sql, $key, $opts) = @_;
    #
    # Dependency-tracked queries cannot be trusted while the trigger-based
    # strategy is disabled; count it as a miss.
    # BUGFIX: this path used to "return undef" (a one-element list) while
    # every other miss path returns (undef,undef); callers doing list
    # assignment now see a consistent two-element result.
    #
    if ( $opts->{deptables} && ! $dm::use_dependent_table_strategy ) {
        $dm::query_cache_misses++;
        return (undef,undef);
    }
    #
    # cache entry non-existent?
    #
    if ( ! exists($dm::query_cache{$key}{'data'}) ) {
        $dm::query_cache_misses++;
        return (undef,undef);
    }
    #
    # cache entry expired?
    #
    if ( exists($dm::query_cache{$key}{'expires'}) ) {
        if ( $dm::query_cache{$key}{'expires'} <= toolbox::NowAsScalar() ) {
            $dm::query_cache_misses++;
            DeleteFromCache($key);
            return (undef,undef);
        }
    }
    #
    # Dependent tables changed since this entry was cached?
    #
    if ( $dm::use_dependent_table_strategy &&
         exists($dm::query_cache{$key}{deptables}) ) {
        foreach my $deptable ( @{$dm::query_cache{$key}{deptables}} ) {
            if ( StaleTableData($deptable, $dm::query_cache{$key}{time}) ) {
                logger::loginfo("Dependent table $deptable is stale...") if ( $dm::log_cache_actions );
                $dm::query_cache_misses++;
                DeleteFromCache($key);
                return (undef,undef);
            }
        }
    }
    #
    # cache entry transaction completed/gone?
    #
    if ( exists($dm::query_cache{$key}{'cutoff_txid'}) && $opts->{cutoff_txid}){
        logger::loginfo("OK. Checking cache entry $key txid cutoff ...");
        if ( $dm::query_cache{$key}{'cutoff_txid'} < $opts->{cutoff_txid} ) {
            $dm::query_cache_misses++;
            DeleteFromCache($key);
            return (undef,undef);
        }
    }
    # Hit: tally statistics and hand back the cached rows.
    $dm::query_cache{$key}{'hits'}++;
    $dm::query_cache_hits++;
    $dm::query_time_saved += $dm::query_cache{$key}{'query_time'};
    logger::loginfo(sprintf("CACHE SAVINGS: (%d hits X %.3f secs/miss = %.3f secs saved, OVERALL: %.1f secs", $dm::query_cache{$key}{'hits'}, $dm::query_cache{$key}{'query_time'}, $dm::query_cache{$key}{'hits'} * $dm::query_cache{$key}{'query_time'}, $dm::query_time_saved)) if ( $dm::log_cache_savings );
    return (1, $dm::query_cache{$key}{'data'});
}
##############################################################################
#
# Return 1 when cached data for $table (cached at $cached_time) can no
# longer be trusted, 0 when it is still fresh. Refreshes our own status
# snapshot first when it has aged past the race tolerance window.
#
sub StaleTableData( $ $ ) {
    my ($table, $cached_time) = @_;
    # With no status info we cannot prove freshness -- treat as stale.
    unless ( exists($dm::table_status{$table}) ) {
        logger::loginfo("No table status info for table [$table]...unable to cache dependencies for query cache invalidation process, so no query caching on this one...");
        return 1;
    }
    my $now = toolbox::NowAsScalar();
    my $status_age = $now - $dm::table_status{$table}{last_checked};
    # The status snapshot itself may be too old to trust; reload it.
    if ( $status_age > $dm::tolerable_cache_race_window ) {
        logger::loginfo(sprintf("%s table status age (%.3f secs) violates race condition tolerance (%.3f secs) ... will reload latest status for %s table", $table, $status_age, $dm::tolerable_cache_race_window, $table));
        loadTableChanges();
    }
    # Stale if the table changed at or after the time we cached.
    if ( $dm::table_status{$table}{last_update} >= $cached_time ) {
        logger::loginfo("Cached data from $table is stale...");
        return 1;
    }
    return 0;
}
##############################################################################
# Stub: presumably intended to validate a cached table entry; currently
# a no-op that is never called from this file -- confirm before removal.
sub validateCachedTable {
}
##############################################################################
#
# Refresh %dm::table_status from the table_status DB table, optionally
# restricted to a single table. Used by StaleTableData() to decide
# whether cached query results have gone stale.
#
sub loadTableChanges {
my ($table) = @_;
logger::loginfo("Loading table_status changes...");
my $sql = qq{
SELECT id, date_part('epoch', last_update) AS "last_update", tablename
FROM table_status
WHERE last_update >= ?
};
# Only rows changed since our last recorded check are of interest.
# NOTE(review): $dm::last_table_status_check_time is never advanced in
# this sub -- confirm whether that is intentional.
my $params = [ $dm::last_table_status_check_time || 1 ];
if ( $table ) {
$sql .= qq{ AND tablename = ?};
push( @$params, $table );
logger::loginfo("Reloading table status changes for $table");
}
my $now = toolbox::NowAsScalar();
my $data = dm::SelectData($sql, $params);
# Record each changed table's latest update time and when we checked.
foreach ( @$data ) {
my $table = $_->{tablename};
$dm::table_status{$table}{last_update} = $_->{last_update};
$dm::table_status{$table}{id} = $_->{id};
$dm::table_status{$table}{last_checked} = $now;
}
logger::loginfo("Done loading table_status changes...");
}
#
# Make room for $size more bytes in the cache...
#
#
# Make room for $needed more bytes in the query cache by evicting the
# least-recently-cached entries: shed roughly 20% of the maximum cache
# size plus the requested room.
#
sub PruneCache ( $ ) {
    my ($needed) = @_;
    logger::loginfo("DB query cache is full ($dm::query_cache_size/$dm::max_cache_size)...pruning LRU queries...");
    my $reduction  = (0.2 * $dm::max_cache_size) + $needed;
    my $targetsize = $dm::query_cache_size - $reduction;
    # Oldest 'time' stamp first; entries missing a stamp compare equal.
    my $by_age = sub {
        return 0 unless exists($dm::query_cache{$a}{'time'})
                     && exists($dm::query_cache{$b}{'time'});
        $dm::query_cache{$a}{'time'} <=> $dm::query_cache{$b}{'time'}
    };
    foreach my $entry ( sort $by_age keys %dm::query_cache ) {
        DeleteFromCache($entry);
        # Stop as soon as we have freed enough room.
        last if $dm::query_cache_size <= $targetsize;
    }
}
# Remove one cache entry, shrinking the running size total first.
sub DeleteFromCache ( $ ) {
    my ($key) = @_;
    $dm::query_cache_size -= ($dm::query_cache{$key}{'size'} || 0);
    return delete $dm::query_cache{$key};
}
# return list of dependent tables
#
# Best-effort parse of the table names a simple SELECT depends on.
# Returns an aref of table names, or undef when the query is too complex
# (nested subqueries) or the FROM clause cannot be reduced to a clean,
# comma-separated list of identifiers.
#
sub getDependentTables {
    my ($sql,$opts) = @_;
    my $origsql = $sql;
    # BUGFIX: the old code called join(/ /, $sql), which passes a regex
    # match against $_ as the separator -- an accidental no-op for a
    # single scalar. Flattening newlines is all that was ever needed.
    $sql =~ s/\n/ /g;
    # bail if it has nested subqueries (too hard to parse)
    my $selectcount = 0;
    $selectcount++ while ( $sql =~ /SELECT/gi );
    if ( $selectcount > 1 ) {
        logger::loginfo("Not able to parse subqueries yet (found $selectcount here)...bailing on caching this one: [$origsql]");
        return undef;
    }
    # slice off the SELECT clause, then strip the trailing clauses...
    $sql =~ s/^\s*SELECT\s+\S+.*\s+FROM\s+(\S+.*$)/$1/g;
    $sql =~ s/(.*)WHERE\s+.*$/$1/g;
    $sql =~ s/(.*)GROUP BY\s+.*$/$1/g;
    $sql =~ s/(.*)HAVING\s+.*$/$1/g;
    $sql =~ s/(.*)ORDER BY\s+.*$/$1/g;
    $sql =~ s/(.*)LIMIT\s+.*$/$1/g;
    $sql =~ s/(.*)OFFSET\s+.*$/$1/g;
    $sql =~ s/;//g;
    # get rid of the table aliases ("mytable t" -> "mytable")
    while ( $sql =~ s/(\s*[A-Za-z0-9_]+)\s+[A-Za-z0-9_]+,/$1,/g ) {};
    while ( $sql =~ s/(\s*[A-Za-z0-9_]+)\s+[A-Za-z0-9_]+\s*$/$1/g ) {};
    # punt if we don't have something that looks like a clean list...
    if ( $sql !~ /^\s*([A-Za-z0-9_]+)(\s*,\s+[A-Za-z0-9_]+)*\s*$/ ) {
        logger::loginfo("Unable to parse dependent tables from query: $origsql ... parsed result looks like this: [$sql]");
        return undef;
    }
    # Split on commas and trim whitespace around each table name.
    my @deptables = split(/,/, $sql);
    foreach ( @deptables ) {
        s/^\s+//g;
        s/\s+$//g;
    }
    return \@deptables;
}
#
# Store query results in the in-process cache under $sqlkey. Estimates
# the entry's footprint, evicts LRU entries if the budget would overflow,
# and records optional expiry / txid-cutoff / table-dependency metadata.
# Returns 1 on success, undef when the entry cannot (or should not) be
# cached.
#
sub CacheResults {
    my ($sql, $data, $sqlkey, $opts) = @_;
    # Nothing to do without both the SQL text and a cache key.
    if ( ! $sql || ! $sqlkey ) {
        return undef;
    }
    # Dependency tracking needs the (dropped) DB trigger support; refuse
    # to cache dependency-tracked queries while the strategy is disabled.
    if ( $opts->{deptables} && ! $dm::use_dependent_table_strategy ) {
        return undef;
    }
    my $key = getCacheKey($sqlkey);
    my $t0 = toolbox::NowAsScalar();
    my $size = 0;
    logger::loginfo("Sizing ".scalar(@$data)." rows of data...") if ( $dm::log_cache_actions );
    #
    # Estimate the entry's footprint from the stringified row data.
    #
    if ( $opts->{want_array_refs} ) {
        foreach my $aref ( @$data ) {
            foreach my $cref ( @$aref ) {
                # BUGFIX: the old code added length($cref) a second time
                # unconditionally, double-counting every cell and warning
                # on NULL (undef) columns.
                $size += (defined($cref) ? length($cref) : 0);
            }
        }
    } else {
        foreach my $href ( @$data ) {
            while ( my($k,$v) = each %$href ) {
                $size += (defined($v) ? length($v) : 0);
                $size += length($k);
            }
        }
    }
    $size += length($key);
    # Evict LRU entries when this addition would overflow the budget.
    if ( $size + $dm::query_cache_size > $dm::max_cache_size ) {
        logger::loginfo("Cache too big...pruning $size bytes now ...");
        PruneCache($size);
    }
    $dm::query_cache_size += $dm::query_cache{$key}{'size'} = $size;
    $dm::query_cache{$key}{'data'} = $data;
    $dm::query_cache{$key}{'query_time'} = $opts->{query_time};
    $dm::query_cache{$key}{'time'} = toolbox::NowAsScalar();
    if ( $dm::use_dependent_table_strategy &&
         $opts->{deptables} && ref($opts->{deptables}) eq 'ARRAY' ) {
        #
        # Dependencies explicitly specified, so use them...
        #
        $dm::query_cache{$key}{deptables} = [ @{$opts->{deptables}} ];
        logger::loginfo("Caching ".scalar(@{$dm::query_cache{$key}{deptables}})." deptables ... ") if ( $dm::log_cache_actions );
    } elsif ( $dm::use_dependent_table_strategy &&
              $dm::derive_unstated_table_dependencies ) {
        #
        # Attempt to derive the set of tables this query depends on from
        # the SQL itself. Works only for simple queries; bail out of
        # caching entirely when the parse fails.
        #
        $dm::query_cache{$key}{deptables} = getDependentTables($sql);
        if ( ! $dm::query_cache{$key}{deptables} ) {
            logger::loginfo("getDependentTables() failed for query: $sql") if ( $dm::log_cache_actions );
            delete $dm::query_cache{$key};
            return undef;
        }
        logger::loginfo("Dependent tables: ",@{$dm::query_cache{$key}{deptables}}) if ( $dm::log_cache_actions );
    }
    # Optional absolute expiry and transaction-id cutoff metadata.
    if ( $opts->{cache_lifespan} ) {
        $dm::query_cache{$key}{'expires'} =
            $dm::query_cache{$key}{'time'} + $opts->{'cache_lifespan'};
    }
    if ( $opts->{cutoff_txid} ) {
        $dm::query_cache{$key}{'cutoff_txid'} = $opts->{'cutoff_txid'};
    }
    # Squash whitespace and truncate the SQL -- for log readability only.
    $sqlkey =~ s/\n/ /g;
    $sqlkey =~ s/(\S)\s+(\S)/$1 $2/g;
    $sqlkey =~ s/^\s+(\S)/$1/g;
    $sqlkey =~ s/(\S)\s+$/$1/g;
    $sqlkey = substr($sqlkey, 0, 25);
    logger::loginfo("Caching $dm::query_cache{$key}{'size'}B ".($dm::query_cache{$key}{'expires'} ? "until $dm::query_cache{$key}{'expires'}" : "")." ".($dm::query_cache{$key}{'cutoff_txid'} ? "for txid $dm::query_cache{$key}{'cutoff_txid'}" : "")." under key $key for $sqlkey (cache size: $dm::query_cache_size bytes...)\n") if ( $dm::log_cache_actions );
    my $et = toolbox::NowAsScalar() - $t0;
    logger::loginfo("dm::CacheResults() took ".sprintf("%.3f secs", ($et))." to cache [$sqlkey]") if ( $dm::log_cache_actions );
    $dm::query_cache{$key}{'caching_time'} = $et;
    # Caching itself costs time; net it out of the running savings figure.
    $dm::query_time_saved -= $et;
    return 1;
}
#
# Return 1 if parameter mismatch, 0 otherwise...
#
#
# Return 1 when the number of '?' placeholders in $sql does not match
# the number of supplied bind params (or the params are not an aref);
# return 0 when everything lines up.
#
sub ParameterMismatch( $ $ ) {
    my ($sql, $params) = @_;
    if ( defined($params) && ref($params) ne 'ARRAY' ) {
        logger::logerr("Bind params were not passed as aref");
        return 1;
    }
    # tr/// in scalar context counts the placeholders without changing $sql.
    my $pcount = ($sql =~ tr/?/?/);
    my $bcount = defined($params) ? scalar(@$params) : 0;
    return 0 if $pcount == $bcount;
    logger::logerr("Number of parameter placeholders ($pcount) does not match the number of input bind params ($bcount) for sql: [$sql], params: [".((defined($params) && (ref($params) eq 'ARRAY')) ? join($dm::cfg{paramsep},@$params) : "")."]");
    return 1;
}
#
# Select a single scalar value from a single record in the DB. If the
# query returns anything else, it is an error and the result is undef.
#
#
# Run a query expected to yield exactly one row with exactly one column,
# and return that single value. Any other shape logs an error and yields
# undef.
#
sub SelectDataValue {
    my ($sql, $params, $opts) = @_;
    my $data = SelectData($sql, $params, $opts);
    unless ( defined($data) ) {
        logger::logerr("Query passed to SelectDataValue() returned undef. SQL: [$sql], PARAMS: [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }
    unless ( ref($data) eq 'ARRAY' ) {
        logger::logerr("Query passed to SelectDataValue() returned non-ARRAY ref. SQL: [$sql], PARAMS: [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }
    my $count = scalar(@$data);
    unless ( $count == 1 ) {
        logger::logerr("Query passed to SelectDataValue() returned $count records when 1 was expected. SQL: [$sql], PARAMS: [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }
    my @cols = keys %{ $data->[0] };
    $count = scalar(@cols);
    unless ( $count == 1 ) {
        logger::logerr("Query passed to SelectDataValue() returned $count columns when 1 was expected. SQL: [$sql], PARAMS: [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }
    return $data->[0]->{ $cols[0] };
}
#
# Public SELECT entry point: resets the per-query retry counters and
# shields callers from any die inside the worker; returns the row aref
# from _SelectData(), or undef on failure.
#
sub SelectData {
    $dm::reconnect_tries = 0;
    $dm::vacuum_tries = 0;
    my $rows = eval { _SelectData(@_) };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }
    return $rows;
}
#
# Attempt to re-establish the cached DB handle after a connection
# failure. Bounded by $dm::reconnect_limit attempts (tracked in the
# global $dm::reconnect_tries, reset by SelectData()/NonSelectQuery()).
# Returns the new handle, or undef when the attempt (or budget) fails.
#
sub Reconnect {
my ($opts) = @_;
my $dbhname = $opts->{dbhname} || $dm::cfg{dbhname};
if ( $dm::reconnect_tries++ < $dm::reconnect_limit ) {
logger::loginfo("Trying to reconnect to DB (attempt #$dm::reconnect_tries)...");
# Drop the stale handle first; ignore any error from the close itself.
eval { CloseDB($opts) };
$dm::dbhCache{$dbhname} = GetDBH($opts);
if ( ! $dm::dbhCache{$dbhname} ) {
logger::logerr("GetDBH() FAILED...");
} else {
logger::loginfo("Hey, looks like reconnect worked...cleaning up by force just in case...");
# Cleanup() is defined elsewhere in this file -- presumably forces a
# vacuum/cleanup pass on the fresh connection; confirm its contract.
dm::Cleanup(undef,1);
}
return $dm::dbhCache{$dbhname};
}
logger::logerr("Giving up on attempted DB reconnection after $dm::reconnect_tries tries...");
return undef;
}
# Identity mapping for now; kept as a hook for hashing long keys later.
sub getCacheKey ( $ ) {
    my ($longkey) = @_;
    return $longkey;
}
#
# Decide whether this query's results may be cached: caching must be
# globally enabled AND either requested via $opts->{cache} or forced by
# $dm::cache_everything. Returns 1 or 0.
#
sub CachingIsAppropriate( $ ) {
    my ($opts) = @_;
    return 0 unless $dm::query_cache_enabled;
    return 1 if $dm::cache_everything;
    return ( $opts && $opts->{cache} ) ? 1 : 0;
}
#
# Internal SELECT worker: cache lookup, prepare/bind/execute, on-the-fly
# error recovery (reconnect, vacuum), optional EXPLAIN, debug dumps, and
# result caching. Returns an aref of hashref rows -- or, when
# $opts->{want_array_refs} is set, an aref of arefs with the column
# names prepended as the first row. Returns undef on failure. Callers
# go through SelectData(), which catches dies.
#
sub _SelectData {
    my ($sql,$params,$opts) = @_;
    if ( ! defined($sql) ) {
        logger::logerr("Undefined SQL query string; ABORTING QUERY.");
        return undef;
    }
    if ( ref($sql) ) {
        logger::logerr("Non-scalar SQL query string [$sql]; ABORTING QUERY.");
        return undef;
    }
    my $t0 = toolbox::TimeNow();
    SetDBGSettings($opts);
    my @data = ();
    my $pstr = (defined($params) && ref($params) eq 'ARRAY' ? join($dm::cfg{paramsep}, @$params) : "");
    if ( ParameterMismatch($sql,$params) ) {
        logger::logerr("Parameter mismatch for sql: [$sql] and bind params [$pstr]");
        return undef;
    }
    #
    # See if we have this one sitting in the cache...
    #
    my $key = $sql . $pstr;
    my ($cache_hit,$cached_data);
    if ( CachingIsAppropriate( $opts ) ) {
        # Whitespace-insensitive key so formatting changes still hit.
        $key =~ s/\n//g;
        $key =~ s/\s//g;
        my $cachekey = getCacheKey($key);
        ($cache_hit,$cached_data) = QueryResultIsCached($sql, $cachekey, $opts);
        if ( $cache_hit ) {
            $dm::query_profile{$sql}{'hits'}++;
            return $cached_data;
        }
    }
    logger::loginfo("SQL: [" . $sql . "], INPUT PARAMS: [" . $pstr . "]") if ( $dm::debug );
    my $dbh = GetDBH($opts);
    if (! $dbh) {
        # BUGFIX: the old log string used $$dm::cfg{...} (dereferencing
        # the wrong variable) and reported dbhname as the database name.
        logger::logerr("DBI::connect() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "] and params ['dbi:$dm::cfg{dbkind}:dbname=$dm::cfg{dbname}', '$dm::cfg{dbuser}', '<passwd>']\n");
        return undef;
    }
    # BUGFIX: the explain touchfile sets $dm::explain (SetDBGSettings),
    # but this test used the never-assigned $dm::explain_dm, so EXPLAIN
    # output could never actually be enabled.
    if ( $dm::explain ) {
        logger::loginfo("EXPLAIN ON");
        eval {
            $dbh->do("EXPLAIN VERBOSE $sql");
        };
    }
    my $sth = $dbh->prepare( $sql );
    if ( ! $sth ) {
        logger::logerr("DBI::prepare() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]\n");
        return undef;
    }
    # Bind each positional parameter (1-based per the DBI API).
    if ( defined($params) ) {
        my $reftype = ref $params;
        if ( $reftype eq 'ARRAY' ) {
            for ( my $i = 1; $i <= scalar(@$params); $i++ ) {
                my $bind_value = $params->[$i-1];
                logger::loginfo("Binding param #$i [$bind_value]...") if ( $dm::debug );
                if ( ! defined($sth->bind_param($i, $bind_value) ) ) {
                    logger::logerr("DBI::bind_param($i, $bind_value) FAILED for SQL [$sql], params [$pstr], with DBI::err [" . $sth->err . "], DBI::state [" . $sth->state . "], DBI::errstr [".$sth->errstr."]\n");
                    return undef;
                } else {
                    logger::loginfo("Successful bind of param #$i...") if ( $dm::debug );
                }
            }
        } else {
            logger::logerr("BAD REF TYPE $reftype\n");
            return undef;
        }
    }
    if ( ! $sth->execute() ) {
        my $err = $sth->err();
        my $errstr = $sth->errstr();
        my $state = $sth->state();
        logger::logerr("DBI::execute() FAILED in dm::_SelectData() with DBI::err [$err], DBI::state [$state], DBI::errstr [$errstr] for SQL [$sql], params [$pstr]\n");
        addDBErrStr($errstr);
        # BUGFIX: the retry paths used to drop $opts, silently disabling
        # caching/want_array_refs options on the retried query; pass it
        # through (to Reconnect() as well, so the right dbhname is used).
        if ( $err == -1 && dm::Reconnect($opts) ) {
            logger::loginfo("Reconnect successful...trying query again ...");
            return dm::_SelectData($sql,$params,$opts);
        } elsif ( $err == 7 && ($errstr =~ /$dm::PGIDX_SCAN_ERR/) ) {
            #
            # For this particular error, mailing list discussions
            # and experience suggest that vacuuming will fix it...
            #
            logger::logerr("THAT PESKY ExecInitIndexScan BUG HAS REARED ITS HEAD AGAIN...TRYING TO VACUUM...");
            if ( dm::Vacuum($dbh) ) {
                logger::loginfo("Vacuuming successful...retrying query on the fly...");
                return dm::_SelectData($sql,$params,$opts);
            } else {
                logger::logerr("No luck, vacuum failed.");
            }
        }
        logger::loginfo("DBI::execute() AND all on-the-fly recovery attempts FAILED for SQL [$sql], params [$pstr]\n");
        return undef;
    }
    if ( $opts->{want_array_refs} ) {
        while ( my $aref = $sth->fetchrow_arrayref) {
            push( @data, [ @$aref ] );
        }
        # push column names on as the first row...
        unshift( @data, [ @{$sth->{NAME}} ] );
    } else {
        while ( my $href = $sth->fetchrow_hashref) {
            push( @data, { %$href } );
        }
    }
    my $et = toolbox::ElapsedTime($t0);
    if ( ! $sth->finish() ) {
        @data = ();
        logger::logerr("DBI::finish() FAILED for SQL [$sql], params [$pstr], with DBI::err [" . $sth->err . "], DBI::state [" . $sth->state . "], DBI::errstr [" . $sth->errstr . "]\n");
        setDBErrStr($sth->errstr);
        return undef;
    }
    _ShowSelectResults($sql, $pstr, \@data, $opts) if ( $dm::debug == 1 && $dm::showdata );
    if ( CachingIsAppropriate( $opts ) &&
         ( ! $cache_hit || $opts->{'refresh_cache'} ) ) {
        $opts->{query_time} = $et;
        if ( ! CacheResults ( $sql, \@data, $key, $opts ) ) {
            logger::loginfo("Unable to cache query results for $sql") if ( $dm::log_cache_actions );
        }
    }
    if ( $dm::profile ) {
        print STDERR sprintf("DBI query: %.3f secs for SQL [%s]\n", $et, $sql);
    }
    IncQueryCount($sql,$et);
    return \@data;
}
#
# Helper: dump fetched rows to STDERR for debugging (enabled via the
# $dm::showdata touchfile plus $dm::debug).
# BUGFIX: the old inline code printed via printf with interpolated SQL
# and values in the FORMAT string, so any literal '%' in the data would
# corrupt the output (or worse); use plain print instead.
#
sub _ShowSelectResults {
    my ($sql, $pstr, $rows, $opts) = @_;
    print STDERR "#############################################\n";
    print STDERR "RESULTS RETURNED BY SQL: [" . $sql . "], INPUT PARAMS: [" . $pstr . "]:\n";
    my $i = 0;
    if ( $opts->{want_array_refs} ) {
        foreach my $aref ( @$rows ) {
            print STDERR "Row[".$i++."]";
            foreach my $cref ( @$aref ) {
                print STDERR ", $cref";
            }
            print STDERR "\n";
        }
    } else {
        foreach my $href ( @$rows ) {
            print STDERR "Row[".$i++."]";
            while ( my($k,$v) = each %$href ) {
                my $key = (defined($k) ? $k : "undef");
                my $val = (defined($v) ? $v : "undef");
                print STDERR ", '$key' = [$val]";
            }
            print STDERR "\n";
        }
    }
    print STDERR "######## END OF RESULTS ####################\n";
}
#
# Convenience aliases: inserts, updates, and deletes all flow through
# the same prepare/bind/execute path in NonSelectQuery(). Arguments
# and return value are therefore identical to NonSelectQuery's:
# ($sql, $params, $opts) in, affected-row count (or undef) out.
#
sub InsertData { return NonSelectQuery(@_); }
sub ModifyData { return NonSelectQuery(@_); }
sub DeleteData { return NonSelectQuery(@_); }
#
# NonSelectQuery($sql, $params, $opts)
#
# Public entry point for non-select SQL. Refuses to run while the
# system-down flag file exists, resets the per-query reconnect
# counter, and traps any die() raised by the worker so callers only
# ever see a return value (undef on failure).
#
sub NonSelectQuery {
    return undef if systemDown();
    $dm::reconnect_tries = 0;
    my $result = eval { _NonSelectQuery(@_) };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }
    return $result;
}
#
# _NonSelectQuery($sql, $params, $opts)
#
# Worker behind InsertData/ModifyData/DeleteData/NonSelectQuery:
# validates arguments, prepares, binds, and executes a non-select
# statement, and records timing/row-count information.
#
#   $sql    - SQL text (plain scalar) with '?' placeholders
#   $params - optional ARRAY ref of bind values
#   $opts   - optional HASH ref of options:
#               AUTOCOMMIT => 0|1  reset the driver's AutoCommit flag
#               SERIAL     => 1    return the new PostgreSQL serial id
#               TABLE      => name table to look the serial id up in
#                                  (required when SERIAL is set)
#
# Returns the number of rows affected (0 when DBI reports '0E0'),
# the serial id when SERIAL was requested, or undef on any error.
#
sub _NonSelectQuery {
    my $t0 = toolbox::TimeNow();
    my ($sql,$params,$opts) = @_;
    SetDBGSettings($opts);
    if ( ! defined($sql) || ref($sql) ) {
        logger::logerr("Undefined or non-scalar SQL string passed in");
        Carp::cluck("Undefined or non-scalar SQL string passed in\n");
        return undef;
    }
    if ( defined($params) && (ref($params) ne 'ARRAY') ) {
        logger::logerr("Non-ARRAY ref passed as parameter list for SQL [$sql]");
        return undef;
    }
    if ( ParameterMismatch($sql,$params) ) {
        # BUGFIX: guard the deref -- $params may be undef when the
        # placeholder count and parameter count disagree.
        my @bind = (defined($params) ? @$params : ());
        logger::logerr("Parameter mismatch for sql: [$sql] and bind params [".join($dm::cfg{paramsep},@bind)."]");
        return undef;
    }
    logger::logquery($sql,$params);
    my $pstr = (defined($params) ? join(' | ', @$params) : '');
    logger::loginfo("SQL: [" . $sql . "], INPUT PARAMS: [" . $pstr . "]") if ( $dm::debug );
    my $dbh = GetDBH($opts);
    if (! $dbh) {
        logger::logerr("dm::GetDBH() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "] and params ['dbi:$dm::cfg{dbkind}:dbname=$dm::cfg{dbname}', '$dm::cfg{dbuser}', '<passwd>']\n");
        return undef;
    }
    if ( exists($opts->{'AUTOCOMMIT'}) ) {
        #
        # Caller wants to reset AUTOCOMMIT in driver...
        #
        # BUGFIX: test with exists() as SetOpts() does -- the old
        # truth test silently ignored an explicit AUTOCOMMIT => 0,
        # making it impossible to turn AutoCommit off here.
        #
        if ( $opts->{'AUTOCOMMIT'} == 0 ||
             $opts->{'AUTOCOMMIT'} == 1 ) {
            $dbh->{'AutoCommit'} = $opts->{'AUTOCOMMIT'};
        } else {
            logger::logerr("Illegal 'AUTOCOMMIT' value: [".
                           $opts->{'AUTOCOMMIT'} .
                           "] ... aborting query [$sql], params [$pstr]");
            return undef;
        }
    }
    my $sth = $dbh->prepare( $sql );
    if ( ! $sth ) {
        logger::logerr("DBI::prepare() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]\n");
        return undef;
    }
    if ( defined($params) ) {
        my $reftype = ref $params;
        if ( $reftype eq 'ARRAY' ) {
            # Bind positionally; DBI placeholder indices are 1-based.
            for ( my $i = 1; $i <= scalar(@$params); $i++ ) {
                logger::loginfo("Binding param #$i...") if ( $dm::debug );
                my $bind_value = $params->[$i-1];
                if ( ! defined($sth->bind_param($i, $bind_value) ) ) {
                    logger::logerr("DBI::bind_param($i, $bind_value) FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [".$DBI::errstr."]\n");
                    return undef;
                } else {
                    logger::loginfo("Successful bind of param #$i...") if ( $dm::debug );
                }
            }
        } else {
            logger::logerr("BAD REF TYPE $reftype\n");
            return undef;
        }
    }
    my $rc;
    $dm::pending_queries++;
    if ( ! defined($rc = $sth->execute()) ) {
        logger::logerr("DBI::execute() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]\n");
        addDBErrStr($DBI::errstr);
        return undef;
    }
    if ( $opts->{'SERIAL'} && $dm::dbtype == dm::PGSQL ) {
        #
        # Retrieve postgresql serial ID in the customary PG manner:
        # look the row up again by the OID DBD::Pg reports for the
        # last insert.
        #
        if ( $opts->{'TABLE'} ) {
            my $ser_sql = qq{
                SELECT id FROM $opts->{'TABLE'} WHERE oid = ?;
            };
            $rc = dm::SelectDataValue($ser_sql, [ $sth->{pg_oid_status} ]);
        } else {
            # SERIAL requested without TABLE: no way to look it up.
            $rc = undef;
        }
    }
    if ( ! $sth->finish() ) {
        logger::logerr("DBI::finish() FAILED for SQL [$sql], params [$pstr], with DBI::err [" . $sth->err . "], DBI::state [" . $sth->state . "], DBI::errstr [" . $DBI::errstr . "]\n");
        return undef;
    }
    my $et = toolbox::ElapsedTime($t0);
    if ( $dm::profile ) {
        print STDERR sprintf("DBI query: %.3f secs for SQL [%s]\n", $et, $sql);
    }
    IncQueryCount($sql,$et);
    # BUGFIX: '0E0' is DBI's "zero rows, but true" marker and is a
    # string -- compare with eq. The old numeric == happened to work
    # for '0E0' but warned whenever $rc was a non-numeric serial id.
    my $retval = (defined($rc) && ($rc eq '0E0') ? 0 : $rc);
    if ( ! ref($retval) ) {
        if ( $opts->{'SERIAL'} && $dm::dbtype == dm::PGSQL ) {
            logger::loginfo("DB returned serial ID [$retval]") if $dm::debug;
        } else {
            logger::loginfo("DB effected $retval record(s)");
        }
    }
    return ($retval);
}
#
# SetOpts($opts)
#
# Apply per-handle driver options. Only AUTOCOMMIT is recognized at
# present, and it must be exactly 0 or 1. Returns 1 on success,
# undef when the handle is unavailable or the value is illegal.
#
sub SetOpts {
    my ($opts) = @_;
    my $dbh = GetDBH($opts);
    unless ( $dbh ) {
        logger::logerr("dm::GetDBH() FAILED to get db handle ... are we connected? DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    }
    if ( exists($opts->{'AUTOCOMMIT'}) ) {
        #
        # Caller wants to reset AUTOCOMMIT in driver...
        #
        my $ac = $opts->{'AUTOCOMMIT'};
        if ( $ac == 0 || $ac == 1 ) {
            $dbh->{'AutoCommit'} = $ac;
            logger::loginfo("Setting AutoCommit to ". $ac) if ( $dm::debug );
        } else {
            logger::logerr("Illegal 'AUTOCOMMIT' value: [". $ac ."] ... attempt to set options failed");
            return undef;
        }
    }
    return 1;
}
#
# Cleanup($txid, $force)
#
# Periodic maintenance entry point, intended to vacuum the database
# once every $dm::cleanup_interval transactions (transaction ids are
# synchronized across all child apache processes), or unconditionally
# when $force is true.
#
# NOTE: currently DISABLED on purpose -- the unconditional bare
# "return" immediately below makes everything after it unreachable,
# because vacuuming is not safe while inserts/deletes are occurring.
# The remaining code is retained for when it can be re-enabled.
#
sub Cleanup {
    my ($txid,$force) = @_;
    return; # vacuuming is not safe while inserts/deletes are occurring...
    #
    # Only cleanup after $dm::cleanup_interval transactions...
    #
    # Tx IDs are sync'ed across all child apache processes...
    #
    # (Unreachable debug trace -- see the disable note above.)
    print STDERR "txid = $txid, cleanup_interval = $dm::cleanup_interval\n";
    if ( ! $force && (! defined($txid) || ($txid % $dm::cleanup_interval != 0)) ) {
        return;
    }
    my $data;
    #
    # Obvious race condition here, but unlikely we'll hit it as it is
    # a small race window...
    #
    eval { $data = _Cleanup(@_); };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }
    return $data;
}
#
# Vacuum($dbh)
#
# Run "VACUUM ANALYZE" against the database on the given DBI handle.
# Gives up once $dm::vacuum_tries exceeds $dm::vacuum_limit for this
# process, and refuses to run while the system-down flag file exists.
#
# Returns 1 on apparent success, undef otherwise.
#
# NOTE: the former "($)" prototype was removed. Perl prototypes do
# not validate arguments -- they only alter compile-time parsing --
# and every call site in this file uses a single scalar argument, so
# the prototype had no effect.
#
sub Vacuum {
    if ( systemDown() ) {
        return undef;
    }
    my ($dbh) = @_;
    if ( $dm::vacuum_tries++ >= $dm::vacuum_limit ) {
        logger::logwarn("Giving up on attempted DB vacuum after $dm::vacuum_tries tries...");
        return undef;
    }
    my $rv;
    logger::loginfo("Attempting VACUUM...");
    # eval guards against a die() from the driver (e.g. a dead handle).
    eval { $rv = $dbh->do("VACUUM ANALYZE") };
    if ( $@ ) {
        logger::logerr($@);
        logger::logerr("DBI::do('VACUUM') FAILED (eval block failed) with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    } elsif ( ! defined($rv) ) {
        logger::logerr("DBI::do('VACUUM') FAILED (DBI returned undef) with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    } else {
        logger::loginfo("Vacuum appears possibly successful with DBI::do() return value of [$rv] ...");
    }
    return 1;
}
#
# Worker for Cleanup(): fetch the db handle and attempt a vacuum.
# A failed vacuum is logged only as informational, since another
# process may simply hold the vacuum lock.
#
sub _Cleanup {
    my ($opts) = @_;
    my $dbh = GetDBH($opts);
    unless ( $dbh ) {
        logger::logerr("FAILED to get db handle ... are we connected? DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    }
    unless ( dm::Vacuum($dbh) ) {
        logger::loginfo("Attempt to vacuum database was unsuccessful (that's ok if db was locked for vacuuming by another process)...");
    }
}
#
# Commit($opts)
#
# Public commit wrapper. Skips the commit entirely when the db is
# flagged down, short-circuits with success when nothing is pending
# (unless $opts->{force} is set), and converts a die() from the
# worker into an undef return.
#
sub Commit {
    my ($opts) = @_;
    if ( systemDown() ) {
        logger::loginfo("DB system down (".getDownFileName()." exists) ... aborting commit");
        return undef;
    }
    unless ( $dm::pending_queries || $opts->{force} ) {
        # logger::loginfo("No pending queries; commit ignored");
        return 1;
    }
    my $rv = eval { _Commit($opts) };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }
    return $rv;
}
#
# Worker for Commit(): commit the current transaction on the named
# handle ($opts->{dbhname}, defaulting to the configured handle) and
# clear the pending-query counter on success. Returns DBI's commit()
# result.
#
sub _Commit {
    my ($opts) = @_;
    my $dbhname = $opts->{dbhname} || $dm::cfg{dbhname};
    my $dbh = GetDBH($opts);
    unless ( $dbh ) {
        logger::logerr("FAILED to get db handle for $dbhname ... are we connected? DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    }
    logger::logquery("COMMIT -- ($dm::pending_queries pending queries for $dbhname)");
    my $rc = $dbh->commit();
    if ( $rc ) {
        $dm::pending_queries = 0;
    } else {
        logger::logerr($dbh->errstr);
    }
    return $rc;
}
#
# Rollback($opts)
#
# Public rollback wrapper. Skips work when the db is flagged down;
# otherwise rolls back whenever queries are pending or the caller
# forces it (some failed queries still need a rollback even when the
# pending counter is zero). A die() from the worker becomes undef.
#
sub Rollback {
    my ($opts) = @_;
    if ( systemDown() ) {
        logger::loginfo("DB system down (".getDownFileName()." exists) ... aborting rollback");
        return undef;
    }
    unless ( $opts->{force} || $dm::pending_queries ) {
        logger::loginfo("No pending queries; rollback ignored");
        return 1;
    }
    my $rv = eval { _Rollback($opts) };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }
    return $rv;
}
#
# Worker for Rollback(): roll back the current transaction on the
# named handle ($opts->{dbhname}, defaulting to the configured
# handle) and clear the pending-query counter on success. Returns
# DBI's rollback() result.
#
sub _Rollback {
    my ($opts) = @_;
    my $dbhname = $opts->{dbhname} || $dm::cfg{dbhname};
    my $dbh = GetDBH($opts);
    unless ( $dbh ) {
        logger::logerr("FAILED to get db handle for $dbhname ... are we connected? DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    }
    logger::loginfo("Rolling back $dm::pending_queries pending queries for $dbhname");
    logger::logquery("ROLLBACK -- $dbhname");
    my $rc = $dbh->rollback();
    if ( $rc ) {
        $dm::pending_queries = 0;
    } else {
        logger::logerr($dbh->errstr);
    }
    return $rc;
}
#
# On interpreter shutdown, close every cached database handle so
# connections are not leaked.
#
# IDIOM FIX: dropped the "&" call syntax -- with parentheses it was
# behaviorally identical to a plain call, and &-calls are an
# anti-pattern (they bypass prototypes and invite the no-parens
# @_-reuse trap).
#
END {
    for my $dbhname ( keys %dm::dbhCache ) {
        dm::CloseDB( { dbhname => $dbhname } );
    }
}
1;