select count(*) from article
select * from article order by publish_time desc limit 0,20
这个操作是一般我们的常规分页操作,先进行total然后进行分页获取,这种做法的好处是支持任意规则的分页,缺点就是需要查询两次,一次count一次limit当然后期数据量实在太大可以只需要第一次count,但是也有一个问题就是如果数据量一直在变化会出现下一次分页中还会有上一次的部分数据,因为数据在不断地新增,你的分页没跟上发布的速度那么就会有这个情况发送.
瀑布流分页
除了上述常规分页操作外,我们针对特定顺序的分页也可以进行特定的分页方式来实现高性能,因为基于大前提我们是大数量下的瀑布流,我们的文章假设是以雪花id作为主键,那么我们的分页可以这么写
select * from article where id
首先我们来分析一下,这个语句是利用了插入的数据分布是顺序和你需要查询的排序一直来实现的,又因为id不会重复并且雪花id的顺序和时间是一致的都是同向的所以可以利用这种方式来进行排序,limit每次不需要跳过任何数目,直接获取需要的数目即可,只需要传递上一次的查询结果的id即可,这个方式弥补了上述常规分页带来的问题,并且拥有非常高的性能,但是缺点也显而易见,不支持跳页,不支持任意排序,所以这个方式目前来说非常适合前端app的瀑布流排序。
分片下的实现
首先分片下需要实现这个功能我们需要有id支持分片,并且publish_time按时间分表,两者缺一不可。
原理
假设文章表article我们是以publish_time作为分片字段,假设按天分表,那么我们会拥有如下的表
article_20220101、article_20220102、article_20220103、article_20220104、article_20220105、article_20220106......
雪花id辅助分片
因为
select * from
(select * from article_20220101 union all select * from article_20220102 union all select * from article_20220103....) t
where id
流式分片,顺序查询
如果你是流式分片模式进行聚合通常我们会将20220101-20220105的所有的表进行并行的分别查询,然后针对每个查询的结果集进行优先级队列的排序后获取,优点:语句简单便于优化,性能可控,支持分库,缺点:实现复杂,连接数消耗多
select * from article_20220101 where id
流式分片下的优化
目前
# ShardingCore核心框架 版本6.4.2.4+
PM> Install-Package ShardingCore
# 数据库驱动这边选择的是mysql的社区驱动 efcore6最新版本即可
PM> Install-Package Pomelo.EntityFrameworkCore.MySql
第二步添加对象和上下文
有很多朋友问我一定需要使用fluentapi来使用
//文章表
[Table(nameof(Article))]
public class Article
{
[MaxLength(128)]
[Key]
public string Id { get; set; }
[MaxLength(128)]
[Required]
public string Title { get; set; }
[MaxLength(256)]
[Required]
public string Content { get; set; }
public DateTime PublishTime { get; set; }
}
//dbcontext
public class MyDbContext:AbstractShardingDbContext,IShardingTableDbContext
{
public MyDbContext(DbContextOptions options) : base(options)
{
//请勿添加会导致efcore 的model提前加载的方法如Database.xxxx
}
public IRouteTail RouteTail { get; set; }
public DbSet Articles { get; set; }
}
第三步:添加文章路由
public class ArticleRoute:AbstractSimpleShardingDayKeyDateTimeVirtualTableRoute
{
public override void Configure(EntityMetadataTableBuilder builder)
{
builder.ShardingProperty(o => o.PublishTime);
}
public override bool AutoCreateTableByTime()
{
return true;
}
public override DateTime GetBeginTime()
{
return new DateTime(2022, 3, 1);
}
}
到目前为止基本上Article已经支持了按天分表
第四步:添加查询配置,让框架知道我们是顺序分表且定义分表的顺序
public class TailDayReverseComparer : IComparer
{
public int Compare(string? x, string? y)
{
//程序默认使用的是正序也就是按时间正序排序我们需要使用倒序所以直接调用原生的比较器然后乘以负一即可
return Comparer.Default.Compare(x, y) * -1;
}
}
//当前查询满足的复核条件必须是单个分片对象的查询,可以join普通非分片表
public class ArticleEntityQueryConfiguration:IEntityQueryConfiguration
{
public void Configure(EntityQueryBuilder builder)
{
//设置默认的框架针对Article的排序顺序,这边设置的是倒序
builder.ShardingTailComparer(new TailDayReverseComparer());
////如下设置和上述是一样的效果让框架真对Article的后缀排序使用倒序
//builder.ShardingTailComparer(Comparer.Default, false);
//简单解释一下下面这个配置的意思
//第一个参数表名Article的哪个属性是顺序排序和Tail按天排序是一样的这边使用了PublishTime
//第二个参数表示对属性PublishTime asc时是否和上述配置的ShardingTailComparer一致,true表示一致,很明显这边是相反的因为默认已经设置了tail排序是倒序
//第三个参数表示是否是Article属性才可以,这边设置的是名称一样也可以,因为考虑到匿名对象的select
builder.AddOrder(o => o.PublishTime, false,SeqOrderMatchEnum.Owner|SeqOrderMatchEnum.Named);
//这边为了演示使用的id是简单的时间格式化所以和时间的配置一样
builder.AddOrder(o => o.Id, false,SeqOrderMatchEnum.Owner|SeqOrderMatchEnum.Named);
//这边设置如果本次查询默认没有带上述配置的order的时候才用何种排序手段
//第一个参数表示是否和ShardingTailComparer配置的一样,目前配置的是倒序,也就是从最近时间开始查询,如果是false就是从最早的时间开始查询
//后面配置的是熔断器,也就是复核熔断条件的比如FirstOrDefault只需要满足一个就可以熔断
builder.AddDefaultSequenceQueryTrip(true, CircuitBreakerMethodNameEnum.Enumerator, CircuitBreakerMethodNameEnum.FirstOrDefault);
//这边配置的是当使用顺序查询配置的时候默认开启的连接数限制是多少,startup一开始可以设置一个默认是当前cpu的线程数,这边优化到只需要一个线程即可,当然如果跨表那么就是串行执行
builder.AddConnectionsLimit(1, LimitMethodNameEnum.Enumerator, LimitMethodNameEnum.FirstOrDefault);
}
}
第五步:添加配置到路由
public class ArticleRoute:AbstractSimpleShardingDayKeyDateTimeVirtualTableRoute
{
//省略.....
public override IEntityQueryConfiguration CreateEntityQueryConfiguration()
{
return new ArticleEntityQueryConfiguration();
}
}
第六步:startup配置
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
ILoggerFactory efLogger = LoggerFactory.Create(builder =>
{
builder.AddFilter((category, level) => category == DbLoggerCategory.Database.Command.Name && level == LogLevel.Information).AddConsole();
});
builder.Services.AddControllers();
builder.Services.AddShardingDbContext()
.AddEntityConfig(o =>
{
o.CreateShardingTableOnStart = true;
o.EnsureCreatedWithOutShardingTable = true;
o.AddShardingTableRoute();
})
.AddConfig(o =>
{
o.ConfigId = "c1";
o.UseShardingQuery((conStr, b) =>
{
b.UseMySql(conStr, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
});
o.UseShardingTransaction((conn, b) =>
{
b.UseMySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
});
o.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=ShardingWaterfallDB;userid=root;password=root;");
o.ReplaceTableEnsureManager(sp => new MySqlTableEnsureManager());
}).EnsureConfig();
var app = builder.Build();
app.Services.GetRequiredService().Start();
using (var scope = app.Services.CreateScope())
{
var myDbContext = scope.ServiceProvider.GetRequiredService();
if (!myDbContext.Articles.Any())
{
List articles = new List();
var beginTime = new DateTime(2022, 3, 1, 1, 1,1);
for (int i = 0; i
第七步编写查询表达式
public async Task Waterfall([FromQuery] string lastId,[FromQuery]int take)
{
Console.WriteLine($"-----------开始查询,lastId:[{lastId}],take:[{take}]-----------");
var list = await _myDbContext.Articles.WhereIf(o => String.Compare(o.Id, lastId) o.PublishTime)ToListAsync();
return Ok(list);
}
运行程序