Monday, July 17, 2006

Writing NDBAPI programs - simple primary key operations (cntd)

We continue were we left off.
Using the basic structure from the last example where we implemented a simple query, let's have a look at how to do inserts, deletes, and updates based on the primary key.

First of all, use the same table structure as last time:

> mysql -u root test
and create the following table in the test database:


create table t1 ( a integer primary key, b integer, c char(20)) engine=ndb;
insert into t1 values(1, 1, 'hello');
insert into t1 values(2, 2, 'hello again');


Insert operation - insert into t1 values (3, 1, 'good bye');

Before we do the actual insert, let us prepare the data of the tuple we should insert:

int pk_a =3 ; //primary key
int b=1;
char * string = "good bye";

char c[20];
memset(c,32,sizeof(c)); // mysql requires chars to be space padded.
memcpy(c,string, strlen(string)); //copy string to spacepadded storage

If we do not space pad the string we want to insert, then it will be difficult to present the data in a correct way when accessing it using SQL from the MySQL server.

When we do an insert, we simply have to get an operation on the table we want to insert the tuple in and define it should be an insertTuple operation (for brewity I leave out some of the error checking, but the complete code is at the end of this) :

op->insertTuple();

After that we have to define the primary key we want to write:

op->equal("a", (const char*)&a);

and then for the rest of the non-pk columns:

op->setValue("b", (const char*)&b);

op->setValue("c", (const char*)c);


After that, we call commit the transaction as in the previous example. Quite simple!



Update operation - update t1 set b=4, c='mysql cluster rock' where a=2;

Again, we have to define the data for the tuple we want to update:

int pk_a =2; //primary key
int b=4;
char * string = "mysql cluster rocks";

char c[20];
memset(c,32,sizeof(c)); // mysql requires chars to be space padded.
memcpy(c,string, strlen(string)); //copy string to spacepadded storage

And we simple denote that this should be an update operation:

op->updateTuple();

After that we have to define the primary key of the tuple we want to update:

op->equal("a", (const char*)&a);

and then for the rest of the non-pk columns:

op->setValue("b", (const char*)&b);

op->setValue("c", (const char*)c);


And commit! Voila!


Delete operation - delete from t1 where a=1;

For the delete operation it is only necessary to give the primary key of the tuple we want to delete:

int pk_a =1; //primary key

And define that we want to do a delete operation:

op->deleteTuple();


After that we have to define the primary key of the tuple we want to update:

op->equal("a", (const char*)&a);

Commit!

Source code

(Compile and link as in previous examples)


#include
#include

#include



using namespace std;

/**
* prototypes
*/
int pk_read( Ndb * ndb);
int pk_insert( Ndb * ndb);
int pk_update( Ndb * ndb);
int pk_delete( Ndb * ndb);





int main()
{
ndb_init();
/**
* define a connect string to the management server
*/
char * connectstring = "localhost";

/**
* Create a Ndb_cluster_connection object using the connectstring
*/
Ndb_cluster_connection * conn = new Ndb_cluster_connection(connectstring);


/**
* Connect to the management server
* try 12 times, wait 5 seconds between each retry,
* and be verbose (1), if connection attempt failes
*/
if(conn->connect(12, 5, 1) != 0)
{
cout << "Unable to connect to management server." <<>wait_until_ready(30,0) <0) ndb =" new">init() != 0)
{
/**
* just exit if we can't initialize the Ndb object
*/
return -1;
}



