Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

many fixes to reuse a connection and handle all errors #38

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

shimondoodkin
Copy link

many fixes to reuse a connection and handle all errors
add them if you like them

my keep connection module:

verticadb.js: (seems working tested manually once only on a simple insert query-may not work for other's queries, or there might be unexpected errors)

var Vertica = require('vertica');

vertica_pritdebug=false;

var RetryQueue=function(cb){
 this.q=[];
 this.cb=cb;//callback to execute an item in the queue
};
RetryQueue.prototype={ //simple async retry queue
 q:null,//array
 cb:null,// function(cb){ cb(true/false) },
 add:function(a)
 {
  this.q.push(a);
  this.run();
 },
 run:function()
 {
  if(this.q.length>0)
  {
   var that=this;
   this.cb(this.q[0],function(done)
   {
    if(done) that.q.shift();
     that.run();
   });
  }
 }
}
//queue test
//vertica_queue=new RetryQueue(function(item,cb){ console.log('queue execute item',item); if(Math.random()>0.5){console.log('queue success');cb(true)} else {console.log('queue failed');cb(false)} });
//var queuetest=1;
//setInterval(function(){ vertica_queue.add("item"+(queuetest++)); },1000);


//require('longjohn');

vertica = new Vertica.Connection({});
vertica.querycols=function querycols(query, cb)
{
 var err=null;
 var result={cols:{}};
 var query =vertica.query(query)
 //'fields' is emitted once.
 query.on('fields', function(fields) {result.fields=fields;for(var i=0,l=result.fields.length;i<l;i++) result.cols[result.fields[i].name]=[]; })
 // 'row' is emitted 0..* times, once for every row in the resultset.
 query.on('row', function(row) { for(var i=0,l=result.fields.length;i<l;i++) result.cols[result.fields[i].name].push(row[i]); })
 // 'end' is emitted once.
 query.on('end', function(status) { result.status=status; var cbb=cb;cb=function(){}; cbb(err,result) })
 // If 'error' is emitted, no more events will follow.
 // If no event handler is implemented, an exceptions gets thrown instead.
 query.on('error', function(err) { var cbb=cb;cb=function(){}; cbb(err,result) });
}

vertica.on('error', function(err) {
var printed=false;
 if(!is_expected_connection_error(err))
  {printed=true;console.log('seems unhandled vertica error',err)}

 if(vertica_pritdebug&&!printed)console.log('seems unhandled vertica error',err)

 vertica_connecting=false;
 setTimeout(vartica_connect,1000);
});

var hosts=[/*{host:'12.12.12.12',user:'',password:'some bad host'},*/{host:'127.0.0.1',user:'admin',password:'admin'}],activehost=-1;

vartica_connect_callbacks=[];
vertica_connecting=false;
vartica_connect=function(cb)
{
 try
 {
  if(vertica.conencted)
  {
   if(vertica_pritdebug)console.log('vertica.conencted already ');
   vertica_queue.run(false);
   return;
  }
  if(vertica_connecting)
  {
   if(vertica_pritdebug)console.log('connecting already ');
   return;
  }
  vertica_connecting=true;

  activehost++;
  if(activehost>hosts.length-1)activehost=hosts.length-1;
  console.log("vertica connecting:",hosts[activehost]);
  vertica.connectionOptions.host=hosts[activehost].host
  vertica.connectionOptions.user=hosts[activehost].user
  vertica.connectionOptions.password=hosts[activehost].password
  if(vertica_pritdebug)console.log('reset');
  vertica.reset();
  vertica_in_query=false;
  var timeout=false;
  vertica.connect(function(err)
  {
   if(timeout)clearTimeout(timeout);
   if(vertica_pritdebug)console.log("vertica connection result:",err?err:'success');
   vertica_connecting=false;
   if(err)
   {
    setTimeout(vartica_connect,2500);
   }
   else 
   {
    setTimeout(function(){ vertica_queue.run(false); },2500);
   }
  } ); //calback called multiple times
   timeout=setTimeout(function(){
   if(!vertica.conencted)
   {
    vertica_connecting=false;
   }
  },1000)
 }catch(e)
 {
  if(vertica_pritdebug)console.log('vertica connect catch',e.stack)
  setTimeout(vartica_connect,2500);
 }
}

function is_expected_connection_error(err)
{
     if(err.message=='called connecttion.reset') return true;
     if(err.message=='connect ECONNREFUSED') return true;
     if(err.message=='read ECONNRESET') return true;
     if(err.message=='write ECONNRESET') return true;
     if(err.message=='connect ETIMEDOUT') return true;
     if(err.message=='Invalid frontend message type 112') return true;
     if(err.message=='Node startup/recovery in progress. Not yet ready to accept connections') return true;
     if(err.message=='write EPIPE') return true;
     if(err.message=='read EPIPE') return true;
     if(err.message=='This socket is closed.') return true;
     return false;
}

safequery=function (query,cb)
{
 vertica_queue.add({query:query,cb:cb,retries:0});
}
vertica_in_query=false;
safequery_item=function (query_item,cb)
{

  function retry(){ cb(false); }
  if(!vertica.connected)
  {
    //console.log(2)
    vartica_connect(retry);
    return;
  }
  if(vertica_in_query)
  {
   if(vertica_pritdebug)console.log('vertica_in_query');
   return;
  }
  vertica_in_query=true;
  if(vertica_pritdebug)console.log('safequery_item',query_item);
  vertica.query('START TRANSACTION', function(err, resultset1)
  {
   //console.log(3)
   //console.log( err,resultset?resultset.rows:'no result');
   if(err)
   {
     //console.log(4)
     if(!is_expected_connection_error(err))
     if(vertica_pritdebug)console.log('vertica error(START TRANSACTION):',err,query_item.query)
     if(vertica.connected)
      setTimeout(retry,2500); // only if connection error...
     else
       vartica_connect(retry);
     vertica_in_query=false;
     return;
   }
   //console.log(5)
   vertica.query(query_item.query, function(err2, resultset2)
   {
    //console.log(6)
    //if(!cb)console.log( err,resultset2?resultset2.rows:'no result');
    if(err2)
    {
     //console.log(7)
     if(!is_expected_connection_error(err2))
     console.log('vertica error(query):',err2,query_item.query)
     //if(retries-->0)setTimeout(retry,2500); // only if connection error...
     //else if(cb)cb(err)
     //return;
     if(is_expected_connection_error(err2))
     {
       vertica_in_query=false;
       retry();
       return;
     }
    }
    //console.log(8)
    vertica.query('COMMIT', function(err, resultset)
    {
     //console.log(9)
     if(err)
     {
      //console.log(10)
      if(!is_expected_connection_error(err))
      if(vertica_pritdebug)console.log('vertica error(COMMIT):',err,query_item.query)
      if(vertica.connected)
       setTimeout(retry,2500); // only if connection error...
      else 
       vartica_connect(retry);
      vertica_in_query=false;
      return;
     }
     //console.log( err,resultset?resultset.rows:'no result')
     if(query_item.cb)query_item.cb(err2,resultset2);
     else console.log(err2,resultset2?resultset2.rows:'no result rows',query_item.query);
     vertica_in_query=false;
      cb(true);
     //console.log(11)
    });
   });
  });
}

vertica_queue=new RetryQueue(safequery_item);


verticaiprintq=false
verticainsert=function (t,x,cb,printq)
{
 var k=Object.keys(x)
 var ks=k.map(function(k){return '"'+k+'"'}).join(',');
 var vs=k.map(function(k){
 var v=x[k];
 if(typeof v=='string') return "'"+v+"'";
 else if(typeof v=='number') return (Math.round(v)==v?v:v.toFixed(9));
 else if(v===null)      return "null";
 else if(v===undefined)  return "null";
 else return 'E';

 }).join(',');
 var q="insert into "+t+"("+ks+") values("+vs+") ";
 if(printq||verticaiprintq)console.log('q=',q);
 safequery(q,cb);
 //vertica.query(q, cb?cb:function(err, resultset) {  if(err) console.log(x,q, err, resultset?resultset.rows:'no result'); });
}

//vertica usage examples:
//vertica_autocommit();
//var query = vertica.query("SELECT * FROM exchanges");
//'fields' is emitted once.
//query.on('fields', function(fields) {console.log("Fields:", fields)})
// 'row' is emitted 0..* times, once for every row in the resultset.
//query.on('row', function(row) {console.log(row)})
// 'end' is emitted once.
//query.on('end', function(status) {console.log("Finished!", status)})
// If 'error' is emitted, no more events will follow.
// If no event handler is implemented, an exceptions gets thrown instead.
//query.on('error', function(err) { console.log("vertica error:", err) })
//vertica.query("SELECT * FROM exchanges", function(err, resultset) {   console.log( err, resultset.fields, resultset.rows, resultset.status) });
//vertica.query("SELECT * FROM exchanges", function(err, resultset) {   console.log( err, resultset.rows) });
//vertica.querycols("SELECT * FROM exchanges", function(err, resultset) {   console.log( err, resultset.cols) })
//vertica_autocommit_off();
//safequery("insert into public.test(a) values('hello')",function(){console.log('cb',arguments)});

vertica_autocommit=function()
{
 vertica.query('SET SESSION AUTOCOMMIT TO ON', function(err, resultset) {   console.log( err,resultset?resultset.rows:'no result') });
}


vertica_autocommit_off=function()
{
 vertica.query('SET SESSION AUTOCOMMIT TO OFF', function(err, resultset) {   console.log( err,resultset?resultset.rows:'no result') });
}




//tests:
// to run a test paste a line in to node repl
// single run
// require('./verticadb.js');safequery("insert into public.test(a) values('hello')",function(){console.log('cb',arguments)});
//
// run folowing test than turn off vertica then turn on. you should see continues query times starting with null means no error. set vertica_pritdebug to true to see some errors
// require('./verticadb.js'); setInterval(function(){safequery("insert into public.test(a) values('hello "+(new Date().toString())+"')");},1000);

@wvanbergen
Copy link
Owner

Sorry for the late reply. Is there any way we can test this behaviour?

@shimondoodkin
Copy link
Author

not sure,
sorry for bad news
anyways i have found this driver is slower than
java driver. after i saw something is slow. i have tried node jdbc as a comparison and used the vertika jdbc driver.
the java driver was faster.

@shimondoodkin
Copy link
Author

maybe to test reusing connection.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants