pin框架 200行PHP代码 实现无序的分片上传
Nash
共 26801字,需浏览 54分钟
·
2024-03-21 18:30
最近在为自己开发的 pin 框架编写文档,想着写一个案例,于是把前阵子涉及到的文件上传场景拿出来做成了一个案例,先分享出来。
编写HTML文件如下:
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<body>
<script src="//127.0.0.1:9501/upload.js"></script>
<script>
// Demo wiring: create one uploader and log every lifecycle event to the console.
// The handlers do not use `this`, so arrow functions behave the same as function expressions here.
let uploader = new Uploader();

// A picker session ended without a file being chosen.
uploader.onCancel = () => {
    console.log('上传取消');
};

// Called with the files the user picked, before the first chunk is sent.
uploader.onStart = (files) => {
    console.log('开始上传,文件列表:', files);
};

// Called with a progress snapshot while chunks are being sent.
uploader.onProgress = (progress) => {
    console.log('上传进度', progress);
};

// Called with the server result for a file.
uploader.onSuccess = (res) => {
    console.log('上传完成', res);
};

// Called with the error that aborted the upload.
uploader.onFail = (err) => {
    console.log('错误信息:', err);
};
</script>
<button onclick="uploader.start({multiple:true})">upload</button>
</body>
</html>
其中的 upload.js 文件内容如下:
;
/**
 * Browser-side chunked uploader.
 *
 * Opens a file picker through a detached <input type="file">, slices every selected
 * file into fixed-size chunks and POSTs the chunks to `<script origin>/upload` with up
 * to six requests in flight. Chunks of one file may arrive at the server in any order;
 * each request carries the file name, a per-file upload id, the chunk count and the
 * 1-based chunk index in request headers.
 *
 * Callbacks (all optional): onStart(files), onProgress(snapshot), onSuccess(result),
 * onFail(error), onCancel(). They are invoked with the uploader as `this`.
 *
 * Relies on `Uploader.prototype.currentUrl` (a URL object) being set before the first
 * upload starts.
 */
class Uploader {
    /** @type {HTMLInputElement} Detached file input used to open the picker. */
    #uploadDom
    /** @type {Boolean} True once the input's change event fired for the current picker session. */
    #selected
    /** @type {Number} Number of focus polls done for the current picker session. */
    #checkTimes
    /** @type {Boolean} True while a picker session opened by start() is still undecided. */
    #startSelect
    /**
     * @type {Number} Chunk size in bytes.
     * NOTE(review): 1024 * 516 is kept unchanged; 512 KiB may have been intended — confirm.
     */
    #chunkSize = 1024 * 516
    /** @type {null|Function} */
    onCancel
    /** @type {null|Function} */
    onStart
    /** @type {null|Function} */
    onProgress
    /** @type {null|Function} */
    onSuccess
    /** @type {null|Function} */
    onFail
    /**
     * Progress of the file currently being uploaded; `loaded` holds bytes sent per chunk index.
     * @type {{count: Number, index: Number, total: Number, loaded: Number[], file: File|null}}
     */
    progress

    /**
     * @param {{accept?: string|null, multiple?: boolean}} [options] Initial picker settings.
     */
    constructor({accept = null, multiple = false} = {}) {
        this.#initUploadDom(accept, multiple)
        this.#initDocumentDom()
        this.#selected = false
        this.#checkTimes = 0
        this.#startSelect = false
        this.onStart = null
        this.onProgress = null
        this.onSuccess = null
        this.onFail = null
        this.onCancel = null
        this.progress = {count: 0, index: 0, total: 0, loaded: [], file: null}
    }

