Archiv der Kategorie: Azure Data Factory

Zugriff auf CDS von Azure Data Factory: Umgang mit Spalten mit vielen NULL-Werten

Fehlersituation

In meinem aktuellen Projekt lesen wir aus Dynamics CRM 365 Daten in unser Data Warehouse, das wir unter Azure gebaut haben. Wir hatten dabei ein Problem: In einigen Tabellen gibt es Spalten, die in vielen Zeilen nicht gefüllt sind. Wir hatten identische Testdatensätze, die an zwei unterschiedlichen Tagen nach CRM importiert wurden (durch ein anderes Team). Beim Laden der Daten fiel uns auf, dass an einem Tag eine Spalte leer blieb, obwohl sie am Abzug am Tag zuvor manchmal Werte enthalten hatte – und sich in der Quelle nichts geändert hatte.

Das Ergebnis unserer Suche und die Fehlerbehebung möchte ich hier kurz beschreiben.

Einstieg

Als Einstieg empfehlen sich diese beiden Seiten: Microsoft-Dokumentation und dieser SQL Server Central-Artikel. Aber es gibt natürlich noch etliche Artikel im Netz.

Interessanter Weise enthält der Microsoft-Artikel folgende Stelle – markiert als wichtig (ca. in der Mitte des Artikels):

  • Wenn Sie Daten aus Dynamics kopieren, ist die explizite Spaltenzuordnung aus Dynamics zur Senke optional. Es wird jedoch dringend zu der Zuordnung geraten, um ein deterministisches Kopierergebnis sicherzustellen.
  • Wenn ein Schema in Data Factory in der Erstellungsbenutzeroberfläche importiert wird, wird auf das Schema rückgeschlossen. Dies erfolgt, in dem die obersten Zeilen aus dem Ergebnis der Dynamics-Abfrage gesamplet werden, um die Liste der Quellspalten zu initialisieren. In diesem Fall werden Spalten ohne Werte in den obersten Reihen ausgelassen. Dasselbe Verhalten gilt für Kopiervorgänge, wenn keine explizite Zuordnung vorliegt. Sie können Spalten überprüfen und weitere zur Zuordnung hinzufügen. Diese Spalten werden auch während der Laufzeit des Kopiervorgangs berücksichtigt.

Hier ist also genau unser Problem beschrieben: Wenn eine Zeile viele NULL-Werte enthält, kann es (nicht deterministisch) passieren, dass die Inhalte der Spalte nicht übertragen wird, wenn kein Schema definiert ist.

Unser ursrünglicher Code

Unsere Einstellungen der Copy-Aktivität sahen so aus:

Zunächste die Quelle:

Einstellungen der Quelle
Quelle der Copy-Aktivität

Wie man sieht, verwenden wir ein Fetch-XML als Abfrage mit

  • einem Filter-Kriterium, da wir nur bestimmte Datensätze ermitteln wollen. (Deswegen ist die Abfrage auch dynamisch)
  • genannten Attributnamen, da wir nicht alle Spalten übertragen wollen
  • einem Join (link-entity) auf eine andere Entität, aus der wir auch einige Attribute entnehmen.

Die Senke sieht so aus:

Senke
Senke der Copy-Aktivität

Hier ist eigentlich nur zu beachten, dass wir keine Tabellen-Option gewählt haben, also die Tabelle nicht automatisch gemäß des Schemas aus der Quelle erstellen lassen.

Der Zuordnungs-Reiter sieht wie folgt aus:

Zuordnung der Copy-Aktivität

Also verwenden wir ein dynamisches Mapping.

Lösung

Die Lösung ist damit auch klar: Wir brauchen ein Schema in der Zuordnung. Es reicht aber nicht aus, das Schema zu importieren, weil beim Importieren des Schemas das gleiche passieren kann: Wenn eine Spalte NULL-Werte hat (im Sample) wird sie nicht als Quell-Spalte gesehen, z.B. hier (man sieht nur einen Ausschnitt):

Zuordnung mit importierten Schemas

Wir müssen also ein dynamisches Schema in der Zuordnung angeben:

dynamische Zuordnung

Wenn wir diese dynamische Zuordnung definiert haben, wobei wir für jede Spalte eine Definition hinterlegt haben, funktioniert es 🙂

Wie kommen wir nun zu der Lösung?

Vorgehen

Wir müssen also ein Mapping definieren, das so aussieht:

