This commit is contained in:
yourok
2018-08-29 12:33:14 +03:00
commit 0ca43a2c4d
54 changed files with 5669 additions and 0 deletions

182
src/server/torr/BTServer.go Normal file
View File

@@ -0,0 +1,182 @@
package torr
import (
"fmt"
"io"
"path/filepath"
"sync"
"server/settings"
"server/torr/storage"
"server/torr/storage/memcache"
"server/torr/storage/state"
"server/utils"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo"
)
type BTServer struct {
config *torrent.ClientConfig
client *torrent.Client
storage storage.Storage
torrents map[metainfo.Hash]*Torrent
mu sync.Mutex
wmu sync.Mutex
watching bool
}
func NewBTS() *BTServer {
bts := new(BTServer)
bts.torrents = make(map[metainfo.Hash]*Torrent)
return bts
}
func (bt *BTServer) Connect() error {
bt.mu.Lock()
defer bt.mu.Unlock()
var err error
bt.configure()
bt.client, err = torrent.NewClient(bt.config)
bt.torrents = make(map[metainfo.Hash]*Torrent)
return err
}
func (bt *BTServer) Disconnect() {
bt.mu.Lock()
defer bt.mu.Unlock()
if bt.client != nil {
bt.client.Close()
bt.client = nil
utils.FreeOSMemGC()
}
}
func (bt *BTServer) Reconnect() error {
bt.Disconnect()
return bt.Connect()
}
func (bt *BTServer) configure() {
bt.storage = memcache.NewStorage(settings.Get().CacheSize)
blocklist, _ := iplist.MMapPackedFile(filepath.Join(settings.Path, "blocklist"))
userAgent := "uTorrent/3.4.9"
peerID := "-UT3490-"
bt.config = torrent.NewDefaultClientConfig()
bt.config.DisableIPv6 = true
bt.config.DisableTCP = settings.Get().DisableTCP
bt.config.DisableUTP = settings.Get().DisableUTP
bt.config.NoDefaultPortForwarding = settings.Get().DisableUPNP
bt.config.NoDHT = settings.Get().DisableDHT
bt.config.NoUpload = settings.Get().DisableUpload
bt.config.EncryptionPolicy = torrent.EncryptionPolicy{
DisableEncryption: settings.Get().Encryption == 1,
ForceEncryption: settings.Get().Encryption == 2,
}
bt.config.IPBlocklist = blocklist
bt.config.DefaultStorage = bt.storage
bt.config.Bep20 = peerID
bt.config.PeerID = utils.PeerIDRandom(peerID)
bt.config.HTTPUserAgent = userAgent
bt.config.EstablishedConnsPerTorrent = settings.Get().ConnectionsLimit
bt.config.TorrentPeersHighWater = 3000
bt.config.HalfOpenConnsPerTorrent = 50
if settings.Get().DownloadRateLimit > 0 {
bt.config.DownloadRateLimiter = utils.Limit(settings.Get().DownloadRateLimit * 1024)
}
if settings.Get().UploadRateLimit > 0 {
bt.config.UploadRateLimiter = utils.Limit(settings.Get().UploadRateLimit * 1024)
}
//bt.config.Debug = true
fmt.Println("Configure client:", settings.Get())
}
func (bt *BTServer) AddTorrent(magnet metainfo.Magnet, onAdd func(*Torrent)) (*Torrent, error) {
torr, err := NewTorrent(magnet, bt)
if err != nil {
return nil, err
}
if onAdd != nil {
go func() {
if torr.GotInfo() {
onAdd(torr)
}
}()
} else {
go torr.GotInfo()
}
return torr, nil
}
func (bt *BTServer) List() []*Torrent {
bt.mu.Lock()
defer bt.mu.Unlock()
list := make([]*Torrent, 0)
for _, t := range bt.torrents {
list = append(list, t)
}
return list
}
func (bt *BTServer) GetTorrent(hash metainfo.Hash) *Torrent {
bt.mu.Lock()
defer bt.mu.Unlock()
if t, ok := bt.torrents[hash]; ok {
return t
}
return nil
}
func (bt *BTServer) RemoveTorrent(hash torrent.InfoHash) {
if torr, ok := bt.torrents[hash]; ok {
torr.Close()
}
}
func (bt *BTServer) BTState() *BTState {
bt.mu.Lock()
defer bt.mu.Unlock()
btState := new(BTState)
btState.LocalPort = bt.client.LocalPort()
btState.PeerID = fmt.Sprintf("%x", bt.client.PeerID())
btState.BannedIPs = len(bt.client.BadPeerIPs())
for _, dht := range bt.client.DhtServers() {
btState.DHTs = append(btState.DHTs, dht)
}
for _, t := range bt.torrents {
btState.Torrents = append(btState.Torrents, t)
}
return btState
}
func (bt *BTServer) CacheState(hash metainfo.Hash) *state.CacheState {
st := bt.GetTorrent(hash)
if st == nil {
return nil
}
cacheState := bt.storage.GetStats(hash)
return cacheState
}
func (bt *BTServer) WriteState(w io.Writer) {
bt.client.WriteStatus(w)
}

