From 35f37622eddfc73ba8ec9618d08d2a0213331400 Mon Sep 17 00:00:00 2001 From: YouROK <8yourok8@mail.ru> Date: Thu, 3 Jun 2021 16:57:35 +0300 Subject: [PATCH] fix bug and add readahead, need fix bug with end preload --- server/torr/preload.go | 131 ++++++++++++++--------------------------- 1 file changed, 44 insertions(+), 87 deletions(-) diff --git a/server/torr/preload.go b/server/torr/preload.go index e671cc9..d2f3fd4 100644 --- a/server/torr/preload.go +++ b/server/torr/preload.go @@ -3,13 +3,14 @@ package torr import ( "fmt" "io" + "sync" "time" "github.com/anacrolix/torrent" "server/log" + "server/settings" "server/torr/state" - "server/torr/storage/torrstor" utils2 "server/utils" ) @@ -57,6 +58,7 @@ func (t *Torrent) Preload(index int, size int64) { for t.Stat == state.TorrentPreload { stat := fmt.Sprint(file.Torrent().InfoHash().HexString(), " ", utils2.Format(float64(t.PreloadedBytes)), "/", utils2.Format(float64(t.PreloadSize)), " Speed:", utils2.Format(t.DownloadSpeed), " Peers:[", t.Torrent.Stats().ConnectedSeeders, "]", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers) log.TLogln("Preload:", stat) + t.AddExpiredTime(time.Second * time.Duration(settings.BTsets.TorrentDisconnectTimeout)) time.Sleep(time.Second) } }() @@ -67,42 +69,54 @@ func (t *Torrent) Preload(index int, size int64) { defer readerStart.Close() readerStart.SetResponsive() readerStart.SetReadahead(0) - readerStartEnd := file.Offset() + size - mb5 + readerStartEnd := size - mb5 - if readerStartEnd < file.Offset() { + if readerStartEnd < 0 { // Если конец начального ридера оказался за началом - readerStartEnd = file.Offset() + size + readerStartEnd = size } - if readerStartEnd > file.Offset()+file.Length() { + if readerStartEnd > file.Length() { // Если конец начального ридера оказался после конца файла - readerStartEnd = file.Offset() + file.Length() + readerStartEnd = file.Length() } - readerEndStart := file.Offset() + file.Length() - mb5 - readerEndEnd := file.Offset() + file.Length() + readerEndStart := file.Length() - mb5 + readerEndEnd := file.Length() - tmp := make([]byte, 32768, 32768) offset := int64(0) - if readerEndStart > readerStartEnd { - // Если конечный ридер не входит в диапозон начального - readerEnd := file.NewReader() - readerEnd.SetResponsive() - readerEnd.SetReadahead(0) - readerEnd.Seek(readerEndStart, io.SeekStart) - offset = readerEndStart - for offset+int64(len(tmp)) < readerEndEnd { - n, err := readerEnd.Read(tmp) - if err != nil { - log.TLogln("Error preload:", err) - readerEnd.Close() - return + var wa sync.WaitGroup + go func() { + if readerEndStart > readerStartEnd { + // Если конечный ридер не входит в диапозон начального + wa.Add(1) + defer wa.Done() + readerEnd := file.NewReader() + readerEnd.SetResponsive() + readerEnd.SetReadahead(0) + readerEnd.Seek(readerEndStart, io.SeekStart) + offset = readerEndStart + tmp := make([]byte, 32768, 32768) + for offset+int64(len(tmp)) < readerEndEnd { + n, err := readerEnd.Read(tmp) + if err != nil { + log.TLogln("Error preload:", err) + readerEnd.Close() + return + } + offset += int64(n) } - offset += int64(n) + readerEnd.Close() } - readerEnd.Close() - } + }() + pieceLength := t.Info().PieceLength + readahead := pieceLength * 4 + if readerStartEnd < readahead { + readahead = 0 + } + readerStart.SetReadahead(readahead) offset = 0 + tmp := make([]byte, 32768, 32768) for offset+int64(len(tmp)) < readerStartEnd { n, err := readerStart.Read(tmp) if err != nil { @@ -110,74 +124,17 @@ func (t *Torrent) Preload(index int, size int64) { return } offset += int64(n) + if readahead > 0 && readerStartEnd-(offset+int64(len(tmp))) < readahead { + readahead = 0 + readerStart.SetReadahead(0) + } } - /*pieceLength := t.Info().PieceLength - mb5 := int64(5 * 1024 * 1024) - - pieceFileStart := int(file.Offset() / pieceLength) - pieceFileStartEnd := int((file.Offset()+size-mb5)/pieceLength) - 1 - if pieceFileStartEnd < pieceFileStart { - pieceFileStartEnd = pieceFileStart - } - - pieceFileEnd := int((file.Offset() + file.Length() - mb5) / pieceLength) - pieceFileEndEnd := int((file.Offset() + file.Length()) / pieceLength) - if file.Length() < mb5 { - pieceFileStartEnd = pieceFileEndEnd - pieceFileEnd = -1 - pieceFileEndEnd = -1 - } - - lastStat := time.Now().Add(-time.Second) - - for true { - t.muTorrent.Lock() - if t.Torrent == nil { - return - } - - t.PreloadedBytes = t.cache.GetState().Filled - t.muTorrent.Unlock() - - stat := fmt.Sprint(file.Torrent().InfoHash().HexString(), " ", utils2.Format(float64(t.PreloadedBytes)), "/", utils2.Format(float64(t.PreloadSize)), " Speed:", utils2.Format(t.DownloadSpeed), " Peers:[", t.Torrent.Stats().ConnectedSeeders, "]", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers) - if time.Since(lastStat) > time.Second { - log.TLogln("Preload:", stat) - lastStat = time.Now() - } - - beginLoadingPieces := t.piecesLoading(pieceFileStart, pieceFileStartEnd) - endLoadingPieces := t.piecesLoading(pieceFileEnd, pieceFileEndEnd) - - if beginLoadingPieces == 0 && endLoadingPieces == 0 { - break - } - - t.AddExpiredTime(time.Second * time.Duration(settings.BTsets.TorrentDisconnectTimeout)) - time.Sleep(time.Second) - }*/ + wa.Wait() } log.TLogln("End preload:", file.Torrent().InfoHash().HexString(), "Peers:[", t.Torrent.Stats().ConnectedSeeders, "]", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers) } -func (t *Torrent) piecesLoading(start, end int) int { - count := 0 - if start < 0 || end < 0 { - return 0 - } - limitLoading := 5 - for i := start; i <= end; i++ { - if !t.Piece(i).Storage().PieceImpl.(*torrstor.Piece).Complete { - count++ - if limitLoading > 0 && t.PieceState(i).Priority == torrent.PiecePriorityNone { - t.Piece(i).SetPriority(torrent.PiecePriorityNormal) - } - limitLoading-- - } - } - return count -} - func (t *Torrent) findFileIndex(index int) *torrent.File { st := t.Status() var stFile *state.TorrentFileStat