Wednesday, January 20, 2010

Gearman meets MySQL Cluster (NDBAPI)

After a discussion with my colleague Stephane Varoqui we decided to see how Gearman and the NDBAPI could be used together. The result of the POC was a Gearman worker and a couple of clients (clients and workers use Google Protocol Buffers as the protocol). The worker can:
  • set/get/delete records on a single table in MySQL Cluster using the primary key
  • set/get/delete "any" type. It is not possible to dynamically add types but this is done at compile time.
  • supports the following SQL data types: (UNSIGNED) INTEGER, (UNSIGNED) BIGINT, CHAR, VARCHAR/VARBINARY
  • supports the following Google Protocol Buffer scalars: int32, uint32, int64, uint64, string, bytes.
  • not handle much errors for the time being
and a client that can
  • create a message and send it to the Gearman Job Server
  • clients ca n be written in either C++, Java, or Python (subject to what languages that Google Protocol Buffers supports)
  • receive (deserialize) the data.
So basically this is a new, albeit simple, connector to MySQL Cluster! Hopefully someone will find it useful.

The code can be downloaded here and some short instructions are here, and if you guys out there thinks this is usable, then it might make it to launchpad. Let me know!

Here follows some information what has been done and how to use this.

First you have to create the relation tables (engine=ndb). I will use a 'Person' (for the rest of the examples) that I will persist to the database. I have created the following relational table:
CREATE TABLE `Person` (
`id` int(11) NOT NULL,
`name` varchar(128) DEFAULT NULL,
`message` varchar(1024) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=ndbcluster DEFAULT CHARSET=latin1
The relational tables needs to be translated to Google Protocol Buffers:
> cat proto/Person.proto
message Person {
required int32 id = 1; //PRIMARY KEY attributes are 'required'
optional string name = 2;
optional string message = 3;
}
All columns in the relational table must exist in the protocol buffer definition.

There is union-proto buffer called NdbMessage.proto that then contains all the proto buffers that can be sent between the client and worker:
> cat proto/NdbMessage.proto
// import the relevant protos that can be in
// the NdbMessage
import "Person.proto";

message NdbMessage {
enum Type { Person=1;}
// Identifies which field is filled in.
required Type type = 1;
// list of possible protos comes here:
optional Person person = 2;
}

The proto files then needs to be run through the protocol buffer compiler:
> /usr/local/bin/protoc --proto_path=proto --cpp_out=`pwd` proto/Person.proto proto/NdbMessage.proto
# this generates .h and .cc files for Proto buffer files.
There are three clients for this, one for each operation (set,get,delete).

In the ndbapi_set_client.cpp (the clients are based on the reverse_client.cpp in gearman) we invoke the 'ndbapi_set' function, that will be executed by the worker:

#include "NdbMessage.pb.h"
#include "Person.pb.h"

.
/**instantiate a NdbMessage object and associate a Person to it*/
NdbMessage m;
m.set_type(NdbMessage_Type_Person);
Person * p=m.mutable_person();
/* I must set all fields for now */
p->set_id(1);
p->set_name("Johan Andersson");
p->set_message("hello world, my first insert");

string s;
m.SerializeToString(&s);

...

const char * data = s.data();
result= (char*)gearman_client_do(&client,
"ndbapi_set",
NULL,
(void *)data,
(size_t)p.ByteSize(),
&result_size,
&ret);

The worker (ndbapi_worker.cpp) registers three functions:
ret=gearman_worker_add_function(&worker, "ndbapi_get", 0, ndbapi_get,NULL);
ret=gearman_worker_add_function(&worker, "ndbapi_set", 0, ndbapi_set,NULL);
ret=gearman_worker_add_function(&worker, "ndbapi_delete", 0, ndbapi_delete,NULL);

And when the worker receives the function call to 'ndbapi_set' it has to deserialize the received message into a NdbMessage:

static void *ndbapi_set(gearman_job_st *job,
void *context,
size_t *result_size,
gearman_return_t *ret_ptr)
{
/** receive message and convert into c++ string
* construct the wanted object (dataObject) from parsing c++ string.
*/
const void * rawmessage;
rawmessage= gearman_job_workload(job);
string s((char*)rawmessage, gearman_job_workload_size(job));

NdbMessage dataObject;
if(! dataObject.ParseFromString(s))
{
*ret_ptr= GEARMAN_WORK_FAIL;
return NULL;
}

The worker then looks at the type of the message and gets the underlying object (in this case Person):

google::protobuf::Message * message;
switch(dataObject.type())
{
case NdbMessage_Type_Person:
{
message= (google::protobuf::Message*)&dataObject.person();
reflection = (google::protobuf::Reflection *)message->GetReflection();
descriptor = (google::protobuf::Descriptor*)message->GetDescriptor();
}
break;
/*
case NdbMessage_Type_MyType:
{
// the myType() .. is the name of the field in MyType.proto:
// MyType myType = ;
message= (google::protobuf::Message*)&dataObject.myType();
reflection = (google::protobuf::Reflection *)message->GetReflection();
descriptor = (google::protobuf::Descriptor*)message->GetDescriptor();
}
break;
*/
default:
cout << "unknown type: "<< ret_ptr=" GEARMAN_WORK_FAIL;"> the insert was successful */
*result_size=0;
*ret_ptr= GEARMAN_SUCCESS;
return NULL;
}

In order to add a new type, you need to add a new 'case' to handle the type and how to get that object from the NdbMessage object (dataObject).

The worker loops over all fields in the received Proto Message and creates a transaction in the NDBAPI and executes it. Thus this part agnostic to the type you give it. As long as the following is true:
  • The relational table only uses (UNSIGNED) INTEGER, (UNSIGNED) BIGINT, CHAR, VARCHAR/VARBINARY data types.
  • The .proto definition contains all columns in the relational table
  • The .proto file marks the PRIMARY KEY of the relational table as 'required'
  • For 'ndbapi_set' you need to set all columns in the table ( i will fix that as soon as possible)
The data is then persisted in the table (currently the worker expects all tables to be stored in the 'test' database):
mysql> select * from Person;
+----+-----------------+------------------------------+
| id | name | message |
+----+-----------------+------------------------------+
| 1 | Johan Andersson | hello world, my first insert |
+----+-----------------+------------------------------+
1 row in set (0.00 sec)


Now the worker can also handle 'get' requests, and by using the get_client:
> ./get_client  1
name: Johan Andersson
message: hello world, my first insert
And there is also a client that does deletes (delete_client):
> ./delete_client  1
Delete successful: id=1
Summary/Conclusions
  • What are the performance implications of using Proto Buffer's reflection mechanism?
  • Proto Buffer only works for C++, Java, and Python - currently no support for PHP.
  • Is it better to use something else than Proto Buffers for this?
  • Gearman was super-easy to install so thanks for that!
  • Google Protocol Buffers was super-easy to install so thanks for that!
  • The worker needs also to be extended to support range searches and to make use of the batching interface so that it is possible persist either many types or many instances of a type in a batch.
  • NO-SQL -- YES-NDB !

2 comments:

Monty Taylor said...

Fantastic, Mr. Andersson! I've been meaning to play with a Gearman/NDB combination for a while - so I'm glad you've done this!

Johan Andersson said...

Thank you! But a lot of creds to Steph that came with the idea.