Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

解秘 Node.js 单线程实现高并发请求原理,以及串联同步执行并发请求的方案 #86

Open
biaochenxuying opened this issue Mar 21, 2021 · 2 comments
Assignees

Comments

@biaochenxuying
Copy link
Owner

最近在做一个支持多进程请求的 Node 服务,要支持多并发请求,而且请求要按先后顺序串联同步执行返回结果。

对,这需求就是这么奇琶,业务场景也是那么奇琶。

需求是完成了,为了对 Node.js 高并发请求原理有更深一些的理解,特意写一篇文章来巩固一下相关的知识点。

问题

Node.js 由这些关键字组成: 事件驱动、非阻塞I/O、高效、轻量

于是在我们刚接触 Node.js 时,会有所疑问:

  • 为什么在浏览器中运行的 JavaScript 能与操作系统进行如此底层的交互?

  • Node 真的是单线程吗?

  • 如果是单线程,他是如何处理高并发请求的?

  • Node 事件驱动是如何实现的?

下来我们一起来解秘这是怎么一回事!

架构一览

上面的问题,都挺底层的,所以我们从 Node.js 本身入手,先来看看 Node.js 的结构。

  • Node.js 标准库,这部分是由 Javascript编写的,即我们使用过程中直接能调用的 API。在源码中的 lib 目录下可以看到。

  • Node bindings,这一层是 Javascript 与底层 C/C++ 能够沟通的关键,前者通过 bindings 调用后者,相互交换数据。

  • 第三层是支撑 Node.js 运行的关键,由 C/C++ 实现。

  • V8:Google 推出的 Javascript VM,也是 Node.js 为什么使用的是 JavaScript 的关键,它为 JavaScript 提供了在非浏览器端运行的环境,它的高效是 Node.js 之所以高效的原因之一。

  • Libuv:它为 Node.js 提供了跨平台,线程池,事件池,异步 I/O 等能力,是 Node.js 如此强大的关键。

  • C-ares:提供了异步处理 DNS 相关的能力。

  • http_parser、OpenSSL、zlib 等:提供包括 http 解析、SSL、数据压缩等其他的能力。

单线程、异步

  • 单线程:所有任务需要排队,前一个任务结束,才会执行后一个任务。如果前一个任务耗时很长,后一个任务就不得不一直等着。Node 单线程指的是 Node 在执行程序代码时,主线程是单线程

  • 异步:主线程之外,还维护了一个"事件队列"(Event queue)。当用户的网络请求或者其它的异步操作到来时,Node 都会把它放到 Event Queue 之中,此时并不会立即执行它,代码也不会被阻塞,继续往下走,直到主线程代码执行完毕。

注:

  • JavaScript 是单线程的,Node 本身其实是多线程的,只是 I/O 线程使用的 CPU 比较少;还有个重要的观点是,除了用户的代码无法并行执行外,所有的 I/O (磁盘 I/O 和网络 I/O) 则是可以并行起来的。
  • libuv 线程池默认打开 4 个,最多打开 128 个 线程。

事件循环

Nodejs 所谓的单线程,只是主线程是单线程。

  • 主线程运行 V8 和 JavaScript
  • 多个子线程通过 事件循环 被调度

可以抽象为:主线程对应于老板,正在工作。一旦发现有任务可以分配给职员(子线程)来做,将会把任务分配给底下的职员来做。同时,老板继续做自己的工作,等到职员(子线程)把任务做完,就会通过事件把结果回调给老板。老板又不停重复处理职员(子线程)子任务的完成情况。

老板(主线程)给职员(子线程)分配任务,当职员(子线程)把任务做完之后,通过事件把结果回调给老板。老板(主线程)处理回调结果,执行相应的 JavaScript。

更具体的解释请看下图:

1、每个 Node.js 进程只有一个主线程在执行程序代码,形成一个执行栈(execution context stack)。

2、Node.js 在主线程里维护了一个"事件队列"(Event queue),当用户的网络请求或者其它的异步操作到来时,Node 都会把它放到 Event Queue之中,此时并不会立即执行它,代码也不会被阻塞,继续往下走,直到主线程代码执行完毕。

3、主线程代码执行完毕完成后,然后通过 Event Loop,也就是事件循环机制,检查队列中是否有要处理的事件,这时要分两种情况:如果是非 I/O 任务,就亲自处理,并通过回调函数返回到上层调用;如果是 I/O 任务,就从 线程池 中拿出一个线程来处理这个事件,并指定回调函数,当线程中的 I/O 任务完成以后,就执行指定的回调函数,并把这个完成的事件放到事件队列的尾部,线程归还给线程池,等待事件循环。当主线程再次循环到该事件时,就直接处理并返回给上层调用。 这个过程就叫 事件循环 (Event Loop)

4、期间,主线程不断的检查事件队列中是否有未执行的事件,直到事件队列中所有事件都执行完了,此后每当有新的事件加入到事件队列中,都会通知主线程按顺序取出交 Event Loop 处理。

优缺点

Nodejs 的优点:I/O 密集型处理是 Nodejs 的强项,因为 Nodejs 的 I/O 请求都是异步的(如:sql 查询请求、文件流操作操作请求、http 请求...)

Nodejs 的缺点:不擅长 cpu 密集型的操作(复杂的运算、图片的操作)

总结

1、Nodejs 与操作系统交互,我们在 JavaScript 中调用的方法,最终都会通过 process.binding 传递到 C/C++ 层面,最终由他们来执行真正的操作。Node.js 即这样与操作系统进行互动。

2、Nodejs 所谓的单线程,只是主线程是单线程,所有的网络请求或者异步任务都交给了内部的线程池去实现,本身只负责不断的往返调度,由事件循环不断驱动事件执行。

