博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CQRS学习——Storage实现(EF+Code First+DynamicReponsitory)[其四]
阅读量:5824 次
发布时间:2019-06-18

本文共 17089 字,大约阅读时间需要 56 分钟。

【这里是的实现,指的是针对各个数据访问框架的一个基础实现】

目标

  •   定义仓储/QueryEntry的基本功能
  •   实现仓储的基本功能,以利于复用
  •   实现一些常用的功能
  •   提供一些便利的功能

目标框架

博主使用的ORM框架是EF6.x,使用MAP来配置模型和数据库之间的映射(因为模型是定义在领域层[CQRS]的),所以不打算使用声明式的Attribute。使用code first来生成数据库。

仓储基本功能

使用一个泛型接口定义了一个仓储需要实现的功能:

public interface IBasicReponsitory
{ void Insert(T item); void Delete(T item); void Delete(Guid aggregateId); void Update(T category); T Fetch(Guid aggregateId); T TryFetch(Guid aggregateId); bool Exists(Expression
> predict); /*以下是额外的一些接口方法,待商榷*/ IQueryable
Query(); Task
FetchAsync(Guid id); Task
TryFetchAsync(Guid id); Task
> RetriveAsync(Expression
> predict); }
View Code

以及一个QueryEntry需要实现的一些基本功能:

public interface IQueryEntry
where T : IHasPrimaryKey { T TryFetch(Guid id); Task
TryFetchAsync(Guid id); bool Exsits(Guid id); bool Exsits(Expression
> selector); }
View Code

随着时间的推移,这个接口会发生变更,添加一些更多的功能。同时,并不要求所有的仓储或者QueryEntry继承接口,基本接口的定义和实现仅仅是为了提供便利。

为了方便QueryEntry的实现,提供了一个抽象类:

public abstract class ReponsitoryBasedQueryEntry
: IQueryEntry
where T : IHasPrimaryKey { public abstract IBasicReponsitory
BasicReponsitory { get; } public T TryFetch(Guid id) { return BasicReponsitory.TryFetch(id); } public Task
TryFetchAsync(Guid id) { return BasicReponsitory.TryFetchAsync(id); } public bool Exsits(Guid id) { return BasicReponsitory.Query().Any(i => i.Id == id); } public bool Exsits(System.Linq.Expressions.Expression
> selector) { return BasicReponsitory.Query().Any(selector); } }
View Code

基本实现

public class BasicEntityFrameworkReponsitory
: IBasicReponsitory
where T : class, IHasPrimaryKey { public BasicEntityFrameworkReponsitory() { Table = StorageConfiguration.DbContext.Set
(); } public DbSet
Table { get; private set; } public virtual void Insert(T item) { Table.Add(item); } public virtual void Delete(T item) { Table.Remove(item); } public virtual void Delete(Guid aggregateId) { var item = TryFetch(aggregateId); Delete(item); } public virtual void Update(T category) { //do nothing... } public T Fetch(Guid aggregateId) { var item = TryFetch(aggregateId); if (item == null) { throw new AggregateRootNotFoundException(aggregateId); } return item; } public T TryFetch(Guid aggregateId) { var item = Query().FirstOrDefault(i => i.Id == aggregateId); return item; } public virtual IQueryable
Query() { return Table; } public async Task
FetchAsync(Guid id) { return await Table.FirstAsync(i => i.Id == id); } public async Task
TryFetchAsync(Guid id) { return await Table.FirstOrDefaultAsync(i => i.Id == id); } public bool Exists(Expression
> predict) { return Table.Any(predict); } public async Task
> RetriveAsync(Expression
> predict) { return await Table.Where(predict).ToArrayAsync(); } }
View Code

这部分代码表达了个人的几个想法:

1.DbContext的生命周期是由Storage自行管理的。当然,可以通过一定的方式指定。

2.提供了基础的Query()方法,并设置为虚方法。个人并不抵制使用IQueryable对象进行查询。我觉得可以把使用IQueryable对象进行查询的代码片段看作匿名方法。

 常用的功能:软删除

这里是继承基本实现的一个实现:

public class SoftDeleteEntityFrameworkReponsitory
: BasicEntityFrameworkReponsitory
where T : class, IHasPrimaryKey, ISoftDelete { public override IQueryable
Query() { return base.Query().Where(i => !i.IsDeleted); } public override void Delete(T item) { item.IsDeleted = true; Update(item); } }
View Code

这里要求仓储对应的模型实现接口ISoftDelete,为软删除提供支持:

public interface ISoftDelete    {        bool IsDeleted { get; set; }    }

同时override了Query()方法,过滤了已删除的内容。

常用的功能:操作跟踪

好吧,这应该是事件溯源干的事,然而事件溯源目前太难了。原理和软删除差不多:

///     /// 既然开启了跟踪,那么这条数据必然是不能硬删除的    ///     /// 
public class TraceEnabledEntityFrameworkReponsitory
: SoftDeleteEntityFrameworkReponsitory
where T : class, ISoftDelete, ITrackLastModifying, IHasPrimaryKey { ///
/// 开启跟踪时,不允许匿名操作 /// [Dependency] public IDpfbSession Session { get; set; } public override void Update(T item) { if (!Session.UserId.HasValue) throw new Exception(); //todo 提供一个明确的异常 item.LastModifiedBy = Session.UserId.Value; item.LastModifiedTime = DateTime.Now; } public override void Insert(T item) { if (!Session.UserId.HasValue) throw new Exception(); //todo 提供一个明确的异常 item.LastModifiedBy = Session.UserId.Value; item.LastModifiedTime = DateTime.Now; base.Insert(item); } }
View Code

不过这个功能的侵入性很强,Storage应该无法感知“用户”这种概念才对。

便利的功能:动态仓储(DynamicReponsitory)

前一篇文章中说过,引入QueryEntry是为了将查询和提交分来,同时为查询操作提供更大的优化空间。在面对数据库的查询中,多表联查是非常普遍的。所以打算针对多表联查提供一个遍历的组件。同时,直接提交语句查询是和数据库相关的,所以要针对不同的数据库提供不同的DynamicReponsitory。

这个组件解决的问题是:直接提交数据库多表联查,查询结果自动转换模型,提供分页支持。

模型转换

先来解决这个比较有趣的问题:将一个DataReader转换为一个值或者一个可枚举的集合。直接上实现代码:

public class DataReaderTransfer
: CacheBlock
> where T : new() { protected DataReaderTransfer() { } ///
/// /// ///
///
编译缓存所使用的key,建议使用查询字符串的hash ///
public Func
Compile(string[] filedsNameArray, string key) { var outType = typeof (T); var func = ConcurrentDic.GetOrAdd(key, k => { var expressions = new List
(); //public T xxx(IDataReader reader){
var param = Expression.Parameter(typeof (IDataReader)); //var instance = new T(); var newExp = Expression.New(outType); var varExp = Expression.Variable(outType, "instance"); var varAssExp = Expression.Assign(varExp, newExp); expressions.Add(varAssExp); var indexProp = typeof (IDataRecord).GetProperties().Last(p => p.Name == "Item"); //表示 reader[""] foreach (var fieldName in filedsNameArray) { //if(xxx)xxx.xxx=null;else xxx.xxx = (xxx)value; var prop = outType.GetProperty(fieldName); if (prop == null) continue; var propExp = Expression.PropertyOrField(varExp, fieldName); Expression value = Expression.MakeIndex(param, indexProp, new Expression[] {Expression.Constant(fieldName)}); //处理空值 var defaultExp = Expression.Default(prop.PropertyType); var isDbNullExp = Expression.TypeIs(value, typeof (DBNull)); //处理枚举以及可空枚举 if (prop.PropertyType.IsEnum || prop.PropertyType.IsGenericType && prop.PropertyType.GetGenericArguments()[0].IsEnum) { value = Expression.Convert(value, typeof (int)); } var convertedExp = Expression.Convert(value, prop.PropertyType); //读取到dbnull的时候,使用一个默认值 var condExp = Expression.IfThenElse(isDbNullExp, Expression.Assign(propExp, defaultExp), Expression.Assign(propExp, convertedExp)); expressions.Add(condExp); } //return instance; var retarget = Expression.Label(outType); var returnExp = Expression.Return(retarget, varExp); expressions.Add(returnExp); //} var relabel = Expression.Label(retarget, Expression.Default(outType)); expressions.Add(relabel); var blockExp = Expression.Block(new[] {varExp}, expressions); var expression = Expression.Lambda
>(blockExp, param); return expression.Compile(); }); return func; } public Func
Compile(IDataReader reader, string key) { var length = reader.FieldCount; var names = Enumerable.Range(1, length).Select(i => reader.GetName(i - 1)).ToArray(); return Compile(names, key); } public static DataReaderTransfer
Instance = new DataReaderTransfer
(); //基于反射的映射.... //private static T DynamicMap
(IDataReader reader) where T : new() //{ // var instance = new T(); // var count = reader.FieldCount; // while (count-- > 1) // { // object value = reader[count - 1]; // var name = reader.GetName(count - 1); // var prop = typeof (T).GetProperty(name); // if (prop == null) // { // continue; // } // if (value is DBNull) // { // value = null; // } // prop.SetValue(instance, value); // } // return instance; //} }
View Code

主要的思想是:在运行期间对一个特定的模型分析一次,分析构造这个模型需要如何访问DataReader,并将访问操作编译为Func<>,通过一个静态字典缓存。下一次构造的时候,直接访问静态字典的Func<>,将DataReader的行转换为模型。这个耗时,大概是硬编码转换的2倍,可以获得比反射好的性能受益。

链式调用以及延时查询

先来看一段调用代码:

[TestClass]    public class DynamicReponsitorySamples    {        static DynamicReponsitorySamples()        {            //DbContext 配置            StorageConfiguration.Config.Use
(new ContainerControlledLifetimeManager()); //无法使用.UseDbContext
(),因为无法提供基于HTTP生命周期的管理对象 DynamicReponsitory = new DynamicReponsitory(); } public static DynamicReponsitory DynamicReponsitory { get; set; } [TestMethod] public void Query() { //直接提交一个SQL查询,并映射到实体 var queryText = "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"; var query = DynamicReponsitory.Query
(queryText); //QueryResult对象遵循延时查询的规则,直到执行枚举才会执行查询操作 query.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); } [TestMethod] public void Count() { //可以直接执行一个COUNT(*)语句 var countQueryText = "SELECT COUNT(*) FROM [ADMIN]"; var countQuery = DynamicReponsitory.Count(countQueryText); Trace.WriteLine("Count:" + countQuery.Value); //可以提供一个SELECT * 语句 countQueryText = "SELECT * FROM [ADMIN]"; //但是需要将重载的第二个参数置为true countQuery = DynamicReponsitory.Count(countQueryText, true); Trace.WriteLine("Count:" + countQuery.Value); //可以对一个query对象执行CmountAmount()扩展方法,但是这个query对象代表的查询必须很普通 var query = DynamicReponsitory.Query
( "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"); countQuery = query.CountAmount(); //Value的值同样遵循延时查询的规则,但是重复访问会导致访问内存中缓存的数据 Trace.WriteLine("Count:" + countQuery.Value); Trace.WriteLine("Count:" + countQuery.Value); //如果需要重新查询,可以调用Result.ReQuery()方法 var reQuery = countQuery.ReQuery(); Trace.WriteLine("Count:" + reQuery.Value); } ///
/// 分页调用,支持分页信息和分页列表信息的无序访问 /// [TestMethod] public void Page() { //可以对所有的query对象执行Page()扩展方法,从而进行分页 //必须执行要求OrderBy参数的重载,否则会进行内存分页(加载所有行) var query = DynamicReponsitory.Query
( "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"); var paged = query.Page("ORDER BY DepartmentName", 1, 2); /* * 以下表示支持分页信息和分页列表信息的无序访问 * 如果使用一条sql同时返回这些信息,必须先枚举集合才能继续访问分页信息 */ Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount)); paged.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); //重复访问会导致访问内存中缓存的数据 var resultArray = paged.Take(1); Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount)); resultArray.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); } }
View Code

链式调用是指,我调用了DynamicReponsitory.Query()方法之后,可以紧接着调用Page()或者Count()方法。那么,显而易见,如果查询不是延时的,很容易导致这个问题:我把服务器上1W条数据全down下来了,然后在内存里面数数或者分页。

为了实现延时查询的目标,引入了这几个类型:

public class SqlQueryExpression : ICloneable    {        public SqlQueryExpression()        {            Parameters = new List();        }        public SqlQueryExpression(string expressionText) : this()        {            ExpressionText = expressionText;        }        public string ExpressionText { get; set; }        public IList Parameters { get; private set; }        public IDataReader Read(DbConnection connection)        {            var parameters = Parameters.ToArray();            if (connection.State != ConnectionState.Open)                connection.Open();            //查询,开启最低级别的事务隔离,防止默认事务产生争用锁            var trans = connection.BeginTransaction(IsolationLevel.ReadUncommitted);            var command = connection.CreateCommand();            command.CommandType = CommandType.Text;            command.CommandText = ExpressionText;            command.Parameters.AddRange(parameters);            command.Transaction = trans;            return command.ExecuteReader(CommandBehavior.CloseConnection);        }        public virtual object Clone()        {            //实现拷贝接口            var cloned = new SqlQueryExpression(ExpressionText);            Parameters.Foreach(i =>            {                var parameter = (SqlParameter) i;                var clonedParameter = new SqlParameter(parameter.ParameterName, parameter.Value);                clonedParameter.Direction = parameter.Direction;                cloned.Parameters.Add(clonedParameter);            });            return cloned;        }    }
View Code
public class SqlQueryResult    {        public SqlQueryExpression SqlQueryExpression { get; private set; }        public DbConnection DbConnection { get; private set; }        public virtual bool Enumerated { get; protected set; }        protected IDataReader DataReader;        protected void Query()        {            DataReader = DataReader ?? SqlQueryExpression.Read(DbConnection);        }        public SqlQueryResult(SqlQueryExpression expression, DbConnection connection)        {            SqlQueryExpression = expression;            DbConnection = connection;        }    }    ///     /// 代表DynamicReponsitory的查询结果    ///     /// 
代表需要构造的类型
public class SqlQueryResult
: SqlQueryResult, IEnumerable
where T : new() { public SqlQueryResult(SqlQueryExpression expression, DbConnection connection) : base(expression, connection) { } public IEnumerator
GetEnumerator() { //对于一个Query对象,在第一次访问的时候,要求加载所有数据,防止Skip与Take导致数据丢失 if (!Enumerated) { Query(); using (DataReader) { Enumerated = true; var uniqueKey = typeof (T).FullName + SqlQueryExpression.ExpressionText; var func = DataReaderTransfer
.Instance.Compile(DataReader, uniqueKey); while (DataReader.Read()) { var item = func(DataReader); ResultSet.Add(item); //yield return item; } } } return ResultSet.GetEnumerator(); //return ((IEnumerable
) ResultSet).GetEnumerator(); } protected List
ResultSet = new List
(); IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public SqlQueryResult
ReQuery() { var exp = SqlQueryExpression.Clone() as SqlQueryExpression; return new SqlQueryResult
(exp, DbConnection); } }
View Code

SqlQueryExpression存储了将要执行的查询,而SqlQueryResult则存储了查询返回的结果。同时,SqlQueryExpression实现了拷贝,以支持ReQuery()。

关于具体的分页支持,实际上是使用了一个开窗函数,通过注入子查询的方式,从而支持了各种查询的分页(不奇葩的查询)。

为了防止查询被锁住,默认开启了最低的事务隔离级别。

...

【想到什么再补充】

 

转载于:https://www.cnblogs.com/lightluomeng/p/4922671.html

你可能感兴趣的文章
html5纲要,细谈HTML 5新增的元素
查看>>
Android应用集成支付宝接口的简化
查看>>
[分享]Ubuntu12.04安装基础教程(图文)
查看>>
django 目录结构修改
查看>>
win8 关闭防火墙
查看>>
CSS——(2)与标准流盒模型
查看>>
MYSQL 基本SQL语句
查看>>
C#中的Marshal
查看>>
linux命令:ls
查看>>
Using RequireJS in AngularJS Applications
查看>>
hdu 2444(二分图最大匹配)
查看>>
【SAP HANA】关于SAP HANA中带层次结构的计算视图Cacultation View创建、激活状况下在系统中生成对象的研究...
查看>>
DevOps 前世今生 | mPaaS 线上直播 CodeHub #1 回顾
查看>>
iOS 解决UITabelView刷新闪动
查看>>
CentOS 7 装vim遇到的问题和解决方法
查看>>
JavaScript基础教程1-20160612
查看>>
ios xmpp demo
查看>>
python matplotlib 中文显示参数设置
查看>>
【ros】Create a ROS package:package dependencies报错
查看>>
通过容器编排和服务网格来改进Java微服务的可测性
查看>>