c#中LINQ的基本用法(三)

 

一.并行LINQ

System.Linq名称空间中包含的类ParallelEnumerable可以分解查询的工作,使其分布在多个线程上。
尽管Enumerable类给IEnumerable<T>接口定义了扩展方法,但ParallelEnumerable类的大多数扩展方法是ParallerQuery<TSource>类的扩展。例如,AsParallel()方法,它扩展了IEnumerable<T>接口,返回ParallelQuery<T>类,所以正常的集合类可以以平行方式查询。

1.并行查询

下面演示并行LINQ(Parallel LINQ,PLINQ):

//用随机值填充一个大型的int集合
      // Enumerable.Range(0, arraySize),生成指定范围内的整数的空序列。
      //Select(x => r.Next(140)),用小于140的数填充集合
      static IEnumerable<int> SampleData()
      {
        const int arraySize = 100000000;
        var r = new Random();
        return Enumerable.Range(0, arraySize).Select(x => r.Next(140)).ToList();
      }

      static void IntroParallel()
      {
        var data = SampleData();

        var watch = new Stopwatch();

      //非并行LINQ
        watch.Start();
        var q1 = (from x in data
                  where Math.Log(x) < 4
                  select x).Average();
        watch.Stop();
        Console.WriteLine("sync {0}, result: {1}", watch.ElapsedMilliseconds, q1);

        watch.Reset();
        
        //使用data.AsParallel()进行并行LINQ
        watch.Start();
        var q2 = (from x in data.AsParallel()
              where Math.Log(x) < 4
              select x).Average();
        watch.Stop();
        Console.WriteLine("async {0}, result: {1}", watch.ElapsedMilliseconds, q2);
      }

输出;

发现并行查询时间用的少,在并行查询时CPU利用率达到100%
与LINQ基础(二)(https://www.jb51.net/article/244215.htm)中的LINQ查询一样,编译器会修改语法,以调用AsParallel,Where(),Select(),Average()方法:

  var q2 = data.AsParallel().Where(x => Math.Log(x)<4).Select(x => x).Average();

AsParallel()方法用ParallerEnumerable类定义,以扩展IEnumerable<T>接口,所以可以对简单的数组调用它。AsParallel()方法返回ParallerQuery<T>。因为返回的类型,所以编译器选择的Where()方法是ParallerEnumerable.Where(),而不是Enumerable.Where()。
对于PrarllelEnumerable类,查询是分区的,以便多个线程可以同时处理该查询。集合可以分为多个部分,其中每个部分由不同的线程处理。完成分区的工作后,就需要合并,获得所有部分的总和。

2.分区器

AsParallel()方法不仅扩展了IEnumerable<T>接口,还扩展了Partitioner类。通过它可以影响要创建的分区。
Partitioner类用System,Collection.Concurrent名称空间定义,并且有不同的变体。Create()方法接受实现了IList<T>类的数组或对象,以及Boolean类型的参数,返回一个不同的Partitioner类型。Create()方法有多个重载版本。

    var q2 = (from x in Partitioner.Create(data).AsParallel()
      where Math.Log(x) < 4
        select x).Average();

也可以对AsParallel()方法接着调用WithExecutionMode()和WithDegreeOfParallelism()方法,来影响并行机制。WithExecutionMode()方法可以传递ParallelExecutionMode的一个Default值或者ForceParallelism值。默认情况下,并行LINQ避免使用系统开销很高的并行机制。WithDegreeOfParallelism()方法,可以传递一个整数值,以指定应并行运行的最大任务数。如果查询不应使用全部CPU,这个方法很有用。

3.取消

要取消长时间运行的查询,可以给查询添加WithCancellation()方法,并传递一个CancellationToken令牌作为参数。CancellationToken令牌从CancellationTokenSource类中创建。
举个例子,下面的查询在单独的线程中运行,如果取消了查询,在该线程中捕获一个OperationCanceledException类型的异常。在主线程中,可以调用CancellationTokenSource类的Cancle()方法取消任务。

var data = SampleData();
        var watch = new Stopwatch();

        watch.Start();
        Console.WriteLine("filled array");
        var sum1 = (from x in data
                    where Math.Log(x) < 4
                    select x).Average();
        Console.WriteLine("sync result {0}", sum1);

        var cts = new CancellationTokenSource();
        
        Task.Factory.StartNew(() =>
          {
            try
            {
              var res = (from x in data.AsParallel().WithCancellation(cts.Token)
                         where Math.Log(x) < 4
                         select x).Average();
              Console.WriteLine("query finished, result: {0}", res);
            }
            catch (OperationCanceledException ex)
            {
              Console.WriteLine(ex.Message);
            }
          });
        watch.Stop();
        Console.WriteLine("async {0}, result: {1}", watch.ElapsedMilliseconds, "res");
        Console.WriteLine("query started");
        Console.Write("cancel? ");
        string input = Console.ReadLine();
        if (input.ToLower().Equals("y"))
        {
            cts.Cancel();
            Console.WriteLine("sent a cancel");
        }

        Console.WriteLine("press return to exit");
        Console.ReadLine();

 

二.表达式树

在LINQ To Object 中,扩展方法需要将一个委托类型作为参数,这样就可以将lambda表达式赋予参数。lambda表达式也可以赋予Expression<T>类型的参数,C#编译器根据类型给lambda表达式定义不同的行为。如果类型是Expression<T>,编译器就从lambda表达式中创建一个表达式树,并存储在程序集中。这样就可以在运行期间分析表达式树,并进行优化,以便查询数据源。

  var racers = from r in Formula1.GetChampions()
    where r.Wins > 15 && (r.Country == "Brazil" || r.Country == "Austria")
      select r;

这个查询表达式使用了扩展方法Where(),Select()方法。Enumerable类定义了Where()方法,并将委托类型Func<T,bool>作为参数谓词:

  public static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source,Func<TSource,bool> predicate);