51
src/server/torr/Play.go Normal file
View File

@@ -0,0 +1,51 @@
package torr
import (
"fmt"
"net/http"
"time"
"server/settings"
"server/utils"
"github.com/anacrolix/missinggo/httptoo"
"github.com/anacrolix/torrent"
"github.com/labstack/echo"
)
func (bt *BTServer) View(torr *Torrent, file *torrent.File, c echo.Context) error {
go settings.SetViewed(torr.Hash().HexString(), file.Path())
reader := torr.NewReader(file, 0)
fmt.Println("Connect reader:", len(torr.readers))
c.Response().Header().Set("Connection", "close")
c.Response().Header().Set("ETag", httptoo.EncodeQuotedString(fmt.Sprintf("%s/%s", torr.Hash().HexString(), file.Path())))
http.ServeContent(c.Response(), c.Request(), file.Path(), time.Time{}, reader)
fmt.Println("Disconnect reader:", len(torr.readers))
torr.CloseReader(reader)
return c.NoContent(http.StatusOK)
}
func (bt *BTServer) Play(torr *Torrent, file *torrent.File, preload int64, c echo.Context) error {
if torr.status == TorrentAdded {
if !torr.GotInfo() {
return echo.NewHTTPError(http.StatusBadRequest, "torrent closed befor get info")
}
}
if torr.status == TorrentGettingInfo {
if !torr.WaitInfo() {
return echo.NewHTTPError(http.StatusBadRequest, "torrent closed befor get info")
}
}
if torr.PreloadedBytes == 0 {
torr.Preload(file, preload)
}
redirectUrl := c.Scheme() + "://" + c.Request().Host + "/torrent/view/" + torr.Hash().HexString() + "/" + utils.CleanFName(file.Path())
return c.Redirect(http.StatusFound, redirectUrl)
//return bt.View(torr, file, c)
}

57
src/server/torr/State.go Normal file
View File

@@ -0,0 +1,57 @@
package torr
import (
"github.com/anacrolix/dht"
)
type BTState struct {
LocalPort int
PeerID string
BannedIPs int
DHTs []*dht.Server
Torrents []*Torrent
}
type TorrentStats struct {
Name string
Hash string
TorrentStatus TorrentStatus
TorrentStatusString string
LoadedSize int64
TorrentSize int64
PreloadedBytes int64
PreloadSize int64
DownloadSpeed float64
UploadSpeed float64
TotalPeers int
PendingPeers int
ActivePeers int
ConnectedSeeders int
HalfOpenPeers int
BytesWritten int64
BytesWrittenData int64
BytesRead int64
BytesReadData int64
BytesReadUsefulData int64
ChunksWritten int64
ChunksRead int64
ChunksReadUseful int64
ChunksReadWasted int64
PiecesDirtiedGood int64
PiecesDirtiedBad int64
FileStats []TorrentFileStat
}
type TorrentFileStat struct {
Id int
Path string
Length int64
}

