跳至主要內容

【BI】⑤系统优化

holic-x...大约 48 分钟项目bi-platform

【BI】⑤系统优化

系统问题分析

系统问题分析总结

​ 问题场景:面临服务处理能力有限,或者接口处理(或返回)时长较长时,就应该考虑采用异步化。

​ 具体来看,可能会遇到以下问题:

(1)用户等待时间过长:这是因为需要等待AI生成结果

(2)业务服务器可能面临大量请求处理,导致系统资源紧张,严重时甚至可能导致服务器宕机或无法处理新的请求

(3)调用的第三方服务(Al能力)的处理能力有限。比如每3秒只能处理1个请求,就会导致Al处理不过来;严重时,Al可能会对我们的后台系统拒绝服务

综上所述,面对这些问题,我们应当考虑异步化的解决方案

实现说明

【1】异步化和线程池改造概念

【2】智能分析AI接口:同步=》异步

  • 同步逻辑:用户上传文件,等待响应=》后台解析文件,调用AI服务接口生成图表信息,图表信息入库=》将响应结果返回给前端
  • 异步逻辑:用户上传文件=》后台生成图表基础信息(初始状态为awit),异步调用AI服务接口(生成图表信息后更新图表数据(图表信息、图表状态:成功或失败)、通知用户)=》用户可等待一段时间后在我的图表页面跟踪图表信息是否生成成功,也可等待系统发送通知

1.AI图表生成时间比较长

​ AI 接口调用响应的时间比较长,考虑将同步=》异步进行切换,优化系统性能

2.如何处理大量用户请求

​ 当系统面临大量用户请求时,如果处理能力有限,例如服务器的内存、CPU、网络带宽等资源有限,这可能导致用户处在一个长时间的等待状态。特别是在许多用户同时提交请求的情况下,服务器可能需要较长的时间来处理。 ​ 此外,如果后端的Al处理能力有限,也有可能引发问题。比如,为了确保平台的安全性,我们可能会限制用户的访问频率,即每秒或每几秒用户只能访问一次或几次。一旦用户过多地提交请求,就会增大Al处理的服务器的压力,导致Al服务器处理不了这么多请求。在这种情况下,其他用户只能等待,而在前端界面也只能显示持续等待的状态。长时间等待后,用户可能会收到服务器繁忙的错误信息。这不仅影响了用户的体验,也对服务器和我们使用的第三方服务带来压力。

​ 还需要考虑服务器如Tomcat的线程数限制。在极端情况下,比如每十秒只能处理一个请求,但却有200个用户在一秒钟内同时提交请求,这就会导致大量用户请求在服务器上积压,数据也无法及时插入到数据库中。如果用户长时间等待最终仍得到请求失败的结果,这种情况下也会对服务器造成压力。

3.调用第三方服务受限

​ 当调用第三方服务,假设Al处理能力是有限的,如每三秒只能处理一个请求。在 这种情况下,大量用户同时请求可能导致Al过载,甚至拒绝系统请求。

​ 假设我们正在使用的AI平台提供Al回答功能的服务()。在开发的智能BI中,如果有IO0个用户同时访问,就需要IO0次调用鱼聪明AI。然而,AI平台可能无法在一秒钟内服务IO0个用户。这种情况下,AI服务会认为我们在攻击它或者超过了它的处理能力,可能会对系统施加限制。这构成了一个潜在的风险。

4.其他问题

​ 解决方式是将这类问题归类,然后采用异步化的思路来解决。需要思考在哪些场景下可能会遇到服务处理能力有限或处理时间长的问题。

​ 另一个导致处理时间过长的因素是返回的数据量过大,这可能导致数据传输时间延长,从而增加了返回时间。举例说明,当调用一个第三方服务时,可能需要IO秒、20秒甚至半小时才能返回结果。在处理繁重任务的时候,我们应该考虑采用异步处理,避免让用户长时间等待。此处通过实现异步化有效解决这个问题。

异步化

基础概念

同步 VS 异步

同步:—件事情做完,再做另外一件事情(烧水后才能处理工作)

异步:在处理一件事情的同时,可以处理另一件事情。当第一件事完成时,会收到一个通知告诉你这件事已经完成,这样就可以进行后续的处理(烧水时,可以同时处理其他工作。水壶上的蜂鸣器会在水烧好时发出声音,就知道水已经烧好了,可以进行下一步操作)

​ 通常,如果想将同步变为异步,必须知道何时任务已经完成(需要一个通知机制)

异步业务流程分析

​ 针对目前系统实现,异步的流程是怎样的呢?

​ 用户在点击提交后就不需要在当前界面等待,可以直接回到主界面,或者继续填写下一个需要生成或分析的数据。提交完成后,回到主页,在主页上就可以看到图表的生成状态。如果图表已经生成好,那么系统可以在界面的右上角添加一个消息通知功能,用户可以在那里看到相关信息

标准化异步业务流程

标准化异步业务流程概念

​ 在用户需要进行长时间的操作时,点击提交后不需要在界面空等。而是先保存至数据库。以往会等图表完全生成后再保存,但现在,任务一提交,就立即存储,避免让用户在界面上等待。

​ 之后需要将用户的操作或任务添加到任务队列中,让程序或线程执行。想象一下,将用户的操作加入任务队列,这个队列就像个备忘录。比如我是公司唯一的员工,正在进行一个项目,当有用户请求修复bug时。我无法立即处理,但可以记下这个修复bug 的任务,待完成项目后再处理。这个队列就像我的备忘录。

​ 由于程序的处理能力或线程数有限,我们可以先把待处理的任务放入队列中等待。当我们有空的时候,再按顺序执行,而不是直接拒绝。因此,如果任务队列有空位,我们可以接受新任务;如果有空闲的线程或员工,我们可以立即开始这个任务;如果所有线程都在忙碌,那么我们可以把任务放入等待队列。但是,如果所有线程都在忙,且任务队列已满,那我们该怎么办? 有很多策略供我们选择。一种做法是直接拒绝任务。更好的方式是记录下这个任务,待有空时再处理。无论任务提交成功与否,我们都应该将其保存到数据库中以备查阅。这样,当我们在后期检查时,可以看到哪些任务由于程序处理能力不足而未得到处理。即使任务队列已满,我们也可以通过查阅数据库记录,找出提交失败的任务,并在程序空闲时取出这些任务,放入任务队列中执行。

​ 当用户需要执行新任务时,即使任务提交失败,或者消息队列满了,也要将其记录下来。建议将这个任务保存到数据库中记录,或者至少打一个日志。我们不能只把消息放入消息队列当做唯一的流程。要有一些保险措施,如打更多的日志,以应对网络或电力的突发情况。在开始编写程序前,我们应该清楚这些流程,这些都是我们可能需要处理的情况。

