来源:http://www.anysql.net 该Perl脚本代码如下:
#!/export/home/oracle/dbaperl/bin/perl -w # # Manual refresh rows between databases
use strict; use DBI; use Getopt::Std; use FileHandle;
my ($rc, $verbose, $src_db, $dest_db, $cfgfile, $looptime, $waittime, $arraydml, $unique_errno, $exitapp, $batchsize, $longsize, $exitloop, $updatefirst, $logfile, $quietmode, $lckfile); my ($mv_seqcol, $mv_typcol, $mv_seqtab);
format HEADER =
AnySQL.net Synchronize Data Utility Version 2.0.0
Syntax: asyncdata.pl -S source -T target -C configfile
This script will replicat changed rows between different database types. The changed rows is captured by materialized view log (with primary key, sequence). For performance, please create index on the sequence column on MVIEW log table.
-h - Give help screen -S Source - source database (USERNAME#PASSWORD#Oracle:TNSNAME) -T Target - target database (USERNAME#PASSWORD#DBTYPE:HOST:DATABASE) -C configfile - config file contains the tables need to be replicated (Source Table #Primary Key #MVIEW Log #Target Table) -L longsize - the maximum long or lob value (default 256, Unit: KB) -B batchsize - array DML size (default 50) -W waittime - wait time between each loop (default 2 seconds) -A - Enable array DML interface for better performance -U - Update first, not delete then insert, reduce cost on target -Q - run in quiet mode, will not print message to screen. -N temp_table - Temp table name used for hold the sequence.
Examples: asyncdata.pl -S anysql#anysql#Oracle:prod -T root#mysql#mysql:test -C demo.conf .
$verbose = 0; $looptime = 200; #Will reload the configuration file after loop. $waittime = 1; $arraydml = 0; $exitapp = 0; $exitloop = 0; $batchsize = 100; $longsize = 256 * 1024; $updatefirst = 0; $quietmode = 0; $mv_seqcol = "SEQUENCE\$\$"; $mv_typcol = "DMLTYPE\$\$"; $mv_seqtab = "TMP_ASYNCDATA";
# Add signal process for SIGINT & SITSTOP # When get this signal, set the exit app flag to true sub set_exitapp_flags { $exitapp = 1; } $SIG{INT} = \&set_exitapp_flags; $SIG{STOP} = \&set_exitapp_flags; $SIG{QUIT} = \&set_exitapp_flags; $SIG{HUP} = 'IGNORE';
# MySQL 1062 # Oracle 1 $unique_errno = 1062;
&do_opts();
$logfile = $cfgfile; $lckfile = $cfgfile; if ($logfile =~ /\.\w*$/) { $logfile =~ s/\.\w*/_refresh\.log/; $lckfile =~ s/\.\w*/_refresh\.lck/; } else { $logfile = $logfile."_refresh.log"; $lckfile = $lckfile."_refresh.lck"; }
$rc = &flock_lockfile($lckfile);
if ($rc) { print("Cannot obtain lock for file $lckfile !!!\n") if (!$quietmode); print("Another asyncdata thread may already running for this configuration file !!!\n") if (!$quietmode); exit(); }
open (LOG,">>$logfile") || die ("\n\nCan't open log file to write to: $logfile\n"); LOG->autoflush(1);
sub refresh_single { my ($srcdbh, $dstdbh, $base_table, $pk_columns, $mlog_table, $target_table, $deletewhere) = @_;
# my $base_table = "T_MVLOG"; # my $pk_columns = "COL1"; # my $mlog_table = "MLOG\$_T_MVLOG"; # my $target_table = "MV_T_MVLOG2"; # my $deletewhere = "1=1";
my ($min_seq, $max_seq, $rows, $rowcnt, $i, $keycol, $colscnt); my ($delete_mv_sql, $insert_mv_sql, $delete_mvlog_sql, $delete_target_sql, $insert_target_sql, $update_mv_sql, $update_target_sql);
my $view_sql = "SELECT * FROM ".$base_table; my @pkcol_array = split ",", $pk_columns; my @pkpos_array = (); my @nonpkpos_array = ();
# Get the min sequence number $min_seq = 0; $rows = $srcdbh->selectall_arrayref("SELECT MIN($mv_seqcol) FROM $mlog_table"); if (!defined($rows)) { &prtit("Database Error:"); &prtit($srcdbh->errstr()); return 2; } $min_seq = $rows->[0]->[0] if (@$rows);
# Get the max sequence number $max_seq = 1000; $rows = $srcdbh->selectall_arrayref("SELECT MAX($mv_seqcol) FROM $mlog_table"); if (!defined($rows)) { &prtit("Database Error:"); &prtit($srcdbh->errstr()); return 2; } $max_seq = $rows->[0]->[0] if (@$rows);
# Return if nothing need to be replicated if (!defined($min_seq) or !defined($max_seq)) { # &prtit("Nothing need to be replicated.") if ($verbose); return 0; }
# for each refresh, we will refresh a batch $max_seq = $min_seq + $batchsize if ($max_seq > ($min_seq + $batchsize));
&prtit("Start copy from $base_table to $target_table, seq $min_seq to $max_seq ...");
$srcdbh->do("INSERT INTO $mv_seqtab ($mv_seqcol) \n". "SELECT $mv_seqcol FROM $mlog_table WHERE $mv_seqcol >= ? AND $mv_seqcol <= ?", undef, $min_seq, $max_seq);
if ($srcdbh->err()) { &prtit("Error when save sequence to middle table \n". $srcdbh->errstr()); return 2; }
# Get the delete rows $delete_mv_sql = "SELECT DISTINCT ".$pk_columns; $delete_mv_sql .= " \n FROM ".$mlog_table. " MLOG \n WHERE $mv_seqcol IN (SELECT $mv_seqcol FROM $mv_seqtab) \n ";
if ($updatefirst) { # $delete_mv_sql .= " AND NOT EXISTS \n (SELECT 1 FROM ".$base_table." MBASE \n WHERE "; # foreach my $keycol (@pkcol_array) # { # $delete_mv_sql .= $keycol."=MLOG.".$keycol." AND "; # } # $delete_mv_sql .= "1=1)"; $delete_mv_sql .= "AND ($mv_typcol = 'D')"; } else { $delete_mv_sql .= "AND ($mv_typcol <> 'I')"; }
if ($verbose > 1) { &prtit("SQL to get the rows that need to be deleted on target table : \n $delete_mv_sql"); }
# perform the delete on target table $delete_target_sql = "DELETE FROM ".$target_table."\n WHERE "; foreach $keycol (@pkcol_array) { $delete_target_sql .= $keycol."=? AND "; } $delete_target_sql .= $deletewhere;
if ($verbose > 1) { &prtit("SQL to delete the rows on target table : \n $delete_target_sql"); }
my $delrows = $srcdbh->selectall_arrayref($delete_mv_sql);
if ($srcdbh->err()) { &prtit("Error when deleting rows on target table $target_table"); &prtit($srcdbh->errstr()); $dstdbh->rollback(); $srcdbh->rollback(); return 2; }
if ($delrows) { if ($verbose > 1) { &prtit("Start to delete rows on target table $target_table ..."); } my $delstmt = $dstdbh->prepare($delete_target_sql); if (defined($delstmt)) { $colscnt = @pkcol_array;
if ($arraydml) { my (%bind_array, @rowstats); for ($i=1; $i<=$colscnt; $i++) { my @colarr = (); $bind_array{$i} = \@colarr; } foreach my $delrow (@$delrows) { for ($i=1; $i<=$colscnt; $i++) { my $colarr = $bind_array{$i}; push(@$colarr, $delrow->[$i-1]); } }
for ($i=1; $i<=$colscnt; $i++) { $delstmt->bind_param_array($i, $bind_array{$i}); } my $rv = $delstmt->execute_array({ArrayTupleStatus => \@rowstats});
if (! defined($rv)) { &prtit("Error when deleting rows on target table $target_table"); if ($delstmt->err()) { &prtit($delstmt->errstr()); } else { my $updcnt = @rowstats; my $j = 0; for($i=0;$i<$updcnt;$i++) { if (ref $rowstats[$i]) { &prtit("[".$rowstats[$i]->[0]."] ".$rowstats[$i]->[1]); my $pk_value_list = "Primary Key Value: D,$target_table"; for ($j = 1; $j <= $colscnt; $j++) { $pk_value_list .= ", ".$bind_array{$j}->[$i]; } $pk_value_list =~ s/: ,/: /; &prtit($pk_value_list); } } } $delstmt->finish(); $dstdbh->rollback(); $srcdbh->rollback(); return 2; } } else { foreach my $delrow (@$delrows) { for ($i=1; $i<=$colscnt; $i++) { $delstmt->bind_param($i, $delrow->[$i-1]); } if (! ($delstmt->execute())) { &prtit("Error when deleting rows on target table $target_table"); &prtit($delstmt->errstr()); my $pk_value_list = "Primary Key Value: D,$target_table"; foreach my $tmp_pk (@pkpos_array) { $pk_value_list .= ", ".$delrow->[$tmp_pk-1]; } $pk_value_list =~ s/: ,/: /; &prtit($pk_value_list); $delstmt->finish(); $dstdbh->rollback(); $srcdbh->rollback(); return 2; } } } $delstmt->finish(); } else { &prtit("Error when deleting rows on target table $target_table"); $dstdbh->rollback(); $srcdbh->rollback(); return 2; } if ($verbose > 1) { &prtit("Delete rows on target table $target_table finished."); } }
# get the rows need to be updated on target table if ($updatefirst) { $update_mv_sql = "SELECT CURR.* \n FROM (".$view_sql.") CURR, \n ("; $update_mv_sql .= "SELECT DISTINCT ".$pk_columns." FROM ".$mlog_table. " \n WHERE $mv_seqcol IN (SELECT $mv_seqcol FROM $mv_seqtab)\n AND ($mv_typcol = 'U')) MLOG "; $update_mv_sql .= " \n WHERE "; foreach $keycol (@pkcol_array) { $update_mv_sql .= "CURR.".$keycol."=MLOG.".$keycol." AND "; } $update_mv_sql .= "1=1";
my $updrow_stmt = $srcdbh->prepare($update_mv_sql); if (defined($updrow_stmt)) { $updrow_stmt->execute(); if ($srcdbh->err()) { &prtit("Error when updating rows on target table $target_table"); &prtit($srcdbh->errstr()); $dstdbh->rollback(); $srcdbh->rollback(); return 2; }
$colscnt = $updrow_stmt->{NUM_OF_FIELDS};
# Get the primary key column offset @pkpos_array=(); @nonpkpos_array=(); for ($i=0;$i<$colscnt; $i++) { my $ispkcolumn = 0; foreach my $keycol (@pkcol_array) { if ($keycol =~ /^$updrow_stmt->{NAME}->[$i]$/i) { $ispkcolumn = 1; last; } } if ($ispkcolumn) { push(@pkpos_array, ($i+1)); } else { push(@nonpkpos_array, ($i+1)); } }
# Create the update sql statement $update_target_sql = "UPDATE ".$target_table." SET "; foreach my $keycol (@nonpkpos_array) { $update_target_sql .= $updrow_stmt->{NAME}->[$keycol - 1]." = ? ,"; } chop($update_target_sql); $update_target_sql .= " WHERE "; foreach my $keycol (@pkpos_array) { $update_target_sql .= $updrow_stmt->{NAME}->[$keycol - 1]." = ? AND " } $update_target_sql .= " 1 = 1 ";
if ($verbose > 1) { &prtit("SQL to update the rows on target table : \n $update_target_sql"); }
my $update_rows = $updrow_stmt->fetchall_arrayref(); my $update_stmt = $dstdbh->prepare($update_target_sql);
if ($arraydml) { my (%bind_array, @rowstats); for ($i=1; $i<=$colscnt; $i++) { my @colarr = (); $bind_array{$i} = \@colarr; } foreach my $insrow (@$update_rows) { for ($i=1; $i<=$colscnt; $i++) { my $colarr = $bind_array{$i}; push(@$colarr, $insrow->[$i-1]); } } $colscnt = @nonpkpos_array; for ($i=0; $i<$colscnt; $i++) { $update_stmt->bind_param_array($i+1, $bind_array{$nonpkpos_array[$i]}, $updrow_stmt->{TYPE}->[$nonpkpos_array[$i] - 1]); } $colscnt = @pkpos_array; for ($i=0; $i<$colscnt; $i++) { $update_stmt->bind_param_array($i+1+@nonpkpos_array, $bind_array{$pkpos_array[$i]}, $updrow_stmt->{TYPE}->[$pkpos_array[$i] - 1]); } my $rv = $update_stmt->execute_array({ArrayTupleStatus => \@rowstats});
if (!defined($rv)) { &prtit("Error when updating rows on target table $target_table"); my $fatal_error = 0; if ($update_stmt->err()) { &prtit($update_stmt->errstr()); $fatal_error = 1; } else { my $updcnt = @rowstats; for($i=0;$i<$updcnt;$i++) { if (ref $rowstats[$i]) { &prtit("[".$rowstats[$i]->[0]."] ".$rowstats[$i]->[1]); my $pk_value_list = "Primary Key Value: U,$target_table"; foreach my $tmp_pk (@pkpos_array) { $pk_value_list .= ", ".$bind_array{$tmp_pk}->[$i]; } $pk_value_list =~ s/: ,/: /; &prtit($pk_value_list); $fatal_error = 1 if ($rowstats[$i]->[0] != $unique_errno); } } } $update_stmt->finish(); $dstdbh->rollback(); $srcdbh->rollback(); return 2; } } else { foreach my $insrow (@$update_rows) { $colscnt = @nonpkpos_array; for ($i=0; $i<$colscnt; $i++) { $update_stmt->bind_param($i+1, $insrow->[$nonpkpos_array[$i]-1], $updrow_stmt->{TYPE}->[$nonpkpos_array[$i] - 1]); } $colscnt = @pkpos_array; for ($i=0; $i<$colscnt; $i++) { $update_stmt->bind_param($i+1+@nonpkpos_array, $insrow->[$pkpos_array[$i]-1], $updrow_stmt->{TYPE}->[$pkpos_array[$i] - 1]); } if (! ($update_stmt->execute())) { &prtit("Error when updating rows on target table $target_table"); &prtit($update_stmt->errstr()); my $pk_value_list = "Primary Key Value: U,$target_table"; foreach my $tmp_pk (@pkpos_array) { $pk_value_list .= ", ".$insrow->[$tmp_pk-1]; } $pk_value_list =~ s/: ,/: /; &prtit($pk_value_list); $update_stmt->finish(); $dstdbh->rollback(); $srcdbh->rollback(); return 2; } } } $update_stmt->finish(); } else { &prtit("Error when updating rows on target table $target_table"); $dstdbh->rollback(); $srcdbh->rollback(); return 2; } }
# get the new rows need to be inserted into target table $insert_mv_sql = "SELECT CURR.* \n FROM (".$view_sql.") CURR, \n ("; if ($updatefirst) { $insert_mv_sql .= "SELECT DISTINCT ".$pk_columns." FROM ".$mlog_table. " \n WHERE $mv_seqcol IN (SELECT $mv_seqcol FROM $mv_seqtab) \n AND ($mv_typcol = 'I')) MLOG "; } else { $insert_mv_sql .= "SELECT DISTINCT ".$pk_columns." FROM ".$mlog_table. " \n WHERE $mv_seqcol IN (SELECT $mv_seqcol FROM $mv_seqtab) \n AND ($mv_typcol <> 'D')) MLOG "; } $insert_mv_sql .= " \n WHERE "; foreach $keycol (@pkcol_array) { $insert_mv_sql .= "CURR.".$keycol."=MLOG.".$keycol." AND "; } $insert_mv_sql .= "1=1";
if ($verbose > 1) { &prtit("SQL to get the current value of rows need to be inserted : \n $insert_mv_sql"); }
# perform the insertion on target table my $insrow_stmt = $srcdbh->prepare($insert_mv_sql); if (defined($insrow_stmt)) { $insrow_stmt->execute();
if ($srcdbh->err()) { &prtit("Error when inserting rows on target table $target_table"); &prtit($srcdbh->errstr()); $dstdbh->rollback(); $srcdbh->rollback(); return 2; }
$colscnt = $insrow_stmt->{NUM_OF_FIELDS};
# Get the primary key column offset @pkpos_array=(); foreach my $keycol (@pkcol_array) { for ($i=0;$i<$colscnt; $i++) { push(@pkpos_array, ($i+1)) if ($keycol =~ /^$insrow_stmt->{NAME}->[$i]$/i); } }
$insert_target_sql = "INSERT INTO ".$target_table." ("; for ($i=0;$i<$colscnt; $i++) { $insert_target_sql .= $insrow_stmt->{NAME}->[$i]; $insert_target_sql .= "," if ($i < $colscnt - 1); } $insert_target_sql .= ") VALUES ("; for ($i=0;$i<$colscnt; $i++) { $insert_target_sql .= "?"; $insert_target_sql .= "," if ($i < $colscnt - 1); } $insert_target_sql .= ")";
if ($verbose > 1) { &prtit("SQL to insert rows on target table: \n $insert_target_sql"); }
my $insert_rows = $insrow_stmt->fetchall_arrayref();
my $insert_stmt = $dstdbh->prepare($insert_target_sql); if (defined($insert_stmt)) { if ($arraydml) { my (%bind_array, @rowstats); for ($i=1; $i<=$colscnt; $i++) { my @colarr = (); $bind_array{$i} = \@colarr; }
foreach my $insrow (@$insert_rows) { for ($i=1; $i<=$colscnt; $i++) { my $colarr = $bind_array{$i}; push(@$colarr, $insrow->[$i-1]); } }
for ($i=1; $i<=$colscnt; $i++) { $insert_stmt->bind_param_array($i, $bind_array{$i}, $insrow_stmt->{TYPE}->[$i - 1]); } my $rv = $insert_stmt->execute_array({ArrayTupleStatus => \@rowstats}); if (!defined($rv)) { &prtit("Error when inserting rows on target table $target_table"); my $fatal_error = 0; if ($insert_stmt->err()) { &prtit($insert_stmt->errstr()); $fatal_error = 1; } else { my $updcnt = @rowstats; for($i=0;$i<$updcnt;$i++) { if (ref $rowstats[$i]) { &prtit("[".$rowstats[$i]->[0]."] ".$rowstats[$i]->[1]); my $pk_value_list = "Primary Key Value: I,$target_table"; foreach my $tmp_pk (@pkpos_array) { $pk_value_list .= ", ".$bind_array{$tmp_pk}->[$i]; } $pk_value_list =~ s/: ,/: /; &prtit($pk_value_list); $fatal_error = 1 if ($rowstats[$i]->[0] != $unique_errno); } } } if ($fatal_error) { $insert_stmt->finish(); $dstdbh->rollback(); $srcdbh->rollback(); return 2; } } } else { foreach my $insrow (@$insert_rows) { for ($i=1; $i<=$colscnt; $i++) { $insert_stmt->bind_param($i, $insrow->[$i-1], $insrow_stmt->{TYPE}->[$i - 1]); } if (! ($insert_stmt->execute())) { &prtit("Error when inserting rows on target table $target_table"); &prtit($insert_stmt->errstr()); my $pk_value_list = "Primary Key Value: I,$target_table"; foreach my $tmp_pk (@pkpos_array) { $pk_value_list .= ", ".$insrow->[$tmp_pk-1]; } $pk_value_list =~ s/: ,/: /; &prtit($pk_value_list); if ($insert_stmt->err() != $unique_errno) { $insert_stmt->finish(); $dstdbh->rollback(); $srcdbh->rollback(); return 2; } } } } $insert_stmt->finish(); } $dstdbh->commit(); } else { &prtit("Error when inserting rows on target table $target_table"); $dstdbh->rollback(); $srcdbh->rollback(); return 2; }
$delete_mvlog_sql = "DELETE FROM ".$mlog_table." \n WHERE $mv_seqcol IN (SELECT $mv_seqcol FROM $mv_seqtab)"; if ($verbose > 1) { &prtit("SQL to delte applied materialized view log rows : \n $delete_mvlog_sql"); }
$srcdbh->do($delete_mvlog_sql); $srcdbh->do("DELETE FROM $mv_seqtab"); $srcdbh->commit();
&prtit("End copy from $base_table to $target_table, seq $min_seq to $max_seq."); return 1; }
sub formatNumber { my ($num) = @_;
if ($num < 10) { return "0".$num; } else { return $num; } }
sub prtit($) { my $prt = shift; my @timeData = localtime(time); my $date = formatNumber($timeData[4])."/".formatNumber($timeData[3])." ".formatNumber($timeData[2]).":".formatNumber($timeData[1]).":".formatNumber($timeData[0]);
my @values = split('\n', $prt); foreach my $val (@values) { #chop($val); print "$date - $val\n" if (!$quietmode); print LOG "$date - $val\n"; } }
sub refresh_all { my ($srcdbh, $dstdbh, $config_file) = @_; my ($line, $i, $rv);
my @configs = (); if (-r $config_file) { open CONFIG_FILE, $config_file; my @config_lines = <CONFIG_FILE>; close CONFIG_FILE;
foreach $line (@config_lines) { chomp($line); next if ( $line =~ /^\s*#/ ); next if ( $line =~ /^\s*$/ ); $line =~ s/ //g; my @elements = split "#", $line; my $element_len = @elements; next if ($element_len < 4); my (%cfgrow); $cfgrow{"SOURCE"} = $elements[0]; $cfgrow{"PKEY"} = $elements[1]; $cfgrow{"MVLOG"} = $elements[2]; $cfgrow{"TARGET"} = $elements[3]; $cfgrow{"WHERE"} = "1=1"; $cfgrow{"WHERE"} = $elements[4] if ($element_len>4); push (@configs, \%cfgrow); } $line = @configs;
&prtit("Replication started, $line tables in configuration file.");
for ($line = 1; $line <= $looptime; $line = $line + 1) { last if ($exitloop or $exitapp); foreach (@configs) { for ($i = 0; $i < 10; $i++) { my $rv = refresh_single($srcdbh, $dstdbh, $_->{"SOURCE"}, $_->{"PKEY"}, $_->{"MVLOG"}, $_->{"TARGET"}, $_->{"WHERE"});
last if ($rv == 0); $exitloop = 1 if ($rv == 2); last if ($rv == 2); } } sleep($waittime); } } }
sub do_opts { #-------------------------------------------------------------------------- # Split the options and load variables #-------------------------------------------------------------------------- # Exempt these varaibles from needing to be declared use vars '$opt_h','$opt_H','$opt_S','$opt_s', '$opt_T','$opt_t','$opt_C','$opt_c', '$opt_V','$opt_v','$opt_L','$opt_l', '$opt_W','$opt_w','$opt_B','$opt_b', '$opt_U','$opt_u','$opt_Q','$opt_q', '$opt_A','$opt_a','$opt_N','$opt_n';
my $returncode = getopts('hHaAuUqQ:S:s:T:t:C:c:V:v:L:l:W:w:B:b:N:n:');
&do_help if (defined $opt_h or defined $opt_H or $returncode != 1 );
$cfgfile = $opt_C if (defined $opt_C); $cfgfile = $opt_c if (defined $opt_c);
$verbose = $opt_V if (defined $opt_V); $verbose = $opt_v if (defined $opt_v);
$src_db = $opt_S if (defined $opt_S); $src_db = $opt_s if (defined $opt_s);
$dest_db = $opt_T if (defined $opt_T); $dest_db = $opt_t if (defined $opt_t);
$mv_seqtab = $opt_N if (defined $opt_N); $mv_seqtab = $opt_n if (defined $opt_n);
$longsize = 1024 * $opt_L if (defined $opt_L); $longsize = 1024 * $opt_l if (defined $opt_l);
$batchsize = $opt_B if (defined $opt_B); $batchsize = $opt_b if (defined $opt_b);
$waittime = $opt_W if (defined $opt_W); $waittime = $opt_w if (defined $opt_w);
$updatefirst = 1 if (defined $opt_U); $updatefirst = 1 if (defined $opt_u);
$quietmode = 1 if (defined $opt_Q); $quietmode = 1 if (defined $opt_q);
$arraydml = 1 if (defined $opt_A); $arraydml = 1 if (defined $opt_a);
&do_help if (!defined($src_db) or !defined($dest_db) or !defined($cfgfile));
my @src_conn = split "#", $src_db; my @dst_conn = split "#", $dest_db; my $src_conn_len = @src_conn; my $dst_conn_len = @dst_conn;
&do_help if ($src_conn_len != 3 or $dst_conn_len != 3); }
sub do_help { #-------------------------------------------------------------------------- # Give help screen and exit #-------------------------------------------------------------------------- $~ = "HEADER"; write; exit(); }
sub flock_lockfile { my ($lockfile) = @_; open LOCKF, ">".$lockfile || return 1; return 1 if !(flock(LOCKF, 2+4)); return 0; }
sub startMain { my @src_conn = split "#", $src_db; my @dst_conn = split "#", $dest_db; my $src_conn_len = @src_conn; my $dst_conn_len = @dst_conn;
&do_help if ($src_conn_len != 3 or $dst_conn_len != 3);
$exitloop = 0;
$src_conn[2] =~ s/^oracle:/Oracle:/i; $src_conn[2] =~ s/^mysql:/mysql:/i; $src_conn[2] =~ s/^odbc:/ODBC:/i;
$dst_conn[2] =~ s/^oracle:/Oracle:/i; $dst_conn[2] =~ s/^mysql:/mysql:/i; $dst_conn[2] =~ s/^odbc:/ODBC:/i;
my $srcdbh = DBI->connect("dbi:$src_conn[2]", $src_conn[0],$src_conn[1], {PrintError=>0}); if (!defined($srcdbh)) { &prtit("Cannot connect to source database."); # $exitapp = 1; return; } $srcdbh->{AutoCommit}=0; $srcdbh->{LongTruncOk} = 1; $srcdbh->{LongReadLen} = $longsize;
if ($src_conn[2] =~ /^Oracle:/i) { $srcdbh->do("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'"); }
if ($src_conn[2] =~ /^mysql:/i) { # Set transaction isolation level to read commited $srcdbh->do("SET TRANSACTION ISOLATION LEVEL READ COMMITTED"); }
my $dstdbh = DBI->connect("dbi:$dst_conn[2]", $dst_conn[0],$dst_conn[1], {PrintError=>0}); if (!defined($dstdbh)) { &prtit("Cannot connect to source database."); &prtit($dstdbh->errstr()) if (defined($dstdbh)); $srcdbh->rollback(); $srcdbh->disconnect(); # $exitapp = 1; return; } $dstdbh->{AutoCommit}=0; $dstdbh->{LongTruncOk} = 1; $dstdbh->{LongReadLen} = $longsize;
if ($dst_conn[2] =~ /^Oracle:/i) { $dstdbh->do("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'"); }
if ($dst_conn[2] =~ /^mysql:/i) { # Set duplicate key error number for MySQL database $unique_errno = 1062; # Set transaction isolation level to read commited $dstdbh->do("SET TRANSACTION ISOLATION LEVEL READ COMMITTED"); } elsif ($dst_conn[2] =~ /^oracle:/i) { # Set duplicate key error number for Oracle database $unique_errno = 1; }
$SIG{INT} = \&set_exitapp_flags; $SIG{STOP} = \&set_exitapp_flags; $SIG{QUIT} = \&set_exitapp_flags; $SIG{HUP} = 'IGNORE';
refresh_all($srcdbh, $dstdbh, $cfgfile);
$srcdbh->rollback(); $dstdbh->rollback();
$srcdbh->disconnect(); $dstdbh->disconnect(); }
while(1) { last if ($exitapp); &startMain(); sleep(2); } |