| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 |
29x
2x
2x
1x
1x
1x
1x
1x
2x
2x
2x
2x
14x
3x
3x
3x
3x
3x
14x
14x
14x
14x
1x
2x
1x
1x
| // HOF: Higher order function
import Promise from 'bluebird';
import { MultiError } from 'verror';
export const sleep = async duration => new Promise(ok => setTimeout(ok, duration));
// executes a given async function, logs any errors that are thrown, then re-throws the exception
export async function execute(fn, ...args) {
try {
return await fn.apply(this, args);
} catch (err) {
console.error(err.stack || err);
for (let key in err) {
Eif (key !== 'stack') {
console.error(`${key} = ${err[key]}`);
}
}
throw err;
}
}
// [HOF]
//
// throttles the number of promises a promise generating function can execute. This is useful
// when you are firing off a large number of promises without waiting for them to be complete.
//
// the decorator maintains a count of the number of executions, and if the count hits a defined
// batch size, it blocks all subsequent promises until the current batch of promises complete.
export function throttled(promiseGeneratingFn, batchSize = 100) {
let count = 0;
let waitChain = Promise.resolve();
let promiseQueue = [];
return (...args) => {
if (count % batchSize === 0) {
// replaces the current wait chain with a Promise
// that waits for the current set of Promises to complete
// first before allowing any more promises to execute
//
// awkward, because of the mutable aspect of promiseQueue
// and count, we use an immediately executing function
// block where they do not mutate.
waitChain = (
(queue, count) =>
waitChain.then(() => {
console.log(`Completed ${count}`);
return Promise.all(queue)
})
)(promiseQueue, count);
promiseQueue = [];
}
// we always return the tail.
const tail = waitChain.then(() => promiseGeneratingFn(...args));
promiseQueue.push(tail);
count += 1;
return tail;
}
}
// [HOF]
//
// decorates a promise generating function such that if it takes too long to
// complete, it will time out.
//
// WARNING: Do not mix with the throttled function above, because promises
// will take arbitrarily longer when throttled. At the very least, put the
// timed before throttling.
export const timed = (promiseFn, timeout, task = 'promise resolution') =>
() => Promise.resolve(promiseFn()).timeout(timeout, `${task} timed out`);
// [HOF]
//
// forces a promise-generating function to retry if it falis, until a predetermiend
// number of attempts, after which the error is simply thrown.
export const retry = (promiseFn, attempts = 1, task = 'promise resolution', errors = []) =>
() => Promise.resolve(promiseFn()).catch(err => {
if (attempts <= 0) {
throw new MultiError(errors);
} else {
errors.push(err);
return retry(promiseFn, attempts - 1, task, errors);
}
});
// [HOF]
//
// Initially takes a speecified retries and timeout
// the resulting higher order function will take in a promise generating function and name.
// the function is _immediately_ executed as a timed promise that will be retried before
// failing.
export const retryWithTimeout = ({ RETRIES, TIMEOUT }) => async (promiseFn, name) => {
return await retry(timed(promiseFn, TIMEOUT, name), RETRIES, name)();
};
// [HOF]
//
// for a given Promise-generating function, track each execution by the stringified
// arguments. if the function is called again with the same arguments, then instead
// of generating a new promise, an existing in-flight promise is used instead. This
// prevents unnecessary repetition of async function calls while the same function
// is still in flight.
export function reuseInFlight(promiseFn, createKey = (...args) => JSON.stringify(args)) {
const inflight = {};
return function debounced(...args) {
const key = createKey(...args);
if (!inflight.hasOwnProperty(key)) {
inflight[key] = promiseFn.apply(this, args).then(results => {
// self invalidate
delete inflight[key];
return results;
}, err => {
// still self-invalidate, then rethrow
delete inflight[key];
throw err;
});
}
return inflight[key];
};
} |