Saltar al contenido

Soluciones: CSV y Multiprocesamiento en Python

Lenguaje: Python 3 · Compatibilidad: Windows / Linux / macOS


Ejercicio 1 · Generar un CSV N×M con números consecutivos

Solución robusta con csv.writer (recomendada)

from pathlib import Path
import csv

def generar_csv_consecutivo(filas: int, columnas: int, salida: Path) -> None:
    if filas <= 0 or columnas <= 0:
        raise ValueError("Filas y columnas deben ser > 0")

    salida = Path(salida)
    salida.parent.mkdir(parents=True, exist_ok=True)

    # newline='' evita líneas en blanco extra en Windows
    with open(salida, "w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        for i in range(filas):
            fila = [i * columnas + (j + 1) for j in range(columnas)]
            w.writerow(fila)

if __name__ == "__main__":
    filas = int(input("Número de filas: "))
    columnas = int(input("Número de columnas: "))
    ruta = Path(input("Ruta de salida (p.ej., datos/matriz.csv): "))
    generar_csv_consecutivo(filas, columnas, ruta)

Cómo funciona (resumen): crea la carpeta de salida, escribe cada fila con enteros consecutivos y usa newline='' para que el CSV quede limpio en Windows.

Prueba rápida: filas=3, columnas=4 → el fichero debe quedar como 1..4 / 5..8 / 9..12.


Ejercicio 2 · Leer un CSV y generar otro con la suma por fila

Solución en streaming (línea a línea) con limpieza y control de errores

from pathlib import Path

def sumar_filas_csv(origen: Path, destino: Path) -> None:
    origen = Path(origen)
    destino = Path(destino)
    destino.parent.mkdir(parents=True, exist_ok=True)

    with open(origen, "r", encoding="utf-8") as fin, \
         open(destino, "w", encoding="utf-8", newline="") as fout:
        for idx, linea in enumerate(fin, start=1):
            linea = linea.strip()
            if not linea:
                # Política: línea vacía -> escribir 0
                fout.write("0\n")
                continue
            try:
                cols = [c.strip() for c in linea.split(",") if c.strip() != ""]
                nums = list(map(int, cols))
            except ValueError as e:
                # Mensaje claro para depuración
                raise ValueError(f"Línea {idx}: valor no numérico. Texto='{linea}'") from e

            fout.write(str(sum(nums)) + "\n")

if __name__ == "__main__":
    origen = Path(input("CSV de entrada: "))
    destino = Path(input("CSV con sumas por fila: "))
    sumar_filas_csv(origen, destino)

Cómo funciona: lee, limpia, separa por comas, convierte a int, suma y escribe una suma por línea. Si la línea está vacía, escribe 0.

Prueba rápida: entrada 1,2,3 y 10, 20, 30 → salida 6 y 60 en líneas separadas.


Ejercicio 3 · Multiprocesamiento básico: lanzar N trabajadores

Patrón mínimo con multiprocessing.Process

from multiprocessing import Process
from time import sleep
import os

def tarea(nombre: str, t: int = 2) -> None:
    print(f"[PID {os.getpid()}] Inicio - {nombre}")
    sleep(t)
    print(f"[PID {os.getpid()}] Fin    - {nombre}")

if __name__ == "__main__":  # Imprescindible en Windows
    n = int(input("¿Cuántos procesos lanzar?: "))
    procesos = []
    for i in range(n):
        p = Process(target=tarea, args=(f"Trabajador_{i}", 2))
        p.start()
        procesos.append(p)

    for p in procesos:
        p.join()

    print("Todos los procesos han finalizado.")

Cómo funciona: crea N procesos, cada uno imprime su PID, “trabaja” 2s y termina; el principal espera con join() a que acaben todos.

Prueba rápida: N=3 → verás mensajes intercalados con distintos PID.


Ejercicio 4 · CSV grande + Multiprocesamiento con chunks (salidas parciales)

Divide la carga en trozos, cada proceso escribe su propio CSV con sumas

from pathlib import Path
from multiprocessing import Process
from math import ceil
import csv, os

def carga_csv(ruta: Path) -> list[list[int]]:
    datos = []
    with open(ruta, "r", encoding="utf-8") as f:
        for l in f:
            l = l.strip()
            if not l:
                continue
            datos.append([int(x) for x in l.split(",") if x.strip() != ""])
    return datos

def dividir_lista(lst: list, n: int) -> list[list]:
    n = max(1, n)
    if not lst:
        return []
    tam = ceil(len(lst) / n)
    return [lst[i:i+tam] for i in range(0, len(lst), tam)]

def trabajador(chunk: list[list[int]], salida_dir: Path, id_trab: int) -> None:
    salida_dir.mkdir(parents=True, exist_ok=True)
    nombre = salida_dir / f"suma_{id_trab}_{os.getpid()}.csv"
    with open(nombre, "w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        for fila in chunk:
            w.writerow([sum(fila)])

if __name__ == "__main__":
    origen = Path(input("CSV de entrada: "))
    salida_dir = Path(input("Directorio de salidas parciales: "))
    nproc = int(input("Nº de procesos: "))

    datos = carga_csv(origen)
    nproc = max(1, min(nproc, len(datos)))  # evita chunks vacíos
    chunks = dividir_lista(datos, nproc)

    procesos = []
    for i, c in enumerate(chunks):
        if not c:
            continue
        p = Process(target=trabajador, args=(c, salida_dir, i))
        p.start()
        procesos.append(p)

    for p in procesos:
        p.join()

    print("Listo: salidas parciales creadas en", salida_dir)

Cómo funciona: carga el CSV en memoria, reparte filas en trozos casi iguales, cada proceso suma las filas de su trozo y escribe su CSV (el nombre lleva el PID para evitar colisiones).

Prueba rápida: usa un CSV de 1000 filas, lanza 4 procesos; deben aparecer 3–4 ficheros de sumas (según división).


(Reto) Un único CSV final con orden global usando Queue + recolector

Los workers envían (índice, suma) a una cola; un proceso recolector escribe suma_total.csv en orden

from multiprocessing import Process, Queue
from pathlib import Path
from math import ceil
import csv

def leer_csv_lineas(ruta: Path) -> list[str]:
    with open(ruta, "r", encoding="utf-8") as f:
        return [l.rstrip("\n") for l in f]

def rangos_indices(n_total: int, n_chunks: int) -> list[range]:
    n_chunks = max(1, min(n_chunks, n_total))
    tam = ceil(n_total / n_chunks)
    return [range(i, min(i + tam, n_total)) for i in range(0, n_total, tam)]

def worker_sumas(lineas: list[str], idxs: range, q: Queue) -> None:
    for idx in idxs:
        linea = lineas[idx].strip()
        if not linea:
            q.put((idx, 0))
            continue
        nums = [int(x) for x in linea.split(",") if x.strip() != ""]
        q.put((idx, sum(nums)))

def recolector(q: Queue, n_items: int, destino: Path) -> None:
    resultados = [None] * n_items
    recibidos = 0
    while recibidos < n_items:
        idx, suma = q.get()     # bloquea hasta recibir
        resultados[idx] = suma
        recibidos += 1
    with open(destino, "w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        for s in resultados:
            w.writerow([s])

if __name__ == "__main__":
    origen = Path(input("CSV de entrada: "))
    destino = Path(input("CSV de salida total (p.ej., suma_total.csv): "))
    nproc = int(input("Nº de procesos: "))

    lineas = leer_csv_lineas(origen)
    n = len(lineas)
    if n == 0:
        raise ValueError("El CSV de entrada está vacío")

    q = Queue()
    ranges = rangos_indices(n, nproc)

    proc_reco = Process(target=recolector, args=(q, n, destino))
    proc_reco.start()

    workers = []
    for r in ranges:
        p = Process(target=worker_sumas, args=(lineas, r, q))
        p.start()
        workers.append(p)

    for p in workers:
        p.join()

    proc_reco.join()
    print("OK: suma_total.csv creado en orden.")

Alternativa corta: concurrent.futures.ProcessPoolExecutor.map preserva el orden de entrada y simplifica el código si el CSV cabe en memoria.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Información básica sobre protección de datos Ver más

  • Responsable: Tomas Gonzalez.
  • Finalidad:  Moderar los comentarios.
  • Legitimación:  Por consentimiento del interesado.
  • Destinatarios y encargados de tratamiento:  No se ceden o comunican datos a terceros para prestar este servicio.
  • Derechos: Acceder, rectificar y suprimir los datos.
  • Información Adicional: Puede consultar la información detallada en la Política de Privacidad.

¿Atascado con tu proyecto? Presupuesto GRATIS

X
error: Content is protected !!
Este sitio web utiliza cookies, si necesitas más información puedes visitar nuestra política de privacidad    Ver
Privacidad