package shaded.hologres.com.aliyun.datahub.example;

import java.util.Iterator;
import java.util.List;
import shaded.hologres.com.aliyun.datahub.DatahubClient;
import shaded.hologres.com.aliyun.datahub.DatahubConfiguration;
import shaded.hologres.com.aliyun.datahub.common.data.RecordSchema;
import shaded.hologres.com.aliyun.datahub.exception.OffsetResetedException;
import shaded.hologres.com.aliyun.datahub.exception.OffsetSessionChangedException;
import shaded.hologres.com.aliyun.datahub.exception.SubscriptionOfflineException;
import shaded.hologres.com.aliyun.datahub.model.GetCursorRequest;
import shaded.hologres.com.aliyun.datahub.model.GetRecordsResult;
import shaded.hologres.com.aliyun.datahub.model.OffsetContext;
import shaded.hologres.com.aliyun.datahub.model.RecordEntry;

/* compiled from: SubscriptionExample.java */
/* loaded from: input_file:shaded/hologres/com/aliyun/datahub/example/Consumer.class */
/**
 * Example consumer thread that reads the records of a single shard on behalf of a
 * subscription and commits the consumed offset back through the {@link DatahubClient}.
 *
 * <p>The thread keeps polling until a non-recoverable error occurs or it is interrupted.
 * An {@link OffsetResetedException} is treated as recoverable: the offset context is
 * refreshed and consumption resumes from the record following the refreshed offset.
 */
class Consumer extends Thread {
    /** Maximum number of records requested per {@code getRecords} call. */
    private static final int FETCH_LIMIT = 10;
    /** The offset is committed each time this many records have been consumed. */
    private static final long COMMIT_INTERVAL = 100L;
    /** Pause, in milliseconds, before polling again after an empty fetch. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    private final String projectName;
    private final String topicName;
    private final String subId;
    private final String shardId;
    private final RecordSchema schema;
    private final DatahubClient client;

    /**
     * Creates a consumer for one shard and builds its own client from the given configuration.
     *
     * @param projectName   project that owns the topic
     * @param topicName     topic to consume from
     * @param subId         subscription id used to initialise and commit offsets
     * @param shardId       shard to consume
     * @param schema        record schema passed to every {@code getRecords} call
     * @param configuration configuration used to construct the {@link DatahubClient}
     */
    public Consumer(String projectName, String topicName, String subId, String shardId, RecordSchema schema, DatahubConfiguration configuration) {
        this.projectName = projectName;
        this.topicName = topicName;
        this.subId = subId;
        this.shardId = shardId;
        this.schema = schema;
        this.client = new DatahubClient(configuration);
    }

    /**
     * Commits the given offset context and prints a confirmation line.
     *
     * @param offsetContext offset context to commit
     */
    private void commit(OffsetContext offsetContext) {
        this.client.commitOffset(offsetContext);
        System.out.println("commit offset suc! offset context: " + offsetContext.toObjectNode().toString());
    }

    /**
     * Requests the cursor positioned at the sequence immediately after the offset held by the
     * given context.
     *
     * @param offsetContext context whose current offset marks the last consumed record
     * @return cursor string to pass to the next {@code getRecords} call
     */
    private String cursorAfterCommittedOffset(OffsetContext offsetContext) {
        long nextSequence = offsetContext.getOffset().getSequence() + 1;
        return this.client.getCursor(this.projectName, this.topicName, this.shardId, GetCursorRequest.CursorType.SEQUENCE, nextSequence).getCursor();
    }

    /**
     * Consumes the shard until a fatal error or an interrupt stops the loop.
     *
     * <p>Starts from the oldest cursor when the subscription has no stored offset, otherwise
     * from the record after the stored offset. Every failure is reported with
     * {@code printStackTrace()}; nothing is propagated to the caller.
     */
    @Override
    public void run() {
        try {
            OffsetContext offsetContext = this.client.initOffsetContext(this.projectName, this.topicName, this.subId, this.shardId);
            String cursor = offsetContext.hasOffset()
                    ? cursorAfterCommittedOffset(offsetContext)
                    : this.client.getCursor(this.projectName, this.topicName, this.shardId, GetCursorRequest.CursorType.OLDEST).getCursor();
            System.out.println("Start consume shard:" + this.shardId + ", start offset:" + offsetContext.toObjectNode().toString() + ", cursor:" + cursor);

            long consumedCount = 0;
            boolean stopped = false;
            while (!stopped) {
                try {
                    GetRecordsResult result = this.client.getRecords(this.projectName, this.topicName, this.shardId, cursor, FETCH_LIMIT, this.schema);
                    List<RecordEntry> entries = result.getRecords();
                    if (entries.isEmpty()) {
                        // Nothing new: commit the current position, then back off before polling again.
                        commit(offsetContext);
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                        System.out.println("sleep 1s and continue consume records! shard id:" + this.shardId);
                    } else {
                        for (RecordEntry entry : entries) {
                            offsetContext.setOffset(entry.getOffset());
                            consumedCount++;
                            if (consumedCount % COMMIT_INTERVAL == 0) {
                                commit(offsetContext);
                            }
                        }
                        cursor = result.getNextCursor();
                    }
                } catch (OffsetResetedException e) {
                    // Recoverable: refresh the offset context and continue after the refreshed offset.
                    // A failure during this recovery reaches the outer catch and ends the thread.
                    this.client.updateOffsetContext(offsetContext);
                    cursor = cursorAfterCommittedOffset(offsetContext);
                    System.out.println("Restart consume shard:" + this.shardId + ", reset offset:" + offsetContext.toObjectNode().toString() + ", cursor:" + cursor);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code observing this thread can see the request.
                    Thread.currentThread().interrupt();
                    stopped = true;
                    e.printStackTrace();
                } catch (Exception e) {
                    // Every other failure stops consumption; this includes
                    // SubscriptionOfflineException and OffsetSessionChangedException.
                    stopped = true;
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