400
src/server/torr/Torrent.go Normal file
View File

@@ -0,0 +1,400 @@
package torr
import (
"fmt"
"io"
"sort"
"sync"
"time"
"server/settings"
"server/utils"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/labstack/gommon/bytes"
)
type TorrentStatus int
func (t TorrentStatus) String() string {
switch t {
case TorrentAdded:
return "Torrent added"
case TorrentGettingInfo:
return "Torrent getting info"
case TorrentPreload:
return "Torrent preload"
case TorrentWorking:
return "Torrent working"
case TorrentClosed:
return "Torrent closed"
default:
return "Torrent unknown status"
}
}
const (
TorrentAdded = TorrentStatus(iota)
TorrentGettingInfo
TorrentPreload
TorrentWorking
TorrentClosed
)
type Torrent struct {
*torrent.Torrent
status TorrentStatus
readers map[torrent.Reader]struct{}
muTorrent sync.Mutex
muReader sync.Mutex
bt *BTServer
lastTimeSpeed time.Time
DownloadSpeed float64
UploadSpeed float64
BytesReadUsefulData int64
BytesWrittenData int64
PreloadSize int64
PreloadedBytes int64
hash metainfo.Hash
expiredTime time.Time
closed <-chan struct{}
progressTicker *time.Ticker
}
func NewTorrent(magnet metainfo.Magnet, bt *BTServer) (*Torrent, error) {
switch settings.Get().RetrackersMode {
case 1:
magnet.Trackers = append(magnet.Trackers, utils.GetDefTrackers()...)
case 2:
magnet.Trackers = nil
case 3:
magnet.Trackers = utils.GetDefTrackers()
}
goTorrent, _, err := bt.client.AddTorrentSpec(&torrent.TorrentSpec{
Trackers: [][]string{magnet.Trackers},
DisplayName: magnet.DisplayName,
InfoHash: magnet.InfoHash,
})
if err != nil {
return nil, err
}
bt.mu.Lock()
defer bt.mu.Unlock()
if tor, ok := bt.torrents[magnet.InfoHash]; ok {
return tor, nil
}
torr := new(Torrent)
torr.Torrent = goTorrent
torr.status = TorrentAdded
torr.lastTimeSpeed = time.Now()
torr.bt = bt
torr.readers = make(map[torrent.Reader]struct{})
torr.hash = magnet.InfoHash
torr.closed = goTorrent.Closed()
go torr.watch()
bt.torrents[magnet.InfoHash] = torr
return torr, nil
}
func (t *Torrent) WaitInfo() bool {
if t.Torrent == nil {
return false
}
select {
case <-t.Torrent.GotInfo():
return true
case <-t.closed:
return false
}
}
func (t *Torrent) GotInfo() bool {
if t.status == TorrentClosed {
return false
}
t.status = TorrentGettingInfo
if t.WaitInfo() {
t.status = TorrentWorking
t.expiredTime = time.Now().Add(time.Minute * 5)
return true
} else {
t.Close()
return false
}
}
func (t *Torrent) watch() {
t.progressTicker = time.NewTicker(time.Second)
defer t.progressTicker.Stop()
for {
select {
case <-t.progressTicker.C:
go t.progressEvent()
case <-t.closed:
t.Close()
return
}
}
}
func (t *Torrent) progressEvent() {
if t.expired() {
t.drop()
return
}
t.muTorrent.Lock()
if t.Torrent != nil && t.Torrent.Info() != nil {
st := t.Torrent.Stats()
deltaDlBytes := st.BytesReadUsefulData.Int64() - t.BytesReadUsefulData
deltaUpBytes := st.BytesWrittenData.Int64() - t.BytesWrittenData
deltaTime := time.Since(t.lastTimeSpeed).Seconds()
t.DownloadSpeed = float64(deltaDlBytes) / deltaTime
t.UploadSpeed = float64(deltaUpBytes) / deltaTime
t.BytesWrittenData = st.BytesWrittenData.Int64()
t.BytesReadUsefulData = st.BytesReadUsefulData.Int64()
} else {
t.DownloadSpeed = 0
t.UploadSpeed = 0
}
t.muTorrent.Unlock()
t.lastTimeSpeed = time.Now()
}
func (t *Torrent) expired() bool {
return len(t.readers) == 0 && t.expiredTime.Before(time.Now()) && (t.status == TorrentWorking || t.status == TorrentClosed)
}
func (t *Torrent) Files() []*torrent.File {
if t.Torrent != nil && t.Torrent.Info() != nil {
files := t.Torrent.Files()
return files
}
return nil
}
func (t *Torrent) Hash() metainfo.Hash {
return t.hash
}
func (t *Torrent) Status() TorrentStatus {
return t.status
}
func (t *Torrent) Length() int64 {
if t.Info() == nil {
return 0
}
return t.Torrent.Length()
}
func (t *Torrent) NewReader(file *torrent.File, readahead int64) torrent.Reader {
t.muReader.Lock()
if t.status == TorrentClosed {
return nil
}
defer t.muReader.Unlock()
reader := file.NewReader()
if readahead <= 0 {
readahead = utils.GetReadahead()
}
reader.SetReadahead(readahead)
t.readers[reader] = struct{}{}
return reader
}
func (t *Torrent) CloseReader(reader torrent.Reader) {
t.muReader.Lock()
reader.Close()
delete(t.readers, reader)
t.expiredTime = time.Now().Add(time.Second * 5)
t.muReader.Unlock()
}
func (t *Torrent) Preload(file *torrent.File, size int64) {
if size < 0 {
return
}
if t.status == TorrentGettingInfo {
t.WaitInfo()
}
t.muTorrent.Lock()
if t.status != TorrentWorking {
t.muTorrent.Unlock()
return
}
if size == 0 {
size = settings.Get().PreloadBufferSize
}
if size == 0 {
t.muTorrent.Unlock()
return
}
t.status = TorrentPreload
t.muTorrent.Unlock()
defer func() {
if t.status == TorrentPreload {
t.status = TorrentWorking
}
}()
buff5mb := int64(5 * 1024 * 1024)
startPreloadLength := size
endPreloadOffset := int64(0)
if startPreloadLength > buff5mb {
endPreloadOffset = file.Offset() + file.Length() - buff5mb
}
readerPre := t.NewReader(file, startPreloadLength)
if readerPre == nil {
return
}
defer func() {
t.CloseReader(readerPre)
t.expiredTime = time.Now().Add(time.Minute * 1)
}()
if endPreloadOffset > 0 {
readerPost := t.NewReader(file, 1)
if readerPre == nil {
return
}
readerPost.Seek(endPreloadOffset, io.SeekStart)
readerPost.SetReadahead(buff5mb)
defer func() {
t.CloseReader(readerPost)
t.expiredTime = time.Now().Add(time.Minute * 1)
}()
}
if size > file.Length() {
size = file.Length()
}
t.PreloadSize = size
var lastSize int64 = 0
errCount := 0
for t.status == TorrentPreload {
t.expiredTime = time.Now().Add(time.Minute * 1)
t.PreloadedBytes = t.Torrent.BytesCompleted()
fmt.Println("Preload:", file.Torrent().InfoHash().HexString(), bytes.Format(t.PreloadedBytes), "/", bytes.Format(t.PreloadSize), "Speed:", utils.Format(t.DownloadSpeed), "Peers:[", t.Torrent.Stats().ConnectedSeeders, "]", t.Torrent.Stats().ActivePeers, "/", t.Torrent.Stats().TotalPeers)
if t.PreloadedBytes >= t.PreloadSize {
return
}
if lastSize == t.PreloadedBytes {
errCount++
} else {
lastSize = t.PreloadedBytes
errCount = 0
}
if errCount > 120 {
return
}
time.Sleep(time.Second)
}
}
func (t *Torrent) drop() {
t.muTorrent.Lock()
if t.Torrent != nil {
t.Torrent.Drop()
t.Torrent = nil
}
t.muTorrent.Unlock()
}
func (t *Torrent) Close() {
t.status = TorrentClosed
t.bt.mu.Lock()
defer t.bt.mu.Unlock()
t.muReader.Lock()
defer t.muReader.Unlock()
for r := range t.readers {
r.Close()
}
if _, ok := t.bt.torrents[t.hash]; ok {
delete(t.bt.torrents, t.hash)
}
t.drop()
}
func (t *Torrent) Stats() TorrentStats {
t.muTorrent.Lock()
defer t.muTorrent.Unlock()
st := TorrentStats{}
st.Name = t.Name()
st.Hash = t.hash.HexString()
st.TorrentStatus = t.status
st.TorrentStatusString = t.status.String()
if t.Torrent != nil {
st.LoadedSize = t.Torrent.BytesCompleted()
st.TorrentSize = t.Length()
st.PreloadedBytes = t.PreloadedBytes
st.PreloadSize = t.PreloadSize
st.DownloadSpeed = t.DownloadSpeed
st.UploadSpeed = t.UploadSpeed
tst := t.Torrent.Stats()
st.BytesWritten = tst.BytesWritten.Int64()
st.BytesWrittenData = tst.BytesWrittenData.Int64()
st.BytesRead = tst.BytesRead.Int64()
st.BytesReadData = tst.BytesReadData.Int64()
st.BytesReadUsefulData = tst.BytesReadUsefulData.Int64()
st.ChunksWritten = tst.ChunksWritten.Int64()
st.ChunksRead = tst.ChunksRead.Int64()
st.ChunksReadUseful = tst.ChunksReadUseful.Int64()
st.ChunksReadWasted = tst.ChunksReadWasted.Int64()
st.PiecesDirtiedGood = tst.PiecesDirtiedGood.Int64()
st.PiecesDirtiedBad = tst.PiecesDirtiedBad.Int64()
st.TotalPeers = tst.TotalPeers
st.PendingPeers = tst.PendingPeers
st.ActivePeers = tst.ActivePeers
st.ConnectedSeeders = tst.ConnectedSeeders
st.HalfOpenPeers = tst.HalfOpenPeers
for i, f := range t.Files() {
st.FileStats = append(st.FileStats, TorrentFileStat{
Id: i,
Path: f.Path(),
Length: f.Length(),
})
}
sort.Slice(st.FileStats, func(i, j int) bool {
return st.FileStats[i].Path < st.FileStats[j].Path
})
}
return st
}

