All files remote-files.ts

8.11% Statements 9/111
0% Branches 0/24
0% Functions 0/23
8.91% Lines 9/101

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180  1x 1x 1x 1x       1x   1x                                   1x                                                                                                                     1x                                                                                                                                                                                       1x
import { shuffle } from "lodash";
import { hashBlob } from "./common";
import { chunkSize, connections, IDeviceConnection } from "./connections";
import { getDB, hasPermission, IData, IFile } from "./db";
import { getCurrentConnection, RPC, setRemotelyCallableFunction, verifyRemoteUser } from "./remote-calls";
 
// remotelyCallableFunctions.getFile = getFile;
 
// Module-load trace: printed once, when this module is first evaluated.
console.log('remote-files');
 
/**
 * Asks each known peer connection, in order, for the file and returns the
 * first copy that is delivered.
 *
 * A peer that cannot be verified, or whose transfer fails, is logged and
 * skipped so that the remaining peers still get a chance.
 *
 * @param fileId - Id of the file to fetch.
 * @param updateProgress - Optional callback forwarded to getFileFromPeer.
 * @returns The first file a peer delivers, or `undefined` when no peer could
 *   provide it.
 */
export async function getFileFromPeers(fileId: string, updateProgress?: (percent: number) => any): Promise<IFile> {
  // for (const connection of shuffle(connections.filter(c => c.remoteUserVerified))) {
  for (const connection of connections()) {
    if (!connection.remoteUserVerified) {
      try {
        await verifyRemoteUser(connection);
      } catch (err) {
        console.error('error verifying connection', err);
        continue;
      }
    }

    let file: IFile | undefined;
    try {
      file = await getFileFromPeer(fileId, connection, updateProgress);
    } catch (err) {
      // One bad peer must not prevent the remaining peers from being tried.
      console.error('Error getting file from peer', err);
      continue;
    }
    if (file) {
      return file;
    }
  }
}
 
/**
 * Requests a file from a single peer and receives its bytes over a dedicated
 * data channel named `file-<id>`.
 *
 * Flow: verify the remote user if needed, call the peer's `getFile` through
 * RPC to obtain the file record, open the data channel, accumulate incoming
 * chunks until `file.size` bytes have arrived, then hash the assembled blob
 * and compare the hash with `file.id`.
 *
 * @param fileId - Id of the file to fetch.
 * @param connection - Peer connection to fetch from. When omitted, nothing is
 *   fetched and `undefined` is returned.
 * @param updateProgress - Optional callback invoked with
 *   `receivedSize / file.size` after every chunk; it is also passed on to
 *   `hashBlob`.
 * @returns The file record with `blob` populated, or `undefined` when the
 *   connection is missing, cannot be verified, or the RPC yields no file.
 * @throws Rejects when the received data fails hash verification, when
 *   hashing fails, when the data channel reports an error, or when the data
 *   channel closes before the whole file has arrived.
 */
export async function getFileFromPeer(fileId: string, connection?: IDeviceConnection, updateProgress?: (percent: number) => any): Promise<IFile> {
  // `connection` is optional in the signature; without one there is no peer to ask.
  if (!connection) {
    return;
  }
  if (!connection.remoteUserVerified) {
    try {
      await verifyRemoteUser(connection);
    } catch (err) {
      console.error('error verifying connection', err);
      return;
    }
  }
  const file = await RPC(connection, getFile)(fileId).catch(err => console.error('Error getting file from peers', err));
  if (!file) {
    return;
  }
  return new Promise((resolve, reject) => {
    const dcReceive = connection.pc.createDataChannel(`file-${file.id}`);
    // The handler below reads `byteLength`, which Blob messages do not have,
    // so ask for ArrayBuffer delivery explicitly.
    dcReceive.binaryType = 'arraybuffer';
    dcReceive.onopen = e => console.log('receive dc open');
    let receiveBuffer = [];
    let receivedSize = 0;
    // Set once every expected byte has arrived; distinguishes a normal close
    // from a premature one and stops late messages from being processed.
    let transferComplete = false;
    // let pid;
    // const TRANSFER_TIMEOUT_MS = 3000;
    // function refreshWatchDog() {
    //   clearTimeout(pid);
    //   pid = setTimeout(() => {
    //     dcReceive.close();
    //     console.log('file transfer timed out', dcReceive.label)
    //     reject(new Error('file transfer timed out'));
    //   }, TRANSFER_TIMEOUT_MS);
    // }

    dcReceive.onmessage = e => {
      if (transferComplete) return;
      // refreshWatchDog();
      receiveBuffer.push(e.data);
      receivedSize += e.data.byteLength;

      if (updateProgress) updateProgress(receivedSize / file.size);

      // `>=` rather than `===`: if more bytes than expected arrive, the hash
      // check below rejects instead of the promise hanging forever.
      if (receivedSize >= file.size) {
        transferComplete = true;
        file.blob = new Blob(receiveBuffer, { type: file.fileType });
        receiveBuffer = [];
        hashBlob(file.blob, updateProgress).then(
          sha => {
            dcReceive.close();
            if (sha !== file.id) return reject(new Error('File failed verification after transfer'));
            resolve(file);
          },
          err => {
            // Hashing itself failed; release the channel and surface the error.
            dcReceive.close();
            reject(err);
          },
        );
      }
    };
    dcReceive.onbufferedamountlow = e => console.log('buffered amount low');
    dcReceive.onclose = e => {
      console.log('dc closed');
      // A close before the last byte means the transfer can never finish.
      // (Rejecting an already settled promise is a no-op.)
      if (!transferComplete) {
        reject(new Error('Data channel closed before file transfer completed'));
      }
    };
    dcReceive.onerror = e => {
      console.log('Error receiving file', e);
      reject(e);
    };
  });
}
 
 
/**
 * Per-connection transfer promises, keyed by connection id.
 *
 * Meant for streaming a single file at a time on each connection: finishing
 * one file completely is preferable to having many files partially sent.
 */