{
  "type": "TabularTranslator",
  "mappings": [
    {
      "source": {
        "name": "statecode",
        "type": "Int32"
      },
      "sink": {
        "name": "statecode",
        "type": "Int32"
      }
    },
    {
      "source": {
        "name": "statuscode",
        "type": "Int32"
      },
      "sink": {
        "name": "statuscode",
        "type": "Int32"
      }
    }
  ]
}

In diesem Beispiel werden zwei Spalten gemappt (statecode und statuscode) – und die Spalten heißen jeweils in der Quelle und im Ziel gleich.

Wie können wir nun ein solches JSON einfach erzeugen?

In unserem Falll war es so, dass wir alle Zieltabellen bereits angelegt hatten. Das kann man zum Beispiel dadurch erreichen, dass man als erstes die Tabelle als ganzes (also nicht mit Fetch-XML) kopiert – und in der Senke „Tabelle automatisch erstellen“ anwählt.

Dann sind wir so vorgegangen (was ich im Nachfolgenden genauer beschreiben werde):

  1. In der Zuordnung Schema importieren
  2. Im Code der Pipeline den entsprechenden Block (translator) kopieren
  3. In einem JSON-Editor fehlende Mappings (es steht nur eine Sink aber keine Source drin) korrigieren
  4. Zuordnung wieder löschen
  5. dynamischen Inhalt hinzufügen als @json(<generiertes JSON>)

Nun die Punkte im Detail:

1. In der Zuordnung Schema importieren

Das ist klar.

2. Im Code der Pipeline den entsprechenden Block (translator) kopieren

Über das Symbol {} gelangt man zur Codeansicht der Pipeline:

Codeansicht via {}

Diesen Code kopiert man in einen JSON-Editor (z.B. Visual Studio). Dort sucht man nach „TabularTranslator“. Man findet dann eine Stelle, die ungefähr so aussieht:

