adodb-replicate.inc.php 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180
  1. <?php
  2. define('ADODB_REPLICATE',1.2);
  3. include_once(ADODB_DIR.'/adodb-datadict.inc.php');
  4. /*
  5. 1.2 9 June 2009
  6. Minor patches
  7. 1.1 8 June 2009
  8. Added $lastUpdateFld to replicatedata
  9. Added $rep->compat. If compat set to 1.0, then $lastUpdateFld not used during MergeData.
  10. 1.0 Apr 2009
  11. Added support for MFFA
  12. 0.9 ? 2008
  13. First release
  14. Note: this code assumes that comments such as / * * / are allowed, which works with:
  16. mssql, postgresql, oracle, mssql
  17. Replication engine to
  18. - copy table structures and data from different databases (e.g. mysql to oracle)
  19. for replication purposes
  20. - generate CREATE TABLE, CREATE INDEX, INSERT ... for installation scripts
  21. Table Structure copying includes
  22. - fields and limited subset of types
  23. - optional default values
  24. - indexes
  25. - but not constraints
  26. Two modes of data copy:
  27. ReplicateData
  28. - Copy from src to dest, with update of status of copy back to src,
  29. with configurable src SELECT where clause
  30. MergeData
  31. - Copy from src to dest based on last mod date field and/or copied flag field
  32. Default settings are
  33. - do not execute, generate sql ($rep->execute = false)
  34. - do not delete records in dest table first ($rep->deleteFirst = false).
  35. if $rep->deleteFirst is true and primary keys are defined,
  36. then no deletion will occur unless *INSERTONLY* is defined in pkey array
  37. - only commit once at the end of every ReplicateData ($rep->commitReplicate = true)
  38. - do not autocommit every x records processed ($rep->commitRecs = -1)
  39. - even if error occurs on one record, continue copying remaining records ($rep->neverAbort = true)
  40. - debugging turned off ($rep->debug = false)
  41. */
  42. class ADODB_Replicate {
  43. var $connSrc;
  44. var $connDest;
  45. var $connSrc2 = false;
  46. var $connDest2 = false;
  47. var $ddSrc;
  48. var $ddDest;
  49. var $execute = false;
  50. var $debug = false;
  51. var $deleteFirst = false;
  52. var $commitReplicate = true; // commit at end of replicatedata
  53. var $commitRecs = -1; // only commit at end of ReplicateData()
  54. var $selFilter = false;
  55. var $fieldFilter = false;
  56. var $indexFilter = false;
  57. var $updateFilter = false;
  58. var $insertFilter = false;
  59. var $updateSrcFn = false;
  60. var $limitRecs = false;
  61. var $neverAbort = true;
  62. var $copyTableDefaults = false; // turn off because functions defined as defaults will not work when copied
  63. var $errHandler = false; // name of error handler function, if used.
  64. var $htmlSpecialChars = true; // if execute false, then output with htmlspecialchars enabled.
  65. // Will autoconfigure itself. No need to modify
  66. var $trgSuffix = '_mrgTr';
  67. var $idxSuffix = '_mrgidx';
  68. var $trLogic = '1 = 1';
  69. var $datesAreTimeStamps = false;
  70. var $oracleSequence = false;
  71. var $readUncommitted = false; // read without obeying shared locks for fast select (mssql)
  72. var $compat = false;
  73. // connSrc2 and connDest2 are only required if the db driver
  74. // does not allow updates back to src db in first connection (the select connection),
  75. // so we need 2nd connection
  /**
   * Stores the source and destination connections and creates a data
   * dictionary for each of them.
   *
   * @param object       $connSrc   source connection
   * @param object       $connDest  destination connection
   * @param object|false $connSrc2  optional second source connection; $connSrc is used when falsy
   * @param object|false $connDest2 optional second destination connection; $connDest is used when falsy
   */
  function __construct($connSrc, $connDest, $connSrc2=false, $connDest2=false)
  {
    // bug in odbtp, binding fails: switch off input-array binding for such drivers
    foreach (array($connSrc, $connDest) as $conn) {
      if (strpos($conn->databaseType, 'odbtp') !== false) {
        $conn->_bindInputArray = false;
      }
    }

    $this->connSrc = $connSrc;
    $this->connDest = $connDest;

    // the secondary connections default to the primary ones
    if ($connSrc2) {
      $this->connSrc2 = $connSrc2;
    } else {
      $this->connSrc2 = $connSrc;
    }
    if ($connDest2) {
      $this->connDest2 = $connDest2;
    } else {
      $this->connDest2 = $connDest;
    }

    $this->ddSrc = NewDataDictionary($connSrc);
    $this->ddDest = NewDataDictionary($connDest);

    // htmlspecialchars output is only enabled when an HTTP_HOST server variable exists
    $this->htmlSpecialChars = isset($_SERVER['HTTP_HOST']);
  }
  /**
   * Runs (or, when $this->execute is false, prints) one or more SQL statements
   * against the destination connection.
   *
   * @param string|array $sql a single statement or a list of statements
   * @return bool true when every executed statement succeeded; false otherwise.
   *              With $this->neverAbort set, remaining statements are still
   *              attempted after a failure; without it the first failure
   *              returns immediately.
   */
  function ExecSQL($sql)
  {
    // A lone statement is wrapped into a list. The previous `$sql[] = $sql`
    // tried to append to a string, which is a fatal error on PHP 7.1+.
    if (!is_array($sql)) $sql = array($sql);

    $ret = true;
    foreach ($sql as $s) {
      if (!$this->execute) {
        // dry run: only show the statement
        echo "<pre>",$s.";\n</pre>";
        continue;
      }

      if ($this->connDest->Execute($s)) continue;

      // statement failed
      if (!$this->neverAbort) return false;
      $ret = false;
    }
    return $ret;
  }
  106. /*
  107. We assume replication between $table and $desttable only works if the field names and types match for both tables.
  108. Also $table and desttable can have different names.
  109. */
  /**
   * Generates the structure-copy SQL for $table via CopyTableStructSQL() and
   * hands it to ExecSQL().
   *
   * @param string $table     source table name
   * @param string $desttable destination table name (optional)
   * @return bool false when no SQL was generated, otherwise the result of ExecSQL()
   */
  function CopyTableStruct($table,$desttable='')
  {
    $statements = $this->CopyTableStructSQL($table, $desttable);
    if (empty($statements)) {
      return false;
    }
    return $this->ExecSQL($statements);
  }
  /**
   * Passes a field name through the user-supplied $this->fieldFilter callback.
   *
   * @param string $fld  field name, passed by reference to the callback
   * @param string $mode mode string forwarded to the callback (callers in this class use 'TABLE' and 'SELECT')
   * @return mixed the callback's return value, or $fld unchanged when no filter is configured
   */
  function RunFieldFilter(&$fld, $mode = '')
  {
    $filter = $this->fieldFilter;
    if (!$filter) {
      return $fld;
    }
    return $filter($fld, $mode);
  }
  /**
   * Passes the value side of an UPDATE "fld = val" pair through the
   * user-supplied $this->updateFilter callback.
   *
   * @param string $table destination table name
   * @param string $fld   field name
   * @param mixed  $val   value expression
   * @return mixed the callback's return value, or $val unchanged when no filter is configured
   */
  function RunUpdateFilter($table, $fld, $val)
  {
    $filter = $this->updateFilter;
    if (!$filter) {
      return $val;
    }
    return $filter($table, $fld, $val);
  }
  /**
   * Passes an INSERT field through the user-supplied $this->insertFilter callback.
   *
   * @param string $table destination table name
   * @param string $fld   field name
   * @param mixed  $val   value, received by reference (presumably so a callback
   *                      declared with a reference parameter can alter it - confirm against callers)
   * @return mixed the callback's return value, or the field name $fld when no filter is configured
   */
  function RunInsertFilter($table, $fld, &$val)
  {
    $filter = $this->insertFilter;
    if (!$filter) {
      return $fld;
    }
    return $filter($table, $fld, $val);
  }
  140. /*
  141. $mode = INS or UPD
  142. The lastUpdateFld holds the field that counts the number of updates or the date of last mod. This ensures that
  143. if the rec was modified after replicatedata retrieves the data but before we update back the src record,
  144. we don't set the copiedflag='Y' yet.
  145. */
  /**
   * Reports the outcome of a copied record back to the source database, using
   * $this->updateSrcFn (either an array of SET clauses or a callable).
   *
   * @param object $srcdb          connection used to update the source table
   * @param string $table          source table name
   * @param array  $fldoffsets     offsets into $row whose values become the bind array
   * @param array  $row            fetched row; its last element is read as the $lastUpdateFld value
   * @param string $where          where condition without the WHERE keyword
   * @param string $mode           key used to pick the SET clause from an array updateSrcFn ('INS' or 'UPD')
   * @param mixed  $dest_insertid  insert id from the destination; replaces '$INSERT_ID' in the SET clause
   * @param string $lastUpdateFld  when set and the row's last value is truthy, the update only
   *                               matches rows whose $lastUpdateFld still equals that value
   * @return void
   */
  function RunUpdateSrcFn($srcdb, $table, $fldoffsets, $row, $where, $mode, $dest_insertid=null, $lastUpdateFld='')
  {
    if (!$this->updateSrcFn) return;

    $bindarr = array();
    foreach ($fldoffsets as $k) {
      $bindarr[$k] = $row[$k];
    }

    $last = sizeof($row);
    if ($lastUpdateFld && $row[$last-1]) {
      $ds = $row[$last-1];
      // values containing ':' are formatted as timestamps, everything else is quoted as a string
      if (strpos($ds,':') !== false) $s = $srcdb->DBTimeStamp($ds);
      else $s = $srcdb->qstr($ds);
      $where = "WHERE $lastUpdateFld = $s and $where";
    } else {
      $where = "WHERE $where";
    }

    $fn = $this->updateSrcFn;
    if (!is_array($fn)) {
      // callable form: delegate completely to the user function
      $fn($srcdb, $table, $row, $where, $bindarr, $mode, $dest_insertid);
      return;
    }

    // array form: a single entry serves every mode, otherwise look the mode up
    if (sizeof($fn) == 1) $set = reset($fn);
    else $set = isset($fn[$mode]) ? $fn[$mode] : false;
    if (!$set) return;

    // cast first: strlen(null) is deprecated since PHP 8.1 and null is the default here
    if (strlen((string) $dest_insertid) == 0) $dest_insertid = 'null';
    $set = str_replace('$INSERT_ID',$dest_insertid,$set);

    $sql = "UPDATE $table SET $set $where ";
    $ok = $srcdb->Execute($sql,$bindarr);
    if (!$ok) {
      echo $srcdb->ErrorMsg(),"<br>\n";
      die();
    }
  }
  /**
   * Builds the SQL statements that recreate a source table's columns and
   * indexes in the destination database. Nothing is executed here.
   *
   * @param string $table     source table name
   * @param string $desttable destination table name; the source name is used when empty
   * @param bool   $dropdest  when false, generation stops if the destination table already exists
   * @return array list of SQL statements; empty when the source table is missing
   *               or the destination table exists and $dropdest is false
   */
  function CopyTableStructSQL($table, $desttable='',$dropdest =false)
  {
    // index names are only prefixed when an explicit destination name was given
    $prefixidx = $desttable ? $desttable : '';
    if (!$desttable) $desttable = $table;

    $srcConn = $this->connSrc;
    $columns = $srcConn->MetaColumns($table);
    if (!$columns) {
      echo "$table does not exist in source db<br>\n";
      return array();
    }
    if (!$dropdest && $this->connDest->MetaColumns($desttable)) {
      echo "$desttable already exists in dest db<br>\n";
      return array();
    }
    if ($this->debug) var_dump($columns);

    // one "name TYPE(len) DEFAULT x" entry per column
    $defs = array();
    foreach ($columns as $col) {
      $mt = $this->ddSrc->MetaType($col->type);
      $len = $col->max_length;
      $fldname = $this->RunFieldFilter($col->name, 'TABLE');
      if (!$fldname) continue; // the field filter returned a falsy name: leave the column out

      $isText = ($mt == 'C' || $mt == 'X');
      $precision = isset($col->scale) ? '.'.$col->scale : '';

      $def = $fldname . ' ' . $mt;
      if ($isText) {
        $def .= "($len)";
      } elseif ($mt == 'N' && $precision) {
        $def .= "($len$precision)";
      }

      if ($this->copyTableDefaults && isset($col->default_value)) {
        $dflt = $col->default_value;
        // might not work as this could be function
        if ($isText) $dflt = $this->connDest->qstr($dflt);
        $def .= ' DEFAULT ' . $dflt;
      }

      $defs[] = $def;
    }
    $fieldSpec = implode(",\n", $defs);

    // dump adodb intermediate data dictionary format
    if ($this->debug) echo '<pre>'.$fieldSpec.'</pre>';

    $sqla = $this->ddDest->CreateTableSQL($desttable, $fieldSpec);

    $indexes = $srcConn->MetaIndexes($table);
    if (!$indexes) return $sqla;

    foreach ($indexes as $srcIdxName => $info) {
      $idxoptions = array();
      if (!empty($info['unique'])) {
        $idxoptions['UNIQUE'] = 1;
      }

      $fldnames = array();
      foreach ($info['columns'] as $colName) {
        $fldnames[] = $this->RunFieldFilter($colName, 'TABLE');
      }

      $idxname = $prefixidx . str_replace($table, $desttable, $srcIdxName);
      if (!empty($this->indexFilter)) {
        // user hook may rename the index
        $fn = $this->indexFilter;
        $idxname = $fn($desttable, $idxname, $fldnames, $idxoptions);
      }

      $idxSql = $this->ddDest->_IndexSQL($idxname, $desttable, $fldnames, $idxoptions);
      $sqla = array_merge($sqla, $idxSql);
    }
    return $sqla;
  }
  /**
   * Placeholder hook: the body is intentionally empty and the call has no effect.
   *
   * @return void
   */
  function _clearcache()
  {
    // nothing to clear
  }
  /**
   * Asks the destination connection to concatenate the three pieces
   * "' ", "chr(<byte value of $v>)" and "'".
   *
   * @param string $v character whose ord() value is placed inside chr(...)
   * @return mixed whatever the destination connection's concat() returns
   */
  function _concat($v)
  {
    $chrCall = "chr(" . ord($v) . ")";
    return $this->connDest->concat("' ", $chrCall, "'");
  }
  /**
   * Replaces every carriage return and line feed in $v with the expression
   * produced by _concat() for that character.
   *
   * @param string $v text to rewrite
   * @return string $v with "\r" and "\n" substituted
   */
  function fixupbinary($v)
  {
    $crExpr = $this->_concat("\r");
    $lfExpr = $this->_concat("\n");
    return str_replace(array("\r", "\n"), array($crExpr, $lfExpr), $v);
  }
  /**
   * Exchanges the roles of source and destination: both connection pairs and
   * the two data dictionaries are swapped.
   *
   * @return void
   */
  function SwapDBs()
  {
    list($this->connSrc, $this->connDest) = array($this->connDest, $this->connSrc);
    list($this->connSrc2, $this->connDest2) = array($this->connDest2, $this->connSrc2);
    list($this->ddSrc, $this->ddDest) = array($this->ddDest, $this->ddSrc);
  }
  275. /*
  276. // if no uniqflds defined, then all desttable recs will be deleted before insert
  277. // $where clause must include the WHERE word if used
  278. // if $this->commitRecs is set to a +ve value, then it will autocommit every $this->commitRecs records
  279. // -- this should never be done with 7x24 db's
  280. Returns an array:
  281. $arr[0] = true if no error, false if error
  282. $arr[1] = number of recs processed
  283. $arr[2] = number of successful inserts
  284. $arr[3] = number of successful updates
  285. ReplicateData() params:
  286. $table = src table name
  287. $desttable = dest table name, leave blank to use src table name
  288. $uniqflds = array() = an array. If set, then inserts and updates will occur. eg. array('PK1', 'PK2');
  289. To prevent updates to desttable (allow only to src table), add '*INSERTONLY*' or '*ONLYINSERT*' to array.
  290. Sometimes you are replicating a src table with an autoinc primary key.
  291. You sometimes create recs in the dest table. The dest table has to retrieve the
  292. src table's autoinc key (stored in a 2nd field) so you can match the two tables.
  293. To define this, make uniqflds contain nested arrays. Copying from autoinc table to other table:
  294. array(array($destpkey), array($destfld_holds_src_autoinc_pkey))
  295. Copying from normal table to autoinc table:
  296. array(array($destpkey), array(), array($srcfld_holds_dest_autoinc_pkey))
  297. $where = where clause for SELECT from $table $where. Include the WHERE reserved word in beginning.
  298. You can put ORDER BY at the end also
  299. $ignoreflds = array(), list of fields to ignore. e.g. array('FLD1','FLD2');
  300. $dstCopyDateFld = date field on $desttable to update with current date
  301. $extraflds allows you to add additional flds to insert/update. Format
  302. array(fldname => $fldval)
  303. $fldval itself can be an array or a string. If an array, then
  304. $extraflds = array($fldname => array($insertval, $updateval))
  305. Thus we have the following behaviours:
  306. a. Delete all data in $desttable then insert from src $table
  307. $rep->execute = true;
  308. $rep->ReplicateData($table, $desttable)
  309. b. Update $desttable if record exists (based on $uniqflds), otherwise insert.
  310. $rep->execute = true;
  311. $rep->ReplicateData($table, $desttable, array($pkey1, $pkey2))
  312. c. Select from src $table all data modified since a date. Then update $desttable
  313. if record exists (based on $uniqflds), otherwise insert
  314. $rep->execute = true;
  315. $rep->ReplicateData($table, $desttable, array($pkey1, $pkey2), "WHERE update_datetime_fld > $LAST_REFRESH")
  316. d. Insert all records into $desttable modified after a certain id (or time) in src $table:
  317. $rep->execute = true;
  318. $rep->ReplicateData($table, $desttable, false, "WHERE id_fld > $LAST_ID_SAVED", true);
  319. For (a) to (d), returns array: array($boolean_ok_fail, $no_recs_selected_from_src_db, $no_recs_inserted, $no_recs_updated);
  320. e. Generate sample SQL:
  321. $rep->execute = false;
  322. $rep->ReplicateData(....);
  323. This returns $array, which contains:
  324. $array['SEL'] = select stmt from src db
  325. $array['UPD'] = update stmt to dest db
  326. $array['INS'] = insert stmt to dest db
  327. Error-handling
  328. ==============
  329. Default is never abort if error occurs. You can set $rep->neverAbort = false; to force replication to abort if an error occurs.
  330. Value Filtering
  331. ========
  332. Sometimes you might need to modify/massage the data before the code works. Assume that the value used for True and False is
  333. 'T' and 'F' in src DB, but is 'Y' and 'N' in dest DB for field[2] in select stmt. You can do this by
  334. $rep->selFilter = 'filter';
  335. $rep->ReplicateData(...);
  336. function filter($table,& $fields, $deleteFirst)
  337. {
  338. if ($table == 'SOMETABLE') {
  339. if ($fields[2] == 'T') $fields[2] = 'Y';
  340. else if ($fields[2] == 'F') $fields[2] = 'N';
  341. }
  342. return true; // a false/empty return value makes ReplicateData skip this record
  343. }
  343. We pass in $deleteFirst as that determines the order of the fields (which are numeric-based):
  344. TRUE: the order of fields matches the src table order
  345. FALSE: the order of fields is all non-primary key fields first, followed by primary key fields. This is because it needs
  346. to match the UPDATE statement, which is UPDATE $table SET f2 = ?, f3 = ? ... WHERE f1 = ?
  347. Name Filtering
  348. =========
  349. Sometimes field names that are legal in one RDBMS can be illegal in another.
  350. We allow you to handle this using a field filter.
  351. Also if you don't want to replicate certain fields, just return false.
  352. $rep->fieldFilter = 'ffilter';
  353. function ffilter(&$fld,$mode)
  354. {
  355. $uf = strtoupper($fld);
  356. switch($uf) {
  357. case 'GROUP':
  358. if ($mode == 'SELECT') $fld = '"Group"';
  359. return 'GroupFld';
  360. case 'PRIVATEFLD': # do not replicate
  361. return false;
  362. }
  363. return $fld;
  364. }
  365. UPDATE FILTERING
  366. ================
  367. Sometimes, when we want to update
  368. UPDATE table SET fld = val WHERE ....
  369. we want to modify val. To do so, define
  370. $rep->updateFilter = 'ufilter';
  371. function ufilter($table, $fld, $val)
  372. {
  373. return "nvl($fld, $val)";
  374. }
  375. Sending back audit info back to src Table
  376. =========================================
  377. Use $rep->updateSrcFn. This can be an array of strings, or the name of a php function to call.
  378. If an array of strings is defined, then it will perform an update statement...
  379. UPDATE srctable SET $string WHERE ....
  380. With $string set to the array you define. If a new record was inserted into desttable, then the
  381. 'INS' string is used ($INSERT_ID will be replaced with the real INSERT_ID, if any),
  382. and if an update then use the 'UPD' string.
  383. array(
  384. 'INS' => 'insertid = $INSERT_ID, copieddate=getdate(), copied = 1',
  385. 'UPD' => 'copieddate=getdate(), copied = 1'
  386. )
  387. If a single string array is defined, then it will be used for both insert and update.
  388. array('copieddate=getdate(), copied = 1')
  389. Note that the where clause is automatically defined by the system.
  390. If $rep->updateSrcFn is a PHP function name, then it will be called with the following params:
  391. $fn($srcConnection, $tableName, $row, $where, $bindarr, $mode, $dest_insertid)
  392. $srcConnection - source db connection
  393. $tableName - source tablename
  394. $row - array holding records updated into dest
  395. $where - where clause to be used (uses bind vars)
  396. $bindarr - array holding bind variables for where clause
  397. $mode - INS or UPD
  398. $dest_insertid - when mode=INS, then the insert_id is stored here.
  399. oracle mssql
  400. ---> insert
  401. mssqlid <--- insert_id
  402. ----> update with mssqlid
  403. <---- update with mssqlid
  404. TODO: add src pkey and dest pkey for updates. Also sql stmt needs to be tuned, so dest pkey, src pkey
  405. */
  406. function ReplicateData($table, $desttable = '', $uniqflds = array(), $where = '',$ignore_flds = array(),
  407. $dstCopyDateFld='', $extraflds = array(), $lastUpdateFld = '')
  408. {
  409. if (is_array($where)) {
  410. $wheresrc = $where[0];
  411. $wheredest = $where[1];
  412. } else {
  413. $wheresrc = $wheredest = $where;
  414. }
  415. $dstCopyDateName = $dstCopyDateFld;
  416. $dstCopyDateFld = strtoupper($dstCopyDateFld);
  417. $this->_clearcache();
  418. if (is_string($uniqflds) && strlen($uniqflds)) $uniqflds = array($uniqflds);
  419. if (!$desttable) $desttable = $table;
  420. $uniq = array();
  421. if ($uniqflds) {
  422. if (is_array(reset($uniqflds))) {
  423. /*
  424. primary key of src and dest tables differ. This means when we perform the select stmts
  425. we retrieve both keys. Then any insert statement will have to ignore one array element.
  426. Any update statement will need to use a different where clause
  427. */
  428. $destuniqflds = $uniqflds[0];
  429. if (sizeof($uniqflds)>1 && $uniqflds[1]) // srckey field name in dest table
  430. $srcuniqflds = $uniqflds[1];
  431. else
  432. $srcuniqflds = array();
  433. if (sizeof($uniqflds)>2)
  434. $srcPKDest = reset($uniqflds[2]);
  435. } else {
  436. $destuniqflds = $uniqflds;
  437. $srcuniqflds = array();
  438. }
  439. $onlyInsert = false;
  440. foreach($destuniqflds as $k => $u) {
  441. if ($u == '*INSERTONLY*' || $u == '*ONLYINSERT*') {
  442. $onlyInsert = true;
  443. continue;
  444. }
  445. $uniq[strtoupper($u)] = $k;
  446. }
  447. $deleteFirst = $this->deleteFirst;
  448. } else {
  449. $deleteFirst = true;
  450. }
  451. if ($deleteFirst) $onlyInsert = true;
  452. if ($ignore_flds) {
  453. foreach($ignore_flds as $u) {
  454. $ignoreflds[strtoupper($u)] = 1;
  455. }
  456. } else
  457. $ignoreflds = array();
  458. $src = $this->connSrc;
  459. $dest = $this->connDest;
  460. $src2 = $this->connSrc2;
  461. $dest->noNullStrings = false;
  462. $src->noNullStrings = false;
  463. $src2->noNullStrings = false;
  464. if ($src === $dest) $this->execute = false;
  465. $types = $src->MetaColumns($table);
  466. if (!$types) {
  467. echo "Source $table does not exist<br>\n";
  468. return array();
  469. }
  470. $dtypes = $dest->MetaColumns($desttable);
  471. if (!$dtypes) {
  472. echo "Destination $desttable does not exist<br>\n";
  473. return array();
  474. }
  475. $sa = array();
  476. $selflds = array();
  477. $wheref = array();
  478. $wheres = array();
  479. $srcwheref = array();
  480. $fldoffsets = array();
  481. $k = 0;
  482. foreach($types as $name => $t) {
  483. $name2 = strtoupper($this->RunFieldFilter($name,'SELECT'));
  484. // handle quotes
  485. if ($name2 && $name2[0] == '"' && $name2[strlen($name2)-1] == '"') $name22 = substr($name2,1,strlen($name2)-2);
  486. elseif ($name2 && $name2[0] == '`' && $name2[strlen($name2)-1] == '`') $name22 = substr($name2,1,strlen($name2)-2);
  487. else $name22 = $name2;
  488. //else $name22 = $name2; // this causes problem for quotes strip above
  489. if (!isset($dtypes[($name22)]) || !$name2) {
  490. if ($this->debug) echo " Skipping $name ==> $name2 as not in destination $desttable<br>";
  491. continue;
  492. }
  493. if ($name2 == $dstCopyDateFld) {
  494. $dstCopyDateName = $t->name;
  495. continue;
  496. }
  497. $fld = $t->name;
  498. $fldval = $t->name;
  499. $mt = $src->MetaType($t->type);
  500. if ($this->datesAreTimeStamps && $mt == 'D') $mt = 'T';
  501. if ($mt == 'D') $fldval = $dest->DBDate($fldval);
  502. elseif ($mt == 'T') $fldval = $dest->DBTimeStamp($fldval);
  503. $ufld = strtoupper($fld);
  504. if (isset($ignoreflds[($name2)]) && !isset($uniq[$ufld])) {
  505. continue;
  506. }
  507. if ($this->debug) echo " field=$fld type=$mt fldval=$fldval<br>";
  508. if (!isset($uniq[$ufld])) {
  509. $selfld = $fld;
  510. $fld = $this->RunFieldFilter($selfld,'SELECT');
  511. $selflds[] = $selfld;
  512. $p = $dest->Param($k);
  513. if ($mt == 'D') $p = $dest->DBDate($p, true);
  514. else if ($mt == 'T') $p = $dest->DBTimeStamp($p, true);
  515. # UPDATES
  516. $sets[] = "$fld = ".$this->RunUpdateFilter($desttable, $fld, $p);
  517. # INSERTS
  518. $insflds[] = $this->RunInsertFilter($desttable,$fld, $p); $params[] = $p;
  519. $k++;
  520. } else {
  521. $fld = $this->RunFieldFilter($fld);
  522. $wheref[] = $fld;
  523. if (!empty($srcuniqflds)) $srcwheref[] = $srcuniqflds[$uniq[$ufld]];
  524. if ($mt == 'C') { # normally we don't include the primary key in the insert if it is numeric, but ok if varchar
  525. $insertpkey = true;
  526. }
  527. }
  528. }
  529. foreach($extraflds as $fld => $evals) {
  530. if (!is_array($evals)) $evals = array($evals, $evals);
  531. $insflds[] = $this->RunInsertFilter($desttable,$fld, $p); $params[] = $evals[0];
  532. $sets[] = "$fld = ".$evals[1];
  533. }
  534. if ($dstCopyDateFld) {
  535. $sets[] = "$dstCopyDateName = ".$dest->sysTimeStamp;
  536. $insflds[] = $this->RunInsertFilter($desttable,$dstCopyDateName, $p); $params[] = $dest->sysTimeStamp;
  537. }
  538. if (!empty($srcPKDest)) {
  539. $selflds[] = $srcPKDest;
  540. $fldoffsets = array($k+1);
  541. }
  542. foreach($wheref as $uu => $fld) {
  543. $p = $dest->Param($k);
  544. $sp = $src->Param($k);
  545. if (!empty($srcuniqflds)) {
  546. if ($uu > 1) die("Only one primary key for srcuniqflds allowed currently");
  547. $destsrckey = reset($srcuniqflds);
  548. $wheres[] = reset($srcuniqflds).' = '.$p;
  549. $insflds[] = $this->RunInsertFilter($desttable,$destsrckey, $p);
  550. $params[] = $p;
  551. } else {
  552. $wheres[] = $fld.' = '.$p;
  553. if (!isset($ignoreflds[strtoupper($fld)]) || !empty($insertpkey)) {
  554. $insflds[] = $this->RunInsertFilter($desttable,$fld, $p);
  555. $params[] = $p;
  556. }
  557. }
  558. $selflds[] = $fld;
  559. $srcwheres[] = $fld.' = '.$sp;
  560. $fldoffsets[] = $k;
  561. $k++;
  562. }
  563. if (!empty($srcPKDest)) {
  564. $fldoffsets = array($k);
  565. $srcwheres = array($fld.'='.$src->Param($k));
  566. $k++;
  567. }
  568. if ($lastUpdateFld) {
  569. $selflds[] = $lastUpdateFld;
  570. } else
  571. $selflds[] = 'null as Z55_DUMMY_LA5TUPD';
  572. $insfldss = implode(', ', $insflds);
  573. $fldss = implode(', ', $selflds);
  574. $setss = implode(', ', $sets);
  575. $paramss = implode(', ', $params);
  576. $wheress = implode(' AND ', $wheres);
  577. if (isset($srcwheres))
  578. $srcwheress = implode(' AND ',$srcwheres);
  579. $seltable = $table;
  580. if ($this->readUncommitted && strpos($src->databaseType,'mssql')) $seltable .= ' with (NOLOCK)';
  581. $sa['SEL'] = "SELECT $fldss FROM $seltable $wheresrc";
  582. $sa['INS'] = "INSERT INTO $desttable ($insfldss) VALUES ($paramss) /**INS**/";
  583. $sa['UPD'] = "UPDATE $desttable SET $setss WHERE $wheress /**UPD**/";
  584. $DB1 = "/* <font color=green> Source DB - sample sql in case you need to adapt code\n\n";
  585. $DB2 = "/* <font color=green> Dest DB - sample sql in case you need to adapt code\n\n";
  586. if (!$this->execute) echo '/*<style>
  587. pre {
  588. white-space: pre-wrap; /* css-3 */
  589. white-space: -moz-pre-wrap !important; /* Mozilla, since 1999 */
  590. white-space: -pre-wrap; /* Opera 4-6 */
  591. white-space: -o-pre-wrap; /* Opera 7 */
  592. word-wrap: break-word; /* Internet Explorer 5.5+ */
  593. }
  594. </style><pre>*/
  595. ';
  596. if ($deleteFirst && $this->deleteFirst) {
  597. $where = preg_replace('/[ \n\r\t]+order[ \n\r\t]+by.*$/i', '', $where);
  598. $sql = "DELETE FROM $desttable $wheredest\n";
  599. if (!$this->execute) echo $DB2,'</font>*/',$sql,"\n";
  600. else $dest->Execute($sql);
  601. }
  602. global $ADODB_COUNTRECS;
  603. $err = false;
  604. $savemode = $src->setFetchMode(ADODB_FETCH_NUM);
  605. $ADODB_COUNTRECS = false;
  606. if (!$this->execute) {
  607. echo $DB1,$sa['SEL'],"</font>\n*/\n\n";
  608. echo $DB2,$sa['INS'],"</font>\n*/\n\n";
  609. $suffix = ($onlyInsert) ? ' PRIMKEY=?' : '';
  610. echo $DB2,$sa['UPD'],"$suffix</font>\n*/\n\n";
  611. $rs = $src->Execute($sa['SEL']);
  612. $cnt = 1;
  613. $upd = 0;
  614. $ins = 0;
  615. $sqlarr = explode('?',$sa['INS']);
  616. $nparams = sizeof($sqlarr)-1;
  617. $useQmark = $dest && ($dest->dataProvider != 'oci8');
  618. while ($rs && !$rs->EOF) {
  619. if ($useQmark) {
  620. $sql = ''; $i = 0;
  621. $arr = array_reverse($rs->fields);
  622. foreach ($arr as $v) {
  623. $sql .= $sqlarr[$i];
  624. // from Ron Baldwin <ron.baldwin#sourceprose.com>
  625. // Only quote string types
  626. $typ = gettype($v);
  627. if ($typ == 'string')
  628. //New memory copy of input created here -mikefedyk
  629. $sql .= $dest->qstr($v);
  630. else if ($typ == 'double')
  631. $sql .= str_replace(',','.',$v); // locales fix so 1.1 does not get converted to 1,1
  632. else if ($typ == 'boolean')
  633. $sql .= $v ? $dest->true : $dest->false;
  634. else if ($typ == 'object') {
  635. if (method_exists($v, '__toString')) $sql .= $dest->qstr($v->__toString());
  636. else $sql .= $dest->qstr((string) $v);
  637. } else if ($v === null)
  638. $sql .= 'NULL';
  639. else
  640. $sql .= $v;
  641. $i += 1;
  642. if ($i == $nparams) break;
  643. } // while
  644. if (isset($sqlarr[$i])) {
  645. $sql .= $sqlarr[$i];
  646. }
  647. $INS = $sql;
  648. } else {
  649. $INS = $sa['INS'];
  650. $arr = array_reverse($rs->fields);
  651. foreach($arr as $k => $v) { // only works on oracle currently
  652. $k = sizeof($arr)-$k-1;
  653. $v = str_replace(":","%~%COLON%!%",$v);
  654. $INS = str_replace(':'.$k,$this->fixupbinary($dest->qstr($v)),$INS);
  655. }
  656. $INS = str_replace("%~%COLON%!%",":",$INS);
  657. if ($this->htmlSpecialChars) $INS = htmlspecialchars($INS);
  658. }
  659. echo "-- $cnt\n",$INS,";\n\n";
  660. $cnt += 1;
  661. $ins += 1;
  662. $rs->MoveNext();
  663. }
  664. $src->setFetchMode($savemode);
  665. return $sa;
  666. } else {
  667. $saved = $src->debug;
  668. #$src->debug=1;
  669. if ($this->limitRecs>100)
  670. $rs = $src->SelectLimit($sa['SEL'],$this->limitRecs);
  671. else
  672. $rs = $src->Execute($sa['SEL']);
  673. $src->debug = $saved;
  674. if (!$rs) {
  675. if ($this->errHandler) $this->_doerr('SEL',array());
  676. return array(0,0,0,0);
  677. }
  678. if ($this->commitReplicate || $commitRecs > 0) {
  679. $dest->BeginTrans();
  680. if ($this->updateSrcFn) $src2->BeginTrans();
  681. }
  682. if ($this->updateSrcFn && strpos($src2->databaseType,'mssql') !== false) {
  683. # problem is writers interfere with readers in mssql
  684. $rs = $src->_rs2rs($rs);
  685. }
  686. $cnt = 0;
  687. $upd = 0;
  688. $ins = 0;
  689. $sizeofrow = sizeof($selflds);
  690. $fn = $this->selFilter;
  691. $commitRecs = $this->commitRecs;
  692. $saved = $dest->debug;
  693. if ($this->deleteFirst) $onlyInsert = true;
  694. while ($origrow = $rs->FetchRow()) {
  695. if ($dest->debug) {flush(); @ob_flush();}
  696. if ($fn) {
  697. if (!$fn($desttable, $origrow, $deleteFirst, $this, $selflds)) continue;
  698. }
  699. $doinsert = true;
  700. $row = array_slice($origrow,0,$sizeofrow-1);
  701. if (!$onlyInsert) {
  702. $doinsert = false;
  703. $upderr = false;
  704. if (isset($srcPKDest)) {
  705. if (is_null($origrow[$sizeofrow-3])) {
  706. $doinsert = true;
  707. $upderr = true;
  708. }
  709. }
  710. if (!$upderr && !$dest->Execute($sa['UPD'],$row)) {
  711. $err = true;
  712. $upderr = true;
  713. if ($this->errHandler) $this->_doerr('UPD',$row);
  714. if (!$this->neverAbort) break;
  715. }
  716. if ($upderr || $dest->Affected_Rows() == 0) {
  717. $doinsert = true;
  718. } else {
  719. if (!empty($uniqflds)) $this->RunUpdateSrcFn($src2, $table, $fldoffsets, $origrow, $srcwheress, 'UPD', null, $lastUpdateFld);
  720. $upd += 1;
  721. }
  722. }
  723. if ($doinsert) {
  724. $inserr = false;
  725. if (isset($srcPKDest)) {
  726. $row = array_slice($origrow,0,$sizeofrow-2);
  727. }
  728. if (! $dest->Execute($sa['INS'],$row)) {
  729. $err = true;
  730. $inserr = true;
  731. if ($this->errHandler) $this->_doerr('INS',$row);
  732. if ($this->neverAbort) continue;
  733. else break;
  734. } else {
  735. if ($dest->dataProvider == 'oci8') {
  736. if ($this->oracleSequence) $lastid = $dest->GetOne("select ".$this->oracleSequence.".currVal from dual");
  737. else $lastid = 'null';
  738. } else {
  739. $lastid = $dest->Insert_ID();
  740. }
  741. if (!$inserr && !empty($uniqflds)) {
  742. $this->RunUpdateSrcFn($src2, $table, $fldoffsets, $origrow, $srcwheress, 'INS', $lastid,$lastUpdateFld);
  743. }
  744. $ins += 1;
  745. }
  746. }
  747. $cnt += 1;
  748. if ($commitRecs > 0 && ($cnt % $commitRecs) == 0) {
  749. $dest->CommitTrans();
  750. $dest->BeginTrans();
  751. if ($this->updateSrcFn) {
  752. $src2->CommitTrans();
  753. $src2->BeginTrans();
  754. }
  755. }
  756. } // while
  757. if ($this->commitReplicate || $commitRecs > 0) {
  758. if (!$this->neverAbort && $err) {
  759. $dest->RollbackTrans();
  760. if ($this->updateSrcFn) $src2->RollbackTrans();
  761. } else {
  762. $dest->CommitTrans();
  763. if ($this->updateSrcFn) $src2->CommitTrans();
  764. }
  765. }
  766. }
  767. if ($cnt != $ins + $upd) echo "<p>ERROR: $cnt != INS $ins + UPD $upd</p>";
  768. $src->setFetchMode($savemode);
  769. return array(!$err, $cnt, $ins, $upd);
  770. }
  /**
   * Prepare a source table for Merge().
   *
   * Three things happen, in this order:
   *  1. The bookkeeping columns are added when MetaColumns() does not already
   *     list them (looked up by upper-cased name): the update-date and
   *     copy-date columns are declared "TS DEFTIMESTAMP", the copy-flag column
   *     is declared with $srcCopyFlagType and a default of $srcCopyFlagVals[1].
   *  2. An update trigger is installed. Trigger support exists only for
   *     SQL Server (databaseType containing 'mssql') and Oracle (databaseType
   *     containing 'oci'); support for other databases still needs to be added.
   *  3. An index over (copy flag, update date) is created.
   *
   * Trigger rules, as encoded in the SQL below:
   *  - new flag = $srcCopyFlagVals[2]: flag becomes $srcCopyFlagVals[0], update date untouched
   *  - new flag = $srcCopyFlagVals[3]: flag keeps its previous value, update date untouched
   *  - flag unchanged or null: update date becomes the server timestamp and the
   *    flag becomes $srcCopyFlagVals[1] (on Oracle only when $this->trLogic holds)
   *
   * None of the Execute() results are checked. On SQL Server both a CREATE and
   * an ALTER statement are issued for the trigger, so one of the two is expected
   * to fail depending on whether the trigger already exists.
   *
   * @param string $srcTable         Source table name.
   * @param array  $pkeys            Primary key column names. Only used to build the
   *                                 SQL Server trigger joins; every column is joined on.
   * @param string $srcUpdateDateFld Column holding the last update timestamp.
   * @param string $srcCopyDateFld   Optional column holding the last copy timestamp.
   * @param string $srcCopyFlagFld   Optional column holding the copied indicator.
   *                                 NOTE(review): the trigger SQL references this column
   *                                 unconditionally, so an empty value yields invalid
   *                                 trigger SQL - confirm whether it is really optional.
   * @param string $srcCopyFlagType  Column type passed to AddColumnSQL() for the flag column.
   * @param array  $srcCopyFlagVals  Four flag values, indexes 0..3, used as described above.
   * @return void
   */
  function MergeSrcSetup($srcTable, $pkeys, $srcUpdateDateFld, $srcCopyDateFld, $srcCopyFlagFld,
      $srcCopyFlagType='C(1)', $srcCopyFlagVals = array('Y','N','P','='))
  {
      $src = $this->connSrc;
      $idx = $srcTable.'_mrgIdx';

      $cols = $src->MetaColumns($srcTable);

      if (!isset($cols[strtoupper($srcUpdateDateFld)])) {
          $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcUpdateDateFld TS DEFTIMESTAMP");
          foreach ($sqla as $sql) $src->Execute($sql);
      }

      if ($srcCopyDateFld && !isset($cols[strtoupper($srcCopyDateFld)])) {
          $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcCopyDateFld TS DEFTIMESTAMP");
          foreach ($sqla as $sql) $src->Execute($sql);
      }

      $sysdate = $src->sysTimeStamp;

      // Quoted flag literals, ready to be embedded in DDL and trigger bodies.
      $arrv0 = $src->qstr($srcCopyFlagVals[0]);
      $arrv1 = $src->qstr($srcCopyFlagVals[1]);
      $arrv2 = $src->qstr($srcCopyFlagVals[2]);
      $arrv3 = $src->qstr($srcCopyFlagVals[3]);

      if ($srcCopyFlagFld && !isset($cols[strtoupper($srcCopyFlagFld)])) {
          $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcCopyFlagFld $srcCopyFlagType DEFAULT $arrv1");
          foreach ($sqla as $sql) $src->Execute($sql);
      }

      $sqla = array();
      $name = "{$srcTable}_mrgTr";

      if (is_array($pkeys) && strpos($src->databaseType,'mssql') !== false) {
          // Join on EVERY primary key column. Joining on the first column only
          // would pair an updated row with every other row sharing that column
          // value whenever the key is composite.
          $joinSrc = array();
          $joinDel = array();
          foreach ($pkeys as $pk) {
              $joinSrc[] = "I.$pk = S.$pk";
              $joinDel[] = "I.$pk = D.$pk";
          }

          // Without any key column there is nothing to join on; skip the trigger
          // instead of sending malformed SQL to the server.
          if ($joinSrc) {
              $onSrc = implode(' AND ', $joinSrc);
              $onDel = implode(' AND ', $joinDel);

              $sqltr = "
  TRIGGER $name
  ON $srcTable /* for data replication and merge */
  AFTER UPDATE
  AS
    UPDATE $srcTable
    SET
      $srcUpdateDateFld = case when I.$srcCopyFlagFld = $arrv2 or I.$srcCopyFlagFld = $arrv3 then I.$srcUpdateDateFld
        else $sysdate end,
      $srcCopyFlagFld = case
        when I.$srcCopyFlagFld = $arrv2 then $arrv0
        when I.$srcCopyFlagFld = $arrv3 then D.$srcCopyFlagFld
        else $arrv1 end
    FROM $srcTable S Join Inserted AS I on $onSrc
      JOIN Deleted as D ON $onDel
    WHERE I.$srcCopyFlagFld = D.$srcCopyFlagFld or I.$srcCopyFlagFld = $arrv2
      or I.$srcCopyFlagFld = $arrv3 or I.$srcCopyFlagFld is null
  ";
              $sqla[] = 'CREATE '.$sqltr; // succeeds if the trigger does not exist yet
              $sqla[] = 'ALTER '.$sqltr;  // succeeds if it already exists
          }
      } else if (strpos($src->databaseType,'oci') !== false) {
          // Shorten long table names so the derived trigger/index names stay short:
          // first 16 characters plus the tail of the decimal crc32 of the full name.
          if (strlen($srcTable) > 22) {
              $tableidx = substr($srcTable, 0, 16).substr((string) crc32($srcTable), 6);
          } else {
              $tableidx = $srcTable;
          }

          $name = $tableidx.$this->trgSuffix;
          $idx = $tableidx.$this->idxSuffix;

          $sqla[] = "
  CREATE OR REPLACE TRIGGER $name /* for data replication and merge */
    BEFORE UPDATE ON $srcTable REFERENCING NEW AS NEW OLD AS OLD
    FOR EACH ROW
    BEGIN
      if :new.$srcCopyFlagFld = $arrv2 then
        :new.$srcCopyFlagFld := $arrv0;
      elsif :new.$srcCopyFlagFld = $arrv3 then
        :new.$srcCopyFlagFld := :old.$srcCopyFlagFld;
      elsif :old.$srcCopyFlagFld = :new.$srcCopyFlagFld or :new.$srcCopyFlagFld is null then
        if {$this->trLogic} then
          :new.$srcUpdateDateFld := $sysdate;
          :new.$srcCopyFlagFld := $arrv1;
        end if;
      end if;
    END;
  ";
      }

      foreach ($sqla as $sql) $src->Execute($sql);

      // Index the flag column first (when there is one), then the update date.
      $idxCols = $srcCopyFlagFld ? "$srcCopyFlagFld, $srcUpdateDateFld" : $srcUpdateDateFld;
      $src->Execute("CREATE INDEX {$idx} on $srcTable ($idxCols)");
  }
  /**
   * Perform a merge by copying all data modified in src to dest, then update
   * the src copied flag if present.
   *
   * Rows are selected from the source where $srcCopyFlagFld equals $flagvals[1]
   * (combined with $whereClauses) and handed to ReplicateData(). While that runs,
   * $this->deleteFirst is forced to false, $this->updateSrcFn is replaced by a
   * SET list built from $setsrc / $srcCopyFlagFld / $srcCopyDateFld, and the
   * destination connection's raiseErrorFn is replaced by $defaultDestRaiseErrorFn.
   * All three are restored before returning, including when ReplicateData()
   * throws an Exception (which is then re-thrown).
   *
   * Returns the array taken from ReplicateData:
   *   $arr[0] = true if no error, false if error
   *   $arr[1] = number of recs processed
   *   $arr[2] = number of successful inserts
   *   $arr[3] = number of successful updates
   *
   * NOTE(review): the WHERE clause built here compares the flag with
   * $flagvals[1] only, so rows whose flag is NULL are not matched by it even
   * though NULL is described as meaning "not copied". It also references
   * $srcCopyFlagFld unconditionally, so an empty flag field yields an invalid
   * predicate. Confirm the intended contract before changing either.
   *
   * @param string       $srcTable         src table
   * @param string       $dstTable         dest table
   * @param array        $pkeys            primary keys array. if empty, then only inserts will occur
   * @param array        $srcignoreflds    ignore these flds (must be upper cased); the update-date,
   *                                       copy-flag and copy-date fields are appended automatically
   * @param string       $setsrc           updateSrcFn string (extra "col = value" SET fragment for the source)
   * @param string       $srcUpdateDateFld field in src with the last update date; also the default ORDER BY
   * @param string|false $srcCopyFlagFld   field that holds the copied indicator
   * @param array        $flagvals         array('Y','N','P','=') = array of values indicating
   *                                       array(copied, not copied). Null is assumed to mean not copied.
   *                                       The 3rd value 'P' indicates that we want to force 'Y', bypassing
   *                                       default trigger behaviour to reset the COPIED='N' when the record
   *                                       is replicated from other side. The last value '=' is don't change
   *                                       copyflag.
   * @param string|false $srcCopyDateFld   field that holds last copy date in src table, which will be
   *                                       updated on Merge()
   * @param string|false $dstCopyDateFld   field that holds last copy date in dst table, which will be
   *                                       updated on Merge()
   * @param string       $whereClauses     extra filter ANDed with the flag test; empty means '1=1'
   * @param string       $orderBy          full ordering clause; MUST INCLUDE the "ORDER BY" keyword itself
   * @param int          $copyDoneFlagIdx  index into $flagvals of the second flag value handed to
   *                                       ReplicateData() for the flag column
   * @param mixed        $defaultDestRaiseErrorFn The adodb raiseErrorFn handler installed on the
   *                                       destination connection for the duration of the merge. Default
   *                                       ('') is to not raise an error, just output error message to stdout.
   * @return array see above
   */
  function Merge($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
      $srcUpdateDateFld,
      $srcCopyFlagFld, $flagvals=array('Y','N','P','='),
      $srcCopyDateFld = false,
      $dstCopyDateFld = false,
      $whereClauses = '',
      $orderBy = '', # MUST INCLUDE THE "ORDER BY" suffix
      $copyDoneFlagIdx = 3,
      $defaultDestRaiseErrorFn = '')
  {
      $src = $this->connSrc;
      $dest = $this->connDest;

      // NOTE(review): $time is never read below. The call is kept because what
      // Time() does on the source connection is not visible from here - confirm
      // it has no needed side effect, then delete it.
      $time = $src->Time();

      // Settings overridden for the duration of the merge; restored at the end.
      $delfirst = $this->deleteFirst;
      $upd = $this->updateSrcFn;

      $this->deleteFirst = false;

      // The bookkeeping columns must not be replicated as ordinary data.
      $srcignoreflds[] = $srcUpdateDateFld;
      $srcignoreflds[] = $srcCopyFlagFld;
      $srcignoreflds[] = $srcCopyDateFld;

      if (empty($whereClauses)) $whereClauses = '1=1';
      $where = " WHERE ($whereClauses) and ($srcCopyFlagFld = ".$src->qstr($flagvals[1]).')';
      if ($orderBy) $where .= ' '.$orderBy;
      else $where .= ' ORDER BY '.$srcUpdateDateFld;

      // SET list applied back to the source rows that were copied.
      $set = array();
      if ($setsrc) $set[] = $setsrc;
      if ($srcCopyFlagFld) $set[] = "$srcCopyFlagFld = ".$src->qstr($flagvals[2]);
      if ($srcCopyDateFld) $set[] = "$srcCopyDateFld = ".$src->sysTimeStamp;
      if ($set) $this->updateSrcFn = array(implode(', ',$set));
      else $this->updateSrcFn = '';

      $extra = array();
      $extra[$srcCopyFlagFld] = array($dest->qstr($flagvals[0]),$dest->qstr($flagvals[$copyDoneFlagIdx]));

      // Install the caller's handler. The empty default disables the handler,
      // which is what this method previously did unconditionally.
      $saveraise = $dest->raiseErrorFn;
      $dest->raiseErrorFn = $defaultDestRaiseErrorFn;

      if ($this->compat && $this->compat == 1.0) $srcUpdateDateFld = '';

      // Catch-and-rethrow (instead of "finally") so the overrides are undone
      // even if a raiseErrorFn handler throws.
      $pending = null;
      $arr = null;
      try {
          $arr = $this->ReplicateData($srcTable, $dstTable, $pkeys, $where, $srcignoreflds,
              $dstCopyDateFld,$extra,$srcUpdateDateFld);
      } catch (Exception $e) {
          $pending = $e;
      }

      $dest->raiseErrorFn = $saveraise;
      $this->updateSrcFn = $upd;
      $this->deleteFirst = $delfirst;

      if ($pending) throw $pending;

      return $arr;
  }
  /**
   * Second step of a 2 way merge.
   *
   * When doing a 2 way merge, first call
   *     $rep->Merge()
   * to save without modifying the COPIEDFLAG ('=').
   * Then call
   *     $rep->MergeDone()
   * to set the COPIEDFLAG to 'P', which forces the COPIEDFLAG = 'Y'.
   *
   * Takes exactly the same arguments as Merge() and forwards them unchanged;
   * the only difference is that $copyDoneFlagIdx defaults to 2 here.
   *
   * @return array whatever Merge() returns
   */
  function MergeDone($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
      $srcUpdateDateFld,
      $srcCopyFlagFld, $flagvals=array('Y','N','P','='),
      $srcCopyDateFld = false,
      $dstCopyDateFld = false,
      $whereClauses = '',
      $orderBy = '', # MUST INCLUDE THE "ORDER BY" suffix
      $copyDoneFlagIdx = 2,
      $defaultDestRaiseErrorFn = '')
  {
      $result = $this->Merge(
          $srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
          $srcUpdateDateFld,
          $srcCopyFlagFld, $flagvals,
          $srcCopyDateFld, $dstCopyDateFld,
          $whereClauses, $orderBy,
          $copyDoneFlagIdx,
          $defaultDestRaiseErrorFn
      );

      return $result;
  }
  /**
   * Hand a replication failure to the user-supplied error handler, if any.
   *
   * The handler is called as $handler($this, $reason, $selflds). Inside it,
   * set $this->neverAbort to true or false as required to decide whether
   * replication carries on after the error.
   *
   * @param string $reason  Which statement failed; callers pass 'SEL', 'UPD' or 'INS'.
   * @param array  $selflds Data passed along with the error (callers pass the row
   *                        being written, or an empty array for 'SEL').
   * @return void
   */
  function _doerr($reason, $selflds)
  {
      $handler = $this->errHandler;
      if (!$handler) {
          return;
      }

      $handler($this, $reason, $selflds);
  }
  948. }