这样,就可以把lambda表达式赋予委托predicate。
除了使用委托之外,编译器还会把表达式树放在程序集中。表达式树可以在运行期间读取。表达式树从派生自抽象基类Expression的类中构建。Expression和Expression<T>不同。继承自Expression类的表达式类有BinaryExpression,ConstantExpression,InvocationExpression等。编译器会从lambda表达式中创建表达式树。
例如,lambda表达式r.Country == "Brazil"使用了ParameterExpression,MemberExpression,ConstantExpression,MethodCallExpression,来创建一个表达式树,并将该树存储在程序集中,之后在运行期间使用这个树,创建一个用于底层数据源的优化查询:

//DisplayTree方法在控制台上图形化的显示表达式树。其中传递一个Expression对象,并根据表达式的类型,把表达式的一些信息写到控制台上
              private static void DisplayTree(int indent, string message, Expression expression)
              {
                  string output = String.Format("{0} {1} ! NodeType: {2}; Expr: {3} ",
                        "".PadLeft(indent, '>'), message, expression.NodeType, expression);

                  indent++;
                  switch (expression.NodeType)
                  {
                      case ExpressionType.Lambda:
                          Console.WriteLine(output);
                          LambdaExpression lambdaExpr = (LambdaExpression)expression;
                          foreach (var parameter in lambdaExpr.Parameters)
                          {
                              DisplayTree(indent, "Parameter", parameter);
                          }
                          DisplayTree(indent, "Body", lambdaExpr.Body);
                          break;
                      case ExpressionType.Constant:
                          ConstantExpression constExpr = (ConstantExpression)expression;
                          Console.WriteLine("{0} Const Value: {1}", output, constExpr.Value);
                          break;
                      case ExpressionType.Parameter:
                          ParameterExpression paramExpr = (ParameterExpression)expression;
                          Console.WriteLine("{0} Param Type: {1}", output, paramExpr.Type.Name);
                          break;
                      case ExpressionType.Equal:
                      case ExpressionType.AndAlso:
                      case ExpressionType.GreaterThan:
                          BinaryExpression binExpr = (BinaryExpression)expression;
                          if (binExpr.Method != null)
                          {
                              Console.WriteLine("{0} Method: {1}", output, binExpr.Method.Name);
                          }
                          else
                          {
                              Console.WriteLine(output);
                          }
                          DisplayTree(indent, "Left", binExpr.Left);
                          DisplayTree(indent, "Right", binExpr.Right);
                          break;
                      case ExpressionType.MemberAccess:
                          MemberExpression memberExpr = (MemberExpression)expression;
                          Console.WriteLine("{0} Member Name: {1}, Type: {2}", output,
                             memberExpr.Member.Name, memberExpr.Type.Name);
                          DisplayTree(indent, "Member Expr", memberExpr.Expression);
                          break;
                      default:
                          Console.WriteLine();
                          Console.WriteLine("{0} {1}", expression.NodeType, expression.Type.Name);
                          break;
                  }
              }


              static void Main()
              {
                  Expression<Func<Racer, bool>> expression = r => r.Country == "Brazil" && r.Wins > 6;

                  DisplayTree(0, "Lambda", expression);

              }

输出:

使用Expression<T>类型的一个例子是ADO.NET EF 和WCF数据服务的客户端提供程序。这些技术用Expression<T>参数定义了扩展方法。这样,访问数据库的LINQ提供程序就可以读取表达式,创建一个运行期间优化的查询,从数据库中获取数据。
后面会单独介绍表达式树的使用。

 

三.LINQ提供程序

.NET包含几个LINQ提供程序。LINQ提供程序为特定的数据源实现了标准的查询操作符。LINQ提供程序也许会实现比LINQ定义的更多扩展方法,但至少要实现标准操作符。LINQ To XML实现了一些专门用于XML的方法,后面会详细介绍。
LINQ提供程序的实现方案是根据名称空间和第一个参数的类型来选择的。实现扩展方法的类的名称空间必须是开放的,否则扩展方法就不在作用域内。在LINQ to Objects中定义的Where()方法的参数和LINQ To Entities中定义的Where()方法的参数不同:
LINQ to Objects中定义的Where()方法:

public static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source,Func<TSource,bool> predicate);

LINQ To Entities中定义的Where()方法:

public static IQueryable<TSource> Where<TSource>(this IQueryable<TSource> source,Expression<Func<TSource,bool>> predicate);

这两个类都在System.Linq的Syste,.Core程序集中实现。无论是用Func<TSource,bool>传递参数,还是用Expression<Func<TSource,bool>>参数传递,lambda表达式都相同。只是编译器的行为不同,它根据source参数来选择。编译器根据其参数选择最匹配的方法。在ADO.NET EF中定义的ObjectContext类CreateQuery<T>()方法返回一个实现了IQueryable<TSource>接口的ObjectQuery<T>对象,因此EF使用Querable类的Where()方法。

关于c#中LINQ的基本用法的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持编程宝库

mongodb的数据插入速度是其一个亮点,同样的10000条数据,插入的速度要比Mysql和sqlserver都要快,当然这也是要看使用者怎么个使用法,你代码如果10000次写入使用10000次连接 ...