const getFilePromises: Record<string, Promise<void>> = {};
 
/**
 * Remotely callable handler: looks up a file for the calling peer, checks
 * that the peer may read it, starts streaming its blob over the data channel
 * named `file-<id>`, and returns the file record.
 *
 * The streaming runs in the background; this function returns the record
 * without waiting for the bytes to be sent.
 *
 * @param fileId - Id of the requested file.
 * @returns The file record from the local database, or `undefined` when no
 *   such file exists.
 * @throws Error when the remote user is neither the local user nor listed in
 *   `shareUsers`, the file is not public, and none of its `shareGroups`
 *   grants the user 'read' permission.
 */
async function getFile(fileId: string) {
  // Captured before the first await, exactly as in the original ordering.
  const connection = getCurrentConnection() as IDeviceConnection;
  const db = await getDB();
  const file = await db.files.get(fileId);
  if (!file) return;

  // validate peer has permissions to file
  if (connection.me.id !== connection.remoteUser.id && !file.isPublic) {
    const remoteUserId = connection.remoteUser.id;
    if (!(file.shareUsers || []).includes(remoteUserId)) {
      // Awaited one group at a time: if hasPermission returns a Promise, an
      // un-awaited result inside `.some()` would always count as truthy and
      // let every peer through. Awaiting is harmless if it is synchronous.
      let hasReadPermissions = false;
      for (const groupId of file.shareGroups || []) {
        if (await hasPermission(remoteUserId, groupId, 'read', db)) {
          hasReadPermissions = true;
          break;
        }
      }
      if (!hasReadPermissions) {
        throw new Error(`Unauthorized access to file ${fileId}`);
      }
    }
  }

  let getFilePromise = getFilePromises[connection.id];
  if (!getFilePromise) {
    getFilePromise = Promise.resolve();
  }

  // NOTE(review): the chained promise below is deliberately not assigned back
  // (the original had `getFilePromise =` commented out), so the value stored
  // in getFilePromises is never extended and transfers on one connection are
  // not actually queued behind each other. Confirm before re-enabling.
  // getFilePromise =
  getFilePromise.then(() => new Promise<void>((resolve) => {
    connection.waitForDataChannel(`file-${file.id}`).then(dcSend => {
      console.log('send dc open', dcSend);
      dcSend.onclose = e => {
        console.log('file transfer data channel closed', e);
        resolve();
      };
      dcSend.onerror = e => {
        console.error('error', e);
        resolve();
      };
      dcSend.onmessage = e => console.log('Error: message was received from a send only data channel', e);

      const fileReader = new FileReader();
      let offset = 0;
      let backPressure = 0;
      const maxBufferedAmount = chunkSize * 100;

      // Reads the next chunk of the blob, or waits with exponential back-off
      // (1ms, 2ms, 4ms, ...) while the channel's send buffer is too full.
      const readSlice = (): void => {
        if (dcSend.readyState === 'closed' || 2 ** backPressure >= 1000) {
          resolve();
          console.log('connection closed or not processing data, halting file transfer');
          return;
        }
        if (dcSend.bufferedAmount > maxBufferedAmount) {
          const waitTimeMs = 2 ** backPressure;
          console.log(`waiting for buffer to get processed`, { backPressure, waitTimeMs }, dcSend.bufferedAmount);
          backPressure++;
          setTimeout(readSlice, waitTimeMs);
          return;
        }
        backPressure = 0;
        const slice = file.blob.slice(offset, offset + chunkSize);
        fileReader.readAsArrayBuffer(slice);
      };

      fileReader.addEventListener('error', error => {
        console.error('Error reading file:', error);
        resolve();
      });
      fileReader.addEventListener('abort', event => {
        console.log('File reading aborted:', event);
        resolve();
      });
      fileReader.addEventListener('load', e => {
        const bytes = e.target.result as ArrayBuffer;
        try {
          dcSend.send(bytes);
        } catch (err) {
          // send() throws when the channel is no longer open; without this
          // the transfer promise would stay pending forever.
          console.error('Error sending file chunk', err);
          resolve();
          return;
        }
        offset += bytes.byteLength;
        if (offset < file.size) {
          readSlice();
        } else {
          resolve();
        }
      });

      // Event should be better than polling (with setTimeout) but couldn't get it to work
      // dcSend.onbufferedamountlow = e => {
      //   console.log('buffered amount low', e);
      //   if (offset < file.size) {
      //     readSlice(offset);
      //   }
      // }
      readSlice();
    }).catch(err => {
      // Covers both a rejected waitForDataChannel and an exception thrown
      // while setting up the transfer above.
      console.error('Error sending file', err);
      resolve();
    });
  }));

  getFilePromises[connection.id] = getFilePromise;

  return file;
}
 
// Register getFile under the name 'getFile'; presumably this is what lets a
// peer reach it via RPC(connection, getFile), as getFileFromPeer does.
setRemotelyCallableFunction(getFile, 'getFile');