Munka sok fájllal (C#)

Munka sok fájllal (C#)
2021-11-02T19:22:32+01:00
2021-11-06T11:04:51+01:00
2022-10-15T21:20:34+02:00
helloc
Sziasztok!

Egy figyelt mappába (path_import) letöltődnek a *.mp4 videófájlok.

Egy letöltetlen fájl: valami.mp4.part

Egy már letöltődött fájl átneveződik: valami.mp4

Ekkor elkapja a programom a fájlt mert figyeli a path_import (C:\videos) mappát és átnevezi: valami.mp4.firstapp

Az átnevezés után futtat egy ffmpeg.exe-t, ami az adott videót szerkesztés után elmenti a path_export (C:\videos2) mappába.

A probléma, hogy 10 gbit-es nettel nagyon gyorsan letöltődnek a fájlok, viszont az ffmpeg.exe-vel a konvertálás több időt vesz igénybe, így előfordulhat, hogy egyszerre akár rengeteg ffmpeg.exe fut egy időben.

Trailer-ekről van szó, kis fájlokról, pár száz MB-sek.

Nem lehetne sorakoztatni a konvertálásra váró fájlokat valahogyan, hogy egyszerre mindig csak 1 ffmpeg.exe fájl fusson?

Form1.cs

using System; using System.IO; using System.Diagnostics; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; using System.Globalization; namespace FirstApp { public partial class Form1 : Form { public static string path_import = ""; public static string path_export = ""; FileSystemWatcher watcher; public Form1() { InitializeComponent(); } private void Form1_Load(object sender, EventArgs e) { } private void button3_Click(object sender, EventArgs e) { this.watch(); } private void watch() { watcher = new FileSystemWatcher(); watcher.Path = Form1.path_import; watcher.NotifyFilter = NotifyFilters.LastAccess | NotifyFilters.LastWrite | NotifyFilters.FileName | NotifyFilters.DirectoryName; watcher.Filter = "*.mp4"; watcher.Renamed += OnRenamed; watcher.EnableRaisingEvents = true; } private void OnRenamed(object source, RenamedEventArgs e) { Task.Run(async() => { await Task.Delay(1000); ProcessRenaming(e.FullPath, e.Name); }); } private void ProcessRenaming(string fullPath, string name) { var lastPart = fullPath.Split('.').Last(); if (lastPart == "mp4") { File.Move(fullPath, fullPath + ".firstapp"); string nf = CultureInfo.CurrentCulture.TextInfo.ToTitleCase(name); Process prcExecuteFFMPEG = new Process(); prcExecuteFFMPEG.StartInfo.FileName = "ffmpeg.exe"; prcExecuteFFMPEG.StartInfo.Arguments = "-ss 00:00:30 -i "" + fullPath + ".firstapp" -c copy "" + path_export + "" + nf + """; prcExecuteFFMPEG.StartInfo.UseShellExecute = false; prcExecuteFFMPEG.StartInfo.RedirectStandardOutput = true; prcExecuteFFMPEG.StartInfo.CreateNoWindow = true; prcExecuteFFMPEG.Start(); prcExecuteFFMPEG.WaitForExit(); string strOutput = prcExecuteFFMPEG.StandardOutput.ReadToEnd(); File.Delete(fullPath + ".firstapp"); } } } }
Mutasd a teljes hozzászólást!
Neked egy message queue-ra van szükséged. Ez egy aszinkron kommunikációs módszert jelent egy alkalmazás két szálja, vagy két teljesen különálló alkalmazás között. Azért aszinkron, mert nem várja be a művelet eredményét - csak betesz a queue-ba egy üzenetet, ami leírja a feladatot, és a másik (feldolgozó) fél pedig majd megcsinálja, amikor eljut odáig.

Az elv a következő lesz:

interface IMessageQueue<T> { void Enqueue(T item); Task<T> DequeueAsync(CancellationToken cancellationToken); } class WorkItem { ... } class Producer { private readonly IMessageQueue<WorkItem> queue; public Producer(IMessageQueue<WorkItem> queue) { this.queue = queue; } public void QueueTask() { queue.Enqueue(new WorkItem(...)); } } class Consumer { private readonly IMessageQueue<WorkItem> queue; public Consumer(IMessageQueue<WorkItem> queue) { this.queue = queue; } public async Task StartConsumingAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { WorkItem taskToDo = await queue.DequeueAsync(cancellationToken); if (!cancellationToken.IsCancellationRequested) //ebben az esetben lehet null-érték az eredmény await ProcessWorkItemAsync(taskToDo, cancellationToken); } } private async Task ProcessWorkItemAsync(WorkItem workItem, cancellationToken) { ... } }
A WorkItem tartalmaz minden olyan információt, amit tudni kell ahhoz, hogy az ffmpeg-rész működni tudjon: input fájl neve, output fájl neve, ffmpeg paraméterek, stb. A Producer lesz a fájlletöltőd, ami ha végzett egy letöltéssel, berak egy új WorkItem-et a queue-ba. A Consumer pedig egy háttérszálon működő folyamat, ami folyamatosan várja, hogy mikor van munka, és ha van, veszi a következőt és megcsinálja.

A nehézséget az IMessageQueue implementálása fogja jelenteni, de a neten erre is vannak példák.

Plusz dolog, amire oda kell figyelned:
Mivel a Consumer háttérszálon fut, az alkalmazásod leállításával annak is le kell állnia. Vagyis a CancellationToken-nel jelezned kell, hogy most már jó lenne leállni, és az alkalmazásod meg is várja, amíg le nem áll a Consumer.
Mutasd a teljes hozzászólást!

  • Összeraktam egy demo programot, amiben kipróbálhatod, hogy működne ez. Figyeld majd az Output window Debug nézetét! Itt egy példa output tőlem:

    Work item added: a042f16c-fa6a-4ba1-99cc-ea9e5ae9119e Work item added: 6dcf4b2f-07a6-42a0-96fd-8b70fecf4b6f Work item added: d7136e89-0b9c-464b-832c-48f9258ce7a1 Work item added: 51329087-d53f-4fd9-9b4d-e2870747bf78 Processing work item a042f16c-fa6a-4ba1-99cc-ea9e5ae9119e Work item added: 97b5b4b2-4aeb-4afd-b1b1-6f75ee73d876 Done processing. Processing work item 6dcf4b2f-07a6-42a0-96fd-8b70fecf4b6f Done processing. Processing work item d7136e89-0b9c-464b-832c-48f9258ce7a1 Done processing. Processing work item 51329087-d53f-4fd9-9b4d-e2870747bf78 Done processing. Processing work item 97b5b4b2-4aeb-4afd-b1b1-6f75ee73d876 Done processing. Work item added: 400c436c-3721-406e-9b8b-9af6bf1272b6 Work item added: bce0af56-93ff-4983-b53c-a200e1e586f4 Work item added: 6a1292a4-a105-4191-b0c3-76e338842568 Work item added: 48bac806-8063-4631-985d-83504ceaa17e Work item added: 1261b81b-f4c7-4140-897a-4acda44803e1 Processing work item 400c436c-3721-406e-9b8b-9af6bf1272b6 Done processing. Processing work item bce0af56-93ff-4983-b53c-a200e1e586f4 Waiting for background thread to stop... Waiting for background thread to stop... Done processing. Background thread stopped. The program '[19040] AsyncMessageQueueDemo.exe' has exited with code 0 (0x0).
    Mutasd a teljes hozzászólást!
    Csatolt állomány
  • Ez lehet, hogy a SemaphoreSlim osztállyal is működne:

    SemaphoreSlim semaphore = new SemaphoreSlim(0, 3); // Egyszerre 3 feldolgozás futhat private void OnRenamed(object source, RenamedEventArgs e) { Task.Run(async() => { await Task.Delay(1000); await semaphore.WaitAsync(); try ProcessRenaming(e.FullPath, e.Name); finally semaphore.Release(); end; }); }
    Mutasd a teljes hozzászólást!
  • Nem lehetne sorakoztatni a konvertálásra váró fájlokat valahogyan

    Dehogynem

    Szerintem a legegyszerűbb módszer, ha kihasználod a program stack-jét, így nem kell saját queue-t létrehozni.

    Az OnRenamed()-t sok példányban meghívja, vagyis ha hagyod minden alkalommal elindítja a 
    ProcessRenaming()-t.

    Hát ne engedd neki!

    Várjon és csak akkor indítsa el, ha nem fut korábbi ffmpeg.

    Én egy statikus változóban (ugye lock-al) inkrementálnám a futó ffmpeg-ek számát a ProcessRenaming()-be belépéskor és decrementálnám a kilépéskor. 
    Az OnRenamed() függvény (ugye beszúrt delay-el) végtelen ciklusban figyelné a statikus számlálót és csak akkor indít el újat, ha a megengedett számnál kevesebb fut.
    Persze a statikus változó legyen volatile. (olvasd el miért)

    Azért írtam így le neked, mert így átlátod a lényegét.
    A valóságban vannak megfelelő gyári (beépített) megoldások erre:

    Semaphore Class (System.Threading) | Microsoft Docs


    A stack elbírja az akár több száz vagy ezer várakozó (vagyis nem lefutott) függvény lokális változóit, az belefér.
    Mutasd a teljes hozzászólást!
  • Köszönöm, de valami nem tetszik neki a kódban: firstapp.png
    Mutasd a teljes hozzászólást!
  • Köszönöm, ki fogom próbálni ezt is. Sajnos nem értek hozzá annyira, hogy megállapíthassam, hogy melyik a jobb ehhez a projekthez (az IMessageQueue vagy a SemaphoreSlim).
    Mutasd a teljes hozzászólást!
  • Bocs, tegnap túl sokat paszkálozhattam, megártott :)

    private void OnRenamed(object source, RenamedEventArgs e) { Task.Run(async() => { await Task.Delay(1000); await semaphore.WaitAsync(); try { ProcessRenaming(e.FullPath, e.Name); } finally { semaphore.Release(); } }); }
    Mutasd a teljes hozzászólást!
  • Mindjárt kipróbálom. Köszönöm. A SemaphoreSlim(0, 3) második paraméterét megtudnám adni valahogyan egy NumericUpDown elemmel?

    C# és Pascal?

    Mutasd a teljes hozzászólást!
  • Sajnos ezzel nem működik. Nem kapja el a fájlokat, tehát nem nevezi át őket .firstapp végződésűekre.
    Mutasd a teljes hozzászólást!
  • Akkor próbáld meg így konstruálni a semaphore-t (nem emlékszem már tisztán)

    = new SemaphoreSlim(3, 3);
    Mutasd a teljes hozzászólást!
  • Köszönöm, így már működik.

    SemaphoreSlim(Int32)

    Initializes a new instance of the SemaphoreSlim class, specifying the initial number of requests that can be granted concurrently.

    SemaphoreSlim(Int32, Int32)

    Initializes a new instance of the SemaphoreSlim class, specifying the initial and maximum number of requests that can be granted concurrently.

    new SemaphoreSlim(3);

    Elég lehet ez is, vagy csak nem tudok angolul?
    Mutasd a teljes hozzászólást!
  • Ez jó lehet?

    namespace FirstApp { public partial class Form1 : Form { SemaphoreSlim semaphore; // SemaphoreSlim semaphore = new SemaphoreSlim(3, 3); // ... private void button3_Click(object sender, EventArgs e) { // ... numericUpDown1.Enabled = false; semaphore = new SemaphoreSlim(numericUpDown1.Value, numericUpDown1.Value); this.watch(); }
    Mutasd a teljes hozzászólást!
  • Igen, így működik:

    int i = (int)numericUpDown1.Value; semaphore = new SemaphoreSlim(i, i);
    Mutasd a teljes hozzászólást!
  • Mindkettő működik (IMessageQueue/SemaphoreSlim), de a SemaphoreSlim-et választottam. Nekem, hozzá nem értőnek ez szimpatikusabb volt, de mivel H.Lilla gyorsabban reagált a kérdésemre és a megoldása is jó, így végül neki adtam a pontot, de nagyon köszönöm gyk (és FBS) segítségét is!
    Mutasd a teljes hozzászólást!
Tetszett amit olvastál? Szeretnél a jövőben is értesülni a hasonló érdekességekről?
abcd