import {createOrmConfig} from '@subsquid/typeorm-config' import {assertNotNull, last, maybeLast} from '@subsquid/util-internal' import assert from 'assert' import {DataSource, EntityManager, type DataSourceOptions} from 'typeorm' import {ChangeTracker, rollbackBlock} from './hot' import {DatabaseState, FinalTxInfo, HashAndHeight, HotTxInfo} from './interfaces' import {Store} from './store' import process from 'process' export type IsolationLevel = 'SERIALIZABLE' | 'READ COMMITTED' | 'REPEATABLE READ' export type RollbackHook = (block: number) => Promise export interface TypeormDatabaseOptions { supportHotBlocks?: boolean isolationLevel?: IsolationLevel stateSchema?: string projectDir?: string customTypeOrmOptions?: DataSourceOptions rollbackHook?: RollbackHook } export class TypeormDatabase { private statusSchema: string private isolationLevel: IsolationLevel private con?: DataSource private projectDir: string private customTypeOrmOptions?: DataSourceOptions public readonly supportsHotBlocks: boolean private rollbackHook?: RollbackHook constructor(options?: TypeormDatabaseOptions) { this.statusSchema = options?.stateSchema || 'squid_processor' this.isolationLevel = options?.isolationLevel || 'SERIALIZABLE' this.supportsHotBlocks = options?.supportHotBlocks !== false this.projectDir = options?.projectDir || process.cwd() this.customTypeOrmOptions = options?.customTypeOrmOptions this.rollbackHook = options?.rollbackHook } async connect(): Promise { assert(this.con == null, 'already connected') let cfg = this.customTypeOrmOptions ? this.customTypeOrmOptions : createOrmConfig({projectDir: this.projectDir}) this.con = new DataSource(cfg) await this.con.initialize() try { return await this.con.transaction('SERIALIZABLE', em => this.initTransaction(em)) } catch(e: any) { await this.con.destroy().catch(() => {}) // ignore error this.con = undefined throw e } } async disconnect(): Promise { await this.con?.destroy().finally(() => this.con = undefined) } private async initTransaction(em: EntityManager): Promise { let schema = this.escapedSchema() await em.query( `CREATE SCHEMA IF NOT EXISTS ${schema}` ) await em.query( `CREATE TABLE IF NOT EXISTS ${schema}.status (` + `id int4 primary key, ` + `height int4 not null, ` + `hash text DEFAULT '0x', ` + `nonce int4 DEFAULT 0`+ `)` ) await em.query( // for databases created by prev version of typeorm store `ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS hash text DEFAULT '0x'` ) await em.query( // for databases created by prev version of typeorm store `ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0` ) await em.query( `CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)` ) await em.query( `CREATE TABLE IF NOT EXISTS ${schema}.hot_change_log (` + `block_height int4 not null references ${schema}.hot_block on delete cascade, ` + `index int4 not null, ` + `change jsonb not null, ` + `PRIMARY KEY (block_height, index)` + `)` ) let status: (HashAndHeight & {nonce: number})[] = await em.query( `SELECT height, hash, nonce FROM ${schema}.status WHERE id = 0` ) if (status.length == 0) { await em.query(`INSERT INTO ${schema}.status (id, height, hash) VALUES (0, -1, '0x')`) status.push({height: -1, hash: '0x', nonce: 0}) } let top: HashAndHeight[] = await em.query( `SELECT height, hash FROM ${schema}.hot_block ORDER BY height` ) return assertStateInvariants({...status[0], top}) } private async getState(em: EntityManager): Promise { let schema = this.escapedSchema() let status: (HashAndHeight & {nonce: number})[] = await em.query( `SELECT height, hash, nonce FROM ${schema}.status WHERE id = 0` ) assert(status.length == 1) let top: HashAndHeight[] = await em.query( `SELECT hash, height FROM ${schema}.hot_block ORDER BY height` ) return assertStateInvariants({...status[0], top}) } transact(info: FinalTxInfo, cb: (store: Store) => Promise): Promise { return this.submit(async em => { let state = await this.getState(em) let {prevHead: prev, nextHead: next} = info assert(state.hash === info.prevHead.hash, RACE_MSG) assert(state.height === prev.height) assert(prev.height < next.height) assert(prev.hash != next.hash) for (let i = state.top.length - 1; i >= 0; i--) { let block = state.top[i] await rollbackBlock(this.statusSchema, em, block.height, this.rollbackHook) } await this.performUpdates(cb, em) await this.updateStatus(em, state.nonce, next) }) } transactHot(info: HotTxInfo, cb: (store: Store, block: HashAndHeight) => Promise): Promise { return this.transactHot2(info, async (store, sliceBeg, sliceEnd) => { for (let i = sliceBeg; i < sliceEnd; i++) { await cb(store, info.newBlocks[i]) } }) } transactHot2(info: HotTxInfo, cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise): Promise { return this.submit(async em => { let state = await this.getState(em) let chain = [state, ...state.top] assertChainContinuity(info.baseHead, info.newBlocks) assert(info.finalizedHead.height <= (maybeLast(info.newBlocks) ?? info.baseHead).height) assert(chain.find(b => b.hash === info.baseHead.hash), RACE_MSG) if (info.newBlocks.length == 0) { assert(last(chain).hash === info.baseHead.hash, RACE_MSG) } assert(chain[0].height <= info.finalizedHead.height, RACE_MSG) let rollbackPos = info.baseHead.height + 1 - chain[0].height for (let i = chain.length - 1; i >= rollbackPos; i--) { await rollbackBlock(this.statusSchema, em, chain[i].height, this.rollbackHook) } if (info.newBlocks.length) { let finalizedEnd = info.finalizedHead.height - info.newBlocks[0].height + 1 if (finalizedEnd > 0) { await this.performUpdates(store => cb(store, 0, finalizedEnd), em) } else { finalizedEnd = 0 } for (let i = finalizedEnd; i < info.newBlocks.length; i++) { let b = info.newBlocks[i] await this.insertHotBlock(em, b) await this.performUpdates( store => cb(store, i, i + 1), em, new ChangeTracker(em, this.statusSchema, b.height) ) } } chain = chain.slice(0, rollbackPos).concat(info.newBlocks) let finalizedHeadPos = info.finalizedHead.height - chain[0].height assert(chain[finalizedHeadPos].hash === info.finalizedHead.hash) await this.deleteHotBlocks(em, info.finalizedHead.height) await this.updateStatus(em, state.nonce, info.finalizedHead) }) } private deleteHotBlocks(em: EntityManager, finalizedHeight: number): Promise { return em.query( `DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`, [finalizedHeight] ) } private insertHotBlock(em: EntityManager, block: HashAndHeight): Promise { return em.query( `INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`, [block.height, block.hash] ) } private async updateStatus(em: EntityManager, nonce: number, next: HashAndHeight): Promise { let schema = this.escapedSchema() let result: [data: any[], rowsChanged: number] = await em.query( `UPDATE ${schema}.status SET height = $1, hash = $2, nonce = nonce + 1 WHERE id = 0 AND nonce = $3`, [next.height, next.hash, nonce] ) let rowsChanged = result[1] // Will never happen if isolation level is SERIALIZABLE or REPEATABLE_READ, // but occasionally people use multiprocessor setups and READ_COMMITTED. assert.strictEqual( rowsChanged, 1, RACE_MSG ) } private async performUpdates( cb: (store: Store) => Promise, em: EntityManager, changeTracker?: ChangeTracker ): Promise { let running = true let store = new Store( () => { assert(running, `too late to perform db updates, make sure you haven't forgot to await on db query`) return em }, changeTracker ) try { await cb(store) } finally { running = false } } private async submit(tx: (em: EntityManager) => Promise): Promise { let retries = 3 while (true) { try { let con = this.con assert(con != null, 'not connected') return await con.transaction(this.isolationLevel, tx) } catch(e: any) { if (e.code == '40001' && retries) { retries -= 1 } else { throw e } } } } private escapedSchema(): string { let con = assertNotNull(this.con) return con.driver.escape(this.statusSchema) } } const RACE_MSG = 'status table was updated by foreign process, make sure no other processor is running' function assertStateInvariants(state: DatabaseState): DatabaseState { let height = state.height // Sanity check. Who knows what driver will return? assert(Number.isSafeInteger(height)) assertChainContinuity(state, state.top) return state } function assertChainContinuity(base: HashAndHeight, chain: HashAndHeight[]) { let prev = base for (let b of chain) { assert(b.height === prev.height + 1, 'blocks must form a continues chain') prev = b } }