Bug 28792 - System.Threading.Tasks.Dataflow sample - Producer-Consumer Dataflow Pattern
Summary: System.Threading.Tasks.Dataflow sample - Producer-Consumer Dataflow Pattern
Status: RESOLVED ANSWERED
Alias: None
Product: Runtime
Classification: Mono
Component: General ()
Version: unspecified
Hardware: Other Other
: --- normal
Target Milestone: ---
Assignee: Bugzilla
URL:
Depends on:
Blocks:
 
Reported: 2015-04-05 14:40 UTC by Alexandre Faria
Modified: 2015-06-17 00:53 UTC (History)
4 users (show)

Tags:
Is this bug a regression?: ---
Last known good build:

Notice (2018-05-24): bugzilla.xamarin.com is now in read-only mode.

Please join us on Visual Studio Developer Community and in the Xamarin and Mono organizations on GitHub to continue tracking issues. Bugzilla will remain available for reference in read-only mode. We will continue to work on open Bugzilla bugs, copy them to the new locations as needed for follow-up, and add the new items under Related Links.

Our sincere thanks to everyone who has contributed on this bug tracker over the years. Thanks also for your understanding as we make these adjustments and improvements for the future.


Please create a new report on GitHub or Developer Community with your current version information, steps to reproduce, and relevant error messages or log files if you are hitting an issue that looks similar to this resolved bug and you do not yet see a matching new report.

Related Links:
Status:
RESOLVED ANSWERED

Description Alexandre Faria 2015-04-05 14:40:58 UTC
Description of Problem:
It crashes.

Steps to reproduce the problem:
Just run the sample:
from https://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates a basic producer and consumer pattern that uses dataflow. 
class DataflowProducerConsumer
{
   // Demonstrates the production end of the producer and consumer pattern. 
   static void Produce(ITargetBlock<byte[]> target)
   {
      // Create a Random object to generate random data.
      Random rand = new Random();

      // In a loop, fill a buffer with random data and 
      // post the buffer to the target block. 
      for (int i = 0; i < 100; i++)
      {
         // Create an array to hold random byte data. 
         byte[] buffer = new byte[1024];

         // Fill the buffer with random bytes.
         rand.NextBytes(buffer);

         // Post the result to the message block.
         target.Post(buffer);
      }

      // Set the target to the completed state to signal to the consumer 
      // that no more data will be available.
      target.Complete();
   }

   // Demonstrates the consumption end of the producer and consumer pattern. 
   static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
   {
      // Initialize a counter to track the number of bytes that are processed. 
      int bytesProcessed = 0;

      // Read from the source buffer until the source buffer has no  
      // available output data. 
      while (await source.OutputAvailableAsync())
      {
         byte[] data = source.Receive();

         // Increment the count of bytes received.
         bytesProcessed += data.Length;
      }

      return bytesProcessed;
   }

   static void Main(string[] args)
   {
      // Create a BufferBlock<byte[]> object. This object serves as the  
      // target block for the producer and the source block for the consumer. 
      var buffer = new BufferBlock<byte[]>();

      // Start the consumer. The Consume method runs asynchronously.  
      var consumer = ConsumeAsync(buffer);

      // Post source data to the dataflow block.
      Produce(buffer);

      // Wait for the consumer to process all data.
      consumer.Wait();

      // Print the count of bytes processed to the console.
      Console.WriteLine("Processed {0} bytes.", consumer.Result);
   }
}

/* Output:
Processed 102400 bytes.
*/