View File

@@ -0,0 +1,15 @@
package storage
import (
"server/torr/storage/state"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
type Storage interface {
storage.ClientImpl
GetStats(hash metainfo.Hash) *state.CacheState
CloseHash(hash metainfo.Hash)
}

View File

@@ -0,0 +1 @@
package filecache

View File

@@ -0,0 +1,69 @@
package filecache
import (
"path/filepath"
"sync"
"server/settings"
"server/torr/storage"
"server/torr/storage/state"
"github.com/anacrolix/missinggo/filecache"
"github.com/anacrolix/torrent/metainfo"
storage2 "github.com/anacrolix/torrent/storage"
)
type Storage struct {
storage.Storage
caches map[metainfo.Hash]*filecache.Cache
capacity int64
mu sync.Mutex
}
func NewStorage(capacity int64) storage.Storage {
stor := new(Storage)
stor.capacity = capacity
stor.caches = make(map[metainfo.Hash]*filecache.Cache)
return stor
}
func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) {
s.mu.Lock()
defer s.mu.Unlock()
path := filepath.Join(settings.Path, "cache", infoHash.String())
cache, err := filecache.NewCache(path)
if err != nil {
return nil, err
}
cache.SetCapacity(s.capacity)
s.caches[infoHash] = cache
return storage2.NewResourcePieces(cache.AsResourceProvider()).OpenTorrent(info, infoHash)
}
func (s *Storage) GetStats(hash metainfo.Hash) *state.CacheState {
s.mu.Lock()
defer s.mu.Unlock()
return nil
}
func (s *Storage) Clean() {
s.mu.Lock()
defer s.mu.Unlock()
}
func (s *Storage) CloseHash(hash metainfo.Hash) {
if s.caches == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
}
func (s *Storage) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return nil
}

