从零搭建 Node Faas(五)数据库能力

这篇文章将梳理一下 Faas 在数据库能力上的一些实践。

一、背景

1. 设想

可以站在用户的角度设想一下,在使用数据库的时候,希望的使用体验是怎么样的。那当然是希望能够开箱即用。

一般公司级的数据库创建一般都是要走工单审批的,这个流程非常的繁琐,而且管控可能也会比较严格。因此一个 Faas app 应用对应一张表的设计是更加合理的。

2. 非关系型数据库?

那这样就带来一个问题,Faas 的一张表需要存储数据库的多张表的数据。很容易想到一个方案,比如用非关系型数据库 MongoDB 来存储,这样问题好像就迎刃而解了。

但是如果这么简单能实现的话,那就不需要写这篇文章了。实际上,我们的数据库是关系型数据库,原因是因为基建不支持使用非关系型数据库,因此需要在关系型数据库中实现这个功能。

3. 关系型数据库!

再进一步想一个替代方案,就是将 Mysql 的一个字段设计成一个 JSON 字段,这就就可以近似实现非关系型数据库的功能。但是这样会有一个问题,就是需要提供给用户一个更加友好的接口,让用户能更加方便的操作这个 JSON 字段。

这篇文章就是要解决这个问题。

二、技术设计

1. SDK 设计

设计的 SDK 如下,比如下面的代码包含了插入,查询的操作。

1
2
3
4
5
6
7
8
9
10
11
interface Model {
name: string
age: number
}

export async function main() {
const table = sdk.db.table<Model>("test")
await table.insert({ name: 'John', age: 18 })
const data = await table.select('*').where({ name: 'John' }).first()
return data
}

2. 具体实现

这里选用 Knex.js 来实现数据库的封装,想让用户简便的操作,就需要给每个方法都封装一层。

2.1 Select

比如这里用 Knex.js 的 jsonExtract 方法来包一层,实现 select 方法。参见文档:https://knexjs.org/guide/query-builder.html#jsonextract

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
select(...args: (keyof TableType<T> & string)[]): this
select(...args: string[]): this
select(...args: string[]) {
this.hasCallSelect = true
for (const arg of args) {
if (fixedColumns.includes(arg as any)) {
this.query.select(arg)
} else if (arg === '*') {
this.query.jsonExtract(JSON_COLUMN_NAME, `$`, '__extract_*_from_json')
for (const column of fixedColumns) {
this.query.select(column)
}
} else {
this.query.jsonExtract(JSON_COLUMN_NAME, `$.${arg}`, arg)
}
}
return this
}

用户就可以如下使用:

1
await table.select('name', 'age').first()

2.2 Where

这里设计了一个 where 方法,支持多种参数的传递。参见文档:https://knexjs.org/guide/query-builder.html#wherejsonpath

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
where(
data:
| Partial<{ [K in keyof TableType<T>]: TableType<T>[K] | WhereOperator }>
| { [key: string]: any },
): this
where(column: keyof TableType<T> & string, value: any): this
where(column: keyof TableType<T> & string, operator: string, value: any): this
where(...args: any[]) {
if (typeof args[0] !== 'string') {
for (const [key, value] of Object.entries(args[0])) {
if (!isWhereOperator(value)) {
this.where(key as any, value)
continue
}
this.where(key as any, value.operator, value.value)
}
} else if (args.length === 2) {
this.where(args[0] as any, '=', args[1])
} else {
if (fixedColumns.includes(args[0] as any)) {
this.query.where(args[0], args[1], args[2])
} else {
this.query.whereJsonPath(
JSON_COLUMN_NAME,
`$.${args[0]}`,
args[1],
args[2],
)
}
}
return this
}

比如用户可以如下使用:

1
2
const data2 = await table.where('name', 'John').first()
const data3 = await table.where('age', '<', 18)

2.3 Insert

1
2
3
4
5
6
7
8
9
async insert(_data: T | T[]) {
const data = Array.isArray(_data) ? _data : [_data]
const result = await this.query.insert(
data.map((x) => ({
[JSON_COLUMN_NAME]: JSON.stringify(x),
})) as any,
)
return result
}

使用方式:

1
await table.insert({ name: 'John', age: 18 })

2.4 Update

这里会用 JSON_MERGE_PATCH 来将非固定列的键值以 JSON 格式合并到一个 JSON 列中。参见文档:https://dev.mysql.com/doc/refman/8.4/en/json-modification-functions.html#function_json-merge-patch

最终使用原始 SQL 更新查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async update(data: Partial<T> | { [key: string]: any }): Promise<void> {
let parts: string[] = []
let values: any[] = []

if (!Object.keys(data).length) {
return
}

const rest: any = {}
for (const key in data) {
if (fixedColumns.includes(key as any)) {
parts.push(`?? = ?`)
values.push(key, (data as any)[key])
} else {
rest[key] = (data as any)[key]
}
}
if (Object.keys(rest).length) {
parts.push(`?? = JSON_MERGE_PATCH(??, ?)`)
values.push(JSON_COLUMN_NAME, JSON_COLUMN_NAME, JSON.stringify(rest))
}

const sql = this.query
.update({ __hack_replace_set_clause__: 0 } as any)
.toString()
await this.client.raw(
sql.replace('`__hack_replace_set_clause__` = 0', parts.join(',')),
values,
)
}

使用方式:

1
await table.where({ id: 1 }).update({ name: 'John' })

2.5 Delete

Delete 这里就没有必要再封装了,把前面查出来的数据,直接调用 delete 即可。

1
2
3
async delete(): Promise<number> {
return await this.query.delete()
}

使用方法:

1
await table.where({ id: 1 }).delete()

2.6 其他

还有很多方法都要封装,这里就不一一列举了。只要熟悉了操作 Json 格式的数据库,后面都是大同小异了。这里列举一下要封装的清单:

  • insert
  • first
  • all
  • select
  • delete
  • update
  • where / andWhere
  • orWhere
  • whereIn
  • whereNull
  • whereExists
  • whereBetween
  • whereLike
  • count
  • limit
  • offset
  • orderBy
  • sum
  • max
  • min
  • avg
  • groupBy

三、在线执行

Faas 平台希望能方便简化用户的操作,所以也提供了在线执行 SQL 的功能,来方便用户快速验证,或者快速查询一些数据,而无需编写代码。

这里主要介绍一下 Server 部分执行 SQL 的逻辑。核心就是:用 Babel 将用户输入的内容解析成 AST。

1. 原因

因为想尽可能保证用户输入的灵活性与方便性,所以没有使用多个 Form 字段限制输入。由于用户输入的不确定性,服务端在执行语句的时候需要一些限制,所以需要尽可能的避免 eval() 的使用。所以最终设计类似如下的执行方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
switch (name) {
case 'insert':
result = await result.insert(...args)
break
case 'first':
result = await result.first()
break
case 'all':
result = await result.all()
break
case 'select':
result = result.select(...args)
break
case 'delete':
result = await result.delete(...args)
break
// ...
}

会将解析出来的 token 然后依次执行对应的方法。

2. 解析语法树

1
table.select("*").where({ name: 'zhd' }).all()

比如上面的语句会被解析成如下结构:

其中函数执行的 Node 节点都为 CallExpression 的 MemberExpression 中。参数则是可以按照 Faas DB 使用文档中的 Object、Array、其他这三种来区分即可。分别关注一下 ObjectExpression 和 ArrayExpression 做特殊处理即可。

按照上面的语句遍历的顺序是 all()、where()、select() 顺序,所以存到栈中反过来取出即可。

四、总结

数据库设计就总结到这里了,也不得不说,一些轮子的诞生都是因为落实到每个场景都会有不一样的需求。这里的数据库设计也是如此,因为我们的基建不支持非关系型数据库,所以需要在关系型数据库中实现这个功能。