import test from 'ava';
import { Callback } from '../src/Types';
import {
    JoinTransformOut,
    InnerJoinTransform,
    JoinTransform,
    LeftOrRight,
    MemoryJoinStorage,
    LeftOrRightDecider
} from '../src/join';
import { Writable, ArrayReadable } from 'streamdash';

// NOTE(review): this file appears to have lost its angle-bracket generics at
// some point (type parameters were used without being declared). The ones that
// can be determined from this file alone have been restored below. Project
// types whose arity is not visible here (Callback, Writable, ArrayReadable,
// MemoryJoinStorage, LeftOrRightDecider, JoinTransform, InnerJoinTransform)
// are left exactly as found - confirm against their declarations.

/** Minimal record stored in the MemoryJoinStorage test. */
interface Thing {
    name: string;
    type: string;
}

/**
 * Writable sink that remembers every chunk written to it, so a test can
 * compare the full output of a pipeline once 'finish' fires.
 */
class Outer<T> extends Writable {

    private out: T[] = [];

    /** Records the chunk and immediately signals completion. */
    _write(thing: T, encoding, cb) {
        this.out.push(thing);
        cb();
    }

    /** Returns the chunks received so far, in arrival order. */
    get(): T[] {
        return this.out;
    }

}

/**
 * Asynchronous, strictly sequential reduce.
 *
 * @param f    Step function; receives the accumulator, one element and a
 *             callback through which it reports an error or the new accumulator.
 * @param acc  Initial accumulator.
 * @param xs   Elements to fold over; a snapshot is taken, `xs` is not mutated.
 * @param next Called once: with the first error reported by `f`, or with
 *             `null` and the final accumulator after all elements are consumed.
 */
function aReduce<R, X>(f: (acc: R, xs: X, cb: Callback) => void, acc: R, xs: X[], next: Callback): void {
    // Snapshot so later changes to the caller's array cannot affect the fold.
    const pending: X[] = xs.slice();
    let position = 0;

    const step = (): void => {
        if (position >= pending.length) {
            next(null, acc);
            return;
        }
        const current = pending[position++] as X;
        f(acc, current, (err, newAcc) => {
            if (err) {
                next(err);
                return;
            }
            acc = newAcc;
            step();
        });
    };

    step();
}

test.cb('Can find things in storage', function(tst) {

    // Adds one item to the storage and hands the same storage on as the
    // accumulator. The error from add() is forwarded rather than replaced by
    // null, so a failing store aborts the reduce instead of passing silently.
    const worker = (acc: MemoryJoinStorage, x: Thing, next) => {
        acc.add(x, (e) => {
            next(e, acc);
        });
    };

    const items = [
        { name: "A", type: "Letter" },
        { name: "B", type: "Letter" },
        { name: "C", type: "Letter" }
    ];

    const initial = new MemoryJoinStorage();

    aReduce(worker, initial, items, (err, storage: MemoryJoinStorage) => {
        if (err) {
            // A truthy argument to end() fails a callback-style AVA test.
            tst.end(err);
            return;
        }
        storage.find({name: "B"}, (findErr, vs) => {
            if (findErr) {
                tst.end(findErr);
                return;
            }
            tst.deepEqual(vs, [{ name: "B", type: "Letter" }]);
            tst.end();
        });
    });

});

/** Value of `type` that marks a record as a customer. */
const CUSTOMER = "customer";

/** An order line; `customerId` refers to `Customer.id`. */
interface Item {
    customerId: string;
    type: string;
    item: string;
}

interface Customer {
    id: string;
    type: string;
    name: string;
}

/**
 * Builds the fixture shared by the join tests: an ArrayReadable of interleaved
 * orders and customers piped into a JoinTransform. Customers are classified as
 * the left side, everything else as the right side.
 *
 * @returns The JoinTransform, already receiving data from the source.
 */
function getJoin() {

    const input: (Customer|Item)[] = [
        { customerId: "A", type: "order", item: "Shoe" },
        { type: "customer", id: "A", name: "Jack" },
        { customerId: "B", type: "order", item: "Ruler" },
        { type: "customer", id: "B", name: "Jane" },
        { customerId: "A", type: "order", item: "Car" },
    ];

    const src: ArrayReadable = new ArrayReadable(input);

    const isLeft: LeftOrRightDecider = (t) => {
        return t.type === CUSTOMER ? LeftOrRight.Left : LeftOrRight.Right;
    };

    // Builds the pair of queries for a record: both sides are looked up by the
    // customer key, which lives in `id` on customers and `customerId` on items.
    const qb = (t: Customer|Item) => {
        const key = t.type === CUSTOMER ? (t as Customer).id : (t as Item).customerId;
        return { lQry: { id: key }, rQry: { customerId: key } };
    };

    const jt = new JoinTransform(isLeft, qb, qb, {objectMode: true});

    src.pipe(jt);

    return jt;
}

test.cb('Can join', function(tst) {

    const jt = getJoin();

    // NOTE(review): the type argument was reconstructed from a stray `>` left
    // in the file and the {l, r} shape of the expected output - confirm the
    // arity of JoinTransformOut.
    const dst = new Outer<JoinTransformOut<Customer, Item>>({objectMode: true});

    // One entry per input record, in input order: `l` holds the customers and
    // `r` the orders expected for that record's customer key at that point.
    const expected = [
        {
            l: [],
            r: [{ customerId: "A", type: "order", item: "Shoe" }]
        },
        {
            l: [{ type: "customer", id: "A", name: "Jack" }],
            r: [{ customerId: "A", type: "order", item: "Shoe" }]
        },
        {
            l: [],
            r: [{ customerId: "B", type: "order", item: "Ruler" }]
        },
        {
            l: [{ type: "customer", id: "B", name: "Jane" }],
            r: [{ customerId: "B", type: "order", item: "Ruler" }]
        },
        {
            l: [{ type: "customer", id: "A", name: "Jack" }],
            r: [
                { customerId: "A", type: "order", item: "Shoe" },
                { customerId: "A", type: "order", item: "Car" }
            ]
        }
    ];

    jt.pipe(dst);

    dst.on('finish', () => {
        tst.deepEqual(dst.get(), expected);
        tst.end();
    });

});

/** Flattened customer/item pair produced by the inner-join test. */
interface Order {
    customerId: string;
    name: string;
    item: string;
}

test.cb('Can get as inner join', function(tst) {

    const jt = getJoin();

    const expected = [
        { customerId: "A", name: "Jack", item: "Shoe" },
        { customerId: "B", name: "Jane", item: "Ruler" },
        { customerId: "A", name: "Jack", item: "Car" }
    ];

    const makeOrder = new InnerJoinTransform(
        // Combines one matching customer and item into an Order.
        (l: Customer, r: Item): Order => {
            return { customerId: l.id, name: l.name, item: r.item };
        },
        // Derives a string key per Order; presumably used to emit each
        // customer/item pair only once - verify against InnerJoinTransform.
        (o: Order) => `${o.customerId}:${o.item}`,
        {objectMode: true}
    );

    const dst = new Outer<Order>({objectMode: true});

    jt.pipe(makeOrder).pipe(dst);

    dst.on('finish', () => {
        tst.deepEqual(dst.get(), expected);
        tst.end();
    });

});