# Technology stack: .
NET Framework
import pandas as pd
import tkinter as tk
from tkinter import filedialog, messagebox
import asyncio
import codecs
class FrmConsultaMultipleRUCSUNAT(tk.Tk):
def __init__(self):
super().__init__()
self.title("Consulta Multiple RUC SUNAT")
self.btnSeleccionarArchivoPlano = tk.Button(self, text="Seleccionar Archivo Plano",
command=self.select_file)
self.btnSeleccionarArchivoPlano.pack()
self.txtNombreCompletoArchivo = tk.Entry(self)
self.txtNombreCompletoArchivo.pack()
self.dgvRUCs = tk.Listbox(self)
self.dgvRUCs.pack()
def select_file(self):
file_path = filedialog.askopenfilename(title="Seleccione el archivo a guardar",
filetypes=(("Archivos permitidos", "*.txt;*.csv;*.dat"),))
if file_path:
tipo_respuesta = 2
mensaje_respuesta = ""
self.txtNombreCompletoArchivo.insert(0, file_path)
tabla = pd.DataFrame(columns=["NumeroRUC", "RazonSocial", "TipoContribuyente",
"NombreComercial", "CondicionContribuyente", "EstadoContribuyente", "FechaInscripcion",
"FechaInicioActividades", "DomicilioFiscal", "ActividadComercioExterior",
"ActividadesEconomicas", "Padrones", "TipoRespuesta", "MensajeRespuesta"])
self.dgvRUCs.delete(0, tk.END)
try:
with codecs.open(file_path, "r", encoding="utf-8") as file:
cFila = 0
for line in file:
cFila += 1
filaTabla = pd.Series({"NumeroRUC": line.strip()})
tabla = tabla.append(filaTabla, ignore_index=True)
tipo_respuesta = 1
except Exception as ex:
tipo_respuesta = 3
mensaje_respuesta = str(ex)
for index, row in tabla.iterrows():
self.dgvRUCs.insert(tk.END, row["NumeroRUC"])
if tipo_respuesta > 1:
messagebox.showinfo("Consultar documento mediante archivo plano",
mensaje_respuesta, icon=messagebox.WARNING if tipo_respuesta == 2 else
messagebox.ERROR)
if __name__ == "__main__":
app = FrmConsultaMultipleRUCSUNAT()
app.mainloop()
private async def btnConsultarRUC_Click(sender, e):
nAsegurado = dgvRUCs.Rows.Count
if nAsegurado == 0:
lblMensaje.Text = "Debe seleccionar el archivo plano (.txt, .csv, .dat)"
else:
tipoRespuesta = 2
mensajeRespuesta = ""
lDatosRUC = []
oEnSUNAT = EnSUNAT()
oCuTexto = CuTexto()
oCronometro = Stopwatch()
oCronometro.Start()
btnConsultarRUC.Enabled = False
# consulta directa a la pagina
try:
cookies = CookieContainer()
controladorMensaje = HttpClientHandler()
controladorMensaje.CookieContainer = cookies
controladorMensaje.UseCookies = True
with HttpClient(controladorMensaje) as cliente:
cliente.DefaultRequestHeaders.Add("Host", "e-consultaruc.sunat.gob.pe")
cliente.DefaultRequestHeaders.Add("sec-ch-ua",
" \" Not A;Brand\";v=\"99\", \"Chromium\";v=\"90\", \"Google
Chrome\";v=\"90\"")
cliente.DefaultRequestHeaders.Add("sec-ch-ua-mobile", "?0")
cliente.DefaultRequestHeaders.Add("Sec-Fetch-Dest", "document")
cliente.DefaultRequestHeaders.Add("Sec-Fetch-Mode", "navigate")
cliente.DefaultRequestHeaders.Add("Sec-Fetch-Site", "same-origin")
cliente.DefaultRequestHeaders.Add("Sec-Fetch-User", "?1")
cliente.DefaultRequestHeaders.Add("Upgrade-Insecure-Requests", "1")
cliente.DefaultRequestHeaders.Add("User-Agent",
"Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like
Gecko) Chrome/88.0.4324.150 Safari/537.36")
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls |
SecurityProtocolType.Tls11 | SecurityProtocolType.Tls12
url_random = "https://e-consultaruc.sunat.gob.pe/cl-ti-itmrconsmulruc/captcha?accion=random"
url_base_ruc = "https://e-consultaruc.sunat.gob.pe/cl-ti-itmrconsmulruc/jrmS00Alias"
url_completa_ruc = ""
contenido_html = ""
numero_random = ""
with cliente.GetAsync(url_random) as resultado_consulta:
if resultado_consulta.IsSuccessStatusCode:
numero_random = resultado_consulta.Content.ReadAsStringAsync()
cliente.DefaultRequestHeaders.Add("Referer", url_base_ruc)
numero_ruc = ""
nombre_completo_archivo = txtNombreCompletoArchivo.Text
extension_archivo = Path.GetExtension(nombre_completo_archivo)
nombre_completo_archivo_resultado =
"{0}Resultado{1}".format(nombre_completo_archivo[:len(nombre_completo_archivo) -
len(extension_archivo)], extension_archivo)
if File.Exists(nombre_completo_archivo_resultado):
File.Delete(nombre_completo_archivo_resultado)
fila_grilla = None
n_filas_grilla = dgvRUCs.Rows.Count
d_numeros_ruc = {}
i=0
c_filas_grilla = 0
c_numero_rucs_buscados = 0
while i < n_filas_grilla:
c_filas_grilla += 1
numero_ruc = dgvRUCs.Rows[i].Cells[0].Value.ToString()
if not oCuTexto.ValidarSoloNumero(numero_ruc):
numero_ruc = ""
d_numeros_ruc[i] = numero_ruc
if c_filas_grilla == n_filas_grilla or (n_filas_grilla > 10 and c_filas_grilla % 10 == 0):
tipo_respuesta = 2
mensaje_respuesta = ""
c_numero_rucs_buscados = 0
url_completa_ruc = url_base_ruc
url_completa_ruc += "?accion=consManual&txtRuc="
for k_numero_ruc in d_numeros_ruc.values():
if k_numero_ruc == "":
continue
url_completa_ruc += "&selRuc="
url_completa_ruc += k_numero_ruc
c_numero_rucs_buscados++
if c_numero_rucs_buscados > 0:
url_completa_ruc += "&random="
url_completa_ruc += numero_random
async with cliente.get(url_completa_ruc) as resultado_consulta_datos:
if resultado_consulta_datos.status == 200:
contenido_html = await resultado_consulta_datos.text()
nombre_inicio = "<td class=\"bg\" align=\"center\"><a href=\""
nombre_fin = "\""
url_descarga_archivo = o_cu_texto.extraer_contenido_entre_tag_string(contenido_html,
0, nombre_inicio, nombre_fin)
if url_descarga_archivo == "":
nombre_inicio = "<HEAD><TITLE>"
nombre_fin = "</TITLE></HEAD>"
contenido_busqueda =
o_cu_texto.extraer_contenido_entre_tag_string(contenido_html, 0, nombre_inicio, nombre_fin,
StringComparison.OrdinalIgnoreCase)
if contenido_busqueda == ".:: Pagina de Mensajes ::.":
nombre_inicio = "<p class=\"error\">"
nombre_fin = "</p>"
mensaje_respuesta =
o_cu_texto.extraer_contenido_entre_tag_string(contenido_html, 0, nombre_inicio, nombre_fin)
elif contenido_busqueda == ".:: Pagina de Error ::.":
nombre_inicio = "<p class=\"error\">"
nombre_fin = "</p>"
tipo_respuesta = 3
mensaje_respuesta =
o_cu_texto.extraer_contenido_entre_tag_string(contenido_html, 0, nombre_inicio, nombre_fin)
elif contenido_busqueda == "Request Rejected":
nombre_inicio = "<body>"
nombre_fin = "<br><br>"
tipo_respuesta = 3
mensaje_respuesta =
o_cu_texto.extraer_contenido_entre_tag_string(contenido_html, 0, nombre_inicio, nombre_fin)
if mensaje_respuesta == "":
mensaje_respuesta = "No se puede obtener el detalle del error Request Rejected,
porque no existe los nombres \"<body>\" y \"<br><br>\" en el contenido HTML"
else:
mensaje_respuesta = "No se puede obtener el error, porque no existe los
nombres \"<HEAD><TITLE>\" y \"</TITLE></HEAD>\" en el contenido HTML"
for k_numero_ruc in d_numeros_ruc.keys():
o_en_sunat = EnSUNAT()
o_en_sunat.numero_ruc = dgv_rucs.rows[k_numero_ruc].cells[0].value.to_string()
o_en_sunat.tipo_respuesta = tipo_respuesta
o_en_sunat.mensaje_respuesta = mensaje_respuesta
l_datos_ruc.add(o_en_sunat)
tipo_respuesta = 1
mensaje_respuesta = ""
else:
cliente.default_request_headers.remove("Host")
cliente.default_request_headers.add("Host", "ww1.sunat.gob.pe")
resultado_descarga_archivo = await cliente.get_async(url_descarga_archivo)
if resultado_descarga_archivo.is_success_status_code:
stm_contenido = await resultado_descarga_archivo.content.read_as_stream()
archivo_zip = ZipArchive(stm_contenido)
archivo = archivo_zip.entries[0]
linea_texto = ""
arr_linea_texto = []
with StreamReader(archivo.open()) as sr:
linea_texto = await sr.read_line_async()
while not sr.end_of_stream:
o_en_sunat = EnSUNAT()
linea_texto = await sr.read_line_async()
arr_linea_texto = linea_texto.split('|')
o_en_sunat.numero_ruc = arr_linea_texto[0]
o_en_sunat.razon_social = arr_linea_texto[1]
o_en_sunat.tipo_contribuyente = arr_linea_texto[2]
o_en_sunat.nombre_comercial = arr_linea_texto[4]
o_en_sunat.condicion_contribuyente = arr_linea_texto[5]
o_en_sunat.estado_contribuyente = arr_linea_texto[6]
o_en_sunat.fecha_inscripcion = arr_linea_texto[7]
o_en_sunat.fecha_inicio_actividades = arr_linea_texto[8]
o_en_sunat.domicilio_fiscal = "{0} {1} - {2} - {3}".format(arr_linea_texto[12],
arr_linea_texto[9], arr_linea_texto[10], arr_linea_texto[11])
o_en_sunat.actividad_comercio_exterior = arr_linea_texto[15]
o_en_sunat.actividades_economicas = "{0};{1};{2}".format(arr_linea_texto[16],
arr_linea_texto[17], arr_linea_texto[18])
o_en_sunat.padrones = arr_linea_texto[21]
o_en_sunat.tipo_respuesta = 1
l_datos_ruc.add(o_en_sunat)
if archivoZip.Entries.Count > 1:
# Rucs_Invalidos.txt
archivo = archivoZip.Entries[1]
with StreamReader(archivo.Open()) as sr:
while not sr.EndOfStream:
lineaTexto = await sr.ReadLineAsync()
arrLineaTexto = lineaTexto.Split('|')
for sLineaTexto in arrLineaTexto:
if sLineaTexto == "":
continue
oEnSUNAT = EnSUNAT()
oEnSUNAT.NumeroRUC = sLineaTexto
oEnSUNAT.TipoRespuesta = 2
oEnSUNAT.MensajeRespuesta = "El número de RUC ingresado no es válido o no
existe"
lDatosRUC.append(oEnSUNAT)
else:
tipoRespuesta = 1
mensajeRespuesta = ""
else:
mensajeRespuesta = await resultadoDescargaArchivo.Content.ReadAsStringAsync()
mensajeRespuesta = string.Format("Ocurrió un inconveniente ({0}) al obtener el archivo
plano.\r\nDetalle:{1}", resultadoDescargaArchivo.StatusCode, mensajeRespuesta)
else:
mensaje_respuesta = await resultado_consulta_datos.Content.ReadAsStringAsync()
mensaje_respuesta = (
f"Ocurrió un inconveniente ({resultado_consulta_datos.StatusCode}) en la consulta
multiple de los datos de los RUCs {url_completa_RUC}.\r\nDetalle:{mensaje_respuesta}"
)
for k_numero_RUC in d_numeros_RUC.keys():
for datos_RUC in l_datos_RUC:
if dgvRUCs.Rows[k_numero_RUC].Cells[0].Value.ToString() ==
datos_RUC.NumeroRUC:
fila_grilla = dgvRUCs.Rows[k_numero_RUC]
fila_grilla.Cells[1].Value = datos_RUC.RazonSocial
fila_grilla.Cells[2].Value = datos_RUC.TipoContribuyente
fila_grilla.Cells[3].Value = datos_RUC.NombreComercial
fila_grilla.Cells[4].Value = datos_RUC.CondicionContribuyente
fila_grilla.Cells[5].Value = datos_RUC.EstadoContribuyente
fila_grilla.Cells[6].Value = datos_RUC.FechaInscripcion
fila_grilla.Cells[7].Value = datos_RUC.FechaInicioActividades
fila_grilla.Cells[8].Value = datos_RUC.DomicilioFiscal
fila_grilla.Cells[9].Value = datos_RUC.ActividadComercioExterior
fila_grilla.Cells[10].Value = datos_RUC.ActividadesEconomicas
fila_grilla.Cells[11].Value = datos_RUC.Padrones
fila_grilla.Cells[12].Value = datos_RUC.TipoRespuesta
fila_grilla.Cells[13].Value = datos_RUC.MensajeRespuesta
await oCuTexto.GuardarTexto(
nombre_completo_archivo_resultado,
f"{datos_RUC.NumeroRUC}|{datos_RUC.RazonSocial}|
{datos_RUC.TipoContribuyente}|{datos_RUC.NombreComercial}|
{datos_RUC.CondicionContribuyente}|{datos_RUC.EstadoContribuyente}|
{datos_RUC.FechaInscripcion}|{datos_RUC.FechaInicioActividades}|
{datos_RUC.DomicilioFiscal}|{datos_RUC.ActividadesEconomicas}|{datos_RUC.Padrones}|
{datos_RUC.TipoRespuesta}|{datos_RUC.MensajeRespuesta}"
)
break
else:
tipo_respuesta = 2
mensaje_respuesta = "El número de RUC no es válido"
for k_numero_RUC in d_numeros_RUC.keys():
fila_grilla = dgvRUCs.Rows[k_numero_RUC]
fila_grilla.Cells[12].Value = tipo_respuesta
fila_grilla.Cells[13].Value = mensaje_respuesta
# Technology Stack: C#
# Framework: .NET
await oCuTexto.GuardarTexto(nombreCompletoArchivoResultado
, f"{dgvRUCs.Rows[kNumeroRUC].Cells[0].Value}|||||||||||{tipoRespuesta}|
{mensajeRespuesta}")
tipoRespuesta = 1
mensajeRespuesta = ""
dNumerosRUC = {}
i=0
if resultadoConsulta.IsSuccessStatusCode:
mensajeRespuesta = await resultadoConsulta.Content.ReadAsStringAsync()
mensajeRespuesta = f"Ocurrió un inconveniente ({resultadoConsulta.StatusCode}) al
consultar el número random.\r\nDetalle:{mensajeRespuesta}"
else:
try:
raise Exception("Error")
except Exception as ex:
tipoRespuesta = 3
mensajeRespuesta = ex.args[0]
oCronometro.Stop()
if tipoRespuesta > 1:
MessageBox.Show(mensajeRespuesta, "Consulta multiple de RUCs mediante número
random", MessageBoxButtons.OK,
MessageBoxIcon.Warning if tipoRespuesta == 2 else MessageBoxIcon.Error)
lblMensaje.Text = f"Procesado en {oCronometro.Elapsed.TotalSeconds} seg."
btnConsultarRUC.Enabled = True
txtNombreCompletoArchivo.Focus()
txtNombreCompletoArchivo.SelectAll()