package upload import ( "errors" "fmt" tele "gopkg.in/telebot.v4" "log" "math" "path/filepath" "server/torr" "server/torr/state" "strconv" "sync" "time" ) type Worker struct { id int c tele.Context msg *tele.Message torrentHash string isCancelled bool from int to int ti *state.TorrentStatus } type Manager struct { queue []*Worker working map[int]*Worker ids int wrkSync sync.Mutex queueLock sync.Mutex } func (m *Manager) Start() { m.working = make(map[int]*Worker) go m.work() } func (m *Manager) AddRange(c tele.Context, hash string, from, to int) { m.queueLock.Lock() defer m.queueLock.Unlock() if len(m.queue) > 50 { c.Bot().Send(c.Recipient(), "Очередь переполнена, попробуйте попозже\n\nЭлементов в очереди:"+strconv.Itoa(len(m.queue))) return } m.ids++ if m.ids > math.MaxInt { m.ids = 0 } var msg *tele.Message var err error for i := 0; i < 20; i++ { msg, err = c.Bot().Send(c.Recipient(), "Подключение к торренту\n"+hash+"") if err == nil { break } else { log.Println("Error send msg, try again:", i+1, "/", 20) } } if err != nil { log.Println("Error send msg:", err) return } t := torr.GetTorrent(hash) t.WaitInfo() for t.Status().Stat != state.TorrentWorking { time.Sleep(time.Second) t = torr.GetTorrent(hash) } ti := t.Status() if from == 1 && to == -1 { to = len(ti.FileStats) } if to > len(ti.FileStats) { to = len(ti.FileStats) } if from < 1 { from = 1 } if to >= 0 && to < from { from, to = to, from } if to > len(ti.FileStats) { to = len(ti.FileStats) } w := &Worker{ id: m.ids, c: c, torrentHash: hash, msg: msg, ti: ti, from: from, to: to, } m.queue = append(m.queue, w) } func (m *Manager) Cancel(id int) { m.queueLock.Lock() defer m.queueLock.Unlock() var rem []int for i, w := range m.queue { if w.id == id { w.isCancelled = true w.c.Bot().Delete(w.msg) rem = append(rem, i) return } } for _, i := range rem { m.queue = append(m.queue[:i], m.queue[i+1:]...) } if wrk, ok := m.working[id]; ok { wrk.isCancelled = true return } } func (m *Manager) work() { for { m.queueLock.Lock() if len(m.working) > 0 { m.queueLock.Unlock() m.sendQueueStatus() time.Sleep(time.Second) continue } if len(m.queue) == 0 { m.queueLock.Unlock() time.Sleep(time.Second) continue } wrk := m.queue[0] m.queue = m.queue[1:] m.working[wrk.id] = wrk m.queueLock.Unlock() m.sendQueueStatus() loading(wrk) m.queueLock.Lock() delete(m.working, wrk.id) m.queueLock.Unlock() } } func (m *Manager) sendQueueStatus() { m.queueLock.Lock() defer m.queueLock.Unlock() for i, wrk := range m.queue { if wrk.msg == nil { continue } torrKbd := &tele.ReplyMarkup{} torrKbd.Inline([]tele.Row{torrKbd.Row(torrKbd.Data("Отмена", "cancel", strconv.Itoa(wrk.id)))}...) msg := "Номер в очереди " + strconv.Itoa(i+1) wrk.c.Bot().Edit(wrk.msg, msg, torrKbd) } } func loading(wrk *Worker) { iserr := false t := torr.GetTorrent(wrk.torrentHash) t.WaitInfo() for t.Status().Stat != state.TorrentWorking { time.Sleep(time.Second) t = torr.GetTorrent(wrk.torrentHash) } wrk.ti = t.Status() for i := wrk.from - 1; i <= wrk.to-1; i++ { file := wrk.ti.FileStats[i] if wrk.isCancelled { return } err := uploadFile(wrk, file, i+1, len(wrk.ti.FileStats)) if err != nil { errstr := fmt.Sprintf("Ошибка загрузки в телеграм: %v", err.Error()) wrk.c.Bot().Edit(wrk.msg, errstr) iserr = true break } } if !iserr { wrk.c.Bot().Delete(wrk.msg) } } func uploadFile(wrk *Worker, file *state.TorrentFileStat, fi, fc int) error { caption := filepath.Base(file.Path) torrFile, err := NewTorrFile(wrk, file) if err != nil { return err } var wa sync.WaitGroup wa.Add(1) complete := false go func() { for !complete { updateLoadStatus(wrk, torrFile, fi, fc) time.Sleep(1 * time.Second) } wa.Done() }() d := &tele.Document{} d.FileName = file.Path d.Caption = caption d.File.FileReader = torrFile for i := 0; i < 20; i++ { err = wrk.c.Send(d) if err == nil || errors.Is(err, ERR_STOPPED) { break } else { log.Println("Error send msg, try again:", i+1, "/", 20) } } complete = true wa.Wait() torrFile.Close() if errors.Is(err, ERR_STOPPED) { err = nil } return err }