【这里是的实现,指的是针对各个数据访问框架的一个基础实现】
目标
- 定义仓储/QueryEntry的基本功能
- 实现仓储的基本功能,以利于复用
- 实现一些常用的功能
- 提供一些便利的功能
目标框架
博主使用的ORM框架是EF6.x,使用MAP来配置模型和数据库之间的映射(因为模型是定义在领域层[CQRS]的),所以不打算使用声明式的Attribute。使用code first来生成数据库。
仓储基本功能
使用一个泛型接口定义了一个仓储需要实现的功能:
public interface IBasicReponsitory{ void Insert(T item); void Delete(T item); void Delete(Guid aggregateId); void Update(T category); T Fetch(Guid aggregateId); T TryFetch(Guid aggregateId); bool Exists(Expression > predict); /*以下是额外的一些接口方法,待商榷*/ IQueryable Query(); Task FetchAsync(Guid id); Task TryFetchAsync(Guid id); Task > RetriveAsync(Expression > predict); }
以及一个QueryEntry需要实现的一些基本功能:
public interface IQueryEntrywhere T : IHasPrimaryKey { T TryFetch(Guid id); Task TryFetchAsync(Guid id); bool Exsits(Guid id); bool Exsits(Expression > selector); }
随着时间的推移,这个接口会发生变更,添加一些更多的功能。同时,并不要求所有的仓储或者QueryEntry继承接口,基本接口的定义和实现仅仅是为了提供便利。
为了方便QueryEntry的实现,提供了一个抽象类:
public abstract class ReponsitoryBasedQueryEntry: IQueryEntry where T : IHasPrimaryKey { public abstract IBasicReponsitory BasicReponsitory { get; } public T TryFetch(Guid id) { return BasicReponsitory.TryFetch(id); } public Task TryFetchAsync(Guid id) { return BasicReponsitory.TryFetchAsync(id); } public bool Exsits(Guid id) { return BasicReponsitory.Query().Any(i => i.Id == id); } public bool Exsits(System.Linq.Expressions.Expression > selector) { return BasicReponsitory.Query().Any(selector); } }
基本实现
public class BasicEntityFrameworkReponsitory: IBasicReponsitory where T : class, IHasPrimaryKey { public BasicEntityFrameworkReponsitory() { Table = StorageConfiguration.DbContext.Set (); } public DbSet Table { get; private set; } public virtual void Insert(T item) { Table.Add(item); } public virtual void Delete(T item) { Table.Remove(item); } public virtual void Delete(Guid aggregateId) { var item = TryFetch(aggregateId); Delete(item); } public virtual void Update(T category) { //do nothing... } public T Fetch(Guid aggregateId) { var item = TryFetch(aggregateId); if (item == null) { throw new AggregateRootNotFoundException(aggregateId); } return item; } public T TryFetch(Guid aggregateId) { var item = Query().FirstOrDefault(i => i.Id == aggregateId); return item; } public virtual IQueryable Query() { return Table; } public async Task FetchAsync(Guid id) { return await Table.FirstAsync(i => i.Id == id); } public async Task TryFetchAsync(Guid id) { return await Table.FirstOrDefaultAsync(i => i.Id == id); } public bool Exists(Expression > predict) { return Table.Any(predict); } public async Task > RetriveAsync(Expression > predict) { return await Table.Where(predict).ToArrayAsync(); } }
这部分代码表达了个人的几个想法:
1.DbContext的生命周期是由Storage自行管理的。当然,可以通过一定的方式指定。2.提供了基础的Query()方法,并设置为虚方法。个人并不抵制使用IQueryable对象进行查询。我觉得可以把使用IQueryable对象进行查询的代码片段看作匿名方法。
常用的功能:软删除
这里是继承基本实现的一个实现:
public class SoftDeleteEntityFrameworkReponsitory: BasicEntityFrameworkReponsitory where T : class, IHasPrimaryKey, ISoftDelete { public override IQueryable Query() { return base.Query().Where(i => !i.IsDeleted); } public override void Delete(T item) { item.IsDeleted = true; Update(item); } }
这里要求仓储对应的模型实现接口ISoftDelete,为软删除提供支持:
public interface ISoftDelete { bool IsDeleted { get; set; } }
同时override了Query()方法,过滤了已删除的内容。
常用的功能:操作跟踪
好吧,这应该是事件溯源干的事,然而事件溯源目前太难了。原理和软删除差不多:
////// 既然开启了跟踪,那么这条数据必然是不能硬删除的 /// ///public class TraceEnabledEntityFrameworkReponsitory : SoftDeleteEntityFrameworkReponsitory where T : class, ISoftDelete, ITrackLastModifying, IHasPrimaryKey { /// /// 开启跟踪时,不允许匿名操作 /// [Dependency] public IDpfbSession Session { get; set; } public override void Update(T item) { if (!Session.UserId.HasValue) throw new Exception(); //todo 提供一个明确的异常 item.LastModifiedBy = Session.UserId.Value; item.LastModifiedTime = DateTime.Now; } public override void Insert(T item) { if (!Session.UserId.HasValue) throw new Exception(); //todo 提供一个明确的异常 item.LastModifiedBy = Session.UserId.Value; item.LastModifiedTime = DateTime.Now; base.Insert(item); } }
不过这个功能的侵入性很强,Storage应该无法感知“用户”这种概念才对。
便利的功能:动态仓储(DynamicReponsitory)
前一篇文章中说过,引入QueryEntry是为了将查询和提交分来,同时为查询操作提供更大的优化空间。在面对数据库的查询中,多表联查是非常普遍的。所以打算针对多表联查提供一个遍历的组件。同时,直接提交语句查询是和数据库相关的,所以要针对不同的数据库提供不同的DynamicReponsitory。
这个组件解决的问题是:直接提交数据库多表联查,查询结果自动转换模型,提供分页支持。
模型转换
先来解决这个比较有趣的问题:将一个DataReader转换为一个值或者一个可枚举的集合。直接上实现代码:
public class DataReaderTransfer: CacheBlock > where T : new() { protected DataReaderTransfer() { } /// /// /// /// /// 编译缓存所使用的key,建议使用查询字符串的hash ///public Func Compile(string[] filedsNameArray, string key) { var outType = typeof (T); var func = ConcurrentDic.GetOrAdd(key, k => { var expressions = new List (); //public T xxx(IDataReader reader){ var param = Expression.Parameter(typeof (IDataReader)); //var instance = new T(); var newExp = Expression.New(outType); var varExp = Expression.Variable(outType, "instance"); var varAssExp = Expression.Assign(varExp, newExp); expressions.Add(varAssExp); var indexProp = typeof (IDataRecord).GetProperties().Last(p => p.Name == "Item"); //表示 reader[""] foreach (var fieldName in filedsNameArray) { //if(xxx)xxx.xxx=null;else xxx.xxx = (xxx)value; var prop = outType.GetProperty(fieldName); if (prop == null) continue; var propExp = Expression.PropertyOrField(varExp, fieldName); Expression value = Expression.MakeIndex(param, indexProp, new Expression[] {Expression.Constant(fieldName)}); //处理空值 var defaultExp = Expression.Default(prop.PropertyType); var isDbNullExp = Expression.TypeIs(value, typeof (DBNull)); //处理枚举以及可空枚举 if (prop.PropertyType.IsEnum || prop.PropertyType.IsGenericType && prop.PropertyType.GetGenericArguments()[0].IsEnum) { value = Expression.Convert(value, typeof (int)); } var convertedExp = Expression.Convert(value, prop.PropertyType); //读取到dbnull的时候,使用一个默认值 var condExp = Expression.IfThenElse(isDbNullExp, Expression.Assign(propExp, defaultExp), Expression.Assign(propExp, convertedExp)); expressions.Add(condExp); } //return instance; var retarget = Expression.Label(outType); var returnExp = Expression.Return(retarget, varExp); expressions.Add(returnExp); //} var relabel = Expression.Label(retarget, Expression.Default(outType)); expressions.Add(relabel); var blockExp = Expression.Block(new[] {varExp}, expressions); var expression = Expression.Lambda >(blockExp, param); return expression.Compile(); }); return func; } public Func Compile(IDataReader reader, string key) { var length = reader.FieldCount; var names = Enumerable.Range(1, length).Select(i => reader.GetName(i - 1)).ToArray(); return Compile(names, key); } public static DataReaderTransfer Instance = new DataReaderTransfer (); //基于反射的映射.... //private static T DynamicMap (IDataReader reader) where T : new() //{ // var instance = new T(); // var count = reader.FieldCount; // while (count-- > 1) // { // object value = reader[count - 1]; // var name = reader.GetName(count - 1); // var prop = typeof (T).GetProperty(name); // if (prop == null) // { // continue; // } // if (value is DBNull) // { // value = null; // } // prop.SetValue(instance, value); // } // return instance; //} }
主要的思想是:在运行期间对一个特定的模型分析一次,分析构造这个模型需要如何访问DataReader,并将访问操作编译为Func<>,通过一个静态字典缓存。下一次构造的时候,直接访问静态字典的Func<>,将DataReader的行转换为模型。这个耗时,大概是硬编码转换的2倍,可以获得比反射好的性能受益。
链式调用以及延时查询
先来看一段调用代码:
[TestClass] public class DynamicReponsitorySamples { static DynamicReponsitorySamples() { //DbContext 配置 StorageConfiguration.Config.Use(new ContainerControlledLifetimeManager()); //无法使用.UseDbContext (),因为无法提供基于HTTP生命周期的管理对象 DynamicReponsitory = new DynamicReponsitory(); } public static DynamicReponsitory DynamicReponsitory { get; set; } [TestMethod] public void Query() { //直接提交一个SQL查询,并映射到实体 var queryText = "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"; var query = DynamicReponsitory.Query (queryText); //QueryResult对象遵循延时查询的规则,直到执行枚举才会执行查询操作 query.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); } [TestMethod] public void Count() { //可以直接执行一个COUNT(*)语句 var countQueryText = "SELECT COUNT(*) FROM [ADMIN]"; var countQuery = DynamicReponsitory.Count(countQueryText); Trace.WriteLine("Count:" + countQuery.Value); //可以提供一个SELECT * 语句 countQueryText = "SELECT * FROM [ADMIN]"; //但是需要将重载的第二个参数置为true countQuery = DynamicReponsitory.Count(countQueryText, true); Trace.WriteLine("Count:" + countQuery.Value); //可以对一个query对象执行CmountAmount()扩展方法,但是这个query对象代表的查询必须很普通 var query = DynamicReponsitory.Query ( "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"); countQuery = query.CountAmount(); //Value的值同样遵循延时查询的规则,但是重复访问会导致访问内存中缓存的数据 Trace.WriteLine("Count:" + countQuery.Value); Trace.WriteLine("Count:" + countQuery.Value); //如果需要重新查询,可以调用Result.ReQuery()方法 var reQuery = countQuery.ReQuery(); Trace.WriteLine("Count:" + reQuery.Value); } /// /// 分页调用,支持分页信息和分页列表信息的无序访问 /// [TestMethod] public void Page() { //可以对所有的query对象执行Page()扩展方法,从而进行分页 //必须执行要求OrderBy参数的重载,否则会进行内存分页(加载所有行) var query = DynamicReponsitory.Query( "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code"); var paged = query.Page("ORDER BY DepartmentName", 1, 2); /* * 以下表示支持分页信息和分页列表信息的无序访问 * 如果使用一条sql同时返回这些信息,必须先枚举集合才能继续访问分页信息 */ Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount)); paged.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); //重复访问会导致访问内存中缓存的数据 var resultArray = paged.Take(1); Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount)); resultArray.Foreach(i => Trace.WriteLine(string.Format("{0}\t{1}", i.UserName, i.RealName))); } }
链式调用是指,我调用了DynamicReponsitory.Query()方法之后,可以紧接着调用Page()或者Count()方法。那么,显而易见,如果查询不是延时的,很容易导致这个问题:我把服务器上1W条数据全down下来了,然后在内存里面数数或者分页。
为了实现延时查询的目标,引入了这几个类型:public class SqlQueryExpression : ICloneable { public SqlQueryExpression() { Parameters = new List
public class SqlQueryResult { public SqlQueryExpression SqlQueryExpression { get; private set; } public DbConnection DbConnection { get; private set; } public virtual bool Enumerated { get; protected set; } protected IDataReader DataReader; protected void Query() { DataReader = DataReader ?? SqlQueryExpression.Read(DbConnection); } public SqlQueryResult(SqlQueryExpression expression, DbConnection connection) { SqlQueryExpression = expression; DbConnection = connection; } } ////// 代表DynamicReponsitory的查询结果 /// ///代表需要构造的类型 public class SqlQueryResult: SqlQueryResult, IEnumerable where T : new() { public SqlQueryResult(SqlQueryExpression expression, DbConnection connection) : base(expression, connection) { } public IEnumerator GetEnumerator() { //对于一个Query对象,在第一次访问的时候,要求加载所有数据,防止Skip与Take导致数据丢失 if (!Enumerated) { Query(); using (DataReader) { Enumerated = true; var uniqueKey = typeof (T).FullName + SqlQueryExpression.ExpressionText; var func = DataReaderTransfer .Instance.Compile(DataReader, uniqueKey); while (DataReader.Read()) { var item = func(DataReader); ResultSet.Add(item); //yield return item; } } } return ResultSet.GetEnumerator(); //return ((IEnumerable ) ResultSet).GetEnumerator(); } protected List ResultSet = new List (); IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public SqlQueryResult ReQuery() { var exp = SqlQueryExpression.Clone() as SqlQueryExpression; return new SqlQueryResult (exp, DbConnection); } }
SqlQueryExpression存储了将要执行的查询,而SqlQueryResult则存储了查询返回的结果。同时,SqlQueryExpression实现了拷贝,以支持ReQuery()。
关于具体的分页支持,实际上是使用了一个开窗函数,通过注入子查询的方式,从而支持了各种查询的分页(不奇葩的查询)。为了防止查询被锁住,默认开启了最低的事务隔离级别。
...
【想到什么再补充】