View File

@@ -0,0 +1,86 @@
package memcache
import (
"fmt"
"sync"
"server/utils"
)
type buffer struct {
pieceId int
buf []byte
used bool
}
type BufferPool struct {
buffs map[int]*buffer
frees int
size int64
mu sync.Mutex
}
func NewBufferPool(bufferLength int64, capacity int64) *BufferPool {
bp := new(BufferPool)
buffsSize := int(capacity/bufferLength) + 3
bp.frees = buffsSize
bp.size = bufferLength
return bp
}
func (b *BufferPool) mkBuffs() {
if b.buffs != nil {
return
}
b.buffs = make(map[int]*buffer, b.frees)
fmt.Println("Create", b.frees, "buffers")
for i := 0; i < b.frees; i++ {
buf := buffer{
-1,
make([]byte, b.size),
false,
}
b.buffs[i] = &buf
}
}
func (b *BufferPool) GetBuffer(p *Piece) (buff []byte, index int) {
b.mu.Lock()
defer b.mu.Unlock()
b.mkBuffs()
for id, buf := range b.buffs {
if !buf.used {
buf.used = true
buf.pieceId = p.Id
buff = buf.buf
index = id
b.frees--
//fmt.Printf("Get buffer: %v %v %v %p\n", id, p.Id, b.frees, buff)
return
}
}
fmt.Println("Create slow buffer")
return make([]byte, b.size), -1
}
func (b *BufferPool) ReleaseBuffer(index int) {
if index == -1 {
utils.FreeOSMem()
return
}
b.mu.Lock()
defer b.mu.Unlock()
b.mkBuffs()
if buff, ok := b.buffs[index]; ok {
buff.used = false
buff.pieceId = -1
b.frees++
//fmt.Println("Release buffer:", index, b.frees)
} else {
utils.FreeOSMem()
}
}
func (b *BufferPool) Len() int {
return b.frees
}

