libmongo-client 0.1.4
src/mongo-wire.c
Go to the documentation of this file.
00001 /* mongo-wire.c - libmongo-client's MongoDB wire protocol implementation.
00002  * Copyright 2011 Gergely Nagy <algernon@balabit.hu>
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00021 #include <glib.h>
00022 #include <string.h>
00023 #include <stdarg.h>
00024 #include <errno.h>
00025 
00026 #include "bson.h"
00027 #include "mongo-wire.h"
00028 #include "libmongo-private.h"
00029 
00031 static const gint32 zero = 0;
00032 
00039 struct _mongo_packet
00040 {
00041   mongo_packet_header header; 
00042   guint8 *data; 
00043   gint32 data_size; 
00044 };
00045 
/* Values stored in the opcode field of a packet header.  Only the
   opcodes noted below are actually produced or checked in this file. */
typedef enum
{
  OP_REPLY        = 1,    /* Expected by the reply packet accessors. */
  OP_MSG          = 1000, /* Not used in this file. */
  OP_UPDATE       = 2001, /* Set by mongo_wire_cmd_update(). */
  OP_INSERT       = 2002, /* Set by mongo_wire_cmd_insert_n(). */
  OP_RESERVED     = 2003, /* Not used in this file. */
  OP_QUERY        = 2004, /* Set by mongo_wire_cmd_query(). */
  OP_GET_MORE     = 2005, /* Set by mongo_wire_cmd_get_more(). */
  OP_DELETE       = 2006, /* Set by mongo_wire_cmd_delete(). */
  OP_KILL_CURSORS = 2007  /* Set by mongo_wire_cmd_kill_cursors_va(). */
} mongo_wire_opcode;
00059 
00060 mongo_packet *
00061 mongo_wire_packet_new (void)
00062 {
00063   mongo_packet *p = (mongo_packet *)g_new0 (mongo_packet, 1);
00064 
00065   p->header.length = GINT32_TO_LE (sizeof (mongo_packet_header));
00066   return p;
00067 }
00068 
00069 gboolean
00070 mongo_wire_packet_get_header (const mongo_packet *p,
00071                               mongo_packet_header *header)
00072 {
00073   if (!p || !header)
00074     {
00075       errno = EINVAL;
00076       return FALSE;
00077     }
00078 
00079   header->length = GINT32_FROM_LE (p->header.length);
00080   header->id = GINT32_FROM_LE (p->header.id);
00081   header->resp_to = GINT32_FROM_LE (p->header.resp_to);
00082   header->opcode = GINT32_FROM_LE (p->header.opcode);
00083 
00084   return TRUE;
00085 }
00086 
00087 gboolean
00088 mongo_wire_packet_get_header_raw (const mongo_packet *p,
00089                                   mongo_packet_header *header)
00090 {
00091   if (!p || !header)
00092     {
00093       errno = EINVAL;
00094       return FALSE;
00095     }
00096 
00097   header->length = p->header.length;
00098   header->id = p->header.id;
00099   header->resp_to = p->header.resp_to;
00100   header->opcode = p->header.opcode;
00101 
00102   return TRUE;
00103 }
00104 
00105 gboolean
00106 mongo_wire_packet_set_header (mongo_packet *p,
00107                               const mongo_packet_header *header)
00108 {
00109   if (!p || !header)
00110     {
00111       errno = EINVAL;
00112       return FALSE;
00113     }
00114   if (GINT32_FROM_LE (header->length) < (gint32)sizeof (mongo_packet_header))
00115     {
00116       errno = ERANGE;
00117       return FALSE;
00118     }
00119 
00120   p->header.length = GINT32_TO_LE (header->length);
00121   p->header.id = GINT32_TO_LE (header->id);
00122   p->header.resp_to = GINT32_TO_LE (header->resp_to);
00123   p->header.opcode = GINT32_TO_LE (header->opcode);
00124 
00125   p->data_size = header->length - sizeof (mongo_packet_header);
00126 
00127   return TRUE;
00128 }
00129 
00130 gboolean
00131 mongo_wire_packet_set_header_raw (mongo_packet *p,
00132                                   const mongo_packet_header *header)
00133 {
00134   if (!p || !header)
00135     {
00136       errno = EINVAL;
00137       return FALSE;
00138     }
00139 
00140   p->header.length = header->length;
00141   p->header.id = header->id;
00142   p->header.resp_to = header->resp_to;
00143   p->header.opcode = header->opcode;
00144 
00145   p->data_size = header->length - sizeof (mongo_packet_header);
00146 
00147   return TRUE;
00148 }
00149 
00150 gint32
00151 mongo_wire_packet_get_data (const mongo_packet *p, const guint8 **data)
00152 {
00153   if (!p || !data)
00154     {
00155       errno = EINVAL;
00156       return -1;
00157     }
00158   if (p->data == NULL)
00159     {
00160       errno = EINVAL;
00161       return -1;
00162     }
00163 
00164   *data = (const guint8 *)p->data;
00165   return p->data_size;
00166 }
00167 
00168 gboolean
00169 mongo_wire_packet_set_data (mongo_packet *p, const guint8 *data, gint32 size)
00170 {
00171   if (!p || !data || size <= 0)
00172     {
00173       errno = EINVAL;
00174       return FALSE;
00175     }
00176 
00177   if (p->data)
00178     g_free (p->data);
00179   p->data = g_malloc (size);
00180   memcpy (p->data, data, size);
00181 
00182   p->data_size = size;
00183   p->header.length =
00184     GINT32_TO_LE (p->data_size + sizeof (mongo_packet_header));
00185 
00186   return TRUE;
00187 }
00188 
00189 void
00190 mongo_wire_packet_free (mongo_packet *p)
00191 {
00192   if (!p)
00193     {
00194       errno = EINVAL;
00195       return;
00196     }
00197 
00198   if (p->data)
00199     g_free (p->data);
00200   g_free (p);
00201 }
00202 
00203 mongo_packet *
00204 mongo_wire_cmd_update (gint32 id, const gchar *ns, gint32 flags,
00205                        const bson *selector, const bson *update)
00206 {
00207   mongo_packet *p;
00208   gint32 t_flags = GINT32_TO_LE (flags);
00209   gint nslen;
00210 
00211   if (!ns || !selector || !update)
00212     {
00213       errno = EINVAL;
00214       return NULL;
00215     }
00216 
00217   if (bson_size (selector) < 0 ||
00218       bson_size (update) < 0)
00219     {
00220       errno = EINVAL;
00221       return NULL;
00222     }
00223 
00224   p = (mongo_packet *)g_new0 (mongo_packet, 1);
00225   p->header.id = GINT32_TO_LE (id);
00226   p->header.opcode = GINT32_TO_LE (OP_UPDATE);
00227 
00228   nslen = strlen (ns) + 1;
00229   p->data_size = bson_size (selector) + bson_size (update) +
00230     sizeof (gint32) * 2 + nslen;
00231 
00232   p->data = g_malloc (p->data_size);
00233 
00234   memcpy (p->data, (void *)&zero, sizeof (gint32));
00235   memcpy (p->data + sizeof (gint32), (void *)ns, nslen);
00236   memcpy (p->data + sizeof (gint32) + nslen, (void *)&t_flags,
00237           sizeof (gint32));
00238   memcpy (p->data + sizeof (gint32) * 2 + nslen,
00239           bson_data (selector), bson_size (selector));
00240   memcpy (p->data + sizeof (gint32) * 2 + nslen + bson_size (selector),
00241           bson_data (update), bson_size (update));
00242 
00243   p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size);
00244 
00245   return p;
00246 }
00247 
00248 mongo_packet *
00249 mongo_wire_cmd_insert_n (gint32 id, const gchar *ns, gint32 n,
00250                          const bson **docs)
00251 {
00252   mongo_packet *p;
00253   gint32 pos, dsize = 0;
00254   gint32 i;
00255 
00256   if (!ns || !docs)
00257     {
00258       errno = EINVAL;
00259       return NULL;
00260     }
00261 
00262   if (n <= 0)
00263     {
00264       errno = ERANGE;
00265       return NULL;
00266     }
00267 
00268   for (i = 0; i < n; i++)
00269     {
00270       if (bson_size (docs[i]) <= 0)
00271         {
00272           errno = EINVAL;
00273           return NULL;
00274         }
00275       dsize += bson_size (docs[i]);
00276     }
00277 
00278   p = (mongo_packet *)g_new0 (mongo_packet, 1);
00279   p->header.id = GINT32_TO_LE (id);
00280   p->header.opcode = GINT32_TO_LE (OP_INSERT);
00281 
00282   pos = sizeof (gint32) + strlen (ns) + 1;
00283   p->data_size = pos + dsize;
00284   p->data = (guint8 *)g_malloc (p->data_size);
00285 
00286   memcpy (p->data, (void *)&zero, sizeof (gint32));
00287   memcpy (p->data + sizeof (gint32), (void *)ns, strlen (ns) + 1);
00288 
00289   for (i = 0; i < n; i++)
00290     {
00291       memcpy (p->data + pos, bson_data (docs[i]), bson_size (docs[i]));
00292       pos += bson_size (docs[i]);
00293     }
00294 
00295   p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size);
00296 
00297   return p;
00298 }
00299 
00300 mongo_packet *
00301 mongo_wire_cmd_insert (gint32 id, const gchar *ns, ...)
00302 {
00303   mongo_packet *p;
00304   bson **docs, *d;
00305   gint32 n = 0;
00306   va_list ap;
00307 
00308   if (!ns)
00309     {
00310       errno = EINVAL;
00311       return NULL;
00312     }
00313 
00314   docs = (bson **)g_new0 (bson *, 1);
00315 
00316   va_start (ap, ns);
00317   while ((d = (bson *)va_arg (ap, gpointer)))
00318     {
00319       if (bson_size (d) < 0)
00320         {
00321           g_free (docs);
00322           errno = EINVAL;
00323           return NULL;
00324         }
00325 
00326       docs = (bson **)g_renew (bson *, docs, n + 1);
00327       docs[n++] = d;
00328     }
00329   va_end (ap);
00330 
00331   p = mongo_wire_cmd_insert_n (id, ns, n, (const bson **)docs);
00332   g_free (docs);
00333   return p;
00334 }
00335 
00336 mongo_packet *
00337 mongo_wire_cmd_query (gint32 id, const gchar *ns, gint32 flags,
00338                       gint32 skip, gint32 ret, const bson *query,
00339                       const bson *sel)
00340 {
00341   mongo_packet *p;
00342   gint32 tmp, nslen;
00343 
00344   if (!ns || !query)
00345     {
00346       errno = EINVAL;
00347       return NULL;
00348     }
00349 
00350   if (bson_size (query) < 0 || (sel && bson_size (sel) < 0))
00351     {
00352       errno = EINVAL;
00353       return NULL;
00354     }
00355 
00356   p = (mongo_packet *)g_new0 (mongo_packet, 1);
00357   p->header.id = GINT32_TO_LE (id);
00358   p->header.opcode = GINT32_TO_LE (OP_QUERY);
00359 
00360   nslen = strlen (ns) + 1;
00361   p->data_size =
00362     sizeof (gint32) + nslen + sizeof (gint32) * 2 + bson_size (query);
00363 
00364   if (sel)
00365     p->data_size += bson_size (sel);
00366   p->data = g_malloc (p->data_size);
00367 
00368   tmp = GINT32_TO_LE (flags);
00369   memcpy (p->data, (void *)&tmp, sizeof (gint32));
00370   memcpy (p->data + sizeof (gint32), (void *)ns, nslen);
00371   tmp = GINT32_TO_LE (skip);
00372   memcpy (p->data + sizeof (gint32) + nslen, (void *)&tmp, sizeof (gint32));
00373   tmp = GINT32_TO_LE (ret);
00374   memcpy (p->data + sizeof (gint32) * 2 + nslen,
00375           (void *)&tmp, sizeof (gint32));
00376   memcpy (p->data + sizeof (gint32) * 3 + nslen, bson_data (query),
00377           bson_size (query));
00378 
00379   if (sel)
00380     memcpy (p->data + sizeof (gint32) * 3 + nslen + bson_size (query),
00381             bson_data (sel), bson_size (sel));
00382 
00383   p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size);
00384 
00385   return p;
00386 }
00387 
00388 mongo_packet *
00389 mongo_wire_cmd_get_more (gint32 id, const gchar *ns,
00390                          gint32 ret, gint64 cursor_id)
00391 {
00392   mongo_packet *p;
00393   gint32 t_ret;
00394   gint64 t_cid;
00395   gint32 nslen;
00396 
00397   if (!ns)
00398     {
00399       errno = EINVAL;
00400       return NULL;
00401     }
00402 
00403   p = (mongo_packet *)g_new0 (mongo_packet, 1);
00404   p->header.id = GINT32_TO_LE (id);
00405   p->header.opcode = GINT32_TO_LE (OP_GET_MORE);
00406 
00407   t_ret = GINT32_TO_LE (ret);
00408   t_cid = GINT64_TO_LE (cursor_id);
00409 
00410   nslen = strlen (ns) + 1;
00411   p->data_size = sizeof (gint32) + nslen + sizeof (gint32) + sizeof (gint64);
00412   p->data = g_malloc (p->data_size);
00413 
00414   memcpy (p->data, (void *)&zero, sizeof (gint32));
00415   memcpy (p->data + sizeof (gint32), (void *)ns, nslen);
00416   memcpy (p->data + sizeof (gint32) + nslen, (void *)&t_ret, sizeof (gint32));
00417   memcpy (p->data + sizeof (gint32) * 2 + nslen,
00418           (void *)&t_cid, sizeof (gint64));
00419 
00420   p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size);
00421 
00422   return p;
00423 }
00424 
00425 mongo_packet *
00426 mongo_wire_cmd_delete (gint32 id, const gchar *ns,
00427                        gint32 flags, const bson *sel)
00428 {
00429   mongo_packet *p;
00430   gint32 t_flags, nslen;
00431 
00432   if (!ns || !sel)
00433     {
00434       errno = EINVAL;
00435       return NULL;
00436     }
00437 
00438   if (bson_size (sel) < 0)
00439     {
00440       errno = EINVAL;
00441       return NULL;
00442     }
00443 
00444   p = (mongo_packet *)g_new0 (mongo_packet, 1);
00445   p->header.id = GINT32_TO_LE (id);
00446   p->header.opcode = GINT32_TO_LE (OP_DELETE);
00447 
00448   nslen = strlen (ns) + 1;
00449   p->data_size = sizeof (gint32) + nslen + sizeof (gint32) + bson_size (sel);
00450   p->data = g_malloc (p->data_size);
00451 
00452   t_flags = GINT32_TO_LE (flags);
00453 
00454   memcpy (p->data, (void *)&zero, sizeof (gint32));
00455   memcpy (p->data + sizeof (gint32), (void *)ns, nslen);
00456   memcpy (p->data + sizeof (gint32) + nslen,
00457           (void *)&t_flags, sizeof (gint32));
00458   memcpy (p->data + sizeof (gint32) * 2 + nslen,
00459           bson_data (sel), bson_size (sel));
00460 
00461   p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size);
00462 
00463   return p;
00464 }
00465 
00466 mongo_packet *
00467 mongo_wire_cmd_kill_cursors_va (gint32 id, gint32 n, va_list ap)
00468 {
00469   mongo_packet *p;
00470   gint32 i, t_n, pos;
00471   gint64 t_cid;
00472 
00473   p = (mongo_packet *)g_new0 (mongo_packet, 1);
00474   p->header.id = GINT32_TO_LE (id);
00475   p->header.opcode = GINT32_TO_LE (OP_KILL_CURSORS);
00476 
00477   p->data_size = sizeof (gint32) + sizeof (gint32) + sizeof (gint64)* n;
00478   p->data = g_malloc (p->data_size);
00479 
00480   t_n = GINT32_TO_LE (n);
00481   pos = sizeof (gint32) * 2;
00482   memcpy (p->data, (void *)&zero, sizeof (gint32));
00483   memcpy (p->data + sizeof (gint32), (void *)&t_n, sizeof (gint32));
00484 
00485   for (i = 1; i <= n; i++)
00486     {
00487       t_cid = va_arg (ap, gint64);
00488       t_cid = GINT64_TO_LE (t_cid);
00489 
00490       memcpy (p->data + pos, (void *)&t_cid, sizeof (gint64));
00491       pos += sizeof (gint64);
00492     }
00493 
00494   p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size);
00495 
00496   return p;
00497 }
00498 
00499 mongo_packet *
00500 mongo_wire_cmd_kill_cursors (gint32 id, gint32 n, ...)
00501 {
00502   va_list ap;
00503   mongo_packet *p;
00504 
00505   if (n <= 0)
00506     {
00507       errno = EINVAL;
00508       return NULL;
00509     }
00510 
00511   va_start (ap, n);
00512   p = mongo_wire_cmd_kill_cursors_va (id, n, ap);
00513   va_end (ap);
00514 
00515   return p;
00516 }
00517 
00518 mongo_packet *
00519 mongo_wire_cmd_custom (gint32 id, const gchar *db, gint32 flags,
00520                        const bson *command)
00521 {
00522   mongo_packet *p;
00523   gchar *ns;
00524   bson *empty;
00525 
00526   if (!db || !command)
00527     {
00528       errno = EINVAL;
00529       return NULL;
00530     }
00531 
00532   if (bson_size (command) < 0)
00533     {
00534       errno = EINVAL;
00535       return NULL;
00536     }
00537 
00538   ns = g_strconcat (db, ".$cmd", NULL);
00539 
00540   empty = bson_new ();
00541   bson_finish (empty);
00542 
00543   p = mongo_wire_cmd_query (id, ns, flags, 0, 1, command, empty);
00544   g_free (ns);
00545   bson_free (empty);
00546   return p;
00547 }
00548 
00549 gboolean
00550 mongo_wire_reply_packet_get_header (const mongo_packet *p,
00551                                     mongo_reply_packet_header *hdr)
00552 {
00553   mongo_reply_packet_header h;
00554   const guint8 *data;
00555 
00556   if (!p || !hdr)
00557     {
00558       errno = EINVAL;
00559       return FALSE;
00560     }
00561 
00562   if (p->header.opcode != OP_REPLY)
00563     {
00564       errno = EPROTO;
00565       return FALSE;
00566     }
00567 
00568   if (mongo_wire_packet_get_data (p, &data) == -1)
00569     return FALSE;
00570 
00571   memcpy (&h, data, sizeof (mongo_reply_packet_header));
00572 
00573   hdr->flags = GINT32_FROM_LE (h.flags);
00574   hdr->cursor_id = GINT64_FROM_LE (h.cursor_id);
00575   hdr->start = GINT32_FROM_LE (h.start);
00576   hdr->returned = GINT32_FROM_LE (h.returned);
00577 
00578   return TRUE;
00579 }
00580 
00581 gboolean
00582 mongo_wire_reply_packet_get_data (const mongo_packet *p,
00583                                   const guint8 **data)
00584 {
00585   const guint8 *d;
00586 
00587   if (!p || !data)
00588     {
00589       errno = EINVAL;
00590       return FALSE;
00591     }
00592 
00593   if (p->header.opcode != OP_REPLY)
00594     {
00595       errno = EPROTO;
00596       return FALSE;
00597     }
00598 
00599   if (mongo_wire_packet_get_data (p, &d) == -1)
00600     return FALSE;
00601 
00602   *data = d + sizeof (mongo_reply_packet_header);
00603   return TRUE;
00604 }
00605 
00606 gboolean
00607 mongo_wire_reply_packet_get_nth_document (const mongo_packet *p,
00608                                           gint32 n,
00609                                           bson **doc)
00610 {
00611   const guint8 *d;
00612   mongo_reply_packet_header h;
00613   gint32 i;
00614   gint32 pos = 0;
00615 
00616   if (!p || !doc || n <= 0)
00617     {
00618       errno = EINVAL;
00619       return FALSE;
00620     }
00621 
00622   if (p->header.opcode != OP_REPLY)
00623     {
00624       errno = EPROTO;
00625       return FALSE;
00626     }
00627 
00628   if (!mongo_wire_reply_packet_get_header (p, &h))
00629     return FALSE;
00630 
00631   if (h.returned < n)
00632     {
00633       errno = ERANGE;
00634       return FALSE;
00635     }
00636 
00637   if (!mongo_wire_reply_packet_get_data (p, &d))
00638     return FALSE;
00639 
00640   for (i = 1; i < n; i++)
00641     pos += bson_stream_doc_size (d, pos);
00642 
00643   *doc = bson_new_from_data (d + pos, bson_stream_doc_size (d, pos) - 1);
00644   return TRUE;
00645 }
 All Data Structures Files Functions Variables Enumerations Enumerator Defines