​ 在第三步中,我们的程序(线程)会按照任务队列的顺序逐一执行任务,这就像员工按照备忘录一项接一项地完成任务。任务完成后,我们会更新任务状态,将相关任务记录在数据库中标记为已完成。接下来需要考虑的问题是如何让用户知道他们的任务何时完成。在用户提交任务后,我们应该提供一个查询任务状态的地方,而不是让他们无尽地等待。 通过这一系列流程,用户的体验会比直接等待任务完成更好。尤其是在需要进行复杂分析的情况下,用户不太可能在界面上等待那么久。这时,我们可以采取异步执行,让用户先去做其他事情,可以继续提交新任务,也可以实时查看任务状态。这样的体验更好,远优于等待长时间后任务失败。

​ 注意:并非所有的操作都需要异步化。只有在任务执行时间较长的场景下,才考虑采用异步化方式。因为多线程和异步处理会增加代码的复杂度,并可能带来更多的问题。如果同步方式能够解决问题,那么就无需使用异步。

​ 异步处理是一个复杂的过程。在异步执行中,开发者可能无法清楚地知道程序执行到了哪一步。对于复杂的任务,我们需要在每一个小任务完成时记录下任务的执行状态或进度,这就像我们下载文件时看到的进度条一样。所以,对于大型、复杂的任务,为了提供更好的用户体验,我们应该提供进度条,让用户在查询状态时能看到任务执行到了哪一步。这是任何异步操作的重要环节,也是优化业务流程的方法。

标准异步化流程总结

(1)当用户要进行耗时很长的操作时,点击提交后,不需要在界面空等,而是应该把这个任务保存到数据库中记录下来

(2)用户要执行新任务时

​ 任务提交成功:1)若程序存在空闲线程,可以立即执行此任务;2)若所有线程均繁忙,任务将入队列等待处理

​ 任务提交失败:比如所有线程都在忙碌且任务队列满了

​ 1)选择拒绝此任务,不再执行

​ 2)通过查阅数据库记录,发现提交失败的任务,并在程序空闲时将这些任务取出执行

(3)程序(线程)从任务队列中取出任务依次执行,每完成一项任务,就更新任务状态

(4)用户可以查询任务的执行状态,或者在任务执行成功或失败时接收通知(例如:发邮件、系统消息提示或短信),从而优化体验

(5)对于复杂且包含多个环节的任务,在每个小任务完成时,要在程序(数据库中))记录任务的执行状态(进度)

系统业务流程梳理

​ 回归到系统设计,流程其实可以大大简化。比如,在创建图表的过程中,每次提交可以视为一个任务,甚至可以直接省略一个新的任务记录表。具体来说,将创建图表这个过程视作一个任务,并直接在图表的数据库或数据表中添加一个字段来表示图表的生成状态或任务状态,这样就无需再建立新的表了。

​ 要查询图表是否生成完毕,可以直接在页面上给出提示,或者展示一个等待标志、给出其他形式的反馈。这就避免了新建一个任务查询页面的需要,可以直接在这个界面完成任务状态的展示。

​ 首先,用户在点击智能分析页的提交按钮时,系统会立即将图表保存到数据库中作为一个任务。这是立即保存的,而不是等待Al生成结果后再保存。这样即使图表生成失败,用户也能明了其状态。

​ 其次,用户可以在图表管理页面查看所有图表的信息和状态,包括已生成的、正在生成的和生成失败的。此处,如果你想添加一些通知功能,或者定时更新功能,如让这个页面每三秒自动更新一次数据,都是可以的。

​ 最后,用户可以修改生成失败的图表信息,或者选择重新生成等操作。这个功能一不一定在这个系统中实现,它类似于增删改查操作中的修改功能,很像鱼聪明的重新生成功能。

业务流程总结

(1)用户点击智能分析页的提交按钮时,先把图表立刻保存到数据库中(作为一个任务)

(2)用户可以在图表管理页面查看所有图表(已生成的、生成中的、生成失败)的信息和状态

(3)用户可以修改生成失败的图表信息,点击重新生成,以尝试再次创建图表

架构图改造

原业务流程

image一20240419165455094

改造后业务流程

image一20240419165524023

​ 假设有很多用户都要提交图表,首先,系统后端会立即将这些数据保存到数据库中,随后将生成图表的任务放入队列。这样就可以向用户返回信息了,告诉用户该任务已经提交,稍候就可以查看结果。

​ 在后台,任务被放入队列后,程序中的线程会逐一从队列中取出消息,然后将这些任务交给Al服务去执行。这样一来,AI服务的运行就能够得到保证,确保其安全和稳定。

​ 为什么要这么做呢?因为系统中将消息放入了队列,逐一取出,而不是将所有用户的请求一股脑的发给Al服务。所以,消息队列在这里起到了消峰的作用。这个消息队列可以理解为任务队列。任务队列可以将突然增大的流量以平稳的方式,如管道一样,逐一传给下游处理系统,或者说是我们的第三方服务。

​ 第三方服务生成好图表之后,任务处理模块就可以去更新数据库中的图表信息。比如,之前的状态是正在生成,更新后的状态就变成了已生成

​ 对于异步开发可能存在两个疑问:

​ 首先,任务队列应该如何设置最大容量,毕竟能力是有限的,不能无限制地接受任务

​ 回归到系统中,应该如何限制任务队列的最大容量?应该设置多少个任务为最大值?另一个关键的实现点是,程序如何从任务队列中逐一取出任务进行执行?更进一步的话,应该如何实现这个任务队列的流程?

​ 首先,应该如何设置任务队列的最大容量。其次,应该如何实现一个程序,使其能自动从任务队列中取任务执行,并确保它在取任务时,最多只能同时取出多少个任务,最多只能执行多少个任务。例如,假设用户提交了八个任务,但程序最多只能同时执行四个任务,需要等待第四个任务执行完毕后才能取出下一个任务,这又该如何实现呢?

​ 实际上,这需要用到数据结构中的队列知识,可能还要结合一些编程思想去实现。可以考虑使用阻塞队列,或者说适当增加人手,这也是一种实现方式。

​ 另一种实现方式是分配四个线程,然后这四个线程执行完一个任务之后,才能再执行下一个任务。也就是说,线程需要取出任务来执行。

问题总结

任务队列的最大容量应该设置为多少?

程序怎么从任务队列中取出任务去执行?这个任务队列的流程怎么实现?怎么保证程序

最多同时执行多少个任务?

异步实现:引入线程池概念

线程池

1.线程池概念

