Kotlin 随手记(一): InfluxDB Client 查询封装思路

InfluxDB

1. 关于 InfluxDB-JAVA

InfluxDB 是 InfluxData 公司的一款开源时间序列数据库,多用于储存日志、持续事件、监控数据之类时间相关的序列化数据。并提供了多种语言的接入方式,并且官方提供了 Java client: inflxudb-java,Kotlin 项目可以直接引入依赖并且使用。


2. 接入方式

InfluxDB 使用 RESTful 接口来查询与写入,官方 Client 便在 OKhttp 类库上使用了一层非常简单的封装结构来和数据库交互。

在项目中接入只需在模块 pom 引入依赖即可:

<dependency>  
  <groupId>org.influxdb</groupId>
  <artifactId>influxdb-java</artifactId>
  <version>2.5</version>
</dependency>  

接下来便可以初始化连接并操作数据库了:

InfluxDB influxDB = InfluxDBFactory.connect("http://172.17.0.2:8086", "root", "root");

Query query = new Query("SELECT idle FROM cpu", dbName);  
influxDB.query(query);  

在使用过程中,渐渐发现有几个显而易见的问题不能满足一个生产项目的需求:

  1. 在代码中直接用 String 构建一个查询,对复杂条件查询,即动态查询语句的支持将会非常痛苦,并且安全性、防注入不佳。

  2. 返回数据结构不能直接使用,需要每个调用方法自己处理。

  3. 由于内部项目要求多个数据中心,原生的 Client 不能满足连接多库的需求。


3. 封装思路

3.1 查询帮助类

借鉴了 Elixir 的 Ecto 数据库思路后,决定使用一个帮助类来生成查询语句:

val query = BaseQuery.select("field-not-safe!", "tag")  
    .from("table-name-not-s@fe!")
    .where(eq("name",dirtyValue))
    .group("tag")
    .group(time(12, UNIT.min))
    .order("time", DESC)
    .limit(10)

selectfromeq 等方法中,允许直接接受一个 Dirty value,并透明地 Escape,在生成 sql 时,把它们都用引号包裹起来。

同时在设计时便考虑到让 where(condition: String?) 方法变成 Null-Safe ,以支持复杂查询条件,在有多个查询条件时可以放心大胆地传入 condition,如果某个条件为空, where() 方法将自动忽略它 —— 这边意味着你可以如下例构建一个动态查询:

val name: String? = "James"  
val age: Int? = null  
val extra: Map<String, Any?> = mapOf("no" to 123)

// null 或者空字符串会被忽略
val query = BaseQuery.select().from("employers")  
    .where(eq("name", name))
    .or(eq("age", age))
    .and(eq(extra))

// 结果:select * from "employers" where "name"='james' and "no"=123
query.getSql()  

除了 from(measurement: String!),其他方法也被设计成 Null-Safe,这给开发过程省去了大量重复的控制逻辑,并且让动态构建过程更加流畅。

BaseQuery.Functions 更是提供了诸如 first()count()mean()fill()eq()lt()lte()gt()gte() 等等 InfluxDB 的内建方法帮助生成器。

3.2 Client 拦截器思路

拦截器的思路大同小异,都是为了链式处理、自定义配置。

基于保持简单的原则,实际应用中,我们一般只需要在查询前拦截查询语句,查询后拦截查询结果,一个帮助类即可解决:

data class InfluxDbInterceptorChain(  
        val queryInterceptors: List<IXQueryInterceptor> = emptyList(),
        val respInterceptors: List<IXRespInterceptor> = emptyList()) {
    fun interceptQuery(baseQuery: List<BaseQuery>, db: String): Query
    fun interceptResult(results: MutableList<Result>): List<Result>
}

/**
 * 查询前拦截 sql
 */
open class IXQueryInterceptor {  
    open fun handle(queries: List<BaseQuery>, db: String): Boolean = true

    open fun handleEach(baseQuery : BaseQuery, db: String) :Boolean = true
}

/**
 * 查询后拦截 result
 */
open class IXRespInterceptor {  
    open fun handle(results: List<Result>): Boolean = true
}

在每次查询数据库前 client 都会执行 InterceptorChain 预处理 SQL,此时可以检查注入、加入默认查询条件、鉴权等等等等。

比如注册一个为每个查询都加入一个 appKey 的拦截器:

@Configuration
class InfluxDBClientConfiguration {  
        @Bean
        open fun influxdbInterceptorChain(): InfluxDbInterceptorChain {
            return InfluxDbInterceptorChain(
                    queryInterceptors(),
                    emptyList())
        }

        private fun queryInterceptors(): List<IXQueryInterceptor> {
            val appKeyInterceptor = object : IXQueryInterceptor() {
                override fun handleEach(baseQuery: BaseQuery, db: String): Boolean {
                    baseQuery.and("appKey='${appKey}'")
                    return true
                }
            }
            return listOf(terminusKeyInterceptor)
        }
}

同样,你也可以注册若干查询结果拦截器,用来格式化数据结构,过滤敏感数据等等,这里不再展开。

3.3 Client 多库多中心思路

因为 InfluxDB 数据库本身为基于 HTTP 的 RESTful 数据库,创建一个新的 client 并不会打开一个 TCP 连接,只有创建对象的开销,因此只需要让客户端支持切换数据中心、数据库即可,后续再加如对象缓存进一步降低创建对象的开销。

因此切换时只需:

val hangZhouClient = client.connect(Conn(host = xxx), Config(db = "metrics"))  
val beiJingClient = client.connect(Conn(host = yyy), Config(db = "metrics"))

val appMetricsClient = client.use(db = "app")  
val webMetricsClient = client.use(db = "web")  

To be continue...

张成栋

Read more posts by this author.

中国浙江省杭州市

Subscribe to The Terminus Blog

Get the latest posts delivered right to your inbox.

or subscribe via RSS with Feedly!