...
"translator": {
                  "type": "TabularTranslator",
                  "mappings": [
                    {
                      "source": {
                        "name": "statecode",
                        "type": "Int32"
                      },
                      "sink": {
                        "name": "statecode",
                        "type": "Int32"
                      }
                    },
                    {
                      "source": {...

(Falls man in seinem Code mehrere Copy-Tasks hat, muss man natürlich aufpassen. Allerdings ist vor dem translator-Knoten der sink-Knoten und danach der inputs- und outputs-Knoten, so dass man genau sieht, an welcher Stelle man gerade ist)

Wir entnehmen nun diesen Code – die geschweifte Klammer nach >>“translator“ : <<

3. In einem JSON-Editor fehlende Mappings (es steht nur eine Sink aber keine Source drin) korrigieren

Wenn wir diesen Code genauer anschauen, sehen wir die source-sink-Paare. Allerdings sind dort, wo der automatische Schema-Import keine Treffer finden konnte (weil die Spalten in der Quelle wegen der NULL-Werte nicht drin waren), keine source-Knoten vorhanden.

Aus

…
{
"sink": {
"name": "importsequencenumber",
"type": "Int32"
}
},
…


machen wir also folgendes:

...
    {
      "source": {
        "name": "importsequencenumber",
        "type": "Int32"
      },
      "sink": {
        "name": "importsequencenumber",
        "type": "Int32"
      }
    },
...

Wir verwenden also den gleichen Namen und den gleichen Datentyp.

4. Zuordnung wieder löschen

Dieser Schritt ist wieder einfach. Man muss auf den Papierkorb klicken.

5. dynamischen Inhalt hinzufügen als @json()

dynamischen Inhalt hinzufügen

Der Link „Dynamischen Inhalt hinzufügen“ erscheint, wenn man mit der Maus in die Nähe kommt. Dort klickt man drauf und kann im Editor Code eingeben.

Der Code sieht dann z.B. so aus:

@json('{
    "type": "TabularTranslator",
    "mappings": [
        {
            "source": {
                "name": "statecode",
                "type": "Int32"
            },
            "sink": {
                "name": "statecode",
                "type": "Int32"
            }
        },
        { ...

Man beachte, dass ich nach @json( ein einfaches Anführungszeichen und innerhalb des JSON doppelte Anführungszeichen verwendate habe.

Eine Sache muss man hier noch beachten – es kann sein, dass der Code zu lang wird, so dass Azure Data Factory einen Fehler anzeigt. Wie man den behebt, beschreibe ich im nächsten Blogeintrag.

Azure Analysis Services von Azure Data Factory aus verarbeiten

Vor kurzem habe ich ja beschrieben, wie man Azure Analysis Services-Cubes von onprem aus verarbeiten kann (s. Beitrag)

Von Azure aus kann man die Verarbeitung sehr schön aus Azure Data Factory v2 starten. Dies ist hier ausführlich beschrieben, auch wenn dieser andere Artikel behauptet, man bräuchte eine Logic App – was aber eben nicht der Fall ist.

Allerdings ist die Beschreibung in dem Artikel nicht mehr ganz aktuell. Der wesentliche Punkt ist, dass man das „ADF service principal“ (in der Form app:applicationid@tenant) als Administrator des Azure Analysis Services-Servers eintragen muss.

Wo findet man die applicationId?

Man muss im Azure Portal unter Enterprise Applications als Filter „Managed Identities“ einstellen:

Managed Identities in Enterprise Applications
Application ID unter Enterprise applications

In der Zeile mit dem Namen der Data Factory findet man rechts die Application ID.

Der Rest aus dem Artikel funktioniert weiter

Die tenant ID findet man unter der Azure Data Factory im Reiter Settings:

tenant ID unter den Eigenschaften der Azure Data Factory

Bewertung

Vorteil:

  • Wenn man die ETLs mit Azure Data factry baut, hat man eine schöne Möglichkeit, in der gleichen Umgebung zu bleiben, wenn man die Cubes verarbeiten muss.
  • Es ist nett, die Managed Identity zu verwenden. Dann muss man nicht einen speziellen User erstellen, dessen Passwort man dann verwalten müsste.

Nachteil:

  • Die Berechtigung als Server-Administrator erscheint mir als unschön, da eigentlich die Berechtigung zur Verarbeitung des entsprechenden Cubes ausreichen würde. Leider reicht das hier aber nicht.
  • Unglücklich finde ich auch bei dem Web-Call, dass das JSON zum Verarbeiten nur ähnlich aber nicht gleich zu dem JSON ist, das zum Beispiel im Management Studio verwendet wird.

Azure Analysis Services automatisch Pausieren

Bei einem meiner Kunden hatten wir mehrere Azure Analysis Services im Einsatz. Wenn die kleinsten Tarife nicht mehr ausreichen, kann das mit der Zeit eine teure Angelegenheit werden.

Deshalb haben wir folgende Struktur in den Analysis Services, die in Azure gehostet werden, aufgebaut:

  • Jedes Projekt hat seinen eigenen Entwicklungsserver
  • Jedes Projekt hat seinen eigenen Testserver
  • Es gibt für die Projekte gemeinsame genutzte Produktivserver

Dabei verwenden wir bei den Entwicklungs- und Testservern den niedrigst möglichen Tarif (in der Regel D1, B1).

Darüber sollen die Entwicklungs- und Testserver nur dann laufen und somit Geld kosten, wenn sie benötigt werden. Das heißt, wenn an einem Projekt gerade weder entwickelt noch getestet wird, sind die beiden betreffenden Azure Analysis Services (AAS) pausiert.

Um dies nicht nur manuell durch den Entwickler umzusetzen, haben wir einen Automatismus implementiert:

  • Zunächst gibt es eine relationale Tabelle, in der alle AAS aufgeführt sind
  • Für jeden AAS kann man definieren, bis wann er laufen soll
    (Wenn man z.B. im Test ist, möchte man nicht, dass der Server nachts automatisiert pausiert wird)
  • Abends läuft ein Job (eine Data Factory Pipeline), die alle auszuschaltenden AAS pausiert, falls sie noch laufen.

Bei der Umsetzung habe ich mich vom hier zu findenden guten Artikel inspierieren lassen.

Erste Pipeline „SuspendOrResumeAAS“

Zunächst erstelle ich eine Pipeline „SuspendOrResumeAAS“:

Pipeline SuspendOrResumeAAS

Man sieht hier schon gut, woraus diese Pipeline besteht:

  • Es gibt 3 Parameter:
    • action: Soll die AAS-Instanz pausiert (supend) oder gestartet (resume) werden?
    • aasName: Name der AAS
    • ressourceGroupName: Name der Ressourcengruppe
  • Es gibt eine Variable (was man im Screen Shot nicht sieht):
    • subscriptionId (Wir werden das später für den API-Aufruf benötigen)

Was macht die Pipeline?

  • Sie ermittelt zuerst den Status des AAS
  • Dann wird überprüft, ob der Status zu der Aktion passt (Man kann eine pausierte AAS nicht pausieren 🙂 )
  • Wenn der Status OK ist, wird der API-Aufruf mit der gewünschten Aktion durchgeführt.

Die Schritte beleuchten wir jetzt näher:

Für die Ermittlung des Status (infoZuAAS) verwenden wir eine Web-Aktivität:

In den Einstellungen bauen wir die URL via dynamischen Inhalt zusammen:

@concat('https://management.azure.com/subscriptions/', variables('subscriptionId') , '/resourceGroups/', pipeline().parameters.ressourceGroupName, '/providers/Microsoft.AnalysisServices/servers/', pipeline().parameters.aasName, '?api-version=2017-08-01')

Als Methode wird „GET“ eingestellt.

Wie auch im zitierten Artikel vorgeschlagen, verweden wir MSI als Authentifizierung (als Ressource „https://management.azure.com/“ eintragen).

Dazu müssen wir für jede betreffende AAS unter „Access Control (IAM)“ den Punkt „Add role assignments“ auswählen. Dort definieren wir als Contributor die Data Factory, in der wir unsere Pipeline erstellen:

Add role assignment: DF als Contributor des AAS

Wenn wir diese Aktivität im Debug-Modus starten, sehen wir

Debuginformation zu infoZuAAS

Über die Pfeile (links: Input, rechts: Output) können wir die aufrufende URL kontrollieren:

Input

Unter Output sieht man:

Output

Hier sehen wir unter properties > state „Paused“, was heißt, dass die AAS pausiert ist. Für eine laufende AAS ist der state „Succeeded“.

Dies nutzen wir dann gleich in der If-Abfrage (isInStateForAction): Um den Status in der Bedingung abzufragen, kann man via Code darauf zugreifen. Mit activity().output erhält man den gesamten Output und kann dann im JSON über . auf die einzelnen Attribute in der Hierarchie zugreifen:

activity('infoZuAAS').output.properties.state

Der gesamte Code für die „Expression“ in der „If Condition“ sieht so aus:

@or(
and(equals(activity('infoZuAAS').output.properties.state, 'Paused'),equals(toLower(pipeline().parameters.action), 'resume'))
,
and(equals(activity('infoZuAAS').output.properties.state, 'Succeeded'),equals(toLower(pipeline().parameters.action), 'suspend'))
)

Die innere Aktivität ist wieder eine Web-Aktivität (pause or resume AAS):

Diese Web-Aktivität unterscheidet sich von der vorhergehenden in der URL, in der nun die Aktion (suspend oder resume) mit angegeben wird. Außerdem ist die Methode POST. Deshalb muss der Text mit angegeben werden – auch wenn die AAS-API das eigentlich nicht benötigt, weswegen wir hier {"Dummy":"Dummy"} eintragen.

Die URL wird mit dieser Formel definiert:

@concat('https://management.azure.com/subscriptions/', variables('subscriptionId'), '/resourceGroups/', pipeline().parameters.ressourceGroupName, '/providers/Microsoft.AnalysisServices/servers/', pipeline().parameters.aasName, '/' , pipeline().parameters.action, '?api-version=2017-08-01')

Authentifizierung und Ressource wird wie oben gesetzt.

Zweite Pipeline: alleAASausschalten

Pipeline alleAASausschalten

Die Suche/Lookup-Aktivität „lies Config Tabelle“ liest per Query aus einer SQL-Server-Tabelle, welche AAS auszuschalten sind:

SELECT AAS_Name, RessourceGroupName FROM Management.[Config_AAS_Shutdown]
WHERE getdate() > keep_Online_Until

In der For-Each-Schleife „jedenAASausschalten“ ist folgende Einstellung zu machen:

@activity('lies Config Tabelle').output.value

Und innerhalb der Schleife wird eine Aktivität ausgeführt:

die unter (1) erstellte Pipeline aufrufen

Als Parameter übergeben wir an diese Pipeline:

  • action: suspend (fest definiert)
  • aasName: Formel @item().AAS_Name
  • ressourceGroupName @item().RessourceGroupName

Somit müssen wir nur noch einen Trigger täglich um 19 Uhr definieren.

Weitere Möglichkeiten

Wir haben darauf verzichtet, einen Automatismus zu erstellen, der am Vormittag die Entwickler-Server wieder hochfährt.

Hintergrund war, dass wir die Entwicklungs- und Testserver nicht jeden Tag brauchen. Sie schnell manuell zu starten, ist kein großer Aufwand – deswegen lassen wir das so.

Aber natürlich wäre es einfach, ganz analog eine Pipeline zu erstellen.

Uns war aber wichtig, den hier vorgestellten Weg zu implementieren, da man als Entwickler schnell mal vergisst, einen AAS auszuschalten – und mit diesem Automatismus ist das kein Problem, da jede Nacht überprüft wird, ob die AAS aus sind – und, wenn nicht, wieder ausgeschaltet werden.