​ 首先,我们需要实现的流程必须有一个工作者, 也就是程序的线程,称之为线程小李。此外,还需要一个任务队列,这个队列中有待处理的任务。

​ 设想一个情景,用户提交了一个任务,比如智能分析,任务1便加入队列。

​ 线程小李现在可以从任务队列中取出任务1进行执行,他关注的就是这个任务队列,有任务则取出,但每次只能执行一个,因为每个线程的工作能力是有限的,可能同时处理多个任务。

​ 那么问题来了,又来了一个任务2,但线程小李正在处理任务1,没有办法处理任务2。此时,不是可以考虑再派一个线程小鱼来帮忙?

​ 假如再有任务3,但员工已经没有了,只有小李和小鱼,此时又该怎么办?可以把新来的任务放入任务队列中,等待小李或者小鱼完成当前任务后,再去取任务3。

image一20240419203531769

​ 但是,如果任务队列满了怎么办?此时就没办法再接收新任务了。这时,为了系统的稳定性,可以选择把新任务记录到数据库中,但不再加入任务队列,等有空闲的线程或者有新的线程加入时,再把这些任务加入队列。

​ 然而,这个方案还不够完美。在某些情况下,如果任务队列满了,不一定非要拒绝新任务。还可以有其他策略,比如开个队列,开个公司。同时,还需要考虑,当小李正在处理任务1的时候,任务2来了,是否有必要立刻调用线程小鱼?

​ 例如,如果我们有四个任务,如果小李处理任务的速度非常快,可能一秒钟就能完成一个任务, 而小鱼处理任务的速度较慢,需要一个小时才能完成一个任务。 那是不是就让小李依次执行这四个任务就可以了。因此,如何分配任务,如何分配线程,这都需要制定出适合的策略。因此需要线程池来管理这些线程,任务队列就像是一个备忘录,任务就是我们需要完成的工作。

​ 为什么需要线程池?因为线程管理比较复杂。比如,突然来了很多任务,需要临时调用更多的线程来处理,但这些任务可能只是暂时的,正常情况下可能每天只有一个任务。当任务处理完毕后,这些临时的线程就没有工作可做了,那怎么办呢?应该把他们看作临时工,当任务处理完毕后就开除,释放系统资源,等到下一次有大任务需要处理的时候,再临时招募他们。

为什么需要线程池?

(1)线程的管理比较复杂(比如什么时候新增线程、什么时候减少空闲线程)

(2)任务存取比较复杂(什么时候接受任务、什么时候拒绝任务、怎么保证大家不抢到同一个任务)

线程池的作用:帮助系统轻松管理线程、协调任务的执行过程

​ 扩充:可以向线程池表达需求,比如最多只允许四个人同时执行任务。线程池就能自动进行管理。在任务急时,它会将任务放入队列。而在任务不紧急或者还有线程空闲时,它会直接将任务交给空闲的线程,不放入队列。

2.线程池实现

​ 针对自行实现线程池的场景,可以借助之前提到的几种场景,例如何时增加线程,何时减少线程,来逐步解答这个问题。

​ 实际上这是一项繁琐的任务。比如,有一种情况,如果大家执行速度都很快,可能会抢到同一个任务,如何协调各线程不去抢相同的任务就成了问题,这就涉及到线程的协调问题。

​ 在Linux中,有一种称为任务窃取的概念。比如,如果小鱼的工作效率非常高,而小李的工作效率较低,可以将原本分配给小李的任务,转交给小鱼去做,这就是任务窃取的概念。这个问题非常复杂,涉级到的内容很深。任务是否需要锁,取决于使用的数据结构类型,例如是否使用了阻塞队列等来实现任务队列,这些策略都相当复杂。

​ 然后来看看如何在程序中实现线程池。事实上,大多数程序都会有一个基本的线程池实现。

(1)在Spring中,可以利用ThreadPoolTaskExecutor配合@Async注解来实现线程池(不太建议)

​ 虽然Spring框架提供了线程池的实现,但并不特别推荐使用。因为Spring是一个框架, 它进行了一定程度的封装,可能隐藏了一些细节(对其使用有一定的保留意见)。更推荐直接使用Java 并发包中的线程池

(2)在Java中,可以使用JUC并发编程包中的ThreadPoolExecutor, 来现非常灵活地自定义线程池

​ 建议学完SpringBoot并能够实现一个项目、以及学完Redis之后,再系统学习Java并发编程(JUC)。可以避免过早的压力和困扰,在具备实践基础的情况下,可以更好地理解并发编程的概念和应用

​ 要做线程池,肯定是为了解决问题。在刚刚提出的问题中,任务队列的最大容量应设置为多少?换句话说,允许有多少个线程同时执行BI任务?这个问题如何确定呢?如何确定线程池参数?否需要结合业务场景?可以根据需求进行测试。

线程池参数在初次设定时并不可能一次就能百分之百精准,要结合实际测试情况,系统资源和实际业务场景来调整

​ 假设回到业务场景。为什么需要使用线程?为什么需要异步?因为调用Al服务处理速度很慢, Al服务的处理能力有限,所以需要配合Al服务,避免AI服务器崩溃。如果AI服务的能力强大,就可以提高并发度,让更多的线程同时去生成Al。但如果Al服务性能较差,只允许每三秒钟只能访问一次,只允许一个线程同时工作。那么线程池的参数就需要调小,只能让一个线程按顺序执行任务。

​ 需要结合实际场景,考虑系统最脆弱的环节,或者找出系统的瓶颈在哪里。比如Al生成能力的并发是一秒两次,或者并发仅允许四个线程同时执行。也就是说,不能同时生成五个任务,需要等待前四个任务完成后才能进行,那么线程池中的工作者数量最多只需要四个。如果AI服务允许20个线程或20个任务排队,那么队列长度也有了。

​ 假设最多允许四个线程同时执行,最多允许20个任务排队。系统同时向Al提交了八个任务,那么有四个任务会被执行,剩下的四个任务就会进入队列等待。如果同时提交了50个任务,那么有四个任务会被执行,20个任务排队,剩下的26个任务就会被丢弃。因此,线程池参数需要根据这些条件来设定

线程池参数

构建步骤

【1】config目录/创建ThreadPoolExecutorConfig.java

@Configuration
public class ThreadPoolExecutorConfig {
    
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor();
        return threadPoolExecutor;
    }
    
}

image-20240419204021175

