Squash  0.7.0
stream.c
1 /* Copyright (c) 2013-2015 The Squash Authors
2  *
3  * Permission is hereby granted, free of charge, to any person
4  * obtaining a copy of this software and associated documentation
5  * files (the "Software"), to deal in the Software without
6  * restriction, including without limitation the rights to use, copy,
7  * modify, merge, publish, distribute, sublicense, and/or sell copies
8  * of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be
12  * included in all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
18  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21  * SOFTWARE.
22  *
23  * Authors:
24  * Evan Nemerson <evan@nemerson.com>
25  */
26 
27 #include <assert.h>
28 
29 #include "internal.h"
30 
189 static int
190 squash_stream_thread_func (SquashStream* stream) {
191  SquashStreamPrivate* priv = stream->priv;
192  SquashOperation operation;
193 
194  mtx_lock (&(priv->io_mtx));
195  priv->result = SQUASH_OK;
196  cnd_signal (&(priv->result_cnd));
197 
198  while ((operation = priv->request) == SQUASH_OPERATION_INVALID) {
199  cnd_wait (&(priv->request_cnd), &(priv->io_mtx));
200  }
201  priv->request = SQUASH_OPERATION_INVALID;
202 
203  priv->result = stream->codec->impl.process_stream (stream, operation);
204  priv->finished = true;
205  cnd_signal (&(priv->result_cnd));
206  mtx_unlock (&(priv->io_mtx));
207 
208  return 0;
209 }
210 
223 squash_stream_yield (SquashStream* stream, SquashStatus status) {
224  SquashStreamPrivate* priv = stream->priv;
225  SquashOperation operation;
226 
227  priv->result = status;
228  cnd_signal (&(priv->result_cnd));
229  mtx_unlock (&(priv->io_mtx));
230  if (status < 0)
231  thrd_exit (status);
232 
233  mtx_lock (&(priv->io_mtx));
234  while ((operation = priv->request) == SQUASH_OPERATION_INVALID) {
235  cnd_wait (&(priv->request_cnd), &(priv->io_mtx));
236  }
237  priv->request = SQUASH_OPERATION_INVALID;
238  return operation;
239 }
240 
241 static SquashStatus
242 squash_stream_send_to_thread (SquashStream* stream, SquashOperation operation) {
243  SquashStreamPrivate* priv = stream->priv;
244  SquashStatus result;
245 
246  priv->request = operation;
247  cnd_signal (&(priv->request_cnd));
248  mtx_unlock (&(priv->io_mtx));
249 
250  mtx_lock (&(priv->io_mtx));
251  while ((result = priv->result) == SQUASH_STATUS_INVALID) {
252  cnd_wait (&(priv->result_cnd), &(priv->io_mtx));
253  }
254  priv->result = SQUASH_STATUS_INVALID;
255 
256  if (priv->finished == true) {
257  mtx_unlock (&(priv->io_mtx));
258  thrd_join (priv->thread, NULL);
259  }
260 
261  return result;
262 }
263 
276 void
277 squash_stream_init (void* stream,
278  SquashCodec* codec,
279  SquashStreamType stream_type,
280  SquashOptions* options,
281  SquashDestroyNotify destroy_notify) {
282  SquashStream* s;
283 
284  assert (stream != NULL);
285 
286  s = (SquashStream*) stream;
287 
288  squash_object_init (stream, false, destroy_notify);
289 
290  s->next_in = NULL;
291  s->avail_in = 0;
292  s->total_in = 0;
293 
294  s->next_out = NULL;
295  s->avail_out = 0;
296  s->total_out = 0;
297 
298  s->codec = codec;
299  s->options = (options != NULL) ? squash_object_ref (options) : NULL;
300  s->stream_type = stream_type;
301  s->state = SQUASH_STREAM_STATE_IDLE;
302 
303  s->user_data = NULL;
304  s->destroy_user_data = NULL;
305 
306  if (SQUASH_UNLIKELY((codec->impl.info & SQUASH_CODEC_INFO_RUN_IN_THREAD) == SQUASH_CODEC_INFO_RUN_IN_THREAD)) {
307  s->priv = malloc (sizeof (SquashStreamPrivate));
308 
309  mtx_init (&(s->priv->io_mtx), mtx_plain);
310  mtx_lock (&(s->priv->io_mtx));
311 
312  s->priv->request = SQUASH_OPERATION_INVALID;
313  cnd_init (&(s->priv->request_cnd));
314 
315  s->priv->result = SQUASH_STATUS_INVALID;
316  cnd_init (&(s->priv->result_cnd));
317 
318  s->priv->finished = false;
319 #if !defined(NDEBUG)
320  int res =
321 #endif
322  thrd_create (&(s->priv->thread), (thrd_start_t) squash_stream_thread_func, s);
323  assert (res == thrd_success);
324 
325  while (s->priv->result == SQUASH_STATUS_INVALID)
326  cnd_wait (&(s->priv->result_cnd), &(s->priv->io_mtx));
327  s->priv->result = SQUASH_STATUS_INVALID;
328  } else {
329  s->priv = NULL;
330  }
331 }
332 
341 void
342 squash_stream_destroy (void* stream) {
343  SquashStream* s;
344 
345  assert (stream != NULL);
346 
347  s = (SquashStream*) stream;
348 
349  if (SQUASH_UNLIKELY(s->priv != NULL)) {
350  SquashStreamPrivate* priv = (SquashStreamPrivate*) s->priv;
351 
352  if (!priv->finished) {
353  squash_stream_send_to_thread (s, SQUASH_OPERATION_TERMINATE);
354  }
355  cnd_destroy (&(priv->request_cnd));
356  cnd_destroy (&(priv->result_cnd));
357  mtx_destroy (&(priv->io_mtx));
358 
359  free (s->priv);
360  }
361 
362  if (s->destroy_user_data != NULL && s->user_data != NULL) {
363  s->destroy_user_data (s->user_data);
364  }
365 
366  if (s->options != NULL) {
367  s->options = squash_object_unref (s->options);
368  }
369 
370  squash_object_destroy (stream);
371 }
372 
381 SquashStream*
382 squash_stream_new (const char* codec,
383  SquashStreamType stream_type,
384  ...) {
385  va_list options_list;
386  SquashStream* stream;
387 
388  va_start (options_list, stream_type);
389  stream = squash_stream_newv (codec, stream_type, options_list);
390  va_end (options_list);
391 
392  return stream;
393 }
394 
403 SquashStream*
404 squash_stream_newv (const char* codec,
405  SquashStreamType stream_type,
406  va_list options) {
407  SquashOptions* opts;
408  SquashCodec* codec_real;
409 
410  codec_real = squash_get_codec (codec);
411  if (codec_real == NULL) {
412  return NULL;
413  }
414 
415  opts = squash_options_newv (codec_real, options);
416  if (opts == NULL) {
417  return NULL;
418  }
419 
420  return squash_codec_create_stream_with_options (codec_real, stream_type, opts);
421 }
422 
432 SquashStream*
433 squash_stream_newa (const char* codec,
434  SquashStreamType stream_type,
435  const char* const* keys,
436  const char* const* values) {
437  return NULL;
438 }
439 
448 SquashStream*
450  SquashStreamType stream_type,
451  SquashOptions* options) {
452  SquashCodec* codec_real;
453 
454  assert (codec != NULL);
455 
456  codec_real = squash_get_codec (codec);
457 
458  return (codec_real != NULL) ?
459  squash_codec_create_stream_with_options (codec_real, stream_type, options) : NULL;
460 }
461 
470 SquashStream*
471 squash_stream_new_codec (SquashCodec* codec,
472  SquashStreamType stream_type,
473  ...) {
474  assert (codec != NULL);
475 
476  va_list options_list;
477  SquashOptions* opts;
478 
479  va_start (options_list, stream_type);
480  opts = squash_options_newv (codec, options_list);
481  va_end (options_list);
482 
483  return squash_codec_create_stream_with_options (codec, stream_type, opts);
484 }
485 
494 SquashStream*
496  SquashStreamType stream_type,
497  SquashOptions* options) {
498  return squash_codec_create_stream_with_options (codec, stream_type, options);
499 }
500 
501 static SquashStatus
502 squash_stream_process_internal (SquashStream* stream, SquashOperation operation) {
503  SquashCodec* codec;
504  SquashCodecImpl* impl = NULL;
505  SquashStatus res = SQUASH_OK;
506  SquashOperation current_operation = SQUASH_OPERATION_PROCESS;
507  SquashStreamPrivate* priv = (SquashStreamPrivate*) stream->priv;
508 
509  assert (stream != NULL);
510  codec = stream->codec;
511  assert (codec != NULL);
512  impl = squash_codec_get_impl (codec);
513  assert (impl != NULL);
514 
515  /* Flush is optional, so return an error if it doesn't exist but
516  flushing was requested. */
517  if (operation == SQUASH_OPERATION_FLUSH && ((impl->info & SQUASH_CODEC_INFO_CAN_FLUSH) == 0)) {
519  }
520 
521  /* In order to take some of the load off of the plugins, there is
522  some extra logic here which may seem a bit disorienting at first
523  glance. Basically, instead of requiring that plugins handle
524  flushing or finishing with arbitrarily large inputs, we first try
525  to process as much input as we can. So, when someone calls
526  squash_stream_flush or squash_stream finish Squash may, depending
527  on the stream state, first call the process function. Note that
528  Squash will not flush a stream before finishing it (unless there
529  is logic to do so in the plugin) as it could cause an increase in
530  the output size (it does with zlib).
531 
532  One interesting consequence of this is that the stream_state
533  field may not be what you're expecting. If an earlier operation
534  returned SQUASH_PROCESSING, stream_type may never transition to
535  the new value. In this case, the stream_type does accurately
536  represent the state of the stream, though it probably isn't wise
537  to depend on that behavior. */
538 
539  if ((operation == SQUASH_OPERATION_PROCESS && stream->state > SQUASH_STREAM_STATE_RUNNING) ||
540  (operation == SQUASH_OPERATION_FLUSH && stream->state > SQUASH_STREAM_STATE_FLUSHING) ||
541  (operation == SQUASH_OPERATION_FINISH && stream->state > SQUASH_STREAM_STATE_FINISHING)) {
542  return squash_error (SQUASH_STATE);
543  }
544 
545  switch (stream->state) {
546  case SQUASH_STREAM_STATE_IDLE:
547  case SQUASH_STREAM_STATE_RUNNING:
548  current_operation = SQUASH_OPERATION_PROCESS;
549  break;
550  case SQUASH_STREAM_STATE_FLUSHING:
551  current_operation = SQUASH_OPERATION_FLUSH;
552  break;
553  case SQUASH_STREAM_STATE_FINISHING:
554  current_operation = SQUASH_OPERATION_FINISH;
555  break;
556  case SQUASH_STREAM_STATE_FINISHED:
557  current_operation = (SQUASH_OPERATION_FINISH + 1);
558  break;
559  }
560 
561  if (current_operation > operation) {
562  return squash_error (SQUASH_STATE);
563  }
564 
565  const size_t avail_in = stream->avail_in;
566  const size_t avail_out = stream->avail_out;
567 
568  /* Some libraries (like zlib) will realize that we're not providing
569  it any room for output and are eager to tell us that we don't
570  have any space instead of decoding the stream enough to know if we
571  actually need that space.
572 
573  In cases where this might be problematic, we provide a
574  single-byte buffer to the plugin instead. If anything actually
575  gets written to it then we'll return an error
576  (SQUASH_BUFFER_FULL), which is non-recoverable.
577 
578  There are a few cases where this might reasonably be a problem:
579 
580  * Decompression streams which know the exact size of the
581  decompressed output, when using codecs which contain extra
582  data at the end, such as a footer or EOS marker.
583 
584  * Compression streams writing to a fixed buffer with a length of
585  less than or equal to max_compressed_size bytes. This is a
586  pretty reasonable thing to do, since you might want to only
587  bother using compression if you can achieve a certain ratio.
588 
589  For consumers which don't satisfy either of these conditions,
590  this code should never be reached. */
591 
592  uint8_t* next_out = NULL;
593  uint8_t output_sbb = 0;
594  if (stream->avail_out == 0) {
595  next_out = stream->next_out;
596  stream->avail_out = 1;
597  stream->next_out = &output_sbb;
598  }
599 
600  while (current_operation <= operation) {
601  if (current_operation == SQUASH_OPERATION_PROCESS) {
602  /* Process */
603  if (stream->avail_in == 0 && stream->state == SQUASH_STREAM_STATE_IDLE) {
604  res = SQUASH_OK;
606  res = squash_stream_send_to_thread (stream, current_operation);
607  } else if (impl->process_stream != NULL) {
608  res = impl->process_stream (stream, current_operation);
609  } else {
610  res = squash_buffer_stream_process ((SquashBufferStream*) stream);
611  }
612  } else if (current_operation == SQUASH_OPERATION_FLUSH) {
613  /* Flush */
614  if (current_operation == operation) {
617  res = squash_stream_send_to_thread (stream, current_operation);
618  } else if (impl->process_stream == NULL) {
620  } else {
621  res = impl->process_stream (stream, current_operation);
622  }
623  } else {
624  /* We aready checked to make sure the stream is flushable if
625  the user called flush directly, so if this code is
626  reached the user didn't call flush, they called finish
627  which attempts to flush internally. Just pretend it
628  worked so we can proceed to finishing. */
629  res = SQUASH_OK;
630  }
631  }
632  } else if (current_operation == SQUASH_OPERATION_FINISH) {
633  /* Finish */
634  if (impl->process_stream != NULL) {
636  if (!(priv->finished))
637  res = squash_stream_send_to_thread (stream, current_operation);
638  } else {
639  res = impl->process_stream (stream, current_operation);
640  }
641  } else {
642  res = squash_buffer_stream_finish ((SquashBufferStream*) stream);
643  }
644 
645  /* Plugins *should* return SQUASH_OK, not SQUASH_END_OF_STREAM,
646  from the finish function, but it's an easy mistake to make
647  (and correct), so... */
648  if (SQUASH_UNLIKELY(res == SQUASH_END_OF_STREAM)) {
649  res = SQUASH_OK;
650  }
651  }
652 
653  /* Check our internal single byte buffer */
654  if (next_out != 0) {
655  if (stream->avail_out == 0) {
657  }
658  }
659 
660  if (res == SQUASH_PROCESSING) {
661  switch (current_operation) {
663  stream->state = SQUASH_STREAM_STATE_RUNNING;
664  break;
666  stream->state = SQUASH_STREAM_STATE_FLUSHING;
667  break;
669  stream->state = SQUASH_STREAM_STATE_FINISHING;
670  break;
672  squash_assert_unreachable ();
673  break;
674  }
675  break;
676  } else if (res == SQUASH_END_OF_STREAM || (current_operation == SQUASH_OPERATION_FINISH && res == SQUASH_OK)) {
677  stream->state = SQUASH_STREAM_STATE_FINISHED;
678  current_operation++;
679  break;
680  } else if (res == SQUASH_OK) {
681  stream->state = SQUASH_STREAM_STATE_IDLE;
682  current_operation++;
683  } else {
684  break;
685  }
686  }
687 
688  if (next_out != 0) {
689  stream->avail_out = 0;
690  stream->next_out = next_out;
691  }
692 
693  stream->total_in += (avail_in - stream->avail_in);
694  stream->total_out += (avail_out - stream->avail_out);
695 
696  return res;
697 }
698 
717 squash_stream_process (SquashStream* stream) {
718  return squash_stream_process_internal (stream, SQUASH_OPERATION_PROCESS);
719 }
720 
732 squash_stream_flush (SquashStream* stream) {
733  return squash_stream_process_internal (stream, SQUASH_OPERATION_FLUSH);
734 }
735 
743 squash_stream_finish (SquashStream* stream) {
744  return squash_stream_process_internal (stream, SQUASH_OPERATION_FINISH);
745 }
746 
SquashStream * squash_codec_create_stream_with_options(SquashCodec *codec, SquashStreamType stream_type, SquashOptions *options)
Create a new stream with existing SquashOptions.
Definition: codec.c:485
Performing the requested operation from the current state is not supported.
Definition: status.h:48
The requested operation is not available.
Definition: status.h:49
SquashStatus squash_stream_finish(SquashStream *stream)
Finish writing to a stream.
Definition: stream.c:743
SquashOperation squash_stream_yield(SquashStream *stream, SquashStatus status)
Yield execution back to the main thread.
Definition: stream.c:223
SquashCodec * squash_get_codec(const char *codec)
Retrieve a SquashCodec.
Definition: context.c:149
SquashStream * squash_stream_new_codec(SquashCodec *codec, SquashStreamType stream_type,...)
Create a new stream using a codec instance.
Definition: stream.c:471
void(* SquashDestroyNotify)(void *data)
Callback to be invoked when information data is no longer needed.
Definition: object.h:43
The data is processed in a background thread.
Definition: codec.h:38
Reached the end of the stream while decoding.
Definition: status.h:39
SquashStream * squash_stream_newa(const char *codec, SquashStreamType stream_type, const char *const *keys, const char *const *values)
Create a new stream with key/value option arrays.
Definition: stream.c:433
Operation partially completed.
Definition: status.h:38
SquashOptions * squash_options_newv(SquashCodec *codec, va_list options)
Create a new group of options from a variadic list.
Definition: options.c:560
SquashStream * squash_stream_new_codec_with_options(SquashCodec *codec, SquashStreamType stream_type, SquashOptions *options)
Create a new stream using codec and options intances.
Definition: stream.c:495
SquashCodecImpl * squash_codec_get_impl(SquashCodec *codec)
Get the codec's function table.
Definition: codec.c:358
Insufficient space in buffer.
Definition: status.h:46
Continue processing the stream normally.
Definition: stream.h:52
void * squash_object_unref(void *obj)
Decrement the reference count on an object.
Definition: object.c:255
SquashStream * squash_stream_newv(const char *codec, SquashStreamType stream_type, va_list options)
Create a new stream with a variadic list of options.
Definition: stream.c:404
SquashStatus squash_stream_flush(SquashStream *stream)
Flush a stream.
Definition: stream.c:732
void squash_stream_init(void *stream, SquashCodec *codec, SquashStreamType stream_type, SquashOptions *options, SquashDestroyNotify destroy_notify)
Initialize a stream.
Definition: stream.c:277
Flushing is supported.
Definition: codec.h:37
SquashStatus
Status codes.
Definition: status.h:36
Flush the stream.
Definition: stream.h:53
SquashStream * squash_stream_new_with_options(const char *codec, SquashStreamType stream_type, SquashOptions *options)
Create a new stream with options.
Definition: stream.c:449
SquashOperation
Operations to perform on a stream.
Definition: stream.h:51
SquashStatus squash_error(SquashStatus status)
Emit an error.
Definition: status.c:173
void squash_stream_destroy(void *stream)
Destroy a stream.
Definition: stream.c:342
void * squash_object_ref(void *obj)
Increment the reference count on an object.
Definition: object.c:206
SquashStream * squash_stream_new(const char *codec, SquashStreamType stream_type,...)
Create a new stream.
Definition: stream.c:382
SquashStatus squash_stream_process(SquashStream *stream)
Process a stream.
Definition: stream.c:717
Finish processing the stream.
Definition: stream.h:54
Operation completed successfully.
Definition: status.h:37
void squash_object_destroy(void *obj)
Destroy an object.
Definition: object.c:325
SquashStreamType
Stream type.
Definition: stream.h:38
void squash_object_init(void *obj, bool is_floating, SquashDestroyNotify destroy_notify)
Initialize a new object.
Definition: object.c:302