libmongo-client 0.1.4
|
00001 /* mongo-wire.c - libmongo-client's MongoDB wire protocoll implementation. 00002 * Copyright 2011 Gergely Nagy <algernon@balabit.hu> 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00021 #include <glib.h> 00022 #include <string.h> 00023 #include <stdarg.h> 00024 #include <errno.h> 00025 00026 #include "bson.h" 00027 #include "mongo-wire.h" 00028 #include "libmongo-private.h" 00029 00031 static const gint32 zero = 0; 00032 00039 struct _mongo_packet 00040 { 00041 mongo_packet_header header; 00042 guint8 *data; 00043 gint32 data_size; 00044 }; 00045 00047 typedef enum 00048 { 00049 OP_REPLY = 1, 00050 OP_MSG = 1000, 00051 OP_UPDATE = 2001, 00052 OP_INSERT = 2002, 00053 OP_RESERVED = 2003, 00054 OP_QUERY = 2004, 00055 OP_GET_MORE = 2005, 00056 OP_DELETE = 2006, 00057 OP_KILL_CURSORS = 2007 00058 } mongo_wire_opcode; 00059 00060 mongo_packet * 00061 mongo_wire_packet_new (void) 00062 { 00063 mongo_packet *p = (mongo_packet *)g_new0 (mongo_packet, 1); 00064 00065 p->header.length = GINT32_TO_LE (sizeof (mongo_packet_header)); 00066 return p; 00067 } 00068 00069 gboolean 00070 mongo_wire_packet_get_header (const mongo_packet *p, 00071 mongo_packet_header *header) 00072 { 00073 if (!p || !header) 00074 { 00075 errno = EINVAL; 00076 return FALSE; 00077 } 00078 00079 header->length = GINT32_FROM_LE (p->header.length); 00080 header->id = GINT32_FROM_LE (p->header.id); 00081 header->resp_to = GINT32_FROM_LE (p->header.resp_to); 00082 header->opcode = GINT32_FROM_LE (p->header.opcode); 00083 00084 return TRUE; 00085 } 00086 00087 gboolean 00088 mongo_wire_packet_get_header_raw (const mongo_packet *p, 00089 mongo_packet_header *header) 00090 { 00091 if (!p || !header) 00092 { 00093 errno = EINVAL; 00094 return FALSE; 00095 } 00096 00097 header->length = p->header.length; 00098 header->id = p->header.id; 00099 header->resp_to = p->header.resp_to; 00100 header->opcode = p->header.opcode; 00101 00102 return TRUE; 00103 } 00104 00105 gboolean 00106 mongo_wire_packet_set_header (mongo_packet *p, 00107 const mongo_packet_header *header) 00108 { 00109 if (!p || !header) 00110 { 00111 errno = EINVAL; 00112 return FALSE; 00113 } 00114 if (GINT32_FROM_LE (header->length) < (gint32)sizeof (mongo_packet_header)) 00115 { 00116 errno = ERANGE; 00117 return FALSE; 00118 } 00119 00120 p->header.length = GINT32_TO_LE (header->length); 00121 p->header.id = GINT32_TO_LE (header->id); 00122 p->header.resp_to = GINT32_TO_LE (header->resp_to); 00123 p->header.opcode = GINT32_TO_LE (header->opcode); 00124 00125 p->data_size = header->length - sizeof (mongo_packet_header); 00126 00127 return TRUE; 00128 } 00129 00130 gboolean 00131 mongo_wire_packet_set_header_raw (mongo_packet *p, 00132 const mongo_packet_header *header) 00133 { 00134 if (!p || !header) 00135 { 00136 errno = EINVAL; 00137 return FALSE; 00138 } 00139 00140 p->header.length = header->length; 00141 p->header.id = header->id; 00142 p->header.resp_to = header->resp_to; 00143 p->header.opcode = header->opcode; 00144 00145 p->data_size = header->length - sizeof (mongo_packet_header); 00146 00147 return TRUE; 00148 } 00149 00150 gint32 00151 mongo_wire_packet_get_data (const mongo_packet *p, const guint8 **data) 00152 { 00153 if (!p || !data) 00154 { 00155 errno = EINVAL; 00156 return -1; 00157 } 00158 if (p->data == NULL) 00159 { 00160 errno = EINVAL; 00161 return -1; 00162 } 00163 00164 *data = (const guint8 *)p->data; 00165 return p->data_size; 00166 } 00167 00168 gboolean 00169 mongo_wire_packet_set_data (mongo_packet *p, const guint8 *data, gint32 size) 00170 { 00171 if (!p || !data || size <= 0) 00172 { 00173 errno = EINVAL; 00174 return FALSE; 00175 } 00176 00177 if (p->data) 00178 g_free (p->data); 00179 p->data = g_malloc (size); 00180 memcpy (p->data, data, size); 00181 00182 p->data_size = size; 00183 p->header.length = 00184 GINT32_TO_LE (p->data_size + sizeof (mongo_packet_header)); 00185 00186 return TRUE; 00187 } 00188 00189 void 00190 mongo_wire_packet_free (mongo_packet *p) 00191 { 00192 if (!p) 00193 { 00194 errno = EINVAL; 00195 return; 00196 } 00197 00198 if (p->data) 00199 g_free (p->data); 00200 g_free (p); 00201 } 00202 00203 mongo_packet * 00204 mongo_wire_cmd_update (gint32 id, const gchar *ns, gint32 flags, 00205 const bson *selector, const bson *update) 00206 { 00207 mongo_packet *p; 00208 gint32 t_flags = GINT32_TO_LE (flags); 00209 gint nslen; 00210 00211 if (!ns || !selector || !update) 00212 { 00213 errno = EINVAL; 00214 return NULL; 00215 } 00216 00217 if (bson_size (selector) < 0 || 00218 bson_size (update) < 0) 00219 { 00220 errno = EINVAL; 00221 return NULL; 00222 } 00223 00224 p = (mongo_packet *)g_new0 (mongo_packet, 1); 00225 p->header.id = GINT32_TO_LE (id); 00226 p->header.opcode = GINT32_TO_LE (OP_UPDATE); 00227 00228 nslen = strlen (ns) + 1; 00229 p->data_size = bson_size (selector) + bson_size (update) + 00230 sizeof (gint32) * 2 + nslen; 00231 00232 p->data = g_malloc (p->data_size); 00233 00234 memcpy (p->data, (void *)&zero, sizeof (gint32)); 00235 memcpy (p->data + sizeof (gint32), (void *)ns, nslen); 00236 memcpy (p->data + sizeof (gint32) + nslen, (void *)&t_flags, 00237 sizeof (gint32)); 00238 memcpy (p->data + sizeof (gint32) * 2 + nslen, 00239 bson_data (selector), bson_size (selector)); 00240 memcpy (p->data + sizeof (gint32) * 2 + nslen + bson_size (selector), 00241 bson_data (update), bson_size (update)); 00242 00243 p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); 00244 00245 return p; 00246 } 00247 00248 mongo_packet * 00249 mongo_wire_cmd_insert_n (gint32 id, const gchar *ns, gint32 n, 00250 const bson **docs) 00251 { 00252 mongo_packet *p; 00253 gint32 pos, dsize = 0; 00254 gint32 i; 00255 00256 if (!ns || !docs) 00257 { 00258 errno = EINVAL; 00259 return NULL; 00260 } 00261 00262 if (n <= 0) 00263 { 00264 errno = ERANGE; 00265 return NULL; 00266 } 00267 00268 for (i = 0; i < n; i++) 00269 { 00270 if (bson_size (docs[i]) <= 0) 00271 { 00272 errno = EINVAL; 00273 return NULL; 00274 } 00275 dsize += bson_size (docs[i]); 00276 } 00277 00278 p = (mongo_packet *)g_new0 (mongo_packet, 1); 00279 p->header.id = GINT32_TO_LE (id); 00280 p->header.opcode = GINT32_TO_LE (OP_INSERT); 00281 00282 pos = sizeof (gint32) + strlen (ns) + 1; 00283 p->data_size = pos + dsize; 00284 p->data = (guint8 *)g_malloc (p->data_size); 00285 00286 memcpy (p->data, (void *)&zero, sizeof (gint32)); 00287 memcpy (p->data + sizeof (gint32), (void *)ns, strlen (ns) + 1); 00288 00289 for (i = 0; i < n; i++) 00290 { 00291 memcpy (p->data + pos, bson_data (docs[i]), bson_size (docs[i])); 00292 pos += bson_size (docs[i]); 00293 } 00294 00295 p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); 00296 00297 return p; 00298 } 00299 00300 mongo_packet * 00301 mongo_wire_cmd_insert (gint32 id, const gchar *ns, ...) 00302 { 00303 mongo_packet *p; 00304 bson **docs, *d; 00305 gint32 n = 0; 00306 va_list ap; 00307 00308 if (!ns) 00309 { 00310 errno = EINVAL; 00311 return NULL; 00312 } 00313 00314 docs = (bson **)g_new0 (bson *, 1); 00315 00316 va_start (ap, ns); 00317 while ((d = (bson *)va_arg (ap, gpointer))) 00318 { 00319 if (bson_size (d) < 0) 00320 { 00321 g_free (docs); 00322 errno = EINVAL; 00323 return NULL; 00324 } 00325 00326 docs = (bson **)g_renew (bson *, docs, n + 1); 00327 docs[n++] = d; 00328 } 00329 va_end (ap); 00330 00331 p = mongo_wire_cmd_insert_n (id, ns, n, (const bson **)docs); 00332 g_free (docs); 00333 return p; 00334 } 00335 00336 mongo_packet * 00337 mongo_wire_cmd_query (gint32 id, const gchar *ns, gint32 flags, 00338 gint32 skip, gint32 ret, const bson *query, 00339 const bson *sel) 00340 { 00341 mongo_packet *p; 00342 gint32 tmp, nslen; 00343 00344 if (!ns || !query) 00345 { 00346 errno = EINVAL; 00347 return NULL; 00348 } 00349 00350 if (bson_size (query) < 0 || (sel && bson_size (sel) < 0)) 00351 { 00352 errno = EINVAL; 00353 return NULL; 00354 } 00355 00356 p = (mongo_packet *)g_new0 (mongo_packet, 1); 00357 p->header.id = GINT32_TO_LE (id); 00358 p->header.opcode = GINT32_TO_LE (OP_QUERY); 00359 00360 nslen = strlen (ns) + 1; 00361 p->data_size = 00362 sizeof (gint32) + nslen + sizeof (gint32) * 2 + bson_size (query); 00363 00364 if (sel) 00365 p->data_size += bson_size (sel); 00366 p->data = g_malloc (p->data_size); 00367 00368 tmp = GINT32_TO_LE (flags); 00369 memcpy (p->data, (void *)&tmp, sizeof (gint32)); 00370 memcpy (p->data + sizeof (gint32), (void *)ns, nslen); 00371 tmp = GINT32_TO_LE (skip); 00372 memcpy (p->data + sizeof (gint32) + nslen, (void *)&tmp, sizeof (gint32)); 00373 tmp = GINT32_TO_LE (ret); 00374 memcpy (p->data + sizeof (gint32) * 2 + nslen, 00375 (void *)&tmp, sizeof (gint32)); 00376 memcpy (p->data + sizeof (gint32) * 3 + nslen, bson_data (query), 00377 bson_size (query)); 00378 00379 if (sel) 00380 memcpy (p->data + sizeof (gint32) * 3 + nslen + bson_size (query), 00381 bson_data (sel), bson_size (sel)); 00382 00383 p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); 00384 00385 return p; 00386 } 00387 00388 mongo_packet * 00389 mongo_wire_cmd_get_more (gint32 id, const gchar *ns, 00390 gint32 ret, gint64 cursor_id) 00391 { 00392 mongo_packet *p; 00393 gint32 t_ret; 00394 gint64 t_cid; 00395 gint32 nslen; 00396 00397 if (!ns) 00398 { 00399 errno = EINVAL; 00400 return NULL; 00401 } 00402 00403 p = (mongo_packet *)g_new0 (mongo_packet, 1); 00404 p->header.id = GINT32_TO_LE (id); 00405 p->header.opcode = GINT32_TO_LE (OP_GET_MORE); 00406 00407 t_ret = GINT32_TO_LE (ret); 00408 t_cid = GINT64_TO_LE (cursor_id); 00409 00410 nslen = strlen (ns) + 1; 00411 p->data_size = sizeof (gint32) + nslen + sizeof (gint32) + sizeof (gint64); 00412 p->data = g_malloc (p->data_size); 00413 00414 memcpy (p->data, (void *)&zero, sizeof (gint32)); 00415 memcpy (p->data + sizeof (gint32), (void *)ns, nslen); 00416 memcpy (p->data + sizeof (gint32) + nslen, (void *)&t_ret, sizeof (gint32)); 00417 memcpy (p->data + sizeof (gint32) * 2 + nslen, 00418 (void *)&t_cid, sizeof (gint64)); 00419 00420 p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); 00421 00422 return p; 00423 } 00424 00425 mongo_packet * 00426 mongo_wire_cmd_delete (gint32 id, const gchar *ns, 00427 gint32 flags, const bson *sel) 00428 { 00429 mongo_packet *p; 00430 gint32 t_flags, nslen; 00431 00432 if (!ns || !sel) 00433 { 00434 errno = EINVAL; 00435 return NULL; 00436 } 00437 00438 if (bson_size (sel) < 0) 00439 { 00440 errno = EINVAL; 00441 return NULL; 00442 } 00443 00444 p = (mongo_packet *)g_new0 (mongo_packet, 1); 00445 p->header.id = GINT32_TO_LE (id); 00446 p->header.opcode = GINT32_TO_LE (OP_DELETE); 00447 00448 nslen = strlen (ns) + 1; 00449 p->data_size = sizeof (gint32) + nslen + sizeof (gint32) + bson_size (sel); 00450 p->data = g_malloc (p->data_size); 00451 00452 t_flags = GINT32_TO_LE (flags); 00453 00454 memcpy (p->data, (void *)&zero, sizeof (gint32)); 00455 memcpy (p->data + sizeof (gint32), (void *)ns, nslen); 00456 memcpy (p->data + sizeof (gint32) + nslen, 00457 (void *)&t_flags, sizeof (gint32)); 00458 memcpy (p->data + sizeof (gint32) * 2 + nslen, 00459 bson_data (sel), bson_size (sel)); 00460 00461 p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); 00462 00463 return p; 00464 } 00465 00466 mongo_packet * 00467 mongo_wire_cmd_kill_cursors_va (gint32 id, gint32 n, va_list ap) 00468 { 00469 mongo_packet *p; 00470 gint32 i, t_n, pos; 00471 gint64 t_cid; 00472 00473 p = (mongo_packet *)g_new0 (mongo_packet, 1); 00474 p->header.id = GINT32_TO_LE (id); 00475 p->header.opcode = GINT32_TO_LE (OP_KILL_CURSORS); 00476 00477 p->data_size = sizeof (gint32) + sizeof (gint32) + sizeof (gint64)* n; 00478 p->data = g_malloc (p->data_size); 00479 00480 t_n = GINT32_TO_LE (n); 00481 pos = sizeof (gint32) * 2; 00482 memcpy (p->data, (void *)&zero, sizeof (gint32)); 00483 memcpy (p->data + sizeof (gint32), (void *)&t_n, sizeof (gint32)); 00484 00485 for (i = 1; i <= n; i++) 00486 { 00487 t_cid = va_arg (ap, gint64); 00488 t_cid = GINT64_TO_LE (t_cid); 00489 00490 memcpy (p->data + pos, (void *)&t_cid, sizeof (gint64)); 00491 pos += sizeof (gint64); 00492 } 00493 00494 p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); 00495 00496 return p; 00497 } 00498 00499 mongo_packet * 00500 mongo_wire_cmd_kill_cursors (gint32 id, gint32 n, ...) 00501 { 00502 va_list ap; 00503 mongo_packet *p; 00504 00505 if (n <= 0) 00506 { 00507 errno = EINVAL; 00508 return NULL; 00509 } 00510 00511 va_start (ap, n); 00512 p = mongo_wire_cmd_kill_cursors_va (id, n, ap); 00513 va_end (ap); 00514 00515 return p; 00516 } 00517 00518 mongo_packet * 00519 mongo_wire_cmd_custom (gint32 id, const gchar *db, gint32 flags, 00520 const bson *command) 00521 { 00522 mongo_packet *p; 00523 gchar *ns; 00524 bson *empty; 00525 00526 if (!db || !command) 00527 { 00528 errno = EINVAL; 00529 return NULL; 00530 } 00531 00532 if (bson_size (command) < 0) 00533 { 00534 errno = EINVAL; 00535 return NULL; 00536 } 00537 00538 ns = g_strconcat (db, ".$cmd", NULL); 00539 00540 empty = bson_new (); 00541 bson_finish (empty); 00542 00543 p = mongo_wire_cmd_query (id, ns, flags, 0, 1, command, empty); 00544 g_free (ns); 00545 bson_free (empty); 00546 return p; 00547 } 00548 00549 gboolean 00550 mongo_wire_reply_packet_get_header (const mongo_packet *p, 00551 mongo_reply_packet_header *hdr) 00552 { 00553 mongo_reply_packet_header h; 00554 const guint8 *data; 00555 00556 if (!p || !hdr) 00557 { 00558 errno = EINVAL; 00559 return FALSE; 00560 } 00561 00562 if (p->header.opcode != OP_REPLY) 00563 { 00564 errno = EPROTO; 00565 return FALSE; 00566 } 00567 00568 if (mongo_wire_packet_get_data (p, &data) == -1) 00569 return FALSE; 00570 00571 memcpy (&h, data, sizeof (mongo_reply_packet_header)); 00572 00573 hdr->flags = GINT32_FROM_LE (h.flags); 00574 hdr->cursor_id = GINT64_FROM_LE (h.cursor_id); 00575 hdr->start = GINT32_FROM_LE (h.start); 00576 hdr->returned = GINT32_FROM_LE (h.returned); 00577 00578 return TRUE; 00579 } 00580 00581 gboolean 00582 mongo_wire_reply_packet_get_data (const mongo_packet *p, 00583 const guint8 **data) 00584 { 00585 const guint8 *d; 00586 00587 if (!p || !data) 00588 { 00589 errno = EINVAL; 00590 return FALSE; 00591 } 00592 00593 if (p->header.opcode != OP_REPLY) 00594 { 00595 errno = EPROTO; 00596 return FALSE; 00597 } 00598 00599 if (mongo_wire_packet_get_data (p, &d) == -1) 00600 return FALSE; 00601 00602 *data = d + sizeof (mongo_reply_packet_header); 00603 return TRUE; 00604 } 00605 00606 gboolean 00607 mongo_wire_reply_packet_get_nth_document (const mongo_packet *p, 00608 gint32 n, 00609 bson **doc) 00610 { 00611 const guint8 *d; 00612 mongo_reply_packet_header h; 00613 gint32 i; 00614 gint32 pos = 0; 00615 00616 if (!p || !doc || n <= 0) 00617 { 00618 errno = EINVAL; 00619 return FALSE; 00620 } 00621 00622 if (p->header.opcode != OP_REPLY) 00623 { 00624 errno = EPROTO; 00625 return FALSE; 00626 } 00627 00628 if (!mongo_wire_reply_packet_get_header (p, &h)) 00629 return FALSE; 00630 00631 if (h.returned < n) 00632 { 00633 errno = ERANGE; 00634 return FALSE; 00635 } 00636 00637 if (!mongo_wire_reply_packet_get_data (p, &d)) 00638 return FALSE; 00639 00640 for (i = 1; i < n; i++) 00641 pos += bson_stream_doc_size (d, pos); 00642 00643 *doc = bson_new_from_data (d + pos, bson_stream_doc_size (d, pos) - 1); 00644 return TRUE; 00645 }