分析线程池参数:线程池有什么参数、平时怎么设置线程池参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadF actory,
                          RejectedExecutionHandler handler) {

参数参数说明如何设定
corePoolSize (核心线程数)这些线程就好比是公司的正式员工,他们在正常情况下都是随时待命处理任务的如果Al服务只允许四个任务同时进行,核心线程数应该就被设置为四
maximumPoolSize (最大线程数)在极限情况下我们的系统或线程池能有多少个线程在工作。就算任务再多,你最多也只能雇佣这么多的人,因为你需要考虑成本和资源的问题假设AI服务最多只允许四个任务同时执行,那么最大线程数应当设置为四
keepAliveTime (空闲线程存活时间)这个参数决定了当任务少的时候,临时雇佣的线程会等待多久才会被剔除这个参数的设定是为了释放无用的线程资源。你可以理解为,多久之后会“解雇”没有任务做的临时工
TimeUnit (空闲线程存活时间的单位)将keepAliveTime和TimeUnit组合在一起,就能指定一个具体的时间,比如说分钟、秒等
workQueue (工作队列/任务队列)这个队列存储所有等待执行的任务。可以叫它阻塞队列,因为线程需要按顺序从队列中取出任务来执行这个队列的长度一定要设定, 因为无限长度的队列会消耗大量的系统资源
threadFactory (线程工厂)它负责控制每个线程的生成,就像一个管理员,负责招聘、管理员工,比如设定员工的名字工资,或者其他属性
RejectedExecutionHandler (拒绝策略)当任务队列已满的时候,应该怎么处理新来的任务?是抛出异常,还是使用其他策略?
比如说,可以设定任务的优先级,会员的任务优先级更高。
如果你的公司或者产品中有会员业务,或者有一些重要的业务需要保证不被打扰,可以考虑定义两个线程池或者两个任务队列,一个于处理VIP任务,一个用于处理普通任务,保证他们不互相干扰,也就是资源隔离策略。

线程池参数总结

​ 怎么确定线程池参数呢?结合实际情况(实际业务场景和系统资源来测试调整,不断优化。回归到业务,要考虑系统最脆弱的环节(系统的瓶颈)在哪里?

​ 现有条件:比如Al生成能力的并发是只允许4个任务同时去执行,Al 能力允许20个任务排队

  • corePoolSize (核心线程数=>正式员工数):正常情况下,我们的系统可以同时工作的线程数(随时就绪的状态
  • maximumPoolSize (最大线程数=>哪怕任务再多,你也最多招这些人):极限情况下,线程池最多可以拥有多少个线程?
  • keepAliveTime (空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除(理解为开除临时工),从而释放无用的线程资源
  • TimeUnit unit (空闲线程存活时间的单位):分钟、秒
  • workQueue (工作队列):用于存放给线程执行的任务,存在一个队列的长度(一 定要设置,要说队列长度无限,因为也会占用资源)
  • threadFactory (线程工厂):控制每个线程的生成线程的属性(此如线程名)
  • RejectedExecutionHandler (拒绝策略):任务队列满的时候,我们采取什么措施,比如抛异常、不抛异常、自定义策略

资源隔离策略:比如重要的任务(VIP务)一个队列,普通任务个队列, 保证这两个队列互不干扰。

线程池的工作机制(结合流程分析)

【1】一开始,没有任何的线程和任务

image一20240419215558254

【2】当有新任务进来,发现当前员工数还达到设定的正式员工数(corePoolSize= 2),则会直接增聘一名新员工来处理这个任务

image一20240419215609800

【3】又来一个新任务,发现当前员工数量还未达到设定的正式员工数(corePoolSize= 2),则会再次增聘一名新员工来处理这个任务

image一20240419215623011

【4】又来了一个新任务, 但是正式员工数已经达到上线(当前线程数= corePoolSize=2),这个新任务将被放到等待队列中(最大长度workQueue.size是2),而不是立即增聘新员工

image一20240419215636741

【5】又来了一个新任务,但是任务队列已经满了(当前线程数>corePoolSize=2,已有任务数=最大长度workQueue.size=2),将增设新线程(最大线程数maxmumPoolSize=4)来处理任务,而不是丢弃这个任务

image-20240419215649948

【6】当达到7个任务时,由于任务队列已经满了,临时工也招满了(当前线程数=maxmumPoolSize=4,已有任务数=最大长度workQueue.size=2),采用拒绝策略(RejectedExecutionHandler)来处理多余的任务

image-20240419215705999

【7】如果当前线程数超过corePoolSize(正式员工数),并且这些额外的线程没有新的任务可执行,则在keepAliveTime时间到达后,这些额外的线程将会被释放

如何设置线程池参数

​ 首先,corePoolSize 参数非常关键,它代表了在正常情况下需要的线程数量。你可以根据希望的系统运行状况以及同时执行的任务数设定这个值

​ 接着是maximumPoolSize,这个参数的设定应当与我们的下游系统的瓶颈相关联。比如,如果我们的AI系统一次只允许两个任务同时执行,那么maximumPoolSize就应设为两个,这就是其上限,不宜过大。所以,这个参数值的设定就应当对应于极限情况

​ 回到我们的业务场景,AI系统最多允许4个任务同时执行,那么我们应如何设定这些参数呢?

​ 对于核心线程数(corePoolSize),可以设定为四,这是在正常运行情况下的需求。然后,maximumPoolSize就应设定为极限条件,也就是小于等于4

​ 对于keepAliveTime空闲存活时间,并不需要过于纠结,这个问题相对较小。可以设定为几秒钟,虽然这可能稍微有点短。可能需要根据任务以及人员的变动频率进行设定,但无需过于纠结,通常设定为秒级或分钟级别就可以了

​ 再看workQueue工作队列的长度,建议结合系统的瓶颈进行设定。在目前场景中,可以设定为20。如果下游系统最多允许一小时的任务排队,那么这边就可以设置20个任务排队,而核心线程数则设定为4

​ threadFactory线程厂应根据具体情况设定

​ RejectedExecutionHandler拒绝策略,可以直接选择丢弃、抛出异常,然后交由数据处理,或者标记任务状态为已拒绝,表示任务已满,无法再接受新的任务

线程池开发

1.线程池的设计主要分为IO密集型和计算密集型

​ 关于线程池的设置,首先需要明确,设置的依据应该是具体的业务场景。通常,可以将任务分为IO密集型和CPU密集型,也称为计算密集型。对于计算密集型的任务,它会大量消耗CPU资源进行计算,例如音视频处理、图像处理、程序计算和数学计等。要最大程度上利用CPU,避免多个线程间的冲突, 一般将核心线程数设置为CPU的核数加一。这个“加一”可以理解为预留一个额外的线程,或者说一个备用线程, 来处理其他任务。这样做可以充分利海个CPU核心,减少线程间的频繁切换,降低开销。在这种情况下,maximumPoolSize的设定没有严格的规则,一般可以设为核心线程数的两倍或三倍。而耐于IO密集型的任务,它主要消耗的是带宽或内存硬盘的读写资源,对CPU的利用率不高。比如说,查询数据库或等待网络消息传输,可能需要花费几秒钟,而在这期间CPU实际上是空闲的。在这种情况下,可以适当增大corePoolSize的值,因为CPU本来就是空闲的。比如说,如果数据库能同时支持20个线程查询,那么corePoolSize就可以设置得相对较大,以提高查询效率。然有一些经验值, 比如2N+1,不太推这种经验值,建议根据IO的能力来设定。

总结

一般情况下, 任务分为IO密集型和计算密集型两种。

计算密集型:吃CPU,比如音视频处理、图像处理、数学计算等,一 般是设置corePoolSize为CPU的核数+ 1(空余线程) ,可以让每个线程都能利用好CPU的每个核,且线程之间不用频繁切换(减少打架、减少开销)

IO密集型:吃带宽/内存/硬盘的读写资源,corePoolSize 可以设置大一 一点,一般经验值是2n左右,但是建议以IO的能力为主。

对于导入百万级Excel数据到数据库这种情况,应该如何归类呢?

​ 其实答案很简单,我们只需要考虑这个过程主要消耗哪种资源。想一下,当你将百万条数据导入到数据库时,会进行大量的计算操作吗?可能有一一些, 但数量极少。大部分的时间花费在哪里呢?主要是在数据写入数据库的过程,这涉及到网络传输和磁盘/O操作。实际上,数据库操作的本质就是磁盘I/O。所以,这种情况下,可以认为是IO密集型的。

设置线程池参数总结

现有条件:比如Al生成能力的并发是只允许4个任务同时去执行,Al 能力允许20个任务排队

corePoolSize (核心线程数=>正式员工数) :正常情况下,可以设置为2-4

maximumPoolSize:设置为极限情况,设为<= 4

keepAliveTime (空闲线程存活时间) :-般设置为秒级或者分钟级

TimeUnit unit (空闲线程存活时间的单位) :分钟、秒

workQueue (工作队列) :结合实际请况去设置,可以设置为20

threadFactory (线程工厂) :控制每个线程的生成线程的属性(此如线程名)

RejectedExecutionHandler (拒绝策略) : 抛异常,标记数据库的任务状态为“任务满了已拒绝”

线程池开发

回到后端,继续去设置线程池的参数

corePoolSize (核心线程数)为2- 2个正式员工

maximumPoolSize (最大线程数)为4-最多允许4个任务同时执行

keepAliveTime (空闲线程存活时间)为100 -多余的临时工会等待100后,被开除

TimeUnit (空闲线程存活时间的单位)为TimeUnit.SECONDS,与keepAliveTime组合使用,指多余临时工等待100秒后,被开除

workQueue (工作队列)为new ArrayBlockingQueue<>(10000)一使用数组阻塞队列,并指定长度为4,最多等待4个任务

threadFactory (线程厂) 创建一个自己的线程工厂

RejectedExecutionHandler (拒绝策略)不写,默认拒绝

线程池配置

@Configuration
public class ThreadPoolExecutorConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        // 创建一个线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            // 初始化线程数为1
            private int count = 1;

            // 每当线程池需要创建新线程时,就会调用newThread方法(@NotNull指定r应该永远不为null)
            @Override
            public Thread newThread(@NotNull Runnable r) {
                // 创建一个新线程
                Thread thread = new Thread(r);
                // 设置线程名称(名称中包含线程数的当前值)
                thread.setName("线程" + count);
                // 线程数递增
                count++;
                // 返回新创建的线程
                return thread;
            }
        };

        /**
         * 创建一个新的线程池,线程池核心大小为2,最大线程数为4,非核心线程空闲时间为100s,任务队列为阻塞队列,长度为4
         * 使用自定义的线程工厂创建线程
         */
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(4), threadFactory);
        // 返回创建的线程池
        return threadPoolExecutor;
    }

}