if (pk_read( ndb ) < a="1;" pk_a =" 1;" b="0;" trans =" ndb-">startTransaction();

/**
* If there is an error then we should check the
* calling object to find out what the problem is.
* The error is stored in an NdbError struct and contains
* an error message, error code etc.
* Later on, we will do more elaborate error handling, but for
* now we will just print out the error message!
*/
if ( trans == 0)
{
NdbError err = ndb->getNdbError();
cout << "err code: " << op =" trans-">getNdbOperation("t1");

/**
* if op == 0 then we have to check the error.
* The error is stored at the parent in call stack.
* We also have to close the transaction since it is no longer
* usable!!
*/
if ( op == 0)
{
trans->close();
NdbError err = trans->getNdbError();
cout << "err code: " <<>committedRead();


/**
* Define the primary key!
*/

if (op->equal("a", (const char*)&pk_a) <>close();
NdbError err = op->getNdbError();
cout << "err code: " <<>getValue("b", (char*)&b) == 0)
{
trans->close();
NdbError err = op->getNdbError();
cout << "err code: " <<>getValue("c", c) == 0)
{
trans->close();
NdbError err = op->getNdbError();
cout << "err code: " <<>execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
1) <>close();

/**
* If there is an error, it is stored in NdbError.
*/
NdbError err = trans->getNdbError();
cout << "err code: " <<>close();


/**
* print out the result:
*/

cout << "b= " << c= " << c << endl; return 0; } /** * This function implements * insert into t1 values (3, 1, 'good bye'); */ int pk_insert(Ndb * ndb) { /** * Define the tuple to insert: */ int pk_a =3 ; //primary key int b=1; char * string = " trans =" ndb-">startTransaction();
if ( trans == 0)
{
NdbError err = ndb->getNdbError();
cout << "err code: " << op =" trans-">getNdbOperation("t1");
if ( op == 0)
{
trans->close();
NdbError err = trans->getNdbError();
cout << "err code: " <<>insertTuple();


/**
* Define the primary key!
*/

if (op->equal("a", (const char*)&pk_a) <>close();
NdbError err = op->getNdbError();
cout << "err code: " <<>setValue("b", (const char*)&b) <>close();
NdbError err = op->getNdbError();
cout << "err code: " <<>setValue("c", (const char*)c) <>close();
NdbError err = op->getNdbError();
cout << "err code: " <<>execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
1) <>close();

/**
* If there is an error, it is stored in NdbError.
*/
NdbError err = trans->getNdbError();
cout << "err code: " <<>close();

return 0;
}



/**
* This function implements
* update t1 set b=4, c='mysql cluster rocks' where a=2;
*/
int pk_update(Ndb * ndb)
{
/**
* Define the tuple to insert:
*/
int pk_a =2 ; //primary key
int b=4;
char * string = "mysql cluster rocks";

char c[20];
memset(c,32,sizeof(c)); // mysql requires chars to be space padded.
memcpy(c,string, strlen(string)); //copy string to spacepadded storage


NdbTransaction * trans = ndb->startTransaction();
if ( trans == 0)
{
NdbError err = ndb->getNdbError();
cout << "err code: " << op =" trans-">getNdbOperation("t1");
if ( op == 0)
{
trans->close();
NdbError err = trans->getNdbError();
cout << "err code: " <<>updateTuple();


/**
* Define the primary key!
*/

if (op->equal("a", (const char*)&pk_a) <>close();
NdbError err = op->getNdbError();
cout << "err code: " <<>setValue("b", (const char*)&b) <>close();
NdbError err = op->getNdbError();
cout << "err code: " <<>setValue("c", (const char*)c) <>close();
NdbError err = op->getNdbError();
cout << "err code: " <<>execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
1) <>close();

/**
* If there is an error, it is stored in NdbError.
*/
NdbError err = trans->getNdbError();
cout << "err code: " <<>close();

return 0;
}



/**
* This function implements
* delete from t1 where a=1
*/
int pk_delete(Ndb * ndb)
{
/**
* Define the tuple to delete:
*/
int pk_a =1 ; //primary key


NdbTransaction * trans = ndb->startTransaction();
if ( trans == 0)
{
NdbError err = ndb->getNdbError();
cout << "err code: " << op =" trans-">getNdbOperation("t1");
if ( op == 0)
{
trans->close();
NdbError err = trans->getNdbError();
cout << "err code: " <<>deleteTuple();


/**
* Define the primary key!
*/

if (op->equal("a", (const char*)&pk_a) <>close();
NdbError err = op->getNdbError();
cout << "err code: " <<>execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
1) <>close();

/**
* If there is an error, it is stored in NdbError.
*/
NdbError err = trans->getNdbError();
cout << "err code: " <<>close();

return 0;
}

Sunday, July 09, 2006

Writing NDBAPI programs - simple primary key operations

Last time we looked at how to connect to MySQL Cluster using the NDBAPI. This time we will look into how to implement simple primary key operations.
First of all we need a table to operate on. Connect a mysql client to the MySQL server:
> mysql -u root
and create the following table in the test database:

create table t1 ( a integer primary key, b integer, c char(20)) engine=ndb;

Insert some data into it:

insert into t1 values(1, 1, 'hello');
insert into t1 values(2, 2, 'hello again');

On this table we then want to issue the query:

SELECT b,c FROM t1 WHERE a=1;

expressed in the NDBAPI.


Before we start...


With Ndb_cluster_connection that we discussed in the last example, we were able to connect to MySQL Cluster.
However, we have not yet connected to a database. This service is provided in the Ndb class (note that the code excerps comes from the example code show at the end).

The line:

Ndb * ndb = new Ndb(conn, "test");

initialises an ndb object. The next thing to do is to initiliase it in order to setup some internal buffers for e.g transaction records etc :

if ( ndb->init() != 0)
{
//failure
}

Finally we are able to do we are able what we want to do!! Start transactions!!


Transactions..

In the NDBAPI, the recipe to perform a transaction is as follows:
  1. Start a transaction
  2. Define one or more operations within the transaction
  3. For each primary key operation define the primary key and what to read or write
  4. Execute the transaction
  5. Close the transaction
Transactions are represented in the class NdbTransaction. In order to start a transaction we have to get a transaction record (a pointer to a NdbTransaction object) from the Ndb object:

NdbTransaction * trans = ndb->startTransaction();
if (trans == 0) {/*failure*/ return -1;}

From the transaction record it is now possible to define an operation on a particular table:

NdbOperation * op = trans->getNdbOperation("t1");
if (op == 0) {/*failure*/ trans->close(); return -1;}

Please note that we have to close the transaction if anything fails.

Now we have to define what type of operation we want to do.
We can define the following types of operations on the primary key:
  • committedRead() - read w/o lock
  • readTuple(NdbOperation::LM_Read) - shared lock
  • readTuple(NdbOperation::LM_Exclusive) - exclusive lock
  • insertTuple()
  • deleteTuple()
  • updateTuple()
We want to do a committed read so we do:

op->committedRead();

At this stage we have now started a transaction, defined an committed operation on table t1, but we have not decided what we want to read! That is, we have to define the primary key! The primary key is defined by calling NdbOperation::equal (..) on all parts of the primary key (this must be done prior to any NdbOperation::setValue/getValue is called):

...
int pk_a=1; //define a variable to represent the primary key (where a=1)
...
if (op->equal("a", (const char*)&pk_a) trans->close(); return -1}

The above sets matches, by using the method equal , the column "a" to the value stored in pk_a. which the address is taken of and is casted to a const char*.

Then we want to project the corresponding values of column "b" and "c" which matches a=1. To do this we need to define storage, because we want to store the results somewhere so that we can use them!

int b=0;
char c[21]; //column size + 1 in order to get a null-terminated string
memset(c,0,sizeof(c)); //set it to 0

To read a value, we use NdbOperation::getValue(..). This method must be called after that the primary key has been defined. NdbOperation::getValue(..) takes a column name and an buffer to where to store the result. It will also return a NdbRecAttr object, which can be used to check if what we read was null, and actually it will also store the result, but it is preferred to store the result in a user defined buffer in order to avoid unnecessary copying of data.

So, to store the value of column "b" in the buffer "int b=0" is done as follows:

if (op->getValue("b", (char*)&b) == 0) { /*failure*/ trans->close(); return -1;}

Again, we have to get the adress of the storage and cast it to a char*.
Then to store the value of column "c" in the buffer "char c[21]" is done as follows:

if (op->getValue("c", (char*)&c) == 0) { /*failure*/ trans->close(); return -1;}

At this point we have now defined the query SELECT b,c FROM t1 WHERE a=1, but we have not yet executed it. Thus, nothing has been sent to the NDB nodes (data nodes) at this stage. In order to do that we have to call NdbTransaction::execute(...):

if( trans->execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
1) { .. trans->close();...}

The above sends the transaction and all operations (currently one) to the transaction coordinator on one of the NDB nodes (data nodes). Futhermore, it says that the transaction should be committed (had we taken any locks, then they would be released), and abort if there is an error and the magical "1" means that NDBAPI will send it directly (remember ndb_force_send=1 in my.cnf?).

Example


cat ndbapi_pk_read.cpp :

#include <ndb_init.h>
#include <NdbApi.hpp>

#include <iostream>



using namespace std;

/**
* prototype for pk_read function
*/
int pk_read( Ndb * ndb);





int main()
{
ndb_init();
/**
* define a connect string to the management server
*/
char * connectstring = "localhost";

/**
* Create a Ndb_cluster_connection object using the connectstring
*/
Ndb_cluster_connection * conn = new Ndb_cluster_connection(connectstring);


/**
* Connect to the management server
* try 12 times, wait 5 seconds between each retry,
* and be verbose (1), if connection attempt failes
*/
if(conn->connect(12, 5, 1) != 0)
{
cout << "Unable to connect to management server." << endl;
return -1;
}

/**
* Join the cluster
* wait for 30 seconds for first node to be alive, and 0 seconds
* for the rest.
*/
if (conn->wait_until_ready(30,0) <0)
{
cout << "Cluster nodes not ready in 30 seconds." << endl;
return -1;
}



/**
* The first thing we have to do is to instantiate an Ndb object.
* The Ndb object represents a connection to a database.
* It is important to note that the Ndb object is not thread safe!!
* Thus, if it is a multi-threaded application, then typically each thread
* uses its own Ndb object.
*
* Now we create an Ndb object that connects to the test database.
*/

Ndb * ndb = new Ndb(conn, "test");


if (ndb->init() != 0)
{
/**
* just exit if we can't initialize the Ndb object
*/
return -1;
}



if (pk_read( ndb ) < 0)
{
cout << "failed to read tuple" << endl;
return -1;
}


return 0;

}


/**
* This function implements
* SELECT b,c FROM t1 WHERE a=1;
*/

int pk_read(Ndb * ndb)
{
/**
* Define a variable to hold the primary key
*/
int pk_a = 1;

/**
* Define storage for a and b
* b is an integer in t1
* c is a char(20) in t1.
* char's in the mysql are spacepadded to the end.
* In order to get a null terminated string, c has
* the length + 1 (21 bytes in total), and then memset is
* used to null-terminate the string.
* Varsized attributes are handled differently and we will
* look at this later.
*/
int b=0;
char c[21];
memset(c,0,sizeof(c));

/**
* Start a transaction.
* Later on the transaction must be closed, irrespectively of
* success or failure!
*/
NdbTransaction * trans = ndb->startTransaction();

/**
* If there is an error then we should check the
* calling object to find out what the problem is.
* The error is stored in an NdbError struct and contains
* an error message, error code etc.
* Later on, we will do more elaborate error handling, but for
* now we will just print out the error message!
*/
if ( trans == 0)
{
NdbError err = ndb->getNdbError();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}

/**
* Get an operation record on table t1 to do a primary key operation
*/
NdbOperation * op = trans->getNdbOperation("t1");

/**
* if op == 0 then we have to check the error.
* The error is stored at the parent in call stack.
* We also have to close the transaction since it is no longer
* usable!!
*/
if ( op == 0)
{
trans->close();
NdbError err = trans->getNdbError();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}

/**
* Define the operation to be a committed read (read w/o lock)
*/
op->committedRead();


/**
* Define the primary key!
*/

if (op->equal("a", (const char*)&pk_a) < 0)
{
trans->close();
NdbError err = op->getNdbError();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}

if (op->getValue("b", (char*)&b) == 0)
{
trans->close();
NdbError err = op->getNdbError();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}
if (op->getValue("c", c) == 0)
{
trans->close();
NdbError err = op->getNdbError();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}


/**
* Now we have defined a transaction consisting of
* one operation. It is of course possible to define
* many operations in one transaction that either read
* from the same table or from other tables. This is
* called batching and is very efficient.
* It is important to know that nothing (absolutely nothing)
* has been sent to the ndbd nodes at this stage and we do
* not have anything in the result buffers.
* In order to send the transaction to ndbd, we have to call
* execute!
* execute can be called in many different ways (we will also
* look at them at a later stage), but this particular
* execute does a commit of the transaction and aborts if anything
* goes wrong. Since we only do a committed read, the commit will not commit
* any changes or release any locks, but a committed transaction is
* "finished" and the only thing we can do then is to call close on
* the transaction, which we must do, irrespectively of success or failure!
*/

if( trans->execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
1) < 0)
{
/**
* The transaction must always be closed!
*/
trans->close();

/**
* If there is an error, it is stored in NdbError.
*/
NdbError err = trans->getNdbError();
cout << "err code: " << err.code << " err msg: " << err.message << endl;
return -1;
}
/**
* The transaction must always be closed!
*/
trans->close();


/**
* print out the result:
*/

cout << "b= " << b << " c= " << c << endl;

return 0;
}




Compile it:

g++ -c -Wall -fno-exceptions -I/home/johan/mysql/include/ -I/home/johan/mysql/include/ndb -I/home/johan/mysql/include/ndb/ndbapi ndbapi_pk_read.cpp

g++ ndbapi_pk_read.o -L/home/johan/mysql/lib -lndbclient -lmysys -lmystrings -lpthread -o ndbapi_pk_read