Apache Spark en Amazon S3 - Gotchas en best practices

S3 is een objectarchief en geen bestandssysteem, vandaar dat de problemen die voortvloeien uit eventuele consistentie, niet-atomaire hernamen moeten worden behandeld in de toepassingscode. De directoryserver in een bestandssysteem is vervangen door een hash-algoritme van de bestandsnaam. Dit is slecht voor het weergeven van zaken, directory-bewerkingen, verwijderen en hernoemen (kopiëren en verwijderen omdat er technisch gezien geen hernoemen is in objectwinkels)

Begin met het gebruik van S3A (URI-schema: s3a: //) - Hadoop 2.7+. S3a is de aanbevolen S3-client voor Hadoop 2.7 en hoger. S3a is beter en ondersteunt grotere bestanden (tot 5 TB) en biedt ondersteuning voor multipart-upload. Alle objecten die toegankelijk zijn via s3n: // URL's moeten ook toegankelijk zijn via s3a door het URL-schema te vervangen. De meeste bugrapporten tegen S3N worden gesloten als WONTFIX

Spark 2.0.1 laten werken met S3a Gebruik voor Spark 2.0.1 hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar in je classpath; vergeet niet om spark-default.conf bij te werken met de AWS-toetsen en de S3A FileSystemClass

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Gebruik absoluut Dataframes omdat het opnieuw rangschikken van query's en predicaat push-down direct beschikbaar is en dus minder gegevens worden opgehaald, waardoor uw zoekopdrachten sneller worden uitgevoerd

Als u dezelfde gegevens meerdere keren leest, probeer dan .cache of s3distcp om de bestanden over te dragen naar uw lokale EMR-cluster om te profiteren van de betere leesprestaties van bestanden van een echt bestandssysteem. De optie groupBy van s3distcp is een geweldige optie om het probleem met kleine bestanden op te lossen door een groot aantal kleine bestanden samen te voegen.

Dat brengt me bij het lezen van een groot aantal kleine bestanden. Als het samenvoegen van de bestanden met behulp van een tool geen optie is, probeer dan de volgende code die effectief werkt rond het trage S3 directory-lijstknelpunt

import com.amazonaws.services.s3._, model._
    import com.amazonaws.auth.BasicAWSCreferenties

    val request = new ListObjectsRequest ()
    request.setBucketName (bak)
    request.setPrefix (prefix)
    request.setMaxKeys (pageLength)
    def s3 = nieuwe AmazonS3Client (nieuwe BasicAWSCredentials (sleutel, geheim))

    val objs = s3.listObjects (request) // Merk op dat deze methode afgekorte gegevens retourneert als deze langer zijn dan de "pageLength" hierboven. Misschien moet u daarmee omgaan.
    sc.parallelize (objs.getObjectSummaries.map (_. getKey) .ToList)
        .flatMap {key => Source.fromInputStream (s3.getObject (bucket, key) .getObjectContent: InputStream) .getLines}

Zorg ervoor dat de optie spark.sql.parquet.filterPushdown waar is en spark.sql.parquet.mergeSchema onwaar is (om schemasamenvoegingen tijdens het schrijven te voorkomen die het schrijfstadium echt vertragen). Gelukkig heeft Spark 2.0 de juiste standaard

Heb je je afgevraagd waarom juist op het moment dat een taak wordt voltooid, er niets naar de logboeken wordt geschreven en alle vonkenbewerkingen lijken te zijn gestopt, maar de resultaten staan ​​nog niet in de uitvoermap van S3 ... wat is er aan de hand? Welnu, elke keer dat de executeurs het resultaat van de taak schrijven, schrijven ze naar een tijdelijke directory buiten de hoofddirectory waar de bestanden moesten worden geschreven en zodra alle executors zijn gedaan, wordt de naam gewijzigd om atomaire exclusiviteit te krijgen. Dit is allemaal prima in een standaard bestandssysteem zoals hdf's waar de hernoeming onmiddellijk is, maar in een object store zoals S3 is dit niet bevorderlijk omdat de hernoeming op S3 wordt gedaan op 6MB / s.

Schrijf indien mogelijk de uitvoer van de opdrachten naar hdr's van de EMR (om gebruik te maken van de bijna onmiddellijke hernoemen en beter bestand IO van lokale hdf's) en voeg een dstcp-stap toe om de bestanden naar S3 te verplaatsen, om uzelf alle problemen te besparen bij het omgaan met de ingewanden van een object store die probeert een bestandssysteem te zijn. Door ook naar lokale hdf's te schrijven, kunt u speculatie inschakelen om weggelopen taken te besturen zonder in de impasse te vallen die aan DirectOutputCommiter is gekoppeld.

Als u S3 als uitvoermap moet gebruiken, zorg er dan voor dat de volgende Spark-configuraties zijn ingesteld

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Opmerking: DirectParquetOutputCommitter wordt verwijderd uit Spark 2.0 vanwege de kans op gegevensverlies. Helaas moeten we, totdat we de consistentie van S3a hebben verbeterd, met de tijdelijke oplossingen werken. Het gaat beter met Hadoop 2.8

Vermijd sleutelnamen in lexicografische volgorde. Je kunt hashing / willekeurige voorvoegsels gebruiken of de datum-tijd omdraaien. De truc is om je sleutels hiërarchisch te benoemen en de meest voorkomende dingen waar je op filtert aan de linkerkant van je sleutel te plaatsen. En heb nooit onderstrepingstekens in bucketnamen vanwege DNS-problemen.

Het inschakelen van fs.s3a.fast.upload uploadt delen van een enkel bestand parallel naar Amazon S3

Nou, dat was de hersenkraker van problemen in de productie die ik onlangs heb opgelost om Spark met S3 te laten werken. Houd dit in de gaten voor meer informatie in het volgende bericht ...