线程池控制器(添加线程、查看线程池)

@RestController
@RequestMapping("/queue")
@Slf4j
//@Profile({ "dev", "local" })
public class QueueController {

    // 注入线程池实例
    @Resource
    private ThreadPoolExecutor threadPoolExecutor;

    // 将任务添加到线程池
    @GetMapping("/add")
    public void add(String name) {
        // 借助CompletableFuture执行异步任务
        CompletableFuture.runAsync(() -> {
            // 打印日志信息(包括任务名称和执行线程的名称)
            log.info("任务执行中:" + name + ",执行人:" + Thread.currentThread().getName());
            try {
                // 让线程休眠10分钟,模拟长时间运行的任务
                Thread.sleep(600000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 异步任务在threadPoolExecutor中执行
        }, threadPoolExecutor);
    }

    // 获取线程池状态信息
    @GetMapping("/get")
    public String get() {
        // 创建Map存储线程池的状态信息
        Map<String, Object> map = new HashMap<>();
        int size = threadPoolExecutor.getQueue().size();
        map.put("队列长度", size);
        long taskCount = threadPoolExecutor.getTaskCount();
        map.put("任务总数", taskCount);
        long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
        map.put("已完成任务数", completedTaskCount);
        int activeCount = threadPoolExecutor.getActiveCount();
        map.put("正在工作的线程数", activeCount);
        // 将map转换为JSON字符串并返回
        return JSONUtil.toJsonStr(map);
    }
}

启动项目测试:依次添加任务、查看线程池状态

// 初始化
{"任务总数":0,"已完成任务数":0,"正在工作的线程数":0,"队列长度":0}

// 任务1
任务执行中:任务1,执行人:线程1
{"任务总数":1,"已完成任务数":0,"正在工作的线程数":1,"队列长度":0}

// 任务2
任务执行中:任务2,执行人:线程2
{"任务总数":2,"已完成任务数":0,"正在工作的线程数":2,"队列长度":0}


-------- 任务1、2直接被线程1、线程2处理,没有进入队列 --------


// 任务3
request start,id: 2e5fc63c-e434-4c7a-91eb-1adff6aa4db3, path: /api/queue/add, ip: 0:0:0:0:0:0:0:1, params: [任务3]
{"任务总数":3,"已完成任务数":0,"正在工作的线程数":2,"队列长度":1}

// 任务4
request start,id: ca430dab-3a7c-45e1-83e1-07aab86cb607, path: /api/queue/add, ip: 0:0:0:0:0:0:0:1, params: [任务4]
{"任务总数":4,"已完成任务数":0,"正在工作的线程数":2,"队列长度":2}

// 任务5
request start,id: 23c950de-3463-495a-9843-bae8fba2e57a, path: /api/queue/add, ip: 0:0:0:0:0:0:0:1, params: [任务5]
{"任务总数":5,"已完成任务数":0,"正在工作的线程数":2,"队列长度":3}

// 任务6
request start,id: 02ae7563-1606-49e1-9157-c7dc0070e059, path: /api/queue/add, ip: 0:0:0:0:0:0:0:1, params: [任务6]
{"任务总数":6,"已完成任务数":0,"正在工作的线程数":2,"队列长度":4}

-------- 任务3-6,由于当前没有空闲的进程处理任务,因此任务3-6全部进入队列 --------


-------- 任务7,由于当前没有空闲的进程处理任务、且队列已经满了,会增设新线程(最大线程数为4)来处理新任务,因此任务7优先被新线程执行 --------

// 任务7
任务执行中:任务7,执行人:线程3
{"任务总数":7,"已完成任务数":0,"正在工作的线程数":3,"队列长度":4}

-------- 任务8,由于当前没有空闲的进程处理任务、且队列已经满了,会增设新线程(最大线程数为4)来处理新任务,因此任务8优先被新线程执行 --------

// 任务
任务执行中:任务8,执行人:线程4
{"任务总数":8,"已完成任务数":0,"正在工作的线程数":4,"队列长度":4}



-------- 任务9,由于当前没有空闲的进程处理任务、且队列已经满了、且线程已经增设最大,因此启用拒绝策略(控制台报错) --------

// 任务9(拒绝)
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncRun@6adc8f5b rejected from java.util.concurrent.ThreadPoolExecutor@56ad704f[Running, pool size = 4, active threads = 4, queued tasks = 4, completed tasks = 0]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1818) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:2033) ~[na:na]