    /**
     * Opens the file picker. Uploading starts from the input's change event.
     *
     * @param {{accept?: string|null, multiple?: boolean}} [options] Only truthy values are
     *        applied, so settings from an earlier call stay in effect when omitted here.
     */
    start({accept = null, multiple = false} = {}) {
        this.#startSelect = true
        this.#selected = false
        this.#checkTimes = 0
        // Clearing the value lets the change event fire even if the same file is picked again.
        this.#uploadDom.value = ''
        if (accept) this.#uploadDom.accept = accept
        if (multiple) this.#uploadDom.multiple = multiple
        this.#uploadDom.click()
        this.progress = {count: 0, index: 0, total: 0, loaded: [], file: null}
    }

    /**
     * Change-event handler: uploads the selected files one after another.
     * Any error is reported through onFail and stops the remaining files.
     */
    async #startUpload() {
        try {
            this.#selected = true
            const files = this.#uploadDom.files
            this.onStart && this.onStart.call(this, files)
            this.progress.count = files.length
            for (let i = 0; i < files.length; i++) {
                this.progress.index = i + 1
                this.progress.file = files[i]
                this.progress.total = files[i].size
                this.progress.loaded = []
                await this.#doUpload(files[i])
            }
        } catch (err) {
            this.onFail && this.onFail.call(this, err)
        }
    }

    /**
     * Uploads one file with six concurrent workers. Worker `lane` sends the chunks
     * lane, lane + 6, lane + 12, ... so no chunk is sent twice.
     *
     * @param {File} file
     * @returns {Promise<void>} Resolves when every chunk was accepted; rejects with the
     *          first chunk failure.
     */
    async #doUpload(file) {
        const fileSize = file.size
        const blockCount = Math.ceil(fileSize / this.#chunkSize)
        // Per-file upload id; the server uses it to group the chunks of one file.
        const uploadHash = window.btoa(encodeURIComponent(Math.random() + (new Date()).toString()))
        const asyncNum = 6

        const runLane = async (lane) => {
            for (let index = lane; index < blockCount; index += asyncNum) {
                await new Promise((done, fail) =>
                    this.#doUploadChunk(file, fileSize, uploadHash, blockCount, index, done, fail))
            }
        }

        await Promise.all(Array.from({length: asyncNum}, (_, lane) => runLane(lane)))
    }

    /**
     * Sends one chunk and settles through the given callbacks.
     *
     * @param {File} file
     * @param {Number} fileSize
     * @param {string} uploadHash
     * @param {Number} blockCount
     * @param {Number} index       0-based chunk index (sent to the server 1-based).
     * @param {Function} finalDone Called without arguments when the chunk was accepted.
     * @param {Function} ud        Called with {status, response} when the chunk failed.
     */
    #doUploadChunk(file, fileSize, uploadHash, blockCount, index, finalDone, ud) {
        if (blockCount <= index)
            return finalDone()
        const form = new FormData()
        const min = index * this.#chunkSize
        const max = Math.min((index + 1) * this.#chunkSize, fileSize)
        form.append('file', this.#getSlice(file, min, max))
        const xhr = new XMLHttpRequest()
        let isFail = false
        const fail = () => {
            isFail = true
            ud({status: xhr.status, response: xhr.response})
        }

        if (this.onProgress) {
            xhr.upload.onprogress = (e) => {
                if (!isFail && e.lengthComputable) {
                    // e.total covers the whole multipart body, so scale it back to the chunk's byte count.
                    this.progress.loaded[index] = Math.floor(e.loaded * (max - min) / e.total)
                    this.onProgress.call(this, {
                        count: this.progress.count,
                        index: this.progress.index,
                        total: this.progress.total,
                        file: this.progress.file,
                        // `loaded` is sparse because chunks start out of order; reduce skips the holes.
                        loaded: this.progress.loaded.reduce((prev, cur) => prev + cur, 0),
                    })
                }
            }
        }

        // The load handler is always installed: the worker waiting on this chunk can only
        // continue through finalDone()/ud(), whether or not an onSuccess callback is set.
        xhr.onload = () => {
            if (xhr.status !== 200)
                return fail()
            let res
            try {
                res = JSON.parse(xhr.responseText)
            } catch (err) {
                // A non-JSON body counts as a failed chunk instead of leaving the promise pending.
                return fail()
            }
            // Only the response that carries `data` reports the finished file.
            if (res && res.data && this.onSuccess) {
                this.onSuccess.call(this, {
                    count: this.progress.count,
                    index: this.progress.index,
                    total: this.progress.total,
                    file: this.progress.file,
                    result: res,
                })
            }
            finalDone()
        }
        xhr.onerror = fail

        xhr.open('POST', this.currentUrl.protocol + '//' + this.currentUrl.host + '/upload', true)
        // NOTE(review): header values must be Latin-1; a file name with other characters
        // (e.g. CJK) makes setRequestHeader throw, which surfaces through onFail. Fixing it
        // needs an encoding agreed with the server, so it is left unchanged here.
        xhr.setRequestHeader('File-Name', file.name)
        xhr.setRequestHeader('File-Hash', uploadHash)
        xhr.setRequestHeader('File-Count', blockCount)
        xhr.setRequestHeader('File-Index', index + 1)
        xhr.send(form)
    }

    /**
     * Returns the bytes [start, end) of the file, using a vendor-prefixed slice when needed.
     *
     * @param {File} file
     * @param {Number} start
     * @param {Number} end
     * @returns {Blob|undefined} undefined when no slice implementation exists.
     */
    #getSlice(file, start, end) {
        if (File.prototype.slice) {
            return file.slice(start, end)
        } else if (File.prototype.mozSlice) {
            return file.mozSlice(start, end)
        } else if (File.prototype.webkitSlice) {
            return file.webkitSlice(start, end)
        }
    }

    /**
     * Window-focus handler used to detect a cancelled picker: after the window regains
     * focus it polls every 10 ms, up to 50 times, for the change event. If none arrived,
     * onCancel is fired.
     */
    #documentFocus() {
        if (!this.#startSelect)
            return
        this.#checkTimes++
        if (this.#selected) {
            this.#startSelect = false
        } else if (this.#checkTimes >= 50) {
            this.#startSelect = false
            this.onCancel && this.onCancel.call(this)
        } else
            setTimeout(this.#documentFocus.bind(this), 10)
    }

    /**
     * Creates the detached file input and hooks its change event to the upload.
     *
     * @param {string|null} accept
     * @param {boolean} multiple
     */
    #initUploadDom(accept = null, multiple = false) {
        this.#uploadDom = document.createElement('input')
        this.#uploadDom.type = 'file'
        if (accept) this.#uploadDom.accept = accept
        if (multiple) this.#uploadDom.multiple = true
        this.#uploadDom.addEventListener('change', this.#startUpload.bind(this))
    }

    /** Registers the window focus listener that drives cancel detection. */
    #initDocumentDom() {
        window.addEventListener('focus', this.#documentFocus.bind(this))
    }
}
// Records the URL this script was loaded from on Uploader.prototype.currentUrl; the
// uploader posts chunks to the same protocol and host.
(function (d) {
    if (!Uploader.prototype.currentUrl) {
        // document.currentScript is the element being executed. "Last <script> in the
        // document" is only the same element for plain synchronous scripts, so it is kept
        // purely as a fallback for environments where currentScript is null.
        let scripts = d.getElementsByTagName('script')
        let self = d.currentScript || scripts[scripts.length - 1]
        // An inline script has an empty src, and new URL('') throws; use the page URL then.
        Uploader.prototype.currentUrl = new URL((self && self.src) || d.location.href)
    }
})(document)
原理也很简单:单击按钮时触发 Uploader 对象的 start 方法,由它点击(构造时创建好的)input[type=file] 来选择文件;文件选好之后,按固定的分片大小对文件进行切分,然后以 6 个并发请求组成的池子上传各个分片(分片上传不分先后,可以是无序的)。
PHP的代码如下:
use Hyperf\Coroutine\Coroutine;
use Hyperf\HttpMessage\Upload\UploadedFile;
use Nash\Pin\Server\{Request, Response};
use Hyperf\HttpMessage\Exception\HttpException;
require __DIR__ . '/vendor/autoload.php';
(function() {
// Application-level error thrown by the upload code. It extends HttpException, so the
// global exception handler can read an HTTP status code from it in addition to the
// business code and message.
class BusinessException extends HttpException {}
// Global exception handler.
App::addExceptionHandler(function (Throwable $throwable) {
// Business errors get a JSON body carrying the exception's code and message. There is
// no return on this branch, so execution continues into the HttpException check below
// (BusinessException extends HttpException), which applies the HTTP status code.
// NOTE(review): this relies on Response::instance() being updated in place by json() — confirm.
if ($throwable instanceof BusinessException)
Response::instance()->json(['code' => $throwable->getCode(), 'data' => null, 'message' => $throwable->getMessage()]);
// Any HttpException: answer with the status code carried by the exception.
if ($throwable instanceof HttpException)
return Response::instance()->withStatus($throwable->getStatusCode());
// Everything else: set status 500 and hand the throwable back to the caller.
Response::instance()->withStatus(500);
return $throwable;
});
// Global middleware: adds CORS headers to every response and answers preflight requests.
App::addMiddleware(function ($next) {
// Allow any origin, and allow the custom headers the uploader sends with each chunk.
// NOTE(review): the return values of withHeader() are discarded, so this presumably
// relies on Response::instance() being updated in place — confirm.
Response::instance()->withHeader('Access-Control-Allow-Origin', '*');
Response::instance()->withHeader('Access-Control-Allow-Headers', 'File-Hash, File-Name, File-Count, File-Index');
// OPTIONS requests are answered with status 200 here and never reach a route handler.
if (Request::instance()->isMethod('OPTIONS'))
return Response::instance()->withStatus(200);
return $next();
});
// Serve the uploader script. The file is read once when this bootstrap code runs, and
// the same string is returned for every GET /upload.js request.
$uploaderScript = file_get_contents(__DIR__ . '/upload.js');
App::get('/upload.js', function () use ($uploaderScript) {
    return $uploaderScript;
});
// Upload endpoint: stores the posted file or chunk under ./upload. `data` carries the
// stored file path once the whole file is available, and null after a chunk that did
// not complete the file.
App::post('/upload', function (Request $request) {
    $uploader = new Uploader($request);
    $storedPath = $uploader->setUploadDirectory(__DIR__ . '/upload')->doUpload();
    if (!$storedPath) {
        return ['code' => 0, 'data' => null, 'message' => '分片上传完成'];
    }
    return ['code' => 0, 'data' => $storedPath, 'message' => '文件上传完成'];
});
/**
 * Server-side handler for one upload request.
 *
 * A request either carries a whole file (chunk count 1) or one chunk of a larger file.
 * Chunks of the same upload share an upload id (File-Hash) and are stored in a task
 * directory named after that id. The request that creates the task directory starts a
 * coroutine which appends the chunks to the target file in index order while holding an
 * exclusive lock on it. The request that sees every chunk on disk waits for that lock
 * and then reports the finished file.
 *
 * Upload id, file name, chunk index and chunk count are read from the query string
 * (file-hash, file-name, file-index, file-count) or from the headers of the same names.
 */
class Uploader {
    /** Seconds the merge coroutine waits for missing chunks before it gives up. */
    private const MERGE_TIMEOUT = 3600 * 2;

    private UploadedFile $file;
    private string $hash;
    private string $name;
    private int $index;
    private int $count;
    private string $uploadDirectory;
    private string $uploadTaskDirectory;
    private string $targetName;
    private bool $isFirst = false;

    /**
     * @param Request $request
     * @throws BusinessException when the request has no "file" upload or the chunk
     *                           index/count are out of range
     */
    public function __construct(Request $request) {
        if (!$request->hasFile('file'))
            throw new BusinessException(400, '没有上传的文件', 10001);
        $this->init($request);
        $this->setUploadDirectory('/tmp');
    }

    /**
     * Reads the upload metadata from the request.
     *
     * @param Request $request
     * @return void
     * @throws BusinessException when the chunk index/count are out of range
     */
    private function init(Request $request): void {
        $this->file = $request->file('file');
        // Read into a local first: assigning a missing (null) value straight to the typed
        // string property would raise a TypeError before the "no upload id" case is handled.
        $hash = $request->query('file-hash', $request->header('File-Hash'));
        if (empty($hash) || !is_string($hash)) {
            // No upload id: treat the request as a complete single file under a random name.
            $this->name = $this->hash = bin2hex(random_bytes(16));
            $this->index = $this->count = 1;
            return;
        }
        $this->hash = $hash;
        $this->name = $this->sanitizeName((string)$request->query('file-name', $request->header('File-Name', $hash)));
        $this->index = (int)$request->query('file-index', $request->header('File-Index', 1));
        $this->count = (int)$request->query('file-count', $request->header('File-Count', 1));
        if ($this->count < 1 || $this->index < 1 || $this->index > $this->count)
            throw new BusinessException(400, '分片序号或分片总数不合法', 10002);
    }

    /**
     * Reduces a client-supplied name to a single safe path segment, so it cannot point
     * outside the upload directory. Falls back to md5 of the upload id when nothing
     * usable is left.
     *
     * @param string $name
     * @return string
     */
    private function sanitizeName(string $name): string {
        // Drop NUL bytes, then keep only the part after the last "/" or "\".
        $segments = preg_split('#[/\\\\]+#', str_replace("\0", '', $name));
        $base = $segments === false ? '' : (string)end($segments);
        if ($base === '' || $base === '.' || $base === '..')
            return md5($this->hash);
        return $base;
    }

    /**
     * @param string $uploadDirectory Directory that receives finished files and task directories.
     * @return Uploader
     */
    public function setUploadDirectory(string $uploadDirectory): self {
        $this->uploadDirectory = rtrim($uploadDirectory, '/');
        return $this;
    }

    /**
     * Stores the uploaded file or chunk.
     *
     * @return string|null Path of the finished file, or null when this request stored a
     *                     chunk and the file is not known to be complete yet.
     */
    public function doUpload(): ?string {
        $this->targetName = $this->uploadDirectory . '/' . $this->name;
        if ($this->count === 1) {
            $this->singleUpload();
        } else {
            $this->multipleUpload();
        }
        return $this->waitMerge();
    }

    /**
     * Starts the merge when this request created the task directory, and reports the
     * finished file when this request observes that every chunk is on disk.
     *
     * @return string|null
     */
    private function waitMerge(): ?string {
        if ($this->count === 1)
            return $this->targetName;
        if ($this->isFirst) Coroutine::create(fn() => $this->merge());

        // A complete task directory holds ".", ".." and two files (.tmp and .tmp.md5) per
        // chunk. The directory may already be gone if another request finished the upload.
        $entries = @scandir($this->uploadTaskDirectory);
        if ($entries === false || count($entries) !== 2 + 2 * $this->count)
            return null;

        // The non-blocking lock lets only one of several concurrent requests report completion.
        $lockFp = fopen($this->targetName . '.lock', 'w+');
        if ($lockFp === false || !flock($lockFp, LOCK_EX | LOCK_NB))
            return null;

        $fileFp = fopen($this->targetName, 'rb+');
        if ($fileFp === false)
            throw new RuntimeException('merged file is not available: ' . $this->targetName);
        // merge() holds this lock while it appends chunks, so acquiring it means merging ended.
        flock($fileFp, LOCK_EX);

        Coroutine::create(function () {
            $this->removeTaskDirectory();
            unlink($this->targetName . '.lock');
        });
        // Both handles, and with them the locks, are released when this method returns.
        return $this->targetName;
    }

    /**
     * Appends the chunks to the target file in index order, waiting for chunks that have
     * not arrived yet. Gives up after MERGE_TIMEOUT seconds, or when a chunk cannot be
     * copied, and then removes the partial file and the task directory.
     *
     * @return void
     */
    private function merge(): void {
        $target = fopen($this->targetName, 'wb+');
        if ($target === false) {
            error_log('Uploader: cannot open merge target ' . $this->targetName);
            return;
        }
        flock($target, LOCK_EX);

        $deadline = time() + self::MERGE_TIMEOUT;
        $aborted = false;
        $next = 1;
        while ($next <= $this->count) {
            if (time() > $deadline) {
                $aborted = true;
                break;
            }
            $chunk = "{$this->uploadTaskDirectory}/{$next}-{$this->name}.tmp";
            // The .md5 file is written after the chunk itself, so it marks the chunk as fully stored.
            if (!file_exists($chunk) || !file_exists($chunk . '.md5')) {
                Coroutine::sleep(0.001);
                continue;
            }
            $source = fopen($chunk, 'rb');
            $copied = $source === false ? false : stream_copy_to_stream($source, $target);
            if ($source !== false)
                fclose($source);
            if ($copied === false) {
                $aborted = true;
                break;
            }
            $next++;
        }
        // Closing the handle releases the lock that waitMerge() is waiting for.
        fclose($target);

        // Clean up only when merging did not finish; a completed file is never removed here.
        if ($aborted) {
            @unlink($this->targetName);
            $this->removeTaskDirectory();
        }
    }

    /**
     * Best-effort removal of the task directory and the chunk files inside it. The
     * directory only ever contains flat files, so no recursion and no shell call is needed.
     *
     * @return void
     */
    private function removeTaskDirectory(): void {
        $entries = @scandir($this->uploadTaskDirectory);
        if ($entries === false)
            return;
        foreach ($entries as $entry) {
            if ($entry !== '.' && $entry !== '..')
                @unlink($this->uploadTaskDirectory . '/' . $entry);
        }
        @rmdir($this->uploadTaskDirectory);
    }

    /**
     * Stores one chunk in the task directory and writes its .md5 marker file.
     *
     * @return void
     */
    private function multipleUpload(): void {
        $this->uploadTaskDirectory = $this->uploadDirectory . '/' . md5($this->hash);
        // mkdir succeeds for exactly one request, and that request becomes responsible for
        // the merge. Failures caused by the directory already existing are silenced on purpose.
        if (!is_dir($this->uploadTaskDirectory))
            $this->isFirst = @mkdir($this->uploadTaskDirectory, 0755, true);
        $filename = $this->save($this->uploadTaskDirectory . '/' . $this->index . '-' . $this->name . '.tmp');
        file_put_contents($filename . '.md5', md5_file($filename));
    }

    /**
     * Stores a complete file directly at the target path.
     *
     * @return void
     */
    private function singleUpload(): void {
        $this->save($this->targetName);
    }

    /**
     * Moves the uploaded file to the given path.
     *
     * @param string $fileName
     * @return string The path the file was moved to.
     */
    private function save(string $fileName): string {
        $this->file->moveTo($fileName);
        return $fileName;
    }
}
})();
原理也很简单:根据分片的信息来创建任务目录,成功创建目录的那个请求会新建一个协程,持有目标文件的文件锁并异步地按序合并分片;当最后一个分片合并完成后,释放锁。发现所有分片都已上传的那个请求会去获取这把锁,加锁成功就可以得知文件合并已经完成,此时响应给前端即可。从这个例子也可以看到,使用 pin 框架,只需要180行PHP代码、220行JS代码即可完成一个分片上传的功能。
评论