3、Nodejs 之所以单线程可以处理高并发的原因,得益于 libuv 层的事件循环机制,和底层线程池实现。

4、Event loop 就是主线程从主线程的事件队列里面不停循环的读取事件,驱动了所有的异步回调函数的执行,Event loop 总共 7 个阶段,每个阶段都有一个任务队列,当所有阶段被顺序执行一次后,event loop 完成了一个 tick。

参考文章:Nodejs探秘:深入理解单线程实现高并发原理

串联同步执行并发请求

就像上面说的:Node.js 在主线程里维护了一个"事件队列"(Event queue),当用户的网络请求或者其它的异步操作到来时,Node 都会把它放到 Event Queue之中,此时并不会立即执行它,代码也不会被阻塞,继续往下走,直到主线程代码执行完毕。

所以要串联同步执行并发请求的关键在于维护一个队列,队列的特点是 先进先出,按队列里面的顺序执行就可以达到串联同步执行并发请求的目的。

方案

  • 根据每个请求的 uniqueId 变量作为唯一令牌
  • 队列里面维护一个结果数组和一个执行队列,把执行队列完成的 令牌与结果 存储在结果数组里面
  • 根据唯一令牌,一直去获取执行完成的结果,间隔 200 毫秒,超时等待时间为 10 分钟
  • 一直等待并获取结果,等待到有结果时,才返回给请求;并根据令牌把结果数组里面相应的项删除

队列

代码:

class Recorder {
    private list: any[];

    private queueList: any[];

    private intervalTimer;

    constructor() {
        this.list = [];
        this.queueList = [];
        this.intervalTimer = null;
    }

    // 根据 id 获取任务结果
    public get(id: string) {
        let data;
        console.log('this.list: ', this.list);
        let index;
        for (let i = 0; i < this.list.length; i++) {
            const item = this.list[i];
            if (id === item.id) {
                data = item.data;
                index = i;
                break;
            }
        }
        // 删除获取到结果的项
        if (index !== undefined) {
            this.list.splice(index, 1);
        }
        return data;
    }

    public clear() {
        this.list = [];
        this.queueList = [];
    }

    // 添加项
    public async addQueue(item: any) {
        this.queueList.push(item);
    }

    public async runQueue() {
        clearInterval(this.intervalTimer);
        if (!this.queueList.length) {
            // console.log('队列执行完毕');
            return;
        }
        // 取出队列里面的最后一项
        const item = this.queueList.shift();
        console.log('item: ', item);
        // 执行队列的回调
        const data = await item.callback();
        console.log('回调执行完成: ', data);
        // 把结果放进 结果数组
        this.list.push({ id: item.id, data });
    }

    public interval() {
        clearInterval(this.intervalTimer);
        this.intervalTimer = setInterval(async () => {
            clearInterval(this.intervalTimer);
            // 一直执行里面的任务
            await this.runQueue();
            this.interval();
        }, 200);
    }
}

const recorder = new Recorder();
recorder.interval();

export default recorder;

服务

下面模拟一个请求端口的的 Node 服务。

代码:

const Koa = require('koa')
const Router = require('koa-router')
const cuid = require('cuid');
const bodyParser = require('koa-bodyparser')
import recorder from "./libs/recorder";

const MAX_WAITING_TIME = 60 * 5; // 最大等待时长
// web服务端口
const SERVER_PORT: number = 3000;
const app = new Koa();
app.use(bodyParser());
const router = new Router();


/**
 * 程序睡眠
 * @param time 毫秒
 */
const timeSleep = (time: number) => {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve("");
        }, time);
    });
};

/**
 * 程序睡眠
 * @param second 秒
 */
const sleep = (second: number) => {
    return timeSleep(second * 1000);
};

router.post("/getPort", async (ctx, next) => {
	const { num } = ctx.request.body;
	const uniqueId = cuid();
	console.log('uniqueId: ', uniqueId);
    recorder.addQueue({
		id: uniqueId,
		callback: getPortFun(num)
	});
    let waitTime = 0;
	while (!ctx.body) {
		await sleep(0.2);
		console.log('1');
		const data: any = recorder.get(uniqueId);
		if (data) {
			ctx.body = {
				code: 0,
				data: data,
				msg: 'success'
			};
		}
		waitTime++;
        // 超过最大时间就返回一个结果
		if (waitTime > MAX_WAITING_TIME) {
			ctx.body = {};
		}
	}
});

// 返回一个函数
function getPortFun(num) {
	return () => {
		return new Promise((resolve) => {
            // 模拟异步程序
			setTimeout(() => {
				console.log(`num${num}: `, num);
				resolve(num * num);
			}, num * 1000);
		});
	};
}

app.use(router.routes()).use(router.allowedMethods());

app.listen(SERVER_PORT);

最后

最近状态很差劲,上班工作,多人的时候还好,但是自己一个人的时候,心情常常不能平静,心好乱,有点心慌 😥。

心情不好时,啥都不想干,心态有点扭转不过来,集中不了注意力,所以最近想专心写篇原创技术文章都不行,想重构自己开源的 blog 项目也不行,很糟糕 😭

所以最近的原创技术文章有点难产了 😥

心态急需调整,周末想出去玩,放松一下自己,找回那个斗志满满的真我才行,唉。

@biaochenxuying biaochenxuying self-assigned this Mar 21, 2021
@caihaibin1991
Copy link

单线程局限性很大,如果有一个地方死循环,整个系统就会卡死。跳出单线程的框,走多线程~
image
参考下这边封装的多线程调用, 模块使用:tiny-worker, 代码不方便贴,你看能不能推出写法。

@lizyChy0329
Copy link

如果遇到 cpu 密集型的操作,那么就将他异步处理可行?如果你说会沾满单个 cpu 运算量,那么上 Cluster 是否OK?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants