从零搭建 Node Faas(五)数据库能力
这篇文章将梳理一下 Faas 在数据库能力上的一些实践。
一、背景
1. 设想
可以站在用户的角度设想一下,在使用数据库的时候,希望的使用体验是怎么样的。那当然是希望能够开箱即用。
一般公司级的数据库创建一般都是要走工单审批的,这个流程非常的繁琐,而且管控可能也会比较严格。因此一个 Faas app 应用对应一张表的设计是更加合理的。
2. 非关系型数据库?
那这样就带来一个问题,Faas 的一张表需要存储数据库的多张表的数据。很容易想到一个方案,比如用非关系型数据库 MongoDB 来存储,这样问题好像就迎刃而解了。
但是如果这么简单能实现的话,那就不需要写这篇文章了。实际上,我们的数据库是关系型数据库,原因是因为基建不支持使用非关系型数据库,因此需要在关系型数据库中实现这个功能。
3. 关系型数据库!
再进一步想一个替代方案,就是将 Mysql 的一个字段设计成一个 JSON 字段,这就就可以近似实现非关系型数据库的功能。但是这样会有一个问题,就是需要提供给用户一个更加友好的接口,让用户能更加方便的操作这个 JSON 字段。
这篇文章就是要解决这个问题。
二、技术设计
1. SDK 设计
设计的 SDK 如下,比如下面的代码包含了插入,查询的操作。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 
 | interface Model {name: string
 age: number
 }
 
 export async function main() {
 const table = sdk.db.table<Model>("test")
 await table.insert({ name: 'John', age: 18 })
 const data = await table.select('*').where({ name: 'John' }).first()
 return data
 }
 
 | 
2. 具体实现
这里选用 Knex.js 来实现数据库的封装,想让用户简便的操作,就需要给每个方法都封装一层。
2.1 Select
比如这里用 Knex.js 的 jsonExtract 方法来包一层,实现 select 方法。参见文档:https://knexjs.org/guide/query-builder.html#jsonextract
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | select(...args: (keyof TableType<T> & string)[]): thisselect(...args: string[]): this
 select(...args: string[]) {
 this.hasCallSelect = true
 for (const arg of args) {
 if (fixedColumns.includes(arg as any)) {
 this.query.select(arg)
 } else if (arg === '*') {
 this.query.jsonExtract(JSON_COLUMN_NAME, `$`, '__extract_*_from_json')
 for (const column of fixedColumns) {
 this.query.select(column)
 }
 } else {
 this.query.jsonExtract(JSON_COLUMN_NAME, `$.${arg}`, arg)
 }
 }
 return this
 }
 
 | 
用户就可以如下使用:
| 1
 | await table.select('name', 'age').first()
 | 
2.2 Where
这里设计了一个 where 方法,支持多种参数的传递。参见文档:https://knexjs.org/guide/query-builder.html#wherejsonpath
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 
 | where(data:
 | Partial<{ [K in keyof TableType<T>]: TableType<T>[K] | WhereOperator }>
 | { [key: string]: any },
 ): this
 where(column: keyof TableType<T> & string, value: any): this
 where(column: keyof TableType<T> & string, operator: string, value: any): this
 where(...args: any[]) {
 if (typeof args[0] !== 'string') {
 for (const [key, value] of Object.entries(args[0])) {
 if (!isWhereOperator(value)) {
 this.where(key as any, value)
 continue
 }
 this.where(key as any, value.operator, value.value)
 }
 } else if (args.length === 2) {
 this.where(args[0] as any, '=', args[1])
 } else {
 if (fixedColumns.includes(args[0] as any)) {
 this.query.where(args[0], args[1], args[2])
 } else {
 this.query.whereJsonPath(
 JSON_COLUMN_NAME,
 `$.${args[0]}`,
 args[1],
 args[2],
 )
 }
 }
 return this
 }
 
 | 
比如用户可以如下使用:
| 12
 
 | const data2 = await table.where('name', 'John').first()const data3 = await table.where('age', '<', 18)
 
 | 
