libmongo-client 0.1.4
|
00001 /* sync-gridfs-stream.c - libmongo-client GridFS streaming implementation 00002 * Copyright 2011 Gergely Nagy <algernon@balabit.hu> 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * http://www.apache.org/licenses/LICENSE-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00021 #include "sync-gridfs-stream.h" 00022 #include "libmongo-private.h" 00023 00024 #include <unistd.h> 00025 #include <errno.h> 00026 00027 mongo_sync_gridfs_stream * 00028 mongo_sync_gridfs_stream_find (mongo_sync_gridfs *gfs, 00029 const bson *query) 00030 { 00031 mongo_sync_gridfs_stream *stream; 00032 bson *meta = NULL; 00033 bson_cursor *c; 00034 mongo_packet *p; 00035 const guint8 *oid; 00036 00037 if (!gfs) 00038 { 00039 errno = ENOTCONN; 00040 return NULL; 00041 } 00042 if (!query) 00043 { 00044 errno = EINVAL; 00045 return NULL; 00046 } 00047 00048 p = mongo_sync_cmd_query (gfs->conn, gfs->ns.files, 0, 0, 1, query, NULL); 00049 if (!p) 00050 return NULL; 00051 00052 stream = g_new0 (mongo_sync_gridfs_stream, 1); 00053 stream->gfs = gfs; 00054 stream->file.type = LMC_GRIDFS_FILE_STREAM_READER; 00055 00056 mongo_wire_reply_packet_get_nth_document (p, 1, &meta); 00057 bson_finish (meta); 00058 mongo_wire_packet_free (p); 00059 00060 c = bson_find (meta, "_id"); 00061 if (!bson_cursor_get_oid (c, &oid)) 00062 { 00063 bson_cursor_free (c); 00064 bson_free (meta); 00065 g_free (stream); 00066 00067 errno = EPROTO; 00068 return NULL; 00069 } 00070 stream->file.id = g_malloc (12); 00071 memcpy (stream->file.id, oid, 12); 00072 00073 bson_cursor_find (c, "length"); 00074 bson_cursor_get_int64 (c, &stream->file.length); 00075 if (stream->file.length == 0) 00076 { 00077 gint32 i = 0; 00078 00079 bson_cursor_get_int32 (c, &i); 00080 stream->file.length = i; 00081 } 00082 00083 bson_cursor_find (c, "chunkSize"); 00084 bson_cursor_get_int32 (c, &stream->file.chunk_size); 00085 00086 bson_cursor_free (c); 00087 bson_free (meta); 00088 00089 if (stream->file.length == 0 || 00090 stream->file.chunk_size == 0) 00091 { 00092 g_free (stream->file.id); 00093 g_free (stream); 00094 00095 errno = EPROTO; 00096 return NULL; 00097 } 00098 00099 return stream; 00100 } 00101 00102 mongo_sync_gridfs_stream * 00103 mongo_sync_gridfs_stream_new (mongo_sync_gridfs *gfs, 00104 const bson *metadata) 00105 { 00106 mongo_sync_gridfs_stream *stream; 00107 bson_cursor *c; 00108 00109 if (!gfs) 00110 { 00111 errno = ENOTCONN; 00112 return NULL; 00113 } 00114 00115 stream = g_new0 (mongo_sync_gridfs_stream, 1); 00116 stream->file.type = LMC_GRIDFS_FILE_STREAM_WRITER; 00117 stream->gfs = gfs; 00118 00119 stream->file.chunk_size = gfs->chunk_size; 00120 00121 stream->writer.metadata = bson_new_from_data (bson_data (metadata), 00122 bson_size (metadata) - 1); 00123 00124 c = bson_find (metadata, "_id"); 00125 if (!c) 00126 { 00127 stream->file.id = mongo_util_oid_new 00128 (mongo_connection_get_requestid ((mongo_connection *)gfs->conn)); 00129 if (!stream->file.id) 00130 { 00131 bson_free (stream->writer.metadata); 00132 g_free (stream); 00133 00134 errno = EFAULT; 00135 return NULL; 00136 } 00137 bson_append_oid (stream->writer.metadata, "_id", stream->file.id); 00138 } 00139 else 00140 { 00141 const guint8 *oid; 00142 00143 if (!bson_cursor_get_oid (c, &oid)) 00144 { 00145 bson_cursor_free (c); 00146 bson_free (stream->writer.metadata); 00147 g_free (stream); 00148 00149 errno = EPROTO; 00150 return NULL; 00151 } 00152 00153 stream->file.id = g_malloc (12); 00154 memcpy (stream->file.id, oid, 12); 00155 } 00156 bson_cursor_free (c); 00157 bson_finish (stream->writer.metadata); 00158 00159 stream->writer.buffer = g_malloc (stream->file.chunk_size); 00160 stream->writer.checksum = g_checksum_new (G_CHECKSUM_MD5); 00161 00162 return stream; 00163 } 00164 00165 static inline gboolean 00166 _stream_seek_chunk (mongo_sync_gridfs_stream *stream, 00167 gint64 chunk) 00168 { 00169 bson *b; 00170 mongo_packet *p; 00171 bson_cursor *c; 00172 bson_binary_subtype subt = BSON_BINARY_SUBTYPE_USER_DEFINED; 00173 gboolean r; 00174 00175 b = bson_new_sized (32); 00176 bson_append_oid (b, "files_id", stream->file.id); 00177 bson_append_int64 (b, "n", chunk); 00178 bson_finish (b); 00179 00180 p = mongo_sync_cmd_query (stream->gfs->conn, 00181 stream->gfs->ns.chunks, 0, 00182 0, 1, b, NULL); 00183 bson_free (b); 00184 00185 bson_free (stream->reader.bson); 00186 stream->reader.bson = NULL; 00187 stream->reader.chunk.data = NULL; 00188 00189 mongo_wire_reply_packet_get_nth_document (p, 1, &stream->reader.bson); 00190 mongo_wire_packet_free (p); 00191 bson_finish (stream->reader.bson); 00192 00193 c = bson_find (stream->reader.bson, "data"); 00194 r = bson_cursor_get_binary (c, &subt, &stream->reader.chunk.data, 00195 &stream->reader.chunk.size); 00196 if (!r || subt != BSON_BINARY_SUBTYPE_GENERIC) 00197 { 00198 bson_cursor_free (c); 00199 bson_free (stream->reader.bson); 00200 stream->reader.bson = NULL; 00201 stream->reader.chunk.data = NULL; 00202 00203 errno = EPROTO; 00204 return FALSE; 00205 } 00206 bson_cursor_free (c); 00207 00208 stream->reader.chunk.offset = 0; 00209 return TRUE; 00210 } 00211 00212 gint64 00213 mongo_sync_gridfs_stream_read (mongo_sync_gridfs_stream *stream, 00214 guint8 *buffer, 00215 gint64 size) 00216 { 00217 gint64 pos = 0; 00218 00219 if (!stream) 00220 { 00221 errno = ENOENT; 00222 return -1; 00223 } 00224 if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER) 00225 { 00226 errno = EOPNOTSUPP; 00227 return -1; 00228 } 00229 if (!buffer || size <= 0) 00230 { 00231 errno = EINVAL; 00232 return -1; 00233 } 00234 00235 if (!stream->reader.chunk.data) 00236 { 00237 if (!_stream_seek_chunk (stream, 0)) 00238 return -1; 00239 } 00240 00241 while (pos < size && stream->file.offset < stream->file.length) 00242 { 00243 gint32 csize = stream->reader.chunk.size - stream->reader.chunk.offset; 00244 00245 if (size - pos < csize) 00246 csize = size - pos; 00247 00248 memcpy (buffer + pos, 00249 stream->reader.chunk.data + stream->reader.chunk.offset, csize); 00250 00251 stream->reader.chunk.offset += csize; 00252 stream->file.offset += csize; 00253 pos += csize; 00254 00255 if (stream->reader.chunk.offset >= stream->reader.chunk.size && 00256 stream->file.offset < stream->file.length) 00257 { 00258 stream->file.current_chunk++; 00259 if (!_stream_seek_chunk (stream, stream->file.current_chunk)) 00260 return -1; 00261 } 00262 } 00263 00264 return pos; 00265 } 00266 00267 static gboolean 00268 _stream_chunk_write (mongo_sync_gridfs *gfs, 00269 const guint8 *oid, gint64 n, 00270 const guint8 *buffer, gint32 size) 00271 { 00272 bson *chunk; 00273 00274 chunk = bson_new_sized (size + 128); 00275 bson_append_oid (chunk, "files_id", oid); 00276 bson_append_int64 (chunk, "n", n); 00277 bson_append_binary (chunk, "data", BSON_BINARY_SUBTYPE_GENERIC, 00278 buffer, size); 00279 bson_finish (chunk); 00280 00281 if (!mongo_sync_cmd_insert (gfs->conn, gfs->ns.chunks, chunk, NULL)) 00282 { 00283 int e = errno; 00284 00285 bson_free (chunk); 00286 errno = e; 00287 return FALSE; 00288 } 00289 bson_free (chunk); 00290 00291 return TRUE; 00292 } 00293 00294 gboolean 00295 mongo_sync_gridfs_stream_write (mongo_sync_gridfs_stream *stream, 00296 const guint8 *buffer, 00297 gint64 size) 00298 { 00299 gint64 pos = 0; 00300 00301 if (!stream) 00302 { 00303 errno = ENOENT; 00304 return FALSE; 00305 } 00306 if (stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER) 00307 { 00308 errno = EOPNOTSUPP; 00309 return FALSE; 00310 } 00311 if (!buffer || size <= 0) 00312 { 00313 errno = EINVAL; 00314 return FALSE; 00315 } 00316 00317 while (pos < size) 00318 { 00319 gint32 csize = stream->file.chunk_size - stream->writer.buffer_offset; 00320 00321 if (size - pos < csize) 00322 csize = size - pos; 00323 00324 memcpy (stream->writer.buffer + stream->writer.buffer_offset, 00325 buffer + pos, csize); 00326 stream->writer.buffer_offset += csize; 00327 stream->file.offset += csize; 00328 stream->file.length += csize; 00329 pos += csize; 00330 00331 if (stream->writer.buffer_offset == stream->file.chunk_size) 00332 { 00333 if (!_stream_chunk_write (stream->gfs, 00334 stream->file.id, 00335 stream->file.current_chunk, 00336 stream->writer.buffer, 00337 stream->file.chunk_size)) 00338 return FALSE; 00339 g_checksum_update (stream->writer.checksum, stream->writer.buffer, 00340 stream->file.chunk_size); 00341 00342 stream->writer.buffer_offset = 0; 00343 stream->file.current_chunk++; 00344 } 00345 } 00346 00347 return TRUE; 00348 } 00349 00350 gboolean 00351 mongo_sync_gridfs_stream_seek (mongo_sync_gridfs_stream *stream, 00352 gint64 pos, 00353 gint whence) 00354 { 00355 gint64 real_pos = 0; 00356 gint64 chunk; 00357 gint32 offs; 00358 00359 if (!stream) 00360 { 00361 errno = ENOENT; 00362 return FALSE; 00363 } 00364 if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER) 00365 { 00366 errno = EOPNOTSUPP; 00367 return FALSE; 00368 } 00369 00370 switch (whence) 00371 { 00372 case SEEK_SET: 00373 if (pos == stream->file.offset) 00374 return TRUE; 00375 if (pos < 0 || pos > stream->file.length) 00376 { 00377 errno = ERANGE; 00378 return FALSE; 00379 } 00380 real_pos = pos; 00381 break; 00382 case SEEK_CUR: 00383 if (pos + stream->file.offset < 0 || 00384 pos + stream->file.offset > stream->file.length) 00385 { 00386 errno = ERANGE; 00387 return FALSE; 00388 } 00389 if (pos == 0) 00390 return TRUE; 00391 real_pos = pos + stream->file.offset; 00392 break; 00393 case SEEK_END: 00394 if (pos > 0 || pos + stream->file.length < 0) 00395 { 00396 errno = ERANGE; 00397 return FALSE; 00398 } 00399 real_pos = pos + stream->file.length; 00400 break; 00401 default: 00402 errno = EINVAL; 00403 return FALSE; 00404 } 00405 00406 chunk = real_pos / stream->file.chunk_size; 00407 offs = real_pos % stream->file.chunk_size; 00408 00409 if (!_stream_seek_chunk (stream, chunk)) 00410 return FALSE; 00411 00412 stream->reader.chunk.offset = offs; 00413 stream->file.current_chunk = chunk; 00414 stream->file.offset = real_pos; 00415 00416 return TRUE; 00417 } 00418 00419 gboolean 00420 mongo_sync_gridfs_stream_close (mongo_sync_gridfs_stream *stream) 00421 { 00422 if (!stream) 00423 { 00424 errno = ENOENT; 00425 return FALSE; 00426 } 00427 00428 if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER && 00429 stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER) 00430 { 00431 errno = EINVAL; 00432 return FALSE; 00433 } 00434 00435 if (stream->file.type == LMC_GRIDFS_FILE_STREAM_WRITER) 00436 { 00437 bson *meta; 00438 gint64 upload_date; 00439 GTimeVal tv; 00440 gboolean closed = FALSE; 00441 00442 if (stream->writer.buffer_offset > 0) 00443 { 00444 closed = _stream_chunk_write (stream->gfs, 00445 stream->file.id, 00446 stream->file.current_chunk, 00447 stream->writer.buffer, 00448 stream->writer.buffer_offset); 00449 00450 if (closed) 00451 g_checksum_update (stream->writer.checksum, 00452 stream->writer.buffer, 00453 stream->writer.buffer_offset); 00454 } 00455 00456 if (closed) 00457 { 00458 g_get_current_time (&tv); 00459 upload_date = (((gint64) tv.tv_sec) * 1000) + 00460 (gint64)(tv.tv_usec / 1000); 00461 00462 /* _id is guaranteed by _stream_new() */ 00463 meta = bson_new_from_data (bson_data (stream->writer.metadata), 00464 bson_size (stream->writer.metadata) - 1); 00465 bson_append_int64 (meta, "length", stream->file.length); 00466 bson_append_int32 (meta, "chunkSize", stream->file.chunk_size); 00467 bson_append_utc_datetime (meta, "uploadDate", upload_date); 00468 if (stream->file.length) 00469 bson_append_string (meta, "md5", 00470 g_checksum_get_string (stream->writer.checksum), -1); 00471 bson_finish (meta); 00472 00473 if (!mongo_sync_cmd_insert (stream->gfs->conn, 00474 stream->gfs->ns.files, meta, NULL)) 00475 { 00476 int e = errno; 00477 00478 bson_free (meta); 00479 errno = e; 00480 return FALSE; 00481 } 00482 bson_free (meta); 00483 } 00484 00485 bson_free (stream->writer.metadata); 00486 g_checksum_free (stream->writer.checksum); 00487 g_free (stream->writer.buffer); 00488 } 00489 else 00490 bson_free (stream->reader.bson); 00491 00492 g_free (stream->file.id); 00493 g_free (stream); 00494 return TRUE; 00495 }