Monday, October 29, 2007

Data distribution and distribution awareness

Why does performance degrade with more data nodes?
Some of you might have noticed that performance can actually drop slightly when you move from a two node cluster to a four node cluster. To understand this we need to understand how tables and their data are organized in MySQL Cluster. I will also say a bit about distribution awareness, a feature that can be used to minimize this performance drop in bigger configurations.

In MySQL Cluster a table is divided into fragments that are spread across the data nodes.
You have as many fragments (Fx) as you have data nodes (Nx).
If you have four data nodes, then a table will have four fragments:

Table T1= {F1, F2, F3, F4}

These fragments will be laid out on the data nodes as follows:

N1=F1P
N2=F2P
N3=F3P
N4=F4P

I have written FxP, where P indicates the Primary fragment. Potential locking conflicts are resolved on the data node holding the Primary fragment for the particular data.
But this does not give you any redundancy. For redundancy you need two replicas (copies) of the fragments (NoOfReplicas=2 in config.ini):

N1=F1P, F2S
N2=F2P, F1S
N3=F3P, F4S
N4=F4P, F3S

Some explanations:
  • Nx = data node x
  • S = secondary fragment (copy).
  • P = primary fragment; lock conflicts are handled here.
  • Each data node has a transaction coordinator that is active.
  • Nodes sharing the same information are called a node group. NG0={N1, N2} shares the same information (same fragments), and NG1={N3, N4} shares the same information.
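
The layout above can be generated mechanically. The following is a minimal sketch that only covers the NoOfReplicas=2 case described here; the class and method names are made up for illustration and are not part of any MySQL Cluster API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: reproduces the fragment layout shown above for NoOfReplicas=2.
class FragmentLayout {
    // Returns one line per data node, e.g. "N1=F1P, F2S".
    static List<String> layout(int dataNodes) {
        if (dataNodes < 2 || dataNodes % 2 != 0) {
            throw new IllegalArgumentException("need an even number of data nodes");
        }
        List<String> lines = new ArrayList<>();
        for (int node = 1; node <= dataNodes; node++) {
            // Nodes are paired into node groups: {N1,N2}, {N3,N4}, ...
            int partner = (node % 2 == 1) ? node + 1 : node - 1;
            // Each node holds its own fragment as primary and its partner's as secondary.
            lines.add("N" + node + "=F" + node + "P, F" + partner + "S");
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : layout(4)) {
            System.out.println(line);
        }
    }
}
```

Running it with four data nodes prints the same four lines as the layout listed above.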

Transactions are started in a round-robin fashion on the cluster from the application, unless you use distribution awareness (startTransaction with a hint). Since transactions are started round-robin, a transaction might be started on a node that does not have the particular data! When the transaction coordinator (TC), which the application (a MySQL server or an NDBAPI program) picks without regard to where the data lives, receives a primary key request (read/write), it calculates a 128-bit hash on the primary key. The first 64 bits tell where in the primary key hash table the entry should be stored, and the second 64 bits identify which fragment the data should be located on. That fragment may reside on a node other than the TC. If so, the TC has to forward the request to that fragment (a read is forwarded, and the node having the data replies to the application directly), or initiate the 2PC protocol to update both fragments that should have the data. If the update of a row identified by PK=1 hashes to F1, then both F1P and F1S have to be synchronously updated. The 2PC takes care of that; a little bit more about that later.
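
To make the "hash the primary key, then pick a fragment" step concrete, here is a rough sketch. It makes two assumptions that are not taken from the text above: MD5 is used simply because it is a readily available 128-bit hash, and the second 64 bits are mapped to a fragment with a plain modulo. The real data nodes may do both differently, and the names PkHashSketch, hashHalves and fragmentFor are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch only: a 128-bit hash split into two 64-bit halves.
class PkHashSketch {
    // Returns {first 64 bits, second 64 bits} of a 128-bit hash of the key.
    static long[] hashHalves(String primaryKey) {
        try {
            // Assumption: MD5 stands in for whatever 128-bit hash the cluster uses.
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(primaryKey.getBytes(StandardCharsets.UTF_8));
            ByteBuffer buf = ByteBuffer.wrap(digest);
            return new long[] {buf.getLong(), buf.getLong()};
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Assumption: the second half selects the fragment by modulo; result is 1..fragments.
    static int fragmentFor(String primaryKey, int fragments) {
        long second = hashHalves(primaryKey)[1];
        return (int) Long.remainderUnsigned(second, fragments) + 1;
    }

    public static void main(String[] args) {
        System.out.println("PK=1 maps to F" + fragmentFor("1", 4));
    }
}
```

The point is only that the TC can compute the target fragment from the key alone, so it knows immediately whether the data is local or on another node.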

Pretend that you want to read data that is located in fragment F1:
  • If you do a read without a lock (CommittedRead), then we can read from both the primary and the secondary fragment, hence you have a 50% chance of hitting the correct nodes (N1 and N2) having your data.
  • If you read with a lock, then we can only read from F1P, hence we have a 25% chance of hitting the right node (N1)!
And if you want to write (delete, update, insert)
  • If you write to F1, then the nodes N1 and N2 are involved, since both copies have to be updated using the 2PC protocol.

The more nodes you have, the more node groups you will have, and the probability that a read transaction will end up on a node that does not have the information increases.
For eight nodes, two replicas, you have four node groups and only 25% chance that a read without lock will go to the correct nodes.
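
The percentages follow from simple counting. A small sketch, assuming the transaction coordinator is equally likely to be any data node and the rows are spread evenly over the fragments (the class name ReadLocality is made up for this example):

```java
// Sketch only: chance that a transaction starts on a node that holds the row.
class ReadLocality {
    // CommittedRead can be served by any replica of the fragment.
    static double committedReadPercent(int dataNodes, int replicas) {
        return 100.0 * replicas / dataNodes;
    }

    // A read with a lock must go to the primary fragment, which lives on one node.
    static double lockedReadPercent(int dataNodes) {
        return 100.0 / dataNodes;
    }

    public static void main(String[] args) {
        for (int nodes : new int[] {2, 4, 8}) {
            System.out.printf("%d nodes: committed read %.1f%%, locked read %.1f%%%n",
                    nodes, committedReadPercent(nodes, 2), lockedReadPercent(nodes));
        }
    }
}
```

With two replicas this gives 50% and 25% for four nodes, and 25% for a committed read on eight nodes, matching the figures above.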

Now, what you have seen is that performance degrades when you go from two data nodes to four nodes.
If you have two nodes, then a table is divided into two fragments:
Table T1= {F1, F2}

And the information is laid out as (with NoOfReplicas=2):
N1=F1P, F2S
N2=F2P, F1S

Pretend that you want to read data that is located in fragment F1.
  • If you do a read without a lock (CommittedRead), you have a 100% chance of starting the transaction on a node having the data (N1 and N2 both have the data, in F1P or F1S)!
  • If you read with a lock, then we can only read from F1P, hence we have a 50% chance of hitting the right node (N1)!
And if you write:
  • If you write to F1, then the nodes N1 and N2 are involved, since both copies have to be updated using the 2PC protocol. I will not explain the 2PC protocol much more than saying that the transaction coordinator (TC) (can be on any of the nodes) will send: Prepare phase: TC --> F1P --> F1S --> TC , Commit phase: TC --> F1S --> F1P --> TC.

With this message flow, a write in a four node cluster can involve three different nodes (the TC plus the two nodes holding the fragment), whereas in a two node cluster at most two nodes can be involved, which reduces the number of messages sent between computers.
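
The difference can be counted. The sketch below walks only the two message paths quoted above (Prepare: TC, F1P, F1S, TC and Commit: TC, F1S, F1P, TC) and counts the messages that actually cross from one data node to another. It is a simplification with made-up names and ignores everything else the real protocol sends.

```java
// Sketch only: counts node-to-node messages along the prepare and commit paths.
class TwoPhaseCommitHops {
    // A step costs a network message only when sender and receiver differ.
    static int networkMessages(int[] path) {
        int count = 0;
        for (int i = 1; i < path.length; i++) {
            if (path[i] != path[i - 1]) {
                count++;
            }
        }
        return count;
    }

    // tc, primary and secondary are data node numbers (1 = N1, 2 = N2, ...).
    static int writeMessages(int tc, int primary, int secondary) {
        int prepare = networkMessages(new int[] {tc, primary, secondary, tc});
        int commit = networkMessages(new int[] {tc, secondary, primary, tc});
        return prepare + commit;
    }

    public static void main(String[] args) {
        System.out.println("TC on N1, data on N1/N2: " + writeMessages(1, 1, 2));
        System.out.println("TC on N3, data on N1/N2: " + writeMessages(3, 1, 2));
    }
}
```

When the TC sits on N1 or N2 the write costs four node-to-node messages in this model; when it sits on N3 it costs six, which is the extra overhead of the four node cluster.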

To summarize up to this point:
  • Reads become more expensive with a bigger cluster because the chance of getting to the wrong node increases, and then an extra network hop (at most one) is required.
  • Writes become a little bit more expensive since there are more network hops involved (the mysqld or the ndbapi/ndbj application starts the transaction on data node N3, but the data you are updating is on data nodes N1 and N2).

Distribution awareness

By using distribution awareness you can get around these problems and minimize the number of network hops. Distribution awareness ensures that the transaction is started on the node that has the data, or it can also be used to write data to a particular set of nodes.
  • Reads (with locks or without locks) will now have a 100% chance of getting to the right node.
  • Writes will now always be handled by the data nodes in one particular node group.
  • Distribution awareness is supported in NDBAPI and NDBJ, and in the MySQL Server from MySQL 5.1.22 - 6.3.4 Carrier Grade Edition.
In my next post I plan to illustrate how to use these features from the respective access methods (SQL or NDBAPI).

Tuesday, October 16, 2007

Memory allocation and deallocation in MySQL Cluster

In this post I will try to explain how memory management works inside MySQL Cluster.
Memory management differs between different versions of MySQL Cluster.

The general idea is as follows:
  • Data (and ordered indexes) is stored in the DataMemory section. The DataMemory is organized in pages where each page is 32KB.
  • You can view the DataMemory as a pool of pages consisting of X number of 32 KB pages.
  • A table is associated with one or more pages from the DataMemory (pool of pages) depending on how much data is stored in the table.
That is the general stuff, now let's see how this is used.


MySQL Cluster 6.2/6.3/7.0
Apart from a number of optimizations and new features, the memory allocator has also been revamped and now allocates pages one at a time.

Allocation
  1. If the table is empty and there is an insert - one page will be allocated from the page pool (DataMemory) and associated with that table.
  2. Subsequent inserts will fill up the data page until it is full.
  3. The allocator allocates one page at a time!
Deallocation
  • Currently, a delete resulting in a free page will not return the free page to the DataMemory page pool! It will still be associated with the table it was allocated to. Thus, a "DELETE FROM table1" will not free up any pages; the pages are still bound to the table. From MySQL Cluster 6.3 there is per page de-allocation.
  • truncate/drop table will release the pages to the DataMemory page pool.
  • Rolling restarts of the data nodes will not defragment empty pages, except for pages that store VAR* attributes (VAR* attrs are stored separate from fixed size attributes).

MySQL 5.0 (don't use)

Allocation
  1. If the table is empty and there is an insert - one page will be allocated from the page pool (DataMemory) and associated with that table.
  2. Subsequent inserts will fill up the data page until it is full.
  3. When the page is full, the memory allocator will grow the number of pages associated with the table by 18.75%! This means that if a table is using 100 pages and all those pages are full, then the next insert will force the allocator to allocate another 18.75 (19 rounded up) pages from the DataMemory page pool to the table, so the table now has 119 pages associated with it.
Moreover, you can have free DataMemory (run ndb_mgm -e "all dump 1000" and check what it prints in the cluster log), but still fail to do a single insert with the error message "Table full", because the allocator cannot grow the table by 18.75% when there are not enough free pages in the DataMemory page pool!

E.g., if the table currently uses 1000 pages, the next allocation will force the table to allocate 188 additional pages (18.75% of 1000, rounded up). If there are only 187 pages available in the DataMemory page pool, you will get the "Table full" error message. However, another, smaller table may of course still be able to do its allocation.
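
The growth rule is easy to check with a few lines of code. This is a sketch of the arithmetic only, taking the 18.75% (= 3/16) figure literally and rounding up; the class and method names are invented for the example and the real allocator may differ in details.

```java
// Sketch only: page growth arithmetic for the 5.0 allocator as described above.
class TableGrowth50 {
    // Pages requested when a full table grows by 18.75% (= 3/16), rounded up.
    static long growthPages(long currentPages) {
        return (currentPages * 3 + 15) / 16;
    }

    // The insert only succeeds if the whole growth step fits in the free pool.
    static boolean insertSucceeds(long currentPages, long freePagesInPool) {
        return growthPages(currentPages) <= freePagesInPool;
    }

    public static void main(String[] args) {
        System.out.println("100 full pages grow by " + growthPages(100));
        System.out.println("1000 full pages grow by " + growthPages(1000));
        System.out.println("1000 pages, 187 free: " + insertSucceeds(1000, 187));
    }
}
```

Note how a table with 1000 full pages gets "Table full" even with 187 free pages in the pool, while a 100 page table would still be able to grow.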

Best practice is to never fill up more than 80% of the DataMemory because of this, and because during System Restart the data nodes may recover the data in a different order than you inserted it. The allocator may have succeeded in inserting the data into the database, but during recovery the allocation pattern may, and is likely to, be different, so it may be impossible to recover your data in case of a cluster failure.

Deallocation
  • A delete resulting in a free page, will not return the free page to DataMemory page pool! It will still be associated with the table it was allocated to. Thus, a "DELETE FROM table1" will not free up any pages. The pages are still bound to table1, and subsequent inserts will be made on the pages already associated with the table.
  • truncate/drop table will release the pages to the DataMemory page pool.
  • In 5.0 - rolling restarts of the data nodes will reclaim and defragment empty pages.
  • In 5.1 - rolling restarts of the data nodes will not defragment empty pages. This has to do with a new faster node recovery protocol, called Optimized Node Recovery.
Please note that you can use ndb_mgm -e "all report mem" to view the memory utilization.

Thanks Jonas and Tomas at MySQL for your input on this.

Tuesday, August 07, 2007

Evaluating MySQL Cluster

How can you evaluate if MySQL Cluster is a good fit for you or not?
Jimmy Guerrero (Senior Product Manager) and I have written a whitepaper about it.

Of course, answering that question is not easy, but in the whitepaper we present ideas for how to make it easier and to increase your chances of success whether it is evaluating MySQL Cluster for a migration project or designing a new mission critical database application where high performance, scalability, and high availability are key requirements.

Jimmy will also have a Webex on the subject:
Wednesday, August 08, 2007, 10:00 am PDT, 1:00 pm EDT, 18:00 GMT
and you are very welcome to register here


Also, I want to mention my friend Jim Dowling's work on a few scripts making your life easier when it comes to installing and administering MySQL Cluster. You can find the scripts here! Great work and thank you very much Jim for your contributions to MySQL Cluster!

Monday, May 21, 2007

Good configuration

I get the opportunity to see a lot of different MySQL Cluster configurations, and most of them do not configure the disk subsystem in a good way. E.g., the redo log is way too often not correctly dimensioned. Wrongly dimensioned buffers affect system stability!

There are also some buffers related to communication and disk that are important to set up. At the end you will find a good configuration template that you can use, but first we will have a brief discussion about the disk.


Configuring the disk

In MySQL Cluster, it is very important to dimension the redo log properly in order for System restarts and Node restarts to work. The redo log must be large enough to accommodate the changes that happen over three Local Checkpoints (LCPs). When the third LCP has been written, the redo log can be recycled. The LCP is the image of the DataMemory written to disk, so if you have filled up 8GB of DataMemory, then one LCP will take approx. 8GB of disk space. Writing the LCP takes time, and in 5.1 it is written with a speed determined by DiskCheckpointSpeed. DiskCheckpointSpeed is 10MB/s by default, so writing an LCP of 8GB will take 8192MB / 10MB/s = ~820 seconds. MySQL Cluster will write three LCPs, and clearly we need a redo log big enough to handle the changes that happen over three LCPs, which is then 2460 seconds. However, we should have some headroom in the redo log, so we pretend that we should save the redo log for four LCPs. That is 3280 seconds of changes!

Pretend that you write 1MB/s to a two node, two replica cluster (remember that provisioning or restoring data is often the heaviest write phase in a database). Then the size of the redo log should be at least:

Size of redo log = 1MB/s * 3280 seconds = 3280MB, or roughly 3300MB

Moreover, the redo log file is segmented over a number of 64MB large segments on disk. How many segments you have is set by NoOfFragmentLogFiles! So the NoOfFragmentLogFiles determines the size of the REDO log.

NoOfFragmentLogFiles=3300MB/64MB=52

But way too often NoOfFragmentLogFiles is left at the default. Not good, and system stability is at stake.

A heuristic can also be used to determine the size of the redo log, if you don't have all the data, or if you have ample free disk space:

NoOfFragmentLogFiles= DataMemory (in MB) * 6 / 64 MB

If you have DataMemory=8G and want to use the heuristic, then:
NoOfFragmentLogFiles= 8192 * 6 / 64 MB = 768
are needed!
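
Both calculations can be put into a small helper so you can plug in your own numbers. This is just a sketch of the arithmetic in this post; the class and method names are made up, and the inputs (DataMemory in MB, checkpoint speed in MB/s, write rate in MB/s, number of LCPs to keep) are the quantities discussed above.

```java
// Sketch only: redo log sizing arithmetic from the discussion above.
class RedoLogSizing {
    static final long SEGMENT_MB = 64; // size of one redo log segment on disk

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    // Seconds needed to write one LCP of the whole DataMemory.
    static long lcpSeconds(long dataMemoryMb, long checkpointMbPerSec) {
        return ceilDiv(dataMemoryMb, checkpointMbPerSec);
    }

    // Redo log needed to hold the changes made while lcpsToKeep LCPs are written.
    static long redoLogMb(long dataMemoryMb, long checkpointMbPerSec,
                          long writeMbPerSec, int lcpsToKeep) {
        return writeMbPerSec * lcpsToKeep * lcpSeconds(dataMemoryMb, checkpointMbPerSec);
    }

    static long fragmentLogFiles(long redoLogMb) {
        return ceilDiv(redoLogMb, SEGMENT_MB);
    }

    // Heuristic: DataMemory (in MB) * 6 / 64.
    static long heuristicFragmentLogFiles(long dataMemoryMb) {
        return dataMemoryMb * 6 / SEGMENT_MB;
    }

    public static void main(String[] args) {
        long redo = redoLogMb(8192, 10, 1, 4);
        System.out.println("redo log MB: " + redo);
        System.out.println("NoOfFragmentLogFiles: " + fragmentLogFiles(redo));
        System.out.println("heuristic: " + heuristicFragmentLogFiles(8192));
    }
}
```

For 8GB of DataMemory, 10MB/s checkpoint speed and 1MB/s of writes this gives 3280MB and 52 files, and the heuristic gives 768.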

RedoBuffer, which sits in front of the disk, should also be increased from its default value if you have the resources (highly recommended):
RedoBuffer=32M

Configuration template
Most things can be left with default values, but these things are worth considering:


[tcp default]
SendBufferMemory=2M

[ndbd default]
##avoid swapping, not good at all
LockPagesInMainMemory=1

## Highly recommended to increase RedoBuffer:
RedoBuffer=32M


## Set the correct size of the redo log (heuristic):
NoOfFragmentLogFiles= 6 * DataMemory (in MB) / 64MB



In the my.cnf file, set:

[mysqld]
...
ndb-force-send=1
ndb-use-exact-count=0
engine-condition-pushdown=1
...

What's next?

If you have any questions or are interested in more info, please contact me at <> and I will try to address your inquiries.

Saturday, April 28, 2007

NDB/J - A direct Java API to MySQL Cluster

YOU SHOULD USE NDB/BINDINGS WHICH YOU CAN FIND HERE! NDB/BINDINGS IS THE OFFICIAL VERSION BEING MAINTAINED AND INCLUDES NDB/J.

IGNORE EVERYTHING BELOW THIS POINT (except the first couple of sentences):

NDB/J is a type II Java connector that makes it possible for a Java application to connect directly to the NDBCluster storage engine, thus by-passing the MySQL Server.
Initial benchmarks we have conducted show that NDB/J is on par with the C++ NDBAPI when it comes to performance. Moreover, NDB/J is a wrapper of the C++ NDBAPI, but offers Java-like abstractions such as Exceptions and ResultSets.

If you are interested in evaluating NDB/J and trying it out, you can download it here.
The following versions of MySQL Cluster should work:
  • MySQL Cluster 5.0.x
  • MySQL Cluster 5.1.16 and above, an interface change prohibits earlier releases from compiling ndb/j.

Documentation will also be put there, but it is also possible to generate the documentation using Javadoc from the source code.

There is also a mailing list, which you can use:
ndb-j@lists.mysql.com
Register here and scroll down to NDB/J and press "subscribe".
MySQL offers no support for NDB/J at this stage, but we will try to help you as fast as possible on the mailing lists and you can also get help from Professional Services at MySQL.

Below you will find a simple test program and how to get going! Good luck!


Quick start

Here is a quick start for how to get going:
  1. Install MySQL Cluster 5.0 or 5.1. You need it on the same computer as the one on which you will compile NDB/J.
  2. Download ndb/j and untar it.
  3. export JAVA_HOME=/path/to/java/
  4. ./configure --with-mysql=/path/to/mysql/ --prefix=/path/to/mysql
    1. --prefix = where ndb/j libraries will be installed
    2. --with-mysql = where mysql is installed
  5. make
  6. make install
    1. this installs mysql-ndbj-0.1.jar and libndbj.so in the "--prefix" directory
In the docs directory you will also get Javadoc documentation for ndb/j and mgm/j (which is a wrapper of the management API, MGMAPI).

We have tested this mostly on 32-bit Linux and Sun JDK 1.5 and 1.6, gcc 3.3.6 (ubuntu) and g++ 3.3.6 (ubuntu). Please let us know on ndb-j@lists.mysql.com if you have problems compiling on your particular Linux platform. And yes, we have some work to do on the build system :)

Simple example program
Cut this and paste it into "Test.java":


import com.mysql.storage.ndb.ndbapi.*;

/*
 * create table t1(a integer primary key,
 *   b char(32), c varchar(255), d bigint) engine=ndb;
 *
 * This program inserts and reads a row from this table.
 */
class Test
{
    public static void main(String args[])
    {
        // Load the native library (libndbj.so) that wraps the C++ NDBAPI.
        System.loadLibrary("ndbj");
        NdbClusterConnection conn = null;
        Ndb ndb = null;

        try
        {
            /*
             * Connect to MySQL Cluster
             * 1). Connect to management server (localhost in this case)
             * 2). Connect to the data nodes (waitUntilReady)
             */
            conn = NdbFactory.createNdbClusterConnection("localhost");
            conn.connect(5, 3, true);
            conn.waitUntilReady(30, 0);

            /*
             * Get a connection to NDB (once / thread)
             */
            ndb = conn.createNdb("test", false);
            ndb.init(1000);

            /*
             * INSERT EXAMPLE
             * INSERT INTO t1 VALUES(1,"hello", "ndb/j", 20070428);
             */

            // define what to insert:
            int a = 1;
            String b = "hello";
            String c = "ndb/j";
            long d = 20070428;

            NdbTransaction t = null;
            NdbOperation op = null;

            t = ndb.startTransaction();

            op = t.getNdbOperation("t1");
            op.insertTuple();
            op.setInt("a", a);
            op.setString("b", b); // char
            op.setString("c", c); // varchar
            op.setLong("d", d);   // bigint

            t.execute(NdbTransaction.ExecType.Commit,
                      NdbTransaction.AbortOption.AbortOnError,
                      true);

            /*
             * MUST CLOSE TRANSACTION WHEN DONE WITH IT
             * ELSE IT WILL FAIL LATER
             * AT SOME STAGE!!
             */
            t.close();

            /*
             * READ EXAMPLE - implements:
             * SELECT * FROM t1 WHERE a = 1;
             */
            int read_pk_a = 1; // pk to read

            NdbResultSet rs = null;

            t = ndb.startTransaction();

            op = t.getNdbOperation("t1");
            op.committedRead();
            op.equal("a", read_pk_a);
            op.getValue("b");
            op.getValue("c");
            op.getValue("d");
            rs = op.resultData();

            t.execute(NdbTransaction.ExecType.NoCommit,
                      NdbTransaction.AbortOption.AbortOnError,
                      true);

            while (rs.next())
            {
                System.out.println("a= " + read_pk_a +
                                   "\nb= " + rs.getString("b") +
                                   "\nc= " + rs.getString("c") +
                                   "\nd= " + rs.getLong("d"));
            }

            /*
             * MUST CLOSE TRANSACTION WHEN DONE WITH IT
             * ELSE IT WILL FAIL LATER
             * AT SOME STAGE!!
             */
            t.close();
        }
        catch (NdbClusterConnectionPermanentException e) {
            // Serious problem. Retrying connecting will not work.
        }
        catch (NdbClusterConnectionTemporaryException e) {
            // We can retry connecting if we catch this exception
        }
        catch (NdbApiException e)
        {
            System.out.println("NdbAPI exception= " + e.getMessage());
        }
        catch (Exception e)
        {
            System.out.println("exception= " + e.getMessage());
        }
        finally {
            if (conn != null)
                conn.close();
        }
    }
}


Compiling and running example program
Please note that you need to have a running MySQL Cluster to run this program.
  1. export CLASSPATH=/path/mysql-ndbj-0.1.jar:.
  2. export LD_LIBRARY_PATH=/path where libndbj.so is/:$LD_LIBRARY_PATH
  3. javac Test.java
  4. mysql -u root
    1. mysql> use test;
    2. create table t1(a integer primary key,
      b char(32), c varchar(255), d bigint) engine=ndb;
  5. java Test
Congratulations!

Wednesday, March 07, 2007

Benchmarking node restart time in 5.1

When you have a node failure, the best practice is to restart the ndbd node as soon as possible.
This post illustrates:
  • what happens to restart times if you have configured TimeBetweenLocalCheckpoints wrong and have a high load
  • what happens to restart times if you don't restart the failed ndbd node immediately
  • how long it takes to resync 1GB of data with and without ongoing transactions.

5.0 and 5.1 differences in node recovery protocol

In 5.0 the node recovery protocol copies all data from the other node in its node group.
In 5.1, there is a new node recovery protocol called "Optimized Node Recovery" (ONR below).
When a node has failed and is restarted, it will recover its local logs (Local Checkpoint (LCP) + redo log) and only copy the changes that have been made on the started node.
This is faster than copying all information as done in 5.0, although a full copy can also happen in 5.1, as we will see.


Current limitations of Optimized Node Recovery

However, if the ndbd node is down for three consecutive Local Checkpoints (LCPs), then it will not be possible to run the ONR protocol, and the node will fall back to copying all information. This is because both ndbd nodes need to have roughly the same view of the redo log, and the redo log stored locally on the failed node is deemed completely obsolete after three LCPs on the started node.


How does this affect configuration?


The start of LCPs depends on the load. Currently 4MB of changes triggers an LCP. If you have a node failure during high load, then the likelihood of starting three LCPs before the failed ndbd node is recovered increases. In that case the local redo log is invalidated and the recovering node will have to copy all information from the other node.
Therefore it is important to set TimeBetweenLocalCheckpoints to a higher value than the default on highly loaded systems.


Verifying correctness of the TimeBetweenLocalCheckpoints

This can easily be benchmarked and verified by doing
ndb_mgm>ALL CLUSTERLOG CHECKPOINT=8
This will print out when a LCP is started and completed in the cluster log.
ndb_mgm>ALL CLUSTERLOG NODERESTART=15
will print out restart info in the cluster log.

When you then have a node failure (or shut down a node yourself), verify that three LCPs are _not_ started before the failed ndbd node is recovered. If three LCPs are started before the failed ndbd node is recovered, then you should increase TimeBetweenLocalCheckpoints to suit the load you have.
I found TimeBetweenLocalCheckpoints=24 (64MB of changes to trigger LCP start) to be a good value on a decently loaded system.
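
The relation between the parameter value and the amount of changes is not obvious at first sight. The sketch below assumes that TimeBetweenLocalCheckpoints is the base-2 logarithm of the number of 4-byte words of changes that trigger an LCP, and that the default is 20. Both assumptions are consistent with the 4MB and 64MB figures mentioned here, but check the reference manual for your version. The class name LcpTrigger is invented for the example.

```java
// Sketch only: converts a TimeBetweenLocalCheckpoints value to a trigger size.
class LcpTrigger {
    // Assumption: the value is log2 of the number of 4-byte words of changes.
    static long triggerBytes(int timeBetweenLocalCheckpoints) {
        return 4L << timeBetweenLocalCheckpoints;
    }

    static long triggerMb(int timeBetweenLocalCheckpoints) {
        return triggerBytes(timeBetweenLocalCheckpoints) >> 20;
    }

    public static void main(String[] args) {
        System.out.println("value 20 -> " + triggerMb(20) + "MB of changes");
        System.out.println("value 24 -> " + triggerMb(24) + "MB of changes");
    }
}
```

Each step up in the value doubles the amount of changes needed before the next LCP starts, so going from 20 to 24 is a factor of 16.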

You can also verify, by looking at the ndb_X_out.log of the restarting node that it is using the RESTORE block which reads in its local data instead of copying all data from the other ndbd node. If you see RESTORE print outs there while recovering the node, then you are in good shape.

If you are in the population phase of the database, I also suggest that you run:
ndb_mgm>all dump 7099
when the population is done. This triggers an LCP, just to be on the safe side, so that all ndbd nodes have completed at least one LCP. Please note that all dump commands carry a certain risk and are not yet productized.


Benchmark results of node restart time

I used the following values in config.ini for these tests:

TimeBetweenLocalCheckpoints=24
DiskCheckpointSpeed=[default]


Test 1:
  • 1GB of data in database,
  • 2 NDB nodes,
  • no ongoing transactions
Optimized node recovery = 3m 3sec (~2min spent writing initial LCP, speed of this can be tuned with DiskCheckpointSpeed)

Node recovery = 5m 27sec (~2min spent writing initial LCP, speed of this can be tuned with DiskCheckpointSpeed)

Test 2:
  • 1GB of data in database
  • 2 NDB nodes,
  • ~2000 updates/sec each updating 512B. (1MB/s)
Optimized node recovery = 4m 42sec (~2min spent writing initial LCP, speed of this can be tuned with DiskCheckpointSpeed)

Node recovery = 7m 24sec (~2min spent writing initial LCP, speed of this can be tuned with DiskCheckpointSpeed)


Conclusion

Taking out the cost of the initial LCP that must be written after all info has been resynced on the recovering ndbd node, there is a big difference between Optimized Node Recovery and Node Recovery.
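
To put numbers on that, here is a small sketch that subtracts the initial LCP from the measured times. It treats the "~2min" as exactly 120 seconds, which is only an approximation, so the resulting ratios are rough. The class name RestartComparison is made up for the example.

```java
// Sketch only: compares the measured restart times with the initial LCP removed.
class RestartComparison {
    static final int INITIAL_LCP_SECONDS = 120; // assumption: "~2min" taken as 120s

    static int seconds(int minutes, int secs) {
        return minutes * 60 + secs;
    }

    static int withoutInitialLcp(int totalSeconds) {
        return totalSeconds - INITIAL_LCP_SECONDS;
    }

    // How many times longer full node recovery takes, initial LCP excluded.
    static double ratio(int fullSeconds, int optimizedSeconds) {
        return (double) withoutInitialLcp(fullSeconds) / withoutInitialLcp(optimizedSeconds);
    }

    public static void main(String[] args) {
        System.out.printf("Test 1: %.1fx%n", ratio(seconds(5, 27), seconds(3, 3)));
        System.out.printf("Test 2: %.1fx%n", ratio(seconds(7, 24), seconds(4, 42)));
    }
}
```

Excluding the initial LCP, Test 1 comes out at about 63 versus 207 seconds and Test 2 at about 162 versus 324 seconds, so in these two runs the full copy took roughly two to three times as long.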

Thus, by having good disks so we can checkpoint faster (DiskCheckpointSpeed) and carefully tuning:
  • DiskCheckpointSpeed
  • TimeBetweenLocalCheckpoints
it is possible to reduce node restart times quite a bit.


Test setup

Tests were carried out on a dual CPU, dual core AMD Opteron 275, 1.8GHz, using the following tables:

create table t1(a char(32), b integer unsigned not null, c char(32), d integer unsigned not null, e char(220), f char(220), primary key(a,b), index (d)) engine=ndb;

create table t2(a char(32), b integer unsigned not null, c char(32), d integer unsigned not null, e char(220), f char(220), primary key(a,b), index (d)) engine=ndb;

Each was filled with 1M records using hugoLoad.
Updating transactions were generated with hugoPkUpdate.
These two programs can be found in the 5.1 source distribution.