前后端异步化改造

1.库表设计

​ 在chart表中新增两个字段,补充实体相关定义

--任务状态字段(排队中wait、执行中running、 已完成succeed、 失败failed)
status varchar(128) not nu1l default 'wait' comment 'wait,running,succeed,failed',
--任务执行信息字段
execMessage text null comment '执行信息',

​ 环境配置(QueueController设定:@Profile({ "dev", "local" }指定dev、local环境生效(不要把测试暴露出去))

image-20240419224814829

2.后端接口开发

​ 业务流程改造:(原业务流程:校验=》限流=》构造用户输入、调用AI),将调用AI这步调整为提交任务

image-20240419230227792

​ 完善业务逻辑细节:异步任务处理调用AI接口前,先锁定对应任务(将图表任务状态修改为“执行中”,等待执行成功之后再将状态修改为“已完成”,然后保存执行结果;如果执行失败则将状态修改为“失败”并记录任务失败信息,防止同一个任务被多次执行)

两种处理方式:理清为什么要引入异步流程,异步的优点、如何实现异步?同步:genChartByAiAsync、异步:genChartByAiSync

@PostMapping("/genChartByAiAsync")
    public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile, GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {
        String name = genChartByAiRequest.getName();
        String goal = genChartByAiRequest.getGoal();
        String chartType = genChartByAiRequest.getChartType();

        // 校验
        ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目标为空");
        ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名称过长");

        // 文件信息校验(校验文件后缀、大小等基本信息,防止文件上传漏洞攻击)
        long size = multipartFile.getSize();
        String originalFilename = multipartFile.getOriginalFilename();
        // 校验文件大小,如果文件大于1兆则抛出异常,提示文件超过1M
        final long ONE_MB = 1024 * 1024;
        ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR,"文件超过1M");
        // 检验文件后缀(一般是xxx.csv,获取到.后缀),可借助FileUtil工具类的getSuffix方法获取
        String suffix = FileUtil.getSuffix(originalFilename);
        final List<String> validFileSuffixList = Arrays.asList(".xlsx",".xls"); // ".png",".csv",".jpg",".svg","webp","jpeg"
        ThrowUtils.throwIf(!validFileSuffixList.contains(suffix),ErrorCode.PARAMS_ERROR,"文件后缀格式非法");

        // 获取当前登陆用户
        User loginUser = userService.getLoginUser(request);
        long biModelId = CommonConstant.BI_MODEL_ID;

        // 引入限流判断
        redisLimiterManager.doRateLimit("genChartByAi_"+loginUser.getId());

        // 构造用户输入
        StringBuilder userInput = new StringBuilder();
        userInput.append("分析需求:").append("\n");

        // 拼接分析目标
        String userGoal = goal;
        if (StringUtils.isNotBlank(chartType)) {
            userGoal += ",请使用" + chartType;
        }
        userInput.append(userGoal).append("\n");
        userInput.append("原始数据:").append("\n");
        // 压缩后的数据
        String csvData = ExcelUtils.excelToCsv(multipartFile);
        userInput.append(csvData).append("\n");


        // --------------------------------- start 异步处理逻辑 ------------------------------
        // 1.先将图标数据保存到数据库中
        Chart chart = new Chart();
        chart.setName(name);
        chart.setGoal(goal);
        chart.setChartData(csvData);
        chart.setChartType(chartType);
        // 数据库插入时,还没生成结果(先插入数据,异步调用AI接口生成图表信息),将图表任务状态设置为排队中
//        chart.setGenChart(genChart);
//        chart.setGenResult(genResult);
        chart.setStatus("wait");
        chart.setUserId(loginUser.getId());
        boolean saveResult = chartService.save(chart);
        ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");

        // 2.在最终的返回结果前提交一个任务(todo 建议处理任务队列满了后,抛异常的情况,因为提交任务报错了,前端会返回异常)
        CompletableFuture.runAsync(()->{
            // 2.1 修改图表任务状态为执行中
            Chart updatedChart = new Chart();
            updatedChart.setStatus("running");
            boolean updatedOp = chartService.updateById(updatedChart);
            if(!updatedOp){
                handleChartUpdateError(chart.getId(),"更新图表【执行中】状态失败");
                return ;
            }
           // 2.2 异步调用AI接口
            String result = aiManager.doChat(biModelId, userInput.toString());
            // 此处分隔符以设定为参考
            String[] splits = result.split("【【【【【"); // 【【【【【
            if (splits.length < 3) {
                throw new BusinessException(ErrorCode.SYSTEM_ERROR, "AI 生成错误");
            }
            String genChart = splits[1].trim();
            String genResult = splits[2].trim();

            // 2.3 AI结果调用成功后,更新任务状态,并将AI结果进行封装
            Chart updatedChartResult = new Chart();
            updatedChartResult.setId(chart.getId());
            updatedChartResult.setGenChart(genChart);
            updatedChartResult.setGenResult(genResult);
            updatedChartResult.setStatus("succeed");
            boolean updatedResOp = chartService.updateById(updatedChartResult);
            if(!updatedResOp){
                handleChartUpdateError(chart.getId(),"更新图表【成功】状态失败");
            }
        },threadPoolExecutor);
        // --------------------------------- end 异步处理逻辑 ------------------------------
        BiResponse biResponse = new BiResponse();
        biResponse.setChartId(chart.getId());
        return ResultUtils.success(biResponse);
    }

    /**
     * 处理更新图表执行中状态失败方法
     * @param chartId
     * @param execMessage
     */
    private void handleChartUpdateError(Long chartId, String execMessage) {
        Chart updatedChartResult = new Chart();
        updatedChartResult.setId(chartId);
        updatedChartResult.setStatus("failed");
        updatedChartResult.setExecMessage(execMessage);
        boolean updatedResOp = chartService.updateById(updatedChartResult);
        if(!updatedResOp){
            log.error("更新图表【失败】状态失败"+chartId+","+execMessage);
        }
    }

