Last time we looked at how to connect to MySQL Cluster using the NDBAPI. This time we will look into how to implement simple primary key operations.
First of all we need a table to operate on. Connect a mysql client to the MySQL server:
> mysql -u root
and create the following table in the test database:
create table t1 ( a integer primary key, b integer, c char(20)) engine=ndb;
Insert some data into it:
insert into t1 values(1, 1, 'hello');
insert into t1 values(2, 2, 'hello again');
On this table we then want to issue the query:
SELECT b,c FROM t1 WHERE a=1;
expressed in the NDBAPI.
Before we start...
With Ndb_cluster_connection, which we discussed in the last example, we were able to connect to MySQL Cluster.
However, we have not yet connected to a database. This service is provided by the Ndb class (note that the code excerpts come from the example code shown at the end).
The line:
Ndb * ndb = new Ndb(conn, "test");
creates an Ndb object. The next thing to do is to initialise it, which sets up some internal buffers for, e.g., transaction records:
if (ndb->init() != 0) { /*failure*/ return -1;}
Finally, we are able to do what we want to do: start transactions!
Transactions..
In the NDBAPI, the recipe to perform a transaction is as follows:
- Start a transaction
- Define one or more operations within the transaction
- For each primary key operation define the primary key and what to read or write
- Execute the transaction
- Close the transaction
Transactions are represented by the class NdbTransaction. In order to start a transaction we have to get a transaction record (a pointer to an NdbTransaction object) from the Ndb object:
NdbTransaction * trans = ndb->startTransaction();
if (trans == 0) {/*failure*/ return -1;}
From the transaction record it is now possible to define an operation on a particular table:
NdbOperation * op = trans->getNdbOperation("t1");
if (op == 0) {/*failure*/ trans->close(); return -1;}
Please note that we have to close the transaction if anything fails.
Now we have to define what type of operation we want to do.
We can define the following types of operations on the primary key:
- committedRead() - read w/o lock
- readTuple(NdbOperation::LM_Read) - shared lock
- readTuple(NdbOperation::LM_Exclusive) - exclusive lock
- insertTuple()
- deleteTuple()
- updateTuple()
We want to do a committed read so we do:
op->committedRead();
At this stage we have started a transaction and defined a committed read operation on table t1, but we have not decided what we want to read! That is, we have to define the primary key! The primary key is defined by calling NdbOperation::equal(..) on all parts of the primary key (this must be done before any call to NdbOperation::setValue/getValue):
...
int pk_a=1; //define a variable to represent the primary key (where a=1)
...
if (op->equal("a", (const char*)&pk_a) < 0) { /*failure*/ trans->close(); return -1;}
The above uses the method equal to match the column "a" against the value stored in pk_a. The address of pk_a is taken and cast to a const char*.
Then we want to project the values of columns "b" and "c" from the row that matches a=1. To do this we need to define storage, because we want to store the results somewhere so that we can use them!
int b=0;
char c[21]; //column size + 1 in order to get a null-terminated string
memset(c,0,sizeof(c)); //set it to 0
To read a value, we use NdbOperation::getValue(..). This method must be called after the primary key has been defined.
NdbOperation::getValue(..) takes a column name and a buffer in which to store the result. It also returns an NdbRecAttr object, which can be used to check whether what we read was NULL. The NdbRecAttr object also holds the result itself, but it is preferable to store the result in a user-defined buffer in order to avoid unnecessary copying of data.
So, to store the value of column "b" in the buffer "int b=0", we do as follows:
if (op->getValue("b", (char*)&b) == 0) { /*failure*/ trans->close(); return -1;}
Again, we have to take the address of the storage and cast it to a char*.
Then, to store the value of column "c" in the buffer "char c[21]", we do as follows (c is already a char array, so no cast is needed):
if (op->getValue("c", c) == 0) { /*failure*/ trans->close(); return -1;}
At this point we have defined the query SELECT b,c FROM t1 WHERE a=1, but we have not yet executed it. Thus, nothing has been sent to the NDB nodes (data nodes) at this stage. In order to do that we have to call NdbTransaction::execute(...):
if (trans->execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
1) < 0) { /* .. */ trans->close(); /* ... */ }
The above sends the transaction and all its operations (currently one) to the transaction coordinator on one of the NDB nodes (data nodes). Furthermore, it says that the transaction should be committed (had we taken any locks, they would be released) and that it should abort if there is an error. The magical "1" means that the NDBAPI will send it directly (remember ndb_force_send=1 in my.cnf?).
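Every failure path in the excerpts above has to remember to call trans->close(). One way to make that harder to forget is a small guard object whose destructor closes the transaction. The following is only a sketch: TransactionCloser and FakeTransaction are names made up for this illustration and are not part of the NDBAPI, and FakeTransaction merely stands in for NdbTransaction so that the snippet compiles without the NDBAPI headers. The only assumption about the real class is that it has a close() method, as used above.

```cpp
// Closes a transaction-like object when the guard goes out of scope.
// Tx can be any type with a close() member function; in the example
// program that would be NdbTransaction (assumption: only close() is used).
template <class Tx>
class TransactionCloser
{
public:
  explicit TransactionCloser(Tx * trans) : m_trans(trans) {}
  ~TransactionCloser() { if (m_trans != 0) m_trans->close(); }
  Tx * operator->() const { return m_trans; }
  Tx * get() const { return m_trans; }
private:
  // Not copyable: two guards must never close the same transaction.
  TransactionCloser(const TransactionCloser &);
  TransactionCloser & operator=(const TransactionCloser &);
  Tx * m_trans;
};

// Hypothetical stand-in for NdbTransaction. It only counts how many
// times close() was called, so the behaviour of the guard can be checked.
struct FakeTransaction
{
  int closed;
  FakeTransaction() : closed(0) {}
  void close() { ++closed; }
};

// Simulates a function with an early-return failure path.
int do_work(FakeTransaction * raw, bool fail)
{
  TransactionCloser<FakeTransaction> trans(raw);
  if (fail)
    return -1;  // the guard's destructor calls close() here
  return 0;     // ... and here
}
```

With the real API, the guard would be constructed right after a successful ndb->startTransaction(). Any NdbError should still be copied out before the function returns, since the guard closes the transaction on the way out.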
Example
cat ndbapi_pk_read.cpp :
#include <ndb_init.h>
#include <NdbApi.hpp>
#include <iostream>
#include <cstring> // memset
using namespace std;
/**
* prototype for pk_read function
*/
int pk_read( Ndb * ndb);
int main()
{
ndb_init();
/**
* define a connect string to the management server
*/
const char * connectstring = "localhost";
/**
* Create a Ndb_cluster_connection object using the connectstring
*/
Ndb_cluster_connection * conn = new Ndb_cluster_connection(connectstring);
/**
* Connect to the management server
* try 12 times, wait 5 seconds between each retry,
* and be verbose (1) if a connection attempt fails
*/
if(conn->connect(12, 5, 1) != 0)
{
cout << "Unable to connect to management server." << endl;
return -1;
}
/**
* Join the cluster
* wait for 30 seconds for first node to be alive, and 0 seconds
* for the rest.
*/
if (conn->wait_until_ready(30,0) <0)
{
cout << "Cluster nodes not ready in 30 seconds." << endl;
return -1;
}
/**
* The first thing we have to do is to instantiate an Ndb object.
* The Ndb object represents a connection to a database.
* It is important to note that the Ndb object is not thread safe!!
* Thus, if it is a multi-threaded application, then typically each thread
* uses its own Ndb object.
*
* Now we create an Ndb object that connects to the test database.
*/
Ndb * ndb = new Ndb(conn, "test");
if (ndb->init() != 0)
{
/**
* just exit if we can't initialize the Ndb object
*/
return -1;
}
if (pk_read( ndb ) < 0)
{
cout << "failed to read tuple" << endl;
return -1;
}
return 0;
}
/**
* This function implements
* SELECT b,c FROM t1 WHERE a=1;
*/
int pk_read(Ndb * ndb)
{
/**
* Define a variable to hold the primary key
*/
int pk_a = 1;
/**
* Define storage for b and c
* b is an integer in t1
* c is a char(20) in t1.
* CHAR columns in MySQL are space-padded to the end.
* In order to get a null terminated string, c has
* the length + 1 (21 bytes in total), and then memset is
* used to null-terminate the string.
* Varsized attributes are handled differently and we will
* look at this later.
*/
int b=0;
char c[21];
memset(c,0,sizeof(c));
/**
* Start a transaction.
* Later on the transaction must be closed, irrespective of
* success or failure!
*/
NdbTransaction * trans = ndb->startTransaction();
/**
* If there is an error then we should check the
* calling object to find out what the problem is.
* The error is stored in an NdbError struct and contains
* an error message, error code etc.
* Later on, we will do more elaborate error handling, but for
* now we will just print out the error message!
*/
if ( trans == 0)
{
NdbError err = ndb->getNdbError();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}
/**
* Get an operation record on table t1 to do a primary key operation
*/
NdbOperation * op = trans->getNdbOperation("t1");
/**
* if op == 0 then we have to check the error.
* The error is stored in the parent object (here the transaction).
* We also have to close the transaction since it is no longer
* usable!!
*/
if ( op == 0)
{
NdbError err = trans->getNdbError(); // read the error before closing
trans->close();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}
/**
* Define the operation to be a committed read (read w/o lock)
*/
op->committedRead();
/**
* Define the primary key!
*/
if (op->equal("a", (const char*)&pk_a) < 0)
{
NdbError err = op->getNdbError(); // read the error before closing
trans->close();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}
if (op->getValue("b", (char*)&b) == 0)
{
NdbError err = op->getNdbError(); // read the error before closing
trans->close();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}
if (op->getValue("c", c) == 0)
{
NdbError err = op->getNdbError(); // read the error before closing
trans->close();
cout << "err code: " << err.code << " err msg: "
<< err.message << endl;
return -1;
}
/**
* Now we have defined a transaction consisting of
* one operation. It is of course possible to define
* many operations in one transaction that either read
* from the same table or from other tables. This is
* called batching and is very efficient.
* It is important to know that nothing (absolutely nothing)
* has been sent to the ndbd nodes at this stage and we do
* not have anything in the result buffers.
* In order to send the transaction to ndbd, we have to call
* execute!
* execute can be called in many different ways (we will also
* look at them at a later stage), but this particular
* execute does a commit of the transaction and aborts if anything
* goes wrong. Since we only do a committed read, the commit will not commit
* any changes or release any locks, but a committed transaction is
* "finished" and the only thing we can do then is to call close on
* the transaction, which we must do, irrespective of success or failure!
*/
if( trans->execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
1) < 0)
{
/**
* If there is an error, it is stored in NdbError.
* Read it before closing the transaction.
*/
NdbError err = trans->getNdbError();
/**
* The transaction must always be closed!
*/
trans->close();
cout << "err code: " << err.code << " err msg: " << err.message << endl;
return -1;
}
/**
* The transaction must always be closed!
*/
trans->close();
/**
* print out the result:
*/
cout << "b= " << b << " c= " << c << endl;
return 0;
}
Compile it:
g++ -c -Wall -fno-exceptions -I/home/johan/mysql/include/ -I/home/johan/mysql/include/ndb -I/home/johan/mysql/include/ndb/ndbapi ndbapi_pk_read.cpp
g++ ndbapi_pk_read.o -L/home/johan/mysql/lib -lndbclient -lmysys -lmystrings -lpthread -o ndbapi_pk_read
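The example prints c exactly as it came back from the data nodes, which means that it still carries the space padding of the char(20) column. A minimal sketch of a helper that strips the padding could look as follows. The name trim_char_column is made up for illustration and is not part of the NDBAPI, and the sketch assumes a single-byte character set, so that the column is 20 bytes wide:

```cpp
#include <cstddef>
#include <string>

// Copies a fixed-width, space-padded CHAR value into a std::string
// and drops the trailing padding. 'len' is the column width in bytes
// (20 for column c in t1, assuming a single-byte character set).
// Spaces inside the value are kept; only trailing ones are removed.
std::string trim_char_column(const char * buf, std::size_t len)
{
  while (len > 0 && buf[len - 1] == ' ')
    --len;
  return std::string(buf, len);
}
```

In pk_read it could be used as cout << trim_char_column(c, 20) << endl; once the transaction has been executed successfully.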