Actual Results:
Unhandled Exception:
System.AggregateException: One or more errors occurred. ---> System.Threading.LockRecursionException: The calling thread already holds the lock.
  at System.Threading.SpinLock.ContinueTryEnterWithThreadTracking (Int32 millisecondsTimeout, UInt32 startTime, System.Boolean& lockTaken) <0x7f81dbac61d0 + 0x0010a> in <filename unknown>:0 
  at System.Threading.SpinLock.ContinueTryEnter (Int32 millisecondsTimeout, System.Boolean& lockTaken) <0x7f81dbac5da0 + 0x00080> in <filename unknown>:0 
  at System.Threading.SpinLock.Enter (System.Boolean& lockTaken) <0x7f81dbac5b70 + 0x00081> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.OutgoingQueue`1[T].TryReceive (System.Predicate`1 filter, System.Threading.Tasks.Dataflow.T& item) <0x405f8da0 + 0x0008d> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.BufferBlock`1[T].TryReceive (System.Predicate`1 filter, System.Threading.Tasks.Dataflow.T& item) <0x405f8d50 + 0x0002f> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.DataflowBlock.Receive[TOutput] (ISourceBlock`1 source, TimeSpan timeout, CancellationToken cancellationToken) <0x405f88f0 + 0x00145> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.DataflowBlock.Receive[TOutput] (ISourceBlock`1 source) <0x405f8850 + 0x0005b> in <filename unknown>:0 
  at DataflowProducerConsumer+<ConsumeAsync>c__async0.MoveNext () <0x405f3920 + 0x00077> in <filename unknown>:0 
  --- End of inner exception stack trace ---
  at System.Threading.Tasks.Task.ThrowIfExceptional (Boolean includeTaskCanceledExceptions) <0x7f81dbadc1c0 + 0x00037> in <filename unknown>:0 
  at System.Threading.Tasks.Task.Wait (Int32 millisecondsTimeout, CancellationToken cancellationToken) <0x7f81dbadd4e0 + 0x00097> in <filename unknown>:0 
  at System.Threading.Tasks.Task.Wait () <0x7f81dbadd3b0 + 0x00028> in <filename unknown>:0 
  at DataflowProducerConsumer.Main (System.String[] args) <0x405efd50 + 0x00068> in <filename unknown>:0 
---> (Inner Exception #0) System.Threading.LockRecursionException: The calling thread already holds the lock.
  at System.Threading.SpinLock.ContinueTryEnterWithThreadTracking (Int32 millisecondsTimeout, UInt32 startTime, System.Boolean& lockTaken) <0x7f81dbac61d0 + 0x0010a> in <filename unknown>:0 
  at System.Threading.SpinLock.ContinueTryEnter (Int32 millisecondsTimeout, System.Boolean& lockTaken) <0x7f81dbac5da0 + 0x00080> in <filename unknown>:0 
  at System.Threading.SpinLock.Enter (System.Boolean& lockTaken) <0x7f81dbac5b70 + 0x00081> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.OutgoingQueue`1[T].TryReceive (System.Predicate`1 filter, System.Threading.Tasks.Dataflow.T& item) <0x405f8da0 + 0x0008d> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.BufferBlock`1[T].TryReceive (System.Predicate`1 filter, System.Threading.Tasks.Dataflow.T& item) <0x405f8d50 + 0x0002f> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.DataflowBlock.Receive[TOutput] (ISourceBlock`1 source, TimeSpan timeout, CancellationToken cancellationToken) <0x405f88f0 + 0x00145> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.DataflowBlock.Receive[TOutput] (ISourceBlock`1 source) <0x405f8850 + 0x0005b> in <filename unknown>:0 
  at DataflowProducerConsumer+<ConsumeAsync>c__async0.MoveNext () <0x405f3920 + 0x00077> in <filename unknown>:0 <---

