00001
00002
00003
00004
00005
00006
00007
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 # include "processor/xmlflowcontext.hpp"
00041 # include "streamers/generic/saxstreamer.hpp"
00042 # include "sqlstreamer.hpp"
00043
00044 BEGIN_XDFLENGINE_NS
00045
00046
00047
00048
00049
00050 XMLStreamer* SQLStreamerFactory::getStreamer( StreamerParams* p_pParameters, XMLFlowContext* p_pStreamContext, XMLStreamConsumer* p_pStreamConsumer) const
00051 {
00052 SAXSQLHandler* l_SAXSQLHandler;
00053 bool l_fTransmitData;
00054
00055 l_fTransmitData = false;
00056 if( strcmp( XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_TRANSMITDATA, REQ_FALSE), REQ_TRUE)==0) l_fTransmitData=true;
00057
00058 l_SAXSQLHandler = new SAXSQLHandler( (DBStreamerFactory*) this, p_pParameters, p_pStreamContext, p_pStreamConsumer);
00059 return (XMLStreamer*) new SaxStreamer( this, p_pParameters, p_pStreamContext, p_pStreamConsumer, l_SAXSQLHandler, l_fTransmitData);
00060 }
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078 SAXSQLHandler::SAXSQLHandler( DBStreamerFactory* p_pStreamerFactory,
00079 StreamerParams* p_pParameters,
00080 XMLFlowContext* p_pStreamContext,
00081 XMLStreamConsumer* p_pStreamConsumer )
00082 {
00083 const char* l_pszConnection;
00084 const char* l_pszFilterData;
00085
00086 m_pStreamerFactory=p_pStreamerFactory;
00087 m_pContext=p_pStreamContext;
00088
00089
00090 l_pszConnection = XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_CONNECTION, "");
00091 m_pszEncloseRecord = importCharBuffer( 0, XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_ENCLOSEREC, SQLSTREAMER_ENCLOSEREC_DEF));
00092 m_pszSQLStmt = importCharBuffer( 0, XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_STATEMENT, ""));
00093 l_pszFilterData = XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_FILTERCOLS, SQLSTREAMER_FILTERCOLS_DEF);
00094 m_uiFlushPack = atoi( XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_FLUSH, SQLSTREAMER_FLUSH_DEF) );
00095 m_iCommitPack = atoi( XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_COMMIT, SQLSTREAMER_COMMIT_DEF) );
00096 m_uiBufSize= atoi( XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_BUFFER, SQLSTREAMER_BUFFER_DEF));
00097 m_uiStreamObjects= atoi( XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_STREAMRECORDS, SQLSTREAMER_STREAMRECORDS_DEF));
00098 m_fTransmitData = false;
00099 if( strcmp( XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_TRANSMITDATA, SQLSTREAMER_TRANSMITDATA_DEF), REQ_TRUE)==0) m_fTransmitData=true;
00100 m_fDBErrorFatal=true;
00101 if( strcmp( XMLStreamer::getParamValue( p_pParameters, SQLSTREAMER_ERRFATAL, SQLSTREAMER_ERRFATAL_DEF), REQ_FALSE)==0) m_fDBErrorFatal=false;
00102 m_pszConnection = importCharBuffer(0, l_pszConnection);
00103
00104
00105 m_pCNN_connection = ((DBStreamerFactory*) m_pStreamerFactory)->getConnection(m_pszConnection,m_pContext->getThreadId() );
00106
00107 m_pOutput = p_pStreamConsumer;
00108 m_varsDesc=0;
00109 m_pszCurVal = 0;
00110
00111 m_iFilterMode=2;
00112 if ( strlen( l_pszFilterData) == 0) m_iFilterMode=0;
00113 if ( strcmp( l_pszFilterData,SQLSTREAMER_FILTERCOLSAUTO) == 0) m_iFilterMode=1;
00114
00115 m_pszFilterData = 0;
00116 if(m_iFilterMode==2)
00117 {
00118 m_pszFilterData = copyCharBuffer( ",");
00119 m_pszFilterData = concatCharBuffer( m_pszFilterData, l_pszFilterData);
00120 m_pszFilterData = concatCharBuffer( m_pszFilterData, ",");
00121 upperize( m_pszFilterData);
00122 }
00123
00124 m_rsData = 0;
00125 DEBUG_CREATE(SAXSQLHandler)
00126 }
00127
00128
00129
00130
00131 SAXSQLHandler::~SAXSQLHandler()
00132
00133 {
00134 m_pszEncloseRecord = releaseCharBuffer( m_pszEncloseRecord);
00135 m_pszSQLStmt = releaseCharBuffer( m_pszSQLStmt);
00136 m_pszFilterData = releaseCharBuffer( m_pszFilterData);
00137 m_pszCurVal = releaseCharBuffer( m_pszCurVal);
00138
00139 if(m_pCNN_connection) ((DBStreamerFactory*) m_pStreamerFactory)->releaseConnection(m_pszConnection, m_pContext->getThreadId());
00140
00141
00142 DEBUG_DEL(SAXSQLHandler)
00143 releaseDataStream();
00144 }
00145
00146
00147
00148
00149
00150
00151 void SAXSQLHandler::startDocument()
00152 {
00153 DEBUG_FUNC(SAXSQLHandler::startDocument)
00154 openDataStream();
00155 m_uiLevel=-1;
00156 m_iDescrPos=0;
00157 m_pszCurVal=0;
00158 m_uiFlush = 0;
00159 m_iCommit = 0;
00160 m_uiToStream = 0;
00161 m_fCurRecordError = false;
00162 }
00163
00164
00165
00166
00167 void SAXSQLHandler::characters(const XMLCh* const chars ,const unsigned int length)
00168 {
00169 char* l_pszChars = 0;
00170
00171 DEBUG_FUNC(SAXSQLHandler::characters)
00172
00173 if(m_uiLevel==2)
00174 {
00175 l_pszChars = XSTR(m_pContext->getTranscoder(), chars, length);
00176 m_pszCurVal = concatCharBuffer(m_pszCurVal, l_pszChars);
00177 l_pszChars = releaseCharBuffer( l_pszChars);
00178 }
00179 }
00180
00181
00182
00183
00184 void SAXSQLHandler::startElement( const XMLCh* const uri, const XMLCh* const localname, const XMLCh* const qname, const Attributes& attrs)
00185 {
00186 DEBUG_FUNC(SAXSQLHandler::startElement)
00187
00188 m_uiLevel++;
00189
00190 }
00191
00192
00193
00194
00195 void SAXSQLHandler::endElement( const XMLCh* const uri, const XMLCh* const localname, const XMLCh* const qname)
00196 {
00197
00198 char l_szTagName[XMLPROCESSOR_MAX_TAG_SIZE];
00199
00200
00201 DEBUG_IN(SAXSQLHandler::endElement)
00202 PREP_CATCH_XML_FLOW_ERROR;
00203
00204 m_uiLevel--;
00205
00206
00207 if(m_uiLevel==1)
00208 {
00209 WATCH_XML_FLOW_ERROR
00210 {
00211 if(!m_fCurRecordError)
00212 {
00213 m_fCurRecordError = true;
00214 setInVar( XSTR(m_pContext->getTranscoder(), localname, (char*) &l_szTagName),m_pszCurVal);
00215 m_fCurRecordError = false;
00216 }
00217 }
00218 CATCH_XML_FLOW_ERROR_RELEASE_AND_RETURN
00219 m_pszCurVal = releaseCharBuffer( m_pszCurVal);
00220 }
00221
00222
00223 if(m_uiLevel==0)
00224 {
00225 m_uiFlush++;
00226 m_iCommit++;
00227
00228 if(! m_fCurRecordError)
00229 {
00230 WATCH_XML_FLOW_ERROR
00231 {
00232 if( (m_uiFlushPack!=0) && (m_uiFlush >= m_uiFlushPack)) flushRecords();
00233 }
00234 CATCH_XML_FLOW_ERROR_RELEASE_AND_RETURN
00235 }
00236 else
00237 {
00238 rollbackRecords();
00239 m_fCurRecordError = false;
00240 }
00241
00242 m_iDescrPos=0;
00243 }
00244
00245 RELEASE_AND_RETURN:
00246
00247 DEBUG_OUT(SAXSQLHandler::endElement)
00248 ON_XML_FLOW_ERROR_DO
00249 {
00250 if( m_fDBErrorFatal ) throw l_pXMLFlowException;
00251 else delete l_pXMLFlowException;
00252 }
00253 }
00254
00255
00256
00257
00258 void SAXSQLHandler::endDocument()
00259 {
00260 DEBUG_FUNC(SAXSQLHandler::endDocument)
00261 PREP_CATCH_XML_FLOW_ERROR;
00262
00263 WATCH_XML_FLOW_ERROR {
00264
00265
00266 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << "flush last records\n";
00267 flushRecords();
00268 if(m_iCommitPack!=-1)
00269 {
00270 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << "commit last records\n";
00271 if(! m_fCurRecordError) commitRecords();
00272 }
00273
00274 } CATCH_XML_FLOW_ERROR_RELEASE_AND_RETURN;
00275
00276 RELEASE_AND_RETURN:
00277
00278 releaseDataStream();
00279 ON_XML_FLOW_ERROR_DO
00280 {
00281 if( m_fDBErrorFatal) throw l_pXMLFlowException;
00282 else delete l_pXMLFlowException;
00283 }
00284 }
00285
00286
00287
00288
00289
00290
00291
00292
00293 void SAXSQLHandler::openDataStream()
00294 {
00295 DEBUG_FUNC(SQLHandler::executeSQL)
00296 PREP_CATCH_XML_FLOW_ERROR;
00297
00298 WATCH_XML_FLOW_ERROR {
00299
00300
00301 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << "opening db stream for " << m_pszSQLStmt << "...\n";
00302
00303 try
00304 {
00305 m_rsData = 0;
00306 m_rsData=OTL_openDataStream( m_pszSQLStmt, m_pCNN_connection, m_uiBufSize);
00307 }
00308 catch(otl_exception& p)
00309 {
00310
00311 char* msg=copyCharBuffer( (char*)(&(p.msg)));
00312 msg=concatCharBuffer( msg," - ");
00313 msg=concatCharBuffer( msg, m_pszSQLStmt);
00314 MAKE_XMLFLOW_EXCEPTION( ERRCODE_LOC_SQLHANDLER + ERRCODE_CAUSE_DATAACCESS , "Error while opening data stream. ", msg, "SAXSQLHandler::openDataStream", "", false);
00315 msg = releaseCharBuffer( msg);
00316 p.~otl_tmpl_exception();
00317 OTL_releaseDataStream( m_rsData);
00318 m_rsData = 0;
00319 m_varsDesc = 0;
00320 goto RELEASE_AND_RETURN;
00321 }
00322
00323 m_varsDesc=m_rsData->describe_in_vars(m_iDescLen);
00324 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << m_iDescLen << " bind variables\n";
00325
00326 } CATCH_XML_FLOW_ERROR_RELEASE_AND_RETURN;
00327
00328 RELEASE_AND_RETURN:
00329 ON_XML_FLOW_ERROR_THROW;
00330 }
00331
00332
00333
00334
00335 void SAXSQLHandler::releaseDataStream()
00336 {
00337 DEBUG_FUNC(SQLHandler::executeSQL)
00338
00339 if(m_varsDesc)
00340 {
00341 m_varsDesc->~otl_var_desc();
00342 m_varsDesc=0;
00343 }
00344 if( m_rsData)
00345 {
00346 OTL_releaseDataStream( m_rsData);;
00347 m_rsData=0;
00348 }
00349 }
00350
00351
00352
00353
00354
00355
00356 void SAXSQLHandler::flushRecords()
00357 {
00358 PREP_CATCH_XML_FLOW_ERROR;
00359
00360
00361 try
00362 {
00363 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << "flush "<< m_uiFlush << " records\n";
00364 m_rsData->flush();
00365 m_uiFlush=0;
00366 makeXML();
00367 }
00368 catch(otl_exception& p)
00369 {
00370
00371 MAKE_XMLFLOW_EXCEPTION ( ERRCODE_LOC_SQLHANDLER + ERRCODE_CAUSE_DATAACCESS , "Error while flushing records.",(char*)(&(p.msg)), "SAXSQLHandler::flushRecords", "", false);
00372 p.~otl_tmpl_exception();
00373 rollbackRecords();
00374 }
00375 if( (m_iCommitPack>0) && (m_iCommit >= m_iCommitPack)) commitRecords();
00376
00377
00378 ON_XML_FLOW_ERROR_THROW;
00379
00380 }
00381
00382
00383
00384
00385 void SAXSQLHandler::commitRecords()
00386 {
00387 PREP_CATCH_XML_FLOW_ERROR;
00388
00389 if(m_iCommit>0)
00390 {
00391 try
00392 {
00393 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << "commit "<< m_iCommit << " records..";;
00394 m_pCNN_connection->commit();
00395 m_uiFlush=0;
00396 m_iCommit=0;
00397 logout << ".ok\n";
00398 }
00399 catch(otl_exception& p)
00400 {
00401 MAKE_XMLFLOW_EXCEPTION ( ERRCODE_LOC_SQLHANDLER + ERRCODE_CAUSE_DATAACCESS , "Error while committing records." ,(char*)(&(p.msg)), "SAXSQLHandler::commitRecords", "", false);
00402 p.~otl_tmpl_exception();
00403 rollbackRecords();
00404 }
00405 }
00406
00407 ON_XML_FLOW_ERROR_THROW;
00408 }
00409
00410
00411
00412
00413 void SAXSQLHandler::rollbackRecords()
00414 {
00415 PREP_CATCH_XML_FLOW_ERROR;
00416
00417
00418
00419
00420 try
00421 {
00422 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << "rollback "<< m_iCommit << " records.\n";
00423 m_rsData->clean(1);
00424 m_pCNN_connection->rollback();
00425 m_uiFlush=0;
00426 m_iCommit = 0;
00427 }
00428 catch(otl_exception& p)
00429 {
00430 MAKE_XMLFLOW_EXCEPTION ( ERRCODE_LOC_SQLHANDLER + ERRCODE_CAUSE_DATAACCESS , "Error while rollbacking records." ,(char*)(&(p.msg)), "SAXSQLHandler::rollbackRecords", "", false);
00431 p.~otl_tmpl_exception();
00432 }
00433
00434
00435
00436
00437
00438
00439 ON_XML_FLOW_ERROR_THROW;
00440 }
00441
00442
00443
00444
00445 void SAXSQLHandler::setInVar( const char* p_pszName, const char* p_pszVarValue)
00446 {
00447
00448 bool l_boolInsert;
00449 char* l_pszSearchName=0;
00450 char* l_pszVarName=0;
00451
00452
00453 DEBUG_FUNC(SQLHandler::setInVar)
00454 PREP_CATCH_XML_FLOW_ERROR;
00455
00456 if(!m_rsData && !m_fDBErrorFatal) goto RELEASE_AND_RETURN;
00457
00458 WATCH_XML_FLOW_ERROR {
00459 l_boolInsert=false;
00460
00461 if(m_varsDesc && (m_iDescLen> m_iDescrPos) )
00462 {
00463 switch(m_iFilterMode)
00464 {
00465 case 0:
00466 l_boolInsert=true;
00467 break;
00468 case 1:
00469 if( strlen(m_varsDesc[m_iDescrPos].name)>0)
00470 {
00471 l_pszSearchName = copyCharBuffer( ":");
00472 l_pszSearchName = concatCharBuffer( l_pszSearchName, p_pszName);
00473 upperize( l_pszSearchName);
00474 l_pszVarName = copyCharBuffer( m_varsDesc[m_iDescrPos].name);
00475 upperize( l_pszVarName);
00476
00477 if( strcmp( l_pszSearchName, l_pszVarName) == 0)
00478 {
00479 l_boolInsert=true;
00480 }
00481
00482 l_pszVarName = releaseCharBuffer( l_pszVarName);
00483 l_pszSearchName = releaseCharBuffer( l_pszSearchName);
00484
00485 }
00486 else
00487 {
00488 l_boolInsert=true;
00489 }
00490 break;
00491 case 2:
00492 l_pszSearchName = copyCharBuffer( ",");
00493 l_pszSearchName = concatCharBuffer( l_pszSearchName, p_pszName);
00494 l_pszSearchName = concatCharBuffer( l_pszSearchName, ",");
00495 upperize( l_pszSearchName);
00496 if( strstr( m_pszFilterData, l_pszSearchName) ) l_boolInsert=true;
00497 l_pszSearchName = releaseCharBuffer( l_pszSearchName);
00498 break;
00499 }
00500
00501 if(l_boolInsert)
00502 {
00503 try
00504 {
00505 OTL_insertInVar(m_rsData, m_varsDesc[m_iDescrPos].ftype, p_pszVarValue);
00506 }
00507 catch(otl_exception& p)
00508 {
00509 MAKE_XMLFLOW_EXCEPTION ( ERRCODE_LOC_SQLHANDLER + ERRCODE_CAUSE_DATAACCESS , "Error while sending data." ,(char*)(&(p.msg)), "SAXSQLHandler::setInVar", "", false);
00510 p.~otl_tmpl_exception();
00511 }
00512 m_iDescrPos++;
00513 }
00514 }
00515 } CATCH_XML_FLOW_ERROR_RELEASE_AND_RETURN;
00516
00517 RELEASE_AND_RETURN:
00518 l_pszVarName = releaseCharBuffer( l_pszVarName);
00519 l_pszSearchName = releaseCharBuffer( l_pszVarName);
00520
00521 ON_XML_FLOW_ERROR_THROW;
00522 }
00523
00524
00525
00526
00527
00528 void SAXSQLHandler::makeXML()
00529 {
00530
00531 otl_column_desc* l_pColumnDesc=0;
00532 char* l_cColumnName=0;
00533 int l_intDescLen;
00534 char* l_pszValue=0;
00535 const char* l_pszXMLValue=0;
00536 int l_iSize=0;
00537 int l_iSize2=0;
00538 bool l_fEnclose;
00539 int i;
00540 otl_var_desc* l_pOutVarDesc=0;
00541
00542
00543 DEBUG_IN(SQLHandler::makeXML)
00544 PREP_CATCH_XML_FLOW_ERROR;
00545
00546 if(!m_fTransmitData)
00547 {
00548
00549 if(!m_rsData && !m_fDBErrorFatal) goto RELEASE_AND_RETURN;
00550
00551
00552 l_pColumnDesc=0;
00553
00554 if(!m_rsData->eof())
00555 {
00556 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << "make xml\n";
00557 l_pColumnDesc = m_rsData->describe_select(l_intDescLen);
00558
00559 if(!l_intDescLen)
00560 {
00561 l_pColumnDesc = 0;
00562 l_pOutVarDesc = m_rsData->describe_out_vars(l_intDescLen);
00563 }
00564
00565
00566 l_fEnclose=false;
00567 if( strlen( m_pszEncloseRecord)>0) l_fEnclose=true;
00568
00569
00570 while( !m_rsData->eof())
00571 {
00572 if(l_fEnclose) (*m_pOutput) << "\n<" << m_pszEncloseRecord << ">\n";
00573
00574 for( i=0 ; i< l_intDescLen ; i++)
00575 {
00576 WATCH_XML_FLOW_ERROR
00577 {
00578 if(l_pColumnDesc)
00579 {
00580 l_cColumnName=l_pColumnDesc[i].name;
00581 l_iSize = l_pColumnDesc[i].dbsize;
00582 }
00583 else
00584 {
00585 l_cColumnName=l_pOutVarDesc[i].name+1;
00586 l_iSize = l_pOutVarDesc[i].elem_size;
00587 }
00588 if(l_iSize>l_iSize2) l_pszValue = (char*) realloc(l_pszValue, l_iSize+1);
00589 l_iSize2=l_iSize;
00590 try
00591 {
00592 (*m_rsData) >> l_pszValue;
00593 }
00594 catch(otl_exception& p)
00595 {
00596 MAKE_XMLFLOW_EXCEPTION ( ERRCODE_LOC_SQLHANDLER + ERRCODE_CAUSE_DATAACCESS , "Error while extracting data." , (char*)(&(p.msg)), "SAXSQLHandler::makeXML", "", false);
00597 p.~otl_tmpl_exception();
00598 goto RELEASE_AND_RETURN;
00599 }
00600
00601 if(!m_rsData->is_null()) l_pszXMLValue = l_pszValue;
00602 else l_pszXMLValue = REQ_NULL;
00603 (*m_pOutput) << "\t<" << l_cColumnName << "><![CDATA[" << l_pszXMLValue << "]]></" << l_cColumnName << ">\n";
00604 }
00605 CATCH_XML_FLOW_ERROR_RELEASE_AND_RETURN;
00606 }
00607
00608 if(l_fEnclose) (*m_pOutput) << "</" << m_pszEncloseRecord << ">\n";
00609
00610 m_uiToStream++;
00611
00612 if((m_uiToStream >=m_uiStreamObjects)|| (m_rsData->eof()))
00613 {
00614 logout << "\t" << SQLSTREAMER_TAGNAME<< " : " << "output "<< m_uiToStream <<" record.\n";
00615 WATCH_XML_FLOW_ERROR
00616 {
00617 m_pOutput->commitStream(false);
00618 }
00619 CATCH_XML_FLOW_ERROR_RELEASE_AND_RETURN;
00620 m_uiToStream=0;
00621 }
00622 }
00623
00624 }
00625 }
00626
00627 RELEASE_AND_RETURN:
00628
00629 free( l_pszValue);
00630
00631 DEBUG_OUT(SQLHandler::makeXML)
00632 ON_XML_FLOW_ERROR_THROW;
00633 }
00634
00635
00636
00637
00638 END_XDFLENGINE_NS
00639