Files
TorrServerJellyfin/server/torr/torrent.go
2024-03-24 14:28:31 +01:00

361 lines
7.8 KiB
Go

package torr
import (
"errors"
"sort"
"sync"
"time"
utils2 "server/utils"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"server/log"
"server/settings"
"server/torr/state"
cacheSt "server/torr/storage/state"
"server/torr/storage/torrstor"
"server/torr/utils"
)
type Torrent struct {
Title string
Category string
Poster string
Data string
*torrent.TorrentSpec
Stat state.TorrentStat
Timestamp int64
Size int64
*torrent.Torrent
muTorrent sync.Mutex
bt *BTServer
cache *torrstor.Cache
lastTimeSpeed time.Time
DownloadSpeed float64
UploadSpeed float64
BytesReadUsefulData int64
BytesWrittenData int64
PreloadSize int64
PreloadedBytes int64
DurationSeconds float64
BitRate string
expiredTime time.Time
closed <-chan struct{}
progressTicker *time.Ticker
}
func NewTorrent(spec *torrent.TorrentSpec, bt *BTServer) (*Torrent, error) {
// https://github.com/anacrolix/torrent/issues/747
if bt == nil || bt.client == nil {
return nil, errors.New("BT client not connected")
}
switch settings.BTsets.RetrackersMode {
case 1:
spec.Trackers = append(spec.Trackers, [][]string{utils.GetDefTrackers()}...)
case 2:
spec.Trackers = nil
case 3:
spec.Trackers = [][]string{utils.GetDefTrackers()}
}
trackers := utils.GetTrackerFromFile()
if len(trackers) > 0 {
spec.Trackers = append(spec.Trackers, [][]string{trackers}...)
}
goTorrent, _, err := bt.client.AddTorrentSpec(spec)
if err != nil {
return nil, err
}
bt.mu.Lock()
defer bt.mu.Unlock()
if tor, ok := bt.torrents[spec.InfoHash]; ok {
return tor, nil
}
torr := new(Torrent)
torr.Torrent = goTorrent
torr.Stat = state.TorrentAdded
torr.lastTimeSpeed = time.Now()
torr.bt = bt
torr.closed = goTorrent.Closed()
torr.TorrentSpec = spec
torr.AddExpiredTime(time.Minute)
torr.Timestamp = time.Now().Unix()
go torr.watch()
bt.torrents[spec.InfoHash] = torr
return torr, nil
}
func (t *Torrent) WaitInfo() bool {
if t.Torrent == nil {
return false
}
// Close torrent if not info while 5 minutes
tm := time.NewTimer(time.Minute * 5)
select {
case <-t.Torrent.GotInfo():
t.cache = t.bt.storage.GetCache(t.Hash())
t.cache.SetTorrent(t.Torrent)
return true
case <-t.closed:
return false
case <-tm.C:
return false
}
}
func (t *Torrent) GotInfo() bool {
// log.TLogln("GotInfo state:", t.Stat)
if t.Stat == state.TorrentClosed {
return false
}
// assume we have info in preload state
// and dont override with TorrentWorking
if t.Stat == state.TorrentPreload {
return true
}
t.Stat = state.TorrentGettingInfo
if t.WaitInfo() {
t.Stat = state.TorrentWorking
t.AddExpiredTime(time.Minute * 5)
return true
} else {
t.Close()
return false
}
}
func (t *Torrent) AddExpiredTime(duration time.Duration) {
t.expiredTime = time.Now().Add(duration)
}
func (t *Torrent) watch() {
t.progressTicker = time.NewTicker(time.Second)
defer t.progressTicker.Stop()
for {
select {
case <-t.progressTicker.C:
go t.progressEvent()
case <-t.closed:
return
}
}
}
func (t *Torrent) progressEvent() {
if t.expired() {
if t.TorrentSpec != nil {
log.TLogln("Torrent close by timeout", t.TorrentSpec.InfoHash.HexString())
}
t.bt.RemoveTorrent(t.Hash())
return
}
t.muTorrent.Lock()
if t.Torrent != nil && t.Torrent.Info() != nil {
st := t.Torrent.Stats()
deltaDlBytes := st.BytesRead.Int64() - t.BytesReadUsefulData
deltaUpBytes := st.BytesWritten.Int64() - t.BytesWrittenData
deltaTime := time.Since(t.lastTimeSpeed).Seconds()
t.DownloadSpeed = float64(deltaDlBytes) / deltaTime
t.UploadSpeed = float64(deltaUpBytes) / deltaTime
t.BytesReadUsefulData = st.BytesRead.Int64()
t.BytesWrittenData = st.BytesWritten.Int64()
if t.cache != nil {
t.PreloadedBytes = t.cache.GetState().Filled
}
} else {
t.DownloadSpeed = 0
t.UploadSpeed = 0
}
t.muTorrent.Unlock()
t.lastTimeSpeed = time.Now()
t.updateRA()
}
func (t *Torrent) updateRA() {
// t.muTorrent.Lock()
// defer t.muTorrent.Unlock()
// if t.Torrent != nil && t.Torrent.Info() != nil {
// pieceLen := t.Torrent.Info().PieceLength
// adj := pieceLen * int64(t.Torrent.Stats().ActivePeers) / int64(1+t.cache.Readers())
// switch {
// case adj < pieceLen:
// adj = pieceLen
// case adj > pieceLen*4:
// adj = pieceLen * 4
// }
// go t.cache.AdjustRA(adj)
// }
adj := int64(16 << 20) // 16 MB fixed RA
go t.cache.AdjustRA(adj)
}
func (t *Torrent) expired() bool {
return t.cache.Readers() == 0 && t.expiredTime.Before(time.Now()) && (t.Stat == state.TorrentWorking || t.Stat == state.TorrentClosed)
}
func (t *Torrent) Files() []*torrent.File {
if t.Torrent != nil && t.Torrent.Info() != nil {
files := t.Torrent.Files()
return files
}
return nil
}
func (t *Torrent) Hash() metainfo.Hash {
if t.Torrent != nil {
t.Torrent.InfoHash()
}
if t.TorrentSpec != nil {
return t.TorrentSpec.InfoHash
}
return [20]byte{}
}
func (t *Torrent) Length() int64 {
if t.Info() == nil {
return 0
}
return t.Torrent.Length()
}
func (t *Torrent) NewReader(file *torrent.File) *torrstor.Reader {
if t.Stat == state.TorrentClosed {
return nil
}
reader := t.cache.NewReader(file)
return reader
}
func (t *Torrent) CloseReader(reader *torrstor.Reader) {
t.cache.CloseReader(reader)
t.AddExpiredTime(time.Second * time.Duration(settings.BTsets.TorrentDisconnectTimeout))
}
func (t *Torrent) GetCache() *torrstor.Cache {
return t.cache
}
func (t *Torrent) drop() {
t.muTorrent.Lock()
defer t.muTorrent.Unlock()
if t.Torrent != nil {
t.Torrent.Drop()
t.Torrent = nil
}
}
func (t *Torrent) Close() bool {
if t.cache != nil && t.cache.GetUseReaders() > 0 {
return false
}
t.Stat = state.TorrentClosed
t.bt.mu.Lock()
delete(t.bt.torrents, t.Hash())
t.bt.mu.Unlock()
t.drop()
return true
}
func (t *Torrent) Status() *state.TorrentStatus {
t.muTorrent.Lock()
defer t.muTorrent.Unlock()
st := new(state.TorrentStatus)
st.Stat = t.Stat
st.StatString = t.Stat.String()
st.Title = t.Title
st.Category = t.Category
st.Poster = t.Poster
st.Data = t.Data
st.Timestamp = t.Timestamp
st.TorrentSize = t.Size
st.BitRate = t.BitRate
st.DurationSeconds = t.DurationSeconds
if t.TorrentSpec != nil {
st.Hash = t.TorrentSpec.InfoHash.HexString()
}
if t.Torrent != nil {
st.Name = t.Torrent.Name()
st.Hash = t.Torrent.InfoHash().HexString()
st.LoadedSize = t.Torrent.BytesCompleted()
st.PreloadedBytes = t.PreloadedBytes
st.PreloadSize = t.PreloadSize
st.DownloadSpeed = t.DownloadSpeed
st.UploadSpeed = t.UploadSpeed
tst := t.Torrent.Stats()
st.BytesWritten = tst.BytesWritten.Int64()
st.BytesWrittenData = tst.BytesWrittenData.Int64()
st.BytesRead = tst.BytesRead.Int64()
st.BytesReadData = tst.BytesReadData.Int64()
st.BytesReadUsefulData = tst.BytesReadUsefulData.Int64()
st.ChunksWritten = tst.ChunksWritten.Int64()
st.ChunksRead = tst.ChunksRead.Int64()
st.ChunksReadUseful = tst.ChunksReadUseful.Int64()
st.ChunksReadWasted = tst.ChunksReadWasted.Int64()
st.PiecesDirtiedGood = tst.PiecesDirtiedGood.Int64()
st.PiecesDirtiedBad = tst.PiecesDirtiedBad.Int64()
st.TotalPeers = tst.TotalPeers
st.PendingPeers = tst.PendingPeers
st.ActivePeers = tst.ActivePeers
st.ConnectedSeeders = tst.ConnectedSeeders
st.HalfOpenPeers = tst.HalfOpenPeers
if t.Torrent.Info() != nil {
st.TorrentSize = t.Torrent.Length()
files := t.Files()
sort.Slice(files, func(i, j int) bool {
return utils2.CompareStrings(files[i].Path(), files[j].Path())
})
for i, f := range files {
st.FileStats = append(st.FileStats, &state.TorrentFileStat{
Id: i + 1, // in web id 0 is undefined
Path: f.Path(),
Length: f.Length(),
})
}
}
}
return st
}
func (t *Torrent) CacheState() *cacheSt.CacheState {
if t.Torrent != nil && t.cache != nil {
st := t.cache.GetState()
st.Torrent = t.Status()
return st
}
return nil
}