2.3 Insert
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | async insert(_data: T | T[]) {const data = Array.isArray(_data) ? _data : [_data]
 const result = await this.query.insert(
 data.map((x) => ({
 [JSON_COLUMN_NAME]: JSON.stringify(x),
 })) as any,
 )
 return result
 }
 
 | 
使用方式:
| 1
 | await table.insert({ name: 'John', age: 18 })
 | 
2.4 Update
这里会用 JSON_MERGE_PATCH 来将非固定列的键值以 JSON 格式合并到一个 JSON 列中。参见文档:https://dev.mysql.com/doc/refman/8.4/en/json-modification-functions.html#function_json-merge-patch
最终使用原始 SQL 更新查询。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 
 | async update(data: Partial<T> | { [key: string]: any }): Promise<void> {let parts: string[] = []
 let values: any[] = []
 
 if (!Object.keys(data).length) {
 return
 }
 
 const rest: any = {}
 for (const key in data) {
 if (fixedColumns.includes(key as any)) {
 parts.push(`?? = ?`)
 values.push(key, (data as any)[key])
 } else {
 rest[key] = (data as any)[key]
 }
 }
 if (Object.keys(rest).length) {
 parts.push(`?? = JSON_MERGE_PATCH(??, ?)`)
 values.push(JSON_COLUMN_NAME, JSON_COLUMN_NAME, JSON.stringify(rest))
 }
 
 const sql = this.query
 .update({ __hack_replace_set_clause__: 0 } as any)
 .toString()
 await this.client.raw(
 sql.replace('`__hack_replace_set_clause__` = 0', parts.join(',')),
 values,
 )
 }
 
 | 
使用方式:
| 1
 | await table.where({ id: 1 }).update({ name: 'John' })
 | 
2.5 Delete
Delete 这里就没有必要再封装了,把前面查出来的数据,直接调用 delete 即可。
| 12
 3
 
 | async delete(): Promise<number> {return await this.query.delete()
 }
 
 | 
使用方法:
| 1
 | await table.where({ id: 1 }).delete()
 | 
2.6 其他
还有很多方法都要封装,这里就不一一列举了。只要熟悉了操作 Json 格式的数据库,后面都是大同小异了。这里列举一下要封装的清单:
- insert
- first
- all
- select
- delete
- update
- where / andWhere
- orWhere
- whereIn
- whereNull
- whereExists
- whereBetween
- whereLike
- count
- limit
- offset
- orderBy
- sum
- max
- min
- avg
- groupBy
三、在线执行
Faas 平台希望能方便简化用户的操作,所以也提供了在线执行 SQL 的功能,来方便用户快速验证,或者快速查询一些数据,而无需编写代码。
这里主要介绍一下 Server 部分执行 SQL 的逻辑。核心就是:用 Babel 将用户输入的内容解析成 AST。
1. 原因
因为想尽可能保证用户输入的灵活性与方便性,所以没有使用多个 Form 字段限制输入。由于用户输入的不确定性,服务端在执行语句的时候需要一些限制,所以需要尽可能的避免 eval() 的使用。所以最终设计类似如下的执行方式:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | switch (name) {case 'insert':
 result = await result.insert(...args)
 break
 case 'first':
 result = await result.first()
 break
 case 'all':
 result = await result.all()
 break
 case 'select':
 result = result.select(...args)
 break
 case 'delete':
 result = await result.delete(...args)
 break
 
 }
 
 | 
会将解析出来的 token 然后依次执行对应的方法。
2. 解析语法树
| 1
 | table.select("*").where({ name: 'zhd' }).all()
 | 
比如上面的语句会被解析成如下结构:

其中函数执行的 Node 节点都为 CallExpression 的 MemberExpression 中。参数则是可以按照 Faas DB 使用文档中的 Object、Array、其他这三种来区分即可。分别关注一下 ObjectExpression 和 ArrayExpression 做特殊处理即可。
按照上面的语句遍历的顺序是 all()、where()、select() 顺序,所以存到栈中反过来取出即可。
四、总结
数据库设计就总结到这里了,也不得不说,一些轮子的诞生都是因为落实到每个场景都会有不一样的需求。这里的数据库设计也是如此,因为我们的基建不支持非关系型数据库,所以需要在关系型数据库中实现这个功能。