libmongo-client 0.1.4
src/sync-gridfs-stream.c
Go to the documentation of this file.
00001 /* sync-gridfs-stream.c - libmongo-client GridFS streaming implementation
00002  * Copyright 2011 Gergely Nagy <algernon@balabit.hu>
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     http://www.apache.org/licenses/LICENSE-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00021 #include "sync-gridfs-stream.h"
00022 #include "libmongo-private.h"
00023 
00024 #include <unistd.h>
00025 #include <errno.h>
00026 
00027 mongo_sync_gridfs_stream *
00028 mongo_sync_gridfs_stream_find (mongo_sync_gridfs *gfs,
00029                                const bson *query)
00030 {
00031   mongo_sync_gridfs_stream *stream;
00032   bson *meta = NULL;
00033   bson_cursor *c;
00034   mongo_packet *p;
00035   const guint8 *oid;
00036 
00037   if (!gfs)
00038     {
00039       errno = ENOTCONN;
00040       return NULL;
00041     }
00042   if (!query)
00043     {
00044       errno = EINVAL;
00045       return NULL;
00046     }
00047 
00048   p = mongo_sync_cmd_query (gfs->conn, gfs->ns.files, 0, 0, 1, query, NULL);
00049   if (!p)
00050     return NULL;
00051 
00052   stream = g_new0 (mongo_sync_gridfs_stream, 1);
00053   stream->gfs = gfs;
00054   stream->file.type = LMC_GRIDFS_FILE_STREAM_READER;
00055 
00056   mongo_wire_reply_packet_get_nth_document (p, 1, &meta);
00057   bson_finish (meta);
00058   mongo_wire_packet_free (p);
00059 
00060   c = bson_find (meta, "_id");
00061   if (!bson_cursor_get_oid (c, &oid))
00062     {
00063       bson_cursor_free (c);
00064       bson_free (meta);
00065       g_free (stream);
00066 
00067       errno = EPROTO;
00068       return NULL;
00069     }
00070   stream->file.id = g_malloc (12);
00071   memcpy (stream->file.id, oid, 12);
00072 
00073   bson_cursor_find (c, "length");
00074   bson_cursor_get_int64 (c, &stream->file.length);
00075   if (stream->file.length == 0)
00076     {
00077       gint32 i = 0;
00078 
00079       bson_cursor_get_int32 (c, &i);
00080       stream->file.length = i;
00081     }
00082 
00083   bson_cursor_find (c, "chunkSize");
00084   bson_cursor_get_int32 (c, &stream->file.chunk_size);
00085 
00086   bson_cursor_free (c);
00087   bson_free (meta);
00088 
00089   if (stream->file.length == 0 ||
00090       stream->file.chunk_size == 0)
00091     {
00092       g_free (stream->file.id);
00093       g_free (stream);
00094 
00095       errno = EPROTO;
00096       return NULL;
00097     }
00098 
00099   return stream;
00100 }
00101 
00102 mongo_sync_gridfs_stream *
00103 mongo_sync_gridfs_stream_new (mongo_sync_gridfs *gfs,
00104                               const bson *metadata)
00105 {
00106   mongo_sync_gridfs_stream *stream;
00107   bson_cursor *c;
00108 
00109   if (!gfs)
00110     {
00111       errno = ENOTCONN;
00112       return NULL;
00113     }
00114 
00115   stream = g_new0 (mongo_sync_gridfs_stream, 1);
00116   stream->file.type = LMC_GRIDFS_FILE_STREAM_WRITER;
00117   stream->gfs = gfs;
00118 
00119   stream->file.chunk_size = gfs->chunk_size;
00120 
00121   stream->writer.metadata = bson_new_from_data (bson_data (metadata),
00122                                                 bson_size (metadata) - 1);
00123 
00124   c = bson_find (metadata, "_id");
00125   if (!c)
00126     {
00127       stream->file.id = mongo_util_oid_new
00128         (mongo_connection_get_requestid ((mongo_connection *)gfs->conn));
00129       if (!stream->file.id)
00130         {
00131           bson_free (stream->writer.metadata);
00132           g_free (stream);
00133 
00134           errno = EFAULT;
00135           return NULL;
00136         }
00137       bson_append_oid (stream->writer.metadata, "_id", stream->file.id);
00138     }
00139   else
00140     {
00141       const guint8 *oid;
00142 
00143       if (!bson_cursor_get_oid (c, &oid))
00144         {
00145           bson_cursor_free (c);
00146           bson_free (stream->writer.metadata);
00147           g_free (stream);
00148 
00149           errno = EPROTO;
00150           return NULL;
00151         }
00152 
00153       stream->file.id = g_malloc (12);
00154       memcpy (stream->file.id, oid, 12);
00155     }
00156   bson_cursor_free (c);
00157   bson_finish (stream->writer.metadata);
00158 
00159   stream->writer.buffer = g_malloc (stream->file.chunk_size);
00160   stream->writer.checksum = g_checksum_new (G_CHECKSUM_MD5);
00161 
00162   return stream;
00163 }
00164 
00165 static inline gboolean
00166 _stream_seek_chunk (mongo_sync_gridfs_stream *stream,
00167                     gint64 chunk)
00168 {
00169   bson *b;
00170   mongo_packet *p;
00171   bson_cursor *c;
00172   bson_binary_subtype subt = BSON_BINARY_SUBTYPE_USER_DEFINED;
00173   gboolean r;
00174 
00175   b = bson_new_sized (32);
00176   bson_append_oid (b, "files_id", stream->file.id);
00177   bson_append_int64 (b, "n", chunk);
00178   bson_finish (b);
00179 
00180   p = mongo_sync_cmd_query (stream->gfs->conn,
00181                             stream->gfs->ns.chunks, 0,
00182                             0, 1, b, NULL);
00183   bson_free (b);
00184 
00185   bson_free (stream->reader.bson);
00186   stream->reader.bson = NULL;
00187   stream->reader.chunk.data = NULL;
00188 
00189   mongo_wire_reply_packet_get_nth_document (p, 1, &stream->reader.bson);
00190   mongo_wire_packet_free (p);
00191   bson_finish (stream->reader.bson);
00192 
00193   c = bson_find (stream->reader.bson, "data");
00194   r = bson_cursor_get_binary (c, &subt, &stream->reader.chunk.data,
00195                               &stream->reader.chunk.size);
00196   if (!r || subt != BSON_BINARY_SUBTYPE_GENERIC)
00197     {
00198       bson_cursor_free (c);
00199       bson_free (stream->reader.bson);
00200       stream->reader.bson = NULL;
00201       stream->reader.chunk.data = NULL;
00202 
00203       errno = EPROTO;
00204       return FALSE;
00205     }
00206   bson_cursor_free (c);
00207 
00208   stream->reader.chunk.offset = 0;
00209   return TRUE;
00210 }
00211 
00212 gint64
00213 mongo_sync_gridfs_stream_read (mongo_sync_gridfs_stream *stream,
00214                                guint8 *buffer,
00215                                gint64 size)
00216 {
00217   gint64 pos = 0;
00218 
00219   if (!stream)
00220     {
00221       errno = ENOENT;
00222       return -1;
00223     }
00224   if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER)
00225     {
00226       errno = EOPNOTSUPP;
00227       return -1;
00228     }
00229   if (!buffer || size <= 0)
00230     {
00231       errno = EINVAL;
00232       return -1;
00233     }
00234 
00235   if (!stream->reader.chunk.data)
00236     {
00237       if (!_stream_seek_chunk (stream, 0))
00238         return -1;
00239     }
00240 
00241   while (pos < size && stream->file.offset < stream->file.length)
00242     {
00243       gint32 csize = stream->reader.chunk.size - stream->reader.chunk.offset;
00244 
00245       if (size - pos < csize)
00246         csize = size - pos;
00247 
00248       memcpy (buffer + pos,
00249               stream->reader.chunk.data + stream->reader.chunk.offset, csize);
00250 
00251       stream->reader.chunk.offset += csize;
00252       stream->file.offset += csize;
00253       pos += csize;
00254 
00255       if (stream->reader.chunk.offset >= stream->reader.chunk.size &&
00256           stream->file.offset < stream->file.length)
00257         {
00258           stream->file.current_chunk++;
00259           if (!_stream_seek_chunk (stream, stream->file.current_chunk))
00260             return -1;
00261         }
00262     }
00263 
00264   return pos;
00265 }
00266 
00267 static gboolean
00268 _stream_chunk_write (mongo_sync_gridfs *gfs,
00269                      const guint8 *oid, gint64 n,
00270                      const guint8 *buffer, gint32 size)
00271 {
00272   bson *chunk;
00273 
00274   chunk = bson_new_sized (size + 128);
00275   bson_append_oid (chunk, "files_id", oid);
00276   bson_append_int64 (chunk, "n", n);
00277   bson_append_binary (chunk, "data", BSON_BINARY_SUBTYPE_GENERIC,
00278                       buffer, size);
00279   bson_finish (chunk);
00280 
00281   if (!mongo_sync_cmd_insert (gfs->conn, gfs->ns.chunks, chunk, NULL))
00282     {
00283       int e = errno;
00284 
00285       bson_free (chunk);
00286       errno = e;
00287       return FALSE;
00288     }
00289   bson_free (chunk);
00290 
00291   return TRUE;
00292 }
00293 
00294 gboolean
00295 mongo_sync_gridfs_stream_write (mongo_sync_gridfs_stream *stream,
00296                                 const guint8 *buffer,
00297                                 gint64 size)
00298 {
00299   gint64 pos = 0;
00300 
00301   if (!stream)
00302     {
00303       errno = ENOENT;
00304       return FALSE;
00305     }
00306   if (stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER)
00307     {
00308       errno = EOPNOTSUPP;
00309       return FALSE;
00310     }
00311   if (!buffer || size <= 0)
00312     {
00313       errno = EINVAL;
00314       return FALSE;
00315     }
00316 
00317   while (pos < size)
00318     {
00319       gint32 csize = stream->file.chunk_size - stream->writer.buffer_offset;
00320 
00321       if (size - pos < csize)
00322         csize = size - pos;
00323 
00324       memcpy (stream->writer.buffer + stream->writer.buffer_offset,
00325               buffer + pos, csize);
00326       stream->writer.buffer_offset += csize;
00327       stream->file.offset += csize;
00328       stream->file.length += csize;
00329       pos += csize;
00330 
00331       if (stream->writer.buffer_offset == stream->file.chunk_size)
00332         {
00333           if (!_stream_chunk_write (stream->gfs,
00334                                     stream->file.id,
00335                                     stream->file.current_chunk,
00336                                     stream->writer.buffer,
00337                                     stream->file.chunk_size))
00338             return FALSE;
00339           g_checksum_update (stream->writer.checksum, stream->writer.buffer,
00340                              stream->file.chunk_size);
00341 
00342           stream->writer.buffer_offset = 0;
00343           stream->file.current_chunk++;
00344         }
00345     }
00346 
00347   return TRUE;
00348 }
00349 
00350 gboolean
00351 mongo_sync_gridfs_stream_seek (mongo_sync_gridfs_stream *stream,
00352                                gint64 pos,
00353                                gint whence)
00354 {
00355   gint64 real_pos = 0;
00356   gint64 chunk;
00357   gint32 offs;
00358 
00359   if (!stream)
00360     {
00361       errno = ENOENT;
00362       return FALSE;
00363     }
00364   if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER)
00365     {
00366       errno = EOPNOTSUPP;
00367       return FALSE;
00368     }
00369 
00370   switch (whence)
00371     {
00372     case SEEK_SET:
00373       if (pos == stream->file.offset)
00374         return TRUE;
00375       if (pos < 0 || pos > stream->file.length)
00376         {
00377           errno = ERANGE;
00378           return FALSE;
00379         }
00380       real_pos = pos;
00381       break;
00382     case SEEK_CUR:
00383       if (pos + stream->file.offset < 0 ||
00384           pos + stream->file.offset > stream->file.length)
00385         {
00386           errno = ERANGE;
00387           return FALSE;
00388         }
00389       if (pos == 0)
00390         return TRUE;
00391       real_pos = pos + stream->file.offset;
00392       break;
00393     case SEEK_END:
00394       if (pos > 0 || pos + stream->file.length < 0)
00395         {
00396           errno = ERANGE;
00397           return FALSE;
00398         }
00399       real_pos = pos + stream->file.length;
00400       break;
00401     default:
00402       errno = EINVAL;
00403       return FALSE;
00404     }
00405 
00406   chunk = real_pos / stream->file.chunk_size;
00407   offs = real_pos % stream->file.chunk_size;
00408 
00409   if (!_stream_seek_chunk (stream, chunk))
00410     return FALSE;
00411 
00412   stream->reader.chunk.offset = offs;
00413   stream->file.current_chunk = chunk;
00414   stream->file.offset = real_pos;
00415 
00416   return TRUE;
00417 }
00418 
00419 gboolean
00420 mongo_sync_gridfs_stream_close (mongo_sync_gridfs_stream *stream)
00421 {
00422   if (!stream)
00423     {
00424       errno = ENOENT;
00425       return FALSE;
00426     }
00427 
00428   if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER &&
00429       stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER)
00430     {
00431       errno = EINVAL;
00432       return FALSE;
00433     }
00434 
00435   if (stream->file.type == LMC_GRIDFS_FILE_STREAM_WRITER)
00436     {
00437       bson *meta;
00438       gint64 upload_date;
00439       GTimeVal tv;
00440       gboolean closed = FALSE;
00441 
00442       if (stream->writer.buffer_offset > 0)
00443         {
00444           closed = _stream_chunk_write (stream->gfs,
00445                                         stream->file.id,
00446                                         stream->file.current_chunk,
00447                                         stream->writer.buffer,
00448                                         stream->writer.buffer_offset);
00449 
00450           if (closed)
00451             g_checksum_update (stream->writer.checksum,
00452                                stream->writer.buffer,
00453                                stream->writer.buffer_offset);
00454         }
00455 
00456       if (closed)
00457         {
00458           g_get_current_time (&tv);
00459           upload_date =  (((gint64) tv.tv_sec) * 1000) +
00460             (gint64)(tv.tv_usec / 1000);
00461 
00462           /* _id is guaranteed by _stream_new() */
00463           meta = bson_new_from_data (bson_data (stream->writer.metadata),
00464                                      bson_size (stream->writer.metadata) - 1);
00465           bson_append_int64 (meta, "length", stream->file.length);
00466           bson_append_int32 (meta, "chunkSize", stream->file.chunk_size);
00467           bson_append_utc_datetime (meta, "uploadDate", upload_date);
00468           if (stream->file.length)
00469             bson_append_string (meta, "md5",
00470                                 g_checksum_get_string (stream->writer.checksum), -1);
00471           bson_finish (meta);
00472 
00473           if (!mongo_sync_cmd_insert (stream->gfs->conn,
00474                                       stream->gfs->ns.files, meta, NULL))
00475             {
00476               int e = errno;
00477 
00478               bson_free (meta);
00479               errno = e;
00480               return FALSE;
00481             }
00482           bson_free (meta);
00483         }
00484 
00485       bson_free (stream->writer.metadata);
00486       g_checksum_free (stream->writer.checksum);
00487       g_free (stream->writer.buffer);
00488     }
00489   else
00490     bson_free (stream->reader.bson);
00491 
00492   g_free (stream->file.id);
00493   g_free (stream);
00494   return TRUE;
00495 }
 All Data Structures Files Functions Variables Enumerations Enumerator Defines