| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180 |
- <?php
- define('ADODB_REPLICATE',1.2);
- include_once(ADODB_DIR.'/adodb-datadict.inc.php');
- /*
- 1.2 9 June 2009
- Minor patches
- 1.1 8 June 2009
- Added $lastUpdateFld to replicatedata
- Added $rep->compat. If compat set to 1.0, then $lastUpdateFld not used during MergeData.
- 1.0 Apr 2009
- Added support for MFFA
- 0.9 ? 2008
- First release
- Note: this code assumes that comments such as / * * / ar`e allowed which works with:
- Note: this code assumes that comments such as / * * / are allowed which works with:
- mssql, postgresql, oracle, mssql
- Replication engine to
- - copy table structures and data from different databases (e.g. mysql to oracle)
- for replication purposes
- - generate CREATE TABLE, CREATE INDEX, INSERT ... for installation scripts
- Table Structure copying includes
- - fields and limited subset of types
- - optional default values
- - indexes
- - but not constraints
- Two modes of data copy:
- ReplicateData
- - Copy from src to dest, with update of status of copy back to src,
- with configurable src SELECT where clause
- MergeData
- - Copy from src to dest based on last mod date field and/or copied flag field
- Default settings are
- - do not execute, generate sql ($rep->execute = false)
- - do not delete records in dest table first ($rep->deleteFirst = false).
- if $rep->deleteFirst is true and primary keys are defined,
- then no deletion will occur unless *INSERTONLY* is defined in pkey array
- - only commit once at the end of every ReplicateData ($rep->commitReplicate = true)
- - do not autocommit every x records processed ($rep->commitRecs = -1)
- - even if error occurs on one record, continue copying remaining records ($rep->neverAbort = true)
- - debugging turned off ($rep->debug = false)
- */
- class ADODB_Replicate {
- var $connSrc;
- var $connDest;
- var $connSrc2 = false;
- var $connDest2 = false;
- var $ddSrc;
- var $ddDest;
- var $execute = false;
- var $debug = false;
- var $deleteFirst = false;
- var $commitReplicate = true; // commit at end of replicatedata
- var $commitRecs = -1; // only commit at end of ReplicateData()
- var $selFilter = false;
- var $fieldFilter = false;
- var $indexFilter = false;
- var $updateFilter = false;
- var $insertFilter = false;
- var $updateSrcFn = false;
- var $limitRecs = false;
- var $neverAbort = true;
- var $copyTableDefaults = false; // turn off because functions defined as defaults will not work when copied
- var $errHandler = false; // name of error handler function, if used.
- var $htmlSpecialChars = true; // if execute false, then output with htmlspecialchars enabled.
- // Will autoconfigure itself. No need to modify
- var $trgSuffix = '_mrgTr';
- var $idxSuffix = '_mrgidx';
- var $trLogic = '1 = 1';
- var $datesAreTimeStamps = false;
- var $oracleSequence = false;
- var $readUncommitted = false; // read without obeying shared locks for fast select (mssql)
- var $compat = false;
- // connSrc2 and connDest2 are only required if the db driver
- // does not allow updates back to src db in first connection (the select connection),
- // so we need 2nd connection
- function __construct($connSrc, $connDest, $connSrc2=false, $connDest2=false)
- {
- if (strpos($connSrc->databaseType,'odbtp') !== false) {
- $connSrc->_bindInputArray = false; # bug in odbtp, binding fails
- }
- if (strpos($connDest->databaseType,'odbtp') !== false) {
- $connDest->_bindInputArray = false; # bug in odbtp, binding fails
- }
- $this->connSrc = $connSrc;
- $this->connDest = $connDest;
- $this->connSrc2 = ($connSrc2) ? $connSrc2 : $connSrc;
- $this->connDest2 = ($connDest2) ? $connDest2 : $connDest;
- $this->ddSrc = NewDataDictionary($connSrc);
- $this->ddDest = NewDataDictionary($connDest);
- $this->htmlSpecialChars = isset($_SERVER['HTTP_HOST']);
- }
- function ExecSQL($sql)
- {
- if (!is_array($sql)) $sql[] = $sql;
- $ret = true;
- foreach($sql as $s)
- if (!$this->execute) echo "<pre>",$s.";\n</pre>";
- else {
- $ok = $this->connDest->Execute($s);
- if (!$ok)
- if ($this->neverAbort) $ret = false;
- else return false;
- }
- return $ret;
- }
- /*
- We assume replication between $table and $desttable only works if the field names and types match for both tables.
- Also $table and desttable can have different names.
- */
- function CopyTableStruct($table,$desttable='')
- {
- $sql = $this->CopyTableStructSQL($table,$desttable);
- if (empty($sql)) return false;
- return $this->ExecSQL($sql);
- }
- function RunFieldFilter(&$fld, $mode = '')
- {
- if ($this->fieldFilter) {
- $fn = $this->fieldFilter;
- return $fn($fld, $mode);
- } else
- return $fld;
- }
- function RunUpdateFilter($table, $fld, $val)
- {
- if ($this->updateFilter) {
- $fn = $this->updateFilter;
- return $fn($table, $fld, $val);
- } else
- return $val;
- }
- function RunInsertFilter($table, $fld, &$val)
- {
- if ($this->insertFilter) {
- $fn = $this->insertFilter;
- return $fn($table, $fld, $val);
- } else
- return $fld;
- }
- /*
- $mode = INS or UPD
- The lastUpdateFld holds the field that counts the number of updates or the date of last mod. This ensures that
- if the rec was modified after replicatedata retrieves the data but before we update back the src record,
- we don't set the copiedflag='Y' yet.
- */
- function RunUpdateSrcFn($srcdb, $table, $fldoffsets, $row, $where, $mode, $dest_insertid=null, $lastUpdateFld='')
- {
- if (!$this->updateSrcFn) return;
- $bindarr = array();
- foreach($fldoffsets as $k) {
- $bindarr[$k] = $row[$k];
- }
- $last = sizeof($row);
- if ($lastUpdateFld && $row[$last-1]) {
- $ds = $row[$last-1];
- if (strpos($ds,':') !== false) $s = $srcdb->DBTimeStamp($ds);
- else $s = $srcdb->qstr($ds);
- $where = "WHERE $lastUpdateFld = $s and $where";
- } else
- $where = "WHERE $where";
- $fn = $this->updateSrcFn;
- if (is_array($fn)) {
- if (sizeof($fn) == 1) $set = reset($fn);
- else $set = @$fn[$mode];
- if ($set) {
- if (strlen($dest_insertid) == 0) $dest_insertid = 'null';
- $set = str_replace('$INSERT_ID',$dest_insertid,$set);
- $sql = "UPDATE $table SET $set $where ";
- $ok = $srcdb->Execute($sql,$bindarr);
- if (!$ok) {
- echo $srcdb->ErrorMsg(),"<br>\n";
- die();
- }
- }
- } else $fn($srcdb, $table, $row, $where, $bindarr, $mode, $dest_insertid);
- }
- function CopyTableStructSQL($table, $desttable='',$dropdest =false)
- {
- if (!$desttable) {
- $desttable = $table;
- $prefixidx = '';
- } else
- $prefixidx = $desttable;
- $conn = $this->connSrc;
- $types = $conn->MetaColumns($table);
- if (!$types) {
- echo "$table does not exist in source db<br>\n";
- return array();
- }
- if (!$dropdest && $this->connDest->MetaColumns($desttable)) {
- echo "$desttable already exists in dest db<br>\n";
- return array();
- }
- if ($this->debug) var_dump($types);
- $sa = array();
- $idxcols = array();
- foreach($types as $name => $t) {
- $s = '';
- $mt = $this->ddSrc->MetaType($t->type);
- $len = $t->max_length;
- $fldname = $this->RunFieldFilter($t->name,'TABLE');
- if (!$fldname) continue;
- $s .= $fldname . ' '.$mt;
- if (isset($t->scale)) $precision = '.'.$t->scale;
- else $precision = '';
- if ($mt == 'C' or $mt == 'X') $s .= "($len)";
- else if ($mt == 'N' && $precision) $s .= "($len$precision)";
- if ($mt == 'R') $idxcols[] = $fldname;
- if ($this->copyTableDefaults) {
- if (isset($t->default_value)) {
- $v = $t->default_value;
- if ($mt == 'C' or $mt == 'X') $v = $this->connDest->qstr($v); // might not work as this could be function
- $s .= ' DEFAULT '.$v;
- }
- }
- $sa[] = $s;
- }
- $s = implode(",\n",$sa);
- // dump adodb intermediate data dictionary format
- if ($this->debug) echo '<pre>'.$s.'</pre>';
- $sqla = $this->ddDest->CreateTableSQL($desttable,$s);
- /*
- if ($idxcols) {
- $idxoptions = array('UNIQUE'=>1);
- $sqla2 = $this->ddDest->_IndexSQL($table.'_'.$fldname.'_SERIAL', $desttable, $idxcols,$idxoptions);
- $sqla = array_merge($sqla,$sqla2);
- }*/
- $idxs = $conn->MetaIndexes($table);
- if ($idxs)
- foreach($idxs as $name => $iarr) {
- $idxoptions = array();
- $fldnames = array();
- if(!empty($iarr['unique'])) {
- $idxoptions['UNIQUE'] = 1;
- }
- foreach($iarr['columns'] as $fld) {
- $fldnames[] = $this->RunFieldFilter($fld,'TABLE');
- }
- $idxname = $prefixidx.str_replace($table,$desttable,$name);
- if (!empty($this->indexFilter)) {
- $fn = $this->indexFilter;
- $idxname = $fn($desttable,$idxname,$fldnames,$idxoptions);
- }
- $sqla2 = $this->ddDest->_IndexSQL($idxname, $desttable, $fldnames,$idxoptions);
- $sqla = array_merge($sqla,$sqla2);
- }
- return $sqla;
- }
- function _clearcache()
- {
- }
- function _concat($v)
- {
- return $this->connDest->concat("' ","chr(".ord($v).")","'");
- }
- function fixupbinary($v)
- {
- return str_replace(
- array("\r","\n"),
- array($this->_concat("\r"),$this->_concat("\n")),
- $v );
- }
- function SwapDBs()
- {
- $o = $this->connSrc;
- $this->connSrc = $this->connDest;
- $this->connDest = $o;
- $o = $this->connSrc2;
- $this->connSrc2 = $this->connDest2;
- $this->connDest2 = $o;
- $o = $this->ddSrc;
- $this->ddSrc = $this->ddDest;
- $this->ddDest = $o;
- }
- /*
- // if no uniqflds defined, then all desttable recs will be deleted before insert
- // $where clause must include the WHERE word if used
- // if $this->commitRecs is set to a +ve value, then it will autocommit every $this->commitRecs records
- // -- this should never be done with 7x24 db's
- Returns an array:
- $arr[0] = true if no error, false if error
- $arr[1] = number of recs processed
- $arr[2] = number of successful inserts
- $arr[3] = number of successful updates
- ReplicateData() params:
- $table = src table name
- $desttable = dest table name, leave blank to use src table name
- $uniqflds = array() = an array. If set, then inserts and updates will occur. eg. array('PK1', 'PK2');
- To prevent updates to desttable (allow only to src table), add '*INSERTONLY*' or '*ONLYINSERT*' to array.
- Sometimes you are replicating a src table with an autoinc primary key.
- You sometimes create recs in the dest table. The dest table has to retrieve the
- src table's autoinc key (stored in a 2nd field) so you can match the two tables.
- To define this, and the uniqflds contains nested arrays. Copying from autoinc table to other table:
- array(array($destpkey), array($destfld_holds_src_autoinc_pkey))
- Copying from normal table to autoinc table:
- array(array($destpkey), array(), array($srcfld_holds_dest_autoinc_pkey))
- $where = where clause for SELECT from $table $where. Include the WHERE reserved word in beginning.
- You can put ORDER BY at the end also
- $ignoreflds = array(), list of fields to ignore. e.g. array('FLD1',FLD2');
- $dstCopyDateFld = date field on $desttable to update with current date
- $extraflds allows you to add additional flds to insert/update. Format
- array(fldname => $fldval)
- $fldval itself can be an array or a string. If an array, then
- $extraflds = array($fldname => array($insertval, $updateval))
- Thus we have the following behaviours:
- a. Delete all data in $desttable then insert from src $table
- $rep->execute = true;
- $rep->ReplicateData($table, $desttable)
- b. Update $desttable if record exists (based on $uniqflds), otherwise insert.
- $rep->execute = true;
- $rep->ReplicateData($table, $desttable, $array($pkey1, $pkey2))
- c. Select from src $table all data modified since a date. Then update $desttable
- if record exists (based on $uniqflds), otherwise insert
- $rep->execute = true;
- $rep->ReplicateData($table, $desttable, array($pkey1, $pkey2), "WHERE update_datetime_fld > $LAST_REFRESH")
- d. Insert all records into $desttable modified after a certain id (or time) in src $table:
- $rep->execute = true;
- $rep->ReplicateData($table, $desttable, false, "WHERE id_fld > $LAST_ID_SAVED", true);
- For (a) to (d), returns array: array($boolean_ok_fail, $no_recs_selected_from_src_db, $no_recs_inserted, $no_recs_updated);
- e. Generate sample SQL:
- $rep->execute = false;
- $rep->ReplicateData(....);
- This returns $array, which contains:
- $array['SEL'] = select stmt from src db
- $array['UPD'] = update stmt to dest db
- $array['INS'] = insert stmt to dest db
- Error-handling
- ==============
- Default is never abort if error occurs. You can set $rep->neverAbort = false; to force replication to abort if an error occurs.
- Value Filtering
- ========
- Sometimes you might need to modify/massage the data before the code works. Assume that the value used for True and False is
- 'T' and 'F' in src DB, but is 'Y' and 'N' in dest DB for field[2] in select stmt. You can do this by
- $rep->filterSelect = 'filter';
- $rep->ReplicateData(...);
- function filter($table,& $fields, $deleteFirst)
- {
- if ($table == 'SOMETABLE') {
- if ($fields[2] == 'T') $fields[2] = 'Y';
- else if ($fields[2] == 'F') $fields[2] = 'N';
- }
- }
- We pass in $deleteFirst as that determines the order of the fields (which are numeric-based):
- TRUE: the order of fields matches the src table order
- FALSE: the order of fields is all non-primary key fields first, followed by primary key fields. This is because it needs
- to match the UPDATE statement, which is UPDATE $table SET f2 = ?, f3 = ? ... WHERE f1 = ?
- Name Filtering
- =========
- Sometimes field names that are legal in one RDBMS can be illegal in another.
- We allow you to handle this using a field filter.
- Also if you don't want to replicate certain fields, just return false.
- $rep->fieldFilter = 'ffilter';
- function ffilter(&$fld,$mode)
- {
- $uf = strtoupper($fld);
- switch($uf) {
- case 'GROUP':
- if ($mode == 'SELECT') $fld = '"Group"';
- return 'GroupFld';
- case 'PRIVATEFLD': # do not replicate
- return false;
- }
- return $fld;
- }
- UPDATE FILTERING
- ================
- Sometimes, when we want to update
- UPDATE table SET fld = val WHERE ....
- we want to modify val. To do so, define
- $rep->updateFilter = 'ufilter';
- function ufilter($table, $fld, $val)
- {
- return "nvl($fld, $val)";
- }
- Sending back audit info back to src Table
- =========================================
- Use $rep->updateSrcFn. This can be an array of strings, or the name of a php function to call.
- If an array of strings is defined, then it will perform an update statement...
- UPDATE srctable SET $string WHERE ....
- With $string set to the array you define. If a new record was inserted into desttable, then the
- 'INS' string is used ($INSERT_ID will be replaced with the real INSERT_ID, if any),
- and if an update then use the 'UPD' string.
- array(
- 'INS' => 'insertid = $INSERT_ID, copieddate=getdate(), copied = 1',
- 'UPD' => 'copieddate=getdate(), copied = 1'
- )
- If a single string array is defined, then it will be used for both insert and update.
- array('copieddate=getdate(), copied = 1')
- Note that the where clause is automatically defined by the system.
- If $rep->updateSrcFn is a PHP function name, then it will be called with the following params:
- $fn($srcConnection, $tableName, $row, $where, $bindarr, $mode, $dest_insertid)
- $srcConnection - source db connection
- $tableName - source tablename
- $row - array holding records updated into dest
- $where - where clause to be used (uses bind vars)
- $bindarr - array holding bind variables for where clause
- $mode - INS or UPD
- $dest_insertid - when mode=INS, then the insert_id is stored here.
- oracle mssql
- ---> insert
- mssqlid <--- insert_id
- ----> update with mssqlid
- <---- update with mssqlid
- TODO: add src pkey and dest pkey for updates. Also sql stmt needs to be tuned, so dest pkey, src pkey
- */
- function ReplicateData($table, $desttable = '', $uniqflds = array(), $where = '',$ignore_flds = array(),
- $dstCopyDateFld='', $extraflds = array(), $lastUpdateFld = '')
- {
- if (is_array($where)) {
- $wheresrc = $where[0];
- $wheredest = $where[1];
- } else {
- $wheresrc = $wheredest = $where;
- }
- $dstCopyDateName = $dstCopyDateFld;
- $dstCopyDateFld = strtoupper($dstCopyDateFld);
- $this->_clearcache();
- if (is_string($uniqflds) && strlen($uniqflds)) $uniqflds = array($uniqflds);
- if (!$desttable) $desttable = $table;
- $uniq = array();
- if ($uniqflds) {
- if (is_array(reset($uniqflds))) {
- /*
- primary key of src and dest tables differ. This means when we perform the select stmts
- we retrieve both keys. Then any insert statement will have to ignore one array element.
- Any update statement will need to use a different where clause
- */
- $destuniqflds = $uniqflds[0];
- if (sizeof($uniqflds)>1 && $uniqflds[1]) // srckey field name in dest table
- $srcuniqflds = $uniqflds[1];
- else
- $srcuniqflds = array();
- if (sizeof($uniqflds)>2)
- $srcPKDest = reset($uniqflds[2]);
- } else {
- $destuniqflds = $uniqflds;
- $srcuniqflds = array();
- }
- $onlyInsert = false;
- foreach($destuniqflds as $k => $u) {
- if ($u == '*INSERTONLY*' || $u == '*ONLYINSERT*') {
- $onlyInsert = true;
- continue;
- }
- $uniq[strtoupper($u)] = $k;
- }
- $deleteFirst = $this->deleteFirst;
- } else {
- $deleteFirst = true;
- }
- if ($deleteFirst) $onlyInsert = true;
- if ($ignore_flds) {
- foreach($ignore_flds as $u) {
- $ignoreflds[strtoupper($u)] = 1;
- }
- } else
- $ignoreflds = array();
- $src = $this->connSrc;
- $dest = $this->connDest;
- $src2 = $this->connSrc2;
- $dest->noNullStrings = false;
- $src->noNullStrings = false;
- $src2->noNullStrings = false;
- if ($src === $dest) $this->execute = false;
- $types = $src->MetaColumns($table);
- if (!$types) {
- echo "Source $table does not exist<br>\n";
- return array();
- }
- $dtypes = $dest->MetaColumns($desttable);
- if (!$dtypes) {
- echo "Destination $desttable does not exist<br>\n";
- return array();
- }
- $sa = array();
- $selflds = array();
- $wheref = array();
- $wheres = array();
- $srcwheref = array();
- $fldoffsets = array();
- $k = 0;
- foreach($types as $name => $t) {
- $name2 = strtoupper($this->RunFieldFilter($name,'SELECT'));
- // handle quotes
- if ($name2 && $name2[0] == '"' && $name2[strlen($name2)-1] == '"') $name22 = substr($name2,1,strlen($name2)-2);
- elseif ($name2 && $name2[0] == '`' && $name2[strlen($name2)-1] == '`') $name22 = substr($name2,1,strlen($name2)-2);
- else $name22 = $name2;
- //else $name22 = $name2; // this causes problem for quotes strip above
- if (!isset($dtypes[($name22)]) || !$name2) {
- if ($this->debug) echo " Skipping $name ==> $name2 as not in destination $desttable<br>";
- continue;
- }
- if ($name2 == $dstCopyDateFld) {
- $dstCopyDateName = $t->name;
- continue;
- }
- $fld = $t->name;
- $fldval = $t->name;
- $mt = $src->MetaType($t->type);
- if ($this->datesAreTimeStamps && $mt == 'D') $mt = 'T';
- if ($mt == 'D') $fldval = $dest->DBDate($fldval);
- elseif ($mt == 'T') $fldval = $dest->DBTimeStamp($fldval);
- $ufld = strtoupper($fld);
- if (isset($ignoreflds[($name2)]) && !isset($uniq[$ufld])) {
- continue;
- }
- if ($this->debug) echo " field=$fld type=$mt fldval=$fldval<br>";
- if (!isset($uniq[$ufld])) {
- $selfld = $fld;
- $fld = $this->RunFieldFilter($selfld,'SELECT');
- $selflds[] = $selfld;
- $p = $dest->Param($k);
- if ($mt == 'D') $p = $dest->DBDate($p, true);
- else if ($mt == 'T') $p = $dest->DBTimeStamp($p, true);
- # UPDATES
- $sets[] = "$fld = ".$this->RunUpdateFilter($desttable, $fld, $p);
- # INSERTS
- $insflds[] = $this->RunInsertFilter($desttable,$fld, $p); $params[] = $p;
- $k++;
- } else {
- $fld = $this->RunFieldFilter($fld);
- $wheref[] = $fld;
- if (!empty($srcuniqflds)) $srcwheref[] = $srcuniqflds[$uniq[$ufld]];
- if ($mt == 'C') { # normally we don't include the primary key in the insert if it is numeric, but ok if varchar
- $insertpkey = true;
- }
- }
- }
- foreach($extraflds as $fld => $evals) {
- if (!is_array($evals)) $evals = array($evals, $evals);
- $insflds[] = $this->RunInsertFilter($desttable,$fld, $p); $params[] = $evals[0];
- $sets[] = "$fld = ".$evals[1];
- }
- if ($dstCopyDateFld) {
- $sets[] = "$dstCopyDateName = ".$dest->sysTimeStamp;
- $insflds[] = $this->RunInsertFilter($desttable,$dstCopyDateName, $p); $params[] = $dest->sysTimeStamp;
- }
- if (!empty($srcPKDest)) {
- $selflds[] = $srcPKDest;
- $fldoffsets = array($k+1);
- }
- foreach($wheref as $uu => $fld) {
- $p = $dest->Param($k);
- $sp = $src->Param($k);
- if (!empty($srcuniqflds)) {
- if ($uu > 1) die("Only one primary key for srcuniqflds allowed currently");
- $destsrckey = reset($srcuniqflds);
- $wheres[] = reset($srcuniqflds).' = '.$p;
- $insflds[] = $this->RunInsertFilter($desttable,$destsrckey, $p);
- $params[] = $p;
- } else {
- $wheres[] = $fld.' = '.$p;
- if (!isset($ignoreflds[strtoupper($fld)]) || !empty($insertpkey)) {
- $insflds[] = $this->RunInsertFilter($desttable,$fld, $p);
- $params[] = $p;
- }
- }
- $selflds[] = $fld;
- $srcwheres[] = $fld.' = '.$sp;
- $fldoffsets[] = $k;
- $k++;
- }
- if (!empty($srcPKDest)) {
- $fldoffsets = array($k);
- $srcwheres = array($fld.'='.$src->Param($k));
- $k++;
- }
- if ($lastUpdateFld) {
- $selflds[] = $lastUpdateFld;
- } else
- $selflds[] = 'null as Z55_DUMMY_LA5TUPD';
- $insfldss = implode(', ', $insflds);
- $fldss = implode(', ', $selflds);
- $setss = implode(', ', $sets);
- $paramss = implode(', ', $params);
- $wheress = implode(' AND ', $wheres);
- if (isset($srcwheres))
- $srcwheress = implode(' AND ',$srcwheres);
- $seltable = $table;
- if ($this->readUncommitted && strpos($src->databaseType,'mssql')) $seltable .= ' with (NOLOCK)';
- $sa['SEL'] = "SELECT $fldss FROM $seltable $wheresrc";
- $sa['INS'] = "INSERT INTO $desttable ($insfldss) VALUES ($paramss) /**INS**/";
- $sa['UPD'] = "UPDATE $desttable SET $setss WHERE $wheress /**UPD**/";
- $DB1 = "/* <font color=green> Source DB - sample sql in case you need to adapt code\n\n";
- $DB2 = "/* <font color=green> Dest DB - sample sql in case you need to adapt code\n\n";
- if (!$this->execute) echo '/*<style>
- pre {
- white-space: pre-wrap; /* css-3 */
- white-space: -moz-pre-wrap !important; /* Mozilla, since 1999 */
- white-space: -pre-wrap; /* Opera 4-6 */
- white-space: -o-pre-wrap; /* Opera 7 */
- word-wrap: break-word; /* Internet Explorer 5.5+ */
- }
- </style><pre>*/
- ';
- if ($deleteFirst && $this->deleteFirst) {
- $where = preg_replace('/[ \n\r\t]+order[ \n\r\t]+by.*$/i', '', $where);
- $sql = "DELETE FROM $desttable $wheredest\n";
- if (!$this->execute) echo $DB2,'</font>*/',$sql,"\n";
- else $dest->Execute($sql);
- }
- global $ADODB_COUNTRECS;
- $err = false;
- $savemode = $src->setFetchMode(ADODB_FETCH_NUM);
- $ADODB_COUNTRECS = false;
- if (!$this->execute) {
- echo $DB1,$sa['SEL'],"</font>\n*/\n\n";
- echo $DB2,$sa['INS'],"</font>\n*/\n\n";
- $suffix = ($onlyInsert) ? ' PRIMKEY=?' : '';
- echo $DB2,$sa['UPD'],"$suffix</font>\n*/\n\n";
- $rs = $src->Execute($sa['SEL']);
- $cnt = 1;
- $upd = 0;
- $ins = 0;
- $sqlarr = explode('?',$sa['INS']);
- $nparams = sizeof($sqlarr)-1;
- $useQmark = $dest && ($dest->dataProvider != 'oci8');
- while ($rs && !$rs->EOF) {
- if ($useQmark) {
- $sql = ''; $i = 0;
- $arr = array_reverse($rs->fields);
- foreach ($arr as $v) {
- $sql .= $sqlarr[$i];
- // from Ron Baldwin <ron.baldwin#sourceprose.com>
- // Only quote string types
- $typ = gettype($v);
- if ($typ == 'string')
- //New memory copy of input created here -mikefedyk
- $sql .= $dest->qstr($v);
- else if ($typ == 'double')
- $sql .= str_replace(',','.',$v); // locales fix so 1.1 does not get converted to 1,1
- else if ($typ == 'boolean')
- $sql .= $v ? $dest->true : $dest->false;
- else if ($typ == 'object') {
- if (method_exists($v, '__toString')) $sql .= $dest->qstr($v->__toString());
- else $sql .= $dest->qstr((string) $v);
- } else if ($v === null)
- $sql .= 'NULL';
- else
- $sql .= $v;
- $i += 1;
- if ($i == $nparams) break;
- } // while
- if (isset($sqlarr[$i])) {
- $sql .= $sqlarr[$i];
- }
- $INS = $sql;
- } else {
- $INS = $sa['INS'];
- $arr = array_reverse($rs->fields);
- foreach($arr as $k => $v) { // only works on oracle currently
- $k = sizeof($arr)-$k-1;
- $v = str_replace(":","%~%COLON%!%",$v);
- $INS = str_replace(':'.$k,$this->fixupbinary($dest->qstr($v)),$INS);
- }
- $INS = str_replace("%~%COLON%!%",":",$INS);
- if ($this->htmlSpecialChars) $INS = htmlspecialchars($INS);
- }
- echo "-- $cnt\n",$INS,";\n\n";
- $cnt += 1;
- $ins += 1;
- $rs->MoveNext();
- }
- $src->setFetchMode($savemode);
- return $sa;
- } else {
- $saved = $src->debug;
- #$src->debug=1;
- if ($this->limitRecs>100)
- $rs = $src->SelectLimit($sa['SEL'],$this->limitRecs);
- else
- $rs = $src->Execute($sa['SEL']);
- $src->debug = $saved;
- if (!$rs) {
- if ($this->errHandler) $this->_doerr('SEL',array());
- return array(0,0,0,0);
- }
- if ($this->commitReplicate || $commitRecs > 0) {
- $dest->BeginTrans();
- if ($this->updateSrcFn) $src2->BeginTrans();
- }
- if ($this->updateSrcFn && strpos($src2->databaseType,'mssql') !== false) {
- # problem is writers interfere with readers in mssql
- $rs = $src->_rs2rs($rs);
- }
- $cnt = 0;
- $upd = 0;
- $ins = 0;
- $sizeofrow = sizeof($selflds);
- $fn = $this->selFilter;
- $commitRecs = $this->commitRecs;
- $saved = $dest->debug;
- if ($this->deleteFirst) $onlyInsert = true;
- while ($origrow = $rs->FetchRow()) {
- if ($dest->debug) {flush(); @ob_flush();}
- if ($fn) {
- if (!$fn($desttable, $origrow, $deleteFirst, $this, $selflds)) continue;
- }
- $doinsert = true;
- $row = array_slice($origrow,0,$sizeofrow-1);
- if (!$onlyInsert) {
- $doinsert = false;
- $upderr = false;
- if (isset($srcPKDest)) {
- if (is_null($origrow[$sizeofrow-3])) {
- $doinsert = true;
- $upderr = true;
- }
- }
- if (!$upderr && !$dest->Execute($sa['UPD'],$row)) {
- $err = true;
- $upderr = true;
- if ($this->errHandler) $this->_doerr('UPD',$row);
- if (!$this->neverAbort) break;
- }
- if ($upderr || $dest->Affected_Rows() == 0) {
- $doinsert = true;
- } else {
- if (!empty($uniqflds)) $this->RunUpdateSrcFn($src2, $table, $fldoffsets, $origrow, $srcwheress, 'UPD', null, $lastUpdateFld);
- $upd += 1;
- }
- }
- if ($doinsert) {
- $inserr = false;
- if (isset($srcPKDest)) {
- $row = array_slice($origrow,0,$sizeofrow-2);
- }
- if (! $dest->Execute($sa['INS'],$row)) {
- $err = true;
- $inserr = true;
- if ($this->errHandler) $this->_doerr('INS',$row);
- if ($this->neverAbort) continue;
- else break;
- } else {
- if ($dest->dataProvider == 'oci8') {
- if ($this->oracleSequence) $lastid = $dest->GetOne("select ".$this->oracleSequence.".currVal from dual");
- else $lastid = 'null';
- } else {
- $lastid = $dest->Insert_ID();
- }
- if (!$inserr && !empty($uniqflds)) {
- $this->RunUpdateSrcFn($src2, $table, $fldoffsets, $origrow, $srcwheress, 'INS', $lastid,$lastUpdateFld);
- }
- $ins += 1;
- }
- }
- $cnt += 1;
- if ($commitRecs > 0 && ($cnt % $commitRecs) == 0) {
- $dest->CommitTrans();
- $dest->BeginTrans();
- if ($this->updateSrcFn) {
- $src2->CommitTrans();
- $src2->BeginTrans();
- }
- }
- } // while
- if ($this->commitReplicate || $commitRecs > 0) {
- if (!$this->neverAbort && $err) {
- $dest->RollbackTrans();
- if ($this->updateSrcFn) $src2->RollbackTrans();
- } else {
- $dest->CommitTrans();
- if ($this->updateSrcFn) $src2->CommitTrans();
- }
- }
- }
- if ($cnt != $ins + $upd) echo "<p>ERROR: $cnt != INS $ins + UPD $upd</p>";
- $src->setFetchMode($savemode);
- return array(!$err, $cnt, $ins, $upd);
- }
- // trigger support only for sql server and oracle
- // need to add
- function MergeSrcSetup($srcTable, $pkeys, $srcUpdateDateFld, $srcCopyDateFld, $srcCopyFlagFld,
- $srcCopyFlagType='C(1)', $srcCopyFlagVals = array('Y','N','P','='))
- {
- $sqla = array();
- $src = $this->connSrc;
- $idx = $srcTable.'_mrgIdx';
- $cols = $src->MetaColumns($srcTable);
- #adodb_pr($cols);
- if (!isset($cols[strtoupper($srcUpdateDateFld)])) {
- $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcUpdateDateFld TS DEFTIMESTAMP");
- foreach($sqla as $sql) $src->Execute($sql);
- }
- if ($srcCopyDateFld && !isset($cols[strtoupper($srcCopyDateFld)])) {
- $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcCopyDateFld TS DEFTIMESTAMP");
- foreach($sqla as $sql) $src->Execute($sql);
- }
- $sysdate = $src->sysTimeStamp;
- $arrv0 = $src->qstr($srcCopyFlagVals[0]);
- $arrv1 = $src->qstr($srcCopyFlagVals[1]);
- $arrv2 = $src->qstr($srcCopyFlagVals[2]);
- $arrv3 = $src->qstr($srcCopyFlagVals[3]);
- if ($srcCopyFlagFld && !isset($cols[strtoupper($srcCopyFlagFld)])) {
- $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcCopyFlagFld $srcCopyFlagType DEFAULT $arrv1");
- foreach($sqla as $sql) $src->Execute($sql);
- }
- $sqla = array();
- $name = "{$srcTable}_mrgTr";
- if (is_array($pkeys) && strpos($src->databaseType,'mssql') !== false) {
- $pk = reset($pkeys);
- #$sqla[] = "DROP TRIGGER $name";
- $sqltr = "
- TRIGGER $name
- ON $srcTable /* for data replication and merge */
- AFTER UPDATE
- AS
- UPDATE $srcTable
- SET
- $srcUpdateDateFld = case when I.$srcCopyFlagFld = $arrv2 or I.$srcCopyFlagFld = $arrv3 then I.$srcUpdateDateFld
- else $sysdate end,
- $srcCopyFlagFld = case
- when I.$srcCopyFlagFld = $arrv2 then $arrv0
- when I.$srcCopyFlagFld = $arrv3 then D.$srcCopyFlagFld
- else $arrv1 end
- FROM $srcTable S Join Inserted AS I on I.$pk = S.$pk
- JOIN Deleted as D ON I.$pk = D.$pk
- WHERE I.$srcCopyFlagFld = D.$srcCopyFlagFld or I.$srcCopyFlagFld = $arrv2
- or I.$srcCopyFlagFld = $arrv3 or I.$srcCopyFlagFld is null
- ";
- $sqla[] = 'CREATE '.$sqltr; // first if does not exists
- $sqla[] = 'ALTER '.$sqltr; // second if it already exists
- } else if (strpos($src->databaseType,'oci') !== false) {
- if (strlen($srcTable)>22) $tableidx = substr($srcTable,0,16).substr(crc32($srcTable),6);
- else $tableidx = $srcTable;
- $name = $tableidx.$this->trgSuffix;
- $idx = $tableidx.$this->idxSuffix;
- $sqla[] = "
- CREATE OR REPLACE TRIGGER $name /* for data replication and merge */
- BEFORE UPDATE ON $srcTable REFERENCING NEW AS NEW OLD AS OLD
- FOR EACH ROW
- BEGIN
- if :new.$srcCopyFlagFld = $arrv2 then
- :new.$srcCopyFlagFld := $arrv0;
- elsif :new.$srcCopyFlagFld = $arrv3 then
- :new.$srcCopyFlagFld := :old.$srcCopyFlagFld;
- elsif :old.$srcCopyFlagFld = :new.$srcCopyFlagFld or :new.$srcCopyFlagFld is null then
- if $this->trLogic then
- :new.$srcUpdateDateFld := $sysdate;
- :new.$srcCopyFlagFld := $arrv1;
- end if;
- end if;
- END;
- ";
- }
- foreach($sqla as $sql) $src->Execute($sql);
- if ($srcCopyFlagFld) $srcCopyFlagFld .= ', ';
- $src->Execute("CREATE INDEX {$idx} on $srcTable ($srcCopyFlagFld$srcUpdateDateFld)");
- }
- /*
- Perform Merge by copying all data modified from src to dest
- then update src copied flag if present.
- Returns array taken from ReplicateData:
- Returns an array:
- $arr[0] = true if no error, false if error
- $arr[1] = number of recs processed
- $arr[2] = number of successful inserts
- $arr[3] = number of successful updates
- $srcTable = src table
- $dstTable = dest table
- $pkeys = primary keys array. if empty, then only inserts will occur
- $srcignoreflds = ignore these flds (must be upper cased)
- $setsrc = updateSrcFn string
- $srcUpdateDateFld = field in src with the last update date
- $srcCopyFlagFld = false = optional field that holds the copied indicator
- $flagvals=array('Y','N','P','=') = array of values indicating array(copied, not copied).
- Null is assumed to mean not copied. The 3rd value 'P' indicates that we want to force 'Y', bypassing
- default trigger behaviour to reset the COPIED='N' when the record is replicated from other side.
- The last value '=' is don't change copyflag.
- $srcCopyDateFld = field that holds last copy date in src table, which will be updated on Merge()
- $dstCopyDateFld = field that holds last copy date in dst table, which will be updated on Merge()
- $defaultDestRaiseErrorFn = The adodb raiseErrorFn handler. Default is to not raise an error.
- Just output error message to stdout
- */
- function Merge($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
- $srcUpdateDateFld,
- $srcCopyFlagFld, $flagvals=array('Y','N','P','='),
- $srcCopyDateFld = false,
- $dstCopyDateFld = false,
- $whereClauses = '',
- $orderBy = '', # MUST INCLUDE THE "ORDER BY" suffix
- $copyDoneFlagIdx = 3,
- $defaultDestRaiseErrorFn = '')
- {
- $src = $this->connSrc;
- $dest = $this->connDest;
- $time = $src->Time();
- $delfirst = $this->deleteFirst;
- $upd = $this->updateSrcFn;
- $this->deleteFirst = false;
- //$this->updateFirst = true;
- $srcignoreflds[] = $srcUpdateDateFld;
- $srcignoreflds[] = $srcCopyFlagFld;
- $srcignoreflds[] = $srcCopyDateFld;
- if (empty($whereClauses)) $whereClauses = '1=1';
- $where = " WHERE ($whereClauses) and ($srcCopyFlagFld = ".$src->qstr($flagvals[1]).')';
- if ($orderBy) $where .= ' '.$orderBy;
- else $where .= ' ORDER BY '.$srcUpdateDateFld;
- if ($setsrc) $set[] = $setsrc;
- else $set = array();
- if ($srcCopyFlagFld) $set[] = "$srcCopyFlagFld = ".$src->qstr($flagvals[2]);
- if ($srcCopyDateFld) $set[]= "$srcCopyDateFld = ".$src->sysTimeStamp;
- if ($set) $this->updateSrcFn = array(implode(', ',$set));
- else $this->updateSrcFn = '';
- $extra[$srcCopyFlagFld] = array($dest->qstr($flagvals[0]),$dest->qstr($flagvals[$copyDoneFlagIdx]));
- $saveraise = $dest->raiseErrorFn;
- $dest->raiseErrorFn = '';
- if ($this->compat && $this->compat == 1.0) $srcUpdateDateFld = '';
- $arr = $this->ReplicateData($srcTable, $dstTable, $pkeys, $where, $srcignoreflds,
- $dstCopyDateFld,$extra,$srcUpdateDateFld);
- $dest->raiseErrorFn = $saveraise;
- $this->updateSrcFn = $upd;
- $this->deleteFirst = $delfirst;
- return $arr;
- }
- /*
- If doing a 2 way merge, then call
- $rep->Merge()
- to save without modifying the COPIEDFLAG ('=').
- Then can the following to set the COPIEDFLAG to 'P' which forces the COPIEDFLAG = 'Y'
- $rep->MergeDone()
- */
- function MergeDone($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
- $srcUpdateDateFld,
- $srcCopyFlagFld, $flagvals=array('Y','N','P','='),
- $srcCopyDateFld = false,
- $dstCopyDateFld = false,
- $whereClauses = '',
- $orderBy = '', # MUST INCLUDE THE "ORDER BY" suffix
- $copyDoneFlagIdx = 2,
- $defaultDestRaiseErrorFn = '')
- {
- return $this->Merge($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
- $srcUpdateDateFld,
- $srcCopyFlagFld, $flagvals,
- $srcCopyDateFld,
- $dstCopyDateFld,
- $whereClauses,
- $orderBy, # MUST INCLUDE THE "ORDER BY" suffix
- $copyDoneFlagIdx,
- $defaultDestRaiseErrorFn);
- }
- function _doerr($reason, $selflds)
- {
- $fn = $this->errHandler;
- if ($fn) $fn($this, $reason, $selflds); // set $this->neverAbort to true or false as required inside $fn
- }
- }
|