View File

@@ -0,0 +1,196 @@
package memcache
import (
"fmt"
"sort"
"sync"
"server/torr/storage/state"
"server/utils"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
type Cache struct {
storage.TorrentImpl
s *Storage
capacity int64
filled int64
hash metainfo.Hash
pieceLength int64
pieceCount int
piecesBuff int
muPiece sync.Mutex
muRemove sync.Mutex
isRemove bool
pieces map[int]*Piece
bufferPull *BufferPool
prcLoaded int
}
func NewCache(capacity int64, storage *Storage) *Cache {
ret := &Cache{
capacity: capacity,
filled: 0,
pieces: make(map[int]*Piece),
s: storage,
}
return ret
}
func (c *Cache) Init(info *metainfo.Info, hash metainfo.Hash) {
fmt.Println("Create cache for:", info.Name)
//Min capacity of 2 pieces length
cap := info.PieceLength * 2
if c.capacity < cap {
c.capacity = cap
}
c.pieceLength = info.PieceLength
c.pieceCount = info.NumPieces()
c.piecesBuff = int(c.capacity / c.pieceLength)
c.hash = hash
c.bufferPull = NewBufferPool(c.pieceLength, c.capacity)
for i := 0; i < c.pieceCount; i++ {
c.pieces[i] = &Piece{
Id: i,
Length: info.Piece(i).Length(),
Hash: info.Piece(i).Hash().HexString(),
cache: c,
}
}
}
func (c *Cache) Piece(m metainfo.Piece) storage.PieceImpl {
c.muPiece.Lock()
defer c.muPiece.Unlock()
if val, ok := c.pieces[m.Index()]; ok {
return val
}
return nil
}
func (c *Cache) Close() error {
c.isRemove = false
fmt.Println("Close cache for:", c.hash)
if _, ok := c.s.caches[c.hash]; ok {
delete(c.s.caches, c.hash)
}
c.pieces = nil
c.bufferPull = nil
utils.FreeOSMemGC()
return nil
}
func (c *Cache) GetState() state.CacheState {
cState := state.CacheState{}
cState.Capacity = c.capacity
cState.PiecesLength = c.pieceLength
cState.PiecesCount = c.pieceCount
cState.Hash = c.hash.HexString()
stats := make(map[int]state.ItemState, 0)
c.muPiece.Lock()
var fill int64 = 0
for _, value := range c.pieces {
stat := value.Stat()
if stat.BufferSize > 0 {
fill += stat.BufferSize
stats[stat.Id] = stat
}
}
c.filled = fill
c.muPiece.Unlock()
cState.Filled = c.filled
cState.Pieces = stats
return cState
}
func (c *Cache) cleanPieces() {
if c.isRemove {
return
}
c.muRemove.Lock()
if c.isRemove {
c.muRemove.Unlock()
return
}
c.isRemove = true
defer func() { c.isRemove = false }()
c.muRemove.Unlock()
remPieces := c.getRemPieces()
if len(remPieces) > 0 && (c.capacity < c.filled || c.bufferPull.Len() <= 1) {
remCount := int((c.filled - c.capacity) / c.pieceLength)
if remCount < 1 {
remCount = 1
}
if remCount > len(remPieces) {
remCount = len(remPieces)
}
remPieces = remPieces[:remCount]
for _, p := range remPieces {
c.removePiece(p)
}
}
}
func (c *Cache) getRemPieces() []*Piece {
pieces := make([]*Piece, 0)
fill := int64(0)
loading := 0
used := make(map[int]struct{})
for _, b := range c.bufferPull.buffs {
if b.used {
used[b.pieceId] = struct{}{}
}
}
for u := range used {
v := c.pieces[u]
if v.Size > 0 {
if v.Id > 0 {
pieces = append(pieces, v)
}
fill += v.Size
if !v.complete {
loading++
}
}
}
c.filled = fill
sort.Slice(pieces, func(i, j int) bool {
return pieces[i].accessed.Before(pieces[j].accessed)
})
c.prcLoaded = prc(c.piecesBuff-loading, c.piecesBuff)
return pieces
}
func (c *Cache) removePiece(piece *Piece) {
c.muPiece.Lock()
defer c.muPiece.Unlock()
piece.Release()
st := fmt.Sprintf("%v%% %v\t%s\t%s", c.prcLoaded, piece.Id, piece.accessed.Format("15:04:05.000"), piece.Hash)
if c.prcLoaded >= 95 {
fmt.Println("Clean memory GC:", st)
utils.FreeOSMemGC()
} else {
fmt.Println("Clean memory:", st)
utils.FreeOSMem()
}
}
func prc(val, of int) int {
return int(float64(val) * 100.0 / float64(of))
}

View File

@@ -0,0 +1,115 @@
package memcache
import (
"errors"
"io"
"sync"
"time"
"server/torr/storage/state"
"github.com/anacrolix/torrent/storage"
)
type Piece struct {
storage.PieceImpl
Id int
Hash string
Length int64
Size int64
complete bool
readed bool
accessed time.Time
buffer []byte
bufIndex int
mu sync.RWMutex
cache *Cache
}
func (p *Piece) WriteAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer == nil {
go p.cache.cleanPieces()
p.buffer, p.bufIndex = p.cache.bufferPull.GetBuffer(p)
if p.buffer == nil {
return 0, errors.New("Can't get buffer write")
}
}
n = copy(p.buffer[off:], b[:])
p.Size += int64(n)
p.accessed = time.Now()
return
}
func (p *Piece) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.RLock()
defer p.mu.RUnlock()
size := len(b)
if size+int(off) > len(p.buffer) {
size = len(p.buffer) - int(off)
if size < 0 {
size = 0
}
}
if len(p.buffer) < int(off) || len(p.buffer) < int(off)+size {
return 0, io.ErrUnexpectedEOF
}
n = copy(b, p.buffer[int(off) : int(off)+size][:])
p.accessed = time.Now()
if int(off)+size >= len(p.buffer) {
p.readed = true
}
if int64(len(b))+off >= p.Size {
go p.cache.cleanPieces()
}
return n, nil
}
func (p *Piece) MarkComplete() error {
if len(p.buffer) == 0 {
return errors.New("piece is not complete")
}
p.complete = true
return nil
}
func (p *Piece) MarkNotComplete() error {
p.complete = false
return nil
}
func (p *Piece) Completion() storage.Completion {
return storage.Completion{
Complete: p.complete && len(p.buffer) > 0,
Ok: true,
}
}
func (p *Piece) Release() {
p.mu.Lock()
defer p.mu.Unlock()
if p.buffer != nil {
p.buffer = nil
p.cache.bufferPull.ReleaseBuffer(p.bufIndex)
p.bufIndex = -1
}
p.Size = 0
p.complete = false
}
func (p *Piece) Stat() state.ItemState {
itm := state.ItemState{
Id: p.Id,
Hash: p.Hash,
Accessed: p.accessed,
Completed: p.complete,
BufferSize: p.Size,
}
return itm
}