[ERROR] FATAL UNHANDLED EXCEPTION: System.AggregateException: One or more errors occurred. ---> System.Threading.LockRecursionException: The calling thread already holds the lock.
  at System.Threading.SpinLock.ContinueTryEnterWithThreadTracking (Int32 millisecondsTimeout, UInt32 startTime, System.Boolean& lockTaken) <0x7f81dbac61d0 + 0x0010a> in <filename unknown>:0 
  at System.Threading.SpinLock.ContinueTryEnter (Int32 millisecondsTimeout, System.Boolean& lockTaken) <0x7f81dbac5da0 + 0x00080> in <filename unknown>:0 
  at System.Threading.SpinLock.Enter (System.Boolean& lockTaken) <0x7f81dbac5b70 + 0x00081> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.OutgoingQueue`1[T].TryReceive (System.Predicate`1 filter, System.Threading.Tasks.Dataflow.T& item) <0x405f8da0 + 0x0008d> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.BufferBlock`1[T].TryReceive (System.Predicate`1 filter, System.Threading.Tasks.Dataflow.T& item) <0x405f8d50 + 0x0002f> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.DataflowBlock.Receive[TOutput] (ISourceBlock`1 source, TimeSpan timeout, CancellationToken cancellationToken) <0x405f88f0 + 0x00145> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.DataflowBlock.Receive[TOutput] (ISourceBlock`1 source) <0x405f8850 + 0x0005b> in <filename unknown>:0 
  at DataflowProducerConsumer+<ConsumeAsync>c__async0.MoveNext () <0x405f3920 + 0x00077> in <filename unknown>:0 
  --- End of inner exception stack trace ---
  at System.Threading.Tasks.Task.ThrowIfExceptional (Boolean includeTaskCanceledExceptions) <0x7f81dbadc1c0 + 0x00037> in <filename unknown>:0 
  at System.Threading.Tasks.Task.Wait (Int32 millisecondsTimeout, CancellationToken cancellationToken) <0x7f81dbadd4e0 + 0x00097> in <filename unknown>:0 
  at System.Threading.Tasks.Task.Wait () <0x7f81dbadd3b0 + 0x00028> in <filename unknown>:0 
  at DataflowProducerConsumer.Main (System.String[] args) <0x405efd50 + 0x00068> in <filename unknown>:0 
---> (Inner Exception #0) System.Threading.LockRecursionException: The calling thread already holds the lock.
  at System.Threading.SpinLock.ContinueTryEnterWithThreadTracking (Int32 millisecondsTimeout, UInt32 startTime, System.Boolean& lockTaken) <0x7f81dbac61d0 + 0x0010a> in <filename unknown>:0 
  at System.Threading.SpinLock.ContinueTryEnter (Int32 millisecondsTimeout, System.Boolean& lockTaken) <0x7f81dbac5da0 + 0x00080> in <filename unknown>:0 
  at System.Threading.SpinLock.Enter (System.Boolean& lockTaken) <0x7f81dbac5b70 + 0x00081> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.OutgoingQueue`1[T].TryReceive (System.Predicate`1 filter, System.Threading.Tasks.Dataflow.T& item) <0x405f8da0 + 0x0008d> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.BufferBlock`1[T].TryReceive (System.Predicate`1 filter, System.Threading.Tasks.Dataflow.T& item) <0x405f8d50 + 0x0002f> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.DataflowBlock.Receive[TOutput] (ISourceBlock`1 source, TimeSpan timeout, CancellationToken cancellationToken) <0x405f88f0 + 0x00145> in <filename unknown>:0 
  at System.Threading.Tasks.Dataflow.DataflowBlock.Receive[TOutput] (ISourceBlock`1 source) <0x405f8850 + 0x0005b> in <filename unknown>:0 
  at DataflowProducerConsumer+<ConsumeAsync>c__async0.MoveNext () <0x405f3920 + 0x00077> in <filename unknown>:0 <---


Expected Results:
Processed 102400 bytes.

How often does this happen? 
Always

Additional Information:
Mono JIT compiler version 4.1.0 (master/199cc80 Dom Abr  5 11:19:29 WEST 2015)
Copyright (C) 2002-2014 Novell, Inc, Xamarin Inc and Contributors. www.mono-project.com
	TLS:           __thread
	SIGSEGV:       altstack
	Notifications: epoll
	Architecture:  amd64
	Disabled:      none
	Misc:          softdebug 
	LLVM:          supported, not enabled.
	GC:            sgen
Comment 1 Alexander Kyte 2015-04-06 15:25:32 UTC
I can reproduce this. It looks like a lock ordering problem, let me look into it.
Comment 2 Alexander Kyte 2015-04-07 13:47:01 UTC
We intend to replace the dataflow implementation with the one released by Microsoft in corefx. We expect this incorrect queue reentrancy to go away, as it's part of our implementation.