Friday, April 26, 2013

Exploring Riak with F# and CorrugatedIron

Many thanks to David and OJ for recommending the CorrugatedIron .NET Riak client library. This blog entry documents my experiments with Riak CRUD operations using CorrugatedIron.

Pinging Riak

I tested CorrugatedIron from an F# script. Here is the setup code, along with a test of the ping capability:

// Needed to load the following libraries to get F# script to work
#r @"c:\dev\FsRiak\packages\CorrugatedIron.1.3.0\lib\net40\CorrugatedIron.dll"
#r @"c:\dev\FsRiak\packages\protobuf-net.2.0.0.621\lib\net40\protobuf-net.dll"
#r @"c:\dev\FsRiak\packages\Newtonsoft.Json.4.5.11\lib\net40\Newtonsoft.Json.dll"

open CorrugatedIron
open CorrugatedIron.Models
open Newtonsoft.Json
open System

// Setup connections
let cluster = RiakCluster.FromConfig("riakConfig", @"c:\dev\FsRiak\App.config")
let client = cluster.CreateClient()

// Ping the Riak Cluster
client.Ping()

Here is my App.config file used by CorrugatedIron:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="riakConfig" type="CorrugatedIron.Config.RiakClusterConfiguration, CorrugatedIron" />
  </configSections>
  <riakConfig nodePollTime="5000" defaultRetryWaitTime="200" defaultRetryCount="3">
    <nodes>
      <node name="dev1"  hostAddress="mydevhost-a" pbcPort="8087" restScheme="http" restPort="8098" poolSize="10" />
      <node name="dev2" hostAddress="mydevhost-b" pbcPort="8087" restScheme="http" restPort="8098" poolSize="10" />
      <node name="dev3" hostAddress="mydevhost-c" pbcPort="8087" restScheme="http" restPort="8098" poolSize="10" />
    </nodes>
  </riakConfig>
</configuration>

Here's the result from running ping:

val it : RiakResult = CorrugatedIron.RiakResult {ErrorMessage = null;
                                                 IsSuccess = true;
                                                 ResultCode = Success;}

Get List of Buckets

The following method call gets you the list of buckets along with metadata for the call status:

client.ListBuckets()

This method returns the following RiakResult object:

val it : RiakResult<seq<string>> =
  CorrugatedIron.RiakResult`1[System.Collections.Generic.IEnumerable`1[System.String]]
    {ErrorMessage = null;
     IsSuccess = true;
     ResultCode = Success;
     Value = seq ["photos"; "favs"; "animals"; "cages"; ...];}

Get Bucket Keys

Getting a list of keys for a bucket is also simple:

client.ListKeys("animals")

For the novice, the library warns you not to do this in production environments:

*** [CI] -> ListKeys is an expensive operation and should not be used in Production scenarios. ***
val it : RiakResult<seq<string>> =
  CorrugatedIron.RiakResult`1[System.Collections.Generic.IEnumerable`1[System.String]]
    {ErrorMessage = null;
     IsSuccess = true;
     ResultCode = Success;
     Value = seq ["ace"; "polly"];}

Retrieve Content from Riak

Getting a value from Riak is pretty easy with this library:

client.Get("animals","ace")

A dump of the return object shows the actual data plus metadata about the Get operation:

val it : RiakResult<Models.RiakObject> =
  CorrugatedIron.RiakResult`1[CorrugatedIron.Models.RiakObject]
    {ErrorMessage = null;
     IsSuccess = true;
     ResultCode = Success;
     Value = CorrugatedIron.Models.RiakObject;}

A deeper dive into the Value field of the RiakResult object gives the following:

val it : Models.RiakObject =
  CorrugatedIron.Models.RiakObject
    {BinIndexes = dict [];
     Bucket = "animals";
     CharSet = null;
     ContentEncoding = null;
     ContentType = "application/json";
     HasChanged = false;
     IntIndexes = dict [];
     Key = "ace";
     LastModified = 1359744019u;
     LastModifiedUsec = 788127u;
     Links = seq [];
     Siblings = seq [];
     UserMetaData = dict [];
     VTag = "7aPFusRQHlQ36ZP6G6GSyE";
     VTags = seq ["7aPFusRQHlQ36ZP6G6GSyE"];
     Value = [|123uy; 32uy; 34uy; 110uy; 105uy; 99uy; 107uy; 110uy; 97uy;
               109uy; 101uy; 34uy; 32uy; 58uy; 32uy; 34uy; 84uy; 104uy; 101uy;
               32uy; 87uy; 111uy; 110uy; 100uy; 101uy; 114uy; 32uy; 68uy;
               111uy; 103uy; 34uy; 32uy; 44uy; 32uy; 34uy; 98uy; 114uy; 101uy;
               101uy; 100uy; 34uy; 32uy; 58uy; 32uy; 34uy; 71uy; 101uy; 114uy;
               109uy; 97uy; 110uy; 32uy; 83uy; 104uy; 101uy; 112uy; 104uy;
               101uy; 114uy; 100uy; 34uy; 32uy; 125uy|];
     VectorClock = [|107uy; 206uy; 97uy; 96uy; 96uy; 96uy; 204uy; 96uy; 202uy;
                     5uy; 82uy; 28uy; 172uy; 90uy; 225uy; 175uy; 2uy; 57uy;
                     59uy; 156uy; 51uy; 152uy; 18uy; 25uy; 243uy; 88uy; 25uy;
                     132uy; 59uy; 26uy; 78uy; 241uy; 101uy; 1uy; 0uy|];}

The data I really wanted is embedded in another Value field, where it is represented as an array of bytes, which is not the final format I want. I wanted to get back the JSON representation of the data I put in. In my previous blog post, I rolled my own JSON serializer/deserializer, but now I wanted to leverage another library bundled with CorrugatedIron, namely Json.NET. To do so, I define the Animal type and use the Json.NET serializer/deserializer to convert between the objects and their corresponding JSON representation:

type Animal =
    { nickname: string; breed: string}

// Getting ace from animals bucket 
client.Get("animals","ace").Value.GetObject<Animal>() 

Adding Content to Riak

Adding content to Riak is also pretty easy once you have defined a type whose fields Json.NET can serialize:

new RiakObject("animals","delta",{nickname="Snoopy"; breed="Beagle"}) 
|> client.Put

If you dump the RiakResult object and then the Value field of that RiakResult, you get the following:


val it : RiakResult<RiakObject> =
  CorrugatedIron.RiakResult`1[CorrugatedIron.Models.RiakObject]
    {ErrorMessage = null;
     IsSuccess = true;
     ResultCode = Success;
     Value = CorrugatedIron.Models.RiakObject;}
  
val it : RiakObject =
  CorrugatedIron.Models.RiakObject
    {BinIndexes = dict [];
     Bucket = "animals";
     CharSet = null;
     ContentEncoding = null;
     ContentType = "application/json";
     HasChanged = false;
     IntIndexes = dict [];
     Key = "delta";
     LastModified = 1366930771u;
     LastModifiedUsec = 271921u;
     Links = seq [];
     Siblings = seq [];
     UserMetaData = dict [];
     VTag = "OvZlH7bsYKdO8zL76QdDY";
     VTags = seq ["OvZlH7bsYKdO8zL76QdDY"];
     Value = [|123uy; 34uy; 110uy; 105uy; 99uy; 107uy; 110uy; 97uy; 109uy;
               101uy; 34uy; 58uy; 34uy; 83uy; 110uy; 111uy; 111uy; 112uy;
               121uy; 34uy; 44uy; 34uy; 98uy; 114uy; 101uy; 101uy; 100uy; 34uy;
               58uy; 34uy; 66uy; 101uy; 97uy; 103uy; 108uy; 101uy; 34uy; 125uy|];
     VectorClock = [|107uy; 206uy; 97uy; 96uy; 96uy; 96uy; 204uy; 96uy; 202uy;
                     5uy; 82uy; 28uy; 169uy; 111uy; 239uy; 241uy; 7uy; 114uy;
                     230uy; 184uy; 103uy; 48uy; 37uy; 50uy; 230uy; 177uy; 50uy;
                     4uy; 27uy; 190uy; 59uy; 197uy; 151uy; 5uy; 0uy|];}
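
The Value bytes in this dump are just the serialized JSON text. As a minimal sketch, assuming the payload is UTF-8 (the dump shows no CharSet, so that is my assumption), the bytes can be decoded locally without any call to Riak:

```fsharp
open System.Text

// Value bytes copied from the RiakObject dump above
let valueBytes : byte[] =
    [| 123uy; 34uy; 110uy; 105uy; 99uy; 107uy; 110uy; 97uy; 109uy;
       101uy; 34uy; 58uy; 34uy; 83uy; 110uy; 111uy; 111uy; 112uy;
       121uy; 34uy; 44uy; 34uy; 98uy; 114uy; 101uy; 101uy; 100uy; 34uy;
       58uy; 34uy; 66uy; 101uy; 97uy; 103uy; 108uy; 101uy; 34uy; 125uy |]

// Assumption: the stored payload is UTF-8 encoded text
let json = Encoding.UTF8.GetString valueBytes

printfn "%s" json
// prints {"nickname":"Snoopy","breed":"Beagle"}
```

The name valueBytes is only for illustration; in a live session the same bytes would come from the Value field of the returned RiakObject.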

To verify interoperability, I checked the newly added data with curl:

$ curl -X GET http://192.168.56.1:8098/riak/animals/delta
{"nickname":"Snoopy","breed":"Beagle"}

Delete Riak Contents

Delete content by calling the intuitively named Delete method:

client.Delete("animals","delta")

One thing I wasn't sure about was how CorrugatedIron talks to the Riak cluster. In my simple REST API example, I knew exactly which host I was talking to, since I explicitly specified the URL. With CorrugatedIron, I configure a pool of connections in the App.config file, so I wasn't sure which of the nodes CorrugatedIron was talking to. I fired up Wireshark and noticed that I was connected to the Riak cluster via port 8087, which is the Protocol Buffers Client (PBC) interface. That explains the need to load the protobuf-net library. The Protocol Buffers client was new to me, and at first I took the bundled protobuf-net library to be code developed by Google (many thanks to OJ for correcting me on this: it is NOT a Google library but a library written by Marc Gravell). This, in turn, led me to look at Google Protocol Buffers. This little side jaunt into the CorrugatedIron library led to the discovery (for me) of a whole new set of network communication protocols. In any case, after checking the Wireshark output, it seems that the requests are spread across the different nodes and not restricted to a single node. I'm guessing that there is some built-in load-balancing code in CorrugatedIron that sends my requests to different nodes in the Riak cluster.
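
OJ's comment below confirms that CorrugatedIron balances requests across nodes in a round-robin fashion. To make that idea concrete, here is a rough sketch of round-robin selection over the three node names from my App.config. This is not CorrugatedIron's code; the function makeRoundRobin and everything inside it are hypothetical names of my own, and the real library also handles pooling and failed nodes, which this ignores.

```fsharp
// Hypothetical sketch of round-robin selection; NOT CorrugatedIron's implementation.
// Node names are taken from the App.config shown earlier.
let nodes = [| "dev1"; "dev2"; "dev3" |]

// Returns a function that hands out the items in turn, wrapping around at the end.
let makeRoundRobin (items: string[]) =
    let next = ref 0
    fun () ->
        let item = items.[next.Value % items.Length]
        next.Value <- next.Value + 1
        item

let pick = makeRoundRobin nodes

// Five consecutive "requests" and the node each one would go to
let chosen = [ for _ in 1 .. 5 -> pick () ]

printfn "%A" chosen
// prints ["dev1"; "dev2"; "dev3"; "dev1"; "dev2"]
```

This matches what I saw in Wireshark: consecutive requests landing on different hosts rather than sticking to one.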

For the basic CRUD operations on Riak, CorrugatedIron has made it easier to work with Riak than having to come up with my own helper functions. This has been a good start, and I hope to work through more of the Riak examples from the book Seven Databases in Seven Weeks with CorrugatedIron in future blog posts.

2 comments:

Anonymous said...

Hi there,

Thanks for the post. It's great to see that you've had a bit of fun with CI and Riak using F#. I'd like to clarify a few things:

Riak exposes two interfaces: HTTP and PBC. Where possible CI uses the PBC interface as this is noticeably faster than HTTP. If there is a need to use HTTP for any particular call, CI will do that for you. Hence, you don't need to know which call goes over which interface, we take care of that for you. It also means that when things change down the track the client code doesn't need to change as CI will make the switch for you.

CI currently has support for a load-balanced setup via round-robin. So you tell CI about your nodes and it will balance the requests across them in a round-robin fashion. If a node goes down for some reason, CI will pull it out of the pool and attempt to reconnect to it on another thread. When the node comes back up, CI will re-add it to the pool.

CI also allows you to use an external load balancer. Check out the documentation at http://corrugatediron.org/ as it shows how to configure it to do this.

Each node in the configuration for CI allows a poolSize to be set. This is the number of connections that are established to a given node. If you'd prefer not to have connection pooling enabled, set the poolSize to zero and CI will establish a new connection, make the request and close the connection each time a call is made.

Calling ListKeys() is indeed an expensive operation, but this isn't a limitation of CI, this is something server-side. Listing keys is something that is expensive inside Riak itself.

The client is able to serialise and deserialise to/from JSON without the need of creating custom classes such as your Animal class. You can pass in anonymous types and dynamics. You can also call .GetObject() to retrieve the value as JSON without having a custom type.

Finally, CI uses protobuf-net, which is not a Google library; it is a protocol buffers implementation for .NET written by Marc Gravell.

So thank you again for the post. I think it's important to see posts like this so that people can see that talking to Riak via .NET isn't a tough thing to do. It's also important to see that you're talking to a distributed system which is made up of a cluster of nodes instead of just a single server. A single server is a single point of failure and hence talking to a cluster makes more sense when availability is important.

Best regards
OJ

John Liao said...

Hi OJ,

Thanks for the feedback, clarifications and corrections. I really appreciate the thoughtful response you provided. I have corrected the references to protobuf-net as a Google library in the post itself and attributed it to Marc Gravell.

With regard to anonymous types, I don't think F# supports them. In this StackOverflow entry, http://stackoverflow.com/questions/8144184/name-tuples-anonymous-types-in-f, Tomas Petricek seems to indicate that F# does not support anonymous types. I did try the code client.Get("animals","alpha").Value.GetObject() and F# returned a value restriction error.

It does not bother me that I have to explicitly declare a type, since I believe that explicitly and strongly typed code is more maintainable and readable, and helps identify errors earlier, at compile time rather than at runtime. I believe that this translates to less buggy code in operation, albeit at the cost of some coding conveniences.

Finally, thanks for clarifying that the ListKeys() warning reflects a cost inside Riak rather than a limitation of CorrugatedIron. I certainly understand that NoSQL solutions such as Riak are intended for "Internet scale" or big data, the kind of data concerns best exemplified by Google, Amazon, Twitter, etc. I hope to cover MapReduce examples in a future blog post that would touch upon this. While my examples use a simple 3-node cluster, I find it useful to imagine clusters of hundreds or even thousands of nodes, where the keys in a particular bucket can number in the millions and the returned keys may not even fit in the available memory of the machine that called ListKeys(). I kind of assumed readers of my blog would understand this, but maybe I should make it more explicit.

I really do appreciate all this feedback, and thanks again!

Thanks,
John Liao