3.前端开发

【1】创建路由

{ name: '智能分析(同步)', icon: 'table', path: '/add_chart', component: './AddChart' },
{ name: '智能分析(异步)', icon: 'table', path: '/add_chart_async', component: './AddChartAsync' },

【2】创建组件(智能分析【异步】),从原AddChart中copy进行修改

image-20240419232328224

【3】更新openapi接口(同步:genChartByAiAsync、异步:genChartByAiSync)

​ 终端输入yarn run openapi

​ 修改AddChart/index.tsx:接口调用方法调整(原upload调整为genChartByAiSync)

​ 修改AddChartAsync/index.tsx:接口调用方法调整(genChartByAiAsync)

【4】清理多余代码(多余的可视化图表分析结论等)

  • 组件名修改AddChart=》AddChartAsync
  • 清理多余代码(Shift+Alt+O:清理无关引用)

image-20240419233235140

image-20240419233442709

​ 异步智能不涉及返回图表信息,因此将涉及到的定义和处理全部删除

image-20240419233805521

image-20240419233852613

​ 修改提示话术,任务提交成功后重置表单信息

image-20240419234338558

​ 执行:新增一个图表,查看数据库,此时有线程可以处理调用AI接口,此时图表状态由awit=》running,等过一段时间之后查看数据表状态,可以看到AI接口调用成功后响应则图表状态被更新为succeed且图表数据被正常封装,再去图表列表中查看图表信息

image-20240420000605748

MyChart优化:处理响应数据(只有图表状态为succeed才会渲染图表)、添加不同状态的图表展示

image-20240420001531247

image-20240420001209769

​ 生成效果:(根据图表status渲染数据)

image-20240420001923328

import { listChartByPageUsingPost } from '@/services/noob-bi/chartController';
import { useModel } from '@@/exports';
import { Avatar, List, Result, message } from 'antd';
import Search from 'antd/es/input/Search';
import EChartsReact from 'echarts-for-react';
import React, { useEffect, useState } from 'react';

const MyChart: React.FC = () => {

  // 构建初始条件(便于后面恢复初始条件)
  const initSearchParams = {
    // 默认第一页
    current: 1,
    // 初始情况每页数据返回12条
    pageSize:12,
  }

  /**
   * 定义一个状态(searchParams)和其对应的更新函数(setSearchParams),初始化为initSearchParams
   * searchParams是发送给后端的查询条件,参数类型是API。ChartQueryRequest
   * {...}是展开语法,将initSearchParams中的所有属性展开并复制到一个新对象中(不改变原始对象,可以避免在现有对象直接更改值的对象变异操作)
   * React中不推荐直接修改状态或者属性,而是通过创建要一个新对象并将其分配给状态或属性
   */
  const [searchParams,setSearchParams] = useState<API.ChartQueryRequest>({...initSearchParams});

  // 从全局状态中获取登陆用户信息
  const {initialState} = useModel('@@initialState');
  const {currentUser} = initialState??{};
  // 定义数据加载状态(控制页面是否加载:默认加载)
  // const {loading,setLoading} = useState<boolean>(true);

  // 定义变量存储图表数据
  const [chartList,setChartList] = useState<API.Chart[]>();
  // 定义数据总数(类型为number、默认为0)
  const [total,setTotal] = useState<number>(0);

  // 定义一个获取数据的异步函数
  const loadData = async()=>{
    // 获取数据(此时页面还在加载中,设定加载属性)
    // setLoading(true);

    /**
     * 调用后端接口,传入serchParams请求参数并返回响应结果
     * listChartByPageUsingPost是通过openAPI生成的接口
     * 当searchParam状态改变时,可通过setSearchParams更新该状态并重新获取数据
     */
      try {
        const res = await listChartByPageUsingPost(searchParams);
        // 响应成功,将图表数据进行渲染(如果为空则传入空数组,分页数据则拿到数据列表)
        if(res.data){
          setChartList(res.data.records ?? []);
          // 数据总数:数据列表如果为空则返回0
          setTotal(res.data.total ?? 0);

          // 列表数据处理(有些图表有标题、有些没有,此处过滤掉不展示标题信息)
          if(res.data.records){
            res.data.records.forEach(data => {
              // 当图表状态为succeed的时候才解析图表代码(渲染数据)
              if(data.status === 'succeed'){
                // 将后端返回的图表字符串修改为对象数组,如果后端返回空字符串则返回{}
                const chartOption = JSON.parse(data.genChart??'{}');
                // 标题设置为undefined
                chartOption.title = undefined;
                // 将修改后的option重新赋值给原genChart字段
                data.genChart = JSON.stringify(chartOption);
              }
            });
          }

        }else{
          // 如果后端返回数据为空,抛出异常(获取图表失败)
          message.error("获取图表失败");
        }
      } catch (e:any) {
        // 出现异常,提示失败西悉尼
        message.error('图表获取失败'+e.message);
      }

      // 数据获取成功,加载完毕,将加载状态设置为false
      // setLoading(false);
  }

  // 页面首次加载,触发加载数据
  useEffect(()=>{
    // 调用方法加载数据(页面首次渲染以及数组中的搜索条件发生变化的时候执行loadData方法触发搜索)
    loadData();
  },[searchParams])

  return (
    // 页面信息定义(my-chart)
    <div className = "my-chart">
      {/* 搜索框引入 */}
      <div>
        
        {/* <Search placeholder='请输入图表名称' enterButton loading={loading} onSearch={(value)=>{ */}
        <Search placeholder='请输入图表名称' enterButton onSearch={(value)=>{
          // 设置搜索条件
          setSearchParams({
            // 原始搜索条件
            ...initSearchParams,
            // 搜索词
            name:value,
          });
        }}></Search>
      </div>

      <div className='margin-16'/>
      {/* 数据引入 */}
      <List
          itemLayout="vertical"
          size="large"

          // 设置组件样式(栅栏格式)
          grid={{
            gutter: 16,
            xs: 1,
            sm: 1,
            md: 1,
            lg: 2,
            xl: 2,
            xxl: 2,
          }}

          // 分页组件定义
          pagination={{
            // 当切换分页,在当前搜索条件的基础上,将页数调整为当前的页数
            onChange: (page,pageSize) => {
              setSearchParams({
                ...searchParams,
                current:page,
                pageSize,
              })
            },
            // 显示当前页数
            current: searchParams.current,
            // 页面参数、总数修改为自己的
            pageSize: searchParams.pageSize,
            total: total,
          }}

          // loading状态调整为自己的状态
          // loading = {loading};

          // 数据源改成图表数据(列表组件会自动渲染)
          dataSource={chartList}
          
          renderItem={(item) => (
            // List.Item 要展示的每一条数据
            <List.Item
              // key 对应图表id
              key={item.id}
              // 要展示的图表信息
              /*
              extra={
                <img
                  width={272}
                  alt="logo"
                  src="https://gw.alipayobjects.com/zos/rmsportal/mqaQswcyDLcXyDKnZfES.png"
                />
              }
              */
            >
              {/* 要展示的图表元素信息 */}
              <List.Item.Meta
                // 头像(todo)
                // avatar={<Avatar src={'https://cos.holic-x.com/profile/avatar/avatar02.png'} />}
                avatar={<Avatar src={currentUser&&currentUser.userAvatar} />}
                // 图表名称(链接跳转查看详情)
                title={<a href={item.href}>{item.name}</a>}
                // 描述
                description={item.chartType?'图表类型'+item.chartType:undefined}
              />
              
              {/* <div style={{marginBottom:16}}></div> */}

              {/* 要展示的内容 */}
              {/* {'分析目标:'+item.goal} */}
              {/* <div style={{marginBottom:16}}></div> */}

              {/* 图表信息展示 */}
              {/* <EChartsReact option={JSON.parse(item.genChart??'{}')} /> */}
              {/* <EChartsReact option={item.genChart&&JSON.parse(item.genChart)} /> */}

              <>
                {
                  item.status === 'wait' && <>
                    <Result
                      status="warning"
                      title="待生成"
                      subTitle={item.execMessage ?? '当前图表生成队列繁忙,请耐心等候'}
                    />
                  </>
                }
                {
                  item.status === 'running' && <>
                    <Result
                      status="info"
                      title="图表生成中"
                      subTitle={item.execMessage}
                    />
                  </>
                }
                {
                  item.status === 'succeed' && <>
                    <div style={{ marginBottom: 16 }} />
                    <p>{'分析目标:' + item.goal}</p>
                    <div style={{ marginBottom: 16 }} />
                    <EChartsReact option={item.genChart && JSON.parse(item.genChart)} />
                  </>
                }
                {
                  item.status === 'failed' && <>
                    <Result
                      status="error"
                      title="图表生成失败"
                      subTitle={item.execMessage}
                    />
                  </>
                }
              </>

            </List.Item>
          )}
        />
    </div>
  );
};
export default MyChart;