View File

@@ -0,0 +1,66 @@
package memcache
import (
"sync"
"server/torr/storage"
"server/torr/storage/state"
"github.com/anacrolix/torrent/metainfo"
storage2 "github.com/anacrolix/torrent/storage"
)
type Storage struct {
storage.Storage
caches map[metainfo.Hash]*Cache
capacity int64
mu sync.Mutex
}
func NewStorage(capacity int64) storage.Storage {
stor := new(Storage)
stor.capacity = capacity
stor.caches = make(map[metainfo.Hash]*Cache)
return stor
}
func (s *Storage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage2.TorrentImpl, error) {
s.mu.Lock()
defer s.mu.Unlock()
ch := NewCache(s.capacity, s)
ch.Init(info, infoHash)
s.caches[infoHash] = ch
return ch, nil
}
func (s *Storage) GetStats(hash metainfo.Hash) *state.CacheState {
s.mu.Lock()
defer s.mu.Unlock()
if c, ok := s.caches[hash]; ok {
st := c.GetState()
return &st
}
return nil
}
func (s *Storage) CloseHash(hash metainfo.Hash) {
if s.caches == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if ch, ok := s.caches[hash]; ok {
ch.Close()
delete(s.caches, hash)
}
}
func (s *Storage) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
for _, ch := range s.caches {
ch.Close()
}
return nil
}

View File

@@ -0,0 +1,22 @@
package state
import (
"time"
)
type CacheState struct {
Hash string
Capacity int64
Filled int64
PiecesLength int64
PiecesCount int
Pieces map[int]ItemState
}
type ItemState struct {
Id int
Accessed time.Time
BufferSize int64
Completed bool
Hash string
}