#
# $Id: dm.pm,v 1.1.1.1 2004/01/09 09:22:07 jeffo Exp $
#
# $Source: /usr/local/cvsroot/Testmanager/cgi-bin/dm.pm,v $
#
# Routines for manipulating (i.e., querying, updating) a data model.
#
#############################################################################
#                                                                           #
#                             COPYRIGHT NOTICE                              #
#                                                                           #
# This code was written by Ed Loehr, all rights reserved, 1999. You are     #
# hereby granted a non-exclusive license to do whatever you want with       #
# this code, including modification and redistribution, so long as you     #
# retain this copyright notice in the code and do not attempt to prevent   #
# anyone else from the same privileges.                                     #
#                                                                           #
# ALL SOFTWARE HAS BUGS. THIS CODE COMES "AS-IS" WITH ABSOLUTELY NO         #
# WARRANTY OR GUARANTEE OF ITS FITNESS FOR ANY PURPOSE WHATSOEVER.          #
# BY INCORPORATING THIS CODE, YOU ASSUME COMPLETE RISK FOR ANY              #
# CONSEQUENCES AND AGREE NOT TO BRING ANY GRIEVANCE OR LIABILITY            #
# CLAIMS WHATSOEVER AGAINST Ed Loehr.                                       #
#                                                                           #
#############################################################################

package dm;

use DBI;
use Carp;       # Carp::cluck is used in _NonSelectQuery()
use logger;
use toolbox;

@dm::initOptions = qw( dbkind dbhost dbname dbport dbuser dbpasswd dbhname );

BEGIN {
    $dm::default_dbhname = 'tmdbh';
    $dm::dbDefault{paramsep} = ' | ';
    $dm::dbDefault{dbkind} = "Pg";
    $dm::dbDefault{dbhost} = $ENV{PGHOST} || "localhost";
    $dm::dbDefault{dbname} = $ENV{PGDATABASE} || `whoami`;
    chomp $dm::dbDefault{dbname};
    $dm::dbDefault{dbport} = $ENV{PGPORT} || 5432;
    $dm::dbDefault{dbuser} = $ENV{PGUSER} || `whoami`;
    chomp $dm::dbDefault{dbuser};
    $dm::dbDefault{dbpasswd} = undef;
    $dm::dbDefault{dbhname} = $dm::default_dbhname;

    # paramsep is not among @dm::initOptions, so _GetDBH() would never
    # copy it into %dm::cfg; seed it here so the join() calls below have
    # a defined separator.
    $dm::cfg{paramsep} = $dm::dbDefault{paramsep};
}

$dm::app_abbreviation = 'tm';
$dm::run_dir = "/tmp/".$dm::app_abbreviation."/run";
$dm::down_file = "$dm::run_dir/dbdown_$dm::app_abbreviation";
$dm::touchfile_dir = $dm::run_dir;
$dm::log_touchfile = "$dm::touchfile_dir/log_$dm::app_abbreviation";
$dm::debug_touchfile = "$dm::touchfile_dir/debug_$dm::app_abbreviation";
$dm::profile_touchfile = "$dm::touchfile_dir/profile_$dm::app_abbreviation";
$dm::explain_touchfile = "$dm::touchfile_dir/explain_$dm::app_abbreviation";
$dm::showdata_touchfile = "$dm::touchfile_dir/showdata_$dm::app_abbreviation";

$dm::cleanup_interval = 100; # cleanup (vacuum) after this many transactions...
$dm::query_count = 0;
$dm::reconnect_tries = 0;
$dm::reconnect_limit = 2;
$dm::vacuum_tries = 0;
$dm::vacuum_limit = 1;

#############################################################################
## Query Cache NOTES...             ############################################
#############################################################################
# This is a poor-man's query cache.
#############################################################################
## Query Cache Settings...          ############################################
#############################################################################
# FIXME: Turning the query cache off for now because the race condition
# FIXME: is too nasty in the problems it leads to. I think we really must
# FIXME: push any query caching of dynamic data down into the DB.
$dm::query_cache_enabled = 1;
$dm::log_cache_actions = 0;
$dm::log_cache_savings = 0;
$dm::cache_everything = 0;

# I dropped the supporting triggers for this table-dependency
# functionality. They must be present for this to work!!! -efl, May2001
$dm::use_dependent_table_strategy = 0;
$dm::derive_unstated_table_dependencies = 0;

%dm::query_cache = ();
$dm::query_cache_size = 0;
$dm::query_cache_hits = 0;
$dm::query_cache_misses = 0;
$dm::show_query_cache_hitmiss = 1;
$dm::max_cache_size = 2000000;
$dm::last_table_status_check_time = 1;
%dm::table_status = (); # storage of last update time for tables ...
$dm::tolerable_cache_race_window = 0.51; # seconds
#############################################################################

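#
# A minimal usage sketch (hypothetical caller code, not part of this
# module): callers opt into caching per query via the $opts hashref that
# SelectData() accepts; cache_lifespan is in seconds.
#
#   my $rows = dm::SelectData(
#       qq{ SELECT id, tablename FROM table_status WHERE last_update >= ? },
#       [ 0 ],
#       { cache => 1, cache_lifespan => 60 });
#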
$dm::pending_queries = 0;
$dm::dberrstr = undef; # For forwarding to client upon request

#
# Postgres doesn't return error codes, only msgs, so we have to handle them...
#
$dm::PGIDX_SCAN_ERR = "ExecInitIndexScan";
$dm::PGIDX_SCAN_ERR_LEN = length($dm::PGIDX_SCAN_ERR);

#
# Supported database types (only PGSQL is exercised below)...
#
use constant PGSQL => 1;
use constant ORACLE => 2;
use constant SQLFLEX => 3;
use constant SYBASE => 4;

$dm::dbtype = dm::PGSQL;

$dm::time_to_die = 0;

sub TimeToDie {
    return $dm::time_to_die;
}

sub SetTimeToDie {
    $dm::time_to_die = 1;
}

sub SetCacheRaceTolerance {
    my ($tol) = shift;
    $dm::tolerable_cache_race_window = (defined($tol) ? $tol : 0.26);
    # Return the window actually in effect (not the raw argument, which
    # may be undef)...
    return $dm::tolerable_cache_race_window;
}

sub getDownFileName {
    return $dm::down_file;
}

sub systemDown {
    if ( ! -f getDownFileName() ) {
        return 0;
    }

    return 1;
}

sub getDBErrStr {
    return $dm::dberrstr;
}

sub getRawDBErrStr {
    return $dm::rawdberrstr;
}

sub setRawDBErrStr {
    my ($str) = @_;
    if ( defined($str) && length($str) ) {
        $dm::rawdberrstr = $str;
    } else {
        $dm::rawdberrstr = undef;
    }
}

sub setDBErrStr {
    my ($str) = @_;
    if ( defined($str) && length($str) ) {
        setRawDBErrStr($str);
        $dm::dberrstr = logger::timestamp()." $str";
    } else {
        $dm::dberrstr = undef;
    }
}

sub addRawDBErrStr {
    my ($str) = @_;
    if ( defined($str) && length($str) ) {
        $dm::rawdberrstr .= $str;
    }
}

sub addDBErrStr {
    my ($str) = @_;
    if ( defined($str) && length($str) ) {
        addRawDBErrStr($str);
        $dm::dberrstr = logger::timestamp()." $str";
    }
}

#
# I wanted a really easy way to flip on debugging during development,
# so I made this rather crude filesystem switch that allows me to
# turn on debugging output without restarting any systems. This may
# not be too desirable in a high-performance situation, but it's worked
# well so far for my development purposes.
#
sub SetDBGSettings {
    my ($opts) = @_;

    if ( -f $dm::log_touchfile || $opts->{log} ) {
        $dm::log = 1;
    } else {
        $dm::log = 0;
    }

    if ( -f $dm::debug_touchfile || $opts->{debug} ) {
        $dm::debug = 1;
    } else {
        $dm::debug = 0;
    }

    if ( -f $dm::showdata_touchfile || $opts->{showdata} ) {
        $dm::showdata = 1;
    } else {
        $dm::showdata = 0;
    }

    if ( -f $dm::profile_touchfile || $opts->{profile} ) {
        $dm::profile = 1;
    } else {
        $dm::profile = 0;
    }

    if ( -f $dm::explain_touchfile || $opts->{explain} ) {
        $dm::explain = 1;
    } else {
        $dm::explain = 0;
    }
}
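
# For example, with the defaults above, touching /tmp/tm/run/debug_tm
# enables debug output on the next query, and removing the file turns it
# back off; no restart needed.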

sub GetDBH {
    my ($opts) = @_;

    my $dbh;
    eval { $dbh = _GetDBH($opts); };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }

    return $dbh;
}

sub _GetDBH {
    my ($opts) = @_;

    foreach my $o ( @dm::initOptions ) {
        $dm::cfg{$o} = ($opts->{$o} ||= $dm::dbDefault{$o});
        $opts->{$o} = $dm::cfg{$o} if ( $opts->{loadme} );
        logger::loginfo("dm::cfg{$o} = '$dm::cfg{$o}'") if ( $dm::debug );
    }

    my $dsn = "dbi:$dm::cfg{dbkind}:dbname=$dm::cfg{dbname};host=$dm::cfg{dbhost};port=$dm::cfg{dbport}";
    $dm::cfg{dbopts} = {
        ChopBlanks => defined($dm::cfg{ChopBlanks}) ? $dm::cfg{ChopBlanks} : 1,
        AutoCommit => defined($dm::cfg{AutoCommit}) ? $dm::cfg{AutoCommit} : 0,
    };

    logger::loginfo("Retrieving DBH for '$dm::cfg{dbhname}' (default '$dm::dbDefault{dbhname}')...") if ( $dm::debug );

    # %dm::dbhCache is a hash of various db handles...
    if ( ! defined($dm::dbhCache{$dm::cfg{dbhname}}) ) {
        logger::loginfo("Connecting to $dsn as user $dm::cfg{dbuser}") if ( $dm::log );

        my $t0 = toolbox::TimeNow();
        logger::loginfo("DBI->connect($dsn, $dm::cfg{dbuser}, $dm::cfg{dbpasswd})") if ( $dm::log );
        $dm::dbhCache{$dm::cfg{dbhname}} = DBI->connect($dsn, $dm::cfg{dbuser}, $dm::cfg{dbpasswd}, $dm::cfg{dbopts});
        if ( ! $dm::dbhCache{$dm::cfg{dbhname}} ) {
            logger::logerr("DBI->connect($dsn, $dm::cfg{dbuser}, $dm::cfg{dbpasswd}, { ".join($dm::cfg{paramsep}, %{$dm::cfg{dbopts}})." }) FAILED; DBI::errstr = [$DBI::errstr]");
        } elsif ($dm::profile) {
            logger::loginfo("Successfully connected to database $dsn");
            logger::loginfo(toolbox::ElapsedTimeString("DBI connect: %.3f secs\n",$t0));
        }
    }

    return $dm::dbhCache{$dm::cfg{dbhname}};
}
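
# A minimal caller sketch (hypothetical values; any of the keys listed in
# @dm::initOptions may be supplied, with %dm::dbDefault filling the rest):
#
#   my $dbh = dm::GetDBH({ dbname => 'testmanager', dbuser => 'tm' });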

sub CloseDB {
    my ($opts) = @_;
    my $dbhname = $opts->{dbhname} || $dm::cfg{dbhname};
    if ( defined($dm::dbhCache{$dbhname}) ) {
        logger::loginfo("Nuking db handle for $dbhname ...") if ( $dm::log );
        $dm::dbhCache{$dbhname}->disconnect;
        $dm::dbhCache{$dbhname} = undef;
    }
}

SetDBGSettings();

sub QueryCount( $ ) {
    my $reset = shift;
    my $count = $dm::query_count;
    if ( $reset ) {
        $dm::query_count = 0;
        %dm::query_profile = ();
    }

    return $count;
}

sub IncQueryCount {
    my ($sql,$et) = @_;
    $dm::query_profile{$sql}{'count'}++;
    $dm::query_profile{$sql}{'et'} += $et;
    return ( ++$dm::query_count );
}

sub DumpQueryProfile {
    #
    # Sort first by total time spent in the query, then by query count
    # (the avg-time-per-query sort is left commented out below)...
    #
    my $sort_func = sub {
        ( $dm::query_profile{$a}{'et'} <=>
          $dm::query_profile{$b}{'et'} ) ||
        # ( ($dm::query_profile{$a}{'et'} /
        #    $dm::query_profile{$a}{'count'}) <=>
        #   ($dm::query_profile{$b}{'et'} /
        #    $dm::query_profile{$b}{'count'}) ) ||
        ( $dm::query_profile{$a}{'count'} <=>
          $dm::query_profile{$b}{'count'} )
    };

    foreach my $q ( sort $sort_func keys %dm::query_profile ) {
        print STDERR "##############################################\n";
        print STDERR "##############################################\n";
        print STDERR "##############################################\n";
        print STDERR "QUERY: [$q]\n";
        printf STDERR "TOT ET: %.3f secs\n", $dm::query_profile{$q}{'et'};
        print STDERR "COUNT: [".$dm::query_profile{$q}{'count'}."]\n";
        my $avg = ($dm::query_profile{$q}{'count'} ?
                   ($dm::query_profile{$q}{'et'} /
                    $dm::query_profile{$q}{'count'}) : 0);
        printf STDERR "AVG ET: %.3f secs\n", $avg;
        print STDERR "CACHE HITS: ".($dm::query_profile{$q}{'hits'}||0)."\n";
        print STDERR "CACHE SAVINGS: ".sprintf("%.3f secs\n", (($dm::query_profile{$q}{'hits'}||0)*($avg)));
    }
}

#
# Return a (hit, data) pair: (1, cached data) on a cache hit, or
# (undef, undef) if no usable cache entry is present...
#
# FIXME: To use multiple DBs, we'll need to cache by dbhname here...
sub QueryResultIsCached {
    my ($sql, $key, $opts) = @_;

    if ( $opts->{deptables} && ! $dm::use_dependent_table_strategy ) {
        $dm::query_cache_misses++;
        return (undef,undef);
    }

    #
    # cache entry non-existent?
    #
    # logger::loginfo("Checking for cache entry $key existence ...");
    if ( ! exists($dm::query_cache{$key}{'data'}) ) {
        $dm::query_cache_misses++;
        return (undef,undef);
    }

    #
    # cache entry expired?
    #
    if ( exists($dm::query_cache{$key}{'expires'}) ) {
        # logger::loginfo("OK. Checking for cache entry $key expiration ...");
        if ( $dm::query_cache{$key}{'expires'} <= toolbox::NowAsScalar() ) {

            # logger::loginfo("Cache entry $key expired...");
            $dm::query_cache_misses++;
            DeleteFromCache($key);
            return (undef,undef);
        }
    }

    #
    # Dependent tables changed?
    #
    if ( $dm::use_dependent_table_strategy &&
         exists($dm::query_cache{$key}{deptables}) ) {

        # logger::loginfo("OK. Checking for stale dependent table data ...");
        foreach my $deptable ( @{$dm::query_cache{$key}{deptables}} ) {
            # logger::loginfo("OK. Checking staleness of $deptable ...");
            if ( StaleTableData($deptable, $dm::query_cache{$key}{time}) ) {
                logger::loginfo("Dependent table $deptable is stale...") if ( $dm::log_cache_actions );
                $dm::query_cache_misses++;
                DeleteFromCache($key);
                return (undef,undef);
            }
        }
    }

    #
    # cache entry transaction completed/gone?
    #
    if ( exists($dm::query_cache{$key}{'cutoff_txid'}) && $opts->{cutoff_txid} ) {
        logger::loginfo("OK. Checking cache entry $key txid cutoff ...");
        if ( $dm::query_cache{$key}{'cutoff_txid'} < $opts->{cutoff_txid} ) {

            # logger::loginfo("Cache entry $key txid cutoff $dm::query_cache{$key}{'cutoff_txid'} passed by $opts->{cutoff_txid} ...");
            $dm::query_cache_misses++;
            DeleteFromCache($key);
            return (undef,undef);
        }
    }

    # logger::loginfo("OK. Returning cache hit");
    $dm::query_cache{$key}{'hits'}++;
    $dm::query_cache_hits++;
    $dm::query_time_saved += $dm::query_cache{$key}{'query_time'};
    logger::loginfo(sprintf("CACHE SAVINGS: %d hits X %.3f secs/miss = %.3f secs saved (OVERALL: %.1f secs)", $dm::query_cache{$key}{'hits'}, $dm::query_cache{$key}{'query_time'}, $dm::query_cache{$key}{'hits'} * $dm::query_cache{$key}{'query_time'}, $dm::query_time_saved)) if ( $dm::log_cache_savings );
    return (1, $dm::query_cache{$key}{'data'});
}

##############################################################################
sub StaleTableData( $ $ ) {
    my ($table, $cached_time) = @_;

    # No data for this table?
    if ( ! exists($dm::table_status{$table}) ) {
        logger::loginfo("No table status info for table [$table]...unable to check dependencies for query cache invalidation, so no query caching on this one...");
        return 1;
    }

    my $now = toolbox::NowAsScalar();

    my $status_age = $now - $dm::table_status{$table}{last_checked};
    if ( $status_age > $dm::tolerable_cache_race_window ) {

        logger::loginfo(sprintf("%s table status age (%.3f secs) violates race condition tolerance (%.3f secs) ... will reload latest status for %s table", $table, $status_age, $dm::tolerable_cache_race_window, $table));
        loadTableChanges();
    }

    # Data grown stale for this table due to other updates?
    if ( $dm::table_status{$table}{last_update} >= $cached_time ) {
        logger::loginfo("Cached data from $table is stale...");
        return 1;
    }

    return 0;
}

##############################################################################
sub validateCachedTable {
}

##############################################################################
sub loadTableChanges {
    my ($table) = @_;

    logger::loginfo("Loading table_status changes...");
    my $sql = qq{
        SELECT id, date_part('epoch', last_update) AS "last_update", tablename
        FROM table_status
        WHERE last_update >= ?
    };
    my $params = [ $dm::last_table_status_check_time || 1 ];
    if ( $table ) {
        $sql .= qq{ AND tablename = ?};
        push( @$params, $table );
        logger::loginfo("Reloading table status changes for $table");
    }
    my $now = toolbox::NowAsScalar();
    my $data = dm::SelectData($sql, $params);
    foreach ( @$data ) {
        my $table = $_->{tablename};
        $dm::table_status{$table}{last_update} = $_->{last_update};
        $dm::table_status{$table}{id} = $_->{id};
        $dm::table_status{$table}{last_checked} = $now;
    }
    logger::loginfo("Done loading table_status changes...");
}
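
# The query above assumes a table_status table shaped roughly like the
# following sketch (inferred from the SELECT; the real DDL, and the
# triggers that keep last_update current, live outside this module):
#
#   CREATE TABLE table_status (
#       id          SERIAL PRIMARY KEY,
#       tablename   TEXT NOT NULL,
#       last_update TIMESTAMP NOT NULL
#   );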

#
# Make room for $size more bytes in the cache...
#
sub PruneCache ( $ ) {
    my ($size) = @_;

    logger::loginfo("DB query cache is full ($dm::query_cache_size/$dm::max_cache_size)...pruning LRU queries...");
    #
    # LRU cache...remove LRU items until size available...
    #
    my $i = 1;
    my $reduction = (0.2 * $dm::max_cache_size) + $size;
    my $targetsize = $dm::query_cache_size - $reduction;

    my $sortf = sub {
        if ( exists($dm::query_cache{$a}{'time'}) &&
             exists($dm::query_cache{$b}{'time'}) ) {

            $dm::query_cache{$a}{'time'} <=> $dm::query_cache{$b}{'time'}
        } else {
            # if ( ! exists($dm::query_cache{$a}{'time'}) ) {
            #     logger::logwarn("Undefined 'time' value in dm::query_cache{$a}");
            # }
            # if ( ! exists($dm::query_cache{$b}{'time'}) ) {
            #     logger::logwarn("Undefined 'time' value in dm::query_cache{$b}");
            # }
            return 0;
        }
    };

    foreach my $q (sort $sortf keys %dm::query_cache) {
        # logger::loginfo("Pruning ". $i++. ": ". $dm::query_cache{$q}{'time'}. ", ". $dm::query_cache{$q}{'size'}); # if ( $dm::debug );

        DeleteFromCache($q);

        #
        # Have we made enough room yet?
        #
        if ( $dm::query_cache_size <= $targetsize ) {
            last;
        }
    }
}

sub DeleteFromCache ( $ ) {
    my ($key) = @_;

    $dm::query_cache_size -= ($dm::query_cache{$key}{'size'} || 0);
    return delete $dm::query_cache{$key};
}

# return list of dependent tables
sub getDependentTables {
    my ($sql,$opts) = @_;

    my $origsql = $sql;

    # flatten the query onto one line
    $sql =~ s/\n/ /g;

    # bail if it has nested subqueries (too hard to parse)
    my $selectcount = 0;
    while ( $sql =~ /SELECT/gi ) {
        $selectcount++;
    }

    if ( $selectcount > 1 ) {
        logger::loginfo("Not able to parse subqueries yet (found $selectcount here)...bailing on caching this one: [$origsql]");
        return undef;
    }

    # slice off the SELECT clause...
    $sql =~ s/^\s*SELECT\s+\S+.*\s+FROM\s+(\S+.*$)/$1/g;
    $sql =~ s/(.*)WHERE\s+.*$/$1/g;
    $sql =~ s/(.*)GROUP BY\s+.*$/$1/g;
    $sql =~ s/(.*)HAVING\s+.*$/$1/g;
    $sql =~ s/(.*)ORDER BY\s+.*$/$1/g;
    $sql =~ s/(.*)LIMIT\s+.*$/$1/g;
    $sql =~ s/(.*)OFFSET\s+.*$/$1/g;
    $sql =~ s/;//g;

    # get rid of the aliases
    while ( $sql =~ s/(\s*[A-Za-z0-9_]+)\s+[A-Za-z0-9_]+,/$1,/g ) {};
    while ( $sql =~ s/(\s*[A-Za-z0-9_]+)\s+[A-Za-z0-9_]+\s*$/$1/g ) {};

    # punt if we don't have something that looks like a clean list...
    if ( $sql !~ /^\s*([A-Za-z0-9_]+)(\s*,\s+[A-Za-z0-9_]+)*\s*$/ ) {
        logger::loginfo("Unable to parse dependent tables from query: $origsql ... parsed result looks like this: [$sql]");
        return undef;
    }
    my @deptables = split(/,/, $sql);

    foreach ( @deptables ) {
        s/^\s+//g;
        s/\s+$//g;
    }

    return \@deptables;
}
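
# For example (a hypothetical query), a call like
#
#   getDependentTables("SELECT u.name FROM users u, orders o WHERE u.id = o.user_id")
#
# yields [ 'users', 'orders' ], while anything containing more than one
# SELECT is punted on and returns undef.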

sub CacheResults {
    my ($sql, $data, $sqlkey, $opts) = @_;

    if ( ! $sql || ! $sqlkey ) {
        return undef;
    }

    if ( $opts->{deptables} && ! $dm::use_dependent_table_strategy ) {
        return undef;
    }

    my $key = getCacheKey($sqlkey);

    my $t0 = toolbox::NowAsScalar();

    my $size = 0;
    logger::loginfo("Sizing ".scalar(@$data)." rows of data...") if ( $dm::log_cache_actions );
    if ( $opts->{want_array_refs} ) {
        foreach my $aref ( @$data ) {
            foreach my $cref ( @$aref ) {
                $size += (defined($cref) ? length($cref) : 0);
            }
        }
    } else {
        foreach my $href ( @$data ) {
            while ( my($k,$v) = each %$href ) {
                $size += (defined($v) ? length($v) : 0);
                $size += length($k);
            }
        }
    }
    $size += length($key);

    if ( $size + $dm::query_cache_size > $dm::max_cache_size ) {
        logger::loginfo("Cache too big...pruning $size bytes now ...");
        PruneCache($size);
    }

    $dm::query_cache_size += $dm::query_cache{$key}{'size'} = $size;
    $dm::query_cache{$key}{'data'} = $data;
    $dm::query_cache{$key}{'query_time'} = $opts->{query_time};
    $dm::query_cache{$key}{'time'} = toolbox::NowAsScalar();

    if ( $dm::use_dependent_table_strategy &&
         $opts->{deptables} && ref($opts->{deptables}) eq 'ARRAY' ) {

        #
        # Dependencies explicitly specified, so use them...
        #
        $dm::query_cache{$key}{deptables} = [ @{$opts->{deptables}} ];
        logger::loginfo("Caching ".scalar(@{$dm::query_cache{$key}{deptables}})." deptables ... ") if ( $dm::log_cache_actions );
    } elsif ( $dm::use_dependent_table_strategy &&
              $dm::derive_unstated_table_dependencies ) {
        #
        # This is where we attempt to determine the set of tables on which
        # this query depends. This works great in simple cases, but needs
        # refinement!
        #
        $dm::query_cache{$key}{deptables} = getDependentTables($sql);
        if ( ! $dm::query_cache{$key}{deptables} ) {
            logger::loginfo("getDependentTables() failed for query: $sql") if ( $dm::log_cache_actions );
            delete $dm::query_cache{$key};
            return undef;
        }
        logger::loginfo("Dependent tables: ",@{$dm::query_cache{$key}{deptables}}) if ( $dm::log_cache_actions );
    }

    if ( $opts->{cache_lifespan} ) {
        $dm::query_cache{$key}{'expires'} =
            $dm::query_cache{$key}{'time'} + $opts->{'cache_lifespan'};
    }

    if ( $opts->{cutoff_txid} ) {
        $dm::query_cache{$key}{'cutoff_txid'} = $opts->{'cutoff_txid'};
    }

    # collapse the key to a short one-line excerpt for logging
    $sqlkey =~ s/\n/ /g;
    $sqlkey =~ s/(\S)\s+(\S)/$1 $2/g;
    $sqlkey =~ s/^\s+(\S)/$1/g;
    $sqlkey =~ s/(\S)\s+$/$1/g;
    $sqlkey = substr($sqlkey, 0, 25);
    logger::loginfo("Caching $dm::query_cache{$key}{'size'}B ".($dm::query_cache{$key}{'expires'} ? "until $dm::query_cache{$key}{'expires'}" : "")." ".($dm::query_cache{$key}{'cutoff_txid'} ? "for txid $dm::query_cache{$key}{'cutoff_txid'}" : "")." under key $key for $sqlkey (cache size: $dm::query_cache_size bytes...)\n") if ( $dm::log_cache_actions );

    my $et = toolbox::NowAsScalar() - $t0;
    logger::loginfo("dm::CacheResults() took ".sprintf("%.3f secs", ($et))." to cache [$sqlkey]") if ( $dm::log_cache_actions );
    $dm::query_cache{$key}{'caching_time'} = $et;
    $dm::query_time_saved -= $et;

    return 1;
}

#
# Return 1 if parameter mismatch, 0 otherwise...
#
sub ParameterMismatch( $ $ ) {
    my ($sql, $params) = @_;

    if ( defined($params) && ref($params) ne 'ARRAY' ) {
        logger::logerr("Bind params were not passed as aref");
        return 1;
    }

    my $pcount = ($sql =~ tr/?/?/);
    my $bcount = (defined($params) ? scalar(@$params) : 0);

    if ( $pcount != $bcount ) {
        logger::logerr("Number of parameter placeholders ($pcount) does not match the number of input bind params ($bcount) for sql: [$sql], params: [".((defined($params) && (ref($params) eq 'ARRAY')) ? join($dm::cfg{paramsep},@$params) : "")."]");
        return 1;
    }

    return 0;
}
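
# For example, ParameterMismatch("SELECT * FROM t WHERE a = ? AND b = ?",
# [ 1 ]) returns 1: two placeholders, one bind value. Note that the
# tr/?/?/ count above includes any literal '?' inside quoted SQL strings.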

#
# Select a single scalar value from a single record in the DB. If the
# query returns anything else, it is an error and the result is undef.
#
sub SelectDataValue {
    my ($sql,$params,$opts) = @_;

    my $data = SelectData($sql, $params, $opts);
    if ( ! defined($data) ) {
        logger::logerr("Query passed to SelectDataValue() returned undef. SQL: [$sql], PARAMS: [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }

    if ( ref($data) ne 'ARRAY' ) {
        logger::logerr("Query passed to SelectDataValue() returned non-ARRAY ref. SQL: [$sql], PARAMS: [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }

    my $count = scalar(@$data);
    if ( $count != 1 ) {
        logger::logerr("Query passed to SelectDataValue() returned $count records when 1 was expected. SQL: [$sql], PARAMS: [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }

    my @keys = keys %{$data->[0]};
    $count = scalar(@keys);
    if ( $count != 1 ) {
        logger::logerr("Query passed to SelectDataValue() returned $count columns when 1 was expected. SQL: [$sql], PARAMS: [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }

    return $data->[0]->{$keys[0]};
}
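
# Typical use, for single-row, single-column queries only:
#
#   my $n = dm::SelectDataValue(
#       "SELECT count(*) FROM table_status WHERE tablename = ?",
#       [ 'users' ]);    # 'users' here is a hypothetical table name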

sub SelectData {
    $dm::reconnect_tries = 0;
    $dm::vacuum_tries = 0;
    my $data;
    eval { $data = _SelectData(@_); };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }

    return $data;
}

sub Reconnect {
    my ($opts) = @_;

    my $dbhname = $opts->{dbhname} || $dm::cfg{dbhname};

    if ( $dm::reconnect_tries++ < $dm::reconnect_limit ) {
        logger::loginfo("Trying to reconnect to DB (attempt #$dm::reconnect_tries)...");
        eval { CloseDB($opts) };
        $dm::dbhCache{$dbhname} = GetDBH($opts);
        if ( ! $dm::dbhCache{$dbhname} ) {
            logger::logerr("GetDBH() FAILED...");
        } else {
            logger::loginfo("Hey, looks like reconnect worked...cleaning up by force just in case...");
            dm::Cleanup(undef,1);
        }
        return $dm::dbhCache{$dbhname};
    }
    logger::logerr("Giving up on attempted DB reconnection after $dm::reconnect_tries tries...");
    return undef;
}

sub getCacheKey ( $ ) {
    my ($longkey) = @_;
    return $longkey;
}

sub CachingIsAppropriate( $ ) {
    my ($opts) = @_;

    if ( $dm::query_cache_enabled &&
         (($opts && $opts->{cache}) || $dm::cache_everything) ) {

        return 1;
    }

    return 0;
}

sub _SelectData {
    my ($sql,$params,$opts) = @_;

    if ( ! defined($sql) ) {
        logger::logerr("Undefined SQL query string; ABORTING QUERY.");
        return undef;
    }

    if ( ref($sql) ) {
        logger::logerr("Non-scalar SQL query string [$sql]; ABORTING QUERY.");
        return undef;
    }

    my $t0 = toolbox::TimeNow();
    SetDBGSettings($opts);

    my @data = ();

    # FIXME: FOR DEBUGGING ONLY, THIS WILL KILL PERFORMANCE IN PRODUCTION
    if ( 0 ) {
        logger::loginfo("SQL: $sql");
        foreach my $p ( @$params ) {
            logger::loginfo("PARAM: $p");
        }
    }
    my $pstr = (defined($params) && ref($params) eq 'ARRAY' ? join($dm::cfg{paramsep}, @$params) : "");

    if ( ParameterMismatch($sql,$params) ) {
        logger::logerr("Parameter mismatch for sql: [$sql] and bind params [$pstr]");
        return undef;
    }

    #
    # See if we have this one sitting in the cache...
    #
    my $key = $sql . $pstr;
    my ($cache_hit,$cached_data);

    #
    # Maybe we could use a tied IPC::Shareable implementation for sharing a
    # value that indicates when child client processes need to clear their
    # caches? Needs more thought...
    #
    if ( CachingIsAppropriate( $opts ) ) {

        $key =~ s/\n//g;
        $key =~ s/\s//g;
        my $cachekey = getCacheKey($key);
        ($cache_hit,$cached_data) =
            QueryResultIsCached($sql, $cachekey, $opts);
        if ( $cache_hit ) {
            # logger::loginfo("DB CACHE HIT ...\n") if ( $dm::show_query_cache_hitmiss || $dm::profile );
            $dm::query_profile{$sql}{'hits'}++;
            return $cached_data;
        } else {
            # logger::loginfo("DB CACHE MISS ...\n") if ( $dm::show_query_cache_hitmiss || $dm::profile );
        }
    }

    logger::loginfo("SQL: [" . $sql . "], INPUT PARAMS: [" . $pstr . "]") if ( $dm::debug );

    my $dbh = GetDBH($opts);
    if (! $dbh) {
        logger::logerr("DBI::connect() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "] and params ['dbi:$dm::cfg{dbkind}:dbname=$dm::cfg{dbname}', '$dm::cfg{dbuser}', '<passwd>']\n");
        return undef;
    }

    if ( $dm::explain ) {
        logger::loginfo("EXPLAIN ON");
        eval {
            $dbh->do("EXPLAIN VERBOSE $sql");
        };
    }

    my $sth = $dbh->prepare( $sql );
    if ( ! $sth ) {
        logger::logerr("DBI::prepare() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]\n");
        return undef;
    }

    if ( defined($params) ) {
        my $reftype = ref $params;
        if ( $reftype eq 'ARRAY' ) {
            for ( my $i = 1; $i <= scalar(@$params); $i++ ) {
                my $bind_value = $params->[$i-1];
                logger::loginfo("Binding param #$i [$bind_value]...") if ( $dm::debug );
                if ( ! defined($sth->bind_param($i, $bind_value) ) ) {
                    logger::logerr("DBI::bind_param($i, $bind_value) FAILED for SQL [$sql], params [$pstr], with DBI::err [" . $sth->err . "], DBI::state [" . $sth->state . "], DBI::errstr [".$sth->errstr."]\n");
                    return undef;
                } else {
                    logger::loginfo("Successful bind of param #$i...") if ( $dm::debug );
                }
            }
        } else {
            logger::logerr("BAD REF TYPE $reftype\n");
            return undef;
        }
    }

    if ( ! $sth->execute() ) {
        my $err = $sth->err();
        my $errstr = $sth->errstr();
        my $state = $sth->state();
        logger::logerr("DBI::execute() FAILED in dm::_SelectData() with DBI::err [$err], DBI::state [$state], DBI::errstr [$errstr] for SQL [$sql], params [$pstr]\n");
        addDBErrStr($errstr);

        if ( $err == -1 && dm::Reconnect($opts) ) {

            logger::loginfo("Reconnect successful...trying query again ...");
            return dm::_SelectData($sql,$params,$opts);

        #
        # For this particular error, mailing list discussions
        # and experience suggest that vacuuming will fix it...
        #
        } elsif ( $err == 7 && ($errstr =~ /$dm::PGIDX_SCAN_ERR/) ) {

            logger::logerr("THAT PESKY ExecInitIndexScan BUG HAS REARED ITS HEAD AGAIN...TRYING TO VACUUM...");

            if ( dm::Vacuum($dbh) ) {
                logger::loginfo("Vacuuming successful...retrying query on the fly...");
                return dm::_SelectData($sql,$params,$opts);
            } else {
                logger::logerr("No luck, vacuum failed.");
            }
        }
        logger::loginfo("DBI::execute() AND all on-the-fly recovery attempts FAILED for SQL [$sql], params [$pstr]\n");

        return undef;
    }

    if ( $opts->{want_array_refs} ) {
        while ( my $aref = $sth->fetchrow_arrayref ) {
            push( @data, [ @$aref ] );
        }
        # push column names on as first row...
        my $hdraref = [];
        foreach my $name ( @{$sth->{NAME}} ) {
            push( @$hdraref, $name );
        }
        unshift( @data, [ @$hdraref ] );
    } else {
        while ( my $href = $sth->fetchrow_hashref ) {
            push( @data, { %$href } );
        }
    }

    my $et = toolbox::ElapsedTime($t0);

    if ( ! $sth->finish() ) {
        @data = ();
        logger::logerr("DBI::finish() FAILED for SQL [$sql], params [$pstr], with DBI::err [" . $sth->err . "], DBI::state [" . $sth->state . "], DBI::errstr [" . $sth->errstr . "]\n");
        setDBErrStr($sth->errstr);

        return undef;
    }

    if ( $dm::debug == 1 && $dm::showdata ) {
        # plain print here, not printf: the SQL text may contain '%'
        print STDERR "#############################################\n";
        print STDERR "RESULTS RETURNED BY SQL: [" . $sql . "], INPUT PARAMS: [" . $pstr . "]:\n";
        my $i = 0;
        if ( $opts->{want_array_refs} ) {
            foreach my $aref ( @data ) {
                print STDERR "Row[".$i++."]";
                foreach my $cref ( @$aref ) {
                    print STDERR ", $cref";
                }
                print STDERR "\n";
            }
        } else {
            foreach my $href ( @data ) {
                print STDERR "Row[".$i++."]";
                while ( my($k,$v) = each %$href ) {
                    my $key = (defined($k) ? $k : "undef");
                    my $val = (defined($v) ? $v : "undef");
                    print STDERR ", '$key' = [$val]";
                }
                print STDERR "\n";
            }
        }
        print STDERR "######## END OF RESULTS ####################\n";
    }

    if ( CachingIsAppropriate( $opts ) &&
         ( ! $cache_hit || $opts->{'refresh_cache'} ) ) {

        $opts->{query_time} = $et;

        if ( ! CacheResults ( $sql, \@data, $key, $opts ) ) {
            logger::loginfo("Unable to cache query results for $sql") if ( $dm::log_cache_actions );
        }
    }

    if ( $dm::profile ) {
        print STDERR sprintf("DBI query: %.3f secs for SQL [%s]\n", $et, $sql);
    }
    IncQueryCount($sql,$et);

    return \@data;
}
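
# Result-shape sketch: by default each row comes back as a hashref keyed
# by column name; with { want_array_refs => 1 } each row is an arrayref
# and row 0 holds the column names, e.g.
#
#   my $rows = dm::SelectData("SELECT id, tablename FROM table_status",
#                             [], { want_array_refs => 1 });
#   # $rows->[0] is [ 'id', 'tablename' ]; data rows follow.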

sub InsertData { return NonSelectQuery(@_); }
sub ModifyData { return NonSelectQuery(@_); }
sub DeleteData { return NonSelectQuery(@_); }

sub NonSelectQuery {
    if ( systemDown() ) {
        return undef;
    }

    $dm::reconnect_tries = 0;
    my $data;
    eval { $data = _NonSelectQuery(@_); };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }

    return $data;
}

sub _NonSelectQuery {
    my $t0 = toolbox::TimeNow();

    my ($sql,$params,$opts) = @_;

    SetDBGSettings($opts);

    if ( ! defined($sql) || ref($sql) ) {
        logger::logerr("Undefined or non-scalar SQL string passed in");
        Carp::cluck("Undefined or non-scalar SQL string passed in\n");
        return undef;
    }

    if ( defined($params) && (ref($params) ne 'ARRAY') ) {
        logger::logerr("Non-ARRAY ref passed as parameter list for SQL [$sql]");
        return undef;
    }

    if ( ParameterMismatch($sql,$params) ) {
        logger::logerr("Parameter mismatch for sql: [$sql] and bind params [".join($dm::cfg{paramsep},@$params)."]");
        return undef;
    }

    logger::logquery($sql,$params);

    # my $pstr = (defined($params) ? join($dm::cfg{paramsep}, @$params) : '');
    my $pstr = (defined($params) ? join(' | ', @$params) : '');
    logger::loginfo("SQL: [" . $sql . "], INPUT PARAMS: [" . $pstr . "]") if ( $dm::debug );

    my $dbh = GetDBH($opts);
    if (! $dbh) {
        logger::logerr("dm::GetDBH() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "] and params ['dbi:$dm::cfg{dbkind}:dbname=$dm::cfg{dbname}', '$dm::cfg{dbuser}', '<passwd>']\n");
        return undef;
    }

    if ( exists($opts->{'AUTOCOMMIT'}) ) {
        #
        # Caller wants to reset AUTOCOMMIT in driver... (testing with
        # exists(), as SetOpts() does, so that a value of 0 is honored
        # rather than skipped)
        #
        if ( $opts->{'AUTOCOMMIT'} == 0 ||
             $opts->{'AUTOCOMMIT'} == 1 ) {
            $dbh->{'AutoCommit'} = $opts->{'AUTOCOMMIT'};
        } else {
            logger::logerr("Illegal 'AUTOCOMMIT' value: [".
                           $opts->{'AUTOCOMMIT'} .
                           "] ... aborting query [$sql], params [$pstr]");
            return undef;
        }
    }

    my $sth = $dbh->prepare( $sql );
    if ( ! $sth ) {
        logger::logerr("DBI::prepare() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]\n");
        return undef;
    }

    if ( defined($params) ) {
        my $reftype = ref $params;
        if ( $reftype eq 'ARRAY' ) {
            for ( my $i = 1; $i <= scalar(@$params); $i++ ) {
                logger::loginfo("Binding param #$i...") if ( $dm::debug );
                my $bind_value = $params->[$i-1];
                if ( ! defined($sth->bind_param($i, $bind_value) ) ) {
                    logger::logerr("DBI::bind_param($i, $bind_value) FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [".$DBI::errstr."]\n");
                    return undef;
                } else {
                    logger::loginfo("Successful bind of param #$i...") if ( $dm::debug );
                }
            }
        } else {
            logger::logerr("BAD REF TYPE $reftype\n");
            return undef;
        }
    }

    my $rc;

    $dm::pending_queries++;

    if ( ! defined($rc = $sth->execute()) ) {
        logger::logerr("DBI::execute() FAILED for SQL [$sql], params [$pstr], with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]\n");
        addDBErrStr($DBI::errstr);
        return undef;
    }
    if ( $opts->{'SERIAL'} && $dm::dbtype == dm::PGSQL ) {
        #
        # Retrieve postgresql serial ID in the customary PG manner...
        #
        if ( $opts->{'TABLE'} ) {
            my $ser_sql = qq{
                SELECT id FROM $opts->{'TABLE'} WHERE oid = ?;
            };
            $rc = dm::SelectDataValue($ser_sql, [ $sth->{pg_oid_status} ]);
        } else {
            $rc = undef;
        }
    }

    if ( ! $sth->finish() ) {
        logger::logerr("DBI::finish() FAILED for SQL [$sql], params [$pstr], with DBI::err [" . $sth->err . "], DBI::state [" . $sth->state . "], DBI::errstr [" . $DBI::errstr . "]\n");
        return undef;
    }

    my $et = toolbox::ElapsedTime($t0);
    if ( $dm::profile ) {
        print STDERR sprintf("DBI query: %.3f secs for SQL [%s]\n", $et, $sql);
    }
    IncQueryCount($sql,$et);

    # DBI returns the zero-but-true string '0E0' for zero affected rows;
    # normalize that to a plain 0...
    my $retval = (defined($rc) && ($rc eq '0E0') ? 0 : $rc);

    if ( ! ref($retval) ) {
        if ( $opts->{'SERIAL'} && $dm::dbtype == dm::PGSQL ) {
            logger::loginfo("DB returned serial ID [$retval]") if $dm::debug;
        } else {
            logger::loginfo("DB affected $retval record(s)");
        }
    }

    return ($retval);
}
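
# Insert-with-serial sketch (hypothetical table and data; the oid lookup
# above assumes the target table's serial column is named "id"):
#
#   my $id = dm::InsertData(
#       "INSERT INTO users (name) VALUES (?)",
#       [ 'alice' ],
#       { SERIAL => 1, TABLE => 'users' });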

sub SetOpts {
    my ($opts) = @_;

    my $dbh = GetDBH($opts);
    if (! $dbh) {
        logger::logerr("dm::GetDBH() FAILED to get db handle ... are we connected? DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    }

    if ( exists($opts->{'AUTOCOMMIT'}) ) {
        #
        # Caller wants to reset AUTOCOMMIT in driver...
        #
        if ( $opts->{'AUTOCOMMIT'} == 0 ||
             $opts->{'AUTOCOMMIT'} == 1 ) {
            $dbh->{'AutoCommit'} = $opts->{'AUTOCOMMIT'};
            logger::loginfo("Setting AutoCommit to ".
                            $opts->{'AUTOCOMMIT'} ) if ( $dm::debug );
        } else {
            logger::logerr("Illegal 'AUTOCOMMIT' value: [".
                           $opts->{'AUTOCOMMIT'} .
                           "] ... attempt to set options failed");
            return undef;
        }
    }
    return 1;
}

sub Cleanup {
    my ($txid,$force) = @_;

    return; # vacuuming is not safe while inserts/deletes are occurring...

    #
    # Only cleanup after $dm::cleanup_interval transactions...
    #
    # Tx IDs are sync'ed across all child apache processes...
    #
    print STDERR "txid = $txid, cleanup_interval = $dm::cleanup_interval\n";
    if ( ! $force && (! defined($txid) || ($txid % $dm::cleanup_interval != 0)) ) {
        return;
    }

    my $data;
    #
    # Obvious race condition here, but unlikely we'll hit it as it is
    # a small race window...
    #
    eval { $data = _Cleanup(@_); };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }

    return $data;
}

sub Vacuum ($) {
    if ( systemDown() ) {
        return undef;
    }

    my ($dbh) = shift;

    if ( $dm::vacuum_tries++ >= $dm::vacuum_limit ) {
        logger::logwarn("Giving up on attempted DB vacuum after $dm::vacuum_tries tries...");
        return undef;
    }

    my $rv;
    logger::loginfo("Attempting VACUUM...");
    eval { $rv = $dbh->do("VACUUM ANALYZE") };
    if ( $@ ) {
        logger::logerr($@);
        logger::logerr("DBI::do('VACUUM') FAILED (eval block failed) with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    } elsif ( ! defined($rv) ) {
        logger::logerr("DBI::do('VACUUM') FAILED (DBI returned undef) with DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    } else {
        logger::loginfo("Vacuum appears possibly successful with DBI::do() return value of [$rv] ...");
    }

    return 1;
}

sub _Cleanup {
    my ($opts) = @_;

    my $dbh = GetDBH($opts);
    if (! $dbh) {
        logger::logerr("FAILED to get db handle ... are we connected? DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    }

    if ( ! dm::Vacuum($dbh) ) {
        logger::loginfo("Attempt to vacuum database was unsuccessful (that's ok if db was locked for vacuuming by another process)...");
    }
}

sub Commit {
    my ($opts) = @_;

    my $force = $opts->{force};

    if ( systemDown() ) {
        logger::loginfo("DB system down (".getDownFileName()." exists) ... aborting commit");
        return undef;
    }

    if ( ! $dm::pending_queries && ! $force ) {
        # logger::loginfo("No pending queries; commit ignored");
        return 1;
    }

    my $data;
    eval { $data = _Commit($opts); };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }

    return $data;
}

sub _Commit {
    my ($opts) = @_;

    my $dbhname = $opts->{dbhname} || $dm::cfg{dbhname};
    my $dbh = GetDBH($opts);

    if (! $dbh) {
        logger::logerr("FAILED to get db handle for $dbhname ... are we connected? DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    }

    logger::logquery("COMMIT -- ($dm::pending_queries pending queries for $dbhname)");

    my $rc = $dbh->commit();

    if ( ! $rc ) {
        logger::logerr($dbh->errstr);
    } else {
        $dm::pending_queries = 0;
    }

    return $rc;
}

sub Rollback {
    my ($opts) = @_;
    my $force = $opts->{force};
    if ( systemDown() ) {
        logger::loginfo("DB system down (".getDownFileName()." exists) ... aborting rollback");
        return undef;
    }

    #
    # Some failed queries still need a rollback even when no commit is
    # pending; callers can pass { force => 1 } to get one...
    #
    if ( (! $force) && (! $dm::pending_queries) ) {
        logger::loginfo("No pending queries; rollback ignored");
        return 1;
    }

    my $data;
    eval { $data = _Rollback($opts); };
    if ( $@ ) {
        logger::logerr($@);
        return undef;
    }

    return $data;
}

sub _Rollback {
    my ($opts) = @_;

    my $dbhname = $opts->{dbhname} || $dm::cfg{dbhname};
    my $dbh = GetDBH($opts);
    if (! $dbh) {
        logger::logerr("FAILED to get db handle for $dbhname ... are we connected? DBI::err [". $DBI::err . "], DBI::state [". $DBI::state . "], DBI::errstr [" . $DBI::errstr . "]");
        return undef;
    }

    logger::loginfo("Rolling back $dm::pending_queries pending queries for $dbhname");
    logger::logquery("ROLLBACK -- $dbhname");

    my $rc = $dbh->rollback();

    if ( ! $rc ) {
        logger::logerr($dbh->errstr);
    } else {
        $dm::pending_queries = 0;
    }

    return $rc;
}
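
# Typical write/commit sketch (AutoCommit defaults to 0 above, so writes
# must be committed explicitly; $sql/$params are whatever the caller has):
#
#   if ( defined(dm::ModifyData($sql, $params)) ) {
#       dm::Commit({});
#   } else {
#       dm::Rollback({});
#   }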

END {
    for my $dbhname ( keys %dm::dbhCache ) {
        &dm::CloseDB( { dbhname => $dbhname } );
    }
}

1;