优化思路

​ 同步方式:可以随时查看结果,即使可能需要等待十几秒或者20秒。如果需要等待一分钟或者五分钟的话,异步方式可能会更合适。

​ 也可以选择实时更新,比如每隔几秒刷新一下页面,自动获取新结果。批量异步也是一种可行的方式。

​ 此外,还有一种策略。 可以根据系统当前的负载动态地调整用户查询的处理方式。比如,如果系统当前状态良好,就可以选择同步返回结果。而如果用户提交请求后发现系统非常繁忙,预计需要等待很长时间,那么就可以选择异步处理方式。这种思考方式在实际的企业项目开发中也是很常见的。

​ 除了刚刚提到的一些点,还可以使用定时任务来处理失败的图表,添加重试机制。此外,也可便精确地预见AI生成错误,并在后端进行异常处理,如提取正确的字符串。例如,Al 说一些多余的话, 就需要提取出正确的信息。同时,如果任务没有提交,可以使用定时任务将其提取出来。还可以为任务增加一个超时时间。如果超时,任务将自动标记为失败,这就是超时控制,这一鯡常重要。对吁Al生成的脏数据,导致最后出现错误,因此前端也需要进行异常处理,不能仅仅依赖于后端。

反向压力:那就是在系统压力大的时候,使用异步,而在系统压力小的时候,使用同步,这就是反向压力的概念。进一步扩展一下,关于线程池,现在的核心参数不是设定为2,但实际上,如果AI最多允许四个任务同时执行,是否可以提前确认AI当前的业务是否繁忙,即调用的第三方API否还有多余的资源给我们使用。如果对方表示资源已经耗尽,为了保证系统的稳定性,否可以将核心线程数调小些。反之,如果询问AI第三方并发现它的状态是空闲,是否可以将核心线程数增加,以此来提高系统性能。这种通过下游服务来调整你的业务以及核心线程池参数,进而改变你的系统策略的方式就是反向压力。例如,你发现当前Al服务的任务队列中没有任何人提交任务,那么你是否可以提高使用率。这其实是一个很好的点,如果你能在简历上写到反向压力,将会是一个很大的加分项。 反向压力其实是在做大数据系统中,特别是在做实时数据流系统时经常会用到的一个术语。是不是可以在任务执行成功或失败后,给用户发送消息通知。比如说,在图表页面增加一个刷新或者定时自动刷新的按钮,以保证用户能够获取到图表的最新状态。这就是前端轮询的技术。还有就是在任务执行过程中,我们可以向用户发送消息通知,虽然这可能比较复杂。但是在短期内,可以自己尝试实现,如通过数据库记录消息,这是最简单的方式。当然还有其他的方式,如websocket实时通知或者server side event,这都是实时的。

优化

(1)guava Retrying重试机制

(2)提前考虑到Al生成错误的情况,在后端进行异常处理(此如Al说了多余的话,提取正确的字符串)

(3)如果说任务根本没提交到队列中(或者队列满了),不是可以用定时任务把失败状态的图表放到队列中(补偿)

(4)建议给任务的执行增加一个超时时间,超时自动标记为失败(超时控制)

(5)反向压力: https://zhuanlan.zhihu.com/p/404993753, 通过调用的服务状态来选择当前系统的策略(比如根据AI服务的当前任务队列数来控制咱们系统的核心线程数),从而最大化利用系统资源

(6)我的图表页面增加一个刷新、定时自动刷新的按钮,保证获取到图表的最新状态(前端轮询)

(7)任务执行成功或失败,给用户发送实时消息通知(实时: websocket、 server side event)

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.1.3