{"version":3,"sources":["../src/events/pubsub.ts","../src/events/event-emitter/batch-policy.ts","../src/events/event-emitter/ack-handle-buffer.ts","../src/events/event-emitter/index.ts"],"names":["EventEmitter"],"mappings":";;;;;;;;;AAkBO,IAAe,SAAf,MAAsB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EA6B3B,IAAI,cAAA,GAAoD;AACtD,IAAA,OAAO,CAAC,MAAM,CAAA;AAAA,EAChB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAUA,IAAI,sBAAA,GAAkC;AACpC,IAAA,OAAO,KAAA;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWA,UAAA,CAAW,QAAgB,OAAA,EAAoC;AAC7D,IAAA,OAAO,OAAA,CAAQ,OAAA,CAAQ,EAAE,CAAA;AAAA,EAC3B;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAWA,mBAAA,CAAoB,OAAe,EAAA,EAAkC;AACnE,IAAA,OAAO,IAAA,CAAK,SAAA,CAAU,KAAA,EAAO,EAAE,CAAA;AAAA,EACjC;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAYA,mBAAA,CAAoB,KAAA,EAAe,OAAA,EAAiB,EAAA,EAAkC;AACpF,IAAA,OAAO,IAAA,CAAK,mBAAA,CAAoB,KAAA,EAAO,EAAE,CAAA;AAAA,EAC3C;AACF;;;AChFA,IAAM,WAAA,GAA+B;AAAA,EACnC,GAAA,EAAK,MAAM,IAAA,CAAK,GAAA,EAAI;AAAA,EACpB,YAAY,CAAC,EAAA,EAAI,EAAA,KAAO,UAAA,CAAW,IAAI,EAAE,CAAA;AAAA,EACzC,YAAA,EAAc,CAAA,MAAA,KAAU,YAAA,CAAa,MAAuD;AAC9F,CAAA;AAIO,IAAM,uBAAA,GAA0B,GAAA;AAChC,IAAM,gBAAA,GAAmE,yBAAA;AASzE,IAAM,cAAN,MAAkB;AAAA,EACN,IAAA;AAAA,EACA,IAAA;AAAA,EACA,aAAA;AAAA,EACA,QAAA;AAAA,EAET,aAAA,GAA+B,IAAA;AAAA,EAC/B,eAAA,GAA0B,CAAA,QAAA;AAAA,EAC1B,IAAA,GAAO,CAAA;AAAA,EACP,KAAA,GAAuC,IAAA;AAAA,EACvC,YAAA,GAAoD,IAAA;AAAA,EAE5D,WAAA,CAAY,IAAA,EAA6B,IAAA,GAAwB,WAAA,EAAa;AAC5E,IAAA,IAAA,CAAK,IAAA,GAAO,IAAA;AACZ,IAAA,IAAA,CAAK,IAAA,GAAO,IAAA;AACZ,IAAA,IAAA,CAAK,aAAA,GAAgB,KAAK,aAAA,IAAiB,uBAAA;AAC3C,IAAA,IAAA,CAAK,QAAA,GAAW,KAAK,QAAA,IAAY,gBAAA;AAAA,EACnC;AAAA;AAAA,EAGA,iBAAiB,EAAA,EAAsC;AACrD,IAAA,IAAA,CAAK,YAAA,GAAe,EAAA;AAAA,EACtB;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,UAAU,KAAA,EAA+B;AACvC,IAAA,IAAA,CAAK,IAAA,IAAQ,CAAA;AACb,IAAA,IAAI,IAAA,CAAK,kBAAkB,IAAA,EAAM;AAC/B,MAAA,IAAA,CAAK,aAAA,GAAgB,IAAA,CAAK,IAAA,CAAK,GAAA,EAAI;AAAA,IACrC;AAEA,IAAA,MAAM,GAAA,GAAM,IAAA,CAAK,IAAA,CAAK,GAAA,EAAI;AAC1B,IAAA,MAAM,aAAA,GAAgB,IAAA,CAAK,eAAA,IAAmB,IAAA,CAAK,KAAK,aAAA,IAAiB,CAAA,CAAA;AAGzE,IAAA,IAAI,IAAA,CAAK,IAAA,CAAK,WAAA,GAAc,KAAK,CAAA,EAAG;AAClC,MAAA,IAAI,OAAO,aAAA,EAAe;AACxB,QAAA,OAAO,WAAA;AAAA,MACT;AAEA,MAAA,IAAA,CAAK,WAAW,aAAa,CAAA;AAC7B,MAAA,OAAO,MAAA;AAAA,IACT;AAGA,IAAA,IAAI,IAAA,CAAK,IAAA,IAAQ,IAAA,CAAK,aAAA,EAAe;AACnC,MAAA,OAAO,WAAA;AAAA,IACT;AAGA,IAAA,IAAI,IAAA,CAAK,KAAK,OAAA,KAAY,MAAA,IAAa,KAAK,IAAA,IAAQ,IAAA,CAAK,KAAK,OAAA,EAAS;AACrE,MAAA,IAAI,OAAO,aAAA,EAAe;AACxB,QAAA,OAAO,WAAA;AAAA,MACT;AACA,MAAA,IAAA,CAAK,WAAW,aAAa,CAAA;AAC7B,MAAA,OAAO,MAAA;AAAA,IACT;AAGA,IAAA,IAAA,CAAK,gBAAA,EAAiB;AACtB,IAAA,OAAO,MAAA;AAAA,EACT;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,UAAU,cAAA,EAA8B;AACtC,IAAA,IAAA,CAAK,eAAA,GAAkB,IAAA,CAAK,IAAA,CAAK,GAAA,EAAI;AACrC,IAAA,IAAA,CAAK,OAAO,IAAA,CAAK,GAAA,CAAI,CAAA,EAAG,IAAA,CAAK,OAAO,cAAc,CAAA;AAClD,IAAA,IAAA,CAAK,aAAA,GAAgB,IAAA;AACrB,IAAA,IAAA,CAAK,WAAA,EAAY;AAAA,EACnB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOA,aAAa,MAAA,EAA2D;AACtE,IAAA,IAAI,OAAA,GAAU,MAAA;AAGd,IAAA,IAAI,IAAA,CAAK,KAAK,QAAA,EAAU;AACtB,MAAA,MAAM,SAAA,GAAY,IAAA,CAAK,IAAA,CAAK,QAAA,CAAS,OAAO,CAAA;AAQ5C,MAAA,MAAM,SAAA,GAAY,IAAI,GAAA,CAAW,OAAO,CAAA;AACxC,MAAA,MAAM,aAAa,SAAA,CAAU,KAAA,CAAM,OAAK,SAAA,CAAU,GAAA,CAAI,CAAC,CAAC,CAAA;AACxD,MAAA,OAAA,GAAU,UAAA,GAAa,YAAY,EAAC;AAAA,IACtC;AAEA,IAAA,MAAM,QAAA,GAAW,IAAI,GAAA,CAAW,OAAO,CAAA;AACvC,IAAA,MAAM,cAAA,GAAiB,MAAe,MAAA,CAAO,MAAA,CAAO,OAAK,CAAC,QAAA,CAAS,GAAA,CAAI,CAAC,CAAC,CAAA;AAGzE,IAAA,IAAI,OAAA,CAAQ,MAAA,IAAU,IAAA,CAAK,aAAA,EAAe;AACxC,MAAA,OAAO,EAAE,SAAA,EAAW,OAAA,EAAS,OAAA,EAAS,gBAAe,EAAE;AAAA,IACzD;AAEA,IAAA,MAAM,MAAA,GAAS,OAAA,CAAQ,MAAA,GAAS,IAAA,CAAK,aAAA;AACrC,IAAA,MAAM,WAAA,GAAc,KAAK,IAAA,CAAK,WAAA;AAE9B,IAAA,IAAI,IAAA;AACJ,IAAA,IAAI,mBAAA;AAEJ,IAAA,QAAQ,KAAK,QAAA;AAAU,MACrB,KAAK,aAAA,EAAe;AAClB,QAAA,MAAM,eAAe,IAAA,CAAK,mBAAA;AAAA,UAAoB,OAAA;AAAA,UAAS,MAAA;AAAA,UAAQ,WAAA;AAAA;AAAA,UAA2B;AAAA,SAAI;AAC9F,QAAA,IAAA,GAAO,YAAA,CAAa,IAAA;AACpB,QAAA,mBAAA,GAAsB,YAAA,CAAa,OAAA;AACnC,QAAA;AAAA,MACF;AAAA,MACA,KAAK,aAAA;AAAA,MACL,KAAK,yBAAA;AAAA,MACL,SAAS;AACP,QAAA,MAAM,iBAAiB,IAAA,CAAK,mBAAA;AAAA,UAAoB,OAAA;AAAA,UAAS,MAAA;AAAA,UAAQ,WAAA;AAAA;AAAA,UAA2B;AAAA,SAAK;AACjG,QAAA,IAAA,GAAO,cAAA,CAAe,IAAA;AACtB,QAAA,mBAAA,GAAsB,cAAA,CAAe,OAAA;AACrC,QAAA;AAAA,MACF;AAAA;AAGF,IAAA,OAAO,EAAE,SAAA,EAAW,IAAA,EAAM,OAAA,EAAS,CAAC,GAAG,cAAA,EAAe,EAAG,GAAG,mBAAmB,CAAA,EAAE;AAAA,EACnF;AAAA;AAAA,EAGA,OAAA,GAAgB;AACd,IAAA,IAAA,CAAK,WAAA,EAAY;AACjB,IAAA,IAAA,CAAK,YAAA,GAAe,IAAA;AACpB,IAAA,IAAA,CAAK,aAAA,GAAgB,IAAA;AACrB,IAAA,IAAA,CAAK,IAAA,GAAO,CAAA;AAAA,EACd;AAAA,EAEQ,gBAAA,GAAyB;AAC/B,IAAA,IAAI,KAAK,IAAA,CAAK,SAAA,KAAc,UAAa,IAAA,CAAK,IAAA,CAAK,kBAAkB,MAAA,EAAW;AAE9E,MAAA;AAAA,IACF;AAEA,IAAA,MAAM,aAAA,GAAgB,IAAA,CAAK,aAAA,IAAiB,IAAA,CAAK,KAAK,GAAA,EAAI;AAC1D,IAAA,MAAM,QAAA,GAAW,KAAK,IAAA,CAAK,SAAA,KAAc,SAAY,aAAA,GAAgB,IAAA,CAAK,IAAA,CAAK,SAAA,GAAY,MAAA,CAAO,iBAAA;AAClG,IAAA,MAAM,KAAA,GAAQ,IAAA,CAAK,eAAA,IAAmB,IAAA,CAAK,KAAK,aAAA,IAAiB,CAAA,CAAA;AACjE,IAAA,MAAM,EAAA,GAAK,IAAA,CAAK,GAAA,CAAI,QAAA,EAAU,KAAK,CAAA;AACnC,IAAA,IAAA,CAAK,WAAW,EAAE,CAAA;AAAA,EACpB;AAAA,EAEQ,WAAW,EAAA,EAAkB;AACnC,IAAA,IAAI,CAAC,QAAA,CAAS,EAAE,CAAA,EAAG;AACjB,MAAA;AAAA,IACF;AACA,IAAA,IAAA,CAAK,WAAA,EAAY;AACjB,IAAA,MAAM,KAAA,GAAQ,KAAK,GAAA,CAAI,CAAA,EAAG,KAAK,IAAA,CAAK,IAAA,CAAK,KAAK,CAAA;AAC9C,IAAA,IAAA,CAAK,KAAA,GAAQ,IAAA,CAAK,IAAA,CAAK,UAAA,CAAW,MAAM;AACtC,MAAA,IAAA,CAAK,KAAA,GAAQ,IAAA;AACb,MAAA,MAAM,UAAU,IAAA,CAAK,YAAA;AACrB,MAAA,IAAI,OAAA,EAAS;AACX,QAAA,KAAK,OAAA,EAAQ;AAAA,MACf;AAAA,IACF,GAAG,KAAK,CAAA;AAAA,EACV;AAAA,EAEQ,WAAA,GAAoB;AAC1B,IAAA,IAAI,IAAA,CAAK,UAAU,IAAA,EAAM;AACvB,MAAA,IAAA,CAAK,IAAA,CAAK,YAAA,CAAa,IAAA,CAAK,KAAK,CAAA;AACjC,MAAA,IAAA,CAAK,KAAA,GAAQ,IAAA;AAAA,IACf;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAOQ,mBAAA,CACN,KAAA,EACA,KAAA,EACA,WAAA,EACA,OAAA,EACqC;AACrC,IAAA,MAAM,UAAmB,EAAC;AAC1B,IAAA,MAAM,MAAA,GAAS,CAAC,GAAG,KAAK,CAAA;AACxB,IAAA,IAAI,SAAA,GAAY,KAAA;AAEhB,IAAA,MAAM,KAAA,GAAQ,OAAA,GAAU,CAAC,GAAG,OAAO,IAAA,EAAM,CAAA,CAAE,OAAA,EAAQ,GAAI,CAAC,GAAG,MAAA,CAAO,MAAM,CAAA;AAExE,IAAA,KAAA,MAAW,OAAO,KAAA,EAAO;AACvB,MAAA,IAAI,cAAc,CAAA,EAAG;AACrB,MAAA,MAAM,EAAA,GAAK,OAAO,GAAG,CAAA;AACrB,MAAA,IAAI,WAAA,GAAc,EAAE,CAAA,EAAG;AACvB,MAAA,OAAA,CAAQ,KAAK,EAAE,CAAA;AACf,MAAA,MAAA,CAAO,GAAG,CAAA,GAAI,MAAA;AACd,MAAA,SAAA,IAAa,CAAA;AAAA,IACf;AAEA,IAAA,MAAM,OAAO,MAAA,CAAO,MAAA,CAAO,CAAC,CAAA,KAAkB,MAAM,MAAS,CAAA;AAC7D,IAAA,OAAO,EAAE,MAAM,OAAA,EAAQ;AAAA,EACzB;AACF,CAAA;;;ACjOO,IAAM,kBAAN,MAAsB;AAAA,EAO3B,WAAA,CACmB,EAAA,EACjB,IAAA,EACA,IAAA,EACiB,OAAA,EACjB;AAJiB,IAAA,IAAA,CAAA,EAAA,GAAA,EAAA;AAGA,IAAA,IAAA,CAAA,OAAA,GAAA,OAAA;AAEjB,IAAA,IAAA,CAAK,MAAA,GAAS,IAAI,WAAA,CAAY,IAAA,EAAM,IAAI,CAAA;AAQxC,IAAA,IAAA,CAAK,MAAA,CAAO,gBAAA;AAAA,MAAiB,MAC3B,IAAA,CAAK,KAAA,EAAM,CAAE,MAAM,CAAA,GAAA,KAAO;AACxB,QAAA,IAAA,CAAK,OAAA,GAAU,GAAA,EAAK,EAAE,KAAA,EAAO,MAAM,CAAA;AAAA,MACrC,CAAC;AAAA,KACH;AAAA,EACF;AAAA,EAlBmB,EAAA;AAAA,EAGA,OAAA;AAAA,EAVF,MAAA;AAAA,EACT,QAAiB,EAAC;AAAA,EAClB,QAAA,GAAW,KAAA;AAAA,EACX,OAAA,GAAU,KAAA;AAAA,EACV,QAAA,GAAW,KAAA;AAAA;AAAA;AAAA;AAAA,EA0BnB,MAAM,IAAA,CAAK,KAAA,EAAc,GAAA,EAA2B,IAAA,EAA2C;AAC7F,IAAA,IAAI,KAAK,QAAA,EAAU;AACnB,IAAA,IAAA,CAAK,MAAM,IAAA,CAAK,EAAE,KAAA,EAAO,GAAA,EAAK,MAAM,CAAA;AACpC,IAAA,MAAM,QAAA,GAAW,IAAA,CAAK,MAAA,CAAO,SAAA,CAAU,KAAK,CAAA;AAC5C,IAAA,IAAI,aAAa,WAAA,EAAa;AAC5B,MAAA,MAAM,KAAK,KAAA,EAAM;AAAA,IACnB;AAAA,EACF;AAAA;AAAA;AAAA;AAAA;AAAA,EAMA,MAAM,KAAA,GAAuB;AAK3B,IAAA,IAAI,KAAK,QAAA,EAAU;AACjB,MAAA,IAAA,CAAK,OAAA,GAAU,IAAA;AACf,MAAA;AAAA,IACF;AAIA,IAAA,IAAI,IAAA,CAAK,KAAA,CAAM,MAAA,KAAW,CAAA,EAAG;AAE7B,IAAA,IAAA,CAAK,QAAA,GAAW,IAAA;AAChB,IAAA,IAAI;AACF,MAAA,GAAG;AACD,QAAA,IAAA,CAAK,OAAA,GAAU,KAAA;AACf,QAAA,IAAI,IAAA,CAAK,KAAA,CAAM,MAAA,KAAW,CAAA,EAAG;AAE7B,QAAA,MAAM,WAAW,IAAA,CAAK,KAAA;AACtB,QAAA,IAAA,CAAK,QAAQ,EAAC;AAEd,QAAA,MAAM,MAAA,GAAS,QAAA,CAAS,GAAA,CAAI,CAAA,CAAA,KAAK,EAAE,KAAK,CAAA;AAGxC,QAAA,MAAM,OAAA,uBAAc,GAAA,EAAkB;AACtC,QAAA,KAAA,MAAW,KAAK,QAAA,EAAU,OAAA,CAAQ,GAAA,CAAI,CAAA,CAAE,OAAO,CAAC,CAAA;AAEhD,QAAA,MAAM,EAAE,SAAA,EAAW,OAAA,KAAY,IAAA,CAAK,MAAA,CAAO,aAAa,MAAM,CAAA;AAI9D,QAAA,KAAA,MAAW,MAAM,OAAA,EAAS;AACxB,UAAA,MAAM,KAAA,GAAQ,OAAA,CAAQ,GAAA,CAAI,EAAE,CAAA;AAC5B,UAAA,IAAI,OAAO,GAAA,EAAK;AACd,YAAA,IAAI;AACF,cAAA,MAAM,MAAM,GAAA,EAAI;AAAA,YAClB,SAAS,GAAA,EAAK;AACZ,cAAA,IAAA,CAAK,OAAA,GAAU,GAAA,EAAK,EAAE,KAAA,EAAO,eAAe,CAAA;AAAA,YAC9C;AAAA,UACF;AAAA,QACF;AAEA,QAAA,KAAA,MAAW,MAAM,SAAA,EAAW;AAI1B,UAAA,IAAI,KAAK,QAAA,EAAU;AACnB,UAAA,MAAM,KAAA,GAAQ,OAAA,CAAQ,GAAA,CAAI,EAAE,CAAA;AAC5B,UAAA,IAAI;AAIF,YAAA,MAAO,KAAK,EAAA,CAAG,EAAA,EAAI,KAAA,EAAO,GAAA,EAAK,OAAO,IAAI,CAAA;AAAA,UAC5C,SAAS,GAAA,EAAK;AACZ,YAAA,IAAA,CAAK,OAAA,GAAU,GAAA,EAAK,EAAE,KAAA,EAAO,MAAM,CAAA;AAAA,UACrC;AAAA,QACF;AAKA,QAAA,IAAA,CAAK,MAAA,CAAO,SAAA,CAAU,SAAA,CAAU,MAAA,GAAS,QAAQ,MAAM,CAAA;AAAA,MACzD,CAAA,QAAS,IAAA,CAAK,OAAA,IAAW,CAAC,IAAA,CAAK,QAAA;AAAA,IACjC,CAAA,SAAE;AACA,MAAA,IAAA,CAAK,QAAA,GAAW,KAAA;AAAA,IAClB;AAAA,EACF;AAAA,EAEA,OAAA,GAAgB;AACd,IAAA,IAAA,CAAK,QAAA,GAAW,IAAA;AAChB,IAAA,IAAA,CAAK,QAAQ,EAAC;AACd,IAAA,IAAA,CAAK,OAAO,OAAA,EAAQ;AAAA,EACtB;AACF,CAAA;;;AC1HA,IAAM,WAAW,YAA2B;AAAC,CAAA;AAEtC,IAAM,kBAAA,GAAN,cAAiC,MAAA,CAAO;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAO7C,IAAa,cAAA,GAAoD;AAC/D,IAAA,OAAO,CAAC,QAAQ,MAAM,CAAA;AAAA,EACxB;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,EAQA,IAAa,sBAAA,GAAkC;AAC7C,IAAA,OAAO,IAAA;AAAA,EACT;AAAA,EAEQ,OAAA;AAAA;AAAA,EAGA,MAAA,uBAAwD,GAAA,EAAI;AAAA;AAAA,EAE5D,aAAA,uBAAyC,GAAA,EAAI;AAAA;AAAA,EAE7C,cAAA,uBAA0D,GAAA,EAAI;AAAA;AAAA,EAG9D,YAAA,uBAAuD,GAAA,EAAI;AAAA;AAAA,EAG3D,gBAAA,uBAA4C,GAAA,EAAI;AAAA;AAAA;AAAA;AAAA,EAKhD,cAAA,uBAA8E,GAAA,EAAI;AAAA;AAAA;AAAA;AAAA,EAKlF,YAAA,uBAAqE,GAAA,EAAI;AAAA,EAEhE,MAAA;AAAA,EAEjB,WAAA,CAAY,eAAA,EAAgC,OAAA,GAAqC,EAAC,EAAG;AACnF,IAAA,KAAA,EAAM;AACN,IAAA,IAAA,CAAK,OAAA,GAAU,eAAA,IAAmB,IAAIA,6BAAA,EAAa;AACnD,IAAA,IAAA,CAAK,SAAS,OAAA,CAAQ,MAAA;AAAA,EACxB;AAAA;AAAA;AAAA;AAAA;AAAA,EAMQ,cAAA,CAAe,KAAA,EAAe,GAAA,EAAc,GAAA,EAA4C;AAC9F,IAAA,MAAM,OAAA,GAAU,CAAA,6BAAA,EAAgC,GAAA,CAAI,KAAK,eAAe,KAAK,CAAA,CAAA;AAC7E,IAAA,IAAI,KAAK,MAAA,EAAQ;AACf,MAAA,IAAA,CAAK,MAAA,CAAO,KAAA,CAAM,OAAA,EAAS,GAAG,CAAA;AAAA,IAChC,CAAA,MAAO;AACL,MAAA,OAAA,CAAQ,KAAA,CAAM,SAAS,GAAG,CAAA;AAAA,IAC5B;AAAA,EACF;AAAA,EAEA,MAAM,OAAA,CACJ,KAAA,EACA,KAAA,EACA,QAAA,EACe;AACf,IAAA,MAAM,EAAA,GAAK,OAAO,UAAA,EAAW;AAC7B,IAAA,MAAM,SAAA,uBAAgB,IAAA,EAAK;AAC3B,IAAA,IAAA,CAAK,OAAA,CAAQ,KAAK,KAAA,EAAO;AAAA,MACvB,GAAG,KAAA;AAAA,MACH,EAAA;AAAA,MACA,SAAA;AAAA,MACA,eAAA,EAAiB;AAAA,KAClB,CAAA;AAAA,EACH;AAAA,EAEA,MAAM,SAAA,CAAU,KAAA,EAAe,EAAA,EAAmB,OAAA,EAA2C;AAC3F,IAAA,IAAI,SAAS,KAAA,EAAO;AAGlB,MAAA,MAAM,MAAA,GAAS,IAAI,eAAA,CAAgB,EAAA,EAAI,QAAQ,KAAA,EAAO,MAAA,EAAW,CAAC,GAAA,EAAK,GAAA,KAAQ;AAC7E,QAAA,IAAA,CAAK,cAAA,CAAe,KAAA,EAAO,GAAA,EAAK,GAAG,CAAA;AAAA,MACrC,CAAC,CAAA;AACD,MAAA,IAAI,IAAA,GAAO,IAAA,CAAK,YAAA,CAAa,GAAA,CAAI,KAAK,CAAA;AACtC,MAAA,IAAI,CAAC,IAAA,EAAM;AACT,QAAA,IAAA,uBAAW,GAAA,EAAI;AACf,QAAA,IAAA,CAAK,YAAA,CAAa,GAAA,CAAI,KAAA,EAAO,IAAI,CAAA;AAAA,MACnC;AACA,MAAA,IAAA,CAAK,GAAA,CAAI,IAAI,MAAM,CAAA;AAEnB,MAAA,IAAI,QAAQ,KAAA,EAAO;AAKjB,QAAA,IAAA,CAAK,kBAAA,CAAmB,KAAA,EAAO,EAAA,EAAI,OAAA,CAAQ,KAAK,CAAA;AAAA,MAClD,CAAA,MAAO;AACL,QAAA,MAAM,OAAA,GAAU,CAAC,KAAA,KAAiB;AAKhC,UAAA,KAAK,OAAO,IAAA,CAAK,KAAA,EAAO,UAAU,QAAQ,CAAA,CAAE,MAAM,CAAA,GAAA,KAAO;AACvD,YAAA,IAAA,CAAK,eAAe,KAAA,EAAO,GAAA,EAAK,EAAE,KAAA,EAAO,MAAM,CAAA;AAAA,UACjD,CAAC,CAAA;AAAA,QACH,CAAA;AACA,QAAA,IAAI,UAAA,GAAa,IAAA,CAAK,cAAA,CAAe,GAAA,CAAI,KAAK,CAAA;AAC9C,QAAA,IAAI,CAAC,UAAA,EAAY;AACf,UAAA,UAAA,uBAAiB,GAAA,EAAI;AACrB,UAAA,IAAA,CAAK,cAAA,CAAe,GAAA,CAAI,KAAA,EAAO,UAAU,CAAA;AAAA,QAC3C;AACA,QAAA,UAAA,CAAW,GAAA,CAAI,IAAI,OAAO,CAAA;AAC1B,QAAA,IAAA,CAAK,OAAA,CAAQ,EAAA,CAAG,KAAA,EAAO,OAAO,CAAA;AAAA,MAChC;AACA,MAAA;AAAA,IACF;AAEA,IAAA,IAAI,SAAS,KAAA,EAAO;AAClB,MAAA,IAAA,CAAK,kBAAA,CAAmB,KAAA,EAAO,EAAA,EAAI,OAAA,CAAQ,KAAK,CAAA;AAAA,IAClD,CAAA,MAAO;AACL,MAAA,MAAM,OAAA,GAAU,CAAC,KAAA,KAAiB;AAChC,QAAA,EAAA,CAAG,KAAA,EAAO,UAAU,QAAQ,CAAA;AAAA,MAC9B,CAAA;AACA,MAAA,IAAI,IAAA,GAAO,IAAA,CAAK,cAAA,CAAe,GAAA,CAAI,KAAK,CAAA;AACxC,MAAA,IAAI,CAAC,IAAA,EAAM;AACT,QAAA,IAAA,uBAAW,GAAA,EAAI;AACf,QAAA,IAAA,CAAK,cAAA,CAAe,GAAA,CAAI,KAAA,EAAO,IAAI,CAAA;AAAA,MACrC;AACA,MAAA,IAAA,CAAK,GAAA,CAAI,IAAI,OAAO,CAAA;AACpB,MAAA,IAAA,CAAK,OAAA,CAAQ,EAAA,CAAG,KAAA,EAAO,OAAO,CAAA;AAAA,IAChC;AAAA,EACF;AAAA,EAEA,MAAM,WAAA,CAAY,KAAA,EAAe,EAAA,EAAkC;AAIjE,IAAA,MAAM,WAAA,GAAc,IAAA,CAAK,YAAA,CAAa,GAAA,CAAI,KAAK,CAAA;AAC/C,IAAA,MAAM,MAAA,GAAS,WAAA,EAAa,GAAA,CAAI,EAAE,CAAA;AAClC,IAAA,IAAI,UAAU,WAAA,EAAa;AACzB,MAAA,MAAA,CAAO,OAAA,EAAQ;AACf,MAAA,WAAA,CAAY,OAAO,EAAE,CAAA;AACrB,MAAA,IAAI,YAAY,IAAA,KAAS,CAAA,EAAG,IAAA,CAAK,YAAA,CAAa,OAAO,KAAK,CAAA;AAAA,IAC5D;AAGA,IAAA,KAAA,MAAW,CAAC,KAAA,EAAO,QAAQ,CAAA,IAAK,KAAK,MAAA,EAAQ;AAC3C,MAAA,MAAM,OAAA,GAAU,QAAA,CAAS,GAAA,CAAI,KAAK,CAAA;AAClC,MAAA,IAAI,OAAA,EAAS;AACX,QAAA,MAAM,GAAA,GAAM,OAAA,CAAQ,OAAA,CAAQ,EAAE,CAAA;AAC9B,QAAA,IAAI,QAAQ,EAAA,EAAI;AACd,UAAA,OAAA,CAAQ,MAAA,CAAO,KAAK,CAAC,CAAA;AAErB,UAAA,IAAI,OAAA,CAAQ,WAAW,CAAA,EAAG;AACxB,YAAA,QAAA,CAAS,OAAO,KAAK,CAAA;AACrB,YAAA,MAAM,WAAA,GAAc,CAAA,EAAG,KAAK,CAAA,CAAA,EAAI,KAAK,CAAA,CAAA;AACrC,YAAA,MAAM,QAAA,GAAW,IAAA,CAAK,cAAA,CAAe,GAAA,CAAI,WAAW,CAAA;AACpD,YAAA,IAAI,QAAA,EAAU;AACZ,cAAA,IAAA,CAAK,OAAA,CAAQ,GAAA,CAAI,KAAA,EAAO,QAAQ,CAAA;AAChC,cAAA,IAAA,CAAK,cAAA,CAAe,OAAO,WAAW,CAAA;AACtC,cAAA,IAAA,CAAK,aAAA,CAAc,OAAO,WAAW,CAAA;AAAA,YACvC;AAAA,UACF;AACA,UAAA,IAAI,QAAA,CAAS,SAAS,CAAA,EAAG;AACvB,YAAA,IAAA,CAAK,MAAA,CAAO,OAAO,KAAK,CAAA;AAAA,UAC1B;AACA,UAAA;AAAA,QACF;AAAA,MACF;AAAA,IACF;AAGA,IAAA,MAAM,IAAA,GAAO,IAAA,CAAK,cAAA,CAAe,GAAA,CAAI,KAAK,CAAA;AAC1C,IAAA,MAAM,OAAA,GAAU,IAAA,EAAM,GAAA,CAAI,EAAE,CAAA;AAC5B,IAAA,IAAI,WAAW,IAAA,EAAM;AACnB,MAAA,IAAA,CAAK,OAAA,CAAQ,GAAA,CAAI,KAAA,EAAO,OAAO,CAAA;AAC/B,MAAA,IAAA,CAAK,OAAO,EAAE,CAAA;AACd,MAAA,IAAI,KAAK,IAAA,KAAS,CAAA,EAAG,IAAA,CAAK,cAAA,CAAe,OAAO,KAAK,CAAA;AAAA,IACvD,CAAA,MAAO;AACL,MAAA,IAAA,CAAK,OAAA,CAAQ,GAAA,CAAI,KAAA,EAAO,EAAE,CAAA;AAAA,IAC5B;AAAA,EACF;AAAA,EAEA,MAAM,KAAA,GAAuB;AAQ3B,IAAA,OAAO,IAAA,EAAM;AACX,MAAA,MAAM,SAAsD,EAAC;AAC7D,MAAA,KAAA,MAAW,CAAC,KAAA,EAAO,IAAI,KAAK,IAAA,CAAK,YAAA,CAAa,SAAQ,EAAG;AACvD,QAAA,KAAA,MAAW,MAAA,IAAU,IAAA,CAAK,MAAA,EAAO,EAAG;AAClC,UAAA,MAAA,CAAO,KAAK,EAAE,KAAA,EAAO,SAAS,MAAA,CAAO,KAAA,IAAS,CAAA;AAAA,QAChD;AAAA,MACF;AACA,MAAA,IAAI,MAAA,CAAO,SAAS,CAAA,EAAG;AAKrB,QAAA,MAAM,OAAA,GAAU,MAAM,OAAA,CAAQ,UAAA,CAAW,OAAO,GAAA,CAAI,CAAA,CAAA,KAAK,CAAA,CAAE,OAAO,CAAC,CAAA;AACnE,QAAA,KAAA,IAAS,CAAA,GAAI,CAAA,EAAG,CAAA,GAAI,OAAA,CAAQ,QAAQ,CAAA,EAAA,EAAK;AACvC,UAAA,MAAM,MAAA,GAAS,QAAQ,CAAC,CAAA;AACxB,UAAA,IAAI,MAAA,CAAO,WAAW,UAAA,EAAY;AAChC,YAAA,IAAA,CAAK,cAAA,CAAe,MAAA,CAAO,CAAC,CAAA,CAAG,KAAA,EAAO,OAAO,MAAA,EAAQ,EAAE,KAAA,EAAO,IAAA,EAAM,CAAA;AAAA,UACtE;AAAA,QACF;AAAA,MACF;AAEA,MAAA,IAAI,IAAA,CAAK,YAAA,CAAa,IAAA,KAAS,CAAA,EAAG;AAGhC,QAAA;AAAA,MACF;AAIA,MAAA,MAAM,IAAI,QAAc,CAAA,OAAA,KAAW;AACjC,QAAA,MAAM,QAAQ,MAAM;AAClB,UAAA,IAAI,IAAA,CAAK,YAAA,CAAa,IAAA,KAAS,CAAA,EAAG;AAChC,YAAA,OAAA,EAAQ;AAAA,UACV,CAAA,MAAO;AACL,YAAA,UAAA,CAAW,OAAO,EAAE,CAAA;AAAA,UACtB;AAAA,QACF,CAAA;AACA,QAAA,KAAA,EAAM;AAAA,MACR,CAAC,CAAA;AAAA,IACH;AAAA,EACF;AAAA;AAAA;AAAA;AAAA,EAKA,MAAM,KAAA,GAAuB;AAE3B,IAAA,KAAA,MAAW,MAAA,IAAU,KAAK,YAAA,EAAc;AACtC,MAAA,YAAA,CAAa,MAAM,CAAA;AAAA,IACrB;AACA,IAAA,IAAA,CAAK,aAAa,KAAA,EAAM;AACxB,IAAA,IAAA,CAAK,iBAAiB,KAAA,EAAM;AAG5B,IAAA,KAAA,MAAW,IAAA,IAAQ,IAAA,CAAK,YAAA,CAAa,MAAA,EAAO,EAAG;AAC7C,MAAA,KAAA,MAAW,MAAA,IAAU,IAAA,CAAK,MAAA,EAAO,EAAG;AAClC,QAAA,MAAA,CAAO,OAAA,EAAQ;AAAA,MACjB;AAAA,IACF;AACA,IAAA,IAAA,CAAK,aAAa,KAAA,EAAM;AAExB,IAAA,IAAA,CAAK,QAAQ,kBAAA,EAAmB;AAChC,IAAA,IAAA,CAAK,OAAO,KAAA,EAAM;AAClB,IAAA,IAAA,CAAK,cAAc,KAAA,EAAM;AACzB,IAAA,IAAA,CAAK,eAAe,KAAA,EAAM;AAC1B,IAAA,IAAA,CAAK,eAAe,KAAA,EAAM;AAAA,EAC5B;AAAA,EAEQ,kBAAA,CAAmB,KAAA,EAAe,EAAA,EAAmB,KAAA,EAAqB;AAChF,IAAA,IAAI,QAAA,GAAW,IAAA,CAAK,MAAA,CAAO,GAAA,CAAI,KAAK,CAAA;AACpC,IAAA,IAAI,CAAC,QAAA,EAAU;AACb,MAAA,QAAA,uBAAe,GAAA,EAAI;AACnB,MAAA,IAAA,CAAK,MAAA,CAAO,GAAA,CAAI,KAAA,EAAO,QAAQ,CAAA;AAAA,IACjC;AAEA,IAAA,IAAI,OAAA,GAAU,QAAA,CAAS,GAAA,CAAI,KAAK,CAAA;AAChC,IAAA,IAAI,CAAC,OAAA,EAAS;AACZ,MAAA,OAAA,GAAU,EAAC;AACX,MAAA,QAAA,CAAS,GAAA,CAAI,OAAO,OAAO,CAAA;AAAA,IAC7B;AAEA,IAAA,OAAA,CAAQ,KAAK,EAAE,CAAA;AAGf,IAAA,MAAM,WAAA,GAAc,CAAA,EAAG,KAAK,CAAA,CAAA,EAAI,KAAK,CAAA,CAAA;AACrC,IAAA,IAAI,CAAC,IAAA,CAAK,cAAA,CAAe,GAAA,CAAI,WAAW,CAAA,EAAG;AACzC,MAAA,MAAM,QAAA,GAAW,CAAC,KAAA,KAAiB;AACjC,QAAA,IAAA,CAAK,cAAA,CAAe,KAAA,EAAO,KAAA,EAAO,WAAA,EAAa,KAAK,CAAA;AAAA,MACtD,CAAA;AAEA,MAAA,IAAA,CAAK,cAAA,CAAe,GAAA,CAAI,WAAA,EAAa,QAAQ,CAAA;AAC7C,MAAA,IAAA,CAAK,OAAA,CAAQ,EAAA,CAAG,KAAA,EAAO,QAAQ,CAAA;AAAA,IACjC;AAAA,EACF;AAAA,EAEQ,cAAA,CAAe,KAAA,EAAe,KAAA,EAAe,WAAA,EAAqB,KAAA,EAAoB;AAC5F,IAAA,MAAM,iBAAiB,IAAA,CAAK,MAAA,CAAO,IAAI,KAAK,CAAA,EAAG,IAAI,KAAK,CAAA;AACxD,IAAA,IAAI,CAAC,cAAA,IAAkB,cAAA,CAAe,MAAA,KAAW,CAAA,EAAG;AAEpD,IAAA,MAAM,OAAA,GAAU,IAAA,CAAK,aAAA,CAAc,GAAA,CAAI,WAAW,CAAA,IAAK,CAAA;AACvD,IAAA,MAAM,GAAA,GAAM,UAAU,cAAA,CAAe,MAAA;AACrC,IAAA,IAAA,CAAK,aAAA,CAAc,GAAA,CAAI,WAAA,EAAa,OAAA,GAAU,CAAC,CAAA;AAI/C,IAAA,MAAM,UAAA,GAAa,CAAA,EAAG,WAAW,CAAA,CAAA,EAAI,MAAM,EAAE,CAAA,CAAA;AAC7C,IAAA,MAAM,OAAA,GAAU,IAAA,CAAK,gBAAA,CAAiB,GAAA,CAAI,UAAU,CAAA,IAAK,CAAA;AACzD,IAAA,MAAM,gBAAA,GAAmB,EAAE,GAAG,KAAA,EAAO,iBAAiB,OAAA,EAAQ;AAE9D,IAAA,MAAM,MAAM,YAAY;AAEtB,MAAA,IAAA,CAAK,gBAAA,CAAiB,OAAO,UAAU,CAAA;AAAA,IACzC,CAAA;AAEA,IAAA,MAAM,OAAO,YAAY;AAGvB,MAAA,IAAA,CAAK,gBAAA,CAAiB,GAAA,CAAI,UAAA,EAAY,OAAA,GAAU,CAAC,CAAA;AAEjD,MAAA,MAAM,MAAA,GAAS,WAAW,MAAM;AAC9B,QAAA,IAAA,CAAK,YAAA,CAAa,OAAO,MAAM,CAAA;AAC/B,QAAA,IAAA,CAAK,cAAA,CAAe,KAAA,EAAO,KAAA,EAAO,WAAA,EAAa,KAAK,CAAA;AAAA,MACtD,GAAG,CAAC,CAAA;AACJ,MAAA,IAAA,CAAK,YAAA,CAAa,IAAI,MAAM,CAAA;AAAA,IAC9B,CAAA;AAEA,IAAA,MAAM,MAAA,GAAS,eAAe,GAAG,CAAA;AAEjC,IAAA,MAAM,SAAS,IAAA,CAAK,YAAA,CAAa,IAAI,KAAK,CAAA,EAAG,IAAI,MAAM,CAAA;AACvD,IAAA,IAAI,MAAA,EAAQ;AAGV,MAAA,KAAK,OAAO,IAAA,CAAK,gBAAA,EAAkB,KAAK,IAAI,CAAA,CAAE,MAAM,CAAA,GAAA,KAAO;AACzD,QAAA,IAAA,CAAK,eAAe,KAAA,EAAO,GAAA,EAAK,EAAE,KAAA,EAAO,MAAM,CAAA;AAAA,MACjD,CAAC,CAAA;AAAA,IACH,CAAA,MAAO;AACL,MAAA,MAAA,CAAO,gBAAA,EAAkB,KAAK,IAAI,CAAA;AAAA,IACpC;AAAA,EACF;AACF","file":"chunk-D324RFFU.cjs","sourcesContent":["import type { Event, EventCallback, SubscribeOptions } from './types';\n\n/**\n * Delivery model for a PubSub implementation.\n *\n * - `pull`: consumers actively read from the broker (e.g. Redis Streams\n *   XREADGROUP, GCP Pub/Sub streamingPull, SQS ReceiveMessage). Mastra runs\n *   a long-lived `OrchestrationWorker` that owns a subscription loop.\n *\n * - `push`: events arrive without the consumer asking — either in-process\n *   (EventEmitter dispatching to a registered listener) or out-of-process\n *   (the broker POSTs to an HTTP endpoint, e.g. GCP Pub/Sub push, SNS,\n *   EventBridge). Mastra wires the workflow handler directly to the pubsub\n *   for in-process push, or relies on `POST /api/workers/events` for\n *   broker push delivered over HTTP.\n */\nexport type PubSubDeliveryMode = 'pull' | 'push';\n\nexport abstract class PubSub {\n  abstract publish(\n    topic: string,\n    event: Omit<Event, 'id' | 'createdAt'>,\n    options?: { localOnly?: boolean },\n  ): Promise<void>;\n  abstract subscribe(topic: string, cb: EventCallback, options?: SubscribeOptions): Promise<void>;\n  abstract unsubscribe(topic: string, cb: EventCallback): Promise<void>;\n  /**\n   * Drain any buffered or in-flight deliveries before resolving.\n   *\n   * Best-effort: a `flush()` that resolves successfully does not guarantee\n   * every subscriber callback succeeded — implementations surface per-event\n   * delivery errors via their configured logger rather than re-throwing,\n   * so a single failed callback does not mask later cleanup work.\n   */\n  abstract flush(): Promise<void>;\n\n  /**\n   * Delivery modes this PubSub implementation supports.\n   *\n   * Defaults to `['pull']` for backward compatibility — third-party\n   * implementations that don't override this property are treated as\n   * pull-mode, which preserves today's behavior.\n   *\n   * Implementations that deliver events without an active read loop (e.g.\n   * EventEmitter, GCP Pub/Sub push subscriptions) should declare `'push'`.\n   * Implementations that support both modes should declare both.\n   */\n  get supportedModes(): ReadonlyArray<PubSubDeliveryMode> {\n    return ['pull'];\n  }\n\n  /**\n   * Whether this implementation honors `options.batch` on `subscribe()`\n   * natively. Defaults to `false`.\n   *\n   * Implementations that integrate batching internally (e.g. against their\n   * own broker retention or via an `AckHandleBuffer`) override this getter\n   * and return `true`.\n   */\n  get supportsNativeBatching(): boolean {\n    return false;\n  }\n\n  /**\n   * Get historical events for a topic.\n   * Default implementation returns empty array (no history support).\n   * Override in implementations that support event caching.\n   *\n   * @param topic - The topic to get history for\n   * @param offset - Starting index (0-based), defaults to 0\n   * @returns Array of events from the specified index\n   */\n  getHistory(_topic: string, _offset?: number): Promise<Event[]> {\n    return Promise.resolve([]);\n  }\n\n  /**\n   * Subscribe to a topic with automatic replay of cached events.\n   * First replays any cached history, then subscribes to live events.\n   * Default implementation falls back to regular subscribe (no replay).\n   * Override in implementations that support event caching.\n   *\n   * @param topic - The topic to subscribe to\n   * @param cb - Callback invoked for each event (both cached and live)\n   */\n  subscribeWithReplay(topic: string, cb: EventCallback): Promise<void> {\n    return this.subscribe(topic, cb);\n  }\n\n  /**\n   * Subscribe to a topic with replay starting from a specific index.\n   * This is more efficient than full replay when the client knows their last position.\n   * Default implementation falls back to subscribeWithReplay (full replay).\n   * Override in implementations that support indexed event caching.\n   *\n   * @param topic - The topic to subscribe to\n   * @param offset - Start replaying from this index (0-based)\n   * @param cb - Callback invoked for each event\n   */\n  subscribeFromOffset(topic: string, _offset: number, cb: EventCallback): Promise<void> {\n    return this.subscribeWithReplay(topic, cb);\n  }\n}\n","import type { Event, SubscribeBatchOptions } from '../types';\n\n/**\n * Opaque timer handle. We don't care whether the runtime returns a number\n * (browser) or a `Timeout` (Node) — we only ever hand it back to\n * `clearTimeout`. Branding it keeps the type honest (you can't pass an\n * arbitrary number) while staying erasure-free at runtime.\n */\ndeclare const batchPolicyTimerHandleBrand: unique symbol;\nexport type BatchPolicyTimerHandle = { readonly [batchPolicyTimerHandleBrand]: true };\n\n/**\n * Injectable dependencies for `BatchPolicy`. Tests pass fake timers /\n * controllable clocks; production uses Node's `Date.now` / `setTimeout` /\n * `clearTimeout`.\n */\nexport interface BatchPolicyDeps {\n  now: () => number;\n  setTimeout: (cb: () => void, ms: number) => BatchPolicyTimerHandle;\n  clearTimeout: (handle: BatchPolicyTimerHandle) => void;\n}\n\nconst defaultDeps: BatchPolicyDeps = {\n  now: () => Date.now(),\n  setTimeout: (cb, ms) => setTimeout(cb, ms) as unknown as BatchPolicyTimerHandle,\n  clearTimeout: handle => clearTimeout(handle as unknown as Parameters<typeof clearTimeout>[0]),\n};\n\nexport type EnqueueDecision = 'flush-now' | 'wait';\n\nexport const DEFAULT_MAX_BUFFER_SIZE = 256;\nexport const DEFAULT_OVERFLOW: NonNullable<SubscribeBatchOptions['overflow']> = 'coalesce-or-drop-oldest';\n\n/**\n * Internal to `EventEmitterPubSub`. Embedded by `AckHandleBuffer` to decide\n * when a batched subscription should flush (size, time, coalesce, overflow).\n *\n * Not part of the public API — users configure batching via\n * `SubscribeBatchOptions` on `subscribe`.\n */\nexport class BatchPolicy {\n  private readonly opts: SubscribeBatchOptions;\n  private readonly deps: BatchPolicyDeps;\n  private readonly maxBufferSize: number;\n  private readonly overflow: NonNullable<SubscribeBatchOptions['overflow']>;\n\n  private firstQueuedAt: number | null = null;\n  private lastDeliveredAt: number = -Infinity;\n  private size = 0;\n  private timer: BatchPolicyTimerHandle | null = null;\n  private flushHandler: (() => void | Promise<void>) | null = null;\n\n  constructor(opts: SubscribeBatchOptions, deps: BatchPolicyDeps = defaultDeps) {\n    this.opts = opts;\n    this.deps = deps;\n    this.maxBufferSize = opts.maxBufferSize ?? DEFAULT_MAX_BUFFER_SIZE;\n    this.overflow = opts.overflow ?? DEFAULT_OVERFLOW;\n  }\n\n  /** Bind the function invoked when the deadline timer fires. */\n  bindFlushHandler(fn: () => void | Promise<void>): void {\n    this.flushHandler = fn;\n  }\n\n  /**\n   * Called by the integrator each time an event is enqueued.\n   * Returns whether the integrator should flush immediately.\n   */\n  onEnqueue(event: Event): EnqueueDecision {\n    this.size += 1;\n    if (this.firstQueuedAt === null) {\n      this.firstQueuedAt = this.deps.now();\n    }\n\n    const now = this.deps.now();\n    const intervalFloor = this.lastDeliveredAt + (this.opts.minIntervalMs ?? 0);\n\n    // Immediate event — bypass maxWait/maxSize gating, but still respect interval floor.\n    if (this.opts.isImmediate?.(event)) {\n      if (now >= intervalFloor) {\n        return 'flush-now';\n      }\n      // Hold until the floor; reschedule timer there.\n      this.scheduleAt(intervalFloor);\n      return 'wait';\n    }\n\n    // Overflow trigger (regardless of interval — overflow is a budget enforcement).\n    if (this.size >= this.maxBufferSize) {\n      return 'flush-now';\n    }\n\n    // maxSize trigger — respects interval floor.\n    if (this.opts.maxSize !== undefined && this.size >= this.opts.maxSize) {\n      if (now >= intervalFloor) {\n        return 'flush-now';\n      }\n      this.scheduleAt(intervalFloor);\n      return 'wait';\n    }\n\n    // No immediate trigger — schedule the deadline if there is one.\n    this.scheduleDeadline();\n    return 'wait';\n  }\n\n  /**\n   * Called by the integrator after a successful flush has delivered\n   * `deliveredCount` events. Resets timer + firstQueuedAt.\n   */\n  onFlushed(deliveredCount: number): void {\n    this.lastDeliveredAt = this.deps.now();\n    this.size = Math.max(0, this.size - deliveredCount);\n    this.firstQueuedAt = null;\n    this.cancelTimer();\n  }\n\n  /**\n   * Pure helper. Given the caller-owned queue contents, applies `coalesce`\n   * and `overflow` to decide what to deliver and what to drop.\n   * Order-preserving for kept events.\n   */\n  prepareBatch(events: Event[]): { delivered: Event[]; dropped: Event[] } {\n    let working = events;\n\n    // 1. Coalesce, if configured.\n    if (this.opts.coalesce) {\n      const coalesced = this.opts.coalesce(working);\n      // Contract: `coalesce` MUST return a subset of `working` by reference\n      // identity. Anything else (fresh objects, even with matching ids)\n      // breaks ack routing downstream — AckHandleBuffer keys ack/nack by\n      // event reference, and there's no way to map a manufactured event\n      // back to the original transport handle. Detect the violation here\n      // and discard the whole batch (treat all originals as dropped) rather\n      // than silently deliver references with no ack/nack wired up.\n      const inputRefs = new Set<Event>(working);\n      const allInInput = coalesced.every(e => inputRefs.has(e));\n      working = allInInput ? coalesced : [];\n    }\n\n    const keptRefs = new Set<Event>(working);\n    const computeDropped = (): Event[] => events.filter(e => !keptRefs.has(e));\n\n    // 2. Overflow handling — only if still over `maxBufferSize`.\n    if (working.length <= this.maxBufferSize) {\n      return { delivered: working, dropped: computeDropped() };\n    }\n\n    const overBy = working.length - this.maxBufferSize;\n    const isImmediate = this.opts.isImmediate;\n\n    let kept: Event[];\n    let droppedFromOverflow: Event[];\n\n    switch (this.overflow) {\n      case 'drop-newest': {\n        const splitFromEnd = this.takeWithoutDropping(working, overBy, isImmediate, /* fromEnd */ true);\n        kept = splitFromEnd.kept;\n        droppedFromOverflow = splitFromEnd.dropped;\n        break;\n      }\n      case 'drop-oldest':\n      case 'coalesce-or-drop-oldest':\n      default: {\n        const splitFromStart = this.takeWithoutDropping(working, overBy, isImmediate, /* fromEnd */ false);\n        kept = splitFromStart.kept;\n        droppedFromOverflow = splitFromStart.dropped;\n        break;\n      }\n    }\n\n    return { delivered: kept, dropped: [...computeDropped(), ...droppedFromOverflow] };\n  }\n\n  /** Stop the timer and clear policy state. */\n  dispose(): void {\n    this.cancelTimer();\n    this.flushHandler = null;\n    this.firstQueuedAt = null;\n    this.size = 0;\n  }\n\n  private scheduleDeadline(): void {\n    if (this.opts.maxWaitMs === undefined && this.opts.minIntervalMs === undefined) {\n      // No time-based trigger — only `maxSize` / `isImmediate` can flush.\n      return;\n    }\n\n    const firstQueuedAt = this.firstQueuedAt ?? this.deps.now();\n    const deadline = this.opts.maxWaitMs !== undefined ? firstQueuedAt + this.opts.maxWaitMs : Number.POSITIVE_INFINITY;\n    const floor = this.lastDeliveredAt + (this.opts.minIntervalMs ?? 0);\n    const at = Math.max(deadline, floor);\n    this.scheduleAt(at);\n  }\n\n  private scheduleAt(at: number): void {\n    if (!isFinite(at)) {\n      return;\n    }\n    this.cancelTimer();\n    const delay = Math.max(0, at - this.deps.now());\n    this.timer = this.deps.setTimeout(() => {\n      this.timer = null;\n      const handler = this.flushHandler;\n      if (handler) {\n        void handler();\n      }\n    }, delay);\n  }\n\n  private cancelTimer(): void {\n    if (this.timer !== null) {\n      this.deps.clearTimeout(this.timer);\n      this.timer = null;\n    }\n  }\n\n  /**\n   * Drop `count` non-immediate items from the start (or end) of `items`.\n   * Immediate items are never dropped — if every candidate is immediate,\n   * fewer than `count` items are dropped.\n   */\n  private takeWithoutDropping(\n    items: Event[],\n    count: number,\n    isImmediate: ((e: Event) => boolean) | undefined,\n    fromEnd: boolean,\n  ): { kept: Event[]; dropped: Event[] } {\n    const dropped: Event[] = [];\n    const result = [...items];\n    let remaining = count;\n\n    const order = fromEnd ? [...result.keys()].reverse() : [...result.keys()];\n\n    for (const idx of order) {\n      if (remaining === 0) break;\n      const ev = result[idx]!;\n      if (isImmediate?.(ev)) continue;\n      dropped.push(ev);\n      result[idx] = undefined as unknown as Event;\n      remaining -= 1;\n    }\n\n    const kept = result.filter((e): e is Event => e !== undefined);\n    return { kept, dropped };\n  }\n}\n","import type { Event, EventCallback, SubscribeBatchOptions } from '../types';\nimport type { BatchPolicyDeps } from './batch-policy';\nimport { BatchPolicy } from './batch-policy';\n\ninterface Entry {\n  event: Event;\n  ack?: () => Promise<void>;\n  nack?: () => Promise<void>;\n}\n\n/**\n * In-process queue used by `EventEmitterPubSub` to turn its\n * one-event-per-emit stream into batched callback invocations.\n * Owns a `BatchPolicy` that decides when to flush (size, time,\n * quiet-period) and holds (event, ack, nack) triples in publish\n * order until that decision fires.\n *\n * Extracted from `EventEmitterPubSub` only so the batching state\n * machine can be tested in isolation. Not a public extension point.\n * State is per-process; the queue dies with the process.\n */\nexport class AckHandleBuffer {\n  private readonly policy: BatchPolicy;\n  private queue: Entry[] = [];\n  private flushing = false;\n  private reflush = false;\n  private disposed = false;\n\n  constructor(\n    private readonly cb: EventCallback,\n    opts: SubscribeBatchOptions,\n    deps?: BatchPolicyDeps,\n    private readonly onError?: (err: unknown, ctx: { phase: 'cb' | 'ack-dropped' }) => void,\n  ) {\n    this.policy = new BatchPolicy(opts, deps);\n    // The policy's deadline timer fires this handler fire-and-forget (it\n    // discards the returned promise), so a rejection from `flush()` — e.g. a\n    // user-supplied `coalesce` throwing inside `prepareBatch`, which lands\n    // outside the per-event try/catch below — would otherwise escape as an\n    // unhandled rejection on the timer path. The inline flush-now, group, and\n    // explicit `flush()` paths each catch this already; route the timer path\n    // through the same `onError` channel.\n    this.policy.bindFlushHandler(() =>\n      this.flush().catch(err => {\n        this.onError?.(err, { phase: 'cb' });\n      }),\n    );\n  }\n\n  /**\n   * Called by the adapter for each event arriving from the underlying transport.\n   */\n  async push(event: Event, ack?: () => Promise<void>, nack?: () => Promise<void>): Promise<void> {\n    if (this.disposed) return;\n    this.queue.push({ event, ack, nack });\n    const decision = this.policy.onEnqueue(event);\n    if (decision === 'flush-now') {\n      await this.flush();\n    }\n  }\n\n  /**\n   * Drain the current queue regardless of policy state. Safe to call from\n   * adapter `flush()` or external code that wants to force delivery.\n   */\n  async flush(): Promise<void> {\n    // A flush-now request that lands while we're already draining is not\n    // dropped — latch it so the current pass picks up the new events as\n    // soon as it finishes its current snapshot, instead of forcing those\n    // events to wait until the policy timer fires.\n    if (this.flushing) {\n      this.reflush = true;\n      return;\n    }\n    // Empty buffer is a true no-op. `policy.onFlushed` bumps `lastDeliveredAt`,\n    // which extends the `minIntervalMs` floor — calling it on every empty\n    // flush silently corrupts the cadence for callers that flush() defensively.\n    if (this.queue.length === 0) return;\n\n    this.flushing = true;\n    try {\n      do {\n        this.reflush = false;\n        if (this.queue.length === 0) break;\n\n        const snapshot = this.queue;\n        this.queue = [];\n\n        const events = snapshot.map(e => e.event);\n        // Build a reverse index once so we don't pay O(n) per event looking up\n        // the original Entry below.\n        const byEvent = new Map<Event, Entry>();\n        for (const e of snapshot) byEvent.set(e.event, e);\n\n        const { delivered, dropped } = this.policy.prepareBatch(events);\n\n        // Ack events that were coalesced or overflow-dropped — they should\n        // not be redelivered. The transport's own ack is the right hook.\n        for (const ev of dropped) {\n          const entry = byEvent.get(ev);\n          if (entry?.ack) {\n            try {\n              await entry.ack();\n            } catch (err) {\n              this.onError?.(err, { phase: 'ack-dropped' });\n            }\n          }\n        }\n\n        for (const ev of delivered) {\n          // A cb may dispose the buffer mid-flush (e.g. subscriber tearing\n          // itself down on a fatal event). Honor it immediately — don't keep\n          // feeding events into a callback that asked to stop.\n          if (this.disposed) break;\n          const entry = byEvent.get(ev);\n          try {\n            // The declared EventCallback return type is `void`, but real\n            // implementations frequently return a Promise. Await both kinds\n            // so per-event isolation actually waits for the cb to settle.\n            await (this.cb(ev, entry?.ack, entry?.nack) as void | Promise<void>);\n          } catch (err) {\n            this.onError?.(err, { phase: 'cb' });\n          }\n        }\n\n        // `policy.size` was incremented once per push; decrement it by\n        // everything that left the queue (delivered + dropped) so it doesn't\n        // drift upward and trip maxSize prematurely.\n        this.policy.onFlushed(delivered.length + dropped.length);\n      } while (this.reflush && !this.disposed);\n    } finally {\n      this.flushing = false;\n    }\n  }\n\n  dispose(): void {\n    this.disposed = true;\n    this.queue = [];\n    this.policy.dispose();\n  }\n}\n","import EventEmitter from 'node:events';\nimport type { IMastraLogger } from '../../logger';\nimport { PubSub } from '../pubsub';\nimport type { PubSubDeliveryMode } from '../pubsub';\nimport type { Event, EventCallback, SubscribeOptions } from '../types';\nimport { AckHandleBuffer } from './ack-handle-buffer';\n\nexport interface EventEmitterPubSubOptions {\n  /**\n   * Optional logger for surfacing batched-delivery errors. Falls back to\n   * `console.error` when not provided.\n   */\n  logger?: IMastraLogger;\n}\n\n// Reused for the fan-out delivery path where ack/nack are no-ops: the process\n// is the broker, there is no transport-level redelivery to negotiate. Hoisted\n// to module scope so we don't allocate two new closures per emitted event.\nconst NOOP_ACK = async (): Promise<void> => {};\n\nexport class EventEmitterPubSub extends PubSub {\n  // EventEmitter dispatches synchronously to listeners, so it can serve both\n  // a push consumer (no worker) and a pull-style worker that simply calls\n  // `subscribe()` to register a listener. Both modes are advertised so the\n  // default in-process setup keeps using OrchestrationWorker, while\n  // genuinely push-only transports (GCP Pub/Sub push, SNS, EventBridge)\n  // declare `['push']` only and skip the worker.\n  override get supportedModes(): ReadonlyArray<PubSubDeliveryMode> {\n    return ['pull', 'push'];\n  }\n\n  /**\n   * `EventEmitterPubSub` is strictly in-process, so the `AckHandleBuffer`\n   * queue it uses for batching shares the same lifetime as everything\n   * else here. Nothing more durable is promised, and nothing less is\n   * needed.\n   */\n  override get supportsNativeBatching(): boolean {\n    return true;\n  }\n\n  private emitter: EventEmitter;\n\n  // group → topic → callbacks[]\n  private groups: Map<string, Map<string, EventCallback[]>> = new Map();\n  // \"topic:group\" → round-robin counter\n  private groupCounters: Map<string, number> = new Map();\n  // \"topic:group\" → the single listener registered on the emitter for this group\n  private groupListeners: Map<string, (event: Event) => void> = new Map();\n\n  // Track pending nack redeliveries so flush() can wait and close() can cancel them\n  private pendingNacks: Set<ReturnType<typeof setTimeout>> = new Set();\n\n  // Track delivery attempts per message id\n  private deliveryAttempts: Map<string, number> = new Map();\n\n  // topic → (original callback → wrapped listener) for fan-out (non-group) subscribers.\n  // Nested keying so the same callback registered on multiple topics keeps\n  // a distinct wrapper per topic.\n  private fanoutWrappers: Map<string, Map<EventCallback, (event: Event) => void>> = new Map();\n\n  // topic → (original callback → buffer). Present only for subscribers that\n  // opt into batching via `options.batch`. The buffer is the destination of\n  // the emitter listener; it invokes the user cb according to its policy.\n  private batchBuffers: Map<string, Map<EventCallback, AckHandleBuffer>> = new Map();\n\n  private readonly logger?: IMastraLogger;\n\n  constructor(existingEmitter?: EventEmitter, options: EventEmitterPubSubOptions = {}) {\n    super();\n    this.emitter = existingEmitter ?? new EventEmitter();\n    this.logger = options.logger;\n  }\n\n  /**\n   * Debug-hostile silent failures are the default for emitter listeners.\n   * Surface buffer-side errors on a single channel so they're at least visible.\n   */\n  private logBufferError(topic: string, err: unknown, ctx: { phase: 'cb' | 'ack-dropped' }): void {\n    const message = `[EventEmitterPubSub] batched ${ctx.phase} failed for ${topic}`;\n    if (this.logger) {\n      this.logger.error(message, err);\n    } else {\n      console.error(message, err);\n    }\n  }\n\n  async publish(\n    topic: string,\n    event: Omit<Event, 'id' | 'createdAt'>,\n    _options?: { localOnly?: boolean },\n  ): Promise<void> {\n    const id = crypto.randomUUID();\n    const createdAt = new Date();\n    this.emitter.emit(topic, {\n      ...event,\n      id,\n      createdAt,\n      deliveryAttempt: 1,\n    });\n  }\n\n  async subscribe(topic: string, cb: EventCallback, options?: SubscribeOptions): Promise<void> {\n    if (options?.batch) {\n      // Batched path: insert an AckHandleBuffer between the emitter and cb.\n      // ack/nack are no-ops at this layer — the process is the broker.\n      const buffer = new AckHandleBuffer(cb, options.batch, undefined, (err, ctx) => {\n        this.logBufferError(topic, err, ctx);\n      });\n      let byCb = this.batchBuffers.get(topic);\n      if (!byCb) {\n        byCb = new Map();\n        this.batchBuffers.set(topic, byCb);\n      }\n      byCb.set(cb, buffer);\n\n      if (options.group) {\n        // Group path: the group's member list keeps the original `cb` so\n        // `unsubscribe(topic, cb)` and round-robin tracking work unchanged.\n        // `deliverToGroup` checks `batchBuffers` and routes through the\n        // buffer when present.\n        this.subscribeWithGroup(topic, cb, options.group);\n      } else {\n        const wrapper = (event: Event) => {\n          // Fire-and-forget — buffer.push can reject if the user-supplied\n          // `coalesce` throws or any other policy step fails during an\n          // inline flush-now. Surface those through the logger rather\n          // than letting them become unhandled rejections.\n          void buffer.push(event, NOOP_ACK, NOOP_ACK).catch(err => {\n            this.logBufferError(topic, err, { phase: 'cb' });\n          });\n        };\n        let byCbFanout = this.fanoutWrappers.get(topic);\n        if (!byCbFanout) {\n          byCbFanout = new Map();\n          this.fanoutWrappers.set(topic, byCbFanout);\n        }\n        byCbFanout.set(cb, wrapper);\n        this.emitter.on(topic, wrapper);\n      }\n      return;\n    }\n\n    if (options?.group) {\n      this.subscribeWithGroup(topic, cb, options.group);\n    } else {\n      const wrapper = (event: Event) => {\n        cb(event, NOOP_ACK, NOOP_ACK);\n      };\n      let byCb = this.fanoutWrappers.get(topic);\n      if (!byCb) {\n        byCb = new Map();\n        this.fanoutWrappers.set(topic, byCb);\n      }\n      byCb.set(cb, wrapper);\n      this.emitter.on(topic, wrapper);\n    }\n  }\n\n  async unsubscribe(topic: string, cb: EventCallback): Promise<void> {\n    // Tear down a batching buffer for this (topic, cb) pair, if one was set\n    // up by `subscribe`. Done first so any in-flight emitter dispatches\n    // ignore further events into a disposed buffer.\n    const byCbBuffers = this.batchBuffers.get(topic);\n    const buffer = byCbBuffers?.get(cb);\n    if (buffer && byCbBuffers) {\n      buffer.dispose();\n      byCbBuffers.delete(cb);\n      if (byCbBuffers.size === 0) this.batchBuffers.delete(topic);\n    }\n\n    // Check if this callback is in any group for this topic\n    for (const [group, topicMap] of this.groups) {\n      const members = topicMap.get(topic);\n      if (members) {\n        const idx = members.indexOf(cb);\n        if (idx !== -1) {\n          members.splice(idx, 1);\n          // If group is now empty for this topic, remove the emitter listener\n          if (members.length === 0) {\n            topicMap.delete(topic);\n            const listenerKey = `${topic}:${group}`;\n            const listener = this.groupListeners.get(listenerKey);\n            if (listener) {\n              this.emitter.off(topic, listener);\n              this.groupListeners.delete(listenerKey);\n              this.groupCounters.delete(listenerKey);\n            }\n          }\n          if (topicMap.size === 0) {\n            this.groups.delete(group);\n          }\n          return;\n        }\n      }\n    }\n\n    // Not in a group — remove as fan-out listener\n    const byCb = this.fanoutWrappers.get(topic);\n    const wrapper = byCb?.get(cb);\n    if (wrapper && byCb) {\n      this.emitter.off(topic, wrapper);\n      byCb.delete(cb);\n      if (byCb.size === 0) this.fanoutWrappers.delete(topic);\n    } else {\n      this.emitter.off(topic, cb);\n    }\n  }\n\n  async flush(): Promise<void> {\n    // A batched cb can nack mid-delivery, which schedules a redelivery via\n    // setTimeout(0). The redelivered event lands back in `batchBuffers`\n    // (the buffer is the group member's destination) and may sit there\n    // below maxSize/maxWaitMs thresholds. So we loop: drain buffers, wait\n    // for pending nacks to fire, then check whether either side produced\n    // more work. Stable-state termination requires both to be empty at the\n    // top of a single iteration.\n    while (true) {\n      const drains: { topic: string; promise: Promise<void> }[] = [];\n      for (const [topic, byCb] of this.batchBuffers.entries()) {\n        for (const buffer of byCb.values()) {\n          drains.push({ topic, promise: buffer.flush() });\n        }\n      }\n      if (drains.length > 0) {\n        // allSettled — a single throwing buffer should not block the rest from\n        // flushing during shutdown. Rejections that propagate this far skipped\n        // the per-event try/catch in AckHandleBuffer (e.g. a throwing coalesce)\n        // and must be surfaced or they vanish at shutdown.\n        const results = await Promise.allSettled(drains.map(d => d.promise));\n        for (let i = 0; i < results.length; i++) {\n          const result = results[i]!;\n          if (result.status === 'rejected') {\n            this.logBufferError(drains[i]!.topic, result.reason, { phase: 'cb' });\n          }\n        }\n      }\n\n      if (this.pendingNacks.size === 0) {\n        // Nothing scheduled — and the drain above either did nothing or\n        // produced no new pending nacks, so we're stable.\n        return;\n      }\n\n      // Wait for the currently-scheduled nacks to fire. Each redelivery\n      // may land in a buffer; loop and re-drain.\n      await new Promise<void>(resolve => {\n        const check = () => {\n          if (this.pendingNacks.size === 0) {\n            resolve();\n          } else {\n            setTimeout(check, 10);\n          }\n        };\n        check();\n      });\n    }\n  }\n\n  /**\n   * Clean up all listeners during graceful shutdown.\n   */\n  async close(): Promise<void> {\n    // Cancel pending nack redeliveries\n    for (const handle of this.pendingNacks) {\n      clearTimeout(handle);\n    }\n    this.pendingNacks.clear();\n    this.deliveryAttempts.clear();\n\n    // Dispose every batching buffer so timers are cleared.\n    for (const byCb of this.batchBuffers.values()) {\n      for (const buffer of byCb.values()) {\n        buffer.dispose();\n      }\n    }\n    this.batchBuffers.clear();\n\n    this.emitter.removeAllListeners();\n    this.groups.clear();\n    this.groupCounters.clear();\n    this.groupListeners.clear();\n    this.fanoutWrappers.clear();\n  }\n\n  private subscribeWithGroup(topic: string, cb: EventCallback, group: string): void {\n    let topicMap = this.groups.get(group);\n    if (!topicMap) {\n      topicMap = new Map();\n      this.groups.set(group, topicMap);\n    }\n\n    let members = topicMap.get(topic);\n    if (!members) {\n      members = [];\n      topicMap.set(topic, members);\n    }\n\n    members.push(cb);\n\n    // Register a single emitter listener per topic:group pair\n    const listenerKey = `${topic}:${group}`;\n    if (!this.groupListeners.has(listenerKey)) {\n      const listener = (event: Event) => {\n        this.deliverToGroup(topic, group, listenerKey, event);\n      };\n\n      this.groupListeners.set(listenerKey, listener);\n      this.emitter.on(topic, listener);\n    }\n  }\n\n  private deliverToGroup(topic: string, group: string, listenerKey: string, event: Event): void {\n    const currentMembers = this.groups.get(group)?.get(topic);\n    if (!currentMembers || currentMembers.length === 0) return;\n\n    const counter = this.groupCounters.get(listenerKey) ?? 0;\n    const idx = counter % currentMembers.length;\n    this.groupCounters.set(listenerKey, counter + 1);\n\n    // Track delivery attempts scoped per group listener, so ack/nack in one\n    // group doesn't disturb another group's attempt counter for the same event.\n    const attemptKey = `${listenerKey}:${event.id}`;\n    const attempt = this.deliveryAttempts.get(attemptKey) ?? 1;\n    const eventWithAttempt = { ...event, deliveryAttempt: attempt };\n\n    const ack = async () => {\n      // Message successfully processed — clean up attempt tracking\n      this.deliveryAttempts.delete(attemptKey);\n    };\n\n    const nack = async () => {\n      // Message processing failed — redeliver to the group after a short delay\n      // Increment delivery attempt counter\n      this.deliveryAttempts.set(attemptKey, attempt + 1);\n\n      const handle = setTimeout(() => {\n        this.pendingNacks.delete(handle);\n        this.deliverToGroup(topic, group, listenerKey, event);\n      }, 0);\n      this.pendingNacks.add(handle);\n    };\n\n    const member = currentMembers[idx]!;\n    // If this member opted into batching, route through its buffer.\n    const buffer = this.batchBuffers.get(topic)?.get(member);\n    if (buffer) {\n      // Same rationale as the fan-out push above: surface rejections\n      // through the logger so they don't escape as unhandled.\n      void buffer.push(eventWithAttempt, ack, nack).catch(err => {\n        this.logBufferError(topic, err, { phase: 'cb' });\n      });\n    } else {\n      member(eventWithAttempt, ack, nack);\